# Source code for madmom.features.downbeats

# encoding: utf-8
# pylint: disable=no-member
# pylint: disable=invalid-name
# pylint: disable=too-many-arguments
"""
This module contains downbeat and bar tracking related functionality.

"""

from __future__ import absolute_import, division, print_function

import sys
import warnings

import numpy as np

from .beats_hmm import (BarStateSpace, BarTransitionModel,
                        GMMPatternTrackingObservationModel,
                        MultiPatternStateSpace,
                        MultiPatternTransitionModel,
                        RNNBeatTrackingObservationModel,
                        RNNDownBeatTrackingObservationModel, )
from ..ml.hmm import HiddenMarkovModel
from ..processors import ParallelProcessor, Processor, SequentialProcessor
from ..utils import string_types


# downbeat tracking, i.e. track beats and downbeats directly from signal
class RNNDownBeatProcessor(SequentialProcessor):
    """
    Processor to get a joint beat and downbeat activation function from
    multiple RNNs.

    References
    ----------
    .. [1] Sebastian Böck, Florian Krebs and Gerhard Widmer,
           "Joint Beat and Downbeat Tracking with Recurrent Neural Networks"
           Proceedings of the 17th International Society for Music Information
           Retrieval Conference (ISMIR), 2016.

    Examples
    --------
    Create a RNNDownBeatProcessor and pass a file through the processor.
    The returned 2d array represents the probabilities at each frame, sampled
    at 100 frames per second. The columns represent 'beat' and 'downbeat'.

    >>> proc = RNNDownBeatProcessor()
    >>> proc  # doctest: +ELLIPSIS
    <madmom.features.downbeats.RNNDownBeatProcessor object at 0x...>
    >>> proc('tests/data/audio/sample.wav')
    ... # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
    array([[0.00011, 0.00037],
           [0.00008, 0.00043],
           ...,
           [0.00791, 0.00169],
           [0.03425, 0.00494]], dtype=float32)

    """

    def __init__(self, **kwargs):
        # pylint: disable=unused-argument
        from functools import partial
        from ..audio.signal import SignalProcessor, FramedSignalProcessor
        from ..audio.stft import ShortTimeFourierTransformProcessor
        from ..audio.spectrogram import (
            FilteredSpectrogramProcessor, LogarithmicSpectrogramProcessor,
            SpectrogramDifferenceProcessor)
        from ..ml.nn import NeuralNetworkEnsemble
        from ..models import DOWNBEATS_BLSTM
        # define pre-processing chain
        sig = SignalProcessor(num_channels=1, sample_rate=44100)
        # process the multi-resolution spec & diff in parallel
        multi = ParallelProcessor([])
        frame_sizes = [1024, 2048, 4096]
        num_bands = [3, 6, 12]
        # Note: the loop variable is named `bands` (not `num_bands`) so it
        #       does not shadow the `num_bands` list defined above
        for frame_size, bands in zip(frame_sizes, num_bands):
            frames = FramedSignalProcessor(frame_size=frame_size, fps=100)
            stft = ShortTimeFourierTransformProcessor()  # caching FFT window
            filt = FilteredSpectrogramProcessor(
                num_bands=bands, fmin=30, fmax=17000, norm_filters=True)
            spec = LogarithmicSpectrogramProcessor(mul=1, add=1)
            diff = SpectrogramDifferenceProcessor(
                diff_ratio=0.5, positive_diffs=True, stack_diffs=np.hstack)
            # process each frame size with spec and diff sequentially
            multi.append(SequentialProcessor((frames, stft, filt, spec,
                                              diff)))
        # stack the features and processes everything sequentially
        pre_processor = SequentialProcessor((sig, multi, np.hstack))
        # process the pre-processed signal with a NN ensemble
        nn = NeuralNetworkEnsemble.load(DOWNBEATS_BLSTM, **kwargs)
        # use only the beat & downbeat (i.e. remove non-beat) activations
        act = partial(np.delete, obj=0, axis=1)
        # instantiate a SequentialProcessor
        super(RNNDownBeatProcessor, self).__init__((pre_processor, nn, act))
