"""
The class `TsGroup` helps group objects with different timestamps
(i.e. timestamps of spikes of a population of neurons).
"""
import warnings
from collections import UserDict
from collections.abc import Hashable
import numpy
import numpy as np
import pandas as pd
from tabulate import tabulate
from ._core_functions import _count
from ._jitted_functions import jitunion, jitunion_isets
from .base_class import _Base
from .config import nap_config
from .interval_set import IntervalSet
from .time_index import TsIndex
from .time_series import Ts, Tsd, TsdFrame, _BaseTsd, is_array_like
from .utils import _get_terminal_size, check_filename, convert_to_numpy_array
def _union_intervals(i_sets):
    """Return the union of a sequence of IntervalSet objects.

    Helper used by TsGroup to merge the time supports of its members.
    A single set is returned unchanged; two sets go through `jitunion`;
    more than two are stacked and merged with `jitunion_isets`.
    """
    n_sets = len(i_sets)
    if n_sets == 1:
        return i_sets[0]

    starts, ends = np.zeros(0), np.zeros(0)
    if n_sets == 2:
        starts, ends = jitunion(
            i_sets[0].start,
            i_sets[0].end,
            i_sets[1].start,
            i_sets[1].end,
        )
    elif n_sets > 2:
        # Concatenate every (start, end) row into one array before merging.
        lengths = [iset.shape[0] for iset in i_sets]
        stacked = np.zeros((sum(lengths), 2))
        offset = 0
        for iset, length in zip(i_sets, lengths):
            stacked[offset : offset + length, :] = iset.values
            offset += length
        starts, ends = jitunion_isets(stacked[:, 0], stacked[:, 1])

    return IntervalSet(starts, ends)
[docs]
class TsGroup(UserDict):
    """
    Dictionary-like object to group objects with different timestamps
    (for example timestamps of spikes of a population of neurons).

    Keys are integers and values are Ts/Tsd objects; extra per-element
    information is stored as metadata columns accessible as attributes.

    Attributes
    ----------
    time_support : IntervalSet
        The time support of the TsGroup.
    rates : pandas.Series
        The mean rate of each element of the TsGroup.
    """
[docs]
def __init__(
    self, data, time_support=None, time_units="s", bypass_check=False, **kwargs
):
    """
    TsGroup initializer.

    Parameters
    ----------
    data : dict or iterable
        Dictionary or iterable of Ts/Tsd objects. The keys should be
        integer-convertible; if a non-dict iterable is passed, its values
        are enumerated into a dict with integer keys.
    time_support : IntervalSet, optional
        The time support of the TsGroup. Ts/Tsd objects are restricted to
        it if passed. If None, the union of all the elements' time
        supports is used.
    time_units : str, optional
        Time units if data does not contain Ts/Tsd objects
        ('us', 'ms', 's' [default]).
    bypass_check : bool, optional
        Skip restricting each element to the time support. Useful to
        speed up initialization when elements were already restricted.
    **kwargs
        Metadata about the Ts/Tsd objects. Can be pandas.Series,
        numpy.ndarray, list or tuple. A pandas.Series index must match
        the group keys.

    Raises
    ------
    RuntimeError
        If the union of the time supports of the Ts/Tsd objects is empty.
    ValueError
        - If a key cannot be converted to integer.
        - If a key is a float with a non-negligible decimal part.
        - If the converted keys are not unique, e.g. {1: ts_1, "1": ts_2}.
    """
    # Check input type
    if time_units not in ["s", "ms", "us"]:
        raise ValueError("Argument time_units should be 's', 'ms' or 'us'")
    if not isinstance(bypass_check, bool):
        raise TypeError("Argument bypass_check should be of type bool")

    if isinstance(time_support, IntervalSet):
        passed_time_support = True
    elif time_support is not None:
        raise TypeError("Argument time_support should be of type IntervalSet")
    else:
        passed_time_support = False

    # While False, __setitem__ accepts integer keys mapping to elements.
    self._initialized = False

    if not isinstance(data, dict):
        data = dict(enumerate(data))

    # convert all keys to integer
    try:
        keys = [int(k) for k in data.keys()]
    except Exception:
        raise ValueError("All keys must be convertible to integer.")

    # check that there were no floats with decimal points in keys.
    # i.e. 0.5 is not a valid key
    if not all(np.allclose(keys[j], float(k)) for j, k in enumerate(data.keys())):
        raise ValueError("All keys must have integer value!")

    # check that we have the same num of unique keys
    # {"0":val, 0:val} would be a problem...
    if len(keys) != len(np.unique(keys)):
        raise ValueError("Two dictionary keys contain the same integer value!")

    data = {keys[j]: data[k] for j, k in enumerate(data.keys())}
    self.index = np.sort(keys)

    # Make sure data dict and index are ordered the same
    data = {k: data[k] for k in self.index}

    self._metadata = pd.DataFrame(index=self.index, columns=["rate"], dtype="float")

    # Transform elements to Ts/Tsd objects
    for k in self.index:
        if not isinstance(data[k], _Base):
            if isinstance(data[k], list) or is_array_like(data[k]):
                warnings.warn(
                    "Elements should not be passed as {}. Default time units is seconds when creating the Ts object.".format(
                        type(data[k])
                    ),
                    stacklevel=2,
                )
                data[k] = Ts(
                    t=convert_to_numpy_array(data[k], "key {}".format(k)),
                    time_support=time_support,
                    time_units=time_units,
                )

    # If time_support is passed, all elements of data are restricted prior to init
    if passed_time_support:
        self.time_support = time_support
        if not bypass_check:
            data = {k: data[k].restrict(self.time_support) for k in self.index}
    else:
        # Otherwise do the union of all time supports
        time_support = _union_intervals([data[k].time_support for k in self.index])
        if len(time_support) == 0:
            raise RuntimeError(
                "Union of time supports is empty. Consider passing a time support as argument."
            )
        self.time_support = time_support
        if not bypass_check:
            data = {k: data[k].restrict(self.time_support) for k in self.index}

    UserDict.__init__(self, data)

    # Making the TsGroup non mutable
    self._initialized = True

    # Trying to add argument as metainfo
    self.set_info(**kwargs)
