Source code for madmom.processors

# encoding: utf-8
# pylint: disable=no-member
# pylint: disable=invalid-name
# pylint: disable=too-many-arguments
"""
This module contains all processor related functionality.

Notes
-----
All features should be implemented as classes which inherit from Processor
(or provide an XYZProcessor(Processor) variant). This way, multiple Processor
objects can be chained/combined to achieve the desired functionality.

"""

from __future__ import absolute_import, division, print_function

import os
import sys
import argparse
import multiprocessing as mp

from collections import MutableSequence


class Processor(object):
    """
    Abstract base class for processing data.

    Subclasses must implement :meth:`process`; instances are callable and
    calling them simply forwards to :meth:`process`.

    """

    @classmethod
    def load(cls, infile):
        """
        Instantiate a new Processor from a file.

        This method un-pickles a saved Processor object. Subclasses should
        overwrite this method with a better performing solution if speed is an
        issue.

        Parameters
        ----------
        infile : str or file handle
            Pickled processor. If a file handle is given, it is closed and
            the file is re-opened by its name.

        Returns
        -------
        :class:`Processor` instance
            Processor.

        Notes
        -----
        A warning is issued (but the object is still returned) if the class
        of the un-pickled object is not exactly `cls`.

        Un-pickling can execute arbitrary code, thus only files from trusted
        sources should be loaded.

        """
        import pickle
        import warnings
        # close the open file if needed and use its name
        if not isinstance(infile, str):
            infile.close()
            infile = infile.name
        # instantiate a new Processor and return it
        with open(infile, 'rb') as f:
            # Python 2 and 3 behave differently
            try:
                # Python 3
                obj = pickle.load(f, encoding='latin1')
            except TypeError:
                # Python 2 doesn't have/need the encoding
                obj = pickle.load(f)
        # warn if the unpickled Processor is of other type
        if obj.__class__ is not cls:
            warnings.warn("Expected Processor of class '%s' but loaded "
                          "Processor is of class '%s', processing anyways." %
                          (cls.__name__, obj.__class__.__name__))
        return obj

    def dump(self, outfile):
        """
        Save the Processor to a file.

        This method pickles a Processor object and saves it. Subclasses should
        overwrite this method with a better performing solution if speed is an
        issue.

        Parameters
        ----------
        outfile : str or file handle
            Output file for pickling the processor. If a file handle is
            given, it is closed and the file is re-opened by its name.

        """
        import pickle
        # close the open file if needed and use its name
        if not isinstance(outfile, str):
            outfile.close()
            outfile = outfile.name
        # dump the Processor to the given file; the context manager makes
        # sure the data is flushed and the file handle is closed again
        # Note: for Python 2 / 3 compatibility reason use protocol 2
        with open(outfile, 'wb') as f:
            pickle.dump(self, f, protocol=2)

    def process(self, data):
        """
        Process the data.

        This method must be implemented by the derived class and should
        process the given data and return the processed output.

        Parameters
        ----------
        data : depends on the implementation of subclass
            Data to be processed.

        Returns
        -------
        depends on the implementation of subclass
            Processed data.

        Raises
        ------
        NotImplementedError
            Always, unless overridden by a subclass.

        """
        raise NotImplementedError('must be implemented by subclass.')

    def __call__(self, *args, **kwargs):
        # this magic method makes a Processor callable
        return self.process(*args, **kwargs)
class OutputProcessor(Processor):
    """
    Base class for Processors which additionally feed their result into some
    sort of output.

    """

    def process(self, data, output):
        """
        Process the data and feed it to the output.

        Derived classes must override this method; the override is expected
        to process `data`, pipe the result to `output` and return the
        processed data.

        Parameters
        ----------
        data : depends on the implementation of subclass
            Data to be processed (e.g. written to file).
        output : str or file handle
            Output file name or file handle.

        Returns
        -------
        depends on the implementation of subclass
            Processed data.

        Raises
        ------
        NotImplementedError
            Always, unless overridden by a subclass.

        """
        # pylint: disable=arguments-differ
        raise NotImplementedError('must be implemented by subclass.')
