# encoding: utf-8
# pylint: disable=no-member
# pylint: disable=invalid-name
# pylint: disable=too-many-arguments
"""
This module contains spectrogram related functionality.
"""
from __future__ import absolute_import, division, print_function
import inspect
import numpy as np
from ..processors import Processor, SequentialProcessor, BufferProcessor
from .filters import (Filterbank, LogarithmicFilterbank, NUM_BANDS, FMIN, FMAX,
A4, NORM_FILTERS, UNIQUE_FILTERS)
def spec(stft):
    """
    Compute the magnitudes of the complex Short Time Fourier Transform of a
    signal.

    Parameters
    ----------
    stft : numpy array
        Complex STFT of a signal.

    Returns
    -------
    spec : numpy array
        Magnitude spectrogram.

    """
    # the magnitude of a complex value is its absolute value
    return np.abs(stft)
# magnitude spectrogram of STFT
class Spectrogram(np.ndarray):
    """
    A :class:`Spectrogram` represents the magnitude spectrogram of a
    :class:`.audio.stft.ShortTimeFourierTransform`.

    Parameters
    ----------
    stft : :class:`.audio.stft.ShortTimeFourierTransform` instance
        Short Time Fourier Transform.
    kwargs : dict, optional
        If no :class:`.audio.stft.ShortTimeFourierTransform` instance was
        given, one is instantiated with these additional keyword arguments.

    Notes
    -----
    If a :class:`Spectrogram` is passed as `stft`, its data is re-used and
    that very object is stored as the `stft` attribute of the new instance.

    Examples
    --------
    Create a :class:`Spectrogram` from a
    :class:`.audio.stft.ShortTimeFourierTransform` (or anything it can be
    instantiated from):

    >>> spec = Spectrogram('tests/data/audio/sample.wav')
    >>> spec  # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
    Spectrogram([[ 3.15249, 4.00272, ..., 0.03634, 0.03671],
                 [ 4.28429, 2.85158, ..., 0.0219 , 0.02227],
                 ...,
                 [ 4.92274, 10.27775, ..., 0.00607, 0.00593],
                 [ 9.22709, 9.6387 , ..., 0.00981, 0.00984]], dtype=float32)

    """
    # pylint: disable=super-on-old-class
    # pylint: disable=super-init-not-called
    # pylint: disable=attribute-defined-outside-init

    def __init__(self, stft, **kwargs):
        # this method is for documentation purposes only; all the work is
        # done in __new__ (required for numpy.ndarray subclasses)
        pass

    def __new__(cls, stft, **kwargs):
        # local import (presumably to avoid a circular import between the
        # spectrogram and stft modules -- TODO confirm)
        from .stft import ShortTimeFourierTransform
        if isinstance(stft, Spectrogram):
            # already a Spectrogram, use its data as is
            data = stft
        else:
            if not isinstance(stft, ShortTimeFourierTransform):
                # try to instantiate a ShortTimeFourierTransform
                stft = ShortTimeFourierTransform(stft, **kwargs)
            # the magnitudes are the absolute values of the complex STFT
            data = np.abs(stft)
        # cast as Spectrogram
        obj = np.asarray(data).view(cls)
        # remember what the spectrogram was computed from
        obj.stft = stft
        return obj

    def __array_finalize__(self, obj):
        if obj is None:
            return
        # set default values here, also needed for views and slices
        self.stft = getattr(obj, 'stft', None)

    @property
    def num_frames(self):
        """Number of frames."""
        return len(self)

    @property
    def num_bins(self):
        """Number of bins."""
        return int(self.shape[1])

    @property
    def bin_frequencies(self):
        """Bin frequencies."""
        return self.stft.bin_frequencies

    def diff(self, **kwargs):
        """
        Return the difference of the magnitude spectrogram.

        Parameters
        ----------
        kwargs : dict
            Keyword arguments passed to :class:`SpectrogramDifference`.

        Returns
        -------
        diff : :class:`SpectrogramDifference` instance
            The differences of the magnitude spectrogram.

        """
        return SpectrogramDifference(self, **kwargs)

    def filter(self, **kwargs):
        """
        Return a filtered version of the magnitude spectrogram.

        Parameters
        ----------
        kwargs : dict
            Keyword arguments passed to :class:`FilteredSpectrogram`.

        Returns
        -------
        filt_spec : :class:`FilteredSpectrogram` instance
            Filtered version of the magnitude spectrogram.

        """
        return FilteredSpectrogram(self, **kwargs)

    def log(self, **kwargs):
        """
        Return a logarithmically scaled version of the magnitude spectrogram.

        Parameters
        ----------
        kwargs : dict
            Keyword arguments passed to :class:`LogarithmicSpectrogram`.

        Returns
        -------
        log_spec : :class:`LogarithmicSpectrogram` instance
            Logarithmically scaled version of the magnitude spectrogram.

        """
        return LogarithmicSpectrogram(self, **kwargs)
class SpectrogramProcessor(Processor):
    """
    SpectrogramProcessor class.

    Any keyword arguments given at instantiation time are accepted but not
    stored.

    """

    def __init__(self, **kwargs):
        # nothing to configure
        pass

    def process(self, data, **kwargs):
        """
        Create a Spectrogram from the given data.

        Parameters
        ----------
        data : numpy array
            Data to be processed.
        kwargs : dict
            Keyword arguments passed to :class:`Spectrogram`.

        Returns
        -------
        spec : :class:`Spectrogram` instance
            Spectrogram.

        """
        return Spectrogram(data, **kwargs)
