Skip to content

Base class

pynapple.core.base_class

Abstract class for core time series.

Base

Bases: ABC

Abstract base class for time series and timestamps objects. Implement most of the shared functions across concrete classes Ts, Tsd, TsdFrame, TsdTensor

Source code in pynapple/core/base_class.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
class Base(abc.ABC):
    """
    Abstract base class for time series and timestamps objects.
    Implement most of the shared functions across concrete classes `Ts`, `Tsd`, `TsdFrame`, `TsdTensor`
    """

    _initialized = False

    def __init__(self, t, time_units="s", time_support=None):
        if isinstance(t, TsIndex):
            self.index = t
        else:
            self.index = TsIndex(convert_to_numpy_array(t, "t"), time_units)

        if time_support is not None:
            assert isinstance(
                time_support, IntervalSet
            ), "time_support should be an IntervalSet"

        # Restrict should occur in the inherited class
        if len(self.index):
            if isinstance(time_support, IntervalSet):
                self.time_support = time_support
            else:
                self.time_support = IntervalSet(start=self.index[0], end=self.index[-1])

            self.rate = self.index.shape[0] / np.sum(
                self.time_support.values[:, 1] - self.time_support.values[:, 0]
            )
        else:
            self.rate = np.nan
            self.time_support = IntervalSet(start=[], end=[])

    @property
    def t(self):
        return self.index.values

    @property
    def start(self):
        return self.start_time()

    @property
    def end(self):
        return self.end_time()

    @property
    def shape(self):
        return self.index.shape

    def __repr__(self):
        return str(self.__class__)

    def __str__(self):
        return self.__repr__()

    def __len__(self):
        return len(self.index)

    def __setattr__(self, name, value):
        """Object is immutable"""
        if self._initialized:
            raise RuntimeError(
                "Changing directly attributes is not permitted for {}.".format(
                    self.nap_class
                )
            )
        else:
            object.__setattr__(self, name, value)

    @abc.abstractmethod
    def __getitem__(self, key, *args, **kwargs):
        """getter for time series"""
        pass

    def __setitem__(self, key, value):
        pass

    def times(self, units="s"):
        """
        The time index of the object, returned as np.double in the desired time units.

        Parameters
        ----------
        units : str, optional
            ('us', 'ms', 's' [default])

        Returns
        -------
        out: numpy.ndarray
            the time indexes
        """
        return self.index.in_units(units)

    def start_time(self, units="s"):
        """
        The first time index in the time series object

        Parameters
        ----------
        units : str, optional
            ('us', 'ms', 's' [default])

        Returns
        -------
        out: numpy.float64
            _
        """
        if len(self.index):
            return self.times(units=units)[0]
        else:
            return None

    def end_time(self, units="s"):
        """
        The last time index in the time series object

        Parameters
        ----------
        units : str, optional
            ('us', 'ms', 's' [default])

        Returns
        -------
        out: numpy.float64
            _
        """
        if len(self.index):
            return self.times(units=units)[-1]
        else:
            return None

    def value_from(self, data, ep=None):
        """
        Replace the value with the closest value from Tsd/TsdFrame/TsdTensor argument

        Parameters
        ----------
        data : Tsd, TsdFrame or TsdTensor
            The object holding the values to replace.
        ep : IntervalSet (optional)
            The IntervalSet object to restrict the operation.
            If None, the time support of the tsd input object is used.

        Returns
        -------
        out : Tsd, TsdFrame or TsdTensor
            Object with the new values

        Examples
        --------
        In this example, the ts object will receive the closest values in time from tsd.

        >>> import pynapple as nap
        >>> import numpy as np
        >>> t = np.unique(np.sort(np.random.randint(0, 1000, 100))) # random times
        >>> ts = nap.Ts(t=t, time_units='s')
        >>> tsd = nap.Tsd(t=np.arange(0,1000), d=np.random.rand(1000), time_units='s')
        >>> ep = nap.IntervalSet(start = 0, end = 500, time_units = 's')

        The variable ts is a timestamp object.
        The tsd object containing the values, for example the tracking data, and the epoch to restrict the operation.

        >>> newts = ts.value_from(tsd, ep)

        newts is the same size as ts restrict to ep.

        >>> print(len(ts.restrict(ep)), len(newts))
            52 52
        """
        if ep is None:
            ep = data.time_support
        time_array = self.index.values
        time_target_array = data.index.values
        data_target_array = data.values
        starts = ep.start
        ends = ep.end

        t, d = _value_from(
            time_array, time_target_array, data_target_array, starts, ends
        )

        time_support = IntervalSet(start=starts, end=ends)

        kwargs = {}
        if hasattr(data, "columns"):
            kwargs["columns"] = data.columns

        return t, d, time_support, kwargs

    def count(self, *args, dtype=None, **kwargs):
        """
        Count occurences of events within bin_size or within a set of bins defined as an IntervalSet.
        You can call this function in multiple ways :

        1. *tsd.count(bin_size=1, time_units = 'ms')*
        -> Count occurence of events within a 1 ms bin defined on the time support of the object.

        2. *tsd.count(1, ep=my_epochs)*
        -> Count occurent of events within a 1 second bin defined on the IntervalSet my_epochs.

        3. *tsd.count(ep=my_bins)*
        -> Count occurent of events within each epoch of the intervalSet object my_bins

        4. *tsd.count()*
        -> Count occurent of events within each epoch of the time support.

        bin_size should be seconds unless specified.
        If bin_size is used and no epochs is passed, the data will be binned based on the time support of the object.

        Parameters
        ----------
        bin_size : None or float, optional
            The bin size (default is second)
        ep : None or IntervalSet, optional
            IntervalSet to restrict the operation
        time_units : str, optional
            Time units of bin size ('us', 'ms', 's' [default])
        dtype: type, optional
            Data type for the count. Default is np.int64.

        Returns
        -------
        out: Tsd
            A Tsd object indexed by the center of the bins.

        Examples
        --------
        This example shows how to count events within bins of 0.1 second.

        >>> import pynapple as nap
        >>> import numpy as np
        >>> t = np.unique(np.sort(np.random.randint(0, 1000, 100)))
        >>> ts = nap.Ts(t=t, time_units='s')
        >>> bincount = ts.count(0.1)

        An epoch can be specified:

        >>> ep = nap.IntervalSet(start = 100, end = 800, time_units = 's')
        >>> bincount = ts.count(0.1, ep=ep)

        And bincount automatically inherit ep as time support:

        >>> bincount.time_support
            start    end
        0  100.0  800.0
        """
        bin_size = None
        if "bin_size" in kwargs:
            bin_size = kwargs["bin_size"]
            if isinstance(bin_size, int):
                bin_size = float(bin_size)
            if not isinstance(bin_size, float):
                raise ValueError("bin_size argument should be float.")
        else:
            for a in args:
                if isinstance(a, (float, int)):
                    bin_size = float(a)

        time_units = "s"
        if "time_units" in kwargs:
            time_units = kwargs["time_units"]
            if not isinstance(time_units, str):
                raise ValueError("time_units argument should be 's', 'ms' or 'us'.")
        else:
            for a in args:
                if isinstance(a, str) and a in ["s", "ms", "us"]:
                    time_units = a

        ep = self.time_support
        if "ep" in kwargs:
            ep = kwargs["ep"]
            if not isinstance(ep, IntervalSet):
                raise ValueError("ep argument should be IntervalSet")
        else:
            for a in args:
                if isinstance(a, IntervalSet):
                    ep = a

        if dtype is None:
            dtype = np.dtype(np.int64)
        else:
            try:
                dtype = np.dtype(dtype)
            except Exception:
                raise ValueError(f"{dtype} is not a valid numpy dtype.")

        starts = ep.start
        ends = ep.end

        if isinstance(bin_size, (float, int)):
            bin_size = TsIndex.format_timestamps(np.array([bin_size]), time_units)[0]

        time_array = self.index.values

        t, d = _count(time_array, starts, ends, bin_size, dtype=dtype)

        return t, d, ep

    def restrict(self, iset):
        """
        Restricts a time series object to a set of time intervals delimited by an IntervalSet object

        Parameters
        ----------
        iset : IntervalSet
            the IntervalSet object

        Returns
        -------
        Ts, Tsd, TsdFrame or TsdTensor
            Tsd object restricted to ep

        Examples
        --------
        The Ts object is restrict to the intervals defined by ep.

        >>> import pynapple as nap
        >>> import numpy as np
        >>> t = np.unique(np.sort(np.random.randint(0, 1000, 100)))
        >>> ts = nap.Ts(t=t, time_units='s')
        >>> ep = nap.IntervalSet(start=0, end=500, time_units='s')
        >>> newts = ts.restrict(ep)

        The time support of newts automatically inherit the epochs defined by ep.

        >>> newts.time_support
            start    end
        0    0.0  500.0

        """

        assert isinstance(iset, IntervalSet), "Argument should be IntervalSet"

        time_array = self.index.values
        starts = iset.start
        ends = iset.end

        idx = _restrict(time_array, starts, ends)

        kwargs = {}
        if hasattr(self, "columns"):
            kwargs["columns"] = self.columns

        if hasattr(self, "values"):
            data_array = self.values
            return self.__class__(
                t=time_array[idx], d=data_array[idx], time_support=iset, **kwargs
            )
        else:
            return self.__class__(t=time_array[idx], time_support=iset)

    def copy(self):
        """Copy the data, index and time support"""
        return self.__class__(t=self.index.copy(), time_support=self.time_support)

    def find_support(self, min_gap, time_units="s"):
        """
        find the smallest (to a min_gap resolution) IntervalSet containing all the times in the Tsd

        Parameters
        ----------
        min_gap : float or int
            minimal interval between timestamps
        time_units : str, optional
            Time units of min gap

        Returns
        -------
        IntervalSet
            Description
        """
        assert isinstance(min_gap, Number), "min_gap should be a float or int"
        min_gap = TsIndex.format_timestamps(np.array([min_gap]), time_units)[0]
        time_array = self.index.values

        starts = [time_array[0]]
        ends = []
        for i in range(len(time_array) - 1):
            if (time_array[i + 1] - time_array[i]) > min_gap:
                ends.append(time_array[i] + 1e-6)
                starts.append(time_array[i + 1])

        ends.append(time_array[-1] + 1e-6)

        return IntervalSet(start=starts, end=ends)

    def get(self, start, end=None, time_units="s"):
        """Slice the time series from `start` to `end` such that all the timestamps satisfy `start<=t<=end`.
        If `end` is None, only the timepoint closest to `start` is returned.

        By default, the time support doesn't change. If you want to change the time support, use the `restrict` function.

        Parameters
        ----------
        start : float or int
            The start (or closest time point if `end` is None)
        end : float or int or None
            The end
        """
        sl = self.get_slice(start, end, time_units)

        if end is None:
            sl = sl.start

        return self[sl]

    def get_slice(self, start, end=None, time_unit="s"):
        """
        Get a slice object from the time series data based on the start and end values such that all the timestamps satisfy `start<=t<=end`.
        If `end` is None, only the timepoint closest to `start` is returned.

        By default, the time support doesn't change. If you want to change the time support, use the `restrict` function.

        This function is equivalent of calling the `get` method.

        Parameters
        ----------
        start : int or float
            The starting value for the slice.
        end : int or float, optional
            The ending value for the slice. Defaults to None.
        time_unit : str, optional
            The time unit for the start and end values. Defaults to "s" (seconds).

        Returns
        -------
        slice : slice
            A slice determining the start and end indices, with unit step
            Slicing the array will be equivalent to calling get: `ts[s].t == ts.get(start, end).t` with `s` being the slice object.


        Raises
        ------
        ValueError
            - If start or end is not a number.
            - If start is greater than end.

        Examples
        --------
        >>> import pynapple as nap

        >>> ts = nap.Ts(t = [0, 1, 2, 3])

        >>> # slice over a range
        >>> start, end = 1.2, 2.6
        >>> print(ts.get_slice(start, end))  # returns `slice(2, 3, None)`
        >>> start, end = 1., 2.
        >>> print(ts.get_slice(start, end, mode="forward"))  # returns `slice(1, 3, None)`

        >>> # slice a single value
        >>> start = 1.2
        >>> print(ts.get_slice(start))  # returns `slice(1, 2, None)`
        >>> start = 2.
        >>> print(ts.get_slice(start)) # returns `slice(2, 3, None)`
        """
        mode = "closest_t" if end is None else "restrict"
        return self._get_slice(
            start, end=end, mode=mode, n_points=None, time_unit=time_unit
        )

    def _get_slice(
        self, start, end=None, mode="closest_t", n_points=None, time_unit="s"
    ):
        """
        Get a slice from the time series data based on the start and end values with the specified mode.

        For a given time t, mode `before_t` means you want the timepoint right before t to start the slice.
        Mode `after_t` means you want the timepoint right after t to start the slice.

        Parameters
        ----------
        start : int or float
            The starting value for the slice.
        end : int or float, optional
            The ending value for the slice. Defaults to None.
        mode : str, optional
            The mode for slicing. Can be "after_t", "before_t", "restrict", or "closest_t". Defaults to "closest_t".
        time_unit : str, optional
            The time unit for the start and end values. Defaults to "s" (seconds).
        n_points : int, optional
            Number of time point that will result from applying the slice. This parameter is used to
            calculate a step size for the slice.

        Returns
        -------
        slice : slice
            If end is not provided:
                - For mode == "before_t":
                    - An empty slice for start < self.t[0]
                    - slice(idx, idx+1) with self.t[idx] <= start < self.t[idx+1]
                - For mode == "after_t":
                    - An empty slice for start >= self.t[-1]
                    - slice(idx, idx+1) with self.t[idx-1] < start <= self.t[idx]
                - For mode == "closest_t":
                    - slice(idx, idx+1) with the closest index to start
                - For mode == "restrict":
                    - slice the indices such that start <= self.t[idx] <= end
            If end is provided:
                - For mode == "before_t":
                    - An empty slice if end < self.t[0]
                    - slice(idx_start, idx_end) with self.t[idx_start] <= start < self.t[idx_start+1] and
                    self.t[idx_end] <= end < self.t[idx_end+1]
                - For mode == "after_t":
                    - An empty slice if start > self.t[-1]
                     - slice(idx_start, idx_end) with self.t[idx_start-1] <= start < self.t[idx_start] and
                    self.t[idx_end-1] <= end < self.t[idx_end]
                - For mode == "closest":
                    - slice(idx_start, idx_end) with the closest indices to start and end
                - For mode == "restrict":
                    - An empty slice if start > self.t[-1] or end < self.t[0]
                    - slice(idx_start, idx_end) with self.t[idx_start] <= start <= self.t[idx_start+1] and
                    self.t[idx_end] <= end <= self.t[idx_end+1]

        Raises
        ------
        ValueError
            - If start or end is not a number.
            - If start is greater than end.

        """
        if not isinstance(start, Number):
            raise ValueError(
                f"'start' must be an int or a float. Type {type(start)} provided instead!"
            )

        if n_points is not None and not isinstance(n_points, int):
            raise TypeError(
                f"'n_points' must be of type int or None. Type {type(n_points)} provided instead!"
            )

        if end is None and n_points:
            raise ValueError("'n_points' can be used only when 'end' is specified!")

        if mode not in ["before_t", "after_t", "closest_t", "restrict"]:
            raise ValueError(
                "'mode' only accepts 'before_t', 'after_t', 'closest_t' or 'restrict'."
            )

        if mode == "restrict" and n_points:
            raise ValueError(
                "Fixing the number of time points is incompatible with 'restrict' mode."
            )

        # convert and get index for start
        start = TsIndex.format_timestamps(np.array([start]), time_unit)[0]

        # check end
        if end is not None and not isinstance(end, Number):
            raise ValueError(
                f"'end' must be an int or a float. Type {type(end)} provided instead!"
            )

        # get index of preceding time value
        idx_start = np.searchsorted(self.t, start, side="left")
        if idx_start == len(self.t) and mode != "restrict":
            idx_start -= 1  # make sure the index is not out of bound

        if mode == "before_t":
            # in order to get the index preceding start
            # subtract one except if self.t[idx_start] is exactly equal to start
            idx_start -= self.t[idx_start] > start
        elif mode == "closest_t":
            # subtract 1 if start is closer to the previous index
            di = self.t[idx_start] - start > np.abs(self.t[idx_start - 1] - start)
            idx_start -= di

        if end is None:
            if idx_start < 0:  # happens only on backwards if start < self.t[0]
                return slice(0, 0)
            elif (
                idx_start == len(self.t) - 1 and mode == "after_t"
            ):  # happens only on forward if start >= self.t[-1]
                return slice(idx_start, idx_start)
            return slice(idx_start, idx_start + 1)
        else:
            idx_start = max([0, idx_start])  # if taking a range set slice index to 0

        # convert and get index for end
        end = TsIndex.format_timestamps(np.array([end]), time_unit)[0]
        if start > end:
            raise ValueError("'start' should not precede 'end'.")

        idx_end = np.searchsorted(self.t, end, side="left")
        add_if_forward = 0
        if idx_end == len(self.t):
            idx_end -= 1  # make sure the index is not out of bound
            add_if_forward = 1  # add back the index if forward

        if mode == "before_t":
            # remove 1 if self.t[idx_end] is larger than end, except if idx_end is 0
            idx_end -= (self.t[idx_end] > end) - int(idx_end == 0)
        elif mode == "closest_t":
            # subtract 1 if end is closer to self.t[idx_end - 1]
            di = self.t[idx_end] - end > np.abs(self.t[idx_end - 1] - end)
            idx_end -= di
        elif mode == "after_t" and idx_end == len(self.t) - 1:
            idx_end += add_if_forward  # add one if idx_start < len(self.t)
        elif mode == "restrict":
            idx_end += int(self.t[idx_end] <= end)

        step = None
        if n_points:
            tot_tps = idx_end - idx_start
            if tot_tps > n_points:
                rounding = tot_tps % n_points
                step = tot_tps // n_points
                idx_end -= rounding

        return slice(idx_start, idx_end, step)

    def _get_filename(self, filename):
        """Check if the filename is valid and return the path

        Parameters
        ----------
        filename : str or Path
            The filename

        Returns
        -------
        Path
            The path to the file

        Raises
        ------
        RuntimeError
            If the filename is a directory or the parent does not exist
        """

        return check_filename(filename)

    @classmethod
    def _from_npz_reader(cls, file):
        """Load a time series object from a npz file interface.

        Parameters
        ----------
        file : NPZFile object
            opened npz file interface.

        Returns
        -------
        out : Ts or Tsd or TsdFrame or TsdTensor
            The time series object
        """
        kwargs = {
            key: file[key] for key in file.keys() if key not in ["start", "end", "type"]
        }
        iset = IntervalSet(start=file["start"], end=file["end"])
        return cls(time_support=iset, **kwargs)

