# encoding: utf-8
# pylint: disable=no-member
# pylint: disable=invalid-name
# pylint: disable=too-many-arguments
# pylint: disable=wrong-import-position
"""
Utility package.
"""
from __future__ import absolute_import, division, print_function
import argparse
import contextlib
import numpy as np
# decorator to suppress warnings
[docs]def suppress_warnings(function):
"""
Decorate the given function to suppress any warnings.
Parameters
----------
function : function
Function to be decorated.
Returns
-------
decorated function
Decorated function.
"""
# needed to preserve docstring of the decorated function
from functools import wraps
@wraps(function)
def decorator_function(*args, **kwargs):
"""
Decorator function to suppress warnings.
Parameters
----------
args : arguments, optional
Arguments passed to function to be decorated.
kwargs : keyword arguments, optional
Keyword arguments passed to function to be decorated.
Returns
-------
decorated function
Decorated function.
"""
import warnings
with warnings.catch_warnings():
warnings.simplefilter("ignore")
return function(*args, **kwargs)
return decorator_function
# file handling routines
[docs]def filter_files(files, suffix):
"""
Filter the list to contain only files matching the given `suffix`.
Parameters
----------
files : list
List of files to be filtered.
suffix : str
Return only files matching this suffix.
Returns
-------
list
List of files.
"""
import fnmatch
# make sure files is a list
if not isinstance(files, list):
files = [files]
# no suffix given, return the list unaltered
if suffix is None:
return files
# filter the files with the given suffix
file_list = []
if isinstance(suffix, list):
# a list of suffices is given
for s in suffix:
file_list.extend(fnmatch.filter(files, "*%s" % s))
else:
# a single suffix is given
file_list.extend(fnmatch.filter(files, "*%s" % suffix))
# return the filtered list
return file_list
[docs]def search_path(path, recursion_depth=0):
"""
Returns a list of files in a directory (recursively).
Parameters
----------
path : str or list
Directory to be searched.
recursion_depth : int, optional
Recursively search sub-directories up to this depth.
Returns
-------
list
List of files.
"""
# adapted from http://stackoverflow.com/a/234329
import os
# remove the rightmost path separator (needed for recursion depth count)
path = path.rstrip(os.path.sep)
# we can only handle directories
if not os.path.isdir(path):
raise IOError("%s is not a directory." % path)
# files to be returned
file_list = []
# keep track of the initial recursion depth
initial_depth = path.count(os.path.sep)
for root, dirs, files in os.walk(path):
# add all files of this directory to the list
for f in files:
file_list.append(os.path.join(root, f))
# remove all sub directories exceeding the wanted recursion depth
if initial_depth + recursion_depth <= root.count(os.path.sep):
del dirs[:]
# return the sorted file list
return sorted(file_list)
[docs]def search_files(files, suffix=None, recursion_depth=0):
"""
Returns the files matching the given `suffix`.
Parameters
----------
files : str or list
File, path or a list thereof to be searched / filtered.
suffix : str, optional
Return only files matching this suffix.
recursion_depth : int, optional
Recursively search sub-directories up to this depth.
Returns
-------
list
List of files.
Notes
-----
The list of returned files is sorted.
"""
import os
file_list = []
# determine the files
if isinstance(files, list):
# a list is given, recursively call the function on each element
for f in files:
file_list.extend(search_files(f))
elif os.path.isdir(files):
# add all files in the given path (up to the given recursion depth)
file_list.extend(search_path(files, recursion_depth))
elif os.path.isfile(files):
# add the given file
file_list.append(files)
else:
raise IOError("%s does not exist." % files)
# filter with the given sufix
if suffix is not None:
file_list = filter_files(file_list, suffix)
# remove duplicates
file_list = list(set(file_list))
# return the sorted file list
return sorted(file_list)
[docs]def strip_suffix(filename, suffix=None):
"""
Strip off the suffix of the given filename or string.
Parameters
----------
filename : str
Filename or string to strip.
suffix : str, optional
Suffix to be stripped off (e.g. '.txt' including the dot).
Returns
-------
str
Filename or string without suffix.
"""
if suffix is not None and filename.endswith(suffix):
return filename[:-len(suffix)]
return filename
[docs]def match_file(filename, match_list, suffix=None, match_suffix=None,
match_exactly=True):
"""
Match a filename or string against a list of other filenames or strings.
Parameters
----------
filename : str
Filename or string to match.
match_list : list
Match to this list of filenames or strings.
suffix : str, optional
Suffix of `filename` to be ignored.
match_suffix : str, optional
Match only files from `match_list` with this suffix.
match_exactly : bool, optional
Matches must be exact, i.e. have the same base name.
Returns
-------
list
List of matched files.
Notes
-----
Asterisks "*" can be used to match any string or suffix.
"""
import os
import fnmatch
# get the base name without the path
basename = os.path.basename(strip_suffix(filename, suffix))
# init return list
matches = []
# look for files with the same base name in the files_list
if match_suffix is not None:
pattern = "*%s*%s" % (basename, match_suffix)
else:
pattern = "*%s" % basename
for match in fnmatch.filter(match_list, pattern):
# base names must match exactly if indicated
if (not match_exactly) or (basename == os.path.basename(
strip_suffix(match, match_suffix))):
matches.append(match)
# return the matches
return matches
@suppress_warnings
[docs]def load_events(filename):
"""
Load a events from a text file, one floating point number per line.
Parameters
----------
filename : str or file handle
File to load the events from.
Returns
-------
numpy array
Events.
Notes
-----
Comments (lines starting with '#') and additional columns are ignored,
i.e. only the first column is returned.
"""
# read in the events, one per line
events = np.loadtxt(filename, ndmin=2)
# 1st column is the event's time, the rest is ignored
return events[:, 0]
[docs]def write_events(events, filename, fmt='%.3f', delimiter='\t', header=''):
"""
Write events to a text file, one event per line.
Parameters
----------
events : numpy array
Events to be written to file.
filename : str or file handle
File to write the events to.
fmt : str, optional
How to format the events.
delimiter : str, optional
String or character separating multiple columns.
header : str, optional
Header to be written (as a comment).
Returns
-------
numpy array
Events.
Notes
-----
This function is just a wrapper to ``np.savetxt``, but reorders the
arguments in a way it can be used as an
:class:`.processors.OutputProcessor`.
"""
# write the events to the output
np.savetxt(filename, np.asarray(events),
fmt=fmt, delimiter=delimiter, header=header)
# also return them
return events
[docs]def combine_events(events, delta, combine='mean'):
"""
Combine all events within a certain range.
Parameters
----------
events : list or numpy array
Events to be combined.
delta : float
Combination delta. All events within this `delta` are combined.
combine : {'mean', 'left', 'right'}
How to combine two adjacent events:
- 'mean': replace by the mean of the two events
- 'left': replace by the left of the two events
- 'right': replace by the right of the two events
Returns
-------
numpy array
Combined events.
"""
# add a small value to delta, otherwise we end up in floating point hell
delta += 1e-12
# return immediately if possible
if len(events) <= 1:
return events
# create working copy
events = np.array(events, copy=True)
# set start position
idx = 0
# get first event
left = events[idx]
# iterate over all remaining events
for right in events[1:]:
if right - left <= delta:
# combine the two events
if combine == 'mean':
left = events[idx] = 0.5 * (right + left)
elif combine == 'left':
left = events[idx] = left
elif combine == 'right':
left = events[idx] = right
else:
raise ValueError("don't know how to combine two events with "
"%s" % combine)
else:
# move forward
idx += 1
left = events[idx] = right
# return the combined events
return events[:idx + 1]
[docs]def quantize_events(events, fps, length=None, shift=None):
"""
Quantize the events with the given resolution.
Parameters
----------
events : numpy array
Events to be quantized.
fps : float
Quantize with `fps` frames per second.
length : int, optional
Length of the returned array.
shift : float, optional
Shift the events by this value before quantisation
Returns
-------
numpy array
Quantized events.
"""
# convert to numpy array if needed
events = np.asarray(events, dtype=np.float)
# shift all events if needed
if shift:
events += shift
# determine the length for the quantized array
if length is None:
# set the length to be long enough to cover all events
length = int(round(np.max(events) * float(fps))) + 1
else:
# else filter all events which do not fit in the array
# since we apply rounding later, we need to subtract half a bin
events = events[:np.searchsorted(events, float(length - 0.5) / fps)]
# init array
quantized = np.zeros(length)
# quantize
events *= fps
# indices to be set in the quantized array
idx = np.unique(np.round(events).astype(np.int))
quantized[idx] = 1
# return the quantized array
return quantized
# argparse action to set and overwrite default lists
[docs]class OverrideDefaultListAction(argparse.Action):
"""
OverrideDefaultListAction
An argparse action that works similarly to the regular 'append' action.
The default value is deleted when a new value is specified. The 'append'
action would append the new value to the default.
Parameters
----------
sep : str, optional
Separator to be used if multiple values should be parsed from a list.
"""
def __init__(self, sep=None, *args, **kwargs):
super(OverrideDefaultListAction, self).__init__(*args, **kwargs)
self.set_to_default = True
# save the type as the type for the list
self.list_type = self.type
if sep is not None:
# if multiple values (separated by the given separator) should be
# parsed we need to fake the type of the argument to be a string
self.type = str
self.sep = sep
def __call__(self, parser, namespace, value, option_string=None):
# if this Action is called for the first time, remove the defaults
if self.set_to_default:
setattr(namespace, self.dest, [])
self.set_to_default = False
# get the current values
cur_values = getattr(namespace, self.dest)
# convert to correct type and append the newly parsed values
try:
cur_values.extend([self.list_type(v)
for v in value.split(self.sep)])
except ValueError as e:
raise argparse.ArgumentError(self, str(e) + value)
# taken from: http://www.scipy.org/Cookbook/SegmentAxis
[docs]def segment_axis(signal, frame_size, hop_size, axis=None, end='cut',
end_value=0):
"""
Generate a new array that chops the given array along the given axis into
(overlapping) frames.
Parameters
----------
signal : numpy array
Signal.
frame_size : int
Size of each frame [samples].
hop_size : int
Hop size between adjacent frames [samples].
axis : int, optional
Axis to operate on; if 'None', operate on the flattened array.
end : {'cut', 'wrap', 'pad'}, optional
What to do with the last frame, if the array is not evenly divisible
into pieces; possible values:
- 'cut'
simply discard the extra values,
- 'wrap'
copy values from the beginning of the array,
- 'pad'
pad with a constant value.
end_value : float, optional
Value used to pad if `end` is 'pad'.
Returns
-------
numpy array, shape (num_frames, frame_size)
Array with overlapping frames
Notes
-----
The array is not copied unless necessary (either because it is unevenly
strided and being flattened or because end is set to 'pad' or 'wrap').
The returned array is always of type np.ndarray.
Examples
--------
>>> segment_axis(np.arange(10), 4, 2)
array([[0, 1, 2, 3],
[2, 3, 4, 5],
[4, 5, 6, 7],
[6, 7, 8, 9]])
"""
# make sure that both frame_size and hop_size are integers
frame_size = int(frame_size)
hop_size = int(hop_size)
# TODO: add comments!
if axis is None:
signal = np.ravel(signal) # may copy
axis = 0
if axis != 0:
raise ValueError('please check if the resulting array is correct.')
length = signal.shape[axis]
if hop_size <= 0:
raise ValueError("hop_size must be positive.")
if frame_size <= 0:
raise ValueError("frame_size must be positive.")
if length < frame_size or (length - frame_size) % hop_size:
if length > frame_size:
round_up = (frame_size + (1 + (length - frame_size) // hop_size) *
hop_size)
round_down = (frame_size + ((length - frame_size) // hop_size) *
hop_size)
else:
round_up = frame_size
round_down = 0
assert round_down < length < round_up
assert round_up == round_down + hop_size or (round_up == frame_size and
round_down == 0)
signal = signal.swapaxes(-1, axis)
if end == 'cut':
signal = signal[..., :round_down]
elif end in ['pad', 'wrap']:
# need to copy
s = list(signal.shape)
s[-1] = round_up
y = np.empty(s, dtype=signal.dtype)
y[..., :length] = signal
if end == 'pad':
y[..., length:] = end_value
elif end == 'wrap':
y[..., length:] = signal[..., :round_up - length]
signal = y
signal = signal.swapaxes(-1, axis)
length = signal.shape[axis]
if length == 0:
raise ValueError("Not enough data points to segment array in 'cut' "
"mode; try end='pad' or end='wrap'")
assert length >= frame_size
assert (length - frame_size) % hop_size == 0
n = 1 + (length - frame_size) // hop_size
s = signal.strides[axis]
new_shape = (signal.shape[:axis] + (n, frame_size) +
signal.shape[axis + 1:])
new_strides = (signal.strides[:axis] + (hop_size * s, s) +
signal.strides[axis + 1:])
try:
# noinspection PyArgumentList
return np.ndarray.__new__(np.ndarray, strides=new_strides,
shape=new_shape, buffer=signal,
dtype=signal.dtype)
except TypeError:
# TODO: remove warning?
import warnings
warnings.warn("Problem with ndarray creation forces copy.")
signal = signal.copy()
# shape doesn't change but strides does
new_strides = (signal.strides[:axis] + (hop_size * s, s) +
signal.strides[axis + 1:])
# noinspection PyArgumentList
return np.ndarray.__new__(np.ndarray, strides=new_strides,
shape=new_shape, buffer=signal,
dtype=signal.dtype)
# keep namespace clean
del contextlib
# finally import the submodules
from . import midi