Source code for madmom.processors

# encoding: utf-8
# pylint: disable=no-member
# pylint: disable=invalid-name
# pylint: disable=too-many-arguments
"""
This module contains all processor related functionality.

Notes
-----
All features should be implemented as classes which inherit from Processor
(or provide a XYZProcessor(Processor) variant). This way, multiple Processor
objects can be chained/combined to achieve the wanted functionality.

"""

from __future__ import absolute_import, division, print_function

import argparse
import itertools as it
import multiprocessing as mp
import os
import sys
from collections import MutableSequence

import numpy as np

from .utils import integer_types


[docs]class Processor(object): """ Abstract base class for processing data. """
[docs] @classmethod def load(cls, infile): """ Instantiate a new Processor from a file. This method un-pickles a saved Processor object. Subclasses should overwrite this method with a better performing solution if speed is an issue. Parameters ---------- infile : str or file handle Pickled processor. Returns ------- :class:`Processor` instance Processor. """ import pickle from .io import open_file # instantiate a new Processor and return it with open_file(infile, 'rb') as f: # Python 2 and 3 behave differently try: # Python 3 obj = pickle.load(f, encoding='latin1') except TypeError: # Python 2 doesn't have/need the encoding obj = pickle.load(f) # warn if the unpickled Processor is of other type if obj.__class__ is not cls: import warnings warnings.warn("Expected Processor of class '%s' but loaded " "Processor is of class '%s', processing anyways." % (cls.__name__, obj.__class__.__name__)) return obj
[docs] def dump(self, outfile): """ Save the Processor to a file. This method pickles a Processor object and saves it. Subclasses should overwrite this method with a better performing solution if speed is an issue. Parameters ---------- outfile : str or file handle Output file for pickling the processor. """ import pickle from .io import open_file # dump the Processor to the given file # Note: for Python 2 / 3 compatibility reason use protocol 2 with open_file(outfile, 'wb') as f: pickle.dump(self, f, protocol=2)
[docs] def process(self, data, **kwargs): """ Process the data. This method must be implemented by the derived class and should process the given data and return the processed output. Parameters ---------- data : depends on the implementation of subclass Data to be processed. kwargs : dict, optional Keyword arguments for processing. Returns ------- depends on the implementation of subclass Processed data. """ raise NotImplementedError('Must be implemented by subclass.')
def __call__(self, *args, **kwargs): # this magic method makes a Processor callable return self.process(*args, **kwargs)
[docs]class OnlineProcessor(Processor): """ Abstract base class for processing data in online mode. Derived classes must implement the following methods: - process_online(): process the data in online mode, - process_offline(): process the data in offline mode. """ def __init__(self, online=False): self.online = online
[docs] def process(self, data, **kwargs): """ Process the data either in online or offline mode. Parameters ---------- data : depends on the implementation of subclass Data to be processed. kwargs : dict, optional Keyword arguments for processing. Returns ------- depends on the implementation of subclass Processed data. Notes ----- This method is used to pass the data to either `process_online` or `process_offline`, depending on the `online` setting of the processor. """ if self.online: return self.process_online(data, **kwargs) return self.process_offline(data, **kwargs)
[docs] def process_online(self, data, reset=True, **kwargs): """ Process the data in online mode. This method must be implemented by the derived class and should process the given data frame by frame and return the processed output. Parameters ---------- data : depends on the implementation of subclass Data to be processed. reset : bool, optional Reset the processor to its initial state before processing. kwargs : dict, optional Keyword arguments for processing. Returns ------- depends on the implementation of subclass Processed data. """ raise NotImplementedError('Must be implemented by subclass.')
[docs] def process_offline(self, data, **kwargs): """ Process the data in offline mode. This method must be implemented by the derived class and should process the given data and return the processed output. Parameters ---------- data : depends on the implementation of subclass Data to be processed. kwargs : dict, optional Keyword arguments for processing. Returns ------- depends on the implementation of subclass Processed data. """ raise NotImplementedError('Must be implemented by subclass.')
[docs] def reset(self): """ Reset the OnlineProcessor. This method must be implemented by the derived class and should reset the processor to its initial state. """ raise NotImplementedError('Must be implemented by subclass.')
[docs]class OutputProcessor(Processor): """ Class for processing data and/or feeding it into some sort of output. """
[docs] def process(self, data, output, **kwargs): """ Processes the data and feed it to the output. This method must be implemented by the derived class and should process the given data and return the processed output. Parameters ---------- data : depends on the implementation of subclass Data to be processed (e.g. written to file). output : str or file handle Output file name or file handle. kwargs : dict, optional Keyword arguments for processing. Returns ------- depends on the implementation of subclass Processed data. """ # pylint: disable=arguments-differ raise NotImplementedError('Must be implemented by subclass.')
# functions for processing file(s) with a Processor def _process(process_tuple): """ Function to process a Processor with data. The processed data is returned and if applicable also piped to the given output. Parameters ---------- process_tuple : tuple (Processor/function, data[, output], kwargs) The tuple must contain a Processor object as the first item and the data to be processed as the second tuple item. If a third tuple item is given, it is used as an output argument. The last item is passed as keyword arguments to the processor's process() method. Instead of a Processor also a function accepting a single positional argument (data) or two positional arguments (data, output) can be given. It must behave exactly as a :class:`Processor`, i.e. return the processed data and optionally pipe it to the output. Keyword arguments are not passed to the function. Returns ------- depends on the processor Processed data. Notes ----- This must be a top-level function to be pickle-able. """ # do not process the data, if the first item (i.e. Processor) is None if process_tuple[0] is None: return process_tuple[1] # call the Processor with data and kwargs elif isinstance(process_tuple[0], Processor): return process_tuple[0](*process_tuple[1:-1], **process_tuple[-1]) # just call whatever we got here (e.g. a function) without kwargs return process_tuple[0](*process_tuple[1:-1])
[docs]class SequentialProcessor(MutableSequence, Processor): """ Processor class for sequential processing of data. Parameters ---------- processors : list Processor instances to be processed sequentially. Notes ----- If the `processors` list contains lists or tuples, these get wrapped as a SequentialProcessor itself. """ def __init__(self, processors): self.processors = [] # iterate over all given processors and save them for processor in processors: # wrap lists and tuples as a SequentialProcessor if isinstance(processor, (list, tuple)): processor = SequentialProcessor(processor) # save the processors self.processors.append(processor) def __getitem__(self, index): """ Get the Processor at the given processing chain position. Parameters ---------- index : int Position inside the processing chain. Returns ------- :class:`Processor` Processor at the given position. """ return self.processors[index] def __setitem__(self, index, processor): """ Set the Processor at the given processing chain position. Parameters ---------- index : int Position inside the processing chain. processor : :class:`Processor` Processor to set. """ self.processors[index] = processor def __delitem__(self, index): """ Delete the Processor at the given processing chain position. Parameters ---------- index : int Position inside the processing chain. """ del self.processors[index] def __len__(self): """Length of the processing chain.""" return len(self.processors)
[docs] def insert(self, index, processor): """ Insert a Processor at the given processing chain position. Parameters ---------- index : int Position inside the processing chain. processor : :class:`Processor` Processor to insert. """ self.processors.insert(index, processor)
[docs] def append(self, other): """ Append another Processor to the processing chain. Parameters ---------- other : :class:`Processor` Processor to append to the processing chain. """ self.processors.append(other)
[docs] def extend(self, other): """ Extend the processing chain with a list of Processors. Parameters ---------- other : list Processors to be appended to the processing chain. """ self.processors.extend(other)
[docs] def process(self, data, **kwargs): """ Process the data sequentially with the defined processing chain. Parameters ---------- data : depends on the first processor of the processing chain Data to be processed. kwargs : dict, optional Keyword arguments for processing. Returns ------- depends on the last processor of the processing chain Processed data. """ # sequentially process the data for processor in self.processors: data = _process((processor, data, kwargs)) return data
# inherit from SequentialProcessor because of append() and extend()
[docs]class ParallelProcessor(SequentialProcessor): """ Processor class for parallel processing of data. Parameters ---------- processors : list Processor instances to be processed in parallel. num_threads : int, optional Number of parallel working threads. Notes ----- If the `processors` list contains lists or tuples, these get wrapped as a :class:`SequentialProcessor`. """ # pylint: disable=too-many-ancestors def __init__(self, processors, num_threads=None): # set the processing chain super(ParallelProcessor, self).__init__(processors) # number of threads if num_threads is None: num_threads = 1 # Note: we must define the map function here, otherwise it leaks both # memory and file descriptors if we init the pool in the process # method. This also means that we must use only 1 thread if we # want to pickle the Processor, because map is pickle-able, # whereas mp.Pool().map is not. self.map = map if min(len(processors), max(1, num_threads)) > 1: self.map = mp.Pool(num_threads).map
[docs] def process(self, data, **kwargs): """ Process the data in parallel. Parameters ---------- data : depends on the processors Data to be processed. kwargs : dict, optional Keyword arguments for processing. Returns ------- list Processed data. """ # if only a single processor is given, there's no need to map() if len(self.processors) == 1: return [_process((self.processors[0], data, kwargs))] # process data in parallel and return a list with processed data return list(self.map(_process, zip(self.processors, it.repeat(data), it.repeat(kwargs))))
[docs]class IOProcessor(OutputProcessor): """ Input/Output Processor which processes the input data with the input processor and pipes everything into the given output processor. All Processors defined in the input chain are sequentially called with the 'data' argument only. The output Processor is the only one ever called with two arguments ('data', 'output'). Parameters ---------- in_processor : :class:`Processor`, function, tuple or list Input processor. Can be a :class:`Processor` (or subclass thereof like :class:`SequentialProcessor` or :class:`ParallelProcessor`), a function accepting a single argument ('data'). If a tuple or list is given, it is wrapped as a :class:`SequentialProcessor`. out_processor : :class:`OutputProcessor`, function, tuple or list OutputProcessor or function accepting two arguments ('data', 'output'). If a tuple or list is given, it is wrapped in an :class:`IOProcessor` itself with the last element regarded as the `out_processor` and all others as `in_processor`. """ def __init__(self, in_processor, out_processor=None): # TODO: check the input and output processors!? # as input a Processor, SequentialProcessor, ParallelProcessor # or a function with only one argument should be accepted # as output a OutputProcessor, IOProcessor or function with two # arguments should be accepted # wrap the input processor in a SequentialProcessor if needed if isinstance(in_processor, (list, tuple)): self.in_processor = SequentialProcessor(in_processor) else: self.in_processor = in_processor # wrap the output processor in an IOProcessor if needed if isinstance(out_processor, (list, tuple)): if len(out_processor) >= 2: # use the last processor as output and all others as input self.out_processor = IOProcessor(out_processor[:-1], out_processor[-1]) if len(out_processor) == 1: self.out_processor = out_processor[0] else: self.out_processor = out_processor def __getitem__(self, index): """ Get the Processor at the given position. Parameters ---------- index : int Processor position. Index '0' refers to the `in_processor`, index '1' to the `out_processor`. Returns ------- :class:`Processor` Processor at the given position. """ if index == 0: return self.in_processor elif index == 1: return self.out_processor else: raise IndexError('Only `in_processor` at index 0 and ' '`out_processor` at index 1 are defined.')
[docs] def process(self, data, output=None, **kwargs): """ Processes the data with the input processor and pipe everything into the output processor, which also pipes it to `output`. Parameters ---------- data : depends on the input processors Data to be processed. output: str or file handle Output file (handle). kwargs : dict, optional Keyword arguments for processing. Returns ------- depends on the output processors Processed data. """ # process the data by the input processor data = _process((self.in_processor, data, kwargs)) # process the data by the output processor and return it return _process((self.out_processor, data, output, kwargs))
# functions and classes to process files with a Processor
[docs]def process_single(processor, infile, outfile, **kwargs): """ Process a single file with the given Processor. Parameters ---------- processor : :class:`Processor` instance Processor to be processed. infile : str or file handle Input file (handle). outfile : str or file handle Output file (handle). """ # pylint: disable=unused-argument # adjust origin in online mode if kwargs.get('online'): kwargs['origin'] = 'online' kwargs['reset'] = False # process the input file _process((processor, infile, outfile, kwargs))
class _ParallelProcess(mp.Process): """ Class for processing tasks in a queue. Parameters ---------- task_queue : Queue with tasks, i.e. tuples ('processor', 'infile', 'outfile') Notes ----- Usually, multiple instances are created via :func:`process_batch`. """ def __init__(self, task_queue): super(_ParallelProcess, self).__init__() self.task_queue = task_queue def run(self): """Process all tasks from the task queue.""" from .audio.signal import LoadAudioFileError while True: # get the task tuple processor, infile, outfile, kwargs = self.task_queue.get() try: # process the Processor with the data _process((processor, infile, outfile, kwargs)) except LoadAudioFileError as e: print(e) # signal that it is done self.task_queue.task_done() # function to batch process multiple files with a processor
[docs]def process_batch(processor, files, output_dir=None, output_suffix=None, strip_ext=True, num_workers=mp.cpu_count(), shuffle=False, **kwargs): """ Process a list of files with the given Processor in batch mode. Parameters ---------- processor : :class:`Processor` instance Processor to be processed. files : list Input file(s) (handles). output_dir : str, optional Output directory. output_suffix : str, optional Output suffix (e.g. '.txt' including the dot). strip_ext : bool, optional Strip off the extension from the input files. num_workers : int, optional Number of parallel working threads. shuffle : bool, optional Shuffle the `files` before distributing them to the working threads Notes ----- Either `output_dir` and/or `output_suffix` must be set. If `strip_ext` is True, the extension of the input file names is stripped off before the `output_suffix` is appended to the input file names. Use `shuffle` if you experience out of memory errors (can occur for certain methods with high memory consumptions if consecutive files are rather long). """ # pylint: disable=unused-argument # either output_dir or output_suffix must be given if output_dir is None and output_suffix is None: raise ValueError('either output directory or suffix must be given') # make sure the directory exists if output_dir is not None: try: # create output directory os.mkdir(output_dir) except OSError: # directory exists already pass # create task queue tasks = mp.JoinableQueue() # create working threads processes = [_ParallelProcess(tasks) for _ in range(num_workers)] for p in processes: p.daemon = True p.start() # shuffle files? if shuffle: from random import shuffle shuffle(files) # process all the files for input_file in files: # set the output file name if output_dir is not None: output_file = "%s/%s" % (output_dir, os.path.basename(input_file)) else: output_file = input_file # strip off the extension if strip_ext: output_file = os.path.splitext(output_file)[0] # append the suffix if needed if output_suffix is not None: output_file += output_suffix # put processing tasks in the queue tasks.put((processor, input_file, output_file, kwargs)) # wait for all processing tasks to finish tasks.join()
# processor for buffering data
[docs]class BufferProcessor(Processor): """ Buffer for processors which need context to do their processing. Parameters ---------- buffer_size : int or tuple Size of the buffer (time steps, [additional dimensions]). init : numpy array, optional Init the buffer with this array. init_value : float, optional If only `buffer_size` is given but no `init`, use this value to initialise the buffer. Notes ----- If `buffer_size` (or the first item thereof in case of tuple) is 1, only the un-buffered current value is returned. If context is needed, `buffer_size` must be set to >1. E.g. SpectrogramDifference needs a context of two frames to be able to compute the difference between two consecutive frames. """ def __init__(self, buffer_size=None, init=None, init_value=0): # if init is given, infer buffer_size from it if buffer_size is None and init is not None: buffer_size = init.shape # if buffer_size is int, make a tuple elif isinstance(buffer_size, integer_types): buffer_size = (buffer_size, ) # TODO: use np.pad for fancy initialisation (can be done in process()) # init buffer if needed if buffer_size is not None and init is None: init = np.ones(buffer_size) * init_value # save variables self.buffer_size = buffer_size self.init = init self.data = init
[docs] def reset(self, init=None): """ Reset BufferProcessor to its initial state. Parameters ---------- init : numpy array, shape (num_hiddens,), optional Reset BufferProcessor to this initial state. """ self.data = init if init is not None else self.init
[docs] def process(self, data, **kwargs): """ Buffer the data. Parameters ---------- data : numpy array or subclass thereof Data to be buffered. Returns ------- numpy array or subclass thereof Data with buffered context. """ # expected minimum number of dimensions ndmin = len(self.buffer_size) # cast the data to have that many dimensions if data.ndim < ndmin: data = np.array(data, copy=False, subok=True, ndmin=ndmin) # length of the data data_length = len(data) # remove `data_length` from buffer at the beginning and append new data self.data = np.roll(self.data, -data_length, axis=0) self.data[-data_length:] = data # return the complete buffer return self.data
# alias for easier / more intuitive calling buffer = process def __getitem__(self, index): """ Direct access to the buffer data. Parameters ---------- index : int, slice, ndarray, Any NumPy indexing method to access the buffer data directly. Returns ------- numpy array or subclass thereof Requested view of the buffered data. """ return self.data[index]
# function to process live input
[docs]def process_online(processor, infile, outfile, **kwargs): """ Process a file or audio stream with the given Processor. Parameters ---------- processor : :class:`Processor` instance Processor to be processed. infile : str or file handle, optional Input file (handle). If none is given, the stream present at the system's audio inpup is used. Additional keyword arguments can be used to influence the frame size and hop size. outfile : str or file handle Output file (handle). kwargs : dict, optional Keyword arguments passed to :class:`.audio.signal.Stream` if `in_stream` is 'None'. Notes ----- Right now there is no way to determine if a processor is online-capable or not. Thus, calling any processor with this function may not produce the results expected. """ from madmom.audio.signal import Stream, FramedSignal # set default values kwargs['sample_rate'] = kwargs.get('sample_rate', 44100) kwargs['num_channels'] = kwargs.get('num_channels', 1) # if no iput file is given, create a Stream with the given arguments if infile is None: # open a stream and start if not running already stream = Stream(**kwargs) if not stream.is_running(): stream.start() # use the input file else: # set parameters for opening the file from .audio.signal import FRAME_SIZE, HOP_SIZE, FPS, NUM_CHANNELS frame_size = kwargs.get('frame_size', FRAME_SIZE) hop_size = kwargs.get('hop_size', HOP_SIZE) fps = kwargs.get('fps', FPS) num_channels = kwargs.get('num_channels', NUM_CHANNELS) # FIXME: overwrite the frame size with the maximum value of all used # processors. This is needed if multiple frame sizes are used import warnings warnings.warn('make sure that the `frame_size` (%d) is equal to the ' 'maximum value used by any `FramedSignalProcessor`.' % frame_size) # Note: origin must be 'online' and num_frames 'None' to behave exactly # the same as with live input stream = FramedSignal(infile, frame_size=frame_size, hop_size=hop_size, fps=fps, origin='online', num_frames=None, num_channels=num_channels) # set arguments for online processing # Note: pass only certain arguments, because these will be passed to the # processors at every time step (kwargs contains file handles etc.) process_args = {'reset': False} # do not reset stateful processors # process everything frame-by-frame for frame in stream: _process((processor, frame, outfile, process_args))
# function for pickling a processor
[docs]def pickle_processor(processor, outfile, **kwargs): """ Pickle the Processor to a file. Parameters ---------- processor : :class:`Processor` instance Processor to be pickled. outfile : str or file handle Output file (handle) where to pickle it. """ # pylint: disable=unused-argument processor.dump(outfile)
# generic input/output arguments for scripts
[docs]def io_arguments(parser, output_suffix='.txt', pickle=True, online=False): """ Add input / output related arguments to an existing parser. Parameters ---------- parser : argparse parser instance Existing argparse parser object. output_suffix : str, optional Suffix appended to the output files. pickle : bool, optional Add a 'pickle' sub-parser to the parser. online : bool, optional Add a 'online' sub-parser to the parser. """ # default output try: output = sys.stdout.buffer except AttributeError: output = sys.stdout # add general options parser.add_argument('-v', dest='verbose', action='count', help='increase verbosity level') # add subparsers sub_parsers = parser.add_subparsers(title='processing options') # pickle processor options if pickle: sp = sub_parsers.add_parser('pickle', help='pickle processor') sp.set_defaults(func=pickle_processor) # Note: requiring '-o' is a simple safety measure to not overwrite # existing audio files after using the processor in 'batch' mode sp.add_argument('-o', dest='outfile', type=argparse.FileType('wb'), default=output, help='output file [default: STDOUT]') # single file processing options sp = sub_parsers.add_parser('single', help='single file processing') sp.set_defaults(func=process_single) sp.add_argument('infile', type=argparse.FileType('rb'), help='input audio file') # Note: requiring '-o' is a simple safety measure to not overwrite existing # audio files after using the processor in 'batch' mode sp.add_argument('-o', dest='outfile', type=argparse.FileType('wb'), default=output, help='output file [default: STDOUT]') sp.add_argument('-j', dest='num_threads', type=int, default=mp.cpu_count(), help='number of threads [default=%(default)s]') # add arguments needed for loading processors if online: sp.add_argument('--online', action='store_true', default=None, help='use online settings [default: offline]') # batch file processing options sp = sub_parsers.add_parser('batch', help='batch file processing') sp.set_defaults(func=process_batch) sp.add_argument('files', nargs='+', help='files to be processed') sp.add_argument('-o', dest='output_dir', default=None, help='output directory [default=%(default)s]') sp.add_argument('-s', dest='output_suffix', default=output_suffix, help='suffix appended to the files (dot must be included ' 'if wanted) [default=%(default)s]') sp.add_argument('--ext', dest='strip_ext', action='store_false', help='keep the extension of the input file [default=' 'strip it off before appending the output suffix]') sp.add_argument('-j', dest='num_workers', type=int, default=mp.cpu_count(), help='number of workers [default=%(default)s]') sp.add_argument('--shuffle', action='store_true', help='shuffle files before distributing them to the ' 'working threads [default=process them in sorted ' 'order]') sp.set_defaults(num_threads=1) # online processing options if online: sp = sub_parsers.add_parser('online', help='online processing') sp.set_defaults(func=process_online) sp.add_argument('infile', nargs='?', type=argparse.FileType('rb'), default=None, help='input audio file (if no file is ' 'given, a stream operating on the ' 'system audio input is used)') sp.add_argument('-o', dest='outfile', type=argparse.FileType('wb'), default=output, help='output file [default: STDOUT]') sp.add_argument('-j', dest='num_threads', type=int, default=1, help='number of threads [default=%(default)s]') # set arguments for loading processors sp.set_defaults(online=True) # use online settings/parameters sp.set_defaults(num_frames=1) # process everything frame-by-frame sp.set_defaults(origin='stream') # set origin to get whole frame