"""
Pynapple time series are containers specialized for neurophysiological time series.
They provides standardized time representation, plus various functions for manipulating times series with identical sampling frequency.
Multiple time series object are avaible depending on the shape of the data.
- `TsdTensor` : for data with of more than 2 dimensions, typically movies.
- `TsdFrame` : for column-based data. It can be easily converted to a pandas.DataFrame. Columns can be labelled and selected similar to pandas.
- `Tsd` : One-dimensional time series. It can be converted to a pandas.Series.
- `Ts` : For timestamps data only.
Most of the same functions are available through all classes. Objects behaves like numpy.ndarray. Slicing can be done the same way for example
`tsd[0:10]` returns the first 10 rows. Similarly, you can call any numpy functions like `np.mean(tsd, 1)`.
"""
import abc
import importlib
import warnings
from numbers import Number
import numpy as np
import pandas as pd
from numpy.lib.mixins import NDArrayOperatorsMixin
from scipy import signal
from tabulate import tabulate
from ._core_functions import _bin_average, _convolve, _dropna, _restrict, _threshold
from .base_class import _Base
from .interval_set import IntervalSet
from .time_index import TsIndex
from .utils import (
_concatenate_tsd,
_get_terminal_size,
_split_tsd,
_TsdFrameSliceHelper,
convert_to_array,
is_array_like,
)
def _get_class(data):
"""Select the right time series object and return the class
Parameters
----------
data : numpy.ndarray
The data to hold in the time series object
Returns
-------
Class
The class
"""
if data.ndim == 1:
return Tsd
elif data.ndim == 2:
return TsdFrame
else:
return TsdTensor
class _BaseTsd(_Base, NDArrayOperatorsMixin, abc.ABC):
"""
Abstract base class for time series objects.
Implement most of the shared functions across concrete classes `Tsd`, `TsdFrame`, `TsdTensor`
"""
def __init__(self, t, d, time_units="s", time_support=None, load_array=True):
super().__init__(t, time_units, time_support)
if load_array or isinstance(d, np.ndarray):
self.values = convert_to_array(d, "d")
else:
if not is_array_like(d):
raise TypeError(
"Data should be array-like, i.e. be indexable, iterable and, have attributes "
"`shape`, `ndim` and, `dtype`)."
)
self.values = d
assert len(self.index) == len(
self.values
), "Length of values {} does not match length of index {}".format(
len(self.values), len(self.index)
)
if isinstance(time_support, IntervalSet) and len(self.index):
starts = time_support.start
ends = time_support.end
idx = _restrict(self.index.values, starts, ends)
t = self.index.values[idx]
d = self.values[idx]
self.index = TsIndex(t)
self.values = d
self.rate = self.index.shape[0] / np.sum(
time_support.values[:, 1] - time_support.values[:, 0]
)
self.dtype = self.values.dtype
def __setitem__(self, key, value):
"""setter for time series"""
try:
self.values.__setitem__(key, value)
except IndexError:
raise IndexError
def __getattr__(self, name):
"""Allow numpy functions to be attached as attributes of Tsd objects"""
if hasattr(np, name):
np_func = getattr(np, name)
def method(*args, **kwargs):
return np_func(self, *args, **kwargs)
return method
raise AttributeError(
"Time series object does not have the attribute {}".format(name)
)
@property
def d(self):
return self.values
@property
def shape(self):
return self.values.shape
@property
def ndim(self):
return self.values.ndim
@property
def size(self):
return self.values.size
def __array__(self, dtype=None):
return np.asarray(self.values, dtype=dtype)
def __array_ufunc__(self, ufunc, method, *args, **kwargs):
# print("In __array_ufunc__")
# print(" ufunc = ", ufunc)
# print(" method = ", method)
# print(" args = ", args)
# for inp in args:
# print(type(inp))
# print(" kwargs = ", kwargs)
if method == "__call__":
new_args = []
n_object = 0
for a in args:
if isinstance(a, self.__class__):
new_args.append(a.values)
n_object += 1
else:
new_args.append(a)
# Meant to prevent addition of two Tsd for example
if n_object > 1:
return NotImplemented
else:
out = ufunc(*new_args, **kwargs)
if isinstance(out, np.ndarray) or is_array_like(out):
if out.shape[0] == self.index.shape[0]:
kwargs = {}
if hasattr(self, "columns"):
kwargs["columns"] = self.columns
return _get_class(out)(
t=self.index, d=out, time_support=self.time_support, **kwargs
)
else:
return out
else:
return out
else:
return NotImplemented
def __array_function__(self, func, types, args, kwargs):
if func in [
np.sort,
np.lexsort,
np.sort_complex,
np.partition,
np.argpartition,
]:
return NotImplemented
if hasattr(np.fft, func.__name__):
return NotImplemented
if func in [np.split, np.array_split, np.dsplit, np.hsplit, np.vsplit]:
return _split_tsd(func, *args, **kwargs)
if func in [np.concatenate, np.vstack, np.hstack, np.dstack]:
return _concatenate_tsd(func, *args, **kwargs)
new_args = []
for a in args:
if isinstance(a, self.__class__):
new_args.append(a.values)
else:
new_args.append(a)
out = func._implementation(*new_args, **kwargs)
if isinstance(out, np.ndarray) or is_array_like(out):
# # if dims increased in any case, we can't return safely a time series
# if out.ndim > self.ndim:
# return out
if out.shape[0] == self.index.shape[0]:
kwargs = {}
if hasattr(self, "columns"):
kwargs["columns"] = self.columns
return _get_class(out)(
t=self.index, d=out, time_support=self.time_support, **kwargs
)
else:
return out
else:
return out
def as_array(self):
"""
Return the data.
Returns
-------
out: array-like
_
"""
return self.values
def data(self):
"""
Return the data.
Returns
-------
out: array-like
_
"""
return self.values
def to_numpy(self):
"""
Return the data as a numpy.ndarray.
Mostly useful for matplotlib plotting when calling `plot(tsd)`.
"""
return np.asarray(self.values)
def copy(self):
"""Copy the data, index and time support"""
return self.__class__(
t=self.index.copy(), d=self.values[:].copy(), time_support=self.time_support
)
def value_from(self, data, ep=None):
"""
Replace the value with the closest value from Tsd/TsdFrame/TsdTensor argument
Parameters
----------
data : Tsd, TsdFrame or TsdTensor
The object holding the values to replace.
ep : IntervalSet (optional)
The IntervalSet object to restrict the operation.
If None, the time support of the tsd input object is used.
Returns
-------
out : Tsd, TsdFrame or TsdTensor
Object with the new values
Examples
--------
In this example, the ts object will receive the closest values in time from tsd.
>>> import pynapple as nap
>>> import numpy as np
>>> t = np.unique(np.sort(np.random.randint(0, 1000, 100))) # random times
>>> ts = nap.Ts(t=t, time_units='s')
>>> tsd = nap.Tsd(t=np.arange(0,1000), d=np.random.rand(1000), time_units='s')
>>> ep = nap.IntervalSet(start = 0, end = 500, time_units = 's')
The variable ts is a time series object containing only nan.
The tsd object containing the values, for example the tracking data, and the epoch to restrict the operation.
>>> newts = ts.value_from(tsd, ep)
newts has the same size of ts restrict to ep.
>>> print(len(ts.restrict(ep)), len(newts))
52 52
"""
assert isinstance(
data, _BaseTsd
), "First argument should be an instance of Tsd, TsdFrame or TsdTensor"
t, d, time_support, kwargs = super().value_from(data, ep)
return data.__class__(t=t, d=d, time_support=time_support, **kwargs)
def count(self, *args, dtype=None, **kwargs):
"""
Count occurences of events within bin_size or within a set of bins defined as an IntervalSet.
You can call this function in multiple ways :
1. *tsd.count(bin_size=1, time_units = 'ms')*
-> Count occurence of events within a 1 ms bin defined on the time support of the object.
2. *tsd.count(1, ep=my_epochs)*
-> Count occurent of events within a 1 second bin defined on the IntervalSet my_epochs.
3. *tsd.count(ep=my_bins)*
-> Count occurent of events within each epoch of the intervalSet object my_bins
4. *tsd.count()*
-> Count occurent of events within each epoch of the time support.
bin_size should be seconds unless specified.
If bin_size is used and no epochs is passed, the data will be binned based on the time support of the object.
Parameters
----------
bin_size : None or float, optional
The bin size (default is second)
ep : None or IntervalSet, optional
IntervalSet to restrict the operation
time_units : str, optional
Time units of bin size ('us', 'ms', 's' [default])
dtype: type, optional
Data type for the count. Default is np.int64.
Returns
-------
out: Tsd
A Tsd object indexed by the center of the bins.
Examples
--------
This example shows how to count events within bins of 0.1 second.
>>> import pynapple as nap
>>> import numpy as np
>>> t = np.unique(np.sort(np.random.randint(0, 1000, 100)))
>>> ts = nap.Ts(t=t, time_units='s')
>>> bincount = ts.count(0.1)
An epoch can be specified:
>>> ep = nap.IntervalSet(start = 100, end = 800, time_units = 's')
>>> bincount = ts.count(0.1, ep=ep)
And bincount automatically inherit ep as time support:
>>> bincount.time_support
start end
0 100.0 800.0
"""
t, d, ep = super().count(*args, dtype=dtype, **kwargs)
return Tsd(t=t, d=d, time_support=ep)
def bin_average(self, bin_size, ep=None, time_units="s"):
"""
Bin the data by averaging points within bin_size
bin_size should be seconds unless specified.
If no epochs is passed, the data will be binned based on the time support.
Parameters
----------
bin_size : float
The bin size (default is second)
ep : None or IntervalSet, optional
IntervalSet to restrict the operation
time_units : str, optional
Time units of bin size ('us', 'ms', 's' [default])
Returns
-------
out: Tsd, TsdFrame, TsdTensor
A Tsd object indexed by the center of the bins and holding the averaged data points.
Examples
--------
This example shows how to bin data within bins of 0.1 second.
>>> import pynapple as nap
>>> import numpy as np
>>> tsd = nap.Tsd(t=np.arange(100), d=np.random.rand(100))
>>> bintsd = tsd.bin_average(0.1)
An epoch can be specified:
>>> ep = nap.IntervalSet(start = 10, end = 80, time_units = 's')
>>> bintsd = tsd.bin_average(0.1, ep=ep)
And bintsd automatically inherit ep as time support:
>>> bintsd.time_support
>>> start end
>>> 0 10.0 80.0
"""
if not isinstance(ep, IntervalSet):
ep = self.time_support
bin_size = TsIndex.format_timestamps(np.array([bin_size]), time_units)[0]
time_array = self.index.values
data_array = self.values
starts = ep.start
ends = ep.end
t, d = _bin_average(time_array, data_array, starts, ends, bin_size)
kwargs = {}
if hasattr(self, "columns"):
kwargs["columns"] = self.columns
return self.__class__(t=t, d=d, time_support=ep, **kwargs)
def dropna(self, update_time_support=True):
"""Drop every rows containing NaNs. By default, the time support is updated to start and end around the time points that are non NaNs.
To change this behavior, you can set update_time_support=False.
Parameters
----------
update_time_support : bool, optional
Returns
-------
Tsd, TsdFrame or TsdTensor
The time series without the NaNs
"""
assert isinstance(update_time_support, bool)
time_array = self.index.values
data_array = self.values
starts = self.time_support.start
ends = self.time_support.end
t, d, starts, ends = _dropna(
time_array, data_array, starts, ends, update_time_support, self.ndim
)
if update_time_support:
if is_array_like(starts) and is_array_like(ends):
ep = IntervalSet(starts, ends)
else:
ep = None
else:
ep = self.time_support
kwargs = {}
if hasattr(self, "columns"):
kwargs["columns"] = self.columns
return self.__class__(t=t, d=d, time_support=ep, **kwargs)
def convolve(self, array, ep=None, trim="both"):
"""Return the discrete linear convolution of the time series with a one dimensional sequence.
A parameter ep can control the epochs for which the convolution will apply. Otherwise the convolution is made over the time support.
This function assume a constant sampling rate of the time series.
The only mode supported is full. The returned object is trimmed to match the size of the original object. The parameter trim controls which side the trimming operates. Default is 'both'.
See the numpy documentation here : https://numpy.org/doc/stable/reference/generated/numpy.convolve.html
Parameters
----------
array : array-like
1-D or 2-D array with kernel(s) to be used for convolution.
First dimension is assumed to be time.
ep : None, optional
The epochs to apply the convolution
trim : str, optional
The side on which to trim the output of the convolution ('left', 'right', 'both' [default])
Returns
-------
Tsd, TsdFrame or TsdTensor
The convolved time series
"""
if not is_array_like(array):
raise IOError(
"Input should be a numpy array (or jax array if pynajax is installed)."
)
if len(array) == 0:
raise IOError("Input array is length 0")
if array.ndim > 2:
raise IOError("Array should be 1 or 2 dimension.")
if trim not in ["both", "left", "right"]:
raise IOError("Unknow argument. trim should be 'both', 'left' or 'right'.")
time_array = self.index.values
data_array = self.values
if ep is None:
ep = self.time_support
starts = ep.start
ends = ep.end
else:
if not isinstance(ep, IntervalSet):
raise IOError("ep should be an object of type IntervalSet")
starts = ep.start
ends = ep.end
idx = _restrict(time_array, starts, ends)
time_array = time_array[idx]
data_array = data_array[idx]
new_data_array = _convolve(time_array, data_array, starts, ends, array, trim)
kwargs_dict = dict(time_support=ep)
nap_class = _get_class(new_data_array)
if isinstance(self, TsdFrame) and array.ndim == 1: # keep columns
kwargs_dict["columns"] = self.columns
return nap_class(t=time_array, d=new_data_array, **kwargs_dict)
def smooth(self, std, windowsize=None, time_units="s", size_factor=100, norm=True):
"""Smooth a time series with a gaussian kernel.
`std` is the standard deviation of the gaussian kernel in units of time.
If only `std` is passed, the function will compute the standard deviation and size in number
of time points automatically based on the sampling rate of the time series.
For example, if the time series `tsd` has a sample rate of 100 Hz and `std` is 50 ms,
the standard deviation will be converted to an integer through
`tsd.rate * std = int(100 * 0.05) = 5`.
If `windowsize` is None, the function will select a kernel size as 100 times
the std in number of time points. This behavior can be controlled with the
parameter `size_factor`.
`norm` set to True normalizes the gaussian kernel to sum to 1.
In the following example, a time series `tsd` with a sampling rate of 100 Hz
is convolved with a gaussian kernel. The standard deviation is
0.05 second and the windowsize is 2 second. When instantiating the gaussian kernel
from scipy, it corresponds to parameters `M = 200` and `std=5`
>>> tsd.smooth(std=0.05, windowsize=2, time_units='s', norm=False)
This line is equivalent to :
>>> from scipy.signal.windows import gaussian
>>> kernel = gaussian(M = 200, std=5)
>>> tsd.convolve(window)
It is generally a good idea to visualize the kernel before applying any convolution.
See the scipy documentation for the [gaussian window](https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.windows.gaussian.html)
Parameters
----------
std : Number
Standard deviation in units of time
windowsize : Number
Size of the gaussian window in units of time.
time_units : str, optional
The time units in which std and windowsize are specified ('us', 'ms', 's' [default]).
size_factor : int, optional
How long should be the kernel size as a function of the standard deviation. Default is 100.
Bypassed if windowsize is used.
norm : bool, optional
Whether to normalized the gaussian kernel or not. Default is `True`.
Returns
-------
Tsd, TsdFrame, TsdTensor
Time series convolved with a gaussian kernel
"""
if not isinstance(std, (int, float)):
raise IOError("std should be type int or float")
if not isinstance(size_factor, int):
raise IOError("size_factor should be of type int")
if not isinstance(norm, bool):
raise IOError("norm should be of type boolean")
if not isinstance(time_units, str):
raise IOError("time_units should be of type str")
std = TsIndex.format_timestamps(np.array([std]), time_units)[0]
std_size = int(self.rate * std)
if windowsize is not None:
if not isinstance(windowsize, Number):
raise IOError("windowsize should be type int or float")
windowsize = TsIndex.format_timestamps(np.array([windowsize]), time_units)[
0
]
M = int(self.rate * windowsize)
else:
M = std_size * size_factor
if M % 2 == 0:
M += 1
window = signal.windows.gaussian(M=M, std=std_size)
if norm:
window = window / window.sum()
return self.convolve(window)
def interpolate(self, ts, ep=None, left=None, right=None):
"""Wrapper of the numpy linear interpolation method. See [numpy interpolate](https://numpy.org/doc/stable/reference/generated/numpy.interp.html)
for an explanation of the parameters.
The argument ts should be Ts, Tsd, TsdFrame, TsdTensor to ensure interpolating from sorted timestamps in the right unit,
Parameters
----------
ts : Ts, Tsd, TsdFrame or TsdTensor
The object holding the timestamps
ep : IntervalSet, optional
The epochs to use to interpolate. If None, the time support of Tsd is used.
left : None, optional
Value to return for ts < tsd[0], default is tsd[0].
right : None, optional
Value to return for ts > tsd[-1], default is tsd[-1].
"""
if not isinstance(ts, _Base):
raise IOError(
"First argument should be an instance of Ts, Tsd, TsdFrame or TsdTensor"
)
if left is not None and not isinstance(left, Number):
raise IOError("Argument left should be of type float or int")
if right is not None and not isinstance(right, Number):
raise IOError("Argument right should be of type float or int")
if ep is None:
ep = self.time_support
else:
if not isinstance(ep, IntervalSet):
raise IOError("ep should be an object of type IntervalSet")
new_t = ts.restrict(ep).index
new_shape = (
len(new_t) if self.values.ndim == 1 else (len(new_t),) + self.shape[1:]
)
new_d = np.full(new_shape, np.nan)
start = 0
for i in range(len(ep)):
t = ts.get(ep[i, 0], ep[i, 1])
tmp = self.get(ep[i, 0], ep[i, 1])
if len(t) and len(tmp):
if self.values.ndim == 1:
new_d[start : start + len(t)] = np.interp(
t.index.values,
tmp.index.values,
tmp.values,
left=left,
right=right,
)
else:
interpolated_values = np.apply_along_axis(
lambda row: np.interp(
t.index.values,
tmp.index.values,
row,
left=left,
right=right,
),
0,
tmp.values,
)
new_d[start : start + len(t), ...] = interpolated_values
start += len(t)
kwargs_dict = dict(time_support=ep)
if hasattr(self, "columns"):
kwargs_dict["columns"] = self.columns
return self.__class__(t=new_t, d=new_d, **kwargs_dict)
[docs]
class TsdTensor(_BaseTsd):
"""
Container for neurophysiological time series with more than 2 dimensions (movies).
Attributes
----------
rate : float
Frequency of the time series (Hz) computed over the time support
time_support : IntervalSet
The time support of the time series
"""
[docs]
def __init__(
self, t, d, time_units="s", time_support=None, load_array=True, **kwargs
):
"""
TsdTensor initializer
Parameters
----------
t : numpy.ndarray
the time index t
d : numpy.ndarray
The data
time_units : str, optional
The time units in which times are specified ('us', 'ms', 's' [default]).
time_support : IntervalSet, optional
The time support of the TsdFrame object
load_array : bool, optional
Whether the data should be converted to a numpy (or jax) array. Useful when passing a memory map object like zarr.
Default is True. Does not apply if `d` is already a numpy array.
"""
super().__init__(t, d, time_units, time_support, load_array)
assert (
self.values.ndim >= 3
), "Data should have more than 2 dimensions. If ndim < 3, use TsdFrame or Tsd object"
self.nap_class = self.__class__.__name__
self._initialized = True
def __repr__(self):
headers = ["Time (s)", ""]
bottom = "dtype: {}".format(self.dtype) + ", shape: {}".format(self.shape)
max_rows = 2
rows = _get_terminal_size()[1]
max_rows = np.maximum(rows - 10, 2)
if len(self):
def create_str(array):
if array.ndim == 1:
if len(array) > 2:
return np.array2string(
np.array([array[0], array[-1]]),
precision=6,
separator=" ... ",
)
else:
return np.array2string(array, precision=6, separator=", ")
else:
return "[" + create_str(array[0]) + " ...]"
_str_ = []
if self.shape[0] > max_rows:
n_rows = max_rows // 2
for i, array in zip(self.index[0:n_rows], self.values[0:n_rows]):
_str_.append([i, create_str(array)])
_str_.append(["...", ""])
for i, array in zip(
self.index[-n_rows:],
self.values[self.values.shape[0] - n_rows : self.values.shape[0]],
):
_str_.append([i, create_str(array)])
else:
for i, array in zip(self.index, self.values):
_str_.append([i, create_str(array)])
return tabulate(_str_, headers=headers, colalign=("left",)) + "\n" + bottom
else:
return tabulate([], headers=headers) + "\n" + bottom
def __getitem__(self, key, *args, **kwargs):
output = self.values.__getitem__(key)
if isinstance(key, tuple):
index = self.index.__getitem__(key[0])
else:
index = self.index.__getitem__(key)
if isinstance(index, Number):
index = np.array([index])
if all(is_array_like(a) for a in [index, output]):
if output.shape[0] == index.shape[0]:
if output.ndim == 1:
return Tsd(t=index, d=output, time_support=self.time_support)
elif output.ndim == 2:
return TsdFrame(
t=index, d=output, time_support=self.time_support, **kwargs
)
else:
return TsdTensor(t=index, d=output, time_support=self.time_support)
else:
return output
else:
return output
[docs]
def save(self, filename):
"""
Save TsdTensor object in npz format. The file will contain the timestamps, the
data and the time support.
The main purpose of this function is to save small/medium sized time series
objects. For example, you extracted several channels from your recording and
filtered them. You can save the filtered channels as a npz to avoid
reprocessing it.
You can load the object with `nap.load_file`. Keys are 't', 'd', 'start', 'end', 'type'
and 'columns' for columns names.
Parameters
----------
filename : str
The filename
Examples
--------
>>> import pynapple as nap
>>> import numpy as np
>>> tsdtensor = nap.TsdTensor(t=np.array([0., 1.]), d = np.zeros((2,3,4)))
>>> tsdtensor.save("my_path/my_tsdtensor.npz")
To load you file, you can use the `nap.load_file` function :
>>> tsdtensor = nap.load_file("my_path/my_tsdtensor.npz")
Raises
------
RuntimeError
If filename is not str, path does not exist or filename is a directory.
"""
filename = self._get_filename(filename)
np.savez(
filename,
t=self.index.values,
d=self.values,
start=self.time_support.start,
end=self.time_support.end,
type=np.array([self.nap_class], dtype=np.str_),
)
return
[docs]
class TsdFrame(_BaseTsd):
"""
Column-based container for neurophysiological time series.
Attributes
----------
rate : float
Frequency of the time series (Hz) computed over the time support
time_support : IntervalSet
The time support of the time series
"""
[docs]
def __init__(
self,
t,
d=None,
time_units="s",
time_support=None,
columns=None,
load_array=True,
):
"""
TsdFrame initializer
A pandas.DataFrame can be passed directly
Parameters
----------
t : numpy.ndarray or pandas.DataFrame
the time index t, or a pandas.DataFrame (if d is None)
d : numpy.ndarray
The data
time_units : str, optional
The time units in which times are specified ('us', 'ms', 's' [default]).
time_support : IntervalSet, optional
The time support of the TsdFrame object
columns : iterables
Column names
load_array : bool, optional
Whether the data should be converted to a numpy (or jax) array. Useful when passing a memory map object like zarr.
Default is True. Does not apply if `d` is already a numpy array.
"""
c = columns
if isinstance(t, pd.DataFrame):
d = t.values
c = t.columns.values
t = t.index.values
else:
assert d is not None, "Missing argument d when initializing TsdFrame"
super().__init__(t, d, time_units, time_support, load_array)
assert self.values.ndim <= 2, "Data should be 1 or 2 dimensional."
if self.values.ndim == 1:
self.values = np.expand_dims(self.values, 1)
if c is None or len(c) != self.values.shape[1]:
c = np.arange(self.values.shape[1], dtype="int")
else:
assert (
len(c) == self.values.shape[1]
), "Number of columns should match the second dimension of d"
self.columns = pd.Index(c)
self.nap_class = self.__class__.__name__
self._initialized = True
@property
def loc(self):
return _TsdFrameSliceHelper(self)
def __repr__(self):
headers = ["Time (s)"] + [str(k) for k in self.columns]
bottom = "dtype: {}".format(self.dtype) + ", shape: {}".format(self.shape)
cols, rows = _get_terminal_size()
max_cols = np.maximum(cols // 100, 5)
max_rows = np.maximum(rows - 10, 2)
if self.shape[1] > max_cols:
headers = headers[0 : max_cols + 1] + ["..."]
def round_if_float(x):
if isinstance(x, float):
return np.round(x, 5)
else:
return x
with warnings.catch_warnings():
warnings.simplefilter("ignore")
if len(self):
table = []
end = ["..."] if self.shape[1] > max_cols else []
if len(self) > max_rows:
n_rows = max_rows // 2
for i, array in zip(
self.index[0:n_rows], self.values[0:n_rows, 0:max_cols]
):
table.append([i] + [round_if_float(k) for k in array] + end)
table.append(["..."])
for i, array in zip(
self.index[-n_rows:],
self.values[
self.values.shape[0] - n_rows : self.values.shape[0],
0:max_cols,
],
):
table.append([i] + [round_if_float(k) for k in array] + end)
return (
tabulate(table, headers=headers, colalign=("left",))
+ "\n"
+ bottom
)
else:
for i, array in zip(self.index, self.values[:, 0:max_cols]):
table.append([i] + [round_if_float(k) for k in array] + end)
return (
tabulate(table, headers=headers, colalign=("left",))
+ "\n"
+ bottom
)
else:
return tabulate([], headers=headers) + "\n" + bottom
def __setitem__(self, key, value):
try:
if isinstance(key, str):
new_key = self.columns.get_indexer([key])
self.values.__setitem__((slice(None, None, None), new_key[0]), value)
elif hasattr(key, "__iter__") and all([isinstance(k, str) for k in key]):
new_key = self.columns.get_indexer(key)
self.values.__setitem__((slice(None, None, None), new_key), value)
else:
self.values.__setitem__(key, value)
except IndexError:
raise IndexError
def __getitem__(self, key, *args, **kwargs):
if (
isinstance(key, str)
or hasattr(key, "__iter__")
and all([isinstance(k, str) for k in key])
):
return self.loc[key]
else:
output = self.values.__getitem__(key)
columns = self.columns
if isinstance(key, tuple):
index = self.index.__getitem__(key[0])
if len(key) == 2:
columns = self.columns.__getitem__(key[1])
else:
index = self.index.__getitem__(key)
if isinstance(index, Number):
index = np.array([index])
if all(is_array_like(a) for a in [index, output]):
if output.shape[0] == index.shape[0]:
# if isinstance(columns, pd.Index):
# if not pd.api.types.is_integer_dtype(columns):
kwargs["columns"] = columns
return _get_class(output)(
t=index, d=output, time_support=self.time_support, **kwargs
)
else:
return output
else:
return output
[docs]
def as_dataframe(self):
"""
Convert the TsdFrame object to a pandas.DataFrame object.
Returns
-------
out: pandas.DataFrame
_
"""
return pd.DataFrame(
index=self.index.values, data=self.values, columns=self.columns
)
[docs]
def as_units(self, units="s"):
"""
Returns a DataFrame with time expressed in the desired unit.
Parameters
----------
units : str, optional
('us', 'ms', 's' [default])
Returns
-------
pandas.DataFrame
the series object with adjusted times
"""
t = self.index.in_units(units)
if units == "us":
t = t.astype(np.int64)
df = pd.DataFrame(index=t, data=self.values)
df.index.name = "Time (" + str(units) + ")"
df.columns = self.columns.copy()
return df
[docs]
def save(self, filename):
"""
Save TsdFrame object in npz format. The file will contain the timestamps, the
data and the time support.
The main purpose of this function is to save small/medium sized time series
objects. For example, you extracted several channels from your recording and
filtered them. You can save the filtered channels as a npz to avoid
reprocessing it.
You can load the object with `nap.load_file`. Keys are 't', 'd', 'start', 'end', 'type'
and 'columns' for columns names.
Parameters
----------
filename : str
The filename
Examples
--------
>>> import pynapple as nap
>>> import numpy as np
>>> tsdframe = nap.TsdFrame(t=np.array([0., 1.]), d = np.array([[2, 3],[4,5]]), columns=['a', 'b'])
>>> tsdframe.save("my_path/my_tsdframe.npz")
To load you file, you can use the `nap.load_file` function :
>>> tsdframe = nap.load_file("my_path/my_tsdframe.npz")
>>> tsdframe
a b
Time (s)
0.0 2 3
1.0 4 5
Raises
------
RuntimeError
If filename is not str, path does not exist or filename is a directory.
"""
filename = self._get_filename(filename)
cols_name = self.columns
if cols_name.dtype == np.dtype("O"):
cols_name = cols_name.astype(str)
np.savez(
filename,
t=self.index.values,
d=self.values[:],
start=self.time_support.start,
end=self.time_support.end,
columns=cols_name,
type=np.array(["TsdFrame"], dtype=np.str_),
)
return
[docs]
class Tsd(_BaseTsd):
"""
1-dimensional container for neurophysiological time series.
Tsd provides standardized time representation, plus various functions for manipulating times series.
Attributes
----------
rate : float
Frequency of the time series (Hz) computed over the time support
time_support : IntervalSet
The time support of the time series
"""
[docs]
def __init__(
self, t, d=None, time_units="s", time_support=None, load_array=True, **kwargs
):
"""
Tsd Initializer.
Parameters
----------
t : numpy.ndarray or pandas.Series
An object transformable in a time series, or a pandas.Series equivalent (if d is None)
d : numpy.ndarray, optional
The data of the time series
time_units : str, optional
The time units in which times are specified ('us', 'ms', 's' [default])
time_support : IntervalSet, optional
The time support of the tsd object
load_array : bool, optional
Whether the data should be converted to a numpy (or jax) array. Useful when passing a memory map object like zarr.
Default is True. Does not apply if `d` is already a numpy array.
"""
if isinstance(t, pd.Series):
d = t.values
t = t.index.values
else:
assert d is not None, "Missing argument d when initializing Tsd"
super().__init__(t, d, time_units, time_support, load_array)
assert self.values.ndim == 1, "Data should be 1 dimensional"
self.nap_class = self.__class__.__name__
self._initialized = True
def __repr__(self):
headers = ["Time (s)", ""]
bottom = "dtype: {}".format(self.dtype) + ", shape: {}".format(self.shape)
max_rows = 2
rows = _get_terminal_size()[1]
max_rows = np.maximum(rows - 10, 2)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
if len(self):
if len(self) > max_rows:
n_rows = max_rows // 2
table = []
for i, v in zip(self.index[0:n_rows], self.values[0:n_rows]):
table.append([i, v])
table.append(["..."])
for i, v in zip(
self.index[-n_rows:],
self.values[
self.values.shape[0] - n_rows : self.values.shape[0]
],
):
table.append([i, v])
return (
tabulate(table, headers=headers, colalign=("left",))
+ "\n"
+ bottom
)
else:
return (
tabulate(
np.vstack((self.index, self.values)).T,
headers=headers,
colalign=("left",),
)
+ "\n"
+ bottom
)
else:
return tabulate([], headers=headers) + "\n" + bottom
def __getitem__(self, key, *args, **kwargs):
output = self.values.__getitem__(key)
if isinstance(key, tuple):
index = self.index.__getitem__(key[0])
else:
index = self.index.__getitem__(key)
if isinstance(index, Number):
index = np.array([index])
if all(is_array_like(a) for a in [index, output]):
if output.shape[0] == index.shape[0]:
return _get_class(output)(
t=index, d=output, time_support=self.time_support, **kwargs
)
else:
return output
else:
return output
[docs]
def as_series(self):
"""
Convert the Ts/Tsd object to a pandas.Series object.
Returns
-------
out: pandas.Series
_
"""
return pd.Series(
index=self.index.values, data=self.values, copy=True, dtype="float64"
)
[docs]
def as_units(self, units="s"):
"""
Returns a pandas Series with time expressed in the desired unit.
Parameters
----------
units : str, optional
('us', 'ms', 's' [default])
Returns
-------
pandas.Series
the series object with adjusted times
"""
ss = self.as_series()
t = self.index.in_units(units)
if units == "us":
t = t.astype(np.int64)
ss.index = t
ss.index.name = "Time (" + str(units) + ")"
return ss
[docs]
def threshold(self, thr, method="above"):
"""
Apply a threshold function to the tsd to return a new tsd
with the time support being the epochs above/below/>=/<= the threshold
Parameters
----------
thr : float
The threshold value
method : str, optional
The threshold method ("above"[default], "below", "aboveequal", "belowequal")
Returns
-------
out: Tsd
All the time points below/ above/greater than equal to/less than equal to the threshold
Raises
------
ValueError
Raise an error if method is unknown.
RuntimeError
Raise an error if thr is too high/low and no epochs is found.
Examples
--------
This example finds all epoch above 0.5 within the tsd object.
>>> import pynapple as nap
>>> tsd = nap.Tsd(t=np.arange(100), d=np.random.rand(100))
>>> newtsd = tsd.threshold(0.5)
The epochs with the times above/below the threshold can be accessed through the time support:
>>> tsd = nap.Tsd(t=np.arange(100), d=np.arange(100), time_units='s')
>>> tsd.threshold(50).time_support
>>> start end
>>> 0 50.5 99.0
"""
if method not in ["above", "below", "aboveequal", "belowequal"]:
raise ValueError(
"Method {} for thresholding is not accepted.".format(method)
)
time_array = self.index.values
data_array = self.values
starts = self.time_support.start
ends = self.time_support.end
t, d, ns, ne = _threshold(time_array, data_array, starts, ends, thr, method)
time_support = IntervalSet(start=ns, end=ne)
return Tsd(t=t, d=d, time_support=time_support)
[docs]
def to_tsgroup(self):
"""
Convert Tsd to a TsGroup by grouping timestamps with the same values.
By default, the values are converted to integers.
Examples
--------
>>> import pynapple as nap
>>> import numpy as np
>>> tsd = nap.Tsd(t = np.array([0, 1, 2, 3]), d = np.array([0, 2, 0, 1]))
Time (s)
0.0 0
1.0 2
2.0 0
3.0 1
dtype: int64
>>> tsd.to_tsgroup()
Index rate
------- ------
0 0.67
1 0.33
2 0.33
The reverse operation can be done with the TsGroup.to_tsd function :
>>> tsgroup.to_tsd()
Time (s)
0.0 0.0
1.0 2.0
2.0 0.0
3.0 1.0
dtype: float64
Returns
-------
TsGroup
Grouped timestamps
"""
ts_group = importlib.import_module(".ts_group", "pynapple.core")
t = self.index.values
d = self.values.astype("int")
idx = np.unique(d)
group = {}
for k in idx:
group[k] = Ts(t=t[d == k], time_support=self.time_support)
return ts_group.TsGroup(
group, time_support=self.time_support, bypass_check=True
)
[docs]
def save(self, filename):
"""
Save Tsd object in npz format. The file will contain the timestamps, the
data and the time support.
The main purpose of this function is to save small/medium sized time series
objects. For example, you extracted one channel from your recording and
filtered it. You can save the filtered channel as a npz to avoid
reprocessing it.
You can load the object with `nap.load_file`. Keys are 't', 'd', 'start', 'end' and 'type'.
See the example below.
Parameters
----------
filename : str
The filename
Examples
--------
>>> import pynapple as nap
>>> import numpy as np
>>> tsd = nap.Tsd(t=np.array([0., 1.]), d = np.array([2, 3]))
>>> tsd.save("my_path/my_tsd.npz")
To load you file, you can use the `nap.load_file` function :
>>> tsd = nap.load_file("my_path/my_tsd.npz")
>>> tsd
Time (s)
0.0 2
1.0 3
dtype: int64
Raises
------
RuntimeError
If filename is not str, path does not exist or filename is a directory.
"""
filename = self._get_filename(filename)
np.savez(
filename,
t=self.index.values,
d=self.values,
start=self.time_support.start,
end=self.time_support.end,
type=np.array([self.nap_class], dtype=np.str_),
)
return
[docs]
class Ts(_Base):
"""
Timestamps only object for a time series with only time index.
Attributes
----------
rate : float
Frequency of the time series (Hz) computed over the time support
time_support : IntervalSet
The time support of the time series
"""
[docs]
def __init__(self, t, time_units="s", time_support=None):
"""
Ts Initializer
Parameters
----------
t : numpy.ndarray or pandas.Series
An object transformable in timestamps, or a pandas.Series equivalent (if d is None)
time_units : str, optional
The time units in which times are specified ('us', 'ms', 's' [default])
time_support : IntervalSet, optional
The time support of the Ts object
"""
super().__init__(t, time_units, time_support)
if isinstance(time_support, IntervalSet) and len(self.index):
starts = time_support.start
ends = time_support.end
idx = _restrict(self.index.values, starts, ends)
self.index = TsIndex(self.index.values[idx])
self.rate = self.index.shape[0] / np.sum(
time_support.values[:, 1] - time_support.values[:, 0]
)
self.nap_class = self.__class__.__name__
self._initialized = True
def __repr__(self):
upper = "Time (s)"
max_rows = 2
rows = _get_terminal_size()[1]
max_rows = np.maximum(rows - 10, 2)
if len(self) > max_rows:
n_rows = max_rows // 2
_str_ = "\n".join(
[str(i) for i in self.index[0:n_rows]]
+ ["..."]
+ [str(i) for i in self.index[-n_rows:]]
)
else:
_str_ = "\n".join([str(i) for i in self.index])
bottom = "shape: {}".format(len(self.index))
return "\n".join((upper, _str_, bottom))
def __getitem__(self, key):
if isinstance(key, tuple):
index = self.index.__getitem__(key[0])
else:
index = self.index.__getitem__(key)
if isinstance(index, Number):
index = np.array([index])
return Ts(t=index, time_support=self.time_support)
[docs]
def as_series(self):
"""
Convert the Ts/Tsd object to a pandas.Series object.
Returns
-------
out: pandas.Series
_
"""
return pd.Series(index=self.index.values, dtype="object")
[docs]
def as_units(self, units="s"):
"""
Returns a pandas Series with time expressed in the desired unit.
Parameters
----------
units : str, optional
('us', 'ms', 's' [default])
Returns
-------
pandas.Series
the series object with adjusted times
"""
t = self.index.in_units(units)
if units == "us":
t = t.astype(np.int64)
ss = pd.Series(index=t, dtype="object")
ss.index.name = "Time (" + str(units) + ")"
return ss
[docs]
def value_from(self, data, ep=None):
"""
Replace the value with the closest value from Tsd/TsdFrame/TsdTensor argument
Parameters
----------
data : Tsd, TsdFrame or TsdTensor
The object holding the values to replace.
ep : IntervalSet (optional)
The IntervalSet object to restrict the operation.
If None, the time support of the tsd input object is used.
Returns
-------
out : Tsd, TsdFrame or TsdTensor
Object with the new values
Examples
--------
In this example, the ts object will receive the closest values in time from tsd.
>>> import pynapple as nap
>>> import numpy as np
>>> t = np.unique(np.sort(np.random.randint(0, 1000, 100))) # random times
>>> ts = nap.Ts(t=t, time_units='s')
>>> tsd = nap.Tsd(t=np.arange(0,1000), d=np.random.rand(1000), time_units='s')
>>> ep = nap.IntervalSet(start = 0, end = 500, time_units = 's')
The variable ts is a time series object containing only nan.
The tsd object containing the values, for example the tracking data, and the epoch to restrict the operation.
>>> newts = ts.value_from(tsd, ep)
newts is the same size as ts restrict to ep.
>>> print(len(ts.restrict(ep)), len(newts))
52 52
"""
assert isinstance(
data, _BaseTsd
), "First argument should be an instance of Tsd, TsdFrame or TsdTensor"
t, d, time_support, kwargs = super().value_from(data, ep)
return data.__class__(t, d, time_support=time_support, **kwargs)
[docs]
def count(self, *args, dtype=None, **kwargs):
"""
Count occurences of events within bin_size or within a set of bins defined as an IntervalSet.
You can call this function in multiple ways :
1. *tsd.count(bin_size=1, time_units = 'ms')*
-> Count occurence of events within a 1 ms bin defined on the time support of the object.
2. *tsd.count(1, ep=my_epochs)*
-> Count occurent of events within a 1 second bin defined on the IntervalSet my_epochs.
3. *tsd.count(ep=my_bins)*
-> Count occurent of events within each epoch of the intervalSet object my_bins
4. *tsd.count()*
-> Count occurent of events within each epoch of the time support.
bin_size should be seconds unless specified.
If bin_size is used and no epochs is passed, the data will be binned based on the time support of the object.
Parameters
----------
bin_size : None or float, optional
The bin size (default is second)
ep : None or IntervalSet, optional
IntervalSet to restrict the operation
time_units : str, optional
Time units of bin size ('us', 'ms', 's' [default])
dtype: type, optional
Data type for the count. Default is np.int64.
Returns
-------
out: Tsd
A Tsd object indexed by the center of the bins.
Examples
--------
This example shows how to count events within bins of 0.1 second.
>>> import pynapple as nap
>>> import numpy as np
>>> t = np.unique(np.sort(np.random.randint(0, 1000, 100)))
>>> ts = nap.Ts(t=t, time_units='s')
>>> bincount = ts.count(0.1)
An epoch can be specified:
>>> ep = nap.IntervalSet(start = 100, end = 800, time_units = 's')
>>> bincount = ts.count(0.1, ep=ep)
And bincount automatically inherit ep as time support:
>>> bincount.time_support
start end
0 100.0 800.0
"""
t, d, ep = super().count(*args, dtype=dtype, **kwargs)
return Tsd(t=t, d=d, time_support=ep)
[docs]
def fillna(self, value):
"""
Similar to pandas fillna function.
Parameters
----------
value : Number
Value for filling
Returns
-------
Tsd
"""
assert isinstance(value, Number), "Only a scalar can be passed to fillna"
d = np.empty(len(self))
d.fill(value)
return Tsd(t=self.index, d=d, time_support=self.time_support)
[docs]
def save(self, filename):
"""
Save Ts object in npz format. The file will contain the timestamps and
the time support.
The main purpose of this function is to save small/medium sized timestamps
object.
You can load the object with `nap.load_file`. Keys are 't', 'start' and 'end' and 'type'.
See the example below.
Parameters
----------
filename : str
The filename
Examples
--------
>>> import pynapple as nap
>>> import numpy as np
>>> ts = nap.Ts(t=np.array([0., 1., 1.5]))
>>> ts.save("my_path/my_ts.npz")
To load you file, you can use the `nap.load_file` function :
>>> ts = nap.load_file("my_path/my_ts.npz")
>>> ts
Time (s)
0.0
1.0
1.5
Raises
------
RuntimeError
If filename is not str, path does not exist or filename is a directory.
"""
filename = self._get_filename(filename)
np.savez(
filename,
t=self.index.values,
start=self.time_support.start,
end=self.time_support.end,
type=np.array(["Ts"], dtype=np.str_),
)
return