__setattr__

__setattr__(name, value)

Object is immutable

Source code in pynapple/core/base_class.py
def __setattr__(self, name, value):
    """Object is immutable"""
    if self._initialized:
        raise RuntimeError(
            "Changing directly attributes is not permitted for {}.".format(
                self.nap_class
            )
        )
    else:
        object.__setattr__(self, name, value)

__getitem__ abstractmethod

__getitem__(key, *args, **kwargs)

getter for time series

Source code in pynapple/core/base_class.py
@abc.abstractmethod
def __getitem__(self, key, *args, **kwargs):
    """getter for time series"""
    pass

times

times(units='s')

The time index of the object, returned as np.double in the desired time units.

Parameters:

Name Type Description Default
units str

('us', 'ms', 's' [default])

's'

Returns:

Name Type Description
out ndarray

the time indexes

Source code in pynapple/core/base_class.py
def times(self, units="s"):
    """
    The time index of the object, returned as np.double in the desired time units.

    Parameters
    ----------
    units : str, optional
        ('us', 'ms', 's' [default])

    Returns
    -------
    out: numpy.ndarray
        the time indexes
    """
    return self.index.in_units(units)

start_time

start_time(units='s')

The first time index in the time series object

Parameters:

Name Type Description Default
units str

('us', 'ms', 's' [default])