def _process_dbn(process_tuple): """ Extract the best path through the state space in an observation sequence. This proxy function is necessary to process different sequences in parallel using the multiprocessing module. Parameters ---------- process_tuple : tuple Tuple with (HMM, observations). Returns ------- path : numpy array Best path through the state space. log_prob : float Log probability of the path. """ # pylint: disable=no-name-in-module return process_tuple[0].viterbi(process_tuple[1])
class DBNDownBeatTrackingProcessor(Processor):
    """
    Downbeat tracking with RNNs and a dynamic Bayesian network (DBN)
    approximated by a Hidden Markov Model (HMM).

    Parameters
    ----------
    beats_per_bar : int or list
        Number of beats per bar to be modeled. Can be either a single number
        or a list or array with bar lengths (in beats).
    min_bpm : float or list, optional
        Minimum tempo used for beat tracking [bpm]. If a list is given, each
        item corresponds to the number of beats per bar at the same position.
    max_bpm : float or list, optional
        Maximum tempo used for beat tracking [bpm]. If a list is given, each
        item corresponds to the number of beats per bar at the same position.
    num_tempi : int or list, optional
        Number of tempi to model; if set, limit the number of tempi and use a
        log spacing, otherwise a linear spacing. If a list is given, each
        item corresponds to the number of beats per bar at the same position.
    transition_lambda : float or list, optional
        Lambda for the exponential tempo change distribution (higher values
        prefer a constant tempo from one beat to the next one). If a list is
        given, each item corresponds to the number of beats per bar at the
        same position.
    observation_lambda : int, optional
        Split one (down-)beat period into `observation_lambda` parts, the
        first representing (down-)beat states and the remaining non-beat
        states.
    threshold : float, optional
        Threshold the RNN (down-)beat activations before Viterbi decoding.
    correct : bool, optional
        Correct the beats (i.e. align them to the nearest peak of the
        (down-)beat activation function).
    fps : float, optional
        Frames per second.

    References
    ----------
    .. [1] Sebastian Böck, Florian Krebs and Gerhard Widmer,
           "Joint Beat and Downbeat Tracking with Recurrent Neural Networks"
           Proceedings of the 17th International Society for Music Information
           Retrieval Conference (ISMIR), 2016.

    Examples
    --------
    Create a DBNDownBeatTrackingProcessor. The returned array represents the
    positions of the beats and their position inside the bar. The position is
    given in seconds, thus the expected sampling rate is needed. The position
    inside the bar follows the natural counting and starts at 1.

    The number of beats per bar which should be modelled must be given, all
    other parameters (e.g. tempo range) are optional but must have the same
    length as `beats_per_bar`, i.e. must be given for each bar length.

    >>> proc = DBNDownBeatTrackingProcessor(beats_per_bar=[3, 4], fps=100)
    >>> proc  # doctest: +ELLIPSIS
    <madmom.features.downbeats.DBNDownBeatTrackingProcessor object at 0x...>

    Call this DBNDownBeatTrackingProcessor with the beat activation function
    returned by RNNDownBeatProcessor to obtain the beat positions.

    >>> act = RNNDownBeatProcessor()('tests/data/audio/sample.wav')
    >>> proc(act)  # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
    array([[0.09, 1. ],
           [0.45, 2. ],
           ...,
           [2.14, 3. ],
           [2.49, 4. ]])

    """

    MIN_BPM = 55.
    MAX_BPM = 215.
    NUM_TEMPI = 60
    TRANSITION_LAMBDA = 100
    OBSERVATION_LAMBDA = 16
    THRESHOLD = 0.05
    CORRECT = True

    def __init__(self, beats_per_bar, min_bpm=MIN_BPM, max_bpm=MAX_BPM,
                 num_tempi=NUM_TEMPI, transition_lambda=TRANSITION_LAMBDA,
                 observation_lambda=OBSERVATION_LAMBDA, threshold=THRESHOLD,
                 correct=CORRECT, fps=None, **kwargs):
        # pylint: disable=unused-argument
        # pylint: disable=no-name-in-module
        # expand arguments to arrays
        beats_per_bar = np.array(beats_per_bar, ndmin=1)
        min_bpm = np.array(min_bpm, ndmin=1)
        max_bpm = np.array(max_bpm, ndmin=1)
        num_tempi = np.array(num_tempi, ndmin=1)
        transition_lambda = np.array(transition_lambda, ndmin=1)
        # make sure the other arguments are long enough by repeating them
        # TODO: check if they are of length 1?
        if len(min_bpm) != len(beats_per_bar):
            min_bpm = np.repeat(min_bpm, len(beats_per_bar))
        if len(max_bpm) != len(beats_per_bar):
            max_bpm = np.repeat(max_bpm, len(beats_per_bar))
        if len(num_tempi) != len(beats_per_bar):
            num_tempi = np.repeat(num_tempi, len(beats_per_bar))
        if len(transition_lambda) != len(beats_per_bar):
            transition_lambda = np.repeat(transition_lambda,
                                          len(beats_per_bar))
        if not (len(min_bpm) == len(max_bpm) == len(num_tempi) ==
                len(beats_per_bar) == len(transition_lambda)):
            raise ValueError('`min_bpm`, `max_bpm`, `num_tempi`, `num_beats` '
                             'and `transition_lambda` must all have the same '
                             'length.')
        # get num_threads from kwargs
        num_threads = min(len(beats_per_bar), kwargs.get('num_threads', 1))
        # init a pool of workers (if needed)
        self.map = map
        if num_threads != 1:
            import multiprocessing as mp
            self.map = mp.Pool(num_threads).map
        # convert timing information to construct a beat state space
        min_interval = 60. * fps / max_bpm
        max_interval = 60. * fps / min_bpm
        # model the different bar lengths
        self.hmms = []
        for b, beats in enumerate(beats_per_bar):
            st = BarStateSpace(beats, min_interval[b], max_interval[b],
                               num_tempi[b])
            tm = BarTransitionModel(st, transition_lambda[b])
            om = RNNDownBeatTrackingObservationModel(st, observation_lambda)
            self.hmms.append(HiddenMarkovModel(tm, om))
        # save variables
        self.beats_per_bar = beats_per_bar
        self.threshold = threshold
        self.correct = correct
        self.fps = fps

    def process(self, activations, **kwargs):
        """
        Detect the (down-)beats in the given activation function.

        Parameters
        ----------
        activations : numpy array, shape (num_frames, 2)
            Activation function with probabilities corresponding to beats
            and downbeats given in the first and second column, respectively.

        Returns
        -------
        beats : numpy array, shape (num_beats, 2)
            Detected (down-)beat positions [seconds] and beat numbers.

        """
        # pylint: disable=arguments-differ
        import itertools as it
        # use only the activations > threshold (init offset to be added later)
        first = 0
        if self.threshold:
            idx = np.nonzero(activations >= self.threshold)[0]
            # Note: test `idx.size`, not `idx.any()`; the latter is False if
            #       the only above-threshold frame is frame 0 (idx == [0])
            if idx.size:
                first = max(first, np.min(idx))
                last = min(len(activations), np.max(idx) + 1)
            else:
                last = first
            activations = activations[first:last]
        # return no beats if no activations given / remain after thresholding
        if not activations.any():
            return np.empty((0, 2))
        # (parallel) decoding of the activations with HMM
        results = list(self.map(_process_dbn, zip(self.hmms,
                                                  it.repeat(activations))))
        # choose the best HMM (highest log probability)
        best = np.argmax(np.asarray(results)[:, 1])
        # the best path through the state space
        path, _ = results[best]
        # the state space and observation model of the best HMM
        st = self.hmms[best].transition_model.state_space
        om = self.hmms[best].observation_model
        # the positions inside the pattern (0..num_beats)
        positions = st.state_positions[path]
        # corresponding beats (add 1 for natural counting)
        beat_numbers = positions.astype(int) + 1
        if self.correct:
            # Note: use the builtin `int`; the `np.int` alias was removed in
            #       NumPy 1.24 and was always identical to `int`
            beats = np.empty(0, dtype=int)
            # for each detection determine the "beat range", i.e. states
            # where the pointers of the observation model are >= 1
            beat_range = om.pointers[path] >= 1
            # get all change points between True and False (cast to int)
            idx = np.nonzero(np.diff(beat_range.astype(int)))[0] + 1
            # if the first frame is in the beat range, add a change at frame 0
            if beat_range[0]:
                idx = np.r_[0, idx]
            # if the last frame is in the beat range, append the length of
            # the array
            if beat_range[-1]:
                idx = np.r_[idx, beat_range.size]
            # iterate over all regions
            if idx.size:
                for left, right in idx.reshape((-1, 2)):
                    # pick the frame with the highest activations value
                    # Note: we look for both beats and down-beat activations;
                    #       since np.argmax works on the flattened array, we
                    #       need to divide by 2
                    peak = np.argmax(activations[left:right]) // 2 + left
                    beats = np.hstack((beats, peak))
        else:
            # transitions are the points where the beat numbers change
            # FIXME: we might miss the first or last beat!
            #        we could calculate the interval towards the beginning/end
            #        to decide whether to include these points
            beats = np.nonzero(np.diff(beat_numbers))[0] + 1
        # return the beat positions (converted to seconds) and beat numbers
        return np.vstack(((beats + first) / float(self.fps),
                          beat_numbers[beats])).T

    @staticmethod
    def add_arguments(parser, beats_per_bar, min_bpm=MIN_BPM,
                      max_bpm=MAX_BPM, num_tempi=NUM_TEMPI,
                      transition_lambda=TRANSITION_LAMBDA,
                      observation_lambda=OBSERVATION_LAMBDA,
                      threshold=THRESHOLD, correct=CORRECT):
        """
        Add DBN downbeat tracking related arguments to an existing parser
        object.

        Parameters
        ----------
        parser : argparse parser instance
            Existing argparse parser object.
        beats_per_bar : int or list, optional
            Number of beats per bar to be modeled. Can be either a single
            number or a list with bar lengths (in beats).
        min_bpm : float or list, optional
            Minimum tempo used for beat tracking [bpm]. If a list is given,
            each item corresponds to the number of beats per bar at the same
            position.
        max_bpm : float or list, optional
            Maximum tempo used for beat tracking [bpm]. If a list is given,
            each item corresponds to the number of beats per bar at the same
            position.
        num_tempi : int or list, optional
            Number of tempi to model; if set, limit the number of tempi and
            use a log spacing, otherwise a linear spacing. If a list is
            given, each item corresponds to the number of beats per bar at
            the same position.
        transition_lambda : float or list, optional
            Lambda for the exponential tempo change distribution (higher
            values prefer a constant tempo over a tempo change from one beat
            to the next one). If a list is given, each item corresponds to
            the number of beats per bar at the same position.
        observation_lambda : float, optional
            Split one (down-)beat period into `observation_lambda` parts, the
            first representing (down-)beat states and the remaining non-beat
            states.
        threshold : float, optional
            Threshold the RNN (down-)beat activations before Viterbi
            decoding.
        correct : bool, optional
            Correct the beats (i.e. align them to the nearest peak of the
            (down-)beat activation function).

        Returns
        -------
        parser_group : argparse argument group
            DBN downbeat tracking argument parser group

        """
        # pylint: disable=arguments-differ
        from ..utils import OverrideDefaultListAction
        # add DBN parser group
        g = parser.add_argument_group('dynamic Bayesian Network arguments')
        # add a transition parameters
        g.add_argument('--beats_per_bar', action=OverrideDefaultListAction,
                       default=beats_per_bar, type=int, sep=',',
                       help='number of beats per bar to be modeled (comma '
                            'separated list of bar length in beats) '
                            '[default=%(default)s]')
        g.add_argument('--min_bpm', action=OverrideDefaultListAction,
                       default=min_bpm, type=float, sep=',',
                       help='minimum tempo (comma separated list with one '
                            'value per bar length) [bpm, default=%(default)s]')
        g.add_argument('--max_bpm', action=OverrideDefaultListAction,
                       default=max_bpm, type=float, sep=',',
                       help='maximum tempo (comma separated list with one '
                            'value per bar length) [bpm, default=%(default)s]')
        g.add_argument('--num_tempi', action=OverrideDefaultListAction,
                       default=num_tempi, type=int, sep=',',
                       help='limit the number of tempi; if set, align the '
                            'tempi with log spacings, otherwise linearly '
                            '(comma separated list with one value per bar '
                            'length) [default=%(default)s]')
        g.add_argument('--transition_lambda',
                       action=OverrideDefaultListAction,
                       default=transition_lambda, type=float, sep=',',
                       help='lambda of the tempo transition distribution; '
                            'higher values prefer a constant tempo over a '
                            'tempo change from one beat to the next one ('
                            'comma separated list with one value per bar '
                            'length) [default=%(default)s]')
        # observation model stuff
        g.add_argument('--observation_lambda', action='store', type=float,
                       default=observation_lambda,
                       help='split one (down-)beat period into N parts, the '
                            'first representing beat states and the remaining '
                            'non-beat states [default=%(default)i]')
        g.add_argument('-t', dest='threshold', action='store', type=float,
                       default=threshold,
                       help='threshold the observations before Viterbi '
                            'decoding [default=%(default).2f]')
        # option to correct the beat positions
        if correct is True:
            g.add_argument('--no_correct', dest='correct',
                           action='store_false', default=correct,
                           help='do not correct the (down-)beat positions '
                                '(i.e. do not align them to the nearest peak '
                                'of the (down-)beat activation function)')
        elif correct is False:
            g.add_argument('--correct', dest='correct',
                           action='store_true', default=correct,
                           help='correct the (down-)beat positions (i.e. '
                                'align them to the nearest peak of the '
                                '(down-)beat activation function)')
        # add output format stuff
        g = parser.add_argument_group('output arguments')
        g.add_argument('--downbeats', action='store_true', default=False,
                       help='output only the downbeats')
        # return the argument group so it can be modified if needed
        return g