# filtered spectrogram stuff
# default value of the `filterbank` argument of the filtered spectrogram
# classes and processors defined below
FILTERBANK = LogarithmicFilterbank
class FilteredSpectrogram(Spectrogram):
    """
    FilteredSpectrogram class.

    Parameters
    ----------
    spectrogram : :class:`Spectrogram` instance
        Spectrogram.
    filterbank : :class:`.audio.filters.Filterbank`, optional
        Filterbank class or instance; if a class is given (rather than an
        instance), one will be created with the given type and parameters.
    num_bands : int, optional
        Number of filter bands (per octave, depending on the type of the
        `filterbank`).
    fmin : float, optional
        Minimum frequency of the filterbank [Hz].
    fmax : float, optional
        Maximum frequency of the filterbank [Hz].
    fref : float, optional
        Tuning frequency of the filterbank [Hz].
    norm_filters : bool, optional
        Normalize the filter bands of the filterbank to area 1.
    unique_filters : bool, optional
        Indicate if the filterbank should contain only unique filters, i.e.
        remove duplicate filters resulting from insufficient resolution at
        low frequencies.
    kwargs : dict, optional
        If no :class:`Spectrogram` instance was given, one is instantiated
        with these additional keyword arguments.

    Raises
    ------
    TypeError
        If `filterbank` is neither a :class:`.audio.filters.Filterbank`
        class nor an instance of one.

    Examples
    --------
    Create a :class:`FilteredSpectrogram` from a :class:`Spectrogram` (or
    anything it can be instantiated from). Per default a
    :class:`.madmom.audio.filters.LogarithmicFilterbank` with 12 bands per
    octave is used.

    >>> spec = FilteredSpectrogram('tests/data/audio/sample.wav')
    >>> spec  # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
    FilteredSpectrogram([[ 5.66156, 6.30141, ..., 0.05426, 0.06461],
                         [ 8.44266, 8.69582, ..., 0.07703, 0.0902 ],
                         ...,
                         [ 10.04626, 1.12018, ..., 0.0487 , 0.04282],
                         [ 8.60186, 6.81195, ..., 0.03721, 0.03371]],
                        dtype=float32)

    The resulting spectrogram has fewer frequency bins, with the centers of
    the bins aligned logarithmically (lower frequency bins still have a linear
    spacing due to the coarse resolution of the DFT at low frequencies):

    >>> spec.shape
    (281, 81)
    >>> spec.num_bins
    81
    >>> spec.bin_frequencies  # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
    array([ 43.06641, 64.59961, 86.13281, 107.66602,
            129.19922, 150.73242, 172.26562, 193.79883, ...,
            10551.26953, 11175.73242, 11843.26172, 12553.85742,
            13285.98633, 14082.71484, 14922.50977, 15805.37109])

    The filterbank used to filter the spectrogram is saved as an attribute:

    >>> spec.filterbank  # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
    LogarithmicFilterbank([[ 0., 0., ..., 0., 0.],
                           [ 0., 0., ..., 0., 0.],
                           ...,
                           [ 0., 0., ..., 0., 0.],
                           [ 0., 0., ..., 0., 0.]], dtype=float32)
    >>> spec.filterbank.num_bands
    81

    The filterbank can be chosen at instantiation time:

    >>> from madmom.audio.filters import MelFilterbank
    >>> spec = FilteredSpectrogram('tests/data/audio/sample.wav', \
filterbank=MelFilterbank, num_bands=40)
    >>> type(spec.filterbank)
    <class 'madmom.audio.filters.MelFilterbank'>
    >>> spec.shape
    (281, 40)

    """
    # pylint: disable=super-on-old-class
    # pylint: disable=super-init-not-called
    # pylint: disable=attribute-defined-outside-init

    def __init__(self, spectrogram, filterbank=FILTERBANK, num_bands=NUM_BANDS,
                 fmin=FMIN, fmax=FMAX, fref=A4, norm_filters=NORM_FILTERS,
                 unique_filters=UNIQUE_FILTERS, **kwargs):
        # this method is for documentation purposes only; all the work is
        # done in __new__ (required for numpy.ndarray subclasses)
        pass

    def __new__(cls, spectrogram, filterbank=FILTERBANK, num_bands=NUM_BANDS,
                fmin=FMIN, fmax=FMAX, fref=A4, norm_filters=NORM_FILTERS,
                unique_filters=UNIQUE_FILTERS, **kwargs):
        # pylint: disable=unused-argument
        # instantiate a Spectrogram if needed
        if not isinstance(spectrogram, Spectrogram):
            spectrogram = Spectrogram(spectrogram, **kwargs)
        # a Filterbank class is given, create a filterbank of this type for
        # the bin frequencies of the spectrogram
        if inspect.isclass(filterbank) and issubclass(filterbank, Filterbank):
            filterbank = filterbank(spectrogram.bin_frequencies,
                                    num_bands=num_bands, fmin=fmin, fmax=fmax,
                                    fref=fref, norm_filters=norm_filters,
                                    unique_filters=unique_filters)
        # by now we must have a Filterbank instance
        if not isinstance(filterbank, Filterbank):
            # Note: wrap the value in a tuple, otherwise tuple arguments
            #       would be unpacked by the string formatting
            raise TypeError('not a Filterbank type or instance: %s' %
                            (filterbank,))
        # filter the spectrogram
        data = np.dot(spectrogram, filterbank)
        # cast as FilteredSpectrogram
        obj = np.asarray(data).view(cls)
        # save additional attributes
        obj.filterbank = filterbank
        # and those from the given spectrogram
        obj.stft = spectrogram.stft
        return obj

    def __array_finalize__(self, obj):
        if obj is None:
            return
        # set default values here, also needed for views and slices
        self.stft = getattr(obj, 'stft', None)
        self.filterbank = getattr(obj, 'filterbank', None)

    @property
    def bin_frequencies(self):
        """Bin frequencies."""
        # use the center frequencies of the filterbank as bin_frequencies
        return self.filterbank.center_frequencies
class FilteredSpectrogramProcessor(Processor):
    """
    FilteredSpectrogramProcessor class.

    Parameters
    ----------
    filterbank : :class:`.audio.filters.Filterbank`
        Filterbank used to filter a spectrogram.
    num_bands : int
        Number of bands (per octave).
    fmin : float, optional
        Minimum frequency of the filterbank [Hz].
    fmax : float, optional
        Maximum frequency of the filterbank [Hz].
    fref : float, optional
        Tuning frequency of the filterbank [Hz].
    norm_filters : bool, optional
        Normalize the filter of the filterbank to area 1.
    unique_filters : bool, optional
        Indicate if the filterbank should contain only unique filters, i.e.
        remove duplicate filters resulting from insufficient resolution at
        low frequencies.

    """

    def __init__(self, filterbank=FILTERBANK, num_bands=NUM_BANDS, fmin=FMIN,
                 fmax=FMAX, fref=A4, norm_filters=NORM_FILTERS,
                 unique_filters=UNIQUE_FILTERS, **kwargs):
        # pylint: disable=unused-argument
        self.filterbank = filterbank
        self.num_bands = num_bands
        self.fmin = fmin
        self.fmax = fmax
        self.fref = fref
        self.norm_filters = norm_filters
        self.unique_filters = unique_filters

    def process(self, data, **kwargs):
        """
        Create a FilteredSpectrogram from the given data.

        Parameters
        ----------
        data : numpy array
            Data to be processed.
        kwargs : dict
            Keyword arguments passed to :class:`FilteredSpectrogram`.

        Returns
        -------
        filt_spec : :class:`FilteredSpectrogram` instance
            Filtered spectrogram.

        Notes
        -----
        The filterbank of the returned spectrogram is stored in this
        processor and therefore re-used by subsequent calls.

        """
        # start with the settings of this processor and let the given
        # keyword arguments override them
        args = {'filterbank': self.filterbank,
                'num_bands': self.num_bands,
                'fmin': self.fmin,
                'fmax': self.fmax,
                'fref': self.fref,
                'norm_filters': self.norm_filters,
                'unique_filters': self.unique_filters}
        args.update(kwargs)
        # instantiate a FilteredSpectrogram
        filt_spec = FilteredSpectrogram(data, **args)
        # cache the filterbank
        self.filterbank = filt_spec.filterbank
        return filt_spec