"""
Base functions
"""
def __getattr__(self, name):
"""
Allows dynamic access to metadata columns as properties.
Parameters
----------
name : str
The name of the metadata column to access.
Returns
-------
pandas.Series
The series of values for the requested metadata column.
Raises
------
AttributeError
If the requested attribute is not a metadata column.
"""
# avoid infinite recursion when pickling due to
# self._metadata.column having attributes '__reduce__', '__reduce_ex__'
if name in ("__getstate__", "__setstate__", "__reduce__", "__reduce_ex__"):
raise AttributeError(name)
# Check if the requested attribute is part of the metadata
if name in self._metadata.columns:
return self._metadata[name]
else:
# If the attribute is not part of the metadata, raise AttributeError
raise AttributeError(
f"'{type(self).__name__}' object has no attribute '{name}'"
)
def __setitem__(self, key, value):
    """
    Set a group element (during construction) or a metadata column.

    While ``self._initialized`` is False (inside ``__init__``), integer
    keys map to Ts/Tsd elements and the element's rate is cached in the
    metadata. Once initialized, the group itself is immutable: only
    string keys are accepted and they assign metadata columns.

    Parameters
    ----------
    key : int or str
        Group index during construction, metadata column name after.
    value
        Ts/Tsd object during construction, metadata values after.

    Raises
    ------
    ValueError
        If a metadata key is not a string (after initialization).
    """
    if not self._initialized:
        # Construction phase: record the element's rate and store it.
        self._metadata.loc[int(key), "rate"] = float(value.rate)
        super().__setitem__(int(key), value)
    else:
        if not isinstance(key, str):
            raise ValueError("Metadata keys must be strings!")
        # replicate pandas behavior of over-writing cols
        if key in self._metadata.columns:
            # Drop the existing column first; restore the whole metadata
            # frame if set_info rejects the new values.
            old_meta = self._metadata.copy()
            self._metadata.pop(key)
            try:
                self.set_info(**{key: value})
            except Exception:
                self._metadata = old_meta
                raise
        else:
            self.set_info(**{key: value})
def __getitem__(self, key):
# Standard dict keys are Hashable
if isinstance(key, Hashable):
if self.__contains__(key):
return self.data[key]
elif key in self._metadata.columns:
return self.get_info(key)
else:
raise KeyError(r"Key {} not in group index.".format(key))
# array boolean are transformed into indices
# note that raw boolean are hashable, and won't be
# tsd == tsg.to_tsd()
elif np.asarray(key).dtype == bool:
key = np.asarray(key)
if key.ndim != 1:
raise IndexError("Only 1-dimensional boolean indices are allowed!")
if len(key) != self.__len__():
raise IndexError(
"Boolean index length must be equal to the number of Ts in the group! "
f"The number of Ts is {self.__len__()}, but the bolean array"
f"has length {len(key)} instead!"
)
key = self.index[key]
keys_not_in = list(filter(lambda x: x not in self.index, key))
if len(keys_not_in):
raise KeyError(r"Key {} not in group index.".format(keys_not_in))
return self._ts_group_from_keys(key)
def _ts_group_from_keys(self, keys):
    """Build a new TsGroup restricted to ``keys``, carrying metadata over."""
    meta_cols = self._metadata.columns.drop("rate")
    selected_meta = self._metadata.loc[np.sort(keys), meta_cols]
    return TsGroup(
        {k: self[k] for k in keys},
        time_support=self.time_support,
        **selected_meta,
    )