class PatternTrackingProcessor(Processor):
    """
    Pattern tracking with a dynamic Bayesian network (DBN) approximated by a
    Hidden Markov Model (HMM).

    Parameters
    ----------
    pattern_files : list
        List of files with the patterns (including the fitted GMMs and
        information about the number of beats).
    min_bpm : list, optional
        Minimum tempi used for pattern tracking [bpm].
    max_bpm : list, optional
        Maximum tempi used for pattern tracking [bpm].
    num_tempi : int or list, optional
        Number of tempi to model; if set, limit the number of tempi and use a
        log spacings, otherwise a linear spacings.
    transition_lambda : float or list, optional
        Lambdas for the exponential tempo change distributions (higher values
        prefer constant tempi from one beat to the next one).
    fps : float, optional
        Frames per second.

    Notes
    -----
    `min_bpm`, `max_bpm`, `num_tempo_states`, and `transition_lambda` must
    contain as many items as rhythmic patterns are modeled (i.e. length of
    `pattern_files`). If a single value is given for `num_tempo_states` and
    `transition_lambda`, this value is used for all rhythmic patterns.

    Instead of the originally proposed state space and transition model for
    the DBN [1]_, the more efficient version proposed in [2]_ is used.

    References
    ----------
    .. [1] Florian Krebs, Sebastian Böck and Gerhard Widmer,
           "Rhythmic Pattern Modeling for Beat and Downbeat Tracking in
           Musical Audio",
           Proceedings of the 15th International Society for Music
           Information Retrieval Conference (ISMIR), 2013.
    .. [2] Florian Krebs, Sebastian Böck and Gerhard Widmer,
           "An Efficient State Space Model for Joint Tempo and Meter
           Tracking",
           Proceedings of the 16th International Society for Music
           Information Retrieval Conference (ISMIR), 2015.

    Examples
    --------
    Create a PatternTrackingProcessor from the given pattern files. These
    pattern files include fitted GMMs for the observation model of the HMM.
    The returned array represents the positions of the beats and their
    position inside the bar. The position is given in seconds, thus the
    expected sampling rate is needed. The position inside the bar follows
    the natural counting and starts at 1.

    >>> from madmom.models import PATTERNS_BALLROOM
    >>> proc = PatternTrackingProcessor(PATTERNS_BALLROOM, fps=50)
    >>> proc  # doctest: +ELLIPSIS
    <madmom.features.downbeats.PatternTrackingProcessor object at 0x...>

    Call this PatternTrackingProcessor with a multi-band spectrogram to
    obtain the beat and downbeat positions. The parameters of the spectrogram
    have to correspond to those used to fit the GMMs.

    >>> from madmom.audio.spectrogram import LogarithmicSpectrogramProcessor, \
SpectrogramDifferenceProcessor, MultiBandSpectrogramProcessor
    >>> from madmom.processors import SequentialProcessor
    >>> log = LogarithmicSpectrogramProcessor()
    >>> diff = SpectrogramDifferenceProcessor(positive_diffs=True)
    >>> mb = MultiBandSpectrogramProcessor(crossover_frequencies=[270])
    >>> pre_proc = SequentialProcessor([log, diff, mb])
    >>> act = pre_proc('tests/data/audio/sample.wav')
    >>> proc(act)  # doctest: +ELLIPSIS
    array([[0.82, 4. ],
           [1.78, 1. ],
           ...,
           [3.7 , 3. ],
           [4.66, 4. ]])

    """

    # default tempo ranges, one value per modeled pattern
    MIN_BPM = (55, 60)
    MAX_BPM = (205, 225)
    NUM_TEMPI = None
    # Note: if multiple values are given, the individual values represent the
    #       lambdas for each transition into the beat at this index position
    TRANSITION_LAMBDA = 100

    def __init__(self, pattern_files, min_bpm=MIN_BPM, max_bpm=MAX_BPM,
                 num_tempi=NUM_TEMPI, transition_lambda=TRANSITION_LAMBDA,
                 fps=None, **kwargs):
        # pylint: disable=unused-argument
        # pylint: disable=no-name-in-module
        import pickle
        # expand the arguments to arrays (one entry per pattern)
        min_bpm = np.array(min_bpm, ndmin=1)
        max_bpm = np.array(max_bpm, ndmin=1)
        num_tempi = np.array(num_tempi, ndmin=1)
        transition_lambda = np.array(transition_lambda, ndmin=1)
        # make sure arguments are given for each pattern (expand if needed)
        if len(min_bpm) != len(pattern_files):
            min_bpm = np.repeat(min_bpm, len(pattern_files))
        if len(max_bpm) != len(pattern_files):
            max_bpm = np.repeat(max_bpm, len(pattern_files))
        if len(num_tempi) != len(pattern_files):
            num_tempi = np.repeat(num_tempi, len(pattern_files))
        if len(transition_lambda) != len(pattern_files):
            transition_lambda = np.repeat(transition_lambda,
                                          len(pattern_files))
        # check if all lists have the same length
        if not (len(min_bpm) == len(max_bpm) == len(num_tempi) ==
                len(transition_lambda) == len(pattern_files)):
            raise ValueError('`min_bpm`, `max_bpm`, `num_tempi` and '
                             '`transition_lambda` must have the same length '
                             'as number of patterns.')
        # save some variables
        self.fps = fps
        self.num_beats = []
        # convert timing information to construct a state space
        # (shortest beat interval corresponds to the fastest tempo)
        min_interval = 60. * self.fps / np.asarray(max_bpm)
        max_interval = 60. * self.fps / np.asarray(min_bpm)
        # collect beat/bar state spaces, transition models, and GMMs
        state_spaces = []
        transition_models = []
        gmms = []
        # check that at least one pattern is given
        if not pattern_files:
            raise ValueError('at least one rhythmical pattern must be given.')
        # load the patterns
        # SECURITY NOTE: pickle.load executes arbitrary code when
        # unpickling; only load pattern files from trusted sources
        for p, pattern_file in enumerate(pattern_files):
            with open(pattern_file, 'rb') as f:
                # Python 2 and 3 behave differently
                try:
                    # Python 3
                    pattern = pickle.load(f, encoding='latin1')
                except TypeError:
                    # Python 2 doesn't have/need the encoding
                    pattern = pickle.load(f)
            # get the fitted GMMs and number of beats
            gmms.append(pattern['gmms'])
            num_beats = pattern['num_beats']
            self.num_beats.append(num_beats)
            # model each rhythmic pattern as a bar
            state_space = BarStateSpace(num_beats, min_interval[p],
                                        max_interval[p], num_tempi[p])
            transition_model = BarTransitionModel(state_space,
                                                  transition_lambda[p])
            state_spaces.append(state_space)
            transition_models.append(transition_model)
        # create multi pattern state space, transition and observation model
        self.st = MultiPatternStateSpace(state_spaces)
        self.tm = MultiPatternTransitionModel(transition_models)
        self.om = GMMPatternTrackingObservationModel(gmms, self.st)
        # instantiate a HMM
        self.hmm = HiddenMarkovModel(self.tm, self.om, None)

    def process(self, features, **kwargs):
        """
        Detect the (down-)beats given the features.

        Parameters
        ----------
        features : numpy array
            Multi-band spectral features.

        Returns
        -------
        beats : numpy array, shape (num_beats, 2)
            Detected (down-)beat positions [seconds] and beat numbers.

        """
        # pylint: disable=arguments-differ
        # get the best state path by calling the viterbi algorithm
        path, _ = self.hmm.viterbi(features)
        # the positions inside the pattern (0..num_beats)
        positions = self.st.state_positions[path]
        # corresponding beats (add 1 for natural counting)
        beat_numbers = positions.astype(int) + 1
        # transitions are the points where the beat numbers change
        # FIXME: we might miss the first or last beat!
        #        we could calculate the interval towards the beginning/end to
        #        decide whether to include these points
        beat_positions = np.nonzero(np.diff(beat_numbers))[0] + 1
        # return the beat positions (converted to seconds) and beat numbers
        return np.vstack((beat_positions / float(self.fps),
                          beat_numbers[beat_positions])).T

    @staticmethod
    def add_arguments(parser, pattern_files=None, min_bpm=MIN_BPM,
                      max_bpm=MAX_BPM, num_tempi=NUM_TEMPI,
                      transition_lambda=TRANSITION_LAMBDA):
        """
        Add DBN related arguments for pattern tracking to an existing parser
        object.

        Parameters
        ----------
        parser : argparse parser instance
            Existing argparse parser object.
        pattern_files : list
            Load the patterns from these files.
        min_bpm : list, optional
            Minimum tempi used for beat tracking [bpm].
        max_bpm : list, optional
            Maximum tempi used for beat tracking [bpm].
        num_tempi : int or list, optional
            Number of tempi to model; if set, limit the number of states and
            use log spacings, otherwise a linear spacings.
        transition_lambda : float or list, optional
            Lambdas for the exponential tempo change distribution (higher
            values prefer constant tempi from one beat to the next one).

        Returns
        -------
        parser_group : argparse argument group
            Pattern tracking argument parser group

        Notes
        -----
        `pattern_files`, `min_bpm`, `max_bpm`, `num_tempi`, and
        `transition_lambda` must have the same number of items.

        """
        from ..utils import OverrideDefaultListAction
        # add GMM options
        if pattern_files is not None:
            g = parser.add_argument_group('GMM arguments')
            g.add_argument('--pattern_files',
                           action=OverrideDefaultListAction,
                           default=pattern_files,
                           help='load the patterns (with the fitted GMMs) '
                                'from these files (comma separated list)')
        # add HMM parser group
        g = parser.add_argument_group('dynamic Bayesian Network arguments')
        g.add_argument('--min_bpm', action=OverrideDefaultListAction,
                       default=min_bpm, type=float, sep=',',
                       help='minimum tempo (comma separated list with one '
                            'value per pattern) [bpm, default=%(default)s]')
        g.add_argument('--max_bpm', action=OverrideDefaultListAction,
                       default=max_bpm, type=float, sep=',',
                       help='maximum tempo (comma separated list with one '
                            'value per pattern) [bpm, default=%(default)s]')
        g.add_argument('--num_tempi', action=OverrideDefaultListAction,
                       default=num_tempi, type=int, sep=',',
                       help='limit the number of tempi; if set, align the '
                            'tempi with log spacings, otherwise linearly '
                            '(comma separated list with one value per '
                            'pattern) [default=%(default)s]')
        g.add_argument('--transition_lambda',
                       action=OverrideDefaultListAction,
                       default=transition_lambda, type=float, sep=',',
                       help='lambda of the tempo transition distribution; '
                            'higher values prefer a constant tempo over a '
                            'tempo change from one bar to the next one '
                            '(comma separated list with one value per '
                            'pattern) [default=%(default)s]')
        # add output format stuff
        g = parser.add_argument_group('output arguments')
        g.add_argument('--downbeats', action='store_true', default=False,
                       help='output only the downbeats')
        # return the argument group so it can be modified if needed
        return g