# logarithmic spectrogram stuff
# default values of the `log`, `mul` and `add` arguments of the logarithmic
# spectrogram classes and processors defined below
# scaling function applied to the magnitudes
LOG = np.log10
# factor the magnitudes are multiplied with before taking the logarithm
MUL = 1.
# value added to the magnitudes before taking the logarithm
ADD = 1.
class LogarithmicSpectrogram(Spectrogram):
    """
    LogarithmicSpectrogram class.

    Parameters
    ----------
    spectrogram : :class:`Spectrogram` instance
        Spectrogram.
    log : numpy ufunc, optional
        Logarithmic scaling function to apply.
    mul : float, optional
        Multiply the magnitude spectrogram with this factor before taking
        the logarithm.
    add : float, optional
        Add this value before taking the logarithm of the magnitudes.
    kwargs : dict, optional
        If no :class:`Spectrogram` instance was given, one is instantiated
        with these additional keyword arguments.

    Notes
    -----
    A given :class:`Spectrogram` instance is copied before it is scaled. If
    a :class:`Spectrogram` has to be instantiated first, it is scaled
    in-place, i.e. the `spectrogram` attribute then refers to the scaled
    data as well.

    Examples
    --------
    Create a :class:`LogarithmicSpectrogram` from a :class:`Spectrogram` (or
    anything it can be instantiated from). Per default `np.log10` is used as
    the scaling function and a value of 1 is added to avoid negative values.

    >>> spec = LogarithmicSpectrogram('tests/data/audio/sample.wav')
    >>> spec  # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
    LogarithmicSpectrogram([[...]], dtype=float32)
    >>> spec.min()
    LogarithmicSpectrogram(1.604927092557773e-06, dtype=float32)

    """
    # pylint: disable=super-on-old-class
    # pylint: disable=super-init-not-called
    # pylint: disable=attribute-defined-outside-init

    def __init__(self, spectrogram, log=LOG, mul=MUL, add=ADD, **kwargs):
        # this method is for documentation purposes only; all the work is
        # done in __new__ (required for numpy.ndarray subclasses)
        pass

    def __new__(cls, spectrogram, log=LOG, mul=MUL, add=ADD, **kwargs):
        if isinstance(spectrogram, Spectrogram):
            # work on a copy to leave the given spectrogram untouched
            data = spectrogram.copy()
        else:
            # try to instantiate a Spectrogram object; it was created just
            # for this purpose, thus it is scaled in-place (no copy)
            spectrogram = Spectrogram(spectrogram, **kwargs)
            data = spectrogram
        # scale the spectrogram (in-place)
        if mul is not None:
            data *= mul
        if add is not None:
            data += add
        if log is not None:
            # the second argument is the output array of the ufunc
            log(data, data)
        # cast as LogarithmicSpectrogram
        obj = np.asarray(data).view(cls)
        # save additional attributes
        obj.mul = mul
        obj.add = add
        # and those from the given spectrogram
        obj.stft = spectrogram.stft
        obj.spectrogram = spectrogram
        return obj

    def __array_finalize__(self, obj):
        if obj is None:
            return
        # set default values here, also needed for views and slices
        self.stft = getattr(obj, 'stft', None)
        self.spectrogram = getattr(obj, 'spectrogram', None)
        self.mul = getattr(obj, 'mul', MUL)
        self.add = getattr(obj, 'add', ADD)

    @property
    def filterbank(self):
        """Filterbank."""
        return self.spectrogram.filterbank

    @property
    def bin_frequencies(self):
        """Bin frequencies."""
        return self.spectrogram.bin_frequencies
class LogarithmicSpectrogramProcessor(Processor):
    """
    Logarithmic Spectrogram Processor class.

    Parameters
    ----------
    log : numpy ufunc, optional
        Logarithmic scaling function to apply.
    mul : float, optional
        Multiply the magnitude spectrogram with this factor before taking the
        logarithm.
    add : float, optional
        Add this value before taking the logarithm of the magnitudes.

    """

    def __init__(self, log=LOG, mul=MUL, add=ADD, **kwargs):
        # pylint: disable=unused-argument
        self.log = log
        self.mul = mul
        self.add = add

    def process(self, data, **kwargs):
        """
        Perform logarithmic scaling of a spectrogram.

        Parameters
        ----------
        data : numpy array
            Data to be processed.
        kwargs : dict
            Keyword arguments passed to :class:`LogarithmicSpectrogram`.

        Returns
        -------
        log_spec : :class:`LogarithmicSpectrogram` instance
            Logarithmically scaled spectrogram.

        """
        # start with the settings of this processor and let the given
        # keyword arguments override them
        args = {'log': self.log, 'mul': self.mul, 'add': self.add}
        args.update(kwargs)
        # instantiate a LogarithmicSpectrogram
        return LogarithmicSpectrogram(data, **args)

    @staticmethod
    def add_arguments(parser, log=None, mul=None, add=None):
        """
        Add spectrogram scaling related arguments to an existing parser.

        Parameters
        ----------
        parser : argparse parser instance
            Existing argparse parser object.
        log : bool, optional
            Take the logarithm of the spectrogram.
        mul : float, optional
            Multiply the magnitude spectrogram with this factor before taking
            the logarithm.
        add : float, optional
            Add this value before taking the logarithm of the magnitudes.

        Returns
        -------
        argparse argument group
            Spectrogram scaling argument parser group.

        Notes
        -----
        Parameters are included in the group only if they are not 'None'.

        """
        # add log related options to the existing parser
        g = parser.add_argument_group('magnitude scaling arguments')
        # log: offer the switch which turns the given default around
        if log is True:
            g.add_argument('--linear', dest='log', action='store_const',
                           const=None, default=LOG,
                           help='linear magnitudes [default=logarithmic]')
        elif log is False:
            g.add_argument('--log', action='store_const',
                           const=LOG, default=None,
                           help='logarithmic magnitudes [default=linear]')
        # mul
        # Note: the defaults are floats, so they must not be formatted as
        #       integers (a default of 0.5 would be displayed as 0)
        if mul is not None:
            g.add_argument('--mul', action='store', type=float,
                           default=mul, help='multiplier (before taking '
                           'the log) [default=%(default).2f]')
        # add
        if add is not None:
            g.add_argument('--add', action='store', type=float,
                           default=add, help='value added (before taking '
                           'the log) [default=%(default).2f]')
        # return the group
        return g
