Source code for pynapple.process.warping

"""
Functions to create trial-based tensors and warp times
"""

import inspect
from functools import wraps
from numbers import Number

import numpy as np

from .. import core as nap


def _validate_warping_inputs(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Validate each positional argument
        sig = inspect.signature(func)
        kwargs = sig.bind_partial(*args, **kwargs).arguments

        parameters_type = {
            "input": (nap.Ts, nap.Tsd, nap.TsdFrame, nap.TsdTensor, nap.TsGroup),
            "ep": (nap.IntervalSet,),
            "bin_size": (Number,),
            "time_unit": (str,),
            "align": (str,),
            "padding_value": (Number,),
            "num_bins": (int,),
        }
        for param, param_type in parameters_type.items():
            if param in kwargs:
                if not isinstance(kwargs[param], param_type):
                    raise TypeError(
                        f"Invalid type. Parameter {param} must be of type {[p.__name__ for p in param_type]}."
                    )

        # Call the original function with validated inputs
        return func(**kwargs)

    return wrapper


[docs] @_validate_warping_inputs def build_tensor( input, ep, bin_size=None, align="start", padding_value=np.nan, time_unit="s" ): """ Return trial-based tensor from an IntervalSet object. This is equivalent to calling `input.to_trial_tensor` for Tsd, TsdFrame, and TsdTensor objects, or is equivalent to calling `input.trial_count` for Ts and TsGroup objects. - If `input` is a `TsGroup`, returns a numpy array of shape (number of group elements, number of trials, number of time bins). The `bin_size` parameter determines the number of time bins. - If `input` is `Tsd`, `TsdFrame` or `TsdTensor`, returns a numpy array of shape (shape of time series, number of trials, number of time points). The `align` parameter controls how the time series are aligned. If `align="start"`, the time series are aligned to the start of each trial. If `align="end"`, the time series are aligned to the end of each trial. If trials have uneven durations, the returned array is padded. The parameter `padding_value` determine which value is used to pad the array. Default is NaN. Parameters ---------- input : Ts, Tsd, TsdFrame, TsdTensor or TsGroup Input to slice and align to the trials within the `ep` parameter. ep : IntervalSet Epochs holding the trials. Each interval can be of unequal size. bin_size : Number, optional Size of the time bins for TsGroup and Ts objects. align: str, optional How to align the time series ('start' [default], 'end') padding_value: Number, optional How to pad the array if unequal intervals. Default is np.nan. time_unit : str, optional Time units of the bin_size parameter ('s' [default], 'ms', 'us'). Returns ------- numpy.ndarray Raises ------ RuntimeError If `time_unit` not in ["s", "ms", "us"] Examples -------- >>> import pynapple as nap >>> import numpy as np >>> group = nap.TsGroup({0:nap.Ts(t=np.arange(0, 100))}) >>> ep = nap.IntervalSet(start=np.arange(20, 100, 20), end=np.arange(20, 100, 20) + np.arange(2, 10, 2)) >>> print(ep) index start end 0 20 22 1 40 44 2 60 66 3 80 88 shape: (4, 2), time unit: sec. Create a trial-based tensor by counting events within 1 second bin for each interval of `ep`. >>> tensor = nap.build_tensor(group, ep, bin_size=1) >>> tensor array([[[ 1., 1., nan, nan, nan, nan, nan, nan], [ 1., 1., 1., 1., nan, nan, nan, nan], [ 1., 1., 1., 1., 1., 1., nan, nan], [ 1., 1., 1., 1., 1., 1., 1., 1.]]]) By default, the time series are aligned to the start of the epochs. The parameter `align` control this behavior. >>> tensor = nap.build_tensor(group, ep, bin_size=1, align="end") >>> tensor array([[[nan, nan, nan, nan, nan, nan, 1., 1.], [nan, nan, nan, nan, 1., 1., 1., 1.], [nan, nan, 1., 1., 1., 1., 1., 1.], [ 1., 1., 1., 1., 1., 1., 1., 1.]]]) This function works for any time series. >>> tsdframe = nap.TsdFrame(t=np.arange(100), d=np.arange(200).reshape(2,100).T) >>> tensor = nap.build_tensor(tsdframe, ep) >>> tensor array([[[ 20., 21., 22., nan, nan, nan, nan, nan, nan], [ 40., 41., 42., 43., 44., nan, nan, nan, nan], [ 60., 61., 62., 63., 64., 65., 66., nan, nan], [ 80., 81., 82., 83., 84., 85., 86., 87., 88.]], [[120., 121., 122., nan, nan, nan, nan, nan, nan], [140., 141., 142., 143., 144., nan, nan, nan, nan], [160., 161., 162., 163., 164., 165., 166., nan, nan], [180., 181., 182., 183., 184., 185., 186., 187., 188.]]]) """ if time_unit not in ["s", "ms", "us"]: raise RuntimeError("time_unit should be 's', 'ms' or 'us'") if align not in ["start", "end"]: raise RuntimeError("align should be 'start' or 'end'") if isinstance(input, (nap.TsGroup, nap.Ts)): if not isinstance(bin_size, Number): raise RuntimeError( "When input is a TsGroup or Ts object, bin_size should be specified" ) return input.trial_count(ep, bin_size, align, padding_value, time_unit) else: return input.to_trial_tensor(ep, align, padding_value)
def _warp_tensor_from_tsgroup(input, ep, num_bins): if isinstance(input, nap.Ts): output = np.zeros(shape=(1, len(ep), num_bins)) else: output = np.zeros(shape=(len(input), len(ep), num_bins)) bin_sizes = (ep.end - ep.start) / num_bins for i in range(len(ep)): tmp = input.count(bin_sizes[i], ep[i]) output[:, i, :] = np.transpose(tmp.values) if isinstance(input, nap.Ts): # Removing first axis if Ts. output = output[0] return output def _warp_tensor_from_tsd(input, ep, num_bins): slices = [input.get_slice(s, e) for s, e in ep.values] lengths = list(map(lambda sl: sl.stop - sl.start, slices)) output = np.zeros(shape=(len(ep), num_bins, *input.shape[1:])) for i, sl in enumerate(slices): if lengths[i] == num_bins: output[i] = input[sl].values elif lengths[i] > num_bins: # Call bin_average output[i] = input[sl].bin_average( (ep.end[i] - ep.start[i]) / num_bins, ep[i] ) else: # Call interpolate output[i] = input[sl].interpolate( ts=nap.Ts(t=np.linspace(ep.start[i], ep.end[i], num_bins)), ep=ep[i] ) if output.ndim > 2: output = np.moveaxis(output, source=[0, 1], destination=[-2, -1]) return output
[docs] @_validate_warping_inputs def warp_tensor(input, ep, num_bins): """ Return linearly time-warped trial-based tensor from an IntervalSet object. - If `input` is a `TsGroup`, returns a numpy array of shape (number of group element, number of trial, `num_bins`). - If `input` is `Tsd`, `TsdFrame` or `TsdTensor`, returns a numpy array of shape (shape of time series, number of trial, `num_bins`). Parameters ---------- input : Ts , Tsd, TsdFrame, TsdTensor or TsGroup Input object ep : IntervalSet Epochs holding the trials. Each interval can be of unequal size. num_bins : int Returns ------- numpy.ndarray Examples -------- >>> import pynapple as nap >>> import numpy as np >>> group = nap.TsGroup({0:nap.Ts(t=np.arange(0, 100))}) >>> ep = nap.IntervalSet(start=np.arange(20, 100, 20), end=np.arange(20, 100, 20) + np.arange(2, 10, 2)) >>> print(ep) index start end 0 20 22 1 40 44 2 60 66 3 80 88 shape: (4, 2), time unit: sec. Create a trial-based tensor by counting events within 10 bins between start and end of each interval of `ep`. >>> tensor = nap.warp_tensor(group, ep, num_bins=10) >>> tensor array([[[1., 0., 0., 0., 0., 1., 0., 0., 0., 0.], [1., 0., 1., 0., 0., 1., 0., 1., 0., 0.], [1., 1., 0., 1., 0., 1., 1., 0., 1., 0.], [1., 1., 1., 1., 0., 1., 1., 1., 1., 0.]]]) This function works for any time series. Under the hood, the time series is either bin-averaged or interpolated depending on the number of bins. >>> tsd = nap.Tsd(t=np.arange(100), d=np.arange(100)) >>> tensor = nap.warp_tensor(tsd, ep, num_bins=3) >>> tensor array([[20. , 21. , 22. ], [40.5, 42. , 43. ], [60.5, 62.5, 64.5], [81. , 84. , 87. ]]) """ if num_bins <= 0: raise RuntimeError("num_bins should be positive integer.") if isinstance(input, (nap.TsGroup, nap.Ts)): return _warp_tensor_from_tsgroup(input, ep, num_bins) else: return _warp_tensor_from_tsd(input, ep, num_bins)