# functions for processing file(s) with a Processor
def _process(process_tuple):
    """
    Apply a Processor (or any callable) to the given data.

    The processed data is returned and, if an output is part of the tuple,
    the callable receives it as an additional positional argument.

    Parameters
    ----------
    process_tuple : tuple (Processor/function, data, [output])
        The first item is the Processor (or a function behaving like one),
        the second item is the data to be processed. Any further items (e.g.
        an output) are passed on to the callable as positional arguments.
        If the first item is None, the data is not processed at all.

    Returns
    -------
    depends on the processor
        Processed data (or the unaltered data if the first item is None).

    Notes
    -----
    This must be a top-level function to be pickle-able.

    """
    processor = process_tuple[0]
    arguments = process_tuple[1:]
    if processor is not None:
        # every Processor is callable, so plain functions work as well
        return processor(*arguments)
    # nothing to apply, hand back the data untouched
    return process_tuple[1]
class SequentialProcessor(MutableSequence, Processor):
    """
    Processor which chains other Processors and applies them one after
    another.

    Parameters
    ----------
    processors : list
        Processor instances to be processed sequentially.

    Notes
    -----
    Lists or tuples found inside `processors` are wrapped as a
    SequentialProcessor themselves.

    """

    def __init__(self, processors):
        # nested lists / tuples become a processing chain of their own
        self.processors = [
            SequentialProcessor(item) if isinstance(item, (list, tuple))
            else item
            for item in processors
        ]

    def __getitem__(self, index):
        """
        Return the Processor at the given position of the processing chain.

        Parameters
        ----------
        index : int
            Position inside the processing chain.

        Returns
        -------
        :class:`Processor`
            Processor at the given position.

        """
        return self.processors[index]

    def __setitem__(self, index, processor):
        """
        Replace the Processor at the given position of the processing chain.

        Parameters
        ----------
        index : int
            Position inside the processing chain.
        processor : :class:`Processor`
            Processor to set.

        """
        self.processors[index] = processor

    def __delitem__(self, index):
        """
        Remove the Processor at the given position of the processing chain.

        Parameters
        ----------
        index : int
            Position inside the processing chain.

        """
        del self.processors[index]

    def __len__(self):
        """Number of Processors in the processing chain."""
        return len(self.processors)

    def insert(self, index, processor):
        """
        Insert a Processor into the processing chain.

        Parameters
        ----------
        index : int
            Position inside the processing chain.
        processor : :class:`Processor`
            Processor to insert.

        """
        self.processors.insert(index, processor)

    def append(self, other):
        """
        Add a single Processor to the end of the processing chain.

        Parameters
        ----------
        other : :class:`Processor`
            Processor to append to the processing chain.

        """
        self.processors.append(other)

    def extend(self, other):
        """
        Add several Processors to the end of the processing chain.

        Parameters
        ----------
        other : list
            Processors to be appended to the processing chain.

        """
        self.processors.extend(other)

    def process(self, data):
        """
        Run the data through the whole processing chain.

        Parameters
        ----------
        data : depends on the first processor of the processing chain
            Data to be processed.

        Returns
        -------
        depends on the last processor of the processing chain
            Processed data.

        """
        # the output of each stage is the input of the next one
        result = data
        for stage in self.processors:
            result = _process((stage, result))
        return result
