# encoding: utf-8
# pylint: disable=no-member
# pylint: disable=invalid-name
# pylint: disable=too-many-arguments
# pylint: disable=wrong-import-position
"""
Utility package.
"""
from __future__ import absolute_import, division, print_function
import argparse
import contextlib
import numpy as np
# Python 2/3 string and integer compatibility (like six does it)
try:
    # Python 2: `basestring` and `long` exist as builtins
    string_types = basestring
    integer_types = (int, long, np.integer)
except NameError:
    # Python 3: both names are gone; `str` and `int` cover all cases
    string_types = str
    integer_types = (int, np.integer)
# decorator to suppress warnings
def suppress_warnings(function):
    """
    Decorate the given function to suppress any warnings.

    Parameters
    ----------
    function : function
        Function to be decorated.

    Returns
    -------
    decorated function
        Decorated function.

    """
    # imported locally to keep the module namespace clean
    import warnings
    # needed to preserve name and docstring of the decorated function
    from functools import wraps

    @wraps(function)
    def decorator_function(*args, **kwargs):
        """
        Call the decorated function with all warnings silenced.

        Parameters
        ----------
        args : arguments, optional
            Arguments passed to function to be decorated.
        kwargs : keyword arguments, optional
            Keyword arguments passed to function to be decorated.

        Returns
        -------
        object
            Whatever the decorated function returns.

        """
        # the context manager restores the previous warning filters on exit,
        # so the suppression is limited to this single call
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            return function(*args, **kwargs)

    return decorator_function
# file handling routines
def filter_files(files, suffix):
    """
    Filter the list to contain only files matching the given `suffix`.

    Parameters
    ----------
    files : list
        List of files to be filtered. Anything that is not a list is treated
        as a single file.
    suffix : str or list or tuple of str
        Return only files matching this suffix (or any of these suffixes).
        If 'None', the files are returned unaltered.

    Returns
    -------
    list
        List of files.

    Notes
    -----
    The suffixes are applied one after another, i.e. the result is grouped
    by suffix and a file matching several suffixes is listed once for each
    of them. Suffixes are used as :mod:`fnmatch` patterns, thus wildcards
    are interpreted.

    """
    import fnmatch
    # make sure files is a list
    if not isinstance(files, list):
        files = [files]
    # no suffix given, return the list unaltered
    if suffix is None:
        return files
    # treat a single suffix as a sequence with one element
    if isinstance(suffix, (list, tuple)):
        suffixes = suffix
    else:
        suffixes = [suffix]
    # filter the files with the given suffix(es)
    file_list = []
    for s in suffixes:
        file_list.extend(fnmatch.filter(files, "*%s" % s))
    # return the filtered list
    return file_list
def search_path(path, recursion_depth=0):
    """
    Returns a list of files in a directory (recursively).

    Parameters
    ----------
    path : str
        Directory to be searched.
    recursion_depth : int, optional
        Recursively search sub-directories up to this depth. A depth of 0
        returns only the files located directly in `path`.

    Returns
    -------
    list
        Sorted list of files (each joined with the directory it was found
        in).

    Raises
    ------
    IOError
        If `path` is not a directory.

    """
    # adapted from http://stackoverflow.com/a/234329
    import os
    # remove the rightmost path separator (needed for recursion depth count)
    path = path.rstrip(os.path.sep)
    # we can only handle directories
    if not os.path.isdir(path):
        raise IOError("%s is not a directory." % path)
    # files to be returned
    file_list = []
    # keep track of the initial recursion depth
    initial_depth = path.count(os.path.sep)
    for root, dirs, files in os.walk(path):
        # add all files of this directory to the list
        for f in files:
            file_list.append(os.path.join(root, f))
        # remove all sub directories exceeding the wanted recursion depth;
        # emptying `dirs` in place stops os.walk from descending any further
        if initial_depth + recursion_depth <= root.count(os.path.sep):
            del dirs[:]
    # return the sorted file list
    return sorted(file_list)