# bar tracking, i.e. track downbeats from signal given beat positions
class LoadBeatsProcessor(Processor):
    """
    Load beat times from file or handle.

    """

    def __init__(self, beats, files=None, beats_suffix=None, **kwargs):
        # pylint: disable=unused-argument
        from ..utils import search_files
        # 'batch' mode: beats are loaded per input file, matched by suffix;
        # 'single' mode: beats come from one file handle (e.g. STDIN)
        if isinstance(files, list) and beats_suffix is not None:
            # overwrite beats with the files matching the suffix
            beats = search_files(files, suffix=beats_suffix)
            self.mode = 'batch'
        else:
            self.mode = 'single'
        self.beats = beats
        self.beats_suffix = beats_suffix

    def process(self, data=None, **kwargs):
        """
        Load the beats from file (handle) or read them from STDIN.

        """
        # pylint: disable=unused-argument
        # dispatch on the mode determined at instantiation time
        if self.mode == 'single':
            return self.process_single()
        elif self.mode == 'batch':
            return self.process_batch(data)
        else:
            raise ValueError("don't know how to obtain the beats")

    def process_single(self):
        """
        Load the beats in bulk-mode (i.e. all at once) from the input stream
        or file.

        Returns
        -------
        beats : numpy array
            Beat positions [seconds].

        """
        # pylint: disable=unused-argument
        from ..io import load_events
        return load_events(self.beats)

    def process_batch(self, filename):
        """
        Load beat times from file.

        First match the given input filename to the beat filenames, then load
        the beats.

        Parameters
        ----------
        filename : str
            Input file name.

        Returns
        -------
        beats : numpy array
            Beat positions [seconds].

        Notes
        -----
        Both the file names to search for the beats as well as the suffix to
        determine the beat files must be given at instantiation time.

        """
        import os
        from ..utils import match_file
        # NOTE: SystemExit is raised (instead of e.g. ValueError) because
        #       this processor is used directly by the command line programs
        if not isinstance(filename, string_types):
            raise SystemExit('Please supply a filename, not %s.' % filename)
        # select the matching beat file to a given input file from all files
        basename, ext = os.path.splitext(os.path.basename(filename))
        matches = match_file(basename, self.beats, suffix=ext,
                             match_suffix=self.beats_suffix)
        if not matches:
            raise SystemExit("can't find a beat file for %s" % filename)
        # load the beats and return them
        # TODO: Use load_beats function
        beats = np.loadtxt(matches[0])
        if beats.ndim == 2:
            # only use beat times, omit the beat positions inside the bar
            beats = beats[:, 0]
        return beats

    @staticmethod
    def add_arguments(parser, beats=sys.stdin, beats_suffix='.beats.txt'):
        """
        Add beat loading related arguments to an existing parser.

        Parameters
        ----------
        parser : argparse parser instance
            Existing argparse parser object.
        beats : FileType, optional
            Where to read the beats from ('single' mode).
        beats_suffix : str, optional
            Suffix of beat files ('batch' mode)

        Returns
        -------
        argparse argument group
            Beat loading argument parser group.

        """
        import argparse
        # add beat loading options to the existing parser
        g = parser.add_argument_group('beat loading arguments')
        g.add_argument('--beats', type=argparse.FileType('rb'),
                       default=beats,
                       help='where/how to read the beat positions from '
                            '[default: single: STDIN]')
        g.add_argument('--beats_suffix', type=str, default=beats_suffix,
                       help='file suffix of the beat files [default: '
                            '%(default)s]')
        # return the argument group so it can be modified if needed
        return g