# inherit from SequentialProcessor because of append() and extend()
class ParallelProcessor(SequentialProcessor):
    """
    Processor which feeds the same data to several Processors in parallel.

    Parameters
    ----------
    processors : list
        Processor instances to be processed in parallel.
    num_threads : int, optional
        Number of parallel working threads; None is treated as 1.

    Notes
    -----
    Lists or tuples found inside `processors` are wrapped as a
    :class:`SequentialProcessor`.

    """
    # pylint: disable=too-many-ancestors

    def __init__(self, processors, num_threads=None):
        # let the parent class set up the processing chain
        super(ParallelProcessor, self).__init__(processors)
        # fall back to a single thread if nothing was requested
        threads = 1 if num_threads is None else num_threads
        # Note: the map function is chosen once at construction time;
        #       according to the original authors, creating the pool inside
        #       the process method leaks memory and file descriptors. Only
        #       the builtin map is pickle-able (mp.Pool().map is not), so
        #       a Processor which should be pickled must use a single thread.
        workers = min(len(processors), max(1, threads))
        self.map = mp.Pool(threads).map if workers > 1 else map

    def process(self, data):
        """
        Process the data with all processors in parallel.

        Parameters
        ----------
        data : depends on the processors
            Data to be processed.

        Returns
        -------
        list
            Processed data, one item per processor.

        """
        # every processor gets paired with the very same data
        jobs = [(processor, data) for processor in self.processors]
        return list(self.map(_process, jobs))

    @staticmethod
    def add_arguments(parser, num_threads):
        """
        Add parallel processing options to an existing parser object.

        Parameters
        ----------
        parser : argparse parser instance
            Existing argparse parser object.
        num_threads : int
            Default value of the '-j' / '--threads' option.

        Returns
        -------
        argparse argument group
            Parallel processing argument parser group.

        """
        group = parser.add_argument_group('parallel processing arguments')
        group.add_argument(
            '-j', '--threads', dest='num_threads', action='store', type=int,
            default=num_threads,
            help='number of parallel threads [default=%(default)s]')
        # hand back the group so callers can modify it if needed
        return group
class IOProcessor(OutputProcessor):
    """
    Input/Output Processor which processes the input data with the input
    processor and pipes everything into the given output processor.

    All Processors defined in the input chain are sequentially called with the
    'data' argument only. The output Processor is the only one ever called with
    two arguments ('data', 'output').

    Parameters
    ----------
    in_processor : :class:`Processor`, function, tuple or list
        Input processor. Can be a :class:`Processor` (or subclass thereof
        like :class:`SequentialProcessor` or :class:`ParallelProcessor`), a
        function accepting a single argument ('data'). If a tuple or list
        is given, it is wrapped as a :class:`SequentialProcessor`.
    out_processor : :class:`OutputProcessor`, function, tuple or list
        OutputProcessor or function accepting two arguments ('data', 'output').
        If a tuple or list is given, it is wrapped in an :class:`IOProcessor`
        itself with the last element regarded as the `out_processor` and all
        others as `in_processor`. A tuple or list with a single element is
        unwrapped, an empty one is treated like None (i.e. the data is
        returned by the input processor without any output processing).

    """

    def __init__(self, in_processor, out_processor=None):
        # TODO: check the input and output processors!?
        #       as input a Processor, SequentialProcessor, ParallelProcessor
        #       or a function with only one argument should be accepted
        #       as output a OutputProcessor, IOProcessor or function with two
        #       arguments should be accepted
        # wrap the input processor in a SequentialProcessor if needed
        if isinstance(in_processor, (list, tuple)):
            self.in_processor = SequentialProcessor(in_processor)
        else:
            self.in_processor = in_processor
        # wrap the output processor in an IOProcessor if needed; tuples are
        # accepted as well (as documented), not only lists
        if isinstance(out_processor, (list, tuple)):
            if len(out_processor) >= 2:
                # use the last processor as output and all others as input
                self.out_processor = IOProcessor(out_processor[:-1],
                                                 out_processor[-1])
            elif len(out_processor) == 1:
                self.out_processor = out_processor[0]
            else:
                # an empty sequence means there is no output processor; make
                # sure the attribute always exists
                self.out_processor = None
        else:
            self.out_processor = out_processor

    def __getitem__(self, index):
        """
        Get the Processor at the given position.

        Parameters
        ----------
        index : int
            Processor position. Index '0' refers to the `in_processor`,
            index '1' to the `out_processor`.

        Returns
        -------
        :class:`Processor`
            Processor at the given position.

        Raises
        ------
        IndexError
            If `index` is neither 0 nor 1.

        """
        if index == 0:
            return self.in_processor
        elif index == 1:
            return self.out_processor
        else:
            raise IndexError('Only `in_processor` at index 0 and '
                             '`out_processor` at index 1 are defined.')

    def process(self, data, output=None):
        """
        Processes the data with the input processor and pipe everything into
        the output processor, which also pipes it to `output`.

        Parameters
        ----------
        data : depends on the input processors
            Data to be processed.
        output: str or file handle
            Output file (handle).

        Returns
        -------
        depends on the output processors
            Processed data.

        """
        # process the data by the input processor
        data = _process((self.in_processor, data, ))
        # process the data by the output processor and return it
        return _process((self.out_processor, data, output))