's'

Returns:

Name Type Description
out float64

_

Source code in pynapple/core/base_class.py
def start_time(self, units="s"):
    """
    The first time index in the time series object

    Parameters
    ----------
    units : str, optional
        ('us', 'ms', 's' [default])

    Returns
    -------
    out: numpy.float64
        _
    """
    if len(self.index):
        return self.times(units=units)[0]
    else:
        return None

end_time

end_time(units='s')

The last time index in the time series object

Parameters:

Name Type Description Default
units str

('us', 'ms', 's' [default])

's'

Returns:

Name Type Description
out float64

_

Source code in pynapple/core/base_class.py
def end_time(self, units="s"):
    """
    The last time index in the time series object

    Parameters
    ----------
    units : str, optional
        ('us', 'ms', 's' [default])

    Returns
    -------
    out: numpy.float64
        _
    """
    if len(self.index):
        return self.times(units=units)[-1]
    else:
        return None

value_from

value_from(data, ep=None)

Replace the value with the closest value from Tsd/TsdFrame/TsdTensor argument

Parameters:

Name Type Description Default
data (Tsd, TsdFrame or TsdTensor)

The object holding the values to replace.

required
ep IntervalSet(optional)

The IntervalSet object to restrict the operation. If None, the time support of the tsd input object is used.