class SyncronizeFeaturesProcessor(Processor):
    """
    Synchronize features to beats.

    First, divide a beat interval into `beat_subdivision` divisions. Then
    summarise all features that fall into one subdivision. If no feature
    value for a subdivision is found, it is set to 0.

    Parameters
    ----------
    beat_subdivisions : int
        Number of subdivisions a beat is divided into.
    fps : float
        Frames per second.

    """

    def __init__(self, beat_subdivisions, fps, **kwargs):
        # pylint: disable=unused-argument
        self.beat_subdivisions = beat_subdivisions
        self.fps = fps

    def process(self, data, **kwargs):
        """
        Synchronize features to beats.

        Average all feature values that fall into a window of beat duration /
        beat subdivisions, centered on the beat positions or interpolated
        subdivisions, starting with the first beat.

        Parameters
        ----------
        data : tuple (features, beats)
            Tuple of two numpy arrays, the first containing features to be
            synchronized and second the beat times.

        Returns
        -------
        numpy array (num beats - 1, beat subdivisions, features dim.)
            Beat synchronous features.

        """
        features, beats = data
        # no beats, return immediately
        # NOTE(review): this branch returns a 2-tuple of empty arrays while
        #               the normal path returns a single array — callers
        #               appear to rely on this; confirm before changing
        if beats.size == 0:
            return np.array([]), np.array([])
        # beats can be 1D (only beat times) or 2D (times, position inside bar)
        if beats.ndim > 1:
            beats = beats[:, 0]
        # trim beat sequence
        while (float(len(features)) / self.fps) < beats[-1]:
            beats = beats[:-1]
            warnings.warn('Beat sequence too long compared to features.')
        # number of beats
        num_beats = len(beats)
        # feature dimension (make sure features are 2D)
        features = np.array(features.T, copy=False, ndmin=2).T
        feat_dim = features.shape[-1]
        # init a 3D feature aggregation array
        beat_features = np.zeros(
            (num_beats - 1, self.beat_subdivisions, feat_dim))
        # start first beat 20ms before actual annotation
        beat_start = int(max(0, np.floor((beats[0] - 0.02) * self.fps)))
        # TODO: speed this up, could propably be done without a loop
        for i in range(num_beats - 1):
            # aggregate all feature values that fall into a window of
            # length = beat_duration / beat_subdivisions, centered on the
            # beat annotations or interpolated subdivisions
            beat_duration = beats[i + 1] - beats[i]
            offset = 0.5 * beat_duration / self.beat_subdivisions
            # offset should be < 50 ms
            offset = np.min([offset, 0.05])
            # last frame of beat
            beat_end = int(np.floor((beats[i + 1] - offset) * self.fps))
            # we need to put each feature frame into its corresponding
            # beat subdivison; linearly align the subdivisions up to the
            # length of the beat
            subdiv = np.floor(np.linspace(0, self.beat_subdivisions,
                                          beat_end - beat_start,
                                          endpoint=False))
            beat = features[beat_start:beat_end]
            # group features by beat subdivisions and aggregate them
            # NOTE(review): np.mean of an empty subdivision yields NaN (with
            #               a RuntimeWarning), not 0 as the class docstring
            #               claims — confirm which behavior is intended
            subdiv_features = [beat[subdiv == div]
                               for div in range(self.beat_subdivisions)]
            beat_features[i, :, :] = np.array([np.mean(x, axis=0)
                                               for x in subdiv_features])
            # progress to next beat
            beat_start = beat_end
        # return beats and beat-synchronous features
        return beat_features