# functions and classes to process files with a Processor
def process_single(processor, infile, outfile, **kwargs):
    """
    Run the given Processor on one single file.

    The Processor is called with the input and output file as its two
    positional arguments; its return value is discarded.

    Parameters
    ----------
    processor : :class:`Processor` instance
        Processor to be processed.
    infile : str or file handle
        Input file (handle).
    outfile : str or file handle
        Output file (handle).
    kwargs : dict, optional
        Additional keyword arguments; accepted but not used.

    """
    # pylint: disable=unused-argument
    processor(infile, outfile)
class _ParallelProcess(mp.Process):
    """
    Worker process which keeps processing tasks taken from a queue.

    Parameters
    ----------
    task_queue : Queue with tasks, i.e. tuples ('processor', 'infile',
                 'outfile')

    Notes
    -----
    Usually, multiple instances are created via :func:`process_batch`.

    """

    def __init__(self, task_queue):
        super(_ParallelProcess, self).__init__()
        self.task_queue = task_queue

    def run(self):
        """Process all tasks from the task queue."""
        from .audio.signal import LoadAudioFileError
        queue = self.task_queue
        # loops forever; the worker is expected to be ended from the outside
        while True:
            # fetch the next task and split it into its parts
            task = queue.get()
            processor, infile, outfile = task
            try:
                # let the Processor do its work on the file
                processor.process(infile, outfile)
            except LoadAudioFileError as error:
                # files which can not be loaded are reported and skipped
                print(error)
            # tell the queue that this task is finished
            queue.task_done()


# function to batch process multiple files with a processor
def process_batch(processor, files, output_dir=None, output_suffix=None,
                  strip_ext=True, num_workers=mp.cpu_count(), shuffle=False,
                  **kwargs):
    """
    Process a list of files with the given Processor in batch mode.

    Parameters
    ----------
    processor : :class:`Processor` instance
        Processor to be processed.
    files : list
        Input file(s) (handles). The given sequence itself is not modified.
    output_dir : str, optional
        Output directory; created (including missing parent directories) if
        it does not exist yet.
    output_suffix : str, optional
        Output suffix (e.g. '.txt' including the dot).
    strip_ext : bool, optional
        Strip off the extension from the input files.
    num_workers : int, optional
        Number of parallel working threads. Values smaller than 1 are
        treated as 1.
    shuffle : bool, optional
        Shuffle the `files` before distributing them to the working threads

    Raises
    ------
    ValueError
        If neither `output_dir` nor `output_suffix` is given.
    OSError
        If the output directory does not exist and can not be created.

    Notes
    -----
    Either `output_dir` and/or `output_suffix` must be set. If `strip_ext` is
    True, the extension of the input file names is stripped off before the
    `output_suffix` is appended to the input file names.

    Use `shuffle` if you experience out of memory errors (can occur for certain
    methods with high memory consumptions if consecutive files are rather
    long).

    """
    # pylint: disable=unused-argument
    # either output_dir or output_suffix must be given
    if output_dir is None and output_suffix is None:
        raise ValueError('either output directory or suffix must be given')
    # make sure the directory exists
    if output_dir is not None:
        try:
            # create output directory (and its parents if needed)
            os.makedirs(output_dir)
        except OSError:
            # only ignore the error if the directory exists already, any
            # other failure (e.g. missing permissions) must not be hidden
            if not os.path.isdir(output_dir):
                raise

    # work on a copy so the caller's sequence is never altered
    files = list(files)
    # shuffle files?
    if shuffle:
        import random
        random.shuffle(files)

    # create task queue
    tasks = mp.JoinableQueue()
    # create working threads; without at least one worker nobody would ever
    # consume the queue and the final join() would block forever
    num_workers = max(1, num_workers)
    processes = [_ParallelProcess(tasks) for _ in range(num_workers)]
    for p in processes:
        p.daemon = True
        p.start()

    # process all the files
    for input_file in files:
        # set the output file name
        if output_dir is not None:
            output_file = os.path.join(output_dir,
                                       os.path.basename(input_file))
        else:
            output_file = input_file
        # strip off the extension
        if strip_ext:
            output_file = os.path.splitext(output_file)[0]
        # append the suffix if needed
        if output_suffix is not None:
            output_file += output_suffix
        # put processing tasks in the queue
        tasks.put((processor, input_file, output_file))
    # wait for all processing tasks to finish
    tasks.join()