None

Returns:

Name Type Description
out (Tsd, TsdFrame or TsdTensor)

Object with the new values

Examples:

In this example, the ts object will receive the closest values in time from tsd.

>>> import pynapple as nap
>>> import numpy as np
>>> t = np.unique(np.sort(np.random.randint(0, 1000, 100))) # random times
>>> ts = nap.Ts(t=t, time_units='s')
>>> tsd = nap.Tsd(t=np.arange(0,1000), d=np.random.rand(1000), time_units='s')
>>> ep = nap.IntervalSet(start = 0, end = 500, time_units = 's')

The variable ts is a timestamp object. The tsd object containing the values, for example the tracking data, and the epoch to restrict the operation.

>>> newts = ts.value_from(tsd, ep)

newts is the same size as ts restrict to ep.

>>> print(len(ts.restrict(ep)), len(newts))
    52 52
Source code in pynapple/core/base_class.py
def value_from(self, data, ep=None):
    """
    Replace the value with the closest value from Tsd/TsdFrame/TsdTensor argument

    Parameters
    ----------
    data : Tsd, TsdFrame or TsdTensor
        The object holding the values to replace.
    ep : IntervalSet (optional)
        The IntervalSet object to restrict the operation.
        If None, the time support of the tsd input object is used.

    Returns
    -------
    out : Tsd, TsdFrame or TsdTensor
        Object with the new values

    Examples
    --------
    In this example, the ts object will receive the closest values in time from tsd.

    >>> import pynapple as nap
    >>> import numpy as np
    >>> t = np.unique(np.sort(np.random.randint(0, 1000, 100))) # random times
    >>> ts = nap.Ts(t=t, time_units='s')
    >>> tsd = nap.Tsd(t=np.arange(0,1000), d=np.random.rand(1000), time_units='s')
    >>> ep = nap.IntervalSet(start = 0, end = 500, time_units = 's')

    The variable ts is a timestamp object.
    The tsd object containing the values, for example the tracking data, and the epoch to restrict the operation.

    >>> newts = ts.value_from(tsd, ep)

    newts is the same size as ts restrict to ep.

    >>> print(len(ts.restrict(ep)), len(newts))
        52 52
    """
    if ep is None:
        ep = data.time_support
    time_array = self.index.values
    time_target_array = data.index.values
    data_target_array = data.values
    starts = ep.start
    ends = ep.end

    t, d = _value_from(
        time_array, time_target_array, data_target_array, starts, ends
    )

    time_support = IntervalSet(start=starts, end=ends)

    kwargs = {}
    if hasattr(data, "columns"):
        kwargs["columns"] = data.columns

    return t, d, time_support, kwargs