# logarithmic filtered spectrogram class
class LogarithmicFilteredSpectrogram(LogarithmicSpectrogram,
                                     FilteredSpectrogram):
    """
    LogarithmicFilteredSpectrogram class.

    Parameters
    ----------
    spectrogram : :class:`FilteredSpectrogram` instance
        Filtered spectrogram.
    kwargs : dict, optional
        If no :class:`FilteredSpectrogram` instance was given, one is
        instantiated with these additional keyword arguments and
        logarithmically scaled afterwards, i.e. passed to
        :class:`LogarithmicSpectrogram`.

    Notes
    -----
    For the filtering and scaling parameters, please refer to
    :class:`FilteredSpectrogram` and :class:`LogarithmicSpectrogram`.

    See Also
    --------
    :class:`FilteredSpectrogram`
    :class:`LogarithmicSpectrogram`

    Examples
    --------
    Create a :class:`LogarithmicFilteredSpectrogram` from a
    :class:`Spectrogram` (or anything it can be instantiated from). This is
    mainly a convenience class which first filters the spectrogram and then
    scales it logarithmically.

    >>> spec = LogarithmicFilteredSpectrogram('tests/data/audio/sample.wav')
    >>> spec  # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
    LogarithmicFilteredSpectrogram([[ 0.82358, 0.86341, ...,
                                      0.02295, 0.02719],
                                    [ 0.97509, 0.98658, ...,
                                      0.03223, 0.0375 ],
                                    ...,
                                    [ 1.04322, 0.32637, ...,
                                      0.02065, 0.01821],
                                    [ 0.98236, 0.89276, ...,
                                      0.01587, 0.0144 ]], dtype=float32)
    >>> spec.shape
    (281, 81)
    >>> spec.filterbank  # doctest: +ELLIPSIS
    LogarithmicFilterbank([[...]], dtype=float32)
    >>> spec.min()  # doctest: +ELLIPSIS
    LogarithmicFilteredSpectrogram(0.00830..., dtype=float32)

    """
    # pylint: disable=super-on-old-class
    # pylint: disable=super-init-not-called
    # pylint: disable=attribute-defined-outside-init

    def __init__(self, spectrogram, **kwargs):
        # this method is for documentation purposes only; all the work is
        # done in __new__ (required for numpy.ndarray subclasses)
        pass

    def __new__(cls, spectrogram, **kwargs):
        # take the scaling arguments out of the keyword arguments, they are
        # passed explicitly to LogarithmicSpectrogram below
        mul = kwargs.pop('mul', MUL)
        add = kwargs.pop('add', ADD)
        # instantiate a FilteredSpectrogram if needed
        if not isinstance(spectrogram, FilteredSpectrogram):
            spectrogram = FilteredSpectrogram(spectrogram, **kwargs)
        # take the logarithm
        data = LogarithmicSpectrogram(spectrogram, mul=mul, add=add, **kwargs)
        # cast as LogarithmicFilteredSpectrogram
        obj = np.asarray(data).view(cls)
        # save additional attributes
        obj.mul = data.mul
        obj.add = data.add
        # and those from the given (filtered) spectrogram
        obj.stft = spectrogram.stft
        obj.spectrogram = spectrogram
        return obj

    @property
    def filterbank(self):
        """Filterbank."""
        return self.spectrogram.filterbank

    @property
    def bin_frequencies(self):
        """Bin frequencies."""
        # use the center frequencies of the filterbank as bin_frequencies
        return self.filterbank.center_frequencies
class LogarithmicFilteredSpectrogramProcessor(Processor):
    """
    Logarithmic Filtered Spectrogram Processor class.

    Parameters
    ----------
    filterbank : :class:`.audio.filters.Filterbank`
        Filterbank used to filter a spectrogram.
    num_bands : int
        Number of bands (per octave).
    fmin : float, optional
        Minimum frequency of the filterbank [Hz].
    fmax : float, optional
        Maximum frequency of the filterbank [Hz].
    fref : float, optional
        Tuning frequency of the filterbank [Hz].
    norm_filters : bool, optional
        Normalize the filter of the filterbank to area 1.
    unique_filters : bool, optional
        Indicate if the filterbank should contain only unique filters, i.e.
        remove duplicate filters resulting from insufficient resolution at
        low frequencies.
    mul : float, optional
        Multiply the magnitude spectrogram with this factor before taking the
        logarithm.
    add : float, optional
        Add this value before taking the logarithm of the magnitudes.

    """

    def __init__(self, filterbank=FILTERBANK, num_bands=NUM_BANDS, fmin=FMIN,
                 fmax=FMAX, fref=A4, norm_filters=NORM_FILTERS,
                 unique_filters=UNIQUE_FILTERS, mul=MUL, add=ADD, **kwargs):
        # pylint: disable=unused-argument
        # filtering related settings
        self.filterbank = filterbank
        self.num_bands = num_bands
        self.fmin = fmin
        self.fmax = fmax
        self.fref = fref
        self.norm_filters = norm_filters
        self.unique_filters = unique_filters
        # scaling related settings
        self.mul = mul
        self.add = add

    def process(self, data, **kwargs):
        """
        Perform filtering and logarithmic scaling of a spectrogram.

        Parameters
        ----------
        data : numpy array
            Data to be processed.
        kwargs : dict
            Keyword arguments passed to
            :class:`LogarithmicFilteredSpectrogram`.

        Returns
        -------
        log_filt_spec : :class:`LogarithmicFilteredSpectrogram` instance
            Logarithmically scaled filtered spectrogram.

        Notes
        -----
        The filterbank of the returned spectrogram is stored in this
        processor and therefore re-used by subsequent calls.

        """
        # start with the settings of this processor and let the given
        # keyword arguments override them
        args = {'filterbank': self.filterbank,
                'num_bands': self.num_bands,
                'fmin': self.fmin,
                'fmax': self.fmax,
                'fref': self.fref,
                'norm_filters': self.norm_filters,
                'unique_filters': self.unique_filters,
                'mul': self.mul,
                'add': self.add}
        args.update(kwargs)
        # instantiate a LogarithmicFilteredSpectrogram
        log_filt_spec = LogarithmicFilteredSpectrogram(data, **args)
        # cache the filterbank
        self.filterbank = log_filt_spec.filterbank
        return log_filt_spec