class RNNBarProcessor(Processor):
    """
    Retrieve a downbeat activation function from a signal and pre-determined
    beat positions by obtaining beat-synchronous harmonic and percussive
    features which are processed with a GRU-RNN.

    Parameters
    ----------
    beat_subdivisions : tuple, optional
        Number of beat subdivisions for the percussive and harmonic feature.
    fps : float, optional
        Frame rate [frames per second] used to compute the features.

    References
    ----------
    .. [1] Florian Krebs, Sebastian Böck and Gerhard Widmer,
           "Downbeat Tracking Using Beat-Synchronous Features and Recurrent
           Networks",
           Proceedings of the 17th International Society for Music Information
           Retrieval Conference (ISMIR), 2016.

    Examples
    --------
    Create an RNNBarProcessor and pass an audio file and pre-determined (or
    given) beat positions through the processor. The returned tuple contains
    the beat positions and the probability that each beat is a downbeat.

    >>> proc = RNNBarProcessor()
    >>> proc  # doctest: +ELLIPSIS
    <madmom.features.downbeats.RNNBarProcessor object at 0x...>
    >>> beats = np.loadtxt('tests/data/detections/sample.dbn_beat_tracker.txt')
    >>> downbeat_prob = proc(('tests/data/audio/sample.wav', beats))
    >>> np.around(downbeat_prob, decimals=3)
    ... # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS +NORMALIZE_ARRAYS
    array([[0.1  , 0.378],
           [0.45 , 0.19 ],
           [0.8  , 0.112],
           [1.12 , 0.328],
           [1.48 , 0.27 ],
           [1.8  , 0.181],
           [2.15 , 0.162],
           [2.49 ,   nan]])

    """

    def __init__(self, beat_subdivisions=(4, 2), fps=100, **kwargs):
        # pylint: disable=unused-argument
        from ..audio.signal import SignalProcessor, FramedSignalProcessor
        from ..audio.stft import ShortTimeFourierTransformProcessor
        from ..audio.spectrogram import (
            FilteredSpectrogramProcessor, LogarithmicSpectrogramProcessor,
            SpectrogramDifferenceProcessor)
        from ..audio.chroma import CLPChromaProcessor
        from ..ml.nn import NeuralNetworkEnsemble
        from ..models import DOWNBEATS_BGRU
        # define the percussive feature: positive log-spectrogram differences
        sig = SignalProcessor(num_channels=1, sample_rate=44100)
        frames = FramedSignalProcessor(frame_size=2048, fps=fps)
        stft = ShortTimeFourierTransformProcessor()  # caching FFT window
        spec = FilteredSpectrogramProcessor(
            num_bands=6, fmin=30., fmax=17000., norm_filters=True)
        log_spec = LogarithmicSpectrogramProcessor(mul=1, add=1)
        diff = SpectrogramDifferenceProcessor(
            diff_ratio=0.5, positive_diffs=True)
        self.perc_feat = SequentialProcessor(
            (sig, frames, stft, spec, log_spec, diff))
        # define the harmonic feature: compressed log-pitch chroma
        self.harm_feat = CLPChromaProcessor(
            fps=fps, fmin=27.5, fmax=4200., compression_factor=100,
            norm=True, threshold=0.001)
        # sync the features to the beats
        # TODO: can beat_subdivisions be extracted from somewhere?
        self.perc_beat_sync = SyncronizeFeaturesProcessor(
            beat_subdivisions[0], fps=fps, **kwargs)
        self.harm_beat_sync = SyncronizeFeaturesProcessor(
            beat_subdivisions[1], fps=fps, **kwargs)
        # NN ensembles to process the beat-synchronous features
        self.perc_nn = NeuralNetworkEnsemble.load(DOWNBEATS_BGRU[0], **kwargs)
        self.harm_nn = NeuralNetworkEnsemble.load(DOWNBEATS_BGRU[1], **kwargs)

    def process(self, data, **kwargs):
        """
        Retrieve a downbeat activation function from a signal and beat
        positions.

        Parameters
        ----------
        data : tuple
            Tuple containing a signal or file (handle) and corresponding beat
            times [seconds].

        Returns
        -------
        numpy array, shape (num_beats, 2)
            Array containing the beat positions (first column) and the
            corresponding downbeat activations, i.e. the probability that a
            beat is a downbeat (second column).

        Notes
        -----
        Since features are synchronized to the beats, and the probability of
        being a downbeat depends on a whole beat duration, only num_beats-1
        activations can be computed and the last value is filled with 'NaN'.

        """
        # pylint: disable=unused-argument
        # split the input data
        signal, beats = data
        # process the signal
        perc = self.perc_feat(signal)
        harm = self.harm_feat(signal)
        # sync to the beats
        perc_synced = self.perc_beat_sync((perc, beats))
        harm_synced = self.harm_beat_sync((harm, beats))
        # process with NNs and average the predictions
        # Note: reshape the NN input to length of synced features
        perc = self.perc_nn(perc_synced.reshape((len(perc_synced), -1)))
        harm = self.harm_nn(harm_synced.reshape((len(harm_synced), -1)))
        # since the synchronized features contain 1 value less than the number
        # of beats, append an artificial value
        act = np.mean([perc, harm], axis=0)
        act = np.append(act, np.ones(1) * np.nan)
        # stack the beat positions and activations column-wise
        return np.vstack((beats, act)).T
