# encoding: utf-8
# pylint: disable=no-member
# pylint: disable=invalid-name
# pylint: disable=too-many-arguments
"""
This module contains onset detection related functionality.
"""
from __future__ import absolute_import, division, print_function
import numpy as np
from scipy.ndimage import uniform_filter
from scipy.ndimage.filters import maximum_filter, minimum_filter
from ..audio.signal import smooth as smooth_signal
from ..processors import (BufferProcessor, OnlineProcessor, ParallelProcessor,
Processor, SequentialProcessor, )
from ..utils import combine_events
EPSILON = np.spacing(1)
# onset detection helper functions
[docs]def wrap_to_pi(phase):
"""
Wrap the phase information to the range -π...π.
Parameters
----------
phase : numpy array
Phase of the STFT.
Returns
-------
wrapped_phase : numpy array
Wrapped phase.
"""
return np.mod(phase + np.pi, 2.0 * np.pi) - np.pi
[docs]def correlation_diff(spec, diff_frames=1, pos=False, diff_bins=1):
"""
Calculates the difference of the magnitude spectrogram relative to the
N-th previous frame shifted in frequency to achieve the highest
correlation between these two frames.
Parameters
----------
spec : numpy array
Magnitude spectrogram.
diff_frames : int, optional
Calculate the difference to the `diff_frames`-th previous frame.
pos : bool, optional
Keep only positive values.
diff_bins : int, optional
Maximum number of bins shifted for correlation calculation.
Returns
-------
correlation_diff : numpy array
(Positive) magnitude spectrogram differences.
Notes
-----
This function is only because of completeness, it is not intended to be
actually used, since it is extremely slow. Please consider the superflux()
function, since if performs equally well but much faster.
"""
# init diff matrix
diff_spec = np.zeros_like(spec)
if diff_frames < 1:
raise ValueError("number of `diff_frames` must be >= 1")
# calculate the diff
frames, bins = diff_spec.shape
corr = np.zeros((frames, diff_bins * 2 + 1))
for f in range(diff_frames, frames):
# correlate the frame with the previous one
# resulting size = bins * 2 - 1
c = np.correlate(spec[f], spec[f - diff_frames], mode='full')
# save the middle part
centre = len(c) / 2
corr[f] = c[centre - diff_bins: centre + diff_bins + 1]
# shift the frame for difference calculation according to the
# highest peak in correlation
bin_offset = diff_bins - np.argmax(corr[f])
bin_start = diff_bins + bin_offset
bin_stop = bins - 2 * diff_bins + bin_start
diff_spec[f, diff_bins:-diff_bins] = spec[f, diff_bins:-diff_bins] - \
spec[f - diff_frames, bin_start:bin_stop]
# keep only positive values
if pos:
np.maximum(diff_spec, 0, diff_spec)
return np.asarray(diff_spec)
# onset detection functions pluggable into SpectralOnsetDetection
# Note: all functions here expect a Spectrogram object as their sole argument
# thus it is not enforced that the algorithm does exactly what it is
# supposed to do, but new configurations can be built easily
[docs]def high_frequency_content(spectrogram):
"""
High Frequency Content.
Parameters
----------
spectrogram : :class:`Spectrogram` instance
Spectrogram instance.
Returns
-------
high_frequency_content : numpy array
High frequency content onset detection function.
References
----------
.. [1] Paul Masri,
"Computer Modeling of Sound for Transformation and Synthesis of
Musical Signals",
PhD thesis, University of Bristol, 1996.
"""
# HFC emphasizes high frequencies by weighting the magnitude spectrogram
# bins by their respective "number" (starting at low frequencies)
hfc = spectrogram * np.arange(spectrogram.num_bins)
return np.asarray(np.mean(hfc, axis=1))
[docs]def spectral_diff(spectrogram, diff_frames=None):
"""
Spectral Diff.
Parameters
----------
spectrogram : :class:`Spectrogram` instance
Spectrogram instance.
diff_frames : int, optional
Number of frames to calculate the diff to.
Returns
-------
spectral_diff : numpy array
Spectral diff onset detection function.
References
----------
.. [1] Chris Duxbury, Mark Sandler and Matthew Davis,
"A hybrid approach to musical note onset detection",
Proceedings of the 5th International Conference on Digital Audio
Effects (DAFx), 2002.
"""
from madmom.audio.spectrogram import SpectrogramDifference
# if the diff of a spectrogram is given, do not calculate the diff twice
if not isinstance(spectrogram, SpectrogramDifference):
spectrogram = spectrogram.diff(diff_frames=diff_frames,
positive_diffs=True)
# Spectral diff is the sum of all squared positive 1st order differences
return np.asarray(np.sum(spectrogram ** 2, axis=1))
[docs]def spectral_flux(spectrogram, diff_frames=None):
"""
Spectral Flux.
Parameters
----------
spectrogram : :class:`Spectrogram` instance
Spectrogram instance.
diff_frames : int, optional
Number of frames to calculate the diff to.
Returns
-------
spectral_flux : numpy array
Spectral flux onset detection function.
References
----------
.. [1] Paul Masri,
"Computer Modeling of Sound for Transformation and Synthesis of
Musical Signals",
PhD thesis, University of Bristol, 1996.
"""
from madmom.audio.spectrogram import SpectrogramDifference
# if the diff of a spectrogram is given, do not calculate the diff twice
if not isinstance(spectrogram, SpectrogramDifference):
spectrogram = spectrogram.diff(diff_frames=diff_frames,
positive_diffs=True)
# Spectral flux is the sum of all positive 1st order differences
return np.asarray(np.sum(spectrogram, axis=1))
[docs]def superflux(spectrogram, diff_frames=None, diff_max_bins=3):
"""
SuperFlux method with a maximum filter vibrato suppression stage.
Calculates the difference of bin k of the magnitude spectrogram relative to
the N-th previous frame with the maximum filtered spectrogram.
Parameters
----------
spectrogram : :class:`Spectrogram` instance
Spectrogram instance.
diff_frames : int, optional
Number of frames to calculate the diff to.
diff_max_bins : int, optional
Number of bins used for maximum filter.
Returns
-------
superflux : numpy array
SuperFlux onset detection function.
Notes
-----
This method works only properly, if the spectrogram is filtered with a
filterbank of the right frequency spacing. Filter banks with 24 bands per
octave (i.e. quarter-tone resolution) usually yield good results. With
`max_bins` = 3, the maximum of the bins k-1, k, k+1 of the frame
`diff_frames` to the left is used for the calculation of the difference.
References
----------
.. [1] Sebastian Böck and Gerhard Widmer,
"Maximum Filter Vibrato Suppression for Onset Detection",
Proceedings of the 16th International Conference on Digital Audio
Effects (DAFx), 2013.
"""
from madmom.audio.spectrogram import SpectrogramDifference
# if the diff of a spectrogram is given, do not calculate the diff twice
if not isinstance(spectrogram, SpectrogramDifference):
spectrogram = spectrogram.diff(diff_frames=diff_frames,
diff_max_bins=diff_max_bins,
positive_diffs=True)
# SuperFlux is the sum of all positive 1st order max. filtered differences
return np.asarray(np.sum(spectrogram, axis=1))
# TODO: should this be its own class so that we can set the filter
# sizes in seconds instead of frames?
[docs]def complex_flux(spectrogram, diff_frames=None, diff_max_bins=3,
temporal_filter=3, temporal_origin=0):
"""
ComplexFlux.
ComplexFlux is based on the SuperFlux, but adds an additional local group
delay based tremolo suppression.
Parameters
----------
spectrogram : :class:`Spectrogram` instance
:class:`Spectrogram` instance.
diff_frames : int, optional
Number of frames to calculate the diff to.
diff_max_bins : int, optional
Number of bins used for maximum filter.
temporal_filter : int, optional
Temporal maximum filtering of the local group delay [frames].
temporal_origin : int, optional
Origin of the temporal maximum filter.
Returns
-------
complex_flux : numpy array
ComplexFlux onset detection function.
References
----------
.. [1] Sebastian Böck and Gerhard Widmer,
"Local group delay based vibrato and tremolo suppression for onset
detection",
Proceedings of the 14th International Society for Music Information
Retrieval Conference (ISMIR), 2013.
"""
# create a mask based on the local group delay information
# take only absolute values of the local group delay and normalize them
lgd = np.abs(spectrogram.stft.phase().lgd()) / np.pi
# maximum filter along the temporal axis
# TODO: use HPSS instead of simple temporal filtering
if temporal_filter > 0:
lgd = maximum_filter(lgd, size=[temporal_filter, 1],
origin=temporal_origin)
# lgd = uniform_filter(lgd, size=[1, 3]) # better for percussive onsets
# create the weighting mask
try:
# if the magnitude spectrogram was filtered, use the minimum local
# group delay value of each filterbank (expanded by one frequency
# bin in both directions) as the mask
mask = np.zeros_like(spectrogram)
num_bins = lgd.shape[1]
for b in range(mask.shape[1]):
# determine the corner bins for the mask
corner_bins = np.nonzero(spectrogram.filterbank[:, b])[0]
# always expand to the next neighbour
start_bin = corner_bins[0] - 1
stop_bin = corner_bins[-1] + 2
# constrain the range
if start_bin < 0:
start_bin = 0
if stop_bin > num_bins:
stop_bin = num_bins
# set mask
mask[:, b] = np.amin(lgd[:, start_bin: stop_bin], axis=1)
except AttributeError:
# if the spectrogram is not filtered, use a simple minimum filter
# covering only the current bin and its neighbours
mask = minimum_filter(lgd, size=[1, 3])
# sum all positive 1st order max. filtered and weighted differences
diff = spectrogram.diff(diff_frames=diff_frames,
diff_max_bins=diff_max_bins,
positive_diffs=True)
return np.asarray(np.sum(diff * mask, axis=1))
[docs]def modified_kullback_leibler(spectrogram, diff_frames=1, epsilon=EPSILON):
"""
Modified Kullback-Leibler.
Parameters
----------
spectrogram : :class:`Spectrogram` instance
:class:`Spectrogram` instance.
diff_frames : int, optional
Number of frames to calculate the diff to.
epsilon : float, optional
Add `epsilon` to the `spectrogram` avoid division by 0.
Returns
-------
modified_kullback_leibler : numpy array
MKL onset detection function.
Notes
-----
The implementation presented in [1]_ is used instead of the original work
presented in [2]_.
References
----------
.. [1] Paul Brossier,
"Automatic Annotation of Musical Audio for Interactive
Applications",
PhD thesis, Queen Mary University of London, 2006.
.. [2] Stephen Hainsworth and Malcolm Macleod,
"Onset Detection in Musical Audio Signals",
Proceedings of the International Computer Music Conference (ICMC),
2003.
"""
if epsilon <= 0:
raise ValueError("a positive value must be added before division")
mkl = np.zeros_like(spectrogram)
mkl[diff_frames:] = (spectrogram[diff_frames:] /
(spectrogram[:-diff_frames] + epsilon))
# note: the original MKL uses sum instead of mean,
# but the range of mean is much more suitable
return np.asarray(np.mean(np.log(1 + mkl), axis=1))
def _phase_deviation(phase):
"""
Helper function used by phase_deviation() & weighted_phase_deviation().
Parameters
----------
phase : numpy array
Phase of the STFT.
Returns
-------
numpy array
Phase deviation.
"""
pd = np.zeros_like(phase)
# instantaneous frequency is given by the first difference
# ψ′(n, k) = ψ(n, k) − ψ(n − 1, k)
# change in instantaneous frequency is given by the second order difference
# ψ′′(n, k) = ψ′(n, k) − ψ′(n − 1, k)
pd[2:] = phase[2:] - 2 * phase[1:-1] + phase[:-2]
# map to the range -pi..pi
return np.asarray(wrap_to_pi(pd))
[docs]def phase_deviation(spectrogram):
"""
Phase Deviation.
Parameters
----------
spectrogram : :class:`Spectrogram` instance
:class:`Spectrogram` instance.
Returns
-------
phase_deviation : numpy array
Phase deviation onset detection function.
References
----------
.. [1] Juan Pablo Bello, Chris Duxbury, Matthew Davies and Mark Sandler,
"On the use of phase and energy for musical onset detection in the
complex domain",
IEEE Signal Processing Letters, Volume 11, Number 6, 2004.
"""
# absolute phase changes in instantaneous frequency
pd = np.abs(_phase_deviation(spectrogram.stft.phase()))
return np.asarray(np.mean(pd, axis=1))
[docs]def weighted_phase_deviation(spectrogram):
"""
Weighted Phase Deviation.
Parameters
----------
spectrogram : :class:`Spectrogram` instance
:class:`Spectrogram` instance.
Returns
-------
weighted_phase_deviation : numpy array
Weighted phase deviation onset detection function.
References
----------
.. [1] Simon Dixon,
"Onset Detection Revisited",
Proceedings of the 9th International Conference on Digital Audio
Effects (DAFx), 2006.
"""
# cache phase
phase = spectrogram.stft.phase()
# make sure the spectrogram is not filtered before
if np.shape(phase) != np.shape(spectrogram):
raise ValueError('spectrogram and phase must be of same shape')
# weighted_phase_deviation = spectrogram * phase_deviation
wpd = np.abs(_phase_deviation(phase) * spectrogram)
return np.asarray(np.mean(wpd, axis=1))
[docs]def normalized_weighted_phase_deviation(spectrogram, epsilon=EPSILON):
"""
Normalized Weighted Phase Deviation.
Parameters
----------
spectrogram : :class:`Spectrogram` instance
:class:`Spectrogram` instance.
epsilon : float, optional
Add `epsilon` to the `spectrogram` avoid division by 0.
Returns
-------
normalized_weighted_phase_deviation : numpy array
Normalized weighted phase deviation onset detection function.
References
----------
.. [1] Simon Dixon,
"Onset Detection Revisited",
Proceedings of the 9th International Conference on Digital Audio
Effects (DAFx), 2006.
"""
if epsilon <= 0:
raise ValueError("a positive value must be added before division")
# normalize WPD by the sum of the spectrogram
# (add a small epsilon so that we don't divide by 0)
norm = np.add(np.mean(spectrogram, axis=1), epsilon)
return np.asarray(weighted_phase_deviation(spectrogram) / norm)
def _complex_domain(spectrogram):
"""
Helper method used by complex_domain() & rectified_complex_domain().
Parameters
----------
spectrogram : :class:`Spectrogram` instance
:class:`Spectrogram` instance.
Returns
-------
numpy array
Complex domain onset detection function.
Notes
-----
We use the simple implementation presented in [1]_.
References
----------
.. [1] Simon Dixon,
"Onset Detection Revisited",
Proceedings of the 9th International Conference on Digital Audio
Effects (DAFx), 2006.
"""
# cache phase
phase = spectrogram.stft.phase()
# make sure the spectrogram is not filtered before
if np.shape(phase) != np.shape(spectrogram):
raise ValueError('spectrogram and phase must be of same shape')
# expected spectrogram
cd_target = np.zeros_like(phase)
# assume constant phase change
cd_target[1:] = 2 * phase[1:] - phase[:-1]
# add magnitude
cd_target = spectrogram * np.exp(1j * cd_target)
# create complex spectrogram
cd = spectrogram * np.exp(1j * phase)
# subtract the target values
cd[1:] -= cd_target[:-1]
return np.asarray(cd)
[docs]def complex_domain(spectrogram):
"""
Complex Domain.
Parameters
----------
spectrogram : :class:`Spectrogram` instance
:class:`Spectrogram` instance.
Returns
-------
complex_domain : numpy array
Complex domain onset detection function.
References
----------
.. [1] Juan Pablo Bello, Chris Duxbury, Matthew Davies and Mark Sandler,
"On the use of phase and energy for musical onset detection in the
complex domain",
IEEE Signal Processing Letters, Volume 11, Number 6, 2004.
"""
# take the sum of the absolute changes
return np.asarray(np.sum(np.abs(_complex_domain(spectrogram)), axis=1))
[docs]def rectified_complex_domain(spectrogram, diff_frames=None):
"""
Rectified Complex Domain.
Parameters
----------
spectrogram : :class:`Spectrogram` instance
:class:`Spectrogram` instance.
diff_frames : int, optional
Number of frames to calculate the diff to.
Returns
-------
rectified_complex_domain : numpy array
Rectified complex domain onset detection function.
References
----------
.. [1] Simon Dixon,
"Onset Detection Revisited",
Proceedings of the 9th International Conference on Digital Audio
Effects (DAFx), 2006.
"""
# rectified complex domain
rcd = _complex_domain(spectrogram)
# only keep values where the magnitude rises
pos_diff = spectrogram.diff(diff_frames=diff_frames, positive_diffs=True)
rcd *= pos_diff.astype(bool)
# take the sum of the absolute changes
return np.asarray(np.sum(np.abs(rcd), axis=1))
[docs]class SpectralOnsetProcessor(SequentialProcessor):
"""
The SpectralOnsetProcessor class implements most of the common onset
detection functions based on the magnitude or phase information of a
spectrogram.
Parameters
----------
onset_method : str, optional
Onset detection function. See `METHODS` for possible values.
kwargs : dict, optional
Keyword arguments passed to the pre-processing chain to obtain a
spectral representation of the signal.
Notes
-----
If the spectrogram should be filtered, the `filterbank` parameter must
contain a valid Filterbank, if it should be scaled logarithmically, `log`
must be set accordingly.
References
----------
.. [1] Paul Masri,
"Computer Modeling of Sound for Transformation and Synthesis of
Musical Signals",
PhD thesis, University of Bristol, 1996.
.. [2] Sebastian Böck and Gerhard Widmer,
"Maximum Filter Vibrato Suppression for Onset Detection",
Proceedings of the 16th International Conference on Digital Audio
Effects (DAFx), 2013.
Examples
--------
Create a SpectralOnsetProcessor and pass a file through the processor to
obtain an onset detection function. Per default the spectral flux [1]_ is
computed on a simple Spectrogram.
>>> sodf = SpectralOnsetProcessor()
>>> sodf # doctest: +ELLIPSIS
<madmom.features.onsets.SpectralOnsetProcessor object at 0x...>
>>> sodf.processors[-1] # doctest: +ELLIPSIS
<function spectral_flux at 0x...>
>>> sodf('tests/data/audio/sample.wav')
... # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
array([ 0. , 100.90121, ..., 26.30577, 20.94439], dtype=float32)
The parameters passed to the signal pre-processing chain can be set when
creating the SpectralOnsetProcessor. E.g. to obtain the SuperFlux [2]_
onset detection function set these parameters:
>>> from madmom.audio.filters import LogarithmicFilterbank
>>> sodf = SpectralOnsetProcessor(onset_method='superflux', fps=200,
... filterbank=LogarithmicFilterbank,
... num_bands=24, log=np.log10)
>>> sodf('tests/data/audio/sample.wav')
... # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
array([ 0. , 0. , 2.0868 , 1.02404, ..., 0.29888, 0.12122], dtype=float32)
"""
METHODS = ['superflux', 'complex_flux', 'high_frequency_content',
'spectral_diff', 'spectral_flux', 'modified_kullback_leibler',
'phase_deviation', 'weighted_phase_deviation',
'normalized_weighted_phase_deviation', 'complex_domain',
'rectified_complex_domain']
def __init__(self, onset_method='spectral_flux', **kwargs):
import inspect
from ..audio.signal import SignalProcessor, FramedSignalProcessor
from ..audio.stft import ShortTimeFourierTransformProcessor
from ..audio.spectrogram import (SpectrogramProcessor,
FilteredSpectrogramProcessor,
LogarithmicSpectrogramProcessor)
# for certain methods we need to circular shift the signal before STFT
if any(odf in onset_method for odf in ('phase', 'complex')):
kwargs['circular_shift'] = True
# always use mono signals
kwargs['num_channels'] = 1
# define processing chain
sig = SignalProcessor(**kwargs)
frames = FramedSignalProcessor(**kwargs)
stft = ShortTimeFourierTransformProcessor(**kwargs)
spec = SpectrogramProcessor(**kwargs)
processors = [sig, frames, stft, spec]
# filtering needed?
if 'filterbank' in kwargs.keys() and kwargs['filterbank'] is not None:
processors.append(FilteredSpectrogramProcessor(**kwargs))
# scaling needed?
if 'log' in kwargs.keys() and kwargs['log'] is not None:
processors.append(LogarithmicSpectrogramProcessor(**kwargs))
# odf function
if not inspect.isfunction(onset_method):
try:
onset_method = globals()[onset_method]
except KeyError:
raise ValueError('%s not a valid onset detection function, '
'choose %s.' % (onset_method, self.METHODS))
processors.append(onset_method)
# instantiate a SequentialProcessor
super(SpectralOnsetProcessor, self).__init__(processors)
[docs] @classmethod
def add_arguments(cls, parser, onset_method=None):
"""
Add spectral onset detection arguments to an existing parser.
Parameters
----------
parser : argparse parser instance
Existing argparse parser object.
onset_method : str, optional
Default onset detection method.
Returns
-------
parser_group : argparse argument group
Spectral onset detection argument parser group.
"""
# add onset detection method arguments to the existing parser
g = parser.add_argument_group('spectral onset detection arguments')
if onset_method is not None:
g.add_argument('--odf', dest='onset_method',
default=onset_method, choices=cls.METHODS,
help='use this onset detection function '
'[default=%(default)s]')
# return the argument group so it can be modified if needed
return g
# classes for detecting onsets with NNs
[docs]class RNNOnsetProcessor(SequentialProcessor):
"""
Processor to get a onset activation function from multiple RNNs.
Parameters
----------
online : bool, optional
Choose networks suitable for online onset detection, i.e. use
unidirectional RNNs.
Notes
-----
This class uses either uni- or bi-directional RNNs. Contrary to [1], it
uses simple tanh units as in [2]. Also the input representations changed
to use logarithmically filtered and scaled spectrograms.
References
----------
.. [1] "Universal Onset Detection with bidirectional Long Short-Term Memory
Neural Networks"
Florian Eyben, Sebastian Böck, Björn Schuller and Alex Graves.
Proceedings of the 11th International Society for Music Information
Retrieval Conference (ISMIR), 2010.
.. [2] "Online Real-time Onset Detection with Recurrent Neural Networks"
Sebastian Böck, Andreas Arzt, Florian Krebs and Markus Schedl.
Proceedings of the 15th International Conference on Digital Audio
Effects (DAFx), 2012.
Examples
--------
Create a RNNOnsetProcessor and pass a file through the processor to obtain
an onset detection function (sampled with 100 frames per second).
>>> proc = RNNOnsetProcessor()
>>> proc # doctest: +ELLIPSIS
<madmom.features.onsets.RNNOnsetProcessor object at 0x...>
>>> proc('tests/data/audio/sample.wav') # doctest: +ELLIPSIS
array([0.08313, 0.0024 , ... 0.00527], dtype=float32)
"""
def __init__(self, **kwargs):
# pylint: disable=unused-argument
from ..audio.signal import SignalProcessor, FramedSignalProcessor
from ..audio.stft import ShortTimeFourierTransformProcessor
from ..audio.spectrogram import (
FilteredSpectrogramProcessor, LogarithmicSpectrogramProcessor,
SpectrogramDifferenceProcessor)
from ..models import ONSETS_RNN, ONSETS_BRNN
from ..ml.nn import NeuralNetworkEnsemble
# choose the appropriate models and set frame sizes accordingly
if kwargs.get('online'):
nn_files = ONSETS_RNN
frame_sizes = [512, 1024, 2048]
else:
nn_files = ONSETS_BRNN
frame_sizes = [1024, 2048, 4096]
# define pre-processing chain
sig = SignalProcessor(num_channels=1, sample_rate=44100)
# process the multi-resolution spec & diff in parallel
multi = ParallelProcessor([])
for frame_size in frame_sizes:
# pass **kwargs in order to be able to process in online mode
frames = FramedSignalProcessor(frame_size=frame_size, **kwargs)
stft = ShortTimeFourierTransformProcessor() # caching FFT window
filt = FilteredSpectrogramProcessor(
num_bands=6, fmin=30, fmax=17000, norm_filters=True)
spec = LogarithmicSpectrogramProcessor(mul=5, add=1)
diff = SpectrogramDifferenceProcessor(
diff_ratio=0.25, positive_diffs=True, stack_diffs=np.hstack)
# process each frame size with spec and diff sequentially
multi.append(SequentialProcessor((frames, stft, filt, spec, diff)))
# stack the features and processes everything sequentially
pre_processor = SequentialProcessor((sig, multi, np.hstack))
# process the pre-processed signal with a NN ensemble
nn = NeuralNetworkEnsemble.load(nn_files, **kwargs)
# instantiate a SequentialProcessor
super(RNNOnsetProcessor, self).__init__((pre_processor, nn))
# must be a top-level function to be pickle-able
def _cnn_onset_processor_pad(data):
"""Pad the data by repeating the first and last frame 7 times."""
pad_start = np.repeat(data[:1], 7, axis=0)
pad_stop = np.repeat(data[-1:], 7, axis=0)
return np.concatenate((pad_start, data, pad_stop))
[docs]class CNNOnsetProcessor(SequentialProcessor):
"""
Processor to get a onset activation function from a CNN.
References
----------
.. [1] "Musical Onset Detection with Convolutional Neural Networks"
Jan Schlüter and Sebastian Böck.
Proceedings of the 6th International Workshop on Machine Learning
and Music, 2013.
Notes
-----
The implementation follows as closely as possible the original one, but
part of the signal pre-processing differs in minor aspects, so results can
differ slightly, too.
Examples
--------
Create a CNNOnsetProcessor and pass a file through the processor to obtain
an onset detection function (sampled with 100 frames per second).
>>> proc = CNNOnsetProcessor()
>>> proc # doctest: +ELLIPSIS
<madmom.features.onsets.CNNOnsetProcessor object at 0x...>
>>> proc('tests/data/audio/sample.wav') # doctest: +ELLIPSIS
array([0.05369, 0.04205, ... 0.00014], dtype=float32)
"""
def __init__(self, **kwargs):
# pylint: disable=unused-argument
from ..audio.signal import SignalProcessor, FramedSignalProcessor
from ..audio.stft import ShortTimeFourierTransformProcessor
from ..audio.filters import MelFilterbank
from ..audio.spectrogram import (FilteredSpectrogramProcessor,
LogarithmicSpectrogramProcessor)
from ..models import ONSETS_CNN
from ..ml.nn import NeuralNetwork
# define pre-processing chain
sig = SignalProcessor(num_channels=1, sample_rate=44100)
# process the multi-resolution spec in parallel
multi = ParallelProcessor([])
for frame_size in [2048, 1024, 4096]:
frames = FramedSignalProcessor(frame_size=frame_size, fps=100)
stft = ShortTimeFourierTransformProcessor() # caching FFT window
filt = FilteredSpectrogramProcessor(
filterbank=MelFilterbank, num_bands=80, fmin=27.5, fmax=16000,
norm_filters=True, unique_filters=False)
spec = LogarithmicSpectrogramProcessor(log=np.log, add=EPSILON)
# process each frame size with spec and diff sequentially
multi.append(SequentialProcessor((frames, stft, filt, spec)))
# stack the features (in depth) and pad at beginning and end
stack = np.dstack
pad = _cnn_onset_processor_pad
# pre-processes everything sequentially
pre_processor = SequentialProcessor((sig, multi, stack, pad))
# process the pre-processed signal with a NN ensemble
nn = NeuralNetwork.load(ONSETS_CNN[0])
# instantiate a SequentialProcessor
super(CNNOnsetProcessor, self).__init__((pre_processor, nn))
# universal peak-picking method
[docs]def peak_picking(activations, threshold, smooth=None, pre_avg=0, post_avg=0,
pre_max=1, post_max=1):
"""
Perform thresholding and peak-picking on the given activation function.
Parameters
----------
activations : numpy array
Activation function.
threshold : float
Threshold for peak-picking
smooth : int or numpy array, optional
Smooth the activation function with the kernel (size).
pre_avg : int, optional
Use `pre_avg` frames past information for moving average.
post_avg : int, optional
Use `post_avg` frames future information for moving average.
pre_max : int, optional
Use `pre_max` frames past information for moving maximum.
post_max : int, optional
Use `post_max` frames future information for moving maximum.
Returns
-------
peak_idx : numpy array
Indices of the detected peaks.
See Also
--------
:func:`smooth`
Notes
-----
If no moving average is needed (e.g. the activations are independent of
the signal's level as for neural network activations), set `pre_avg` and
`post_avg` to 0.
For peak picking of local maxima, set `pre_max` and `post_max` to 1.
For online peak picking, set all `post_` parameters to 0.
References
----------
.. [1] Sebastian Böck, Florian Krebs and Markus Schedl,
"Evaluating the Online Capabilities of Onset Detection Methods",
Proceedings of the 13th International Society for Music Information
Retrieval Conference (ISMIR), 2012.
"""
# smooth activations
activations = smooth_signal(activations, smooth)
# compute a moving average
avg_length = pre_avg + post_avg + 1
if avg_length > 1:
# TODO: make the averaging function exchangeable (mean/median/etc.)
avg_origin = int(np.floor((pre_avg - post_avg) / 2))
if activations.ndim == 1:
filter_size = avg_length
elif activations.ndim == 2:
filter_size = [avg_length, 1]
else:
raise ValueError('`activations` must be either 1D or 2D')
mov_avg = uniform_filter(activations, filter_size, mode='constant',
origin=avg_origin)
else:
# do not use a moving average
mov_avg = 0
# detections are those activations above the moving average + the threshold
detections = activations * (activations >= mov_avg + threshold)
# peak-picking
max_length = pre_max + post_max + 1
if max_length > 1:
# compute a moving maximum
max_origin = int(np.floor((pre_max - post_max) / 2))
if activations.ndim == 1:
filter_size = max_length
elif activations.ndim == 2:
filter_size = [max_length, 1]
else:
raise ValueError('`activations` must be either 1D or 2D')
mov_max = maximum_filter(detections, filter_size, mode='constant',
origin=max_origin)
# detections are peak positions
detections *= (detections == mov_max)
# return indices
if activations.ndim == 1:
return np.nonzero(detections)[0]
elif activations.ndim == 2:
return np.nonzero(detections)
else:
raise ValueError('`activations` must be either 1D or 2D')
[docs]class PeakPickingProcessor(Processor):
"""
Deprecated as of version 0.15. Will be removed in version 0.16. Use either
:class:`OnsetPeakPickingProcessor` or :class:`NotePeakPickingProcessor`
instead.
"""
def __init__(self, **kwargs):
# pylint: disable=unused-argument
self.kwargs = kwargs
[docs] def process(self, activations, **kwargs):
"""
Detect the peaks in the given activation function.
Parameters
----------
activations : numpy array
Onset activation function.
Returns
-------
peaks : numpy array
Detected onsets [seconds[, frequency bin]].
"""
import warnings
if activations.ndim == 1:
warnings.warn('`PeakPickingProcessor` is deprecated as of version '
'0.15 and will be removed in version 0.16. Use '
'`OnsetPeakPickingProcessor` instead.')
ppp = OnsetPeakPickingProcessor(**self.kwargs)
return ppp(activations, **kwargs)
elif activations.ndim == 2:
warnings.warn('`PeakPickingProcessor` is deprecated as of version '
'0.15 and will be removed in version 0.16. Use '
'`NotePeakPickingProcessor` instead.')
from .notes import NotePeakPickingProcessor
ppp = NotePeakPickingProcessor(**self.kwargs)
return ppp(activations, **kwargs)
[docs] @staticmethod
def add_arguments(parser, **kwargs):
"""
Deprecated as of version 0.15. Will be removed in version 0.16. Use
either :class:`OnsetPeakPickingProcessor` or
:class:`NotePeakPickingProcessor` instead.
"""
return OnsetPeakPickingProcessor.add_arguments(parser, **kwargs)
[docs]class OnsetPeakPickingProcessor(OnlineProcessor):
"""
This class implements the onset peak-picking functionality.
It transparently converts the chosen values from seconds to frames.
Parameters
----------
threshold : float
Threshold for peak-picking.
smooth : float, optional
Smooth the activation function over `smooth` seconds.
pre_avg : float, optional
Use `pre_avg` seconds past information for moving average.
post_avg : float, optional
Use `post_avg` seconds future information for moving average.
pre_max : float, optional
Use `pre_max` seconds past information for moving maximum.
post_max : float, optional
Use `post_max` seconds future information for moving maximum.
combine : float, optional
Only report one onset within `combine` seconds.
delay : float, optional
Report the detected onsets `delay` seconds delayed.
online : bool, optional
Use online peak-picking, i.e. no future information.
fps : float, optional
Frames per second used for conversion of timings.
Returns
-------
onsets : numpy array
Detected onsets [seconds].
Notes
-----
If no moving average is needed (e.g. the activations are independent of
the signal's level as for neural network activations), `pre_avg` and
`post_avg` should be set to 0.
For peak picking of local maxima, set `pre_max` >= 1. / `fps` and
`post_max` >= 1. / `fps`.
For online peak picking, all `post_` parameters are set to 0.
References
----------
.. [1] Sebastian Böck, Florian Krebs and Markus Schedl,
"Evaluating the Online Capabilities of Onset Detection Methods",
Proceedings of the 13th International Society for Music Information
Retrieval Conference (ISMIR), 2012.
Examples
--------
Create a PeakPickingProcessor. The returned array represents the positions
of the onsets in seconds, thus the expected sampling rate has to be given.
>>> proc = OnsetPeakPickingProcessor(fps=100)
>>> proc # doctest: +ELLIPSIS
<madmom.features.onsets.OnsetPeakPickingProcessor object at 0x...>
Call this OnsetPeakPickingProcessor with the onset activation function from
an RNNOnsetProcessor to obtain the onset positions.
>>> act = RNNOnsetProcessor()('tests/data/audio/sample.wav')
>>> proc(act) # doctest: +ELLIPSIS
array([0.09, 0.29, 0.45, ..., 2.34, 2.49, 2.67])
"""
FPS = 100
THRESHOLD = 0.5 # binary threshold
SMOOTH = 0.
PRE_AVG = 0.
POST_AVG = 0.
PRE_MAX = 0.
POST_MAX = 0.
COMBINE = 0.03
DELAY = 0.
ONLINE = False
def __init__(self, threshold=THRESHOLD, smooth=SMOOTH, pre_avg=PRE_AVG,
post_avg=POST_AVG, pre_max=PRE_MAX, post_max=POST_MAX,
combine=COMBINE, delay=DELAY, online=ONLINE, fps=FPS,
**kwargs):
# pylint: disable=unused-argument
# instantiate OnlineProcessor
super(OnsetPeakPickingProcessor, self).__init__(online=online)
if self.online:
# set some parameters to 0 (i.e. no future information available)
smooth = 0
post_avg = 0
post_max = 0
# init buffer
self.buffer = None
self.counter = 0
self.last_onset = None
# save parameters
self.threshold = threshold
self.smooth = smooth
self.pre_avg = pre_avg
self.post_avg = post_avg
self.pre_max = pre_max
self.post_max = post_max
self.combine = combine
self.delay = delay
self.fps = fps
[docs] def reset(self):
"""Reset OnsetPeakPickingProcessor."""
self.buffer = None
self.counter = 0
self.last_onset = None
[docs] def process_offline(self, activations, **kwargs):
"""
Detect the onsets in the given activation function.
Parameters
----------
activations : numpy array
Onset activation function.
Returns
-------
onsets : numpy array
Detected onsets [seconds].
"""
# convert timing information to frames and set default values
# TODO: use at least 1 frame if any of these values are > 0?
timings = np.array([self.smooth, self.pre_avg, self.post_avg,
self.pre_max, self.post_max]) * self.fps
timings = np.round(timings).astype(int)
# detect the peaks (function returns int indices)
onsets = peak_picking(activations, self.threshold, *timings)
# convert to timestamps
onsets = onsets.astype(np.float) / self.fps
# shift if necessary
if self.delay:
onsets += self.delay
# combine onsets
if self.combine:
onsets = combine_events(onsets, self.combine, 'left')
# return the onsets
return np.asarray(onsets)
[docs] def process_online(self, activations, reset=True, **kwargs):
"""
Detect the onsets in the given activation function.
Parameters
----------
activations : numpy array
Onset activation function.
reset : bool, optional
Reset the processor to its initial state before processing.
Returns
-------
onsets : numpy array
Detected onsets [seconds].
"""
# buffer data
if self.buffer is None or reset:
# reset the processor
self.reset()
# put 0s in front (depending on conext given by pre_max
init = np.zeros(int(np.round(self.pre_max * self.fps)))
buffer = np.insert(activations, 0, init, axis=0)
# offset the counter, because we buffer the activations
self.counter = -len(init)
# use the data for the buffer
self.buffer = BufferProcessor(init=buffer)
else:
buffer = self.buffer(activations)
# convert timing information to frames and set default values
# TODO: use at least 1 frame if any of these values are > 0?
timings = np.array([self.smooth, self.pre_avg, self.post_avg,
self.pre_max, self.post_max]) * self.fps
timings = np.round(timings).astype(int)
# detect the peaks (function returns int indices)
peaks = peak_picking(buffer, self.threshold, *timings)
# convert to onset timings
onsets = (self.counter + peaks) / float(self.fps)
# increase counter
self.counter += len(activations)
# shift if necessary
if self.delay:
raise ValueError('delay not supported yet in online mode')
# report only if there was no onset within the last combine seconds
if self.combine and onsets.any():
# prepend the last onset to be able to combine them correctly
start = 0
if self.last_onset is not None:
onsets = np.append(self.last_onset, onsets)
start = 1
# combine the onsets
onsets = combine_events(onsets, self.combine, 'left')
# use only if the last onsets differ
if onsets[-1] != self.last_onset:
self.last_onset = onsets[-1]
# remove the first onset if we added it previously
onsets = onsets[start:]
else:
# don't report an onset
onsets = np.empty(0)
# return the onsets
return onsets
process_sequence = process_offline
[docs] @staticmethod
def add_arguments(parser, threshold=THRESHOLD, smooth=None, pre_avg=None,
post_avg=None, pre_max=None, post_max=None,
combine=COMBINE, delay=DELAY):
"""
Add onset peak-picking related arguments to an existing parser.
Parameters
----------
parser : argparse parser instance
Existing argparse parser object.
threshold : float
Threshold for peak-picking.
smooth : float, optional
Smooth the activation function over `smooth` seconds.
pre_avg : float, optional
Use `pre_avg` seconds past information for moving average.
post_avg : float, optional
Use `post_avg` seconds future information for moving average.
pre_max : float, optional
Use `pre_max` seconds past information for moving maximum.
post_max : float, optional
Use `post_max` seconds future information for moving maximum.
combine : float, optional
Only report one onset within `combine` seconds.
delay : float, optional
Report the detected onsets `delay` seconds delayed.
Returns
-------
parser_group : argparse argument group
Onset peak-picking argument parser group.
Notes
-----
Parameters are included in the group only if they are not 'None'.
"""
# add onset peak-picking related options to the existing parser
g = parser.add_argument_group('peak-picking arguments')
g.add_argument('-t', dest='threshold', action='store', type=float,
default=threshold,
help='detection threshold [default=%(default).2f]')
if smooth is not None:
g.add_argument('--smooth', action='store', type=float,
default=smooth,
help='smooth the activation function over N '
'seconds [default=%(default).2f]')
if pre_avg is not None:
g.add_argument('--pre_avg', action='store', type=float,
default=pre_avg,
help='build average over N previous seconds '
'[default=%(default).2f]')
if post_avg is not None:
g.add_argument('--post_avg', action='store', type=float,
default=post_avg,
help='build average over N following seconds '
'[default=%(default).2f]')
if pre_max is not None:
g.add_argument('--pre_max', action='store', type=float,
default=pre_max,
help='search maximum over N previous seconds '
'[default=%(default).2f]')
if post_max is not None:
g.add_argument('--post_max', action='store', type=float,
default=post_max,
help='search maximum over N following seconds '
'[default=%(default).2f]')
if combine is not None:
g.add_argument('--combine', action='store', type=float,
default=combine,
help='combine events within N seconds '
'[default=%(default).2f]')
if delay is not None:
g.add_argument('--delay', action='store', type=float,
default=delay,
help='report the events N seconds delayed '
'[default=%(default)i]')
# return the argument group so it can be modified if needed
return g