# spectrogram difference stuff
# default values of the corresponding arguments of the spectrogram
# difference class and processor defined below
# ratio of the maximum window height used to determine the diff frames
DIFF_RATIO = 0.5
# 'None' means the number of diff frames is derived from the diff ratio
DIFF_FRAMES = None
# 'None' means no maximum filter is applied before taking the difference
DIFF_MAX_BINS = None
# keep negative differences by default
POSITIVE_DIFFS = False
def _diff_frames(diff_ratio, hop_size, frame_size, window=np.hanning):
    """
    Compute the number of `diff_frames` for the given ratio of overlap.

    Parameters
    ----------
    diff_ratio : float
        Ratio of overlap of windows of two consecutive STFT frames.
    hop_size : int
        Samples between two adjacent frames.
    frame_size : int
        Size of one frame in samples.
    window : numpy ufunc or array
        Window function or the window itself (any array-like sequence).

    Returns
    -------
    diff_frames : int
        Number of frames to calculate the difference to.

    """
    if callable(window):
        # Note: if only a window function is given (default in audio.stft),
        #       generate a window of size `frame_size` with the given shape
        window = window(frame_size)
    # accept any sequence (e.g. lists) as window
    window = np.asarray(window)
    # first sample of the window with a higher magnitude than given ratio
    # Note: argmax returns the index of the first 'True' value
    sample = np.argmax(window > float(diff_ratio) * window.max())
    # distance (in samples) of this sample to the center of the window
    # Note: divide by a float to not depend on the division semantics of
    #       the interpreter
    diff_samples = len(window) / 2. - sample
    # convert to frames, must be at least 1
    return int(max(1, round(diff_samples / hop_size)))
class SpectrogramDifference(Spectrogram):
    """
    SpectrogramDifference class.

    Parameters
    ----------
    spectrogram : :class:`Spectrogram` instance
        Spectrogram.
    diff_ratio : float, optional
        Calculate the difference to the frame at which the window used for the
        STFT yields this ratio of the maximum height.
    diff_frames : int, optional
        Calculate the difference to the `diff_frames`-th previous frame (if
        set, this overrides the value calculated from the `diff_ratio`)
    diff_max_bins : int, optional
        Apply a maximum filter with this width (in bins in frequency dimension)
        to the spectrogram the difference is calculated to.
    positive_diffs : bool, optional
        Keep only the positive differences, i.e. set all diff values < 0 to 0.
    keep_dims : bool, optional
        Indicate if the dimensions (i.e. shape) of the spectrogram should be
        kept.
    kwargs : dict, optional
        If no :class:`Spectrogram` instance was given, one is instantiated with
        these additional keyword arguments.

    Notes
    -----
    The first `diff_frames` frames will have a value of 0.

    If `keep_dims` is 'True' the returned difference has the same shape as the
    spectrogram. This is needed if the diffs should be stacked on top of it.
    If set to 'False', the length will be `diff_frames` frames shorter (mostly
    used by the SpectrogramDifferenceProcessor which first buffers that many
    frames).

    The SuperFlux algorithm [1]_ uses a maximum filtered spectrogram with 3
    `diff_max_bins` together with a 24 band logarithmic filterbank to calculate
    the difference spectrogram with a `diff_ratio` of 0.5.

    The effect of this maximum filter applied to the spectrogram is that the
    magnitudes are "widened" in frequency direction, i.e. the following
    difference calculation is less sensitive against frequency fluctuations.
    This effect is exploited to suppress false positive energy fragments
    originating from vibrato.

    References
    ----------
    .. [1] Sebastian Böck and Gerhard Widmer
           "Maximum Filter Vibrato Suppression for Onset Detection"
           Proceedings of the 16th International Conference on Digital Audio
           Effects (DAFx), 2013.

    Examples
    --------
    To obtain the SuperFlux feature as described above first create a filtered
    and logarithmically spaced spectrogram:

    >>> spec = LogarithmicFilteredSpectrogram('tests/data/audio/sample.wav', \
num_bands=24, fps=200)
    >>> spec  # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
    LogarithmicFilteredSpectrogram([[ 0.82358, 0.86341, ...,
                                      0.02809, 0.02672],
                                    [ 0.92514, 0.93211, ...,
                                      0.03607, 0.0317 ],
                                    ...,
                                    [ 1.03826, 0.767 , ...,
                                      0.01814, 0.01138],
                                    [ 0.98236, 0.89276, ...,
                                      0.01669, 0.00919]], dtype=float32)
    >>> spec.shape
    (561, 140)

    Then use the temporal first order difference and apply a maximum filter
    with 3 bands, keeping only the positive differences (i.e. rise in energy):

    >>> superflux = SpectrogramDifference(spec, diff_max_bins=3, \
positive_diffs=True)
    >>> superflux  # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
    SpectrogramDifference([[ 0. , 0. , ..., 0. , 0. ],
                           [ 0. , 0. , ..., 0. , 0. ],
                           ...,
                           [ 0.01941, 0. , ..., 0. , 0. ],
                           [ 0. , 0. , ..., 0. , 0. ]], dtype=float32)

    """
    # pylint: disable=super-on-old-class
    # pylint: disable=super-init-not-called
    # pylint: disable=attribute-defined-outside-init

    def __init__(self, spectrogram, diff_ratio=DIFF_RATIO,
                 diff_frames=DIFF_FRAMES, diff_max_bins=DIFF_MAX_BINS,
                 positive_diffs=POSITIVE_DIFFS, keep_dims=True, **kwargs):
        # this method is for documentation purposes only; all the work is
        # done in __new__ (required for numpy.ndarray subclasses)
        pass

    def __new__(cls, spectrogram, diff_ratio=DIFF_RATIO,
                diff_frames=DIFF_FRAMES, diff_max_bins=DIFF_MAX_BINS,
                positive_diffs=POSITIVE_DIFFS, keep_dims=True, **kwargs):
        # instantiate a Spectrogram if needed
        if not isinstance(spectrogram, Spectrogram):
            # try to instantiate a Spectrogram object
            spectrogram = Spectrogram(spectrogram, **kwargs)
        # calculate the number of diff frames to use
        if diff_frames is None:
            diff_frames = _diff_frames(
                diff_ratio, hop_size=spectrogram.stft.frames.hop_size,
                frame_size=spectrogram.stft.frames.frame_size,
                window=spectrogram.stft.window)
        # apply a maximum filter to diff_spec if needed
        if diff_max_bins is not None and diff_max_bins > 1:
            # Note: import from `scipy.ndimage` directly, the
            #       `scipy.ndimage.filters` namespace is deprecated
            from scipy.ndimage import maximum_filter
            # widen the spectrogram in frequency dimension only
            size = [1, int(diff_max_bins)]
            diff_spec = maximum_filter(spectrogram, size=size)
        else:
            diff_spec = spectrogram
        # calculate the diff to the `diff_frames`-th previous frame
        if keep_dims:
            # the first `diff_frames` frames remain 0
            diff = np.zeros_like(spectrogram)
            diff[diff_frames:] = (spectrogram[diff_frames:] -
                                  diff_spec[:-diff_frames])
        else:
            diff = spectrogram[diff_frames:] - diff_spec[:-diff_frames]
        # positive differences only?
        if positive_diffs:
            np.maximum(diff, 0, out=diff)
        # cast as SpectrogramDifference
        obj = np.asarray(diff).view(cls)
        # save additional attributes
        obj.spectrogram = spectrogram
        obj.diff_ratio = diff_ratio
        obj.diff_frames = diff_frames
        obj.diff_max_bins = diff_max_bins
        obj.positive_diffs = positive_diffs
        # and the STFT of the given spectrogram (if it has one)
        obj.stft = getattr(spectrogram, 'stft', None)
        return obj

    def __array_finalize__(self, obj):
        if obj is None:
            return
        # set default values here, also needed for views and slices
        self.diff_ratio = getattr(obj, 'diff_ratio', DIFF_RATIO)
        self.diff_frames = getattr(obj, 'diff_frames', DIFF_FRAMES)
        self.diff_max_bins = getattr(obj, 'diff_max_bins', DIFF_MAX_BINS)
        self.positive_diffs = getattr(obj, 'positive_diffs', POSITIVE_DIFFS)
        # Note: also propagate the underlying spectrogram and STFT,
        #       otherwise views (e.g. slices or the result of
        #       `positive_diff`) lack these attributes and
        #       `bin_frequencies` fails
        self.spectrogram = getattr(obj, 'spectrogram', None)
        self.stft = getattr(obj, 'stft', None)

    @property
    def bin_frequencies(self):
        """Bin frequencies."""
        return self.spectrogram.bin_frequencies

    def positive_diff(self):
        """Positive diff."""
        return np.maximum(self, 0)