def search_files(files, suffix=None, recursion_depth=0):
    """
    Returns the files matching the given `suffix`.

    Parameters
    ----------
    files : str or list
        File, path or a list thereof to be searched / filtered.
    suffix : str, optional
        Return only files matching this suffix.
    recursion_depth : int, optional
        Recursively search sub-directories up to this depth.

    Returns
    -------
    list
        List of files.

    Raises
    ------
    IOError
        If a given file or path does not exist.

    Notes
    -----
    The list of returned files is sorted and contains no duplicates.

    """
    import os
    file_list = []
    # determine the files
    if isinstance(files, list):
        # a list is given, recursively call the function on each element;
        # the recursion depth must be passed on, otherwise directories inside
        # the list are only searched at the top level (the suffix filter is
        # applied once to the collected files below)
        for f in files:
            file_list.extend(search_files(f, recursion_depth=recursion_depth))
    elif os.path.isdir(files):
        # add all files in the given path (up to the given recursion depth)
        file_list.extend(search_path(files, recursion_depth))
    elif os.path.isfile(files):
        # add the given file
        file_list.append(files)
    else:
        raise IOError("%s does not exist." % files)
    # filter with the given suffix
    if suffix is not None:
        file_list = filter_files(file_list, suffix)
    # remove duplicates
    file_list = list(set(file_list))
    # return the sorted file list
    return sorted(file_list)
def strip_suffix(filename, suffix=None):
    """
    Strip off the suffix of the given filename or string.

    Parameters
    ----------
    filename : str
        Filename or string to strip.
    suffix : str, optional
        Suffix to be stripped off (e.g. '.txt' including the dot).

    Returns
    -------
    str
        Filename or string without suffix. If `filename` does not end with
        `suffix` (or no suffix is given), it is returned unaltered.

    """
    # an empty suffix must be skipped as well: every string ends with '' and
    # `filename[:-0]` would evaluate to an empty string
    if suffix and filename.endswith(suffix):
        return filename[:-len(suffix)]
    return filename
def match_file(filename, match_list, suffix=None, match_suffix=None,
               match_exactly=True):
    """
    Match a filename or string against a list of other filenames or strings.

    Parameters
    ----------
    filename : str
        Filename or string to match.
    match_list : list
        Match to this list of filenames or strings.
    suffix : str, optional
        Suffix of `filename` to be ignored.
    match_suffix : str, optional
        Match only files from `match_list` with this suffix.
    match_exactly : bool, optional
        Matches must be exact, i.e. have the same base name.

    Returns
    -------
    list
        List of matched files.

    Notes
    -----
    Asterisks "*" can be used to match any string or suffix.

    """
    import os
    import fnmatch
    # get the base name without the path
    basename = os.path.basename(strip_suffix(filename, suffix))
    # init return list
    matches = []
    # look for files with the same base name in the match_list
    if match_suffix is not None:
        pattern = "*%s*%s" % (basename, match_suffix)
    else:
        pattern = "*%s" % basename
    for match in fnmatch.filter(match_list, pattern):
        # the pattern above only pre-selects candidates; if indicated, the
        # base names (without path and `match_suffix`) must match exactly
        if (not match_exactly) or (basename == os.path.basename(
                strip_suffix(match, match_suffix))):
            matches.append(match)
    # return the matches
    return matches
def combine_events(events, delta, combine='mean'):
    """
    Combine all events within a certain range.

    Parameters
    ----------
    events : list or numpy array
        Events to be combined.
    delta : float
        Combination delta. All events within this `delta` are combined.
    combine : {'mean', 'left', 'right'}
        How to combine two adjacent events:

            - 'mean': replace by the mean of the two events
            - 'left': replace by the left of the two events
            - 'right': replace by the right of the two events

    Returns
    -------
    numpy array
        Combined events. If `events` contains at most one event, it is
        returned unaltered (i.e. without conversion to a numpy array).

    Raises
    ------
    ValueError
        If `events` has more than one dimension or if two events need to be
        combined and `combine` is not one of the supported modes.

    """
    # add a small value to delta, otherwise we end up in floating point hell
    delta += 1e-12
    # return immediately if possible
    if len(events) <= 1:
        return events
    # convert to numpy array or create a copy if needed
    # Note: the builtin `float` is used, the `np.float` alias was removed
    #       from NumPy
    events = np.array(events, dtype=float)
    # can handle only 1D events
    if events.ndim > 1:
        raise ValueError('only 1-dimensional events supported.')
    # position of the last (combined) event written to the array
    idx = 0
    # get first event
    left = events[idx]
    # iterate over all remaining events
    for right in events[1:]:
        if right - left <= delta:
            # combine the two events (in-place at the current position)
            if combine == 'mean':
                left = events[idx] = 0.5 * (right + left)
            elif combine == 'left':
                left = events[idx] = left
            elif combine == 'right':
                left = events[idx] = right
            else:
                raise ValueError("don't know how to combine two events with "
                                 "%s" % combine)
        else:
            # move forward
            idx += 1
            left = events[idx] = right
    # return the combined events
    return events[:idx + 1]