count

count(*args, dtype=None, **kwargs)

Count occurences of events within bin_size or within a set of bins defined as an IntervalSet. You can call this function in multiple ways :

  1. tsd.count(bin_size=1, time_units = 'ms') -> Count occurence of events within a 1 ms bin defined on the time support of the object.

  2. tsd.count(1, ep=my_epochs) -> Count occurent of events within a 1 second bin defined on the IntervalSet my_epochs.

  3. tsd.count(ep=my_bins) -> Count occurent of events within each epoch of the intervalSet object my_bins

  4. tsd.count() -> Count occurent of events within each epoch of the time support.

bin_size should be seconds unless specified. If bin_size is used and no epochs is passed, the data will be binned based on the time support of the object.

Parameters:

Name Type Description Default
bin_size None or float

The bin size (default is second)

required
ep None or IntervalSet

IntervalSet to restrict the operation

required
time_units str

Time units of bin size ('us', 'ms', 's' [default])

required
dtype

Data type for the count. Default is np.int64.

None

Returns:

Name Type Description
out Tsd

A Tsd object indexed by the center of the bins.

Examples:

This example shows how to count events within bins of 0.1 second.

>>> import pynapple as nap
>>> import numpy as np
>>> t = np.unique(np.sort(np.random.randint(0, 1000, 100)))
>>> ts = nap.Ts(t=t, time_units='s')
>>> bincount = ts.count(0.1)