[docs]class SpectrogramDifferenceProcessor(Processor):
"""
Difference Spectrogram Processor class.
Parameters
----------
diff_ratio : float, optional
Calculate the difference to the frame at which the window used for the
STFT yields this ratio of the maximum height.
diff_frames : int, optional
Calculate the difference to the `diff_frames`-th previous frame (if
set, this overrides the value calculated from the `diff_ratio`)
diff_max_bins : int, optional
Apply a maximum filter with this width (in bins in frequency dimension)
to the spectrogram the difference is calculated to.
positive_diffs : bool, optional
Keep only the positive differences, i.e. set all diff values < 0 to 0.
stack_diffs : numpy stacking function, optional
If 'None', only the differences are returned. If set, the diffs are
stacked with the underlying spectrogram data according to the `stack`
function:
- ``np.vstack``
the differences and spectrogram are stacked vertically, i.e. in time
direction,
- ``np.hstack``
the differences and spectrogram are stacked horizontally, i.e. in
frequency direction,
- ``np.dstack``
the differences and spectrogram are stacked in depth, i.e. return
them as a 3D representation with depth as the third dimension.
"""
def __init__(self, diff_ratio=DIFF_RATIO, diff_frames=DIFF_FRAMES,
             diff_max_bins=DIFF_MAX_BINS, positive_diffs=POSITIVE_DIFFS,
             stack_diffs=None, **kwargs):
    # pylint: disable=unused-argument
    # state needed for stateful processing
    # Note: the buffer can not be created here, since it depends on the
    #       data to be processed
    self._buffer = None
    # settings of the difference calculation
    self.diff_ratio = diff_ratio
    self.diff_frames = diff_frames
    self.diff_max_bins = diff_max_bins
    self.positive_diffs = positive_diffs
    # how to combine the differences with the spectrogram (if at all)
    self.stack_diffs = stack_diffs
def __getstate__(self):
    # pickle a shallow copy of the instance attributes, but leave out the
    # buffer which is needed for stateful processing only
    state = dict(self.__dict__)
    state.pop('_buffer', None)
    return state
def __setstate__(self, state):
    # restore the pickled instance attributes
    self.__dict__.update(state)
    # the buffer is never pickled, thus always start without one
    self._buffer = None
def process(self, data, reset=True, **kwargs):
    """
    Perform a temporal difference calculation on the given data.

    Parameters
    ----------
    data : numpy array
        Data to be processed.
    reset : bool, optional
        Reset the spectrogram buffer before computing the difference.
    kwargs : dict
        Keyword arguments passed to :class:`SpectrogramDifference`.

    Returns
    -------
    diff : :class:`SpectrogramDifference` instance
        Spectrogram difference.

    Notes
    -----
    If `reset` is 'True', the first `diff_frames` differences will be 0.

    """
    # start with the settings of this processor and let the given keyword
    # arguments override them
    args = {'diff_ratio': self.diff_ratio,
            'diff_frames': self.diff_frames,
            'diff_max_bins': self.diff_max_bins,
            'positive_diffs': self.positive_diffs}
    args.update(kwargs)
    # calculate the number of diff frames (only if not known yet)
    if self.diff_frames is None:
        # Note: use diff_ratio from args, not self.diff_ratio
        self.diff_frames = _diff_frames(
            args['diff_ratio'], frame_size=data.stft.frames.frame_size,
            hop_size=data.stft.frames.hop_size, window=data.stft.window)
    if self._buffer is None or reset:
        # (re-)start: put diff_frames NaNs before the data (will be
        # replaced by 0s after the difference is computed)
        padding = np.full((self.diff_frames, data.shape[1]), np.nan)
        data = np.insert(data, 0, padding, axis=0)
        # use the padded data to init the buffer
        self._buffer = BufferProcessor(init=data)
    else:
        # shift buffer by length of data and put new data at end of buffer
        data = self._buffer(data)
    # compute difference based on this data (reduce 1st dimension)
    diff = SpectrogramDifference(data, keep_dims=False, **args)
    # set all NaN-diffs to 0
    diff[np.isnan(diff)] = 0
    # only the differences are requested
    if self.stack_diffs is None:
        return diff
    # stack the diff and the data
    # Note: don't use `data` directly, because it could be a str; we have
    #       to access diff.spectrogram (i.e. the converted data)
    return self.stack_diffs((diff.spectrogram[self.diff_frames:], diff))