def quantize_events(events, fps, length=None, shift=None):
    """
    Quantize the events with the given resolution.

    Parameters
    ----------
    events : list or numpy array
        Events to be quantized.
    fps : float
        Quantize with `fps` frames per second.
    length : int, optional
        Length of the returned array. If 'None', the length will be set
        according to the latest event.
    shift : float, optional
        Shift the events by `shift` seconds before quantization.

    Returns
    -------
    numpy array
        Quantized events, i.e. an array with all frames containing an event
        set to 1 and all other frames set to 0.

    Raises
    ------
    ValueError
        If `events` is not 1-dimensional.

    Notes
    -----
    Events which are quantized to frames before the first or (if `length`
    is given) after the last frame of the returned array are discarded.

    """
    # convert to numpy array or create a copy if needed
    # Note: the builtin `float` is used, the `np.float` alias was removed
    #       from NumPy
    events = np.array(events, dtype=float)
    # can handle only 1D events
    if events.ndim != 1:
        raise ValueError('only 1-dimensional events supported.')
    # shift all events if needed
    if shift is not None:
        import warnings
        warnings.warn('`shift` parameter is deprecated as of version 0.16 and '
                      'will be removed in version 0.18. Please shift the '
                      'events manually before calling this function.')
        events += shift
    # determine the length for the quantized array
    if length is None:
        # set the length to be long enough to cover all events
        length = int(round(np.max(events) * float(fps))) + 1
    else:
        # else filter all events which do not fit in the array
        # since we apply rounding later, we need to subtract half a bin
        # Note: a boolean mask is used, since it does not depend on the
        #       events being sorted
        events = events[events < float(length - 0.5) / fps]
    # init array
    quantized = np.zeros(length)
    # quantize
    events *= fps
    # indices to be set in the quantized array
    idx = np.unique(np.round(events).astype(int))
    # discard negative indices, they would wrap around and set frames
    # counted from the end of the array
    idx = idx[idx >= 0]
    quantized[idx] = 1
    # return the quantized array
    return quantized
def quantize_notes(notes, fps, length=None, num_pitches=None, velocity=None):
    """
    Quantize the notes with the given resolution.

    Create a sparse 2D array with rows corresponding to points in time
    (according to `fps` and `length`), and columns to note pitches (according
    to `num_pitches`). The values of the array correspond to the velocity of a
    sounding note at a given point in time (based on the note pitch, onset,
    duration and velocity). If no values for `length` and `num_pitches` are
    given, they are inferred from `notes`.

    Parameters
    ----------
    notes : 2D numpy array
        Notes to be quantized. Expected columns:
        'note_time' 'note_number' ['duration' ['velocity']]
        If `notes` contains no 'duration' column, only the frame of the
        onset will be set. If `notes` has no velocity column, a velocity
        of 1 is assumed.
    fps : float
        Quantize with `fps` frames per second.
    length : int, optional
        Length of the returned array. If 'None', the length will be set
        according to the latest sounding note.
    num_pitches : int, optional
        Number of pitches of the returned array. If 'None', the number of
        pitches will be based on the highest pitch in the `notes` array.
    velocity : float, optional
        Use this velocity for all quantized notes. If set, the last column of
        `notes` (if present) will be ignored.

    Returns
    -------
    numpy array
        Quantized notes.

    Raises
    ------
    ValueError
        If `notes` is not 2-dimensional or has less than 2 columns.

    Notes
    -----
    Notes with a pitch outside of [0, `num_pitches`) are ignored. Frames
    before the first or after the last frame of the returned array are
    discarded.

    """
    # convert to numpy array or create a copy if needed
    # Note: the builtin `float` / `int` are used, the `np.float` / `np.int`
    #       aliases were removed from NumPy
    notes = np.array(np.array(notes).T, dtype=float, ndmin=2).T
    # check supported dims and shapes
    if notes.ndim != 2:
        raise ValueError('only 2-dimensional notes supported.')
    if notes.shape[1] < 2:
        raise ValueError('notes must have at least 2 columns.')
    # split the notes into columns
    note_onsets = notes[:, 0]
    note_numbers = notes[:, 1].astype(int)
    note_offsets = np.copy(note_onsets)
    if notes.shape[1] > 2:
        note_offsets += notes[:, 2]
    if notes.shape[1] > 3 and velocity is None:
        note_velocities = notes[:, 3]
    else:
        velocity = velocity or 1
        note_velocities = np.ones(len(notes)) * velocity
    # determine length and width of quantized array
    if length is None:
        # set the length to be long enough to cover all notes
        length = int(round(np.max(note_offsets) * float(fps))) + 1
    if num_pitches is None:
        num_pitches = int(np.max(note_numbers)) + 1
    # init array
    quantized = np.zeros((length, num_pitches))
    # quantize onsets and offsets (the offset frame is still sounding, thus
    # add 1 to include it in the slice below)
    onset_frames = np.round(note_onsets * fps).astype(int)
    offset_frames = np.round(note_offsets * fps).astype(int) + 1
    # clip negative frames to 0, otherwise they would be interpreted as
    # slice indices counted from the end of the array
    onset_frames = np.maximum(onset_frames, 0)
    offset_frames = np.maximum(offset_frames, 0)
    # iterate over all notes
    for onset, offset, pitch, note_velocity in zip(
            onset_frames, offset_frames, note_numbers, note_velocities):
        # use only the notes which fit in the array and note number >= 0
        if num_pitches > pitch >= 0:
            quantized[onset:offset, pitch] = note_velocity
    # return quantized array
    return quantized