An epoch can be specified:

>>> ep = nap.IntervalSet(start = 100, end = 800, time_units = 's')
>>> bincount = ts.count(0.1, ep=ep)

And bincount automatically inherit ep as time support:

>>> bincount.time_support
    start    end
0  100.0  800.0
Source code in pynapple/core/base_class.py
def count(self, *args, dtype=None, **kwargs):
    """
    Count occurences of events within bin_size or within a set of bins defined as an IntervalSet.
    You can call this function in multiple ways :

    1. *tsd.count(bin_size=1, time_units = 'ms')*
    -> Count occurence of events within a 1 ms bin defined on the time support of the object.

    2. *tsd.count(1, ep=my_epochs)*
    -> Count occurent of events within a 1 second bin defined on the IntervalSet my_epochs.

    3. *tsd.count(ep=my_bins)*
    -> Count occurent of events within each epoch of the intervalSet object my_bins

    4. *tsd.count()*
    -> Count occurent of events within each epoch of the time support.

    bin_size should be seconds unless specified.
    If bin_size is used and no epochs is passed, the data will be binned based on the time support of the object.

    Parameters
    ----------
    bin_size : None or float, optional
        The bin size (default is second)
    ep : None or IntervalSet, optional
        IntervalSet to restrict the operation
    time_units : str, optional
        Time units of bin size ('us', 'ms', 's' [default])
    dtype: type, optional
        Data type for the count. Default is np.int64.

    Returns
    -------
    out: Tsd
        A Tsd object indexed by the center of the bins.

    Examples
    --------
    This example shows how to count events within bins of 0.1 second.

    >>> import pynapple as nap
    >>> import numpy as np
    >>> t = np.unique(np.sort(np.random.randint(0, 1000, 100)))
    >>> ts = nap.Ts(t=t, time_units='s')
    >>> bincount = ts.count(0.1)

    An epoch can be specified:

    >>> ep = nap.IntervalSet(start = 100, end = 800, time_units = 's')
    >>> bincount = ts.count(0.1, ep=ep)

    And bincount automatically inherit ep as time support:

    >>> bincount.time_support
        start    end
    0  100.0  800.0
    """
    bin_size = None
    if "bin_size" in kwargs:
        bin_size = kwargs["bin_size"]
        if isinstance(bin_size, int):
            bin_size = float(bin_size)
        if not isinstance(bin_size, float):
            raise ValueError("bin_size argument should be float.")
    else:
        for a in args:
            if isinstance(a, (float, int)):
                bin_size = float(a)

    time_units = "s"
    if "time_units" in kwargs:
        time_units = kwargs["time_units"]
        if not isinstance(time_units, str):
            raise ValueError("time_units argument should be 's', 'ms' or 'us'.")
    else:
        for a in args:
            if isinstance(a, str) and a in ["s", "ms", "us"]:
                time_units = a

    ep = self.time_support
    if "ep" in kwargs:
        ep = kwargs["ep"]
        if not isinstance(ep, IntervalSet):
            raise ValueError("ep argument should be IntervalSet")
    else:
        for a in args:
            if isinstance(a, IntervalSet):
                ep = a

    if dtype is None:
        dtype = np.dtype(np.int64)
    else:
        try:
            dtype = np.dtype(dtype)
        except Exception:
            raise ValueError(f"{dtype} is not a valid numpy dtype.")

    starts = ep.start
    ends = ep.end

    if isinstance(bin_size, (float, int)):
        bin_size = TsIndex.format_timestamps(np.array([bin_size]), time_units)[0]

    time_array = self.index.values

    t, d = _count(time_array, starts, ends, bin_size, dtype=dtype)

    return t, d, ep

