# encoding: utf-8
# pylint: disable=no-member
# pylint: disable=invalid-name
# pylint: disable=too-many-arguments
"""
This module contains basic signal processing functionality.
"""
from __future__ import absolute_import, division, print_function
import numpy as np
from madmom.processors import Processor
# signal functions
[docs]def smooth(signal, kernel):
"""
Smooth the signal along its first axis.
Parameters
----------
signal : numpy array
Signal to be smoothed.
kernel : numpy array or int
Smoothing kernel (size).
Returns
-------
numpy array
Smoothed signal.
Notes
-----
If `kernel` is an integer, a Hamming window of that length will be used
as a smoothing kernel.
"""
# check if a kernel is given
if kernel is None:
return signal
# size for the smoothing kernel is given
elif isinstance(kernel, int):
if kernel == 0:
return signal
elif kernel > 1:
# use a Hamming window of given length
kernel = np.hamming(kernel)
else:
raise ValueError("can't create a smoothing kernel of size %d" %
kernel)
# otherwise use the given smoothing kernel directly
elif isinstance(kernel, np.ndarray) and len(kernel) > 1:
kernel = kernel
else:
raise ValueError("can't smooth signal with %s" % kernel)
# convolve with the kernel and return
if signal.ndim == 1:
return np.convolve(signal, kernel, 'same')
elif signal.ndim == 2:
from scipy.signal import convolve2d
return convolve2d(signal, kernel[:, np.newaxis], 'same')
else:
raise ValueError('signal must be either 1D or 2D')
[docs]def adjust_gain(signal, gain):
""""
Adjust the gain of the signal.
Parameters
----------
signal : numpy array
Signal to be adjusted.
gain : float
Gain adjustment level [dB].
Returns
-------
numpy array
Signal with adjusted gain.
Notes
-----
The signal is returned with the same dtype, thus rounding errors may occur
with integer dtypes.
`gain` values > 0 amplify the signal and are only supported for signals
with float dtype to prevent clipping and integer overflows.
"""
# convert the gain in dB to a scaling factor
gain = np.power(np.sqrt(10.), 0.1 * gain)
# prevent overflow and clipping
if gain > 1 and np.issubdtype(signal.dtype, np.int):
raise ValueError('positive gain adjustments are only supported for '
'float dtypes.')
# Note: np.asanyarray returns the signal's ndarray subclass
return np.asanyarray(signal * gain, dtype=signal.dtype)
[docs]def attenuate(signal, attenuation):
"""
Attenuate the signal.
Parameters
----------
signal : numpy array
Signal to be attenuated.
attenuation : float
Attenuation level [dB].
Returns
-------
numpy array
Attenuated signal (same dtype as `signal`).
Notes
-----
The signal is returned with the same dtype, thus rounding errors may occur
with integer dtypes.
"""
# return the signal unaltered if no attenuation is given
if attenuation == 0:
return signal
return adjust_gain(signal, -attenuation)
[docs]def normalize(signal):
"""
Normalize the signal to have maximum amplitude.
Parameters
----------
signal : numpy array
Signal to be normalized.
Returns
-------
numpy array
Normalized signal.
Notes
-----
Signals with float dtypes cover the range [-1, +1], signals with integer
dtypes will cover the maximally possible range, e.g. [-32768, 32767] for
np.int16.
The signal is returned with the same dtype, thus rounding errors may occur
with integer dtypes.
"""
# scaling factor to be applied
scaling = float(np.max(np.abs(signal)))
if np.issubdtype(signal.dtype, np.int):
if signal.dtype in (np.int16, np.int32):
scaling /= np.iinfo(signal.dtype).max
else:
raise ValueError('only float and np.int16/32 dtypes supported, '
'not %s.' % signal.dtype)
# Note: np.asanyarray returns the signal's ndarray subclass
return np.asanyarray(signal / scaling, dtype=signal.dtype)
[docs]def remix(signal, num_channels):
"""
Remix the signal to have the desired number of channels.
Parameters
----------
signal : numpy array
Signal to be remixed.
num_channels : int
Number of channels.
Returns
-------
numpy array
Remixed signal (same dtype as `signal`).
Notes
-----
This function does not support arbitrary channel number conversions.
Only down-mixing to and up-mixing from mono signals is supported.
The signal is returned with the same dtype, thus rounding errors may occur
with integer dtypes.
If the signal should be down-mixed to mono and has an integer dtype, it
will be converted to float internally and then back to the original dtype
to prevent clipping of the signal. To avoid this double conversion,
convert the dtype first.
"""
# convert to the desired number of channels
if num_channels == signal.ndim or num_channels is None:
# return as many channels as there are.
return signal
elif num_channels == 1 and signal.ndim > 1:
# down-mix to mono
# Note: to prevent clipping, the signal is converted to float first
# and then converted back to the original dtype
# TODO: add weighted mixing
return np.mean(signal, axis=-1).astype(signal.dtype)
elif num_channels > 1 and signal.ndim == 1:
# up-mix a mono signal simply by copying channels
return np.tile(signal[:, np.newaxis], num_channels)
else:
# any other channel conversion is not supported
raise NotImplementedError("Requested %d channels, but got %d channels "
"and channel conversion is not implemented."
% (num_channels, signal.shape[1]))
[docs]def rescale(signal, dtype=np.float32):
"""
Rescale the signal to range [-1, 1] and return as float dtype.
Parameters
----------
signal : numpy array
Signal to be remixed.
dtype : numpy dtype
Data type of the signal.
Returns
-------
numpy array
Signal rescaled to range [-1, 1].
"""
# allow only float dtypes
if not np.issubdtype(dtype, np.float):
raise ValueError('only float dtypes are supported, not %s.' % dtype)
# float signals don't need rescaling
if np.issubdtype(signal.dtype, np.float):
return signal.astype(dtype)
elif np.issubdtype(signal.dtype, np.int):
return signal.astype(dtype) / np.iinfo(signal.dtype).max
else:
# TODO: not sure if this can happen or not. Either add the
# functionality if it is supposed to work or add a test
raise ValueError('unsupported signal dtypes: %s.' % signal.dtype)
[docs]def trim(signal, where='fb'):
"""
Trim leading and trailing zeros of the signal.
Parameters
----------
signal : numpy array
Signal to be trimmed.
where : str, optional
A string with 'f' representing trim from front and 'b' to trim from
back. Default is 'fb', trim zeros from both ends of the signal.
Returns
-------
numpy array
Trimmed signal.
"""
# code borrowed from np.trim_zeros()
first = 0
where = where.upper()
if 'F' in where:
for i in signal:
if np.sum(i) != 0.:
break
else:
first += 1
last = len(signal)
if 'B' in where:
for i in signal[::-1]:
if np.sum(i) != 0.:
break
else:
last -= 1
return signal[first:last]
[docs]def root_mean_square(signal):
"""
Computes the root mean square of the signal. This can be used as a
measurement of power.
Parameters
----------
signal : numpy array
Signal.
Returns
-------
rms : float
Root mean square of the signal.
"""
# make sure the signal is a numpy array
if not isinstance(signal, np.ndarray):
raise TypeError("Invalid type for signal, must be a numpy array.")
# Note: type conversion needed because of integer overflows
if signal.dtype != np.float:
signal = signal.astype(np.float)
# return
return np.sqrt(np.dot(signal.flatten(), signal.flatten()) / signal.size)
[docs]def sound_pressure_level(signal, p_ref=None):
"""
Computes the sound pressure level of a signal.
Parameters
----------
signal : numpy array
Signal.
p_ref : float, optional
Reference sound pressure level; if 'None', take the max amplitude
value for the data-type, if the data-type is float, assume amplitudes
are between -1 and +1.
Returns
-------
spl : float
Sound pressure level of the signal [dB].
Notes
-----
From http://en.wikipedia.org/wiki/Sound_pressure: Sound pressure level
(SPL) or sound level is a logarithmic measure of the effective sound
pressure of a sound relative to a reference value. It is measured in
decibels (dB) above a standard reference level.
"""
# compute the RMS
rms = root_mean_square(signal)
# compute the SPL
if rms == 0:
# return the smallest possible negative number
return -np.finfo(float).max
else:
if p_ref is None:
# find a reasonable default reference value
if np.issubdtype(signal.dtype, np.integer):
p_ref = float(np.iinfo(signal.dtype).max)
else:
p_ref = 1.0
# normal SPL computation
return 20.0 * np.log10(rms / p_ref)
[docs]def load_wave_file(filename, sample_rate=None, num_channels=None, start=None,
stop=None, dtype=None):
"""
Load the audio data from the given file and return it as a numpy array.
Only supports wave files, does not support re-sampling or arbitrary
channel number conversions. Reads the data as a memory-mapped file with
copy-on-write semantics to defer I/O costs until needed.
Parameters
----------
filename : string
Name of the file.
sample_rate : int, optional
Desired sample rate of the signal [Hz], or 'None' to return the
signal in its original rate.
num_channels : int, optional
Reduce or expand the signal to `num_channels` channels, or 'None'
to return the signal with its original channels.
start : float, optional
Start position [seconds].
stop : float, optional
Stop position [seconds].
dtype : numpy data type, optional
The data is returned with the given dtype. If 'None', it is returned
with its original dtype, otherwise the signal gets rescaled. Integer
dtypes use the complete value range, float dtypes the range [-1, +1].
Returns
-------
signal : numpy array
Audio signal.
sample_rate : int
Sample rate of the signal [Hz].
Notes
-----
The `start` and `stop` positions are rounded to the closest sample; the
sample corresponding to the `stop` value is not returned, thus consecutive
segment starting with the previous `stop` can be concatenated to obtain
the original signal without gaps or overlaps.
"""
from scipy.io import wavfile
file_sample_rate, signal = wavfile.read(filename, mmap=True)
# if the sample rate is not the desired one, raise exception
if sample_rate is not None and sample_rate != file_sample_rate:
raise ValueError('Requested sample rate of %f Hz, but got %f Hz and '
're-sampling is not implemented.' %
(sample_rate, file_sample_rate))
# same for the data type
if dtype is not None and signal.dtype != dtype:
raise ValueError('Requested dtype %s, but got %s and re-scaling is '
'not implemented.' % (dtype, signal.dtype))
# only request the desired part of the signal
if start is not None:
start = int(start * file_sample_rate)
if stop is not None:
stop = min(len(signal), int(stop * file_sample_rate))
if start is not None or stop is not None:
signal = signal[start: stop]
# up-/down-mix if needed
if num_channels is not None:
signal = remix(signal, num_channels)
# return the signal
return signal, file_sample_rate
[docs]class LoadAudioFileError(Exception):
"""
Exception to be raised whenever an audio file could not be loaded.
"""
# pylint: disable=super-init-not-called
def __init__(self, value=None):
if value is None:
value = 'Could not load audio file.'
self.value = value
def __str__(self):
return repr(self.value)
# function for automatically determining how to open audio files
[docs]def load_audio_file(filename, sample_rate=None, num_channels=None, start=None,
stop=None, dtype=None):
"""
Load the audio data from the given file and return it as a numpy array.
This tries load_wave_file() load_ffmpeg_file() (for ffmpeg and avconv).
Parameters
----------
filename : str or file handle
Name of the file or file handle.
sample_rate : int, optional
Desired sample rate of the signal [Hz], or 'None' to return the
signal in its original rate.
num_channels: int, optional
Reduce or expand the signal to `num_channels` channels, or 'None'
to return the signal with its original channels.
start : float, optional
Start position [seconds].
stop : float, optional
Stop position [seconds].
dtype : numpy data type, optional
The data is returned with the given dtype. If 'None', it is returned
with its original dtype, otherwise the signal gets rescaled. Integer
dtypes use the complete value range, float dtypes the range [-1, +1].
Returns
-------
signal : numpy array
Audio signal.
sample_rate : int
Sample rate of the signal [Hz].
Notes
-----
For wave files, the `start` and `stop` positions are rounded to the closest
sample; the sample corresponding to the `stop` value is not returned, thus
consecutive segment starting with the previous `stop` can be concatenated
to obtain the original signal without gaps or overlaps.
For all other audio files, this can not be guaranteed.
"""
from subprocess import CalledProcessError
from .ffmpeg import load_ffmpeg_file
# determine the name of the file if it is a file handle
if not isinstance(filename, str):
# close the file handle if it is open
filename.close()
# use the file name
filename = filename.name
# try reading as a wave file
error = "All attempts to load audio file %r failed." % filename
try:
return load_wave_file(filename, sample_rate=sample_rate,
num_channels=num_channels, start=start,
stop=stop, dtype=dtype)
except ValueError:
pass
# not a wave file (or other sample rate requested), try ffmpeg
try:
return load_ffmpeg_file(filename, sample_rate=sample_rate,
num_channels=num_channels, start=start,
stop=stop, dtype=dtype)
except OSError:
# ffmpeg is not present, try avconv
try:
return load_ffmpeg_file(filename, sample_rate=sample_rate,
num_channels=num_channels, start=start,
stop=stop, dtype=dtype,
cmd_decode='avconv', cmd_probe='avprobe')
except OSError:
error += " Try installing ffmpeg (or avconv on Ubuntu Linux)."
except CalledProcessError:
pass
except CalledProcessError:
pass
raise LoadAudioFileError(error)
# signal classes
[docs]class Signal(np.ndarray):
"""
The :class:`Signal` class represents a signal as a (memory-mapped) numpy
array and enhances it with a number of attributes.
Parameters
----------
data : numpy array, str or file handle
Signal data or file name or file handle.
sample_rate : int, optional
Desired sample rate of the signal [Hz], or 'None' to return the
signal in its original rate.
num_channels : int, optional
Reduce or expand the signal to `num_channels` channels, or 'None'
to return the signal with its original channels.
start : float, optional
Start position [seconds].
stop : float, optional
Stop position [seconds].
norm : bool, optional
Normalize the signal to the range [-1, +1].
gain : float, optional
Adjust the gain of the signal [dB].
dtype : numpy data type, optional
The data is returned with the given dtype. If 'None', it is returned
with its original dtype, otherwise the signal gets rescaled. Integer
dtypes use the complete value range, float dtypes the range [-1, +1].
Notes
-----
`sample_rate` or `num_channels` can be used to set the desired sample rate
and number of channels if the audio is read from file. If set to 'None'
the audio signal is used as is, i.e. the sample rate and number of channels
are determined directly from the audio file.
If the `data` is a numpy array, the `sample_rate` is set to the given value
and `num_channels` is set to the number of columns of the array.
The `gain` can be used to adjust the level of the signal.
If both `norm` and `gain` are set, the signal is first normalized and then
the gain is applied afterwards.
If `norm` or `gain` is set, the selected part of the signal is loaded into
memory completely, i.e. .wav files are not memory-mapped any more.
"""
# pylint: disable=super-on-old-class
# pylint: disable=super-init-not-called
# pylint: disable=attribute-defined-outside-init
def __init__(self, data, sample_rate=None, num_channels=None, start=None,
stop=None, norm=False, gain=0, dtype=None):
# this method is for documentation purposes only
pass
def __new__(cls, data, sample_rate=None, num_channels=None, start=None,
stop=None, norm=False, gain=0, dtype=None):
# try to load an audio file if the data is not a numpy array
if not isinstance(data, np.ndarray):
data, sample_rate = load_audio_file(data, sample_rate=sample_rate,
num_channels=num_channels,
start=start, stop=stop,
dtype=dtype)
# process it if needed
if norm:
# normalize signal
data = normalize(data)
if gain is not None and gain != 0:
# adjust the gain
data = adjust_gain(data, gain)
# cast as Signal
obj = np.asarray(data).view(cls)
if sample_rate is not None:
sample_rate = sample_rate
obj.sample_rate = sample_rate
# return the object
return obj
def __array_finalize__(self, obj):
if obj is None:
return
# set default values here, also needed for views of the Signal
self.sample_rate = getattr(obj, 'sample_rate', None)
@property
def num_samples(self):
"""Number of samples."""
return len(self)
@property
def num_channels(self):
"""Number of channels."""
if self.ndim == 1:
# mono file
return 1
else:
# multi channel file
return np.shape(self)[1]
@property
def length(self):
"""Length of signal in seconds."""
# n/a if the signal has no sample rate
if self.sample_rate is None:
return None
return float(self.num_samples) / self.sample_rate
[docs]class SignalProcessor(Processor):
"""
The :class:`SignalProcessor` class is a basic signal processor.
Parameters
----------
sample_rate : int, optional
Sample rate of the signal [Hz]; if set the signal will be re-sampled
to that sample rate; if 'None' the sample rate of the audio file will
be used.
num_channels : int, optional
Number of channels of the signal; if set, the signal will be reduced
to that number of channels; if 'None' as many channels as present in
the audio file are returned.
start : float, optional
Start position [seconds].
stop : float, optional
Stop position [seconds].
norm : bool, optional
Normalize the signal to the range [-1, +1].
att : float, optional
Deprecated in version 0.13, use `gain` instead.
gain : float, optional
Adjust the gain of the signal [dB].
dtype : numpy data type, optional
The data is returned with the given dtype. If 'None', it is returned
with its original dtype, otherwise the signal gets rescaled. Integer
dtypes use the complete value range, float dtypes the range [-1, +1].
"""
SAMPLE_RATE = None
NUM_CHANNELS = None
START = None
STOP = None
NORM = False
ATT = None
GAIN = 0.
def __init__(self, sample_rate=SAMPLE_RATE, num_channels=NUM_CHANNELS,
start=None, stop=None, norm=NORM, att=ATT, gain=GAIN,
**kwargs):
# pylint: disable=unused-argument
self.sample_rate = sample_rate
self.num_channels = num_channels
self.start = start
self.stop = stop
self.norm = norm
if att is not None:
raise DeprecationWarning('`att` has been renamed to `gain` in '
'v0.13 and will be removed in version '
'v0.14')
self.gain = gain
@property
def att(self):
"""Attenuation of the signal [dB]."""
raise DeprecationWarning('`att` has been renamed to `gain` in v0.13 '
'and will be removed in version v0.14.')
return -self.gain
[docs] def process(self, data, start=None, stop=None, **kwargs):
"""
Processes the given audio file.
Parameters
----------
data : numpy array, str or file handle
Data to be processed.
start : float, optional
Start position [seconds].
stop : float, optional
Stop position [seconds].
Returns
-------
signal : :class:`Signal` instance
:class:`Signal` instance.
"""
# pylint: disable=unused-argument
# overwrite the default start & stop time
if start is None:
start = self.start
if stop is None:
stop = self.stop
# instantiate a Signal
data = Signal(data, sample_rate=self.sample_rate,
num_channels=self.num_channels, start=start, stop=stop,
norm=self.norm, gain=self.gain)
# return processed data
return data
@staticmethod
[docs] def add_arguments(parser, sample_rate=None, mono=None, start=None,
stop=None, norm=None, gain=None):
"""
Add signal processing related arguments to an existing parser.
Parameters
----------
parser : argparse parser instance
Existing argparse parser object.
sample_rate : int, optional
Re-sample the signal to this sample rate [Hz].
mono : bool, optional
Down-mix the signal to mono.
start : float, optional
Start position [seconds].
stop : float, optional
Stop position [seconds].
norm : bool, optional
Normalize the signal to the range [-1, +1].
gain : float, optional
Adjust the gain of the signal [dB].
Returns
-------
argparse argument group
Signal processing argument parser group.
Notes
-----
Parameters are included in the group only if they are not 'None'. To
include `start` and `stop` arguments with a default value of 'None',
i.e. do not set any start or stop time, they can be set to 'True'.
"""
# add signal processing options to the existing parser
g = parser.add_argument_group('signal processing arguments')
if sample_rate is not None:
g.add_argument('--sample_rate', action='store_true',
default=sample_rate,
help='re-sample the signal to this sample rate '
'[Hz]')
if mono is not None:
g.add_argument('--mono', dest='num_channels', action='store_const',
const=1,
help='down-mix the signal to mono')
if start is not None:
g.add_argument('--start', action='store', type=float,
help='start position of the signal [seconds]')
if stop is not None:
g.add_argument('--stop', action='store', type=float,
help='stop position of the signal [seconds]')
if norm is not None:
g.add_argument('--norm', action='store_true', default=norm,
help='normalize the signal [default=%(default)s]')
if gain is not None:
g.add_argument('--gain', action='store', type=float, default=gain,
help='adjust the gain of the signal '
'[dB, default=%(default).1f]')
# return the argument group so it can be modified if needed
return g
# functions for splitting a signal into frames
[docs]def signal_frame(signal, index, frame_size, hop_size, origin=0):
"""
This function returns frame at `index` of the `signal`.
Parameters
----------
signal : numpy array
Signal.
index : int
Index of the frame to return.
frame_size : int
Size of each frame in samples.
hop_size : float
Hop size in samples between adjacent frames.
origin : int
Location of the window center relative to the signal position.
Returns
-------
frame : numpy array
Requested frame of the signal.
Notes
-----
The reference sample of the first frame (index == 0) refers to the first
sample of the `signal`, and each following frame is placed `hop_size`
samples after the previous one.
The window is always centered around this reference sample. Its location
relative to the reference sample can be set with the `origin` parameter.
Arbitrary integer values can be given:
- zero centers the window on its reference sample
- negative values shift the window to the right
- positive values shift the window to the left
An `origin` of half the size of the `frame_size` results in windows located
to the left of the reference sample, i.e. the first frame starts at the
first sample of the signal.
The part of the frame which is not covered by the signal is padded with
zeros.
This function is totally independent of the length of the signal. Thus,
contrary to common indexing, the index '-1' refers NOT to the last frame
of the signal, but instead the frame left of the first frame is returned.
"""
# length of the signal
num_samples = len(signal)
# seek to the correct position in the audio signal
ref_sample = int(index * hop_size)
# position the window
start = ref_sample - frame_size // 2 - int(origin)
stop = start + frame_size
# return the requested portion of the signal
# Note: np.pad(signal[from: to], (pad_left, pad_right), mode='constant')
# always returns a ndarray, not the subclass (and is slower);
# usually np.zeros_like(signal[:frame_size]) is exactly what we want
# (i.e. zeros of frame_size length and the same type/class as the
# signal and not just the dtype), but since we have no guarantee that
# the signal is that long, we have to use the np.repeat workaround
if (stop < 0) or (start > num_samples):
# window falls completely outside the actual signal, return just zeros
frame = np.repeat(signal[:1] * 0, frame_size, axis=0)
return frame
elif (start < 0) and (stop > num_samples):
# window surrounds the actual signal, position signal accordingly
frame = np.repeat(signal[:1] * 0, frame_size, axis=0)
frame[-start:num_samples - start] = signal
return frame
elif start < 0:
# window crosses left edge of actual signal, pad zeros from left
frame = np.repeat(signal[:1] * 0, frame_size, axis=0)
frame[-start:] = signal[:stop]
return frame
elif stop > num_samples:
# window crosses right edge of actual signal, pad zeros from right
frame = np.repeat(signal[:1] * 0, frame_size, axis=0)
frame[:num_samples - start] = signal[start:]
return frame
else:
# normal read operation
return signal[start:stop]
# classes for splitting a signal into frames
[docs]class FramedSignal(object):
"""
The :class:`FramedSignal` splits a :class:`Signal` into frames and makes it
iterable and indexable.
Parameters
----------
signal : :class:`Signal` instance
Signal to be split into frames.
frame_size : int, optional
Size of one frame [samples].
hop_size : float, optional
Progress `hop_size` samples between adjacent frames.
fps : float, optional
Use given frames per second; if set, this computes and overwrites the
given `hop_size` value.
origin : int, optional
Location of the window relative to the reference sample of a frame.
end : int or str, optional
End of signal handling (see notes below).
num_frames : int, optional
Number of frames to return.
kwargs : dict, optional
If no :class:`Signal` instance was given, one is instantiated with
these additional keyword arguments.
Notes
-----
The :class:`FramedSignal` class is implemented as an iterator. It splits
the given `signal` automatically into frames of `frame_size` length with
`hop_size` samples (can be float, normal rounding applies) between the
frames. The reference sample of the first frame refers to the first sample
of the `signal`.
The location of the window relative to the reference sample of a frame can
be set with the `origin` parameter (with the same behaviour as used by
``scipy.ndimage`` filters). Arbitrary integer values can be given:
- zero centers the window on its reference sample,
- negative values shift the window to the right,
- positive values shift the window to the left.
Additionally, it can have the following literal values:
- 'center', 'offline': the window is centered on its reference sample,
- 'left', 'past', 'online': the window is located to the left of its
reference sample (including the reference sample),
- 'right', 'future': the window is located to the right of its reference
sample.
The `end` parameter is used to handle the end of signal behaviour and
can have these values:
- 'normal': stop as soon as the whole signal got covered by at least one
frame (i.e. pad maximally one frame),
- 'extend': frames are returned as long as part of the frame overlaps
with the signal to cover the whole signal.
Alternatively, `num_frames` can be used to retrieve a fixed number of
frames.
In order to be able to stack multiple frames obtained with different frame
sizes, the number of frames to be returned must be independent from the set
`frame_size`. It is not guaranteed that every sample of the signal is
returned in a frame unless the `origin` is either 'right' or 'future'.
"""
def __init__(self, signal, frame_size=2048, hop_size=441., fps=None,
origin=0, end='normal', num_frames=None, **kwargs):
# signal handling
if not isinstance(signal, Signal):
# try to instantiate a Signal
signal = Signal(signal, **kwargs)
# save the signal
self.signal = signal
# arguments for splitting the signal into frames
if frame_size:
self.frame_size = int(frame_size)
if hop_size:
self.hop_size = float(hop_size)
# use fps instead of hop_size
if fps:
# overwrite the hop_size
self.hop_size = self.signal.sample_rate / float(fps)
# translate literal window location values to numeric origin
if origin in ('center', 'offline'):
# the current position is the center of the frame
origin = 0
elif origin in ('left', 'past', 'online'):
# the current position is the right edge of the frame
# this is usually used when simulating online mode, where only past
# information of the audio signal can be used
origin = (frame_size - 1) / 2
elif origin in ('right', 'future'):
# the current position is the left edge of the frame
origin = -(frame_size / 2)
self.origin = int(origin)
# number of frames determination
if num_frames is None:
if end == 'extend':
# return frames as long as a frame covers any signal
num_frames = np.floor(len(self.signal) /
float(self.hop_size) + 1)
elif end == 'normal':
# return frames as long as the origin sample covers the signal
num_frames = np.ceil(len(self.signal) / float(self.hop_size))
else:
raise ValueError("end of signal handling '%s' unknown" %
end)
self.num_frames = int(num_frames)
# make the object indexable / iterable
def __getitem__(self, index):
"""
This makes the :class:`FramedSignal` class indexable and/or iterable.
The signal is split into frames (of length `frame_size`) automatically.
Two frames are located `hop_size` samples apart. If `hop_size` is a
float, normal rounding applies.
"""
# a single index is given
if isinstance(index, int):
# negative indices
if index < 0:
index += self.num_frames
# return the frame at the given index
if index < self.num_frames:
return signal_frame(self.signal, index,
frame_size=self.frame_size,
hop_size=self.hop_size, origin=self.origin)
# otherwise raise an error to indicate the end of signal
raise IndexError("end of signal reached")
# a slice is given
elif isinstance(index, slice):
# determine the frames to return (limited to the number of frames)
start, stop, step = index.indices(self.num_frames)
# allow only normal steps
if step != 1:
raise ValueError('only slices with a step size of 1 supported')
# determine the number of frames
num_frames = stop - start
# determine the new origin, i.e. start position
origin = self.origin - self.hop_size * start
# return a new FramedSignal instance covering the requested frames
return FramedSignal(self.signal, frame_size=self.frame_size,
hop_size=self.hop_size, origin=origin,
num_frames=num_frames)
# other index types are invalid
else:
raise TypeError("frame indices must be slices or integers")
# len() returns the number of frames, consistent with __getitem__()
def __len__(self):
return self.num_frames
@property
def frame_rate(self):
"""Frame rate (same as fps)."""
# n/a if the signal has no sample rate
if self.signal.sample_rate is None:
return None
return float(self.signal.sample_rate) / self.hop_size
@property
def fps(self):
"""Frames per second."""
return self.frame_rate
@property
def overlap_factor(self):
"""Overlapping factor of two adjacent frames."""
return 1.0 - self.hop_size / self.frame_size
@property
def shape(self):
"""Shape of the FramedSignal (frames x samples)."""
shape = self.num_frames, self.frame_size
if self.signal.num_channels != 1:
shape += (self.signal.num_channels, )
return shape
@property
def ndim(self):
"""Dimensionality of the FramedSignal."""
return len(self.shape)
[docs]class FramedSignalProcessor(Processor):
"""
Slice a Signal into frames.
Parameters
----------
frame_size : int, optional
Size of one frame [samples].
hop_size : float, optional
Progress `hop_size` samples between adjacent frames.
fps : float, optional
Use given frames per second; if set, this computes and overwrites the
given `hop_size` value.
online : bool, optional
Operate in online mode (see notes below).
end : int or str, optional
End of signal handling (see :class:`FramedSignal`).
num_frames : int, optional
Number of frames to return.
kwargs : dict, optional
If no :class:`Signal` instance was given, one is instantiated with
these additional keyword arguments.
Notes
-----
The location of the window relative to its reference sample can be set
with the `online` parameter:
- 'False': the window is centered on its reference sample,
- 'True': the window is located to the left of its reference sample
(including the reference sample), i.e. only past information is used.
"""
FRAME_SIZE = 2048
HOP_SIZE = 441.
FPS = 100.
START = 0
END_OF_SIGNAL = 'normal'
def __init__(self, frame_size=FRAME_SIZE, hop_size=HOP_SIZE, fps=None,
online=False, end=END_OF_SIGNAL, **kwargs):
# pylint: disable=unused-argument
self.frame_size = frame_size
self.hop_size = hop_size
self.fps = fps # do not convert here, pass it to FramedSignal
self.online = online
self.end = end
[docs] def process(self, data, **kwargs):
"""
Slice the signal into (overlapping) frames.
Parameters
----------
data : :class:`Signal` instance
Signal to be sliced into frames.
kwargs : dict
Keyword arguments passed to :class:`FramedSignal` to instantiate
the returned object.
Returns
-------
frames : :class:`FramedSignal` instance
FramedSignal instance
"""
# translate online / offline mode
if self.online:
origin = 'online'
else:
origin = 'offline'
# instantiate a FramedSignal from the data and return it
return FramedSignal(data, frame_size=self.frame_size,
hop_size=self.hop_size, fps=self.fps,
origin=origin, end=self.end, **kwargs)
@staticmethod
[docs] def add_arguments(parser, frame_size=FRAME_SIZE, fps=FPS,
online=None):
"""
Add signal framing related arguments to an existing parser.
Parameters
----------
parser : argparse parser instance
Existing argparse parser object.
frame_size : int, optional
Size of one frame in samples.
fps : float, optional
Frames per second.
online : bool, optional
Online mode (use only past signal information, i.e. align the
window to the left of the reference sample).
Returns
-------
argparse argument group
Signal framing argument parser group.
Notes
-----
Parameters are included in the group only if they are not 'None'.
"""
# add signal framing options to the existing parser
g = parser.add_argument_group('signal framing arguments')
# depending on the type of frame_size, use different options
if isinstance(frame_size, int):
g.add_argument('--frame_size', action='store', type=int,
default=frame_size,
help='frame size [samples, default=%(default)i]')
elif isinstance(frame_size, list):
# Note: this option is used for e.g. stacking multiple spectrograms
# with different frame sizes
from madmom.utils import OverrideDefaultListAction
g.add_argument('--frame_size', type=int, default=frame_size,
action=OverrideDefaultListAction, sep=',',
help='(comma separated list of) frame size(s) to '
'use [samples, default=%(default)s]')
if fps is not None:
g.add_argument('--fps', action='store', type=float, default=fps,
help='frames per second [default=%(default).1f]')
if online is False:
g.add_argument('--online', dest='online', action='store_true',
help='operate in online mode [default=offline]')
elif online is True:
g.add_argument('--offline', dest='online', action='store_false',
help='operate in offline mode [default=online]')
# return the argument group so it can be modified if needed
return g