# encoding: utf-8
# pylint: disable=no-member
# pylint: disable=invalid-name
# pylint: disable=too-many-arguments
"""
This module contains beat tracking related functionality.
"""
from __future__ import absolute_import, division, print_function
import sys
import numpy as np
from ..audio.signal import signal_frame, smooth as smooth_signal
from ..ml.nn import average_predictions
from ..processors import (OnlineProcessor, ParallelProcessor, Processor,
SequentialProcessor)
# classes for tracking (down-)beats with RNNs
class RNNBeatProcessor(SequentialProcessor):
"""
Processor to get a beat activation function from multiple RNNs.
Parameters
----------
post_processor : Processor, optional
Post-processor, default is to average the predictions.
online : bool, optional
Use signal processing parameters and RNN models suitable for online
mode.
nn_files : list, optional
        List with trained RNN model files. By default ('None'), an ensemble
of networks will be used.
References
----------
.. [1] Sebastian Böck and Markus Schedl,
"Enhanced Beat Tracking with Context-Aware Neural Networks",
Proceedings of the 14th International Conference on Digital Audio
Effects (DAFx), 2011.
Examples
--------
    Create an RNNBeatProcessor and pass a file through the processor.
The returned 1d array represents the probability of a beat at each frame,
sampled at 100 frames per second.
>>> proc = RNNBeatProcessor()
>>> proc # doctest: +ELLIPSIS
<madmom.features.beats.RNNBeatProcessor object at 0x...>
>>> proc('tests/data/audio/sample.wav') # doctest: +ELLIPSIS
array([0.00479, 0.00603, 0.00927, 0.01419, ... 0.02725], dtype=float32)
    For online processing, `online` must be set to 'True'. If processing power
    is limited, the number of RNN models can be reduced via `nn_files`. The
    audio signal is then processed frame by frame.
>>> from madmom.models import BEATS_LSTM
>>> proc = RNNBeatProcessor(online=True, nn_files=[BEATS_LSTM[0]])
>>> proc # doctest: +ELLIPSIS
<madmom.features.beats.RNNBeatProcessor object at 0x...>
>>> proc('tests/data/audio/sample.wav') # doctest: +ELLIPSIS
array([0.03887, 0.02619, 0.00747, 0.00218, ... 0.04825], dtype=float32)
"""
def __init__(self, post_processor=average_predictions, online=False,
nn_files=None, **kwargs):
# pylint: disable=unused-argument
from ..audio.signal import SignalProcessor, FramedSignalProcessor
from ..audio.stft import ShortTimeFourierTransformProcessor
from ..audio.spectrogram import (
FilteredSpectrogramProcessor, LogarithmicSpectrogramProcessor,
SpectrogramDifferenceProcessor)
from ..ml.nn import NeuralNetworkEnsemble
from ..models import BEATS_LSTM, BEATS_BLSTM
# choose the appropriate models and set frame sizes accordingly
if online:
if nn_files is None:
nn_files = BEATS_LSTM
frame_sizes = [2048]
num_bands = 12
else:
if nn_files is None:
nn_files = BEATS_BLSTM
frame_sizes = [1024, 2048, 4096]
num_bands = 6
# define pre-processing chain
sig = SignalProcessor(num_channels=1, sample_rate=44100)
# process the multi-resolution spec & diff in parallel
multi = ParallelProcessor([])
for frame_size in frame_sizes:
frames = FramedSignalProcessor(frame_size=frame_size, **kwargs)
stft = ShortTimeFourierTransformProcessor() # caching FFT window
filt = FilteredSpectrogramProcessor(num_bands=num_bands, fmin=30,
fmax=17000, norm_filters=True)
spec = LogarithmicSpectrogramProcessor(mul=1, add=1)
diff = SpectrogramDifferenceProcessor(
diff_ratio=0.5, positive_diffs=True, stack_diffs=np.hstack)
# process each frame size with spec and diff sequentially
multi.append(SequentialProcessor((frames, stft, filt, spec, diff)))
        # stack the features and process everything sequentially
pre_processor = SequentialProcessor((sig, multi, np.hstack))
# process the pre-processed signal with a NN ensemble and the given
# post_processor
nn = NeuralNetworkEnsemble.load(nn_files,
ensemble_fn=post_processor, **kwargs)
# instantiate a SequentialProcessor
super(RNNBeatProcessor, self).__init__((pre_processor, nn))
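# A minimal usage sketch (illustrative only, mirroring the docstring examples
# above): for online processing with limited processing power, a single
# network can be used instead of the whole ensemble.
def _example_online_rnn_processor():
    """Create an online RNNBeatProcessor with a single network (sketch)."""
    from ..models import BEATS_LSTM
    # use only the first LSTM model; faster, at some cost in accuracy
    return RNNBeatProcessor(online=True, nn_files=[BEATS_LSTM[0]])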
# class for selecting a certain beat activation function from (multiple) NNs
class MultiModelSelectionProcessor(Processor):
"""
    Processor for selecting the most suitable model (i.e. the predictions
    thereof) from multiple models/predictions.
Parameters
----------
num_ref_predictions : int
Number of reference predictions (see below).
Notes
-----
This processor selects the most suitable prediction from multiple models by
comparing them to the predictions of a reference model. The one with the
smallest mean squared error is chosen.
If `num_ref_predictions` is 0 or None, an averaged prediction is computed
from the given predictions and used as reference.
References
----------
.. [1] Sebastian Böck, Florian Krebs and Gerhard Widmer,
"A Multi-Model Approach to Beat Tracking Considering Heterogeneous
Music Styles",
Proceedings of the 15th International Society for Music Information
Retrieval Conference (ISMIR), 2014.
Examples
--------
    The MultiModelSelectionProcessor takes a list of model predictions as its
    call argument. Thus, the `post_processor` of `RNNBeatProcessor` has to be
    set to 'None' in order to get the predictions of all models.
>>> proc = RNNBeatProcessor(post_processor=None)
>>> proc # doctest: +ELLIPSIS
<madmom.features.beats.RNNBeatProcessor object at 0x...>
When passing a file through the processor, a list with predictions, one for
each model tested, is returned.
>>> predictions = proc('tests/data/audio/sample.wav')
>>> predictions # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
[array([0.00535, 0.00774, ..., 0.02343, 0.04931], dtype=float32),
array([0.0022 , 0.00282, ..., 0.00825, 0.0152 ], dtype=float32),
...,
array([0.005 , 0.0052 , ..., 0.00472, 0.01524], dtype=float32),
array([0.00319, 0.0044 , ..., 0.0081 , 0.01498], dtype=float32)]
We can feed these predictions to the MultiModelSelectionProcessor.
    Since we do not have a dedicated reference prediction (which would have to
    be the first element of the list, with `num_ref_predictions` set to 1), we
    simply set `num_ref_predictions` to 'None'. MultiModelSelectionProcessor
    then averages all predictions to obtain a reference prediction to compare
    all others to.
>>> mm_proc = MultiModelSelectionProcessor(num_ref_predictions=None)
>>> mm_proc(predictions) # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
array([0.00759, 0.00901, ..., 0.00843, 0.01834], dtype=float32)
"""
def __init__(self, num_ref_predictions, **kwargs):
# pylint: disable=unused-argument
self.num_ref_predictions = num_ref_predictions
    def process(self, predictions, **kwargs):
"""
        Select the most appropriate prediction from the list of predictions.
Parameters
----------
predictions : list
Predictions (beat activation functions) of multiple models.
Returns
-------
numpy array
Most suitable prediction.
Notes
-----
The reference beat activation function must be the first one in the
list of given predictions.
"""
# TODO: right now we only have 1D predictions, what to do with
# multi-dim?
num_refs = self.num_ref_predictions
# determine the reference prediction
if num_refs in (None, 0):
# just average all predictions to simulate a reference network
reference = average_predictions(predictions)
elif num_refs > 0:
# average the reference predictions
reference = average_predictions(predictions[:num_refs])
else:
raise ValueError('`num_ref_predictions` must be positive or None, '
'%s given' % num_refs)
# init the error with the max. possible value (i.e. prediction length)
best_error = len(reference)
# init the best_prediction with an empty array
best_prediction = np.empty(0)
# compare the (remaining) predictions with the reference prediction
for prediction in predictions[num_refs:]:
# calculate the squared error w.r.t. the reference prediction
error = np.sum((prediction - reference) ** 2.)
            # choose the best activation
if error < best_error:
best_prediction = prediction
best_error = error
# return the best prediction
return best_prediction.ravel()
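# A plain-numpy sketch of the selection logic implemented in `process` above:
# average all predictions as the reference, then pick the prediction with the
# smallest squared error w.r.t. this reference. Illustrative only; use
# MultiModelSelectionProcessor for real work.
def _example_select_prediction(predictions):
    """Return the prediction closest to the average of all (sketch)."""
    reference = average_predictions(predictions)
    errors = [np.sum((p - reference) ** 2.) for p in predictions]
    return predictions[int(np.argmin(errors))]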
# function for detecting the beats based on the given dominant interval
def detect_beats(activations, interval, look_aside=0.2):
"""
    Detect the beats in the given activation function as in [1]_.
Parameters
----------
activations : numpy array
Beat activations.
interval : int
Look for the next beat each `interval` frames.
look_aside : float
Look this fraction of the `interval` to each side to detect the beats.
Returns
-------
numpy array
Beat positions [frames].
Notes
-----
    A Hamming window of length 2 * `look_aside` * `interval` is applied around
    the position where the beat is expected, to prefer beats closer to the
    centre.
References
----------
.. [1] Sebastian Böck and Markus Schedl,
"Enhanced Beat Tracking with Context-Aware Neural Networks",
Proceedings of the 14th International Conference on Digital Audio
Effects (DAFx), 2011.
"""
# TODO: make this faster!
    # raise the recursion limit if needed (up to one call per frame), but
    # never lower it below its current value
    sys.setrecursionlimit(max(sys.getrecursionlimit(), len(activations)))
# always look at least 1 frame to each side
frames_look_aside = max(1, int(interval * look_aside))
win = np.hamming(2 * frames_look_aside)
# list to be filled with beat positions from inside the recursive function
positions = []
def recursive(position):
"""
Recursively detect the next beat.
Parameters
----------
position : int
Start at this position.
"""
        # detect the nearest beat around the current position
act = signal_frame(activations, position, frames_look_aside * 2, 1)
# apply a filtering window to prefer beats closer to the centre
act = np.multiply(act, win)
# search max
if np.argmax(act) > 0:
# maximum found, take that position
position = np.argmax(act) + position - frames_look_aside
# add the found position
positions.append(position)
# go to the next beat, until end is reached
if position + interval < len(activations):
recursive(position + interval)
else:
return
# calculate the beats for each start position (up to the interval length)
sums = np.zeros(interval)
for i in range(interval):
positions = []
# detect the beats for this start position
recursive(i)
# calculate the sum of the activations at the beat positions
sums[i] = np.sum(activations[positions])
# take the winning start position
start_position = np.argmax(sums)
# and calc the beats for this start position
positions = []
recursive(start_position)
# return indices
return np.array(positions)
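# An iterative sketch of the alignment performed by `detect_beats` above:
# weight the activations around the expected beat position with a Hamming
# window, move to the maximum and advance one interval. It avoids the
# recursion (and the recursion limit), but is otherwise equivalent and
# purely illustrative.
def _example_align_beats(activations, interval, position, look_aside=0.2):
    """Iteratively align beats (illustrative counterpart of detect_beats)."""
    frames_look_aside = max(1, int(interval * look_aside))
    win = np.hamming(2 * frames_look_aside)
    positions = []
    while position < len(activations):
        # cut out the activations around the current position and weight
        # them to prefer beats closer to the centre
        act = signal_frame(activations, position, frames_look_aside * 2, 1)
        act = np.multiply(act, win)
        # if a maximum is found, move to its position
        idx = np.argmax(act)
        if idx > 0:
            position += idx - frames_look_aside
        positions.append(position)
        # advance to the next expected beat
        position += interval
    return np.array(positions)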
# classes for detecting/tracking of beat inside a beat activation function
class BeatTrackingProcessor(Processor):
"""
Track the beats according to previously determined (local) tempo by
iteratively aligning them around the estimated position [1]_.
Parameters
----------
look_aside : float, optional
Look this fraction of the estimated beat interval to each side of the
assumed next beat position to look for the most likely position of the
next beat.
look_ahead : float, optional
Look `look_ahead` seconds in both directions to determine the local
tempo and align the beats accordingly.
tempo_estimator : :class:`TempoEstimationProcessor`, optional
Use this processor to estimate the (local) tempo. If 'None' a default
tempo estimator will be created and used.
fps : float, optional
Frames per second.
kwargs : dict, optional
Keyword arguments passed to
:class:`madmom.features.tempo.TempoEstimationProcessor` if no
`tempo_estimator` was given.
Notes
-----
If `look_ahead` is not set, a constant tempo throughout the whole piece
is assumed. If `look_ahead` is set, the local tempo (in a range +/-
    `look_ahead` seconds around the current position) is estimated and then
the next beat is tracked accordingly. This procedure is repeated from
the new position to the end of the piece.
    Instead of the auto-correlation based method for tempo estimation proposed
    in [1]_, a comb filter based method [2]_ is used by default. The behaviour
    can be controlled with the `tempo_estimator` parameter.
References
----------
.. [1] Sebastian Böck and Markus Schedl,
"Enhanced Beat Tracking with Context-Aware Neural Networks",
Proceedings of the 14th International Conference on Digital Audio
Effects (DAFx), 2011.
.. [2] Sebastian Böck, Florian Krebs and Gerhard Widmer,
"Accurate Tempo Estimation based on Recurrent Neural Networks and
Resonating Comb Filters",
Proceedings of the 16th International Society for Music Information
Retrieval Conference (ISMIR), 2015.
Examples
--------
    Create a BeatTrackingProcessor. The returned array represents the positions
    of the beats in seconds, thus the frame rate of the beat activation
    function (fps) has to be given.
>>> proc = BeatTrackingProcessor(fps=100)
>>> proc # doctest: +ELLIPSIS
<madmom.features.beats.BeatTrackingProcessor object at 0x...>
Call this BeatTrackingProcessor with the beat activation function returned
by RNNBeatProcessor to obtain the beat positions.
>>> act = RNNBeatProcessor()('tests/data/audio/sample.wav')
>>> proc(act)
array([0.11, 0.45, 0.79, 1.13, 1.47, 1.81, 2.15, 2.49])
"""
LOOK_ASIDE = 0.2
LOOK_AHEAD = 10.
def __init__(self, look_aside=LOOK_ASIDE, look_ahead=LOOK_AHEAD, fps=None,
tempo_estimator=None, **kwargs):
# save variables
self.look_aside = look_aside
self.look_ahead = look_ahead
self.fps = fps
# tempo estimator
if tempo_estimator is None:
            # import TempoEstimationProcessor here to avoid a circular import
from .tempo import TempoEstimationProcessor
# create default tempo estimator
tempo_estimator = TempoEstimationProcessor(fps=fps, **kwargs)
self.tempo_estimator = tempo_estimator
    def process(self, activations, **kwargs):
"""
Detect the beats in the given activation function.
Parameters
----------
activations : numpy array
Beat activation function.
Returns
-------
beats : numpy array
Detected beat positions [seconds].
"""
# smooth activations
act_smooth = int(self.fps * self.tempo_estimator.act_smooth)
activations = smooth_signal(activations, act_smooth)
# TODO: refactor interval stuff to use TempoEstimation
# if look_ahead is not defined, assume a global tempo
if self.look_ahead is None:
            # create an interval histogram
histogram = self.tempo_estimator.interval_histogram(activations)
# get the dominant interval
interval = self.tempo_estimator.dominant_interval(histogram)
# detect beats based on this interval
detections = detect_beats(activations, interval, self.look_aside)
else:
# allow varying tempo
look_ahead_frames = int(self.look_ahead * self.fps)
# detect the beats
detections = []
pos = 0
# TODO: make this _much_ faster!
while pos < len(activations):
                # look N frames around the current position
                act = signal_frame(activations, pos, look_ahead_frames * 2, 1)
                # create an interval histogram
histogram = self.tempo_estimator.interval_histogram(act)
# get the dominant interval
interval = self.tempo_estimator.dominant_interval(histogram)
# add the offset (i.e. the new detected start position)
positions = detect_beats(act, interval, self.look_aside)
# correct the beat positions
positions += pos - look_ahead_frames
# remove all positions < already detected beats + min_interval
next_pos = (detections[-1] + self.tempo_estimator.min_interval
if detections else 0)
positions = positions[positions >= next_pos]
# search the closest beat to the predicted beat position
pos = positions[(np.abs(positions - pos)).argmin()]
# append to the beats
detections.append(pos)
pos += interval
# convert detected beats to a list of timestamps
detections = np.array(detections) / float(self.fps)
# remove beats with negative times and return them
return detections[np.searchsorted(detections, 0):]
    @staticmethod
def add_arguments(parser, look_aside=LOOK_ASIDE,
look_ahead=LOOK_AHEAD):
"""
Add beat tracking related arguments to an existing parser.
Parameters
----------
parser : argparse parser instance
Existing argparse parser object.
look_aside : float, optional
Look this fraction of the estimated beat interval to each side of
the assumed next beat position to look for the most likely position
of the next beat.
look_ahead : float, optional
Look `look_ahead` seconds in both directions to determine the local
tempo and align the beats accordingly.
Returns
-------
parser_group : argparse argument group
Beat tracking argument parser group.
Notes
-----
Parameters are included in the group only if they are not 'None'.
"""
# add beat detection related options to the existing parser
g = parser.add_argument_group('beat detection arguments')
# TODO: unify look_aside with CRFBeatDetection's interval_sigma
if look_aside is not None:
g.add_argument('--look_aside', action='store', type=float,
default=look_aside,
help='look this fraction of a beat interval to '
'each side of the assumed next beat position '
'to look for the most likely position of the '
'next beat [default=%(default).2f]')
if look_ahead is not None:
g.add_argument('--look_ahead', action='store', type=float,
default=look_ahead,
help='look this many seconds in both directions '
'to determine the local tempo and align the '
'beats accordingly [default=%(default).2f]')
# return the argument group so it can be modified if needed
return g
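# A sketch of how the argument group defined above plugs into a command line
# tool; madmom's bin scripts follow the same pattern with more infrastructure
# around it. Illustrative only.
def _example_beat_argument_parser():
    """Build an argparse parser with beat tracking arguments (sketch)."""
    import argparse
    parser = argparse.ArgumentParser(description='beat tracking example')
    BeatTrackingProcessor.add_arguments(parser)
    # e.g. parser.parse_args(['--look_ahead', '5']).look_ahead == 5.0
    return parser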
class BeatDetectionProcessor(BeatTrackingProcessor):
"""
Class for detecting beats according to the previously determined global
tempo by iteratively aligning them around the estimated position [1]_.
Parameters
----------
look_aside : float
Look this fraction of the estimated beat interval to each side of the
assumed next beat position to look for the most likely position of the
next beat.
fps : float, optional
Frames per second.
Notes
-----
A constant tempo throughout the whole piece is assumed.
    Instead of the auto-correlation based method for tempo estimation proposed
    in [1]_, a comb filter based method [2]_ is used by default. The behaviour
    can be controlled with the `tempo_estimator` parameter.
See Also
--------
:class:`BeatTrackingProcessor`
References
----------
.. [1] Sebastian Böck and Markus Schedl,
"Enhanced Beat Tracking with Context-Aware Neural Networks",
Proceedings of the 14th International Conference on Digital Audio
Effects (DAFx), 2011.
.. [2] Sebastian Böck, Florian Krebs and Gerhard Widmer,
"Accurate Tempo Estimation based on Recurrent Neural Networks and
Resonating Comb Filters",
Proceedings of the 16th International Society for Music Information
Retrieval Conference (ISMIR), 2015.
Examples
--------
    Create a BeatDetectionProcessor. The returned array represents the
    positions of the beats in seconds, thus the frame rate of the beat
    activation function (fps) has to be given.
>>> proc = BeatDetectionProcessor(fps=100)
>>> proc # doctest: +ELLIPSIS
<madmom.features.beats.BeatDetectionProcessor object at 0x...>
Call this BeatDetectionProcessor with the beat activation function returned
by RNNBeatProcessor to obtain the beat positions.
>>> act = RNNBeatProcessor()('tests/data/audio/sample.wav')
>>> proc(act)
array([0.11, 0.45, 0.79, 1.13, 1.47, 1.81, 2.15, 2.49])
"""
LOOK_ASIDE = 0.2
def __init__(self, look_aside=LOOK_ASIDE, fps=None, **kwargs):
super(BeatDetectionProcessor, self).__init__(look_aside=look_aside,
look_ahead=None, fps=fps,
**kwargs)
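# Note: with `look_ahead=None` the parent class falls back to a single global
# tempo estimate, which is exactly what this class provides; a one-line
# illustrative sketch of this equivalence:
def _example_global_tempo_tracker(fps=100):
    """Create a tracker equivalent to a BeatDetectionProcessor (sketch)."""
    return BeatTrackingProcessor(look_ahead=None, fps=fps)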
def _process_crf(process_tuple):
"""
Extract the best beat sequence for a piece.
This proxy function is necessary to process different intervals in parallel
using the multiprocessing module.
Parameters
----------
process_tuple : tuple
Tuple with (activations, dominant_interval, allowed deviation from the
dominant interval per beat).
Returns
-------
beats : numpy array
Extracted beat positions [frames].
log_prob : float
Log probability of the beat sequence.
"""
# pylint: disable=no-name-in-module
from .beats_crf import best_sequence
# activations, dominant_interval, interval_sigma = process_tuple
return best_sequence(*process_tuple)
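# A minimal sketch of the call pattern the proxy enables: `Pool.map` passes a
# single picklable argument to a module-level function, so the activations,
# the intervals and the sigma are zipped into tuples beforehand. This mirrors
# what `CRFBeatDetectionProcessor.process` (below) does; the worker count is
# chosen arbitrarily for illustration.
def _example_parallel_best_sequences(activations, intervals, interval_sigma):
    """Compute the best beat sequences for several intervals (sketch)."""
    import itertools as it
    import multiprocessing as mp
    pool = mp.Pool(min(2, len(intervals)))
    return list(pool.map(_process_crf,
                         zip(it.repeat(activations), intervals,
                             it.repeat(interval_sigma))))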
class CRFBeatDetectionProcessor(BeatTrackingProcessor):
"""
Conditional Random Field Beat Detection.
Tracks the beats according to the previously determined global tempo using
a conditional random field (CRF) model.
Parameters
----------
interval_sigma : float, optional
Allowed deviation from the dominant beat interval per beat.
use_factors : bool, optional
Use dominant interval multiplied by factors instead of intervals
estimated by tempo estimator.
num_intervals : int, optional
Maximum number of estimated intervals to try.
factors : list or numpy array, optional
Factors of the dominant interval to try.
References
----------
.. [1] Filip Korzeniowski, Sebastian Böck and Gerhard Widmer,
"Probabilistic Extraction of Beat Positions from a Beat Activation
Function",
Proceedings of the 15th International Society for Music Information
Retrieval Conference (ISMIR), 2014.
Examples
--------
    Create a CRFBeatDetectionProcessor. The returned array represents the
    positions of the beats in seconds, thus the frame rate of the beat
    activation function (fps) has to be given.
>>> proc = CRFBeatDetectionProcessor(fps=100)
>>> proc # doctest: +ELLIPSIS
<madmom.features.beats.CRFBeatDetectionProcessor object at 0x...>
    Call this CRFBeatDetectionProcessor with the beat activation function
    returned by RNNBeatProcessor to obtain the beat positions.
>>> act = RNNBeatProcessor()('tests/data/audio/sample.wav')
>>> proc(act)
array([0.09, 0.79, 1.49])
"""
INTERVAL_SIGMA = 0.18
USE_FACTORS = False
FACTORS = np.array([0.5, 0.67, 1.0, 1.5, 2.0])
NUM_INTERVALS = 5
# tempo defaults
MIN_BPM = 20
MAX_BPM = 240
ACT_SMOOTH = 0.09
HIST_SMOOTH = 7
def __init__(self, interval_sigma=INTERVAL_SIGMA, use_factors=USE_FACTORS,
num_intervals=NUM_INTERVALS, factors=FACTORS, **kwargs):
super(CRFBeatDetectionProcessor, self).__init__(**kwargs)
# save parameters
self.interval_sigma = interval_sigma
self.use_factors = use_factors
self.num_intervals = num_intervals
self.factors = factors
# get num_threads from kwargs
num_threads = min(len(factors) if use_factors else num_intervals,
kwargs.get('num_threads', 1))
# init a pool of workers (if needed)
self.map = map
if num_threads != 1:
import multiprocessing as mp
self.map = mp.Pool(num_threads).map
    def process(self, activations, **kwargs):
"""
Detect the beats in the given activation function.
Parameters
----------
activations : numpy array
Beat activation function.
Returns
-------
numpy array
Detected beat positions [seconds].
"""
import itertools as it
# estimate the tempo
tempi = self.tempo_estimator.process(activations)
intervals = self.fps * 60. / tempi[:, 0]
# compute possible intervals
if self.use_factors:
# use the dominant interval with different factors
possible_intervals = [int(intervals[0] * f) for f in self.factors]
possible_intervals = [i for i in possible_intervals if
self.tempo_estimator.max_interval >= i >=
self.tempo_estimator.min_interval]
else:
# take the top n intervals from the tempo estimator
possible_intervals = intervals[:self.num_intervals]
# sort and start from the greatest interval
possible_intervals.sort()
possible_intervals = [int(i) for i in possible_intervals[::-1]]
# smooth activations
act_smooth = int(self.fps * self.tempo_estimator.act_smooth)
activations = smooth_signal(activations, act_smooth)
# since the cython code uses memory views, we need to make sure that
# the activations are C-contiguous and of C-type float (np.float32)
contiguous_act = np.ascontiguousarray(activations, dtype=np.float32)
results = list(self.map(
_process_crf, zip(it.repeat(contiguous_act), possible_intervals,
it.repeat(self.interval_sigma))))
# normalize their probabilities
normalized_seq_probabilities = np.array([r[1] / r[0].shape[0]
for r in results])
# pick the best one
best_seq = results[normalized_seq_probabilities.argmax()][0]
# convert the detected beat positions to seconds and return them
        return best_seq.astype(float) / self.fps
    @staticmethod
def add_arguments(parser, interval_sigma=INTERVAL_SIGMA,
use_factors=USE_FACTORS, num_intervals=NUM_INTERVALS,
factors=FACTORS):
"""
Add CRFBeatDetection related arguments to an existing parser.
Parameters
----------
parser : argparse parser instance
Existing argparse parser object.
        interval_sigma : float, optional
            Allowed deviation from the dominant beat interval per beat.
        use_factors : bool, optional
            Use dominant interval multiplied by factors instead of intervals
            estimated by the tempo estimator.
        num_intervals : int, optional
            Maximum number of estimated intervals to try.
        factors : list or numpy array, optional
            Factors of the dominant interval to try.
Returns
-------
parser_group : argparse argument group
CRF beat tracking argument parser group.
"""
# pylint: disable=arguments-differ
from ..utils import OverrideDefaultListAction
# add CRF related arguments
g = parser.add_argument_group('conditional random field arguments')
g.add_argument('--interval_sigma', action='store', type=float,
default=interval_sigma,
help='allowed deviation from the dominant interval '
'[default=%(default).2f]')
g.add_argument('--use_factors', action='store_true',
default=use_factors,
                       help='use dominant interval multiplied by factors '
'instead of multiple estimated intervals '
'[default=%(default)s]')
g.add_argument('--num_intervals', action='store', type=int,
default=num_intervals, dest='num_intervals',
help='number of estimated intervals to try '
'[default=%(default)s]')
g.add_argument('--factors', action=OverrideDefaultListAction,
default=factors, type=float, sep=',',
help='(comma separated) list with factors of dominant '
'interval to try [default=%(default)s]')
return g
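# A worked sketch of the tempo-to-interval conversion used in `process` above:
# at fps=100, a tempo of 120 bpm corresponds to 100 * 60 / 120 = 50 frames
# between two consecutive beats. Numbers are illustrative only.
def _example_tempo_to_interval(tempo_bpm=120., fps=100.):
    """Convert a tempo [bpm] into a beat interval [frames] (sketch)."""
    return fps * 60. / tempo_bpm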
class DBNBeatTrackingProcessor(OnlineProcessor):
"""
Beat tracking with RNNs and a dynamic Bayesian network (DBN) approximated
by a Hidden Markov Model (HMM).
Parameters
----------
min_bpm : float, optional
Minimum tempo used for beat tracking [bpm].
max_bpm : float, optional
Maximum tempo used for beat tracking [bpm].
num_tempi : int, optional
Number of tempi to model; if set, limit the number of tempi and use a
log spacing, otherwise a linear spacing.
transition_lambda : float, optional
Lambda for the exponential tempo change distribution (higher values
prefer a constant tempo from one beat to the next one).
observation_lambda : int, optional
Split one beat period into `observation_lambda` parts, the first
representing beat states and the remaining non-beat states.
threshold : float, optional
Threshold the observations before Viterbi decoding.
correct : bool, optional
Correct the beats (i.e. align them to the nearest peak of the beat
activation function).
fps : float, optional
Frames per second.
online : bool, optional
Use the forward algorithm (instead of Viterbi) to decode the beats.
Notes
-----
Instead of the originally proposed state space and transition model for
the DBN [1]_, the more efficient version proposed in [2]_ is used.
References
----------
.. [1] Sebastian Böck, Florian Krebs and Gerhard Widmer,
"A Multi-Model Approach to Beat Tracking Considering Heterogeneous
Music Styles",
Proceedings of the 15th International Society for Music Information
Retrieval Conference (ISMIR), 2014.
.. [2] Florian Krebs, Sebastian Böck and Gerhard Widmer,
"An Efficient State Space Model for Joint Tempo and Meter Tracking",
Proceedings of the 16th International Society for Music Information
Retrieval Conference (ISMIR), 2015.
Examples
--------
    Create a DBNBeatTrackingProcessor. The returned array represents the
    positions of the beats in seconds, thus the frame rate of the beat
    activation function (fps) has to be given.
>>> proc = DBNBeatTrackingProcessor(fps=100)
>>> proc # doctest: +ELLIPSIS
<madmom.features.beats.DBNBeatTrackingProcessor object at 0x...>
Call this DBNBeatTrackingProcessor with the beat activation function
returned by RNNBeatProcessor to obtain the beat positions.
>>> act = RNNBeatProcessor()('tests/data/audio/sample.wav')
>>> proc(act)
array([0.1 , 0.45, 0.8 , 1.12, 1.48, 1.8 , 2.15, 2.49])
"""
MIN_BPM = 55.
MAX_BPM = 215.
NUM_TEMPI = None
TRANSITION_LAMBDA = 100
OBSERVATION_LAMBDA = 16
THRESHOLD = 0
CORRECT = True
def __init__(self, min_bpm=MIN_BPM, max_bpm=MAX_BPM, num_tempi=NUM_TEMPI,
transition_lambda=TRANSITION_LAMBDA,
observation_lambda=OBSERVATION_LAMBDA, correct=CORRECT,
threshold=THRESHOLD, fps=None, online=False, **kwargs):
# pylint: disable=unused-argument
# pylint: disable=no-name-in-module
from .beats_hmm import (BeatStateSpace, BeatTransitionModel,
RNNBeatTrackingObservationModel)
from ..ml.hmm import HiddenMarkovModel
# convert timing information to construct a beat state space
min_interval = 60. * fps / max_bpm
max_interval = 60. * fps / min_bpm
self.st = BeatStateSpace(min_interval, max_interval, num_tempi)
# transition model
self.tm = BeatTransitionModel(self.st, transition_lambda)
# observation model
self.om = RNNBeatTrackingObservationModel(self.st, observation_lambda)
# instantiate a HMM
self.hmm = HiddenMarkovModel(self.tm, self.om, None)
# save variables
self.correct = correct
self.threshold = threshold
self.fps = fps
self.min_bpm = min_bpm
self.max_bpm = max_bpm
        # keep state in online mode
self.online = online
# TODO: refactor the visualisation stuff
if self.online:
self.visualize = kwargs.get('verbose', False)
self.counter = 0
self.beat_counter = 0
self.strength = 0
self.last_beat = 0
self.tempo = 0
    def reset(self):
"""Reset the DBNBeatTrackingProcessor."""
# pylint: disable=attribute-defined-outside-init
# reset the HMM
self.hmm.reset()
# reset other variables
self.counter = 0
self.beat_counter = 0
self.strength = 0
self.last_beat = 0
self.tempo = 0
    def process_offline(self, activations, **kwargs):
"""
Detect the beats in the given activation function with Viterbi
decoding.
Parameters
----------
activations : numpy array
Beat activation function.
Returns
-------
beats : numpy array
Detected beat positions [seconds].
"""
# init the beats to return and the offset
        beats = np.empty(0, dtype=int)
first = 0
# use only the activations > threshold
if self.threshold:
idx = np.nonzero(activations >= self.threshold)[0]
if idx.any():
first = max(first, np.min(idx))
last = min(len(activations), np.max(idx) + 1)
else:
last = first
activations = activations[first:last]
# return the beats if no activations given / remain after thresholding
if not activations.any():
return beats
# get the best state path by calling the viterbi algorithm
path, _ = self.hmm.viterbi(activations)
# correct the beat positions if needed
if self.correct:
# for each detection determine the "beat range", i.e. states where
# the pointers of the observation model are 1
beat_range = self.om.pointers[path]
# get all change points between True and False
idx = np.nonzero(np.diff(beat_range))[0] + 1
# if the first frame is in the beat range, add a change at frame 0
if beat_range[0]:
idx = np.r_[0, idx]
# if the last frame is in the beat range, append the length of the
# array
if beat_range[-1]:
idx = np.r_[idx, beat_range.size]
# iterate over all regions
if idx.any():
for left, right in idx.reshape((-1, 2)):
# pick the frame with the highest activations value
peak = np.argmax(activations[left:right]) + left
beats = np.hstack((beats, peak))
else:
# just take the frames with the smallest beat state values
from scipy.signal import argrelmin
beats = argrelmin(self.st.state_positions[path], mode='wrap')[0]
# recheck if they are within the "beat range", i.e. the pointers
# of the observation model for that state must be 1
# Note: interpolation and alignment of the beats to be at state 0
# does not improve results over this simple method
beats = beats[self.om.pointers[path[beats]] == 1]
# convert the detected beats to seconds and return them
return (beats + first) / float(self.fps)
    def process_online(self, activations, reset=True, **kwargs):
"""
Detect the beats in the given activation function with the forward
algorithm.
Parameters
----------
activations : numpy array
Beat activation for a single frame.
reset : bool, optional
Reset the DBNBeatTrackingProcessor to its initial state before
processing.
Returns
-------
beats : numpy array
Detected beat position [seconds].
"""
# reset to initial state
if reset:
self.reset()
# use forward path to get best state
fwd = self.hmm.forward(activations, reset=reset)
# choose the best state for each step
states = np.argmax(fwd, axis=1)
# decide which time steps are beats
beats = self.om.pointers[states] == 1
# the positions inside the beats
positions = self.st.state_positions[states]
# visualisation stuff (only when called frame by frame)
if self.visualize and len(activations) == 1:
beat_length = 80
display = [' '] * beat_length
display[int(positions * beat_length)] = '*'
# activation strength indicator
strength_length = 10
self.strength = int(max(self.strength, activations * 10))
display.append('| ')
display.extend(['*'] * self.strength)
display.extend([' '] * (strength_length - self.strength))
# reduce the displayed strength every couple of frames
if self.counter % 5 == 0:
self.strength -= 1
# beat indicator
if beats:
self.beat_counter = 3
if self.beat_counter > 0:
display.append('| X ')
else:
display.append('| ')
self.beat_counter -= 1
# display tempo
display.append('| %5.1f | ' % self.tempo)
sys.stderr.write('\r%s' % ''.join(display))
sys.stderr.flush()
        # the forward path often reports multiple beats close together, thus
        # report only beats at least the minimum beat interval apart
beats_ = []
for frame in np.nonzero(beats)[0]:
cur_beat = (frame + self.counter) / float(self.fps)
next_beat = self.last_beat + 60. / self.max_bpm
# FIXME: this skips the first beat, but maybe this has a positive
# effect on the overall beat tracking accuracy
if cur_beat >= next_beat:
# update tempo
self.tempo = 60. / (cur_beat - self.last_beat)
# update last beat
self.last_beat = cur_beat
# append to beats
beats_.append(cur_beat)
# increase counter
self.counter += len(activations)
# return beat(s)
return np.array(beats_)
process_forward = process_online
process_viterbi = process_offline
    @staticmethod
def add_arguments(parser, min_bpm=MIN_BPM, max_bpm=MAX_BPM,
num_tempi=NUM_TEMPI, transition_lambda=TRANSITION_LAMBDA,
observation_lambda=OBSERVATION_LAMBDA,
threshold=THRESHOLD, correct=CORRECT):
"""
Add DBN related arguments to an existing parser object.
Parameters
----------
parser : argparse parser instance
Existing argparse parser object.
min_bpm : float, optional
Minimum tempo used for beat tracking [bpm].
max_bpm : float, optional
Maximum tempo used for beat tracking [bpm].
num_tempi : int, optional
Number of tempi to model; if set, limit the number of tempi and use
a log spacing, otherwise a linear spacing.
transition_lambda : float, optional
Lambda for the exponential tempo change distribution (higher values
prefer a constant tempo over a tempo change from one beat to the
next one).
observation_lambda : float, optional
Split one beat period into `observation_lambda` parts, the first
representing beat states and the remaining non-beat states.
threshold : float, optional
Threshold the observations before Viterbi decoding.
correct : bool, optional
Correct the beats (i.e. align them to the nearest peak of the beat
activation function).
Returns
-------
parser_group : argparse argument group
DBN beat tracking argument parser group
"""
# pylint: disable=arguments-differ
# add DBN parser group
g = parser.add_argument_group('dynamic Bayesian Network arguments')
        # add transition model related parameters
g.add_argument('--min_bpm', action='store', type=float,
default=min_bpm,
help='minimum tempo [bpm, default=%(default).2f]')
g.add_argument('--max_bpm', action='store', type=float,
default=max_bpm,
help='maximum tempo [bpm, default=%(default).2f]')
g.add_argument('--num_tempi', action='store', type=int,
default=num_tempi,
help='limit the number of tempi; if set, align the '
'tempi with a log spacing, otherwise linearly')
g.add_argument('--transition_lambda', action='store', type=float,
default=transition_lambda,
help='lambda of the tempo transition distribution; '
'higher values prefer a constant tempo over a '
'tempo change from one beat to the next one '
'[default=%(default).1f]')
# observation model stuff
g.add_argument('--observation_lambda', action='store', type=float,
default=observation_lambda,
help='split one beat period into N parts, the first '
'representing beat states and the remaining '
'non-beat states [default=%(default)i]')
g.add_argument('-t', dest='threshold', action='store', type=float,
default=threshold,
help='threshold the observations before Viterbi '
'decoding [default=%(default).2f]')
# option to correct the beat positions
if correct:
g.add_argument('--no_correct', dest='correct',
action='store_false', default=correct,
help='do not correct the beat positions (i.e. do '
'not align them to the nearest peak of the '
'beat activation function)')
else:
g.add_argument('--correct', dest='correct',
action='store_true', default=correct,
help='correct the beat positions (i.e. align them '
                                'to the nearest peak of the beat activation '
                                'function)')
# return the argument group so it can be modified if needed
return g
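# A minimal online-mode sketch for the DBN tracker above: feed the beat
# activation function one frame at a time and keep the internal state between
# calls by resetting only before the first frame. Frame-wise activations would
# normally come from an online RNNBeatProcessor; here they are assumed given.
def _example_online_dbn_tracking(activations, fps=100):
    """Track beats frame by frame with the forward algorithm (sketch)."""
    proc = DBNBeatTrackingProcessor(fps=fps, online=True)
    beats = []
    for i, act in enumerate(activations):
        # reset only on the very first frame, keep the state afterwards
        beats.extend(proc.process_online(np.atleast_1d(act), reset=(i == 0)))
    return np.array(beats)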