restrict

restrict(iset)

Restricts a time series object to a set of time intervals delimited by an IntervalSet object

Parameters:

Name Type Description Default
iset IntervalSet

the IntervalSet object

required

Returns:

Type Description
(Ts, Tsd, TsdFrame or TsdTensor)

Tsd object restricted to ep

Examples:

The Ts object is restrict to the intervals defined by ep.

>>> import pynapple as nap
>>> import numpy as np
>>> t = np.unique(np.sort(np.random.randint(0, 1000, 100)))
>>> ts = nap.Ts(t=t, time_units='s')
>>> ep = nap.IntervalSet(start=0, end=500, time_units='s')
>>> newts = ts.restrict(ep)

The time support of newts automatically inherit the epochs defined by ep.

>>> newts.time_support
    start    end
0    0.0  500.0
Source code in pynapple/core/base_class.py
def restrict(self, iset):
    """
    Restricts a time series object to a set of time intervals delimited by an IntervalSet object

    Parameters
    ----------
    iset : IntervalSet
        the IntervalSet object

    Returns
    -------
    Ts, Tsd, TsdFrame or TsdTensor
        Tsd object restricted to ep

    Examples
    --------
    The Ts object is restrict to the intervals defined by ep.

    >>> import pynapple as nap
    >>> import numpy as np
    >>> t = np.unique(np.sort(np.random.randint(0, 1000, 100)))
    >>> ts = nap.Ts(t=t, time_units='s')
    >>> ep = nap.IntervalSet(start=0, end=500, time_units='s')
    >>> newts = ts.restrict(ep)

    The time support of newts automatically inherit the epochs defined by ep.

    >>> newts.time_support
        start    end
    0    0.0  500.0

    """

    assert isinstance(iset, IntervalSet), "Argument should be IntervalSet"

    time_array = self.index.values
    starts = iset.start
    ends = iset.end

    idx = _restrict(time_array, starts, ends)

    kwargs = {}
    if hasattr(self, "columns"):
        kwargs["columns"] = self.columns

    if hasattr(self, "values"):
        data_array = self.values
        return self.__class__(
            t=time_array[idx], d=data_array[idx], time_support=iset, **kwargs
        )
    else:
        return self.__class__(t=time_array[idx], time_support=iset)

copy

copy()

Copy the data, index and time support

Source code in pynapple/core/base_class.py
def copy(self):
    """Copy the data, index and time support"""
    return self.__class__(t=self.index.copy(), time_support=self.time_support)

find_support

find_support(min_gap, time_units='s')

find the smallest (to a min_gap resolution) IntervalSet containing all the times in the Tsd

Parameters:

Name Type Description Default
min_gap float or int

minimal interval between timestamps

required
time_units str

Time units of min gap

's'

Returns:

Type Description
IntervalSet

Description