# function for pickling a processor
def pickle_processor(processor, outfile, **kwargs):
    """
    Save the given Processor to a file by calling its `dump` method.

    Parameters
    ----------
    processor : :class:`Processor` instance
        Processor to be pickled.
    outfile : str or file handle
        Output file (handle) where to pickle it.
    kwargs : dict, optional
        Additional keyword arguments; accepted but not used.

    """
    # pylint: disable=unused-argument
    processor.dump(outfile)
# generic input/output arguments for scripts
def io_arguments(parser, output_suffix='.txt', pickle=True):
    """
    Extend an existing parser with input / output related arguments.

    A general '-v' option is added to the parser itself, the sub-commands
    'single', 'batch' and (optionally) 'pickle' are added as sub-parsers.
    Each sub-parser stores the function which handles it as `func` default.

    Parameters
    ----------
    parser : argparse parser instance
        Existing argparse parser object.
    output_suffix : str, optional
        Suffix appended to the output files.
    pickle : bool, optional
        Add a 'pickle' sub-parser to the parser?

    """
    # default output: the binary buffer of STDOUT if there is one, STDOUT
    # itself otherwise
    default_output = getattr(sys.stdout, 'buffer', sys.stdout)
    num_cpus = mp.cpu_count()

    def add_outfile_option(sub_parser):
        # Note: the output file is given via '-o'; the original authors
        #       describe this as a simple safety measure to not overwrite
        #       existing audio files after using the processor in 'batch'
        #       mode
        sub_parser.add_argument('-o', dest='outfile',
                                type=argparse.FileType('wb'),
                                default=default_output,
                                help='output file [default: STDOUT]')

    # general options
    parser.add_argument('-v', dest='verbose', action='count',
                        help='increase verbosity level')
    # all processing modes live in their own sub-parser
    sub_parsers = parser.add_subparsers(title='processing options')

    # pickle processor options
    if pickle:
        pickle_parser = sub_parsers.add_parser('pickle',
                                               help='pickle processor')
        pickle_parser.set_defaults(func=pickle_processor)
        add_outfile_option(pickle_parser)

    # single file processing options
    single_parser = sub_parsers.add_parser('single',
                                           help='single file processing')
    single_parser.set_defaults(func=process_single)
    single_parser.add_argument('infile', type=argparse.FileType('rb'),
                               help='input audio file')
    add_outfile_option(single_parser)
    single_parser.add_argument('-j', dest='num_threads', type=int,
                               default=num_cpus,
                               help='number of parallel threads '
                                    '[default=%(default)s]')

    # batch file processing options
    batch_parser = sub_parsers.add_parser('batch',
                                          help='batch file processing')
    batch_parser.set_defaults(func=process_batch)
    batch_parser.add_argument('files', nargs='+',
                              help='files to be processed')
    batch_parser.add_argument('-o', dest='output_dir', default=None,
                              help='output directory [default=%(default)s]')
    batch_parser.add_argument('-s', dest='output_suffix',
                              default=output_suffix,
                              help='suffix appended to the files (dot must '
                                   'be included if wanted) '
                                   '[default=%(default)s]')
    batch_parser.add_argument('--ext', dest='strip_ext',
                              action='store_false',
                              help='keep the extension of the input file '
                                   '[default=strip it off before appending '
                                   'the output suffix]')
    batch_parser.add_argument('-j', dest='num_workers', type=int,
                              default=num_cpus,
                              help='number of parallel workers '
                                   '[default=%(default)s]')
    batch_parser.add_argument('--shuffle', action='store_true',
                              help='shuffle files before distributing them '
                                   'to the working threads [default=process '
                                   'them in sorted order]')
    # in batch mode the Processor itself is set to a single thread
    batch_parser.set_defaults(num_threads=1)