def reset(self):
    """Reset the SpectrogramDifferenceProcessor."""
    # discard the cached spectrogram data, the buffer is re-created by the
    # next call of `process`
    self._buffer = None
@staticmethod
def add_arguments(parser, diff=None, diff_ratio=None, diff_frames=None,
                  diff_max_bins=None, positive_diffs=None):
    """
    Add spectrogram difference related arguments to an existing parser.

    Parameters
    ----------
    parser : argparse parser instance
        Existing argparse parser object.
    diff : bool, optional
        Take the difference of the spectrogram.
    diff_ratio : float, optional
        Calculate the difference to the frame at which the window used for
        the STFT yields this ratio of the maximum height.
    diff_frames : int, optional
        Calculate the difference to the `diff_frames`-th previous frame (if
        set, this overrides the value calculated from the `diff_ratio`)
    diff_max_bins : int, optional
        Apply a maximum filter with this width (in bins in frequency
        dimension) to the spectrogram the difference is calculated to.
    positive_diffs : bool, optional
        Keep only the positive differences, i.e. set all diff values < 0
        to 0.

    Returns
    -------
    argparse argument group
        Spectrogram difference argument parser group.

    Notes
    -----
    An option is added to the group only if its parameter is not 'None'.
    The boolean switches (`diff`, `positive_diffs`) are added only for the
    exact values 'True' or 'False' and always offer the flag which negates
    the given default. The `--diff_frames` option is added if `diff_ratio`
    is not 'None' or if `diff_frames` is a non-zero value.

    """
    group = parser.add_argument_group('spectrogram difference arguments')
    add = group.add_argument
    # switch to turn the difference off or on, depending on the default
    if diff is True:
        add('--no_diff', dest='diff', action='store_false',
            help='use the spectrogram [default=differences '
                 'of the spectrogram]')
    elif diff is False:
        add('--diff', action='store_true',
            help='use the differences of the spectrogram '
                 '[default=spectrogram]')
    # ratio used to derive the number of diff frames
    if diff_ratio is not None:
        add('--diff_ratio', action='store', type=float,
            default=diff_ratio,
            help='calculate the difference to the frame at '
                 'which the window of the STFT have this ratio '
                 'of the maximum height '
                 '[default=%(default).1f]')
    # explicit number of diff frames, also offered whenever a ratio is set
    if diff_ratio is not None or diff_frames:
        add('--diff_frames', action='store', type=int,
            default=diff_frames,
            help='calculate the difference to the N-th previous'
                 ' frame (this overrides the value calculated '
                 'with `diff_ratio`) [default=%(default)s]')
    # switch to keep all or only the positive differences
    if positive_diffs is True:
        add('--all_diffs', dest='positive_diffs',
            action='store_false',
            help='keep both positive and negative diffs '
                 '[default=only the positive diffs]')
    elif positive_diffs is False:
        add('--positive_diffs', action='store_true',
            help='keep only positive diffs '
                 '[default=positive and negative diffs]')
    # width of the maximum filter
    if diff_max_bins is not None:
        add('--max_bins', action='store', type=int,
            dest='diff_max_bins', default=diff_max_bins,
            help='apply a maximum filter with this width (in '
                 'frequency bins) [default=%(default)d]')
    # hand the group back to the caller
    return group
class SuperFluxProcessor(SequentialProcessor):
    """
    Spectrogram processor which sets the default values suitable for the
    SuperFlux algorithm.

    Parameters
    ----------
    kwargs : dict, optional
        Keyword arguments passed to all processors of the chain. The
        defaults used for `filterbank`, `num_bands`, `norm_filters`,
        `diff_ratio`, `diff_max_bins` and `positive_diffs` can be
        overwritten by giving them explicitly.

    """
    # pylint: disable=too-many-ancestors

    def __init__(self, **kwargs):
        from .stft import ShortTimeFourierTransformProcessor
        # SuperFlux defaults, removed from kwargs so that they are handed
        # only to the processor they belong to (explicit values win)
        # filtering: an un-normalized filterbank with 24 bands
        filterbank = kwargs.pop('filterbank', FILTERBANK)
        num_bands = kwargs.pop('num_bands', 24)
        norm_filters = kwargs.pop('norm_filters', False)
        # difference: max filtered, positive diffs only
        diff_ratio = kwargs.pop('diff_ratio', 0.5)
        diff_max_bins = kwargs.pop('diff_max_bins', 3)
        positive_diffs = kwargs.pop('positive_diffs', True)
        # the processing chain, instantiated in processing order
        chain = [
            ShortTimeFourierTransformProcessor(**kwargs),
            SpectrogramProcessor(**kwargs),
            FilteredSpectrogramProcessor(filterbank=filterbank,
                                         num_bands=num_bands,
                                         norm_filters=norm_filters,
                                         **kwargs),
            LogarithmicSpectrogramProcessor(**kwargs),
            SpectrogramDifferenceProcessor(diff_ratio=diff_ratio,
                                           diff_max_bins=diff_max_bins,
                                           positive_diffs=positive_diffs,
                                           **kwargs),
        ]
        # process everything sequentially
        super(SuperFluxProcessor, self).__init__(chain)