def __repr__(self):
    """
    Return a tabulated summary of the group, truncated to the terminal
    size: excess metadata columns and middle rows are elided with '...'.
    """
    col_names = self._metadata.columns.drop("rate")
    headers = ["Index", "rate"] + [c for c in col_names]

    # Fit the table to the terminal: ~12 characters per column, keeping
    # at least 6 columns and 2 rows. (Removed dead pre-assignments that
    # were immediately overwritten here.)
    cols, rows = _get_terminal_size()
    max_cols = np.maximum(cols // 12, 6)
    max_rows = np.maximum(rows - 10, 2)

    end_line = []
    lines = []

    def round_if_float(x):
        # Round floats for display; leave other values untouched.
        if isinstance(x, float):
            return np.round(x, 5)
        else:
            return x

    def make_row(i):
        # One table row: index, rate, then the visible metadata columns,
        # plus the '...' marker when columns were truncated.
        return (
            [i, np.round(self._metadata.loc[i, "rate"], 5)]
            + [
                round_if_float(self._metadata.loc[i, c])
                for c in col_names[0 : max_cols - 2]
            ]
            + end_line
        )

    if len(headers) > max_cols:
        headers = headers[0:max_cols] + ["..."]
        end_line.append("...")

    if len(self) > max_rows:
        # Show the first and last n_rows with an ellipsis row in between.
        n_rows = max_rows // 2
        index = self.keys()
        for i in index[0:n_rows]:
            lines.append(make_row(i))
        lines.append(["..." for _ in range(len(headers))])
        for i in index[-n_rows:]:
            lines.append(make_row(i))
    else:
        for i in self.data.keys():
            lines.append(make_row(i))

    return tabulate(lines, headers=headers)
def __str__(self):
    """Return the same tabulated summary as ``__repr__``."""
    return repr(self)
[docs]
def keys(self):
"""
Return index/keys of TsGroup
Returns
-------
list
List of keys
"""
return list(self.data.keys())
[docs]
def items(self):
"""
Return a list of key/object.
Returns
-------
list
List of tuples
"""
return list(self.data.items())
[docs]
def values(self):
"""
Return a list of all the Ts/Tsd objects in the TsGroup
Returns
-------
list
List of Ts/Tsd objects
"""
return list(self.data.values())
@property
def rates(self):
    """
    Return the rates of each element of the group in Hz.

    Returns
    -------
    pandas.Series
        The 'rate' metadata column, indexed by the group keys.
    """
    return self._metadata["rate"]
#######################
# Metadata
#######################
@property
def metadata_columns(self):
"""
Returns list of metadata columns
"""
return list(self._metadata.columns)
def _check_metadata_column_names(self, *args, **kwargs):
    """
    Reject metadata column names that collide with existing attributes.

    Metadata columns become readable as attributes (see ``__getattr__``),
    so a column named like an existing attribute or method would shadow
    it. Such names are refused up front.

    Parameters
    ----------
    *args
        pandas.DataFrame objects whose column names are checked.
    **kwargs
        name -> values pairs; only list/ndarray/Series values are
        candidate metadata columns and therefore checked.

    Raises
    ------
    ValueError
        If any proposed column name is already an attribute of self.
    """
    invalid_cols = []
    for arg in args:
        if isinstance(arg, pd.DataFrame):
            invalid_cols += [col for col in arg.columns if hasattr(self, col)]
    for k, v in kwargs.items():
        # Scalars are ignored here; they are rejected later in set_info.
        if isinstance(v, (list, numpy.ndarray, pd.Series)) and hasattr(self, k):
            invalid_cols += [k]
    if invalid_cols:
        raise ValueError(
            f"Invalid metadata name(s) {invalid_cols}. Metadata name must differ from "
            f"TsGroup attribute names!"
        )
[docs]
def set_info(self, *args, **kwargs):
    """
    Add metadata information about the TsGroup.
    Metadata are saved as a DataFrame.

    Parameters
    ----------
    *args
        pandas.DataFrame or list of pandas.DataFrame
    **kwargs
        Can be either pandas.Series, numpy.ndarray, list or tuple

    Raises
    ------
    RuntimeError
        Raise an error if
            no column labels are found when passing simple arguments,
            indexes are not equal for a pandas Series,
            not the same length when passing a numpy array.
    TypeError
        If some of the provided metadata could not be set.

    Examples
    --------
    >>> import pynapple as nap
    >>> import numpy as np
    >>> tmp = { 0:nap.Ts(t=np.arange(0,200), time_units='s'),
    1:nap.Ts(t=np.arange(0,200,0.5), time_units='s'),
    2:nap.Ts(t=np.arange(0,300,0.25), time_units='s'),
    }
    >>> tsgroup = nap.TsGroup(tmp)

    To add metadata with a pandas.DataFrame:

    >>> import pandas as pd
    >>> structs = pd.DataFrame(index = [0,1,2], data=['pfc','pfc','ca1'], columns=['struct'])
    >>> tsgroup.set_info(structs)

    To add metadata with a pd.Series, numpy.ndarray, list or tuple:

    >>> hd = pd.Series(index = [0,1,2], data = [0,1,1])
    >>> tsgroup.set_info(hd=hd)
    """
    # check for duplicate names, otherwise "self.metadata_name"
    # syntax would behave unexpectedly.
    self._check_metadata_column_names(*args, **kwargs)
    not_set = []
    if len(args):
        for arg in args:
            if isinstance(arg, pd.DataFrame):
                # DataFrame columns are joined in; index must match keys.
                if pd.Index.equals(self._metadata.index, arg.index):
                    self._metadata = self._metadata.join(arg)
                else:
                    raise RuntimeError("Index are not equals")
            elif isinstance(arg, (pd.Series, np.ndarray, list)):
                # Unnamed columns cannot be set positionally.
                raise RuntimeError("Argument should be passed as keyword argument.")
            else:
                not_set.append(arg)
    if len(kwargs):
        for k, v in kwargs.items():
            if isinstance(v, pd.Series):
                # Series index must match the group keys exactly.
                if pd.Index.equals(self._metadata.index, v.index):
                    self._metadata[k] = v
                else:
                    raise RuntimeError(
                        "Index are not equals for argument {}".format(k)
                    )
            elif isinstance(v, (np.ndarray, list, tuple)):
                # Positional values: length must match the group size.
                if len(self._metadata) == len(v):
                    self._metadata[k] = np.asarray(v)
                else:
                    raise RuntimeError("Array is not the same length.")
            else:
                not_set.append({k: v})
    if not_set:
        raise TypeError(
            f"Cannot set the following metadata:\n{not_set}.\nMetadata columns provided must be "
            f"of type `panda.Series`, `tuple`, `list`, or `numpy.ndarray`."
        )
[docs]
def get_info(self, key):
"""
Returns the metainfo located in one column.
The key for the column frequency is "rate".
Parameters
----------
key : str
One of the metainfo columns name
Returns
-------
pandas.Series
The metainfo
"""
if key in ["freq", "frequency"]:
key = "rate"
return self._metadata[key]
#################################
# Generic functions of Tsd objects
#################################
[docs]
def restrict(self, ep):
    """
    Restrict the TsGroup to a set of time intervals.

    Every Ts/Tsd object in the group is restricted to ``ep``, which also
    becomes the time support of the new group. Metadata (except 'rate',
    which is recomputed) is carried over.

    Parameters
    ----------
    ep : IntervalSet
        The IntervalSet object.

    Returns
    -------
    TsGroup
        TsGroup object restricted to ep.

    Examples
    --------
    >>> import pynapple as nap
    >>> import numpy as np
    >>> tsgroup = nap.TsGroup({0: nap.Ts(t=np.arange(0, 200))})
    >>> ep = nap.IntervalSet(start=0, end=100, time_units='s')
    >>> newtsgroup = tsgroup.restrict(ep)
    """
    restricted = {k: self.data[k].restrict(ep) for k in self.index}
    meta_cols = self._metadata.columns.drop("rate")
    # bypass_check=True: elements were just restricted to ep above.
    return TsGroup(
        restricted, time_support=ep, bypass_check=True, **self._metadata[meta_cols]
    )
[docs]
def value_from(self, tsd, ep=None):
    """
    Replace the values of each Ts/Tsd in the group with the closest
    values from the ``tsd`` argument.

    Parameters
    ----------
    tsd : Tsd
        The Tsd object holding the values to assign.
    ep : IntervalSet, optional
        The IntervalSet object to restrict the operation.
        If None, the time support of ``tsd`` is used.

    Returns
    -------
    TsGroup
        TsGroup object with the new values.

    Examples
    --------
    >>> import pynapple as nap
    >>> import numpy as np
    >>> tsgroup = nap.TsGroup({0: nap.Ts(t=np.arange(0, 200))})
    >>> tsd = nap.Tsd(t=np.arange(0, 100), d=np.random.rand(100))
    >>> newtsgroup = tsgroup.value_from(tsd)
    """
    if ep is None:
        ep = tsd.time_support
    converted = {key: obj.value_from(tsd, ep) for key, obj in self.data.items()}
    meta_cols = self._metadata.columns.drop("rate")
    return TsGroup(converted, time_support=ep, **self._metadata[meta_cols])
[docs]
def count(self, *args, dtype=None, **kwargs):
    """
    Count occurrences of events within bin_size, or within a set of bins
    defined as an IntervalSet.

    You can call this function in multiple ways:

    1. *tsgroup.count(bin_size=1, time_units='ms')*
    -> Count occurrences of events within a 1 ms bin defined on the time support of the object.

    2. *tsgroup.count(1, ep=my_epochs)*
    -> Count occurrences of events within a 1 second bin defined on the IntervalSet my_epochs.

    3. *tsgroup.count(ep=my_bins)*
    -> Count occurrences of events within each epoch of the IntervalSet object my_bins.

    4. *tsgroup.count()*
    -> Count occurrences of events within each epoch of the time support.

    bin_size should be seconds unless specified.
    If bin_size is used and no epochs is passed, the data is binned based
    on the time support of the object.

    Parameters
    ----------
    bin_size : None or float, optional
        The bin size (default is second)
    ep : None or IntervalSet, optional
        IntervalSet to restrict the operation
    time_units : str, optional
        Time units of bin size ('us', 'ms', 's' [default])
    dtype : type, optional
        Data type for the count. Default is np.int64.

    Returns
    -------
    out : TsdFrame
        A TsdFrame with the columns being the index of each item in the TsGroup.

    Examples
    --------
    >>> import pynapple as nap
    >>> import numpy as np
    >>> tsgroup = nap.TsGroup({0: nap.Ts(t=np.arange(0, 200))})
    >>> ep = nap.IntervalSet(start=0, end=100, time_units='s')
    >>> bincount = tsgroup.count(0.1, ep)
    """
    # Arguments may arrive positionally or by keyword; keyword wins and
    # is type-checked, positional values are matched by type.
    bin_size = None
    if "bin_size" in kwargs:
        bin_size = kwargs["bin_size"]
        if isinstance(bin_size, int):
            bin_size = float(bin_size)
        if not isinstance(bin_size, float):
            raise ValueError("bin_size argument should be float.")
    else:
        for a in args:
            if isinstance(a, (float, int)):
                bin_size = float(a)

    time_units = "s"
    if "time_units" in kwargs:
        time_units = kwargs["time_units"]
        if not isinstance(time_units, str):
            raise ValueError("time_units argument should be 's', 'ms' or 'us'.")
    else:
        for a in args:
            if isinstance(a, str) and a in ["s", "ms", "us"]:
                time_units = a

    ep = self.time_support
    if "ep" in kwargs:
        ep = kwargs["ep"]
        if not isinstance(ep, IntervalSet):
            raise ValueError("ep argument should be IntervalSet")
    else:
        for a in args:
            if isinstance(a, IntervalSet):
                ep = a

    if dtype:
        try:
            dtype = np.dtype(dtype)
        except Exception:
            raise ValueError(f"{dtype} is not a valid numpy dtype.")

    starts = ep.start
    ends = ep.end

    if isinstance(bin_size, (float, int)):
        # Convert the bin size to the internal time representation.
        bin_size = float(bin_size)
        bin_size = TsIndex.format_timestamps(np.array([bin_size]), time_units)[0]

    # Call it on first element to pre-allocate the array
    if len(self) >= 1:
        time_index, d = _count(
            self.data[self.index[0]].index.values,
            starts,
            ends,
            bin_size,
            dtype=dtype,
        )
        count = np.zeros((len(time_index), len(self.index)), dtype=dtype)
        count[:, 0] = d

        for i in range(1, len(self.index)):
            count[:, i] = _count(
                self.data[self.index[i]].index.values,
                starts,
                ends,
                bin_size,
                dtype=dtype,
            )[1]

        return TsdFrame(t=time_index, d=count, time_support=ep, columns=self.index)
    else:
        # Empty group: return a TsdFrame with the bin centers and no columns.
        time_index, _ = _count(np.array([]), starts, ends, bin_size, dtype=dtype)
        return TsdFrame(
            t=time_index, d=np.empty((len(time_index), 0)), time_support=ep
        )
[docs]
def to_tsd(self, *args):
    """
    Convert TsGroup to a Tsd. The timestamps of the TsGroup are merged
    together and sorted.

    Parameters
    ----------
    *args
        string, list, numpy.ndarray or pandas.Series

    Examples
    --------
    >>> import pynapple as nap
    >>> import numpy as np
    >>> tsgroup = nap.TsGroup({0:nap.Ts(t=np.array([0, 1])), 5:nap.Ts(t=np.array([2, 3]))})

    By default, the values of the Tsd is the index of the timestamp in the TsGroup:

    >>> tsgroup.to_tsd()
    Time (s)
    0.0    0.0
    1.0    0.0
    2.0    5.0
    3.0    5.0
    dtype: float64

    Values can be inherited from the metadata of the TsGroup by giving the
    key of the corresponding column, or passed directly as a list,
    numpy.ndarray or pandas.Series of matching length. The reverse
    operation is Tsd.to_tsgroup.

    Returns
    -------
    Tsd

    Raises
    ------
    RuntimeError
        "Index are not equals" : if pandas.Series indexes don't match the TsGroup indexes
        "Values is not the same length" : if numpy.ndarray/list object is not the same size as the TsGroup object
        "Key not in metadata of TsGroup" : if string argument does not match any column names of the metadata
        "Unknown argument format" : if argument is not a string, list, numpy.ndarray or pandas.Series
    """
    # Resolve the per-element values to broadcast over each element's
    # timestamps: Series (index-matched), array/list (length-matched),
    # metadata column name, or by default the group keys themselves.
    if len(args):
        if isinstance(args[0], pd.Series):
            if pd.Index.equals(self._metadata.index, args[0].index):
                _values = args[0].values.flatten()
            else:
                raise RuntimeError("Index are not equals")
        elif isinstance(args[0], (np.ndarray, list)):
            if len(self._metadata) == len(args[0]):
                _values = np.array(args[0])
            else:
                raise RuntimeError("Values is not the same length.")
        elif isinstance(args[0], str):
            if args[0] in self._metadata.columns:
                _values = self._metadata[args[0]].values
            else:
                raise RuntimeError(
                    "Key {} not in metadata of TsGroup".format(args[0])
                )
        else:
            # Suggest the numeric metadata columns in the error message.
            possible_keys = []
            for k, d in self._metadata.dtypes.items():
                if "int" in str(d) or "float" in str(d):
                    possible_keys.append(k)
            raise RuntimeError(
                "Unknown argument format. Must be pandas.Series, numpy.ndarray or a string from one of the following values : [{}]".format(
                    ", ".join(possible_keys)
                )
            )
    else:
        _values = self.index

    # Pre-allocate flat arrays for all timestamps, fill per element, then
    # sort everything by time.
    nt = 0
    for n in self.index:
        nt += len(self[n])

    times = np.zeros(nt)
    data = np.zeros(nt)
    k = 0
    for n, v in zip(self.index, _values):
        kl = len(self[n])
        times[k : k + kl] = self[n].index
        data[k : k + kl] = v
        k += kl

    idx = np.argsort(times)

    toreturn = Tsd(t=times[idx], d=data[idx], time_support=self.time_support)
    return toreturn
[docs]
def get(self, start, end=None, time_units="s"):
    """
    Slice the TsGroup from ``start`` to ``end`` such that all timestamps
    within the group satisfy ``start <= t <= end``. If ``end`` is None,
    only the timepoint closest to ``start`` is returned for each element.

    By default, the time support doesn't change. To change the time
    support, use the `restrict` function.

    Parameters
    ----------
    start : float or int
        The start (or closest time point if ``end`` is None).
    end : float or int or None
        The end.
    time_units : str, optional
        Time units of start and end ('us', 'ms', 's' [default]).
    """
    sliced = {k: self.data[k].get(start, end, time_units) for k in self.index}
    meta_cols = self._metadata.columns.drop("rate")
    # bypass_check=True: slicing cannot move timestamps outside the
    # existing time support.
    return TsGroup(
        sliced,
        time_support=self.time_support,
        bypass_check=True,
        **self._metadata[meta_cols],
    )
#################################
# Special slicing of metadata
#################################
[docs]
def getby_threshold(self, key, thr, op=">"):
    """
    Return a TsGroup with all Ts/Tsd objects whose metadata value under
    ``key`` passes the threshold.

    Parameters
    ----------
    key : str
        One of the metadata column names.
    thr : float
        The value for thresholding.
    op : str, optional
        The type of operation. Possibilities are '>', '<', '>=' or '<='.

    Returns
    -------
    TsGroup
        The new TsGroup.

    Raises
    ------
    RuntimeError
        Raised if the operation is not recognized.

    Examples
    --------
    >>> import pynapple as nap
    >>> import numpy as np
    >>> tsgroup = nap.TsGroup({0: nap.Ts(t=np.arange(0, 200))})
    >>> newtsgroup = tsgroup.getby_threshold('freq', 1, op = '>')
    """
    # Build the boolean mask for the requested comparison, then slice once.
    if op == ">":
        mask = self._metadata[key] > thr
    elif op == "<":
        mask = self._metadata[key] < thr
    elif op == ">=":
        mask = self._metadata[key] >= thr
    elif op == "<=":
        mask = self._metadata[key] <= thr
    else:
        raise RuntimeError("Operation {} not recognized.".format(op))
    return self[list(self._metadata.index[mask])]
[docs]
def getby_intervals(self, key, bins):
    """
    Return a list of TsGroup binned by a metadata column.

    Parameters
    ----------
    key : str
        One of the metadata column names.
    bins : numpy.ndarray or list
        The bin edges.

    Returns
    -------
    list
        A list of TsGroup, one per occupied bin, and the bin centers
        of those bins.

    Examples
    --------
    >>> import pynapple as nap
    >>> import numpy as np
    >>> tsgroup = nap.TsGroup({0: nap.Ts(t=np.arange(0, 200))}, alpha=np.arange(1))
    >>> newtsgroup, bincenter = tsgroup.getby_intervals('alpha', [0, 1, 2])
    """
    # Bin membership per element; digitize returns 1-based bins, so
    # shift to 0-based (values below bins[0] become -1).
    idx = np.digitize(self._metadata[key], bins) - 1
    groups = self._metadata.index.groupby(idx)
    ix = np.unique(list(groups.keys()))
    # Drop out-of-range bins (below the first edge or above the last).
    ix = ix[ix >= 0]
    ix = ix[ix < len(bins) - 1]
    xb = bins[0:-1] + np.diff(bins) / 2
    sliced = [self[list(groups[i])] for i in ix]
    return sliced, xb[ix]
[docs]
def getby_category(self, key):
    """
    Return a dictionary of TsGroup grouped by the categories of a
    metadata column.

    Parameters
    ----------
    key : str
        One of the metadata column names.

    Returns
    -------
    dict
        A dictionary mapping each category to a TsGroup.

    Examples
    --------
    >>> import pynapple as nap
    >>> import numpy as np
    >>> tsgroup = nap.TsGroup({0: nap.Ts(t=np.arange(0, 200))}, group=[0])
    >>> newtsgroup = tsgroup.getby_category('group')
    """
    categories = self._metadata.groupby(key).groups
    return {label: self[list(members)] for label, members in categories.items()}
[docs]
@staticmethod
def merge_group(
    *tsgroups, reset_index=False, reset_time_support=False, ignore_metadata=False
):
    """
    Merge multiple TsGroup objects into a single TsGroup object.

    Parameters
    ----------
    *tsgroups : TsGroup
        The TsGroup objects to merge.
    reset_index : bool, optional
        If True, the keys will be reset to range(len(data)).
        If False, the keys of the TsGroup objects should be non-overlapping and will be preserved.
    reset_time_support : bool, optional
        If True, the merged TsGroup will merge time supports from all the Ts/Tsd objects in data.
        If False, the time support of the TsGroup objects should be the same and will be preserved.
    ignore_metadata : bool, optional
        If True, the merged TsGroup will not have any metadata columns other than 'rate'.
        If False, all metadata columns should be the same and all metadata will be concatenated.

    Returns
    -------
    TsGroup
        A TsGroup of merged objects.

    Raises
    ------
    TypeError
        If the input objects are not TsGroup objects.
    ValueError
        If `ignore_metadata=False` but metadata columns are not the same.
        If `reset_index=False` but keys overlap.
        If `reset_time_support=False` but time supports are not the same.
    """
    is_tsgroup = [isinstance(tsg, TsGroup) for tsg in tsgroups]
    if not all(is_tsgroup):
        # Report 1-based positions of the offending arguments.
        not_tsgroup_index = [i + 1 for i, boo in enumerate(is_tsgroup) if not boo]
        raise TypeError(f"Input at positions {not_tsgroup_index} are not TsGroup!")

    if len(tsgroups) == 1:
        print("Only one TsGroup object provided, no merge needed.")
        return tsgroups[0]

    # Accumulate items/keys/metadata starting from the first group,
    # validating each subsequent group against it.
    tsg1 = tsgroups[0]
    items = tsg1.items()
    keys = set(tsg1.keys())
    metadata = tsg1._metadata

    for i, tsg in enumerate(tsgroups[1:]):
        if not ignore_metadata:
            # Metadata columns must match exactly to be concatenated.
            if tsg1.metadata_columns != tsg.metadata_columns:
                raise ValueError(
                    f"TsGroup at position {i+2} has different metadata columns from previous TsGroup objects. "
                    "Set `ignore_metadata=True` to bypass the check."
                )
            metadata = pd.concat([metadata, tsg._metadata], axis=0)

        if not reset_index:
            key_overlap = keys.intersection(tsg.keys())
            if key_overlap:
                raise ValueError(
                    f"TsGroup at position {i+2} has overlapping keys {key_overlap} with previous TsGroup objects. "
                    "Set `reset_index=True` to bypass the check."
                )
            keys.update(tsg.keys())

        if reset_time_support:
            # None lets TsGroup.__init__ compute the union of supports.
            time_support = None
        else:
            # Compare time supports up to the configured time precision.
            if not np.allclose(
                tsg1.time_support.as_units("s").to_numpy(),
                tsg.time_support.as_units("s").to_numpy(),
                atol=10 ** (-nap_config.time_index_precision),
                rtol=0,
            ):
                raise ValueError(
                    f"TsGroup at position {i+2} has different time support from previous TsGroup objects. "
                    "Set `reset_time_support=True` to bypass the check."
                )
            time_support = tsg1.time_support

        items.extend(tsg.items())

    if reset_index:
        metadata.index = range(len(metadata))
        data = {i: ts[1] for i, ts in enumerate(items)}
    else:
        data = dict(items)

    if ignore_metadata:
        return TsGroup(data, time_support=time_support, bypass_check=False)
    else:
        cols = metadata.columns.drop("rate")
        return TsGroup(
            data, time_support=time_support, bypass_check=False, **metadata[cols]
        )
[docs]
def merge(
    self,
    *tsgroups,
    reset_index=False,
    reset_time_support=False,
    ignore_metadata=False,
):
    """
    Merge the TsGroup object with other TsGroup objects.
    Common uses include adding more neurons/channels (supposing each
    Ts/Tsd corresponds to data from a neuron/channel) or adding more
    trials (supposing each Ts/Tsd corresponds to data from a trial).

    Parameters
    ----------
    *tsgroups : TsGroup
        The TsGroup objects to merge with.
    reset_index : bool, optional
        If True, the keys will be reset to range(len(data)).
        If False, the keys of the TsGroup objects should be non-overlapping and will be preserved.
    reset_time_support : bool, optional
        If True, the merged TsGroup will merge time supports from all the Ts/Tsd objects in data.
        If False, the time support of the TsGroup objects should be the same and will be preserved.
    ignore_metadata : bool, optional
        If True, the merged TsGroup will not have any metadata columns other than 'rate'.
        If False, all metadata columns should be the same and all metadata will be concatenated.

    Returns
    -------
    TsGroup
        A TsGroup of merged objects.

    Raises
    ------
    TypeError
        If the input objects are not TsGroup objects.
    ValueError
        If `ignore_metadata=False` but metadata columns are not the same.
        If `reset_index=False` but keys overlap.
        If `reset_time_support=False` but time supports are not the same.

    Examples
    --------
    >>> import pynapple as nap
    >>> time_support_a = nap.IntervalSet(start=-1, end=1, time_units='s')
    >>> tsgroup1 = nap.TsGroup({0: nap.Ts(t=[-1, 0, 1])}, time_support=time_support_a)
    >>> tsgroup2 = nap.TsGroup({10: nap.Ts(t=[-1, 0, 1])}, time_support=time_support_a)
    >>> tsgroup_12 = tsgroup1.merge(tsgroup2)

    See Also
    --------
    [`TsGroup.merge_group`](./#pynapple.core.ts_group.TsGroup.merge_group)
    """
    # Delegate to the static merge_group, with self as the first group.
    options = {
        "reset_index": reset_index,
        "reset_time_support": reset_time_support,
        "ignore_metadata": ignore_metadata,
    }
    return TsGroup.merge_group(self, *tsgroups, **options)
[docs]
def save(self, filename):
    """
    Save TsGroup object in npz format. The file will contain the timestamps,
    the data (if group of Tsd), group index, the time support and the metadata.

    The main purpose of this function is to save small/medium sized TsGroup
    objects.

    The function will "flatten" the TsGroup by sorting all the timestamps
    and assigning to each the corresponding index. Typically, a TsGroup like
    this:

    >>> TsGroup({
        0 : Tsd(t=[0, 2, 4], d=[1, 2, 3])
        1 : Tsd(t=[1, 5], d=[5, 6])})

    will be saved as npz with the following keys:

    >>> {
        't' : [0, 1, 2, 4, 5],
        'd' : [1, 5, 2, 3, 5],
        'index' : [0, 1, 0, 0, 1],
        'start' : [0],
        'end' : [5],
        'keys' : [0, 1],
        'type' : 'TsGroup'
        }

    Metadata are saved by columns with the column name as the npz key. To
    avoid potential conflicts, make sure the column names of the metadata
    are different from ['t', 'd', 'start', 'end', 'index', 'keys'].

    You can load the object back with `nap.load_file`.

    Parameters
    ----------
    filename : str
        The filename.

    Raises
    ------
    RuntimeError
        If filename is not str, path does not exist or filename is a directory.
    """
    filename = check_filename(filename)

    dicttosave = {"type": np.array(["TsGroup"], dtype=np.str_)}
    # Save each metadata column under its own key, skipping names that
    # would collide with the reserved npz keys.
    for k in self._metadata.columns:
        if k not in ["t", "d", "start", "end", "index", "keys"]:
            tmp = self._metadata[k].values
            if tmp.dtype == np.dtype("O"):
                # Object columns (e.g. strings) are stored as numpy strings.
                tmp = tmp.astype(np.str_)
            dicttosave[k] = tmp

    # We can't use to_tsd here in case tsgroup contains Tsd and not only Ts.
    nt = 0
    for n in self.index:
        nt += len(self[n])

    times = np.zeros(nt)
    # NaN marks "no data"; 'd' is only written if at least one value is set.
    data = np.full(nt, np.nan)
    index = np.zeros(nt, dtype=np.int64)
    k = 0
    for n in self.index:
        kl = len(self[n])
        times[k : k + kl] = self[n].index
        if isinstance(self[n], _BaseTsd):
            data[k : k + kl] = self[n].values
        index[k : k + kl] = int(n)
        k += kl

    # Flatten: one global time-sorted sequence with per-sample group index.
    idx = np.argsort(times)
    times = times[idx]
    index = index[idx]

    dicttosave["t"] = times
    dicttosave["index"] = index
    if not np.all(np.isnan(data)):
        dicttosave["d"] = data[idx]
    dicttosave["keys"] = np.array(self.keys())
    dicttosave["start"] = self.time_support.start
    dicttosave["end"] = self.time_support.end

    np.savez(filename, **dicttosave)

    return
@classmethod
def _from_npz_reader(cls, file):
    """
    Load a TsGroup object from an npz file written by ``save``.

    Parameters
    ----------
    file
        The opened npz file (mapping with keys 't', 'index', 'start',
        'end', optionally 'd' and 'keys', plus metadata columns).

    Returns
    -------
    TsGroup
        The reconstructed TsGroup object.
    """
    times = file["t"]
    index = file["index"]
    has_data = "d" in file.keys()
    time_support = IntervalSet(file["start"], file["end"])

    if has_data:
        # Bug fix: save() stores the values under the key "d" (the same
        # key has_data checks), not "data".
        data = file["d"]

    if "keys" in file.keys():
        keys = file["keys"]
    else:
        keys = np.unique(index)

    # Rebuild each element by filtering the flattened arrays on its index.
    group = {}
    for key in keys:
        filtering_index = index == key
        t = times[filtering_index]

        if has_data:
            group[key] = Tsd(
                t=t,
                d=data[filtering_index],
                time_support=time_support,
            )
        else:
            group[key] = Ts(t=t, time_support=time_support)

    tsgroup = cls(group, time_support=time_support, bypass_check=True)

    # Restore metadata: any extra npz key whose length matches the group.
    metainfo = {}
    not_info_keys = {"start", "end", "t", "index", "d", "rate", "keys"}

    for k in set(file.keys()) - not_info_keys:
        tmp = file[k]
        if len(tmp) == len(tsgroup):
            metainfo[k] = tmp

    tsgroup.set_info(**metainfo)
    return tsgroup