def expand_notes(notes, duration=0.6, velocity=100):
    """
    Expand notes to include duration and velocity.

    The given duration and velocity is only used if they are not set already.

    Parameters
    ----------
    notes : numpy array, shape (num_notes, 2) or (num_notes, 3) or (num_notes, 4)
        Notes, one per row. Expected columns:
        'note_time' 'note_number' ['duration' ['velocity']]
    duration : float, optional
        Note duration if not defined by `notes`.
    velocity : int, optional
        Note velocity if not defined by `notes`.

    Returns
    -------
    notes : numpy array, shape (num_notes, 4)
        Notes (including note duration and velocity). If `notes` is a numpy
        array which has 4 columns already, it is returned as is (no copy).

    Raises
    ------
    ValueError
        If `notes` is not 2-dimensional or has an unsupported number of
        columns.

    """
    # accept any array-like input (numpy arrays are passed through as is)
    notes = np.asarray(notes)
    if not notes.ndim == 2:
        raise ValueError('unknown format for `notes`')
    rows, columns = notes.shape
    if columns == 4:
        # nothing to add
        return notes
    elif columns == 3:
        # only the velocity column is missing
        new_columns = np.ones((rows, 1)) * velocity
    elif columns == 2:
        # duration and velocity columns are missing
        new_columns = np.ones((rows, 2)) * velocity
        new_columns[:, 0] = duration
    else:
        raise ValueError('unable to handle `notes` with %d columns' % columns)
    # return the notes
    notes = np.hstack((notes, new_columns))
    return notes
# argparse action to set and overwrite default lists
class OverrideDefaultListAction(argparse.Action):
    """
    OverrideDefaultListAction

    An argparse action that works similarly to the regular 'append' action.
    The default value is deleted when a new value is specified. The 'append'
    action would append the new value to the default.

    Parameters
    ----------
    sep : str, optional
        Separator to be used if multiple values should be parsed from a list.

    """

    def __init__(self, sep=None, *args, **kwargs):
        super(OverrideDefaultListAction, self).__init__(*args, **kwargs)
        self.set_to_default = True
        # save the type as the type for the list
        self.list_type = self.type
        if sep is not None:
            # if multiple values (separated by the given separator) should be
            # parsed we need to fake the type of the argument to be a string
            self.type = str
        self.sep = sep

    def __call__(self, parser, namespace, value, option_string=None):
        # get the current values
        cur_values = getattr(namespace, self.dest, None)
        # if this Action is called for the first time, remove the defaults
        # Note: argparse stores the very same default object in each new
        #       namespace, thus the identity check also detects the first
        #       call of any further parsing run of the same parser; without
        #       it, the values would be appended to (and alter) the default
        if self.set_to_default or cur_values is self.default:
            cur_values = []
            setattr(namespace, self.dest, cur_values)
            self.set_to_default = False
        # split the parsed value; if no separator is given, argparse may
        # have converted the value to its final type already
        try:
            values = value.split(self.sep)
        except AttributeError:
            values = [value]
        # convert to correct type and append the newly parsed values
        try:
            if self.list_type is not None:
                values = [self.list_type(v) for v in values]
            cur_values.extend(values)
        except ValueError as e:
            raise argparse.ArgumentError(self, str(e) + str(value))
