"""
Functions to compute phases and envelopes
"""
import numbers
import numpy as np
import pynapple as nap
[docs]
def compute_hilbert_envelope(data):
"""
Compute the Hilbert envelope of a time-series.
This function computes the envelope of the signal, which is the magnitude of the analytic
signal obtained by applying the Hilbert transform. The envelope provides a smooth
representation of the amplitude modulation of the signal.
Parameters
----------
data : Tsd, TsdFrame
The time-series data to compute the Hilbert envelope for.
Returns
-------
Tsd, TsdFrame
The Hilbert envelope.
Examples
--------
>>> import numpy as np
>>> import pynapple as nap
>>> times = np.arange(0, 20, 0.1)
>>> data = nap.Tsd(d=np.sin(times), t=times)
>>> envelope = nap.compute_hilbert_envelope(data)
>>> envelope
Time (s)
---------- --------
0.0 0.168638
0.1 0.435858
0.2 0.444004
0.3 0.632753
0.4 0.64262
0.5 0.7477
0.6 0.759316
...
19.3 0.953938
19.4 0.985048
19.5 0.922744
19.6 0.958683
19.7 0.87651
19.8 0.919777
19.9 0.885858
dtype: float64, shape: (200,)
Can be used for multiple signals in a `TsdFrame`:
>>> data = nap.TsdFrame(d=np.stack([np.sin(times), np.cos(times)], axis=1), t=times)
>>> envelopes = nap.compute_hilbert_envelope(data)
>>> envelopes
Time (s) 0 1
---------- -------- --------
0.0 0.168638 1.00596
0.1 0.435858 1.02622
0.2 0.444004 1.02272
0.3 0.632753 1.05857
0.4 0.64262 1.05261
0.5 0.7477 1.0822
0.6 0.759316 1.07408
...
19.3 0.953938 0.905652
19.4 0.985048 0.871714
19.5 0.922744 0.818226
19.6 0.958683 0.775808
19.7 0.87651 0.689073
19.8 0.919777 0.630159
19.9 0.885858 0.505618
dtype: float64, shape: (200, 2)
"""
analytic_signal = apply_hilbert_transform(data)
return np.abs(analytic_signal)
[docs]
def compute_hilbert_phase(data):
"""
Compute the Hilbert phase of a time-series.
This function computes the instantaneous phase of the signal using the Hilbert transform.
The phase is unwrapped to provide a continuous representation, and it is then wrapped to
ensure it stays within the range [0, 2π].
Parameters
----------
data : Tsd, TsdFrame
The time-series data to compute the Hilbert phase for.
Returns
-------
Tsd, TsdFrame
The instantaneous phase of the signal, with values wrapped between [0, 2π].
Examples
--------
>>> import numpy as np
>>> import pynapple as nap
>>> times = np.arange(0, 20, 0.1)
>>> data = nap.Tsd(d=np.sin(times), t=times)
>>> phase = nap.compute_hilbert_envelope(data)
>>> phase
Time (s)
---------- --------
0.0 0.168638
0.1 0.435858
0.2 0.444004
0.3 0.632753
0.4 0.64262
0.5 0.7477
0.6 0.759316
...
19.3 0.953938
19.4 0.985048
19.5 0.922744
19.6 0.958683
19.7 0.87651
19.8 0.919777
19.9 0.885858
dtype: float64, shape: (200,)
Can be used for multiple signals in a `TsdFrame`:
>>> data = nap.TsdFrame(d=np.stack([np.sin(times), np.cos(times)], axis=1), t=times)
>>> phases = nap.compute_hilbert_envelope(data)
>>> phases
Time (s) 0 1
---------- -------- --------
0.0 0.168638 1.00596
0.1 0.435858 1.02622
0.2 0.444004 1.02272
0.3 0.632753 1.05857
0.4 0.64262 1.05261
0.5 0.7477 1.0822
0.6 0.759316 1.07408
...
19.3 0.953938 0.905652
19.4 0.985048 0.871714
19.5 0.922744 0.818226
19.6 0.958683 0.775808
19.7 0.87651 0.689073
19.8 0.919777 0.630159
19.9 0.885858 0.505618
dtype: float64, shape: (200, 2)
"""
analytic_signal = apply_hilbert_transform(data)
phase = np.angle(analytic_signal)
phase = np.mod(np.unwrap(phase), 2 * np.pi)
return phase
[docs]
def detect_oscillatory_events(
data,
epochs,
frequency_band,
threshold_band,
duration_band,
min_interval,
fs=None,
sliding_window_size=51,
):
"""
Function for detecting oscillatory events such as ripples, spindles, and more.
Parameters
----------
data : Tsd
One-dimensional time series.
epochs : IntervalSet
The epochs for restricting the detection.
frequency_band : tuple
The lower and upper frequency to bandpass filter the signal.
threshold_band : tuple
The lower and upper threshold applied to the normalized envelope of the
filtered signal.
duration_band : tuple
The minimal and maximal duration of an event (in seconds).
min_interval : float
The minimum duration between two events (in seconds).
If shorter, the two events are merged.
fs : float, optional
The sampling frequency of the signal in Hz.
If not provided, it will be inferred from the time axis of the data.
sliding_window_size : int, optional
The size of the smoothing window.
Returns
-------
IntervalSet
The interval set of detected events with metadata containing
the power, amplitude, and peak_time
"""
import warnings
if not isinstance(data, nap.Tsd):
raise TypeError(f"`data` must be `Tsd`, got {type(data)}")
if not isinstance(epochs, nap.IntervalSet):
raise TypeError(f"`epochs` must be `IntervalSet`, got {type(epochs)}")
def _check_tuple(name, val):
if not isinstance(val, tuple):
raise TypeError(f"`{name}` must be a tuple, got {type(val)}")
if len(val) != 2:
raise ValueError(f"`{name}` must have length 2, got {len(val)}")
if not all(isinstance(x, numbers.Real) for x in val):
raise TypeError(f"`{name}` must contain numeric values")
if val[0] >= val[1]:
raise ValueError(f"`{name}` must be (min, max) with min < max")
_check_tuple("frequency_band", frequency_band)
_check_tuple("threshold_band", threshold_band)
_check_tuple("duration_band", duration_band)
if not isinstance(min_interval, numbers.Real):
raise TypeError("`min_interval` must be a number")
if min_interval < 0:
raise ValueError("`min_interval` must be >= 0")
if fs is not None:
if not isinstance(fs, numbers.Real):
raise TypeError("`fs` must be a number or None")
if fs <= 0:
raise ValueError("`fs` must be > 0")
else:
fs = data.rate
if not isinstance(sliding_window_size, int):
raise TypeError("`sliding_window_size` must be an integer")
if sliding_window_size <= 0:
raise ValueError("`sliding_window_size` must be > 0")
data = data.restrict(epochs)
# Frequency filter
filtered = nap.apply_bandpass_filter(data, frequency_band, fs)
# Compute envelope
envelope = nap.compute_hilbert_envelope(filtered)
# Smooth
window = np.ones(sliding_window_size) / sliding_window_size
smoothed = envelope.convolve(window)
# Z-score
zscored_smoothed = (smoothed - smoothed.mean()) / smoothed.std()
# Detect oscillation periods by thresholding normalized signal
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore",
message="Some epochs have no duration",
category=UserWarning,
)
zscored_smoothed_above = zscored_smoothed.threshold(
threshold_band[0], method="above"
)
zscored_smoothed_thresholded = zscored_smoothed_above.threshold(
threshold_band[1], method="below"
)
# Exclude oscillations where min_duration < length < max_duration
osc_ep = zscored_smoothed_thresholded.time_support
osc_ep = osc_ep.drop_short_intervals(duration_band[0], time_units="s")
osc_ep = osc_ep.drop_long_intervals(duration_band[1], time_units="s")
# Merge if inter-oscillation period is too short
osc_ep = osc_ep.merge_close_intervals(min_interval, time_units="s")
# Compute power, amplitude, and peak_time for each interval
powers = []
amplitudes = []
peak_times = []
for s, e in osc_ep.values:
seg = envelope.get(s, e)
if len(seg) == 0:
powers.append(np.nan)
amplitudes.append(np.nan)
peak_times.append(np.nan)
continue
power = np.mean(seg.values**2)
power_db = 10 * np.log10(power) if power > 0 else np.nan
amplitude = np.max(seg.values)
peak_idx = np.argmax(seg.values)
peak_time = seg.index.values[peak_idx]
powers.append(power_db)
amplitudes.append(amplitude)
peak_times.append(peak_time)
metadata = {
"power": powers,
"amplitude": amplitudes,
"peak_time": peak_times,
}
return nap.IntervalSet(start=osc_ep.start, end=osc_ep.end, metadata=metadata)