Source code in pynapple/core/base_class.py
def find_support(self, min_gap, time_units="s"):
    """
    find the smallest (to a min_gap resolution) IntervalSet containing all the times in the Tsd

    Parameters
    ----------
    min_gap : float or int
        minimal interval between timestamps
    time_units : str, optional
        Time units of min gap

    Returns
    -------
    IntervalSet
        Description
    """
    assert isinstance(min_gap, Number), "min_gap should be a float or int"
    min_gap = TsIndex.format_timestamps(np.array([min_gap]), time_units)[0]
    time_array = self.index.values

    starts = [time_array[0]]
    ends = []
    for i in range(len(time_array) - 1):
        if (time_array[i + 1] - time_array[i]) > min_gap:
            ends.append(time_array[i] + 1e-6)
            starts.append(time_array[i + 1])

    ends.append(time_array[-1] + 1e-6)

    return IntervalSet(start=starts, end=ends)

get

get(start, end=None, time_units='s')

Slice the time series from start to end such that all the timestamps satisfy start<=t<=end. If end is None, only the timepoint closest to start is returned.

By default, the time support doesn't change. If you want to change the time support, use the restrict function.

Parameters:

Name Type Description Default
start float or int

The start (or closest time point if end is None)

required
end float or int or None

The end

None
Source code in pynapple/core/base_class.py
def get(self, start, end=None, time_units="s"):
    """Slice the time series from `start` to `end` such that all the timestamps satisfy `start<=t<=end`.
    If `end` is None, only the timepoint closest to `start` is returned.

    By default, the time support doesn't change. If you want to change the time support, use the `restrict` function.

    Parameters
    ----------
    start : float or int
        The start (or closest time point if `end` is None)
    end : float or int or None
        The end
    """
    sl = self.get_slice(start, end, time_units)

    if end is None:
        sl = sl.start

    return self[sl]

get_slice

get_slice(start, end=None, time_unit='s')

Get a slice object from the time series data based on the start and end values such that all the timestamps satisfy start<=t<=end. If end is None, only the timepoint closest to start is returned.

By default, the time support doesn't change. If you want to change the time support, use the restrict function.

This function is equivalent of calling the get method.

Parameters:

Name Type Description Default
start int or float

The starting value for the slice.

required
end int or float

The ending value for the slice. Defaults to None.

None
time_unit str

The time unit for the start and end values. Defaults to "s" (seconds).

's'

Returns:

Name Type Description
slice slice

A slice determining the start and end indices, with unit step Slicing the array will be equivalent to calling get: ts[s].t == ts.get(start, end).t with s being the slice object.

Raises:

Type Description
ValueError
  • If start or end is not a number.
  • If start is greater than end.

Examples:

>>> import pynapple as nap
>>> ts = nap.Ts(t = [0, 1, 2, 3])
>>> # slice over a range
>>> start, end = 1.2, 2.6
>>> print(ts.get_slice(start, end))  # returns `slice(2, 3, None)`
>>> start, end = 1., 2.
>>> print(ts.get_slice(start, end, mode="forward"))  # returns `slice(1, 3, None)`
>>> # slice a single value
>>> start = 1.2
>>> print(ts.get_slice(start))  # returns `slice(1, 2, None)`
>>> start = 2.
>>> print(ts.get_slice(start)) # returns `slice(2, 3, None)`
Source code in pynapple/core/base_class.py
def get_slice(self, start, end=None, time_unit="s"):
    """
    Get a slice object from the time series data based on the start and end values such that all the timestamps satisfy `start<=t<=end`.
    If `end` is None, only the timepoint closest to `start` is returned.

    By default, the time support doesn't change. If you want to change the time support, use the `restrict` function.

    This function is equivalent of calling the `get` method.

    Parameters
    ----------
    start : int or float
        The starting value for the slice.
    end : int or float, optional
        The ending value for the slice. Defaults to None.
    time_unit : str, optional
        The time unit for the start and end values. Defaults to "s" (seconds).

    Returns
    -------
    slice : slice
        A slice determining the start and end indices, with unit step
        Slicing the array will be equivalent to calling get: `ts[s].t == ts.get(start, end).t` with `s` being the slice object.


    Raises
    ------
    ValueError
        - If start or end is not a number.
        - If start is greater than end.

    Examples
    --------
    >>> import pynapple as nap

    >>> ts = nap.Ts(t = [0, 1, 2, 3])

    >>> # slice over a range
    >>> start, end = 1.2, 2.6
    >>> print(ts.get_slice(start, end))  # returns `slice(2, 3, None)`
    >>> start, end = 1., 2.
    >>> print(ts.get_slice(start, end, mode="forward"))  # returns `slice(1, 3, None)`

    >>> # slice a single value
    >>> start = 1.2
    >>> print(ts.get_slice(start))  # returns `slice(1, 2, None)`
    >>> start = 2.
    >>> print(ts.get_slice(start)) # returns `slice(2, 3, None)`
    """
    mode = "closest_t" if end is None else "restrict"
    return self._get_slice(
        start, end=end, mode=mode, n_points=None, time_unit=time_unit
    )