Source code for madmom.audio.signal

# encoding: utf-8
# pylint: disable=no-member
# pylint: disable=invalid-name
# pylint: disable=too-many-arguments
"""
This module contains basic signal processing functionality.

"""

from __future__ import absolute_import, division, print_function

import warnings
import numpy as np

from ..processors import BufferProcessor, Processor
from ..utils import integer_types


# signal functions
[docs]def smooth(signal, kernel): """ Smooth the signal along its first axis. Parameters ---------- signal : numpy array Signal to be smoothed. kernel : numpy array or int Smoothing kernel (size). Returns ------- numpy array Smoothed signal. Notes ----- If `kernel` is an integer, a Hamming window of that length will be used as a smoothing kernel. """ # check if a kernel is given if kernel is None: return signal # size for the smoothing kernel is given elif isinstance(kernel, integer_types): if kernel == 0: return signal elif kernel > 1: # use a Hamming window of given length kernel = np.hamming(kernel) else: raise ValueError("can't create a smoothing kernel of size %d" % kernel) # otherwise use the given smoothing kernel directly elif isinstance(kernel, np.ndarray): kernel = kernel else: raise ValueError("can't smooth signal with %s" % kernel) # convolve with the kernel and return if signal.ndim == 1: return np.convolve(signal, kernel, 'same') elif signal.ndim == 2: from scipy.signal import convolve2d return convolve2d(signal, kernel[:, np.newaxis], 'same') else: raise ValueError('signal must be either 1D or 2D')
[docs]def adjust_gain(signal, gain): """" Adjust the gain of the signal. Parameters ---------- signal : numpy array Signal to be adjusted. gain : float Gain adjustment level [dB]. Returns ------- numpy array Signal with adjusted gain. Notes ----- The signal is returned with the same dtype, thus rounding errors may occur with integer dtypes. `gain` values > 0 amplify the signal and are only supported for signals with float dtype to prevent clipping and integer overflows. """ # convert the gain in dB to a scaling factor gain = np.power(np.sqrt(10.), 0.1 * gain) # prevent overflow and clipping if gain > 1 and np.issubdtype(signal.dtype, np.integer): raise ValueError('positive gain adjustments are only supported for ' 'float dtypes.') # Note: np.asanyarray returns the signal's ndarray subclass return np.asanyarray(signal * gain, dtype=signal.dtype)
[docs]def attenuate(signal, attenuation): """ Attenuate the signal. Parameters ---------- signal : numpy array Signal to be attenuated. attenuation : float Attenuation level [dB]. Returns ------- numpy array Attenuated signal (same dtype as `signal`). Notes ----- The signal is returned with the same dtype, thus rounding errors may occur with integer dtypes. """ # return the signal unaltered if no attenuation is given if attenuation == 0: return signal return adjust_gain(signal, -attenuation)
[docs]def normalize(signal): """ Normalize the signal to have maximum amplitude. Parameters ---------- signal : numpy array Signal to be normalized. Returns ------- numpy array Normalized signal. Notes ----- Signals with float dtypes cover the range [-1, +1], signals with integer dtypes will cover the maximally possible range, e.g. [-32768, 32767] for np.int16. The signal is returned with the same dtype, thus rounding errors may occur with integer dtypes. """ # scaling factor to be applied scaling = float(np.max(np.abs(signal))) if np.issubdtype(signal.dtype, np.integer): if signal.dtype in (np.int16, np.int32): scaling /= np.iinfo(signal.dtype).max else: raise ValueError('only float and np.int16/32 dtypes supported, ' 'not %s.' % signal.dtype) # Note: np.asanyarray returns the signal's ndarray subclass return np.asanyarray(signal / scaling, dtype=signal.dtype)
[docs]def remix(signal, num_channels): """ Remix the signal to have the desired number of channels. Parameters ---------- signal : numpy array Signal to be remixed. num_channels : int Number of channels. Returns ------- numpy array Remixed signal (same dtype as `signal`). Notes ----- This function does not support arbitrary channel number conversions. Only down-mixing to and up-mixing from mono signals is supported. The signal is returned with the same dtype, thus rounding errors may occur with integer dtypes. If the signal should be down-mixed to mono and has an integer dtype, it will be converted to float internally and then back to the original dtype to prevent clipping of the signal. To avoid this double conversion, convert the dtype first. """ # convert to the desired number of channels if num_channels == signal.ndim or num_channels is None: # return as many channels as there are. return signal elif num_channels == 1 and signal.ndim > 1: # down-mix to mono # Note: to prevent clipping, the signal is converted to float first # and then converted back to the original dtype # TODO: add weighted mixing return np.mean(signal, axis=-1).astype(signal.dtype) elif num_channels > 1 and signal.ndim == 1: # up-mix a mono signal simply by copying channels return np.tile(signal[:, np.newaxis], num_channels) else: # any other channel conversion is not supported raise NotImplementedError("Requested %d channels, but got %d channels " "and channel conversion is not implemented." % (num_channels, signal.shape[1]))
[docs]def resample(signal, sample_rate, **kwargs): """ Resample the signal. Parameters ---------- signal : numpy array or Signal Signal to be resampled. sample_rate : int Sample rate of the signal. kwargs : dict, optional Keyword arguments passed to :func:`load_ffmpeg_file`. Returns ------- numpy array or Signal Resampled signal. Notes ----- This function uses ``ffmpeg`` to resample the signal. """ from ..io.audio import load_ffmpeg_file # is the given signal a Signal? if not isinstance(signal, Signal): raise ValueError('only Signals can resampled, not %s' % type(signal)) if signal.sample_rate == sample_rate: return signal # per default use the signal's dtype and num_channels dtype = kwargs.get('dtype', signal.dtype) num_channels = kwargs.get('num_channels', signal.num_channels) # resample the signal signal, sample_rate = load_ffmpeg_file(signal, sample_rate=sample_rate, num_channels=num_channels, dtype=dtype) # return it return Signal(signal, sample_rate=sample_rate)
[docs]def rescale(signal, dtype=np.float32): """ Rescale the signal to range [-1, 1] and return as float dtype. Parameters ---------- signal : numpy array Signal to be remixed. dtype : numpy dtype Data type of the signal. Returns ------- numpy array Signal rescaled to range [-1, 1]. """ # allow only float dtypes if not np.issubdtype(dtype, np.floating): raise ValueError('only float dtypes are supported, not %s.' % dtype) # float signals don't need rescaling if np.issubdtype(signal.dtype, np.floating): return signal.astype(dtype) elif np.issubdtype(signal.dtype, np.integer): return signal.astype(dtype) / np.iinfo(signal.dtype).max else: raise ValueError('unsupported signal dtype: %s.' % signal.dtype)
[docs]def trim(signal, where='fb'): """ Trim leading and trailing zeros of the signal. Parameters ---------- signal : numpy array Signal to be trimmed. where : str, optional A string with 'f' representing trim from front and 'b' to trim from back. Default is 'fb', trim zeros from both ends of the signal. Returns ------- numpy array Trimmed signal. """ # code borrowed from np.trim_zeros() first = 0 where = where.upper() if 'F' in where: for i in signal: if np.sum(i) != 0.: break else: first += 1 last = len(signal) if 'B' in where: for i in signal[::-1]: if np.sum(i) != 0.: break else: last -= 1 return signal[first:last]
[docs]def energy(signal): """ Compute the energy of a (framed) signal. Parameters ---------- signal : numpy array Signal. Returns ------- energy : float Energy of the signal. Notes ----- If `signal` is a `FramedSignal`, the energy is computed for each frame individually. """ # compute the energy for every frame of the signal if isinstance(signal, FramedSignal): return np.array([energy(frame) for frame in signal]) # make sure the signal is a numpy array if not isinstance(signal, np.ndarray): raise TypeError("Invalid type for signal, must be a numpy array.") # take the abs if the signal is complex if np.iscomplex(signal).any(): signal = np.abs(signal) # Note: type conversion needed because of integer overflows if signal.dtype != np.float: signal = signal.astype(np.float) # return energy return np.dot(signal.flatten(), signal.flatten())
[docs]def root_mean_square(signal): """ Compute the root mean square of a (framed) signal. This can be used as a measurement of power. Parameters ---------- signal : numpy array Signal. Returns ------- rms : float Root mean square of the signal. Notes ----- If `signal` is a `FramedSignal`, the root mean square is computed for each frame individually. """ # compute the root mean square for every frame of the signal if isinstance(signal, FramedSignal): return np.array([root_mean_square(frame) for frame in signal]) return np.sqrt(energy(signal) / signal.size)
[docs]def sound_pressure_level(signal, p_ref=None): """ Compute the sound pressure level of a (framed) signal. Parameters ---------- signal : numpy array Signal. p_ref : float, optional Reference sound pressure level; if 'None', take the max amplitude value for the data-type, if the data-type is float, assume amplitudes are between -1 and +1. Returns ------- spl : float Sound pressure level of the signal [dB]. Notes ----- From http://en.wikipedia.org/wiki/Sound_pressure: Sound pressure level (SPL) or sound level is a logarithmic measure of the effective sound pressure of a sound relative to a reference value. It is measured in decibels (dB) above a standard reference level. If `signal` is a `FramedSignal`, the sound pressure level is computed for each frame individually. """ # compute the sound pressure level for every frame of the signal if isinstance(signal, FramedSignal): return np.array([sound_pressure_level(frame) for frame in signal]) # compute the RMS rms = root_mean_square(signal) # find a reasonable default reference value if None is given if p_ref is None: if np.issubdtype(signal.dtype, np.integer): p_ref = float(np.iinfo(signal.dtype).max) else: p_ref = 1.0 # normal SPL computation. ignore warnings when taking the log of 0, # then replace the resulting -inf values with the smallest finite number with np.errstate(divide='ignore'): return np.nan_to_num(20.0 * np.log10(rms / p_ref))
# functions to load / write audio files
[docs]class LoadAudioFileError(Exception): """ Deprecated as of version 0.16. Please use madmom.io.audio.LoadAudioFileError instead. Will be removed in version 0.18. """ # pylint: disable=super-init-not-called def __init__(self, value=None): warnings.warn(LoadAudioFileError.__doc__) if value is None: value = 'Could not load audio file.' self.value = value
[docs]def load_wave_file(*args, **kwargs): """ Deprecated as of version 0.16. Please use madmom.io.audio.load_wave_file instead. Will be removed in version 0.18. """ warnings.warn('Deprecated as of version 0.16. Please use madmom.io.audio.' 'load_wave_file instead. Will be removed in version 0.18.') from ..io.audio import load_wave_file return load_wave_file(*args, **kwargs)
[docs]def write_wave_file(*args, **kwargs): """ Deprecated as of version 0.16. Please use madmom.io.audio.write_wave_file instead. Will be removed in version 0.18. """ warnings.warn('Deprecated as of version 0.16. Please use madmom.io.audio.' 'write_wave_file instead. Will be removed in version 0.18.') from ..io.audio import write_wave_file return write_wave_file(*args, **kwargs)
# function for automatically determining how to open audio files
[docs]def load_audio_file(*args, **kwargs): """ Deprecated as of version 0.16. Please use madmom.io.audio.load_audio_file instead. Will be removed in version 0.18. """ warnings.warn('Deprecated as of version 0.16. Please use madmom.io.audio.' 'load_audio_file instead. Will be removed in version 0.18.') from ..io.audio import load_audio_file return load_audio_file(*args, **kwargs)
# signal classes SAMPLE_RATE = None NUM_CHANNELS = None START = None STOP = None NORM = False GAIN = 0. DTYPE = None
[docs]class Signal(np.ndarray): """ The :class:`Signal` class represents a signal as a (memory-mapped) numpy array and enhances it with a number of attributes. Parameters ---------- data : numpy array, str or file handle Signal data or file name or file handle. sample_rate : int, optional Desired sample rate of the signal [Hz], or 'None' to return the signal in its original rate. num_channels : int, optional Reduce or expand the signal to `num_channels` channels, or 'None' to return the signal with its original channels. start : float, optional Start position [seconds]. stop : float, optional Stop position [seconds]. norm : bool, optional Normalize the signal to maximum range of the data type. gain : float, optional Adjust the gain of the signal [dB]. dtype : numpy data type, optional The data is returned with the given dtype. If 'None', it is returned with its original dtype, otherwise the signal gets rescaled. Integer dtypes use the complete value range, float dtypes the range [-1, +1]. Notes ----- `sample_rate` or `num_channels` can be used to set the desired sample rate and number of channels if the audio is read from file. If set to 'None' the audio signal is used as is, i.e. the sample rate and number of channels are determined directly from the audio file. If the `data` is a numpy array, the `sample_rate` is set to the given value and `num_channels` is set to the number of columns of the array. The `gain` can be used to adjust the level of the signal. If both `norm` and `gain` are set, the signal is first normalized and then the gain is applied afterwards. If `norm` or `gain` is set, the selected part of the signal is loaded into memory completely, i.e. .wav files are not memory-mapped any more. Examples -------- Load a mono audio file: >>> sig = Signal('tests/data/audio/sample.wav') >>> sig Signal([-2494, -2510, ..., 655, 639], dtype=int16) >>> sig.sample_rate 44100 Load a stereo audio file, down-mix it to mono: >>> sig = Signal('tests/data/audio/stereo_sample.flac', num_channels=1) >>> sig Signal([ 36, 36, ..., 524, 495], dtype=int16) >>> sig.num_channels 1 Load and re-sample an audio file: >>> sig = Signal('tests/data/audio/sample.wav', sample_rate=22050) >>> sig Signal([-2470, -2553, ..., 517, 677], dtype=int16) >>> sig.sample_rate 22050 Load an audio file with `float32` data type (i.e. rescale it to [-1, 1]): >>> sig = Signal('tests/data/audio/sample.wav', dtype=np.float32) >>> sig Signal([-0.07611, -0.0766 , ..., 0.01999, 0.0195 ], dtype=float32) >>> sig.dtype dtype('float32') """ # pylint: disable=super-on-old-class # pylint: disable=super-init-not-called # pylint: disable=attribute-defined-outside-init def __init__(self, data, sample_rate=SAMPLE_RATE, num_channels=NUM_CHANNELS, start=START, stop=STOP, norm=NORM, gain=GAIN, dtype=DTYPE, **kwargs): # this method is for documentation purposes only pass def __new__(cls, data, sample_rate=SAMPLE_RATE, num_channels=NUM_CHANNELS, start=START, stop=STOP, norm=NORM, gain=GAIN, dtype=DTYPE, **kwargs): from ..io.audio import load_audio_file # try to load an audio file if the data is not a numpy array if not isinstance(data, np.ndarray): data, sample_rate = load_audio_file(data, sample_rate=sample_rate, num_channels=num_channels, start=start, stop=stop, dtype=dtype) # cast as Signal if needed if not isinstance(data, Signal): data = np.asarray(data).view(cls) data.sample_rate = sample_rate # remix to desired number of channels if num_channels: data = remix(data, num_channels) # normalize signal if needed if norm: data = normalize(data) # adjust the gain if needed if gain is not None and gain != 0: data = adjust_gain(data, gain) # resample if needed if sample_rate != data.sample_rate: data = resample(data, sample_rate) # save start and stop position if start is not None: # FIXME: start and stop settings are not checked data.start = start data.stop = start + float(len(data)) / sample_rate # return the object return data def __array_finalize__(self, obj): if obj is None: return # set default values here, also needed for views of the Signal self.sample_rate = getattr(obj, 'sample_rate', None) self.start = getattr(obj, 'start', None) self.stop = getattr(obj, 'stop', None) @property def num_samples(self): """Number of samples.""" return len(self) @property def num_channels(self): """Number of channels.""" # mono file if self.ndim == 1: return 1 # multi channel file return np.shape(self)[1] @property def length(self): """Length of signal in seconds.""" # n/a if the signal has no sample rate if self.sample_rate is None: return None return float(self.num_samples) / self.sample_rate
[docs] def write(self, filename): """ Write the signal to disk as a .wav file. Parameters ---------- filename : str Name of the file. Returns ------- filename : str Name of the written file. """ return write_wave_file(self, filename)
[docs] def energy(self): """Energy of signal.""" return energy(self)
[docs] def root_mean_square(self): """Root mean square of signal.""" return root_mean_square(self)
rms = root_mean_square
[docs] def sound_pressure_level(self): """Sound pressure level of signal.""" return sound_pressure_level(self)
spl = sound_pressure_level
[docs]class SignalProcessor(Processor): """ The :class:`SignalProcessor` class is a basic signal processor. Parameters ---------- sample_rate : int, optional Sample rate of the signal [Hz]; if set the signal will be re-sampled to that sample rate; if 'None' the sample rate of the audio file will be used. num_channels : int, optional Number of channels of the signal; if set, the signal will be reduced to that number of channels; if 'None' as many channels as present in the audio file are returned. start : float, optional Start position [seconds]. stop : float, optional Stop position [seconds]. norm : bool, optional Normalize the signal to the range [-1, +1]. gain : float, optional Adjust the gain of the signal [dB]. dtype : numpy data type, optional The data is returned with the given dtype. If 'None', it is returned with its original dtype, otherwise the signal gets rescaled. Integer dtypes use the complete value range, float dtypes the range [-1, +1]. Examples -------- Processor for loading the first two seconds of an audio file, re-sampling it to 22.05 kHz and down-mixing it to mono: >>> proc = SignalProcessor(sample_rate=22050, num_channels=1, stop=2) >>> sig = proc('tests/data/audio/sample.wav') >>> sig Signal([-2470, -2553, ..., -173, -265], dtype=int16) >>> sig.sample_rate 22050 >>> sig.num_channels 1 >>> sig.length 2.0 """ def __init__(self, sample_rate=SAMPLE_RATE, num_channels=NUM_CHANNELS, start=START, stop=STOP, norm=NORM, gain=GAIN, dtype=DTYPE, **kwargs): # pylint: disable=unused-argument self.sample_rate = sample_rate self.num_channels = num_channels self.start = start self.stop = stop self.norm = norm self.gain = gain self.dtype = dtype
[docs] def process(self, data, **kwargs): """ Processes the given audio file. Parameters ---------- data : numpy array, str or file handle Data to be processed. kwargs : dict, optional Keyword arguments passed to :class:`Signal`. Returns ------- signal : :class:`Signal` instance :class:`Signal` instance. """ # pylint: disable=unused-argument # update arguments passed to FramedSignal args = dict(sample_rate=self.sample_rate, num_channels=self.num_channels, start=self.start, stop=self.stop, norm=self.norm, gain=self.gain, dtype=self.dtype) args.update(kwargs) # instantiate a Signal and return it return Signal(data, **args)
[docs] @staticmethod def add_arguments(parser, sample_rate=None, mono=None, start=None, stop=None, norm=None, gain=None): """ Add signal processing related arguments to an existing parser. Parameters ---------- parser : argparse parser instance Existing argparse parser object. sample_rate : int, optional Re-sample the signal to this sample rate [Hz]. mono : bool, optional Down-mix the signal to mono. start : float, optional Start position [seconds]. stop : float, optional Stop position [seconds]. norm : bool, optional Normalize the signal to the range [-1, +1]. gain : float, optional Adjust the gain of the signal [dB]. Returns ------- argparse argument group Signal processing argument parser group. Notes ----- Parameters are included in the group only if they are not 'None'. To include `start` and `stop` arguments with a default value of 'None', i.e. do not set any start or stop time, they can be set to 'True'. """ # add signal processing options to the existing parser g = parser.add_argument_group('signal processing arguments') if sample_rate is not None: g.add_argument('--sample_rate', action='store', type=int, default=sample_rate, help='re-sample the signal to ' 'this sample rate [Hz]') if mono is not None: g.add_argument('--mono', dest='num_channels', action='store_const', const=1, help='down-mix the signal to mono') if start is not None: g.add_argument('--start', action='store', type=float, help='start position of the signal [seconds]') if stop is not None: g.add_argument('--stop', action='store', type=float, help='stop position of the signal [seconds]') if norm is not None: g.add_argument('--norm', action='store_true', default=norm, help='normalize the signal [default=%(default)s]') if gain is not None: g.add_argument('--gain', action='store', type=float, default=gain, help='adjust the gain of the signal ' '[dB, default=%(default).1f]') # return the argument group so it can be modified if needed return g
# functions for splitting a signal into frames
[docs]def signal_frame(signal, index, frame_size, hop_size, origin=0): """ This function returns frame at `index` of the `signal`. Parameters ---------- signal : numpy array Signal. index : int Index of the frame to return. frame_size : int Size of each frame in samples. hop_size : float Hop size in samples between adjacent frames. origin : int Location of the window center relative to the signal position. Returns ------- frame : numpy array Requested frame of the signal. Notes ----- The reference sample of the first frame (index == 0) refers to the first sample of the `signal`, and each following frame is placed `hop_size` samples after the previous one. The window is always centered around this reference sample. Its location relative to the reference sample can be set with the `origin` parameter. Arbitrary integer values can be given: - zero centers the window on its reference sample - negative values shift the window to the right - positive values shift the window to the left An `origin` of half the size of the `frame_size` results in windows located to the left of the reference sample, i.e. the first frame starts at the first sample of the signal. The part of the frame which is not covered by the signal is padded with zeros. This function is totally independent of the length of the signal. Thus, contrary to common indexing, the index '-1' refers NOT to the last frame of the signal, but instead the frame left of the first frame is returned. """ # cast variables to int frame_size = int(frame_size) # length of the signal num_samples = len(signal) # seek to the correct position in the audio signal ref_sample = int(index * hop_size) # position the window start = ref_sample - frame_size // 2 - int(origin) stop = start + frame_size # return the requested portion of the signal # Note: np.pad(signal[from: to], (pad_left, pad_right), mode='constant') # always returns a ndarray, not the subclass (and is slower); # usually np.zeros_like(signal[:frame_size]) is exactly what we want # (i.e. zeros of frame_size length and the same type/class as the # signal and not just the dtype), but since we have no guarantee that # the signal is that long, we have to use the np.repeat workaround # Note: use NumPy's advanced indexing (i.e. trailing comma) in order to # avoid a memory leak (issue #321). This returns a copy of the data, # however, returning a simple copy of the relevant portion of the # signal also leaks memory if (stop < 0) or (start > num_samples): # window falls completely outside the actual signal, return just zeros frame = np.repeat(signal[:1] * 0, frame_size, axis=0) return frame elif (start < 0) and (stop > num_samples): # window surrounds the actual signal, position signal accordingly frame = np.repeat(signal[:1] * 0, frame_size, axis=0) frame[-start:num_samples - start] = signal return frame elif start < 0: # window crosses left edge of actual signal, pad zeros from left frame = np.repeat(signal[:1] * 0, frame_size, axis=0) frame[-start:] = signal[:stop, ] return frame elif stop > num_samples: # window crosses right edge of actual signal, pad zeros from right frame = np.repeat(signal[:1] * 0, frame_size, axis=0) frame[:num_samples - start] = signal[start:, ] return frame # normal read operation return signal[start:stop, ]
FRAME_SIZE = 2048 HOP_SIZE = 441. FPS = None ORIGIN = 0 END_OF_SIGNAL = 'normal' NUM_FRAMES = None # classes for splitting a signal into frames
[docs]class FramedSignal(object): """ The :class:`FramedSignal` splits a :class:`Signal` into frames and makes it iterable and indexable. Parameters ---------- signal : :class:`Signal` instance Signal to be split into frames. frame_size : int, optional Size of one frame [samples]. hop_size : float, optional Progress `hop_size` samples between adjacent frames. fps : float, optional Use given frames per second; if set, this computes and overwrites the given `hop_size` value. origin : int, optional Location of the window relative to the reference sample of a frame. end : int or str, optional End of signal handling (see notes below). num_frames : int, optional Number of frames to return. kwargs : dict, optional If no :class:`Signal` instance was given, one is instantiated with these additional keyword arguments. Notes ----- The :class:`FramedSignal` class is implemented as an iterator. It splits the given `signal` automatically into frames of `frame_size` length with `hop_size` samples (can be float, normal rounding applies) between the frames. The reference sample of the first frame refers to the first sample of the `signal`. The location of the window relative to the reference sample of a frame can be set with the `origin` parameter (with the same behaviour as used by ``scipy.ndimage`` filters). Arbitrary integer values can be given: - zero centers the window on its reference sample, - negative values shift the window to the right, - positive values shift the window to the left. Additionally, it can have the following literal values: - 'center', 'offline': the window is centered on its reference sample, - 'left', 'past', 'online': the window is located to the left of its reference sample (including the reference sample), - 'right', 'future', 'stream': the window is located to the right of its reference sample. The `end` parameter is used to handle the end of signal behaviour and can have these values: - 'normal': stop as soon as the whole signal got covered by at least one frame (i.e. pad maximally one frame), - 'extend': frames are returned as long as part of the frame overlaps with the signal to cover the whole signal. Alternatively, `num_frames` can be used to retrieve a fixed number of frames. In order to be able to stack multiple frames obtained with different frame sizes, the number of frames to be returned must be independent from the set `frame_size`. It is not guaranteed that every sample of the signal is returned in a frame unless the `origin` is either 'right' or 'future'. If used in online real-time mode the parameters `origin` and `num_frames` should be set to 'stream' and 1, respectively. Examples -------- To chop a :class:`Signal` (or anything a :class:`Signal` can be instantiated from) into overlapping frames of size 2048 with adjacent frames being 441 samples apart: >>> sig = Signal('tests/data/audio/sample.wav') >>> sig Signal([-2494, -2510, ..., 655, 639], dtype=int16) >>> frames = FramedSignal(sig, frame_size=2048, hop_size=441) >>> frames # doctest: +ELLIPSIS <madmom.audio.signal.FramedSignal object at 0x...> >>> frames[0] Signal([ 0, 0, ..., -4666, -4589], dtype=int16) >>> frames[10] Signal([-6156, -5645, ..., -253, 671], dtype=int16) >>> frames.fps 100.0 Instead of passing a :class:`Signal` instance as the first argument, anything a :class:`Signal` can be instantiated from (e.g. a file name) can be used. We can also set the frames per second (`fps`) instead, they get converted to `hop_size` based on the `sample_rate` of the signal: >>> frames = FramedSignal('tests/data/audio/sample.wav', fps=100) >>> frames # doctest: +ELLIPSIS <madmom.audio.signal.FramedSignal object at 0x...> >>> frames[0] Signal([ 0, 0, ..., -4666, -4589], dtype=int16) >>> frames.frame_size, frames.hop_size (2048, 441.0) When trying to access an out of range frame, an IndexError is raised. Thus the FramedSignal can be used the same way as a numpy array or any other iterable. >>> frames = FramedSignal('tests/data/audio/sample.wav') >>> frames.num_frames 281 >>> frames[281] Traceback (most recent call last): IndexError: end of signal reached >>> frames.shape (281, 2048) Slices are FramedSignals itself: >>> frames[:4] # doctest: +ELLIPSIS <madmom.audio.signal.FramedSignal object at 0x...> To obtain a numpy array from a FramedSignal, simply use np.array() on the full FramedSignal or a slice of it. Please note, that this requires a full memory copy. >>> np.array(frames[2:4]) array([[ 0, 0, ..., -5316, -5405], [ 2215, 2281, ..., 561, 653]], dtype=int16) """ def __init__(self, signal, frame_size=FRAME_SIZE, hop_size=HOP_SIZE, fps=FPS, origin=ORIGIN, end=END_OF_SIGNAL, num_frames=NUM_FRAMES, **kwargs): # signal handling if not isinstance(signal, Signal): # try to instantiate a Signal signal = Signal(signal, **kwargs) # save the signal self.signal = signal # arguments for splitting the signal into frames if frame_size: self.frame_size = int(frame_size) if hop_size: self.hop_size = float(hop_size) # use fps instead of hop_size if fps: # overwrite the hop_size self.hop_size = self.signal.sample_rate / float(fps) # translate literal window location values to numeric origin if origin in ('center', 'offline'): # window centered around the origin origin = 0 elif origin in ('left', 'past', 'online'): # origin is the right edge of the frame, i.e. window to the left # Note: used when simulating online mode, where only past # information of the audio signal can be used origin = (frame_size - 1) / 2 elif origin in ('right', 'future', 'stream'): # origin is the left edge of the frame, i.e. window to the right # Note: used when operating on live audio streams where we want # to retrieve a single frame. Instead of using 'online', we # "fake" the origin in order to retrieve the complete frame # provided by FramedSignalProcessor. This is a workaround to # be able to use the same processing chain in different modes origin = -(frame_size / 2) self.origin = int(origin) # number of frames determination if num_frames is None: if end == 'extend': # return frames as long as a frame covers any signal num_frames = np.floor(len(self.signal) / float(self.hop_size) + 1) elif end == 'normal': # return frames as long as the origin sample covers the signal num_frames = np.ceil(len(self.signal) / float(self.hop_size)) else: raise ValueError("end of signal handling '%s' unknown" % end) self.num_frames = int(num_frames) # make the object indexable / iterable def __getitem__(self, index): """ This makes the :class:`FramedSignal` class indexable and/or iterable. The signal is split into frames (of length `frame_size`) automatically. Two frames are located `hop_size` samples apart. If `hop_size` is a float, normal rounding applies. """ # a single index is given if isinstance(index, integer_types): # negative indices if index < 0: index += self.num_frames # return the frame at the given index if index < self.num_frames: return signal_frame(self.signal, index, frame_size=self.frame_size, hop_size=self.hop_size, origin=self.origin) # otherwise raise an error to indicate the end of signal raise IndexError("end of signal reached") # a slice is given elif isinstance(index, slice): # determine the frames to return (limited to the number of frames) start, stop, step = index.indices(self.num_frames) # allow only normal steps if step != 1: raise ValueError('only slices with a step size of 1 supported') # determine the number of frames num_frames = stop - start # determine the new origin, i.e. start position origin = self.origin - self.hop_size * start # return a new FramedSignal instance covering the requested frames return FramedSignal(self.signal, frame_size=self.frame_size, hop_size=self.hop_size, origin=origin, num_frames=num_frames) # other index types are invalid else: raise TypeError("frame indices must be slices or integers") # len() returns the number of frames, consistent with __getitem__() def __len__(self): return self.num_frames @property def frame_rate(self): """Frame rate (same as fps).""" # n/a if the signal has no sample rate if self.signal.sample_rate is None: return None return float(self.signal.sample_rate) / self.hop_size @property def fps(self): """Frames per second.""" return self.frame_rate @property def overlap_factor(self): """Overlapping factor of two adjacent frames.""" return 1.0 - self.hop_size / self.frame_size @property def shape(self): """ Shape of the FramedSignal (num_frames, frame_size[, num_channels]). """ shape = self.num_frames, self.frame_size if self.signal.num_channels != 1: shape += (self.signal.num_channels, ) return shape @property def ndim(self): """Dimensionality of the FramedSignal.""" return len(self.shape)
[docs] def energy(self): """Energy of the individual frames.""" return energy(self)
[docs] def root_mean_square(self): """Root mean square of the individual frames.""" return root_mean_square(self)
rms = root_mean_square
[docs] def sound_pressure_level(self): """Sound pressure level of the individual frames.""" return sound_pressure_level(self)
spl = sound_pressure_level
[docs]class FramedSignalProcessor(Processor): """ Slice a Signal into frames. Parameters ---------- frame_size : int, optional Size of one frame [samples]. hop_size : float, optional Progress `hop_size` samples between adjacent frames. fps : float, optional Use given frames per second; if set, this computes and overwrites the given `hop_size` value. origin : int, optional Location of the window relative to the reference sample of a frame. end : int or str, optional End of signal handling (see :class:`FramedSignal`). num_frames : int, optional Number of frames to return. Notes ----- When operating on live audio signals, `origin` must be set to 'stream' in order to retrieve always the last `frame_size` samples. Examples -------- Processor for chopping a :class:`Signal` (or anything a :class:`Signal` can be instantiated from) into overlapping frames of size 2048, and a frame rate of 100 frames per second: >>> proc = FramedSignalProcessor(frame_size=2048, fps=100) >>> frames = proc('tests/data/audio/sample.wav') >>> frames # doctest: +ELLIPSIS <madmom.audio.signal.FramedSignal object at 0x...> >>> frames[0] Signal([ 0, 0, ..., -4666, -4589], dtype=int16) >>> frames[10] Signal([-6156, -5645, ..., -253, 671], dtype=int16) >>> frames.hop_size 441.0 """ def __init__(self, frame_size=FRAME_SIZE, hop_size=HOP_SIZE, fps=FPS, origin=ORIGIN, end=END_OF_SIGNAL, num_frames=NUM_FRAMES, **kwargs): # pylint: disable=unused-argument self.frame_size = frame_size self.hop_size = hop_size self.fps = fps # do not convert here, pass it to FramedSignal self.origin = origin self.end = end self.num_frames = num_frames
[docs] def process(self, data, **kwargs): """ Slice the signal into (overlapping) frames. Parameters ---------- data : :class:`Signal` instance Signal to be sliced into frames. kwargs : dict, optional Keyword arguments passed to :class:`FramedSignal`. Returns ------- frames : :class:`FramedSignal` instance FramedSignal instance """ # update arguments passed to FramedSignal args = dict(frame_size=self.frame_size, hop_size=self.hop_size, fps=self.fps, origin=self.origin, end=self.end, num_frames=self.num_frames) args.update(kwargs) # always use the last `frame_size` samples if we operate on a live # audio stream, otherwise we get the wrong portion of the signal if self.origin == 'stream': data = data[-self.frame_size:] # instantiate a FramedSignal from the data and return it return FramedSignal(data, **args)
[docs] @staticmethod def add_arguments(parser, frame_size=FRAME_SIZE, fps=FPS, online=None): """ Add signal framing related arguments to an existing parser. Parameters ---------- parser : argparse parser instance Existing argparse parser object. frame_size : int, optional Size of one frame in samples. fps : float, optional Frames per second. online : bool, optional Online mode (use only past signal information, i.e. align the window to the left of the reference sample). Returns ------- argparse argument group Signal framing argument parser group. Notes ----- Parameters are included in the group only if they are not 'None'. """ # add signal framing options to the existing parser g = parser.add_argument_group('signal framing arguments') # depending on the type of frame_size, use different options if isinstance(frame_size, integer_types): g.add_argument('--frame_size', action='store', type=int, default=frame_size, help='frame size [samples, default=%(default)i]') elif isinstance(frame_size, list): # Note: this option can be used to stack multiple spectrograms # with different frame sizes from ..utils import OverrideDefaultListAction g.add_argument('--frame_size', type=int, default=frame_size, action=OverrideDefaultListAction, sep=',', help='(comma separated list of) frame size(s) to ' 'use [samples, default=%(default)s]') if fps is not None: g.add_argument('--fps', action='store', type=float, default=fps, help='frames per second [default=%(default).1f]') if online is False: g.add_argument('--online', dest='origin', action='store_const', const='online', default='offline', help='operate in online mode [default=offline]') elif online is True: g.add_argument('--offline', dest='origin', action='store_const', const='offline', default='online', help='operate in offline mode [default=online]') # return the argument group so it can be modified if needed return g
# class for online processing
[docs]class Stream(object): """ A Stream handles live (i.e. online, real-time) audio input via PyAudio. Parameters ---------- sample_rate : int Sample rate of the signal. num_channels : int, optional Number of channels. dtype : numpy dtype, optional Data type for the signal. frame_size : int, optional Size of one frame [samples]. hop_size : int, optional Progress `hop_size` samples between adjacent frames. fps : float, optional Use given frames per second; if set, this computes and overwrites the given `hop_size` value (the resulting `hop_size` must be an integer). queue_size : int Size of the FIFO (first in first out) queue. If the queue is full and new audio samples arrive, the oldest item in the queue will be dropped. Notes ----- Stream is implemented as an iterable which blocks until enough new data is available. """ def __init__(self, sample_rate=SAMPLE_RATE, num_channels=NUM_CHANNELS, dtype=np.float32, frame_size=FRAME_SIZE, hop_size=HOP_SIZE, fps=FPS, **kwargs): # import PyAudio here and not at the module level import pyaudio # set attributes self.sample_rate = sample_rate self.num_channels = 1 if None else num_channels self.dtype = dtype if frame_size: self.frame_size = int(frame_size) if fps: # use fps instead of hop_size hop_size = self.sample_rate / float(fps) if int(hop_size) != hop_size: raise ValueError( 'only integer `hop_size` supported, not %s' % hop_size) self.hop_size = int(hop_size) # init PyAudio self.pa = pyaudio.PyAudio() # init a stream to read audio samples from self.stream = self.pa.open(rate=self.sample_rate, channels=self.num_channels, format=pyaudio.paFloat32, input=True, frames_per_buffer=self.hop_size, start=True) # create a buffer self.buffer = BufferProcessor(self.frame_size) # frame index counter self.frame_idx = 0 # PyAudio flags self.paComplete = pyaudio.paComplete self.paContinue = pyaudio.paContinue def __iter__(self): return self def __next__(self): # get the desired number of samples (block until all are present) data = self.stream.read(self.hop_size, exception_on_overflow=False) # convert it to a numpy array data = np.fromstring(data, 'float32').astype(self.dtype, copy=False) # buffer the data (i.e. append hop_size samples and rotate) data = self.buffer(data) # wrap the last frame_size samples as a Signal # TODO: check float / int hop size; theoretically a float hop size # can be accomplished by making the buffer N samples bigger and # take the correct portion of the buffer start = (self.frame_idx * float(self.hop_size) / self.sample_rate) signal = Signal(data[-self.frame_size:], sample_rate=self.sample_rate, dtype=self.dtype, num_channels=self.num_channels, start=start) # increment the frame index self.frame_idx += 1 return signal next = __next__ def is_running(self): return self.stream.is_active() def close(self): self.stream.close() # TODO: is this the correct place to terminate PyAudio? self.pa.terminate() @property def shape(self): """Shape of the Stream (None, frame_size[, num_channels]).""" shape = None, self.frame_size if self.signal.num_channels != 1: shape += (self.signal.num_channels,) return shape