class DBNBarTrackingProcessor(Processor):
    """
    Bar tracking with a dynamic Bayesian network (DBN) approximated by a
    Hidden Markov Model (HMM).

    Parameters
    ----------
    beats_per_bar : int or list
        Number of beats per bar to be modeled. Can be either a single number
        or a list or array with bar lengths (in beats).
    observation_weight : int, optional
        Weight for the downbeat activations.
    meter_change_prob : float, optional
        Probability to change meter at bar boundaries.

    Examples
    --------
    Create a DBNBarTrackingProcessor. The returned array represents the
    positions of the beats and their position inside the bar. The position
    inside the bar follows the natural counting and starts at 1.

    The number of beats per bar which should be modelled must be given, all
    other parameters (e.g. probability to change the meter at bar boundaries)
    are optional but must have the same length as `beats_per_bar`.

    >>> proc = DBNBarTrackingProcessor(beats_per_bar=[3, 4])
    >>> proc  # doctest: +ELLIPSIS
    <madmom.features.downbeats.DBNBarTrackingProcessor object at 0x...>

    Call this DBNBarTrackingProcessor with beat positions and downbeat
    activation function returned by RNNBarProcessor to obtain the positions.

    >>> beats = np.loadtxt('tests/data/detections/sample.dbn_beat_tracker.txt')
    >>> act = RNNBarProcessor()(('tests/data/audio/sample.wav', beats))
    >>> proc(act)  # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
    array([[0.1 , 1. ],
           [0.45, 2. ],
           [0.8 , 3. ],
           [1.12, 1. ],
           [1.48, 2. ],
           [1.8 , 3. ],
           [2.15, 1. ],
           [2.49, 2. ]])

    """

    OBSERVATION_WEIGHT = 100
    METER_CHANGE_PROB = 1e-7

    def __init__(self, beats_per_bar=(3, 4),
                 observation_weight=OBSERVATION_WEIGHT,
                 meter_change_prob=METER_CHANGE_PROB, **kwargs):
        # pylint: disable=unused-argument
        # save variables
        self.beats_per_bar = beats_per_bar
        # state space & transition model for each bar length
        state_spaces = []
        transition_models = []
        for beats in self.beats_per_bar:
            # Note: tempo and transition_lambda are not relevant here, since
            #       the activations are already beat-synchronous (1 frame per
            #       beat), thus fix both intervals to 1
            st = BarStateSpace(beats, min_interval=1, max_interval=1)
            tm = BarTransitionModel(st, transition_lambda=1)
            state_spaces.append(st)
            transition_models.append(tm)
        # Note: treat different bar lengths as different patterns and use the
        #       existing MultiPatternStateSpace and MultiPatternTransitionModel
        self.st = MultiPatternStateSpace(state_spaces)
        self.tm = MultiPatternTransitionModel(
            transition_models, transition_prob=meter_change_prob)
        # observation model
        self.om = RNNBeatTrackingObservationModel(self.st, observation_weight)
        # instantiate a HMM
        self.hmm = HiddenMarkovModel(self.tm, self.om, None)

    def process(self, data, **kwargs):
        """
        Detect downbeats from the given beats and activation function with
        Viterbi decoding.

        Parameters
        ----------
        data : numpy array, shape (num_beats, 2)
            Array containing beat positions (first column) and corresponding
            downbeat activations (second column).

        Returns
        -------
        numpy array, shape (num_beats, 2)
            Decoded (down-)beat positions and beat numbers.

        Notes
        -----
        The position of the last beat is not decoded, but rather extrapolated
        based on the position and meter of the second to last beat.

        """
        # pylint: disable=unused-argument
        beats = data[:, 0]
        activations = data[:, 1]
        # remove unsynchronised (usually the last) values
        activations = activations[:-1]
        # TODO: expand to generic extrapolation of values? e.g.:
        #       activations = activations[~np.isnan(activations)]
        # Viterbi decoding
        path, _ = self.hmm.viterbi(activations)
        # get the position inside the bar
        position = self.st.state_positions[path]
        # the beat numbers are the counters + 1 at the transition points
        beat_numbers = position.astype(int) + 1
        # add the last beat (which has no activation function value)
        meter = self.beats_per_bar[self.st.state_patterns[path[-1]]]
        last_beat_number = np.mod(beat_numbers[-1], meter) + 1
        beat_numbers = np.append(beat_numbers, last_beat_number)
        # return the beats and their beat numbers stacked column-wise
        # Note: do not use np.vstack(zip(...)); `zip` returns an iterator on
        #       Python 3 and NumPy's stacking functions require a sequence of
        #       arrays, so stack the two arrays directly instead
        return np.vstack((beats, beat_numbers)).T

    @classmethod
    def add_arguments(cls, parser, beats_per_bar,
                      observation_weight=OBSERVATION_WEIGHT,
                      meter_change_prob=METER_CHANGE_PROB):
        """
        Add DBN related arguments to an existing parser.

        Parameters
        ----------
        parser : argparse parser instance
            Existing argparse parser object.
        beats_per_bar : int or list, optional
            Number of beats per bar to be modeled. Can be either a single
            number or a list with bar lengths (in beats).
        observation_weight : float, optional
            Weight for the activations at downbeat times.
        meter_change_prob : float, optional
            Probability to change meter at bar boundaries.

        Returns
        -------
        parser_group : argparse argument group
            DBN bar tracking argument parser group

        """
        # pylint: disable=arguments-differ
        from ..utils import OverrideDefaultListAction
        # add DBN parser group
        g = parser.add_argument_group('dynamic Bayesian Network arguments')
        g.add_argument('--beats_per_bar', action=OverrideDefaultListAction,
                       default=beats_per_bar, type=int, sep=',',
                       help='number of beats per bar to be modeled (comma '
                            'separated list of bar length in beats) '
                            '[default=%(default)s]')
        g.add_argument('--observation_weight', action='store', type=float,
                       default=observation_weight,
                       help='weight for the downbeat activations '
                            '[default=%(default)i]')
        g.add_argument('--meter_change_prob', action='store', type=float,
                       default=meter_change_prob,
                       help='meter change probability [default=%(default).g]')
        # add output format stuff
        parser = parser.add_argument_group('output arguments')
        parser.add_argument('--downbeats', action='store_true', default=False,
                            help='output only the downbeats')
        # return the argument group so it can be modified if needed
        return parser