class MultiBandSpectrogram(FilteredSpectrogram):
    """
    MultiBandSpectrogram class.

    Parameters
    ----------
    spectrogram : :class:`Spectrogram` instance
        Spectrogram.
    crossover_frequencies : list or numpy array
        List of crossover frequencies at which the `spectrogram` is split
        into multiple bands.
    fmin : float, optional
        Minimum frequency of the filterbank [Hz].
    fmax : float, optional
        Maximum frequency of the filterbank [Hz].
    norm_filters : bool, optional
        Normalize the filter bands of the filterbank to area 1.
    unique_filters : bool, optional
        Indicate if the filterbank should contain only unique filters, i.e.
        remove duplicate filters resulting from insufficient resolution at
        low frequencies.
    kwargs : dict, optional
        If no :class:`Spectrogram` instance was given, one is instantiated
        with these additional keyword arguments.

    Notes
    -----
    The MultiBandSpectrogram is implemented as a :class:`Spectrogram` which
    uses a :class:`.audio.filters.RectangularFilterbank` to combine multiple
    frequency bins.

    """
    # pylint: disable=super-on-old-class
    # pylint: disable=super-init-not-called
    # pylint: disable=attribute-defined-outside-init

    def __init__(self, spectrogram, crossover_frequencies, fmin=FMIN,
                 fmax=FMAX, norm_filters=NORM_FILTERS,
                 unique_filters=UNIQUE_FILTERS, **kwargs):
        # this method is for documentation purposes only
        pass

    def __new__(cls, spectrogram, crossover_frequencies, fmin=FMIN, fmax=FMAX,
                norm_filters=NORM_FILTERS, unique_filters=UNIQUE_FILTERS,
                **kwargs):
        from .filters import RectangularFilterbank
        # anything but a Spectrogram is converted to one first
        if not isinstance(spectrogram, Spectrogram):
            spectrogram = Spectrogram(spectrogram, **kwargs)
        # rectangular filterbank built from the crossover frequencies
        bands = RectangularFilterbank(spectrogram.bin_frequencies,
                                      crossover_frequencies,
                                      fmin=fmin, fmax=fmax,
                                      norm_filters=norm_filters,
                                      unique_filters=unique_filters)
        # filter the spectrogram and view the result as an instance of cls
        obj = np.asarray(np.dot(spectrogram, bands)).view(cls)
        # remember what the instance was created from
        obj.spectrogram = spectrogram
        obj.filterbank = bands
        obj.crossover_frequencies = crossover_frequencies
        return obj

    def __array_finalize__(self, obj):
        if obj is None:
            return
        # take the attributes over from obj (also needed for views), fall
        # back to None for those obj does not have
        for name in ('spectrogram', 'filterbank', 'crossover_frequencies'):
            setattr(self, name, getattr(obj, name, None))
class MultiBandSpectrogramProcessor(Processor):
    """
    Spectrogram processor which combines the spectrogram magnitudes into
    multiple bands.

    Parameters
    ----------
    crossover_frequencies : list or numpy array
        List of crossover frequencies at which a spectrogram is split into
        the individual bands.
    fmin : float, optional
        Minimum frequency of the filterbank [Hz].
    fmax : float, optional
        Maximum frequency of the filterbank [Hz].
    norm_filters : bool, optional
        Normalize the filter bands of the filterbank to area 1.
    unique_filters : bool, optional
        Indicate if the filterbank should contain only unique filters, i.e.
        remove duplicate filters resulting from insufficient resolution at
        low frequencies.

    """

    def __init__(self, crossover_frequencies, fmin=FMIN, fmax=FMAX,
                 norm_filters=NORM_FILTERS, unique_filters=UNIQUE_FILTERS,
                 **kwargs):
        # pylint: disable=unused-argument
        # the crossover frequencies are stored as a numpy array
        self.crossover_frequencies = np.array(crossover_frequencies)
        self.fmin = fmin
        self.fmax = fmax
        self.norm_filters = norm_filters
        self.unique_filters = unique_filters

    def process(self, data, **kwargs):
        """
        Return a multi-band representation of the given data.

        Parameters
        ----------
        data : numpy array
            Data to be processed.
        kwargs : dict
            Keyword arguments passed to :class:`MultiBandSpectrogram`; they
            take precedence over the settings stored on the processor.

        Returns
        -------
        multi_band_spec : :class:`MultiBandSpectrogram` instance
            Spectrogram split into multiple bands.

        """
        # settings of this processor, overridable by the keyword arguments
        args = {'crossover_frequencies': self.crossover_frequencies,
                'fmin': self.fmin,
                'fmax': self.fmax,
                'norm_filters': self.norm_filters,
                'unique_filters': self.unique_filters}
        args.update(kwargs)
        # split the data into the bands
        return MultiBandSpectrogram(data, **args)
class SemitoneBandpassSpectrogram(FilteredSpectrogram):
    """
    Construct a semitone spectrogram by using a time domain filterbank of
    bandpass filters as described in [1]_.

    Parameters
    ----------
    signal : Signal
        Signal instance.
    fps : float, optional
        Frame rate of the spectrogram [Hz].
    fmin : float, optional
        Lowest frequency of the spectrogram [Hz].
    fmax : float, optional
        Highest frequency of the spectrogram [Hz].

    References
    ----------
    .. [1] Meinard Müller,
           "Information retrieval for music and motion", Springer, 2007.

    """
    # pylint: disable=super-on-old-class
    # pylint: disable=super-init-not-called
    # pylint: disable=attribute-defined-outside-init

    def __init__(self, signal, fps=50., fmin=27.5, fmax=4200.):
        # this method is for documentation purposes only
        pass

    def __new__(cls, signal, fps=50., fmin=27.5, fmax=4200.):
        from scipy.signal import filtfilt
        from .filters import SemitoneBandpassFilterbank
        from .signal import FramedSignal, Signal, energy, resample
        # anything but a mono Signal is converted to one
        if not (isinstance(signal, Signal) and signal.num_channels == 1):
            signal = Signal(signal, num_channels=1)
        sample_rate = float(signal.sample_rate)
        # the input signal, every resampling starts from it
        original = signal
        # number of frames each filtered band is split into
        num_frames = np.round(len(signal) * fps / sample_rate) + 1
        # one bandpass filter (and sample rate) per semitone band
        filterbank = SemitoneBandpassFilterbank(fmin=fmin, fmax=fmax)
        band_energies = []
        for coefficients, band_rate in zip(filterbank.filters,
                                           filterbank.band_sample_rates):
            # frames should overlap 50%
            frame_size = np.round(2 * band_rate / float(fps))
            # resample whenever the signal at hand has a different rate
            # than the one needed for this band
            if band_rate != signal.sample_rate:
                signal = resample(original, band_rate)
            # apply the bandpass filter
            numerator, denominator = coefficients
            filtered = filtfilt(numerator, denominator, signal)
            # normalise the signal if it has an integer dtype
            try:
                filtered /= np.iinfo(signal.dtype).max
            except ValueError:
                # no integer dtype, keep the filtered signal as it is
                pass
            # split the filtered signal into overlapping frames
            frames = FramedSignal(filtered, frame_size=frame_size,
                                  fps=fps, sample_rate=band_rate,
                                  num_frames=num_frames)
            # total energy of the frames
            # Note: the energy of the signal is computed with respect to
            #       the reference sampling rate as in the MATLAB chroma
            #       toolbox
            band_energies.append(energy(frames) / band_rate * 22050.)
        # one column per band, viewed as an instance of cls
        obj = np.vstack(band_energies).T.view(cls)
        # remember the filterbank and frame rate used
        obj.filterbank = filterbank
        obj.fps = fps
        return obj

    def __array_finalize__(self, obj):
        if obj is None:
            return
        # take the attributes over from obj, fall back to None for those
        # obj does not have
        for name in ('filterbank', 'fps'):
            setattr(self, name, getattr(obj, name, None))