# taken from: http://www.scipy.org/Cookbook/SegmentAxis
def segment_axis(signal, frame_size, hop_size, axis=None, end='cut',
                 end_value=0):
    """
    Generate a new array that chops the given array along the given axis into
    (overlapping) frames.

    Parameters
    ----------
    signal : numpy array
        Signal.
    frame_size : int
        Size of each frame [samples].
    hop_size : int
        Hop size between adjacent frames [samples].
    axis : int, optional
        Axis to operate on; if 'None', operate on the flattened array.
        Currently only axis 0 is supported.
    end : {'cut', 'wrap', 'pad'}, optional
        What to do with the last frame, if the array is not evenly divisible
        into pieces; possible values:

        - 'cut'
          simply discard the extra values,
        - 'wrap'
          copy values from the beginning of the array,
        - 'pad'
          pad with a constant value.

    end_value : float, optional
        Value used to pad if `end` is 'pad'.

    Returns
    -------
    numpy array, shape (num_frames, frame_size)
        Array with overlapping frames

    Raises
    ------
    ValueError
        If `axis` is not 0 (or 'None'), if `hop_size` or `frame_size` are
        not positive, if `end` is unknown (and the array is not evenly
        divisible into frames) or if the array does not contain enough data
        for a single frame in 'cut' mode.

    Notes
    -----
    The array is not copied unless necessary (either because it is unevenly
    strided and being flattened or because end is set to 'pad' or 'wrap').

    The returned array is always of type np.ndarray.

    Examples
    --------
    >>> segment_axis(np.arange(10), 4, 2)
    array([[0, 1, 2, 3],
           [2, 3, 4, 5],
           [4, 5, 6, 7],
           [6, 7, 8, 9]])

    """
    # make sure that both frame_size and hop_size are integers
    frame_size = int(frame_size)
    hop_size = int(hop_size)
    # operate on the flattened array if no axis is given
    if axis is None:
        signal = np.ravel(signal)  # may copy
        axis = 0
    if axis != 0:
        raise ValueError('please check if the resulting array is correct.')
    # number of samples along the axis to be segmented
    length = signal.shape[axis]
    if hop_size <= 0:
        raise ValueError("hop_size must be positive.")
    if frame_size <= 0:
        raise ValueError("frame_size must be positive.")
    # the signal does not divide evenly into frames, adjust its length
    if length < frame_size or (length - frame_size) % hop_size:
        # determine the closest lengths which divide evenly into frames
        if length > frame_size:
            round_up = (frame_size + (1 + (length - frame_size) // hop_size) *
                        hop_size)
            round_down = (frame_size + ((length - frame_size) // hop_size) *
                          hop_size)
        else:
            round_up = frame_size
            round_down = 0
        # internal sanity checks
        assert round_down < length < round_up
        assert round_up == round_down + hop_size or (round_up == frame_size and
                                                     round_down == 0)
        # move the axis to be segmented to the end
        signal = signal.swapaxes(-1, axis)
        if end == 'cut':
            # discard the samples which do not fit into the last frame
            signal = signal[..., :round_down]
        elif end in ['pad', 'wrap']:
            # need to copy
            s = list(signal.shape)
            s[-1] = round_up
            y = np.empty(s, dtype=signal.dtype)
            y[..., :length] = signal
            if end == 'pad':
                y[..., length:] = end_value
            elif end == 'wrap':
                y[..., length:] = signal[..., :round_up - length]
            signal = y
        else:
            # fail explicitly instead of running into an assertion below
            raise ValueError("unknown value for `end`: %s; use 'cut', 'wrap' "
                             "or 'pad'." % end)
        # move the axis back to its original position
        signal = signal.swapaxes(-1, axis)
    # the length may have changed
    length = signal.shape[axis]
    if length == 0:
        raise ValueError("Not enough data points to segment array in 'cut' "
                         "mode; try end='pad' or end='wrap'")
    # internal sanity checks
    assert length >= frame_size
    assert (length - frame_size) % hop_size == 0
    # number of frames
    n = 1 + (length - frame_size) // hop_size
    # stride (in bytes) between two adjacent samples along the axis
    s = signal.strides[axis]
    # replace the axis by two new ones: advancing one frame skips `hop_size`
    # samples, advancing inside a frame skips a single sample
    new_shape = (signal.shape[:axis] + (n, frame_size) +
                 signal.shape[axis + 1:])
    new_strides = (signal.strides[:axis] + (hop_size * s, s) +
                   signal.strides[axis + 1:])
    try:
        # create a view on the memory of the signal with the new layout
        return np.ndarray.__new__(np.ndarray, strides=new_strides,
                                  shape=new_shape, buffer=signal,
                                  dtype=signal.dtype)
    except TypeError:
        # TODO: remove warning?
        import warnings
        warnings.warn("Problem with ndarray creation forces copy.")
        signal = signal.copy()
        # shape doesn't change but strides does
        new_strides = (signal.strides[:axis] + (hop_size * s, s) +
                       signal.strides[axis + 1:])
        return np.ndarray.__new__(np.ndarray, strides=new_strides,
                                  shape=new_shape, buffer=signal,
                                  dtype=signal.dtype)
# keep namespace clean: `contextlib` is imported at the top of this module
# but not used by any of the code visible here, so remove the name again
del contextlib