Source code for pynapple.process.filtering

"""Functions for highpass, lowpass, bandpass or bandstop filtering."""

import inspect
from collections.abc import Iterable
from functools import wraps
from numbers import Number

import numpy as np
import pandas as pd
from scipy.signal import butter, sosfiltfilt, sosfreqz

from .. import core as nap


def _validate_filtering_inputs(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Validate each positional argument
        sig = inspect.signature(func)
        kwargs = sig.bind_partial(*args, **kwargs).arguments

        cutoff = kwargs["cutoff"]
        filter_type = kwargs["filter_type"]
        if filter_type in ["lowpass", "highpass"] and not isinstance(cutoff, Number):
            raise ValueError(
                f"{filter_type} filter require a single number. {cutoff} provided instead."
            )
        if filter_type in ["bandpass", "bandstop"]:
            if (
                not isinstance(cutoff, Iterable)
                or len(cutoff) != 2
                or not all(isinstance(fq, Number) for fq in cutoff)
            ):
                raise ValueError(
                    f"{filter_type} filter require a tuple of two numbers. {cutoff} provided instead."
                )

        if "fs" in kwargs:
            if kwargs["fs"] is not None and not isinstance(kwargs["fs"], Number):
                raise ValueError(
                    "Invalid value for 'fs'. Parameter 'fs' should be of type float or int"
                )

        if "order" in kwargs:
            if not isinstance(kwargs["order"], int):
                raise ValueError(
                    "Invalid value for 'order': Parameter 'order' should be of type int"
                )

        if "transition_bandwidth" in kwargs:
            if not isinstance(kwargs["transition_bandwidth"], float):
                raise ValueError(
                    "Invalid value for 'transition_bandwidth'. 'transition_bandwidth' should be of type float"
                )

        # Call the original function with validated inputs
        return func(**kwargs)

    return wrapper


def _get_butter_coefficients(cutoff, filter_type, sampling_frequency, order=4):
    """Calls scipy butter"""
    return butter(order, cutoff, btype=filter_type, fs=sampling_frequency, output="sos")


def _compute_butterworth_filter(
    data, cutoff, sampling_frequency=None, filter_type="bandpass", order=4
):
    """
    Apply a Butterworth filter to the provided signal.
    """
    sos = _get_butter_coefficients(cutoff, filter_type, sampling_frequency, order)

    if nap.utils.get_backend() == "jax":
        from pynajax.jax_process_filtering import jax_sosfiltfilt

        out = jax_sosfiltfilt(
            sos,
            data.index.values,
            data.values,
            data.time_support.start,
            data.time_support.end,
        )

    else:
        out = np.zeros_like(data.d)
        for ep in data.time_support:
            slc = data.get_slice(start=ep.start[0], end=ep.end[0])
            out[slc] = sosfiltfilt(sos, data.d[slc], axis=0)

    kwargs = dict(t=data.t, d=out, time_support=data.time_support)
    if isinstance(data, nap.TsdFrame):
        kwargs["columns"] = data.columns
    return data.__class__(**kwargs)


def _compute_spectral_inversion(kernel):
    """
    Compute the spectral inversion.
    Parameters
    ----------
    kernel: ndarray

    Returns
    -------
    ndarray
    """
    kernel *= -1.0
    kernel[len(kernel) // 2] = 1.0 + kernel[len(kernel) // 2]
    return kernel


def _get_windowed_sinc_kernel(
    fc, filter_type, sampling_frequency, transition_bandwidth=0.02
):
    """
    Get the windowed-sinc kernel.
    Smith, S. (2003). Digital signal processing: a practical guide for engineers and scientists.
    Chapter 16, equation 16-4

    Parameters
    ----------
    fc: float or tuple of float
        Cutting frequency in Hz. Single float for 'lowpass' and 'highpass'. Tuple of float for
        'bandpass' and 'bandstop'.
    filter_type: str
        Either 'lowpass', 'highpass', 'bandstop' or 'bandpass'.
    sampling_frequency: float
        Sampling frequency in Hz.
    transition_bandwidth: float
        Percentage between 0 and 0.5
    Returns
    -------
    np.ndarray
    """
    M = int(np.rint(4.0 / transition_bandwidth))
    x = np.arange(-(M // 2), 1 + (M // 2))
    fc = np.transpose(np.atleast_2d(fc / sampling_frequency))
    kernel = np.sinc(2 * fc * x)
    kernel = kernel * np.blackman(len(x))
    kernel = np.transpose(kernel)
    kernel = kernel / kernel.sum(0)

    if filter_type == "lowpass":
        return kernel.flatten()
    elif filter_type == "highpass":
        return _compute_spectral_inversion(kernel.flatten())
    elif filter_type == "bandstop":
        kernel[:, 1] = _compute_spectral_inversion(kernel[:, 1])
        kernel = np.sum(kernel, axis=1)
        return kernel
    elif filter_type == "bandpass":
        kernel[:, 1] = _compute_spectral_inversion(kernel[:, 1])
        kernel = _compute_spectral_inversion(np.sum(kernel, axis=1))
        return kernel
    else:
        raise ValueError


def _compute_windowed_sinc_filter(
    data, freq, filter_type, sampling_frequency, transition_bandwidth=0.02
):
    """
    Apply a windowed-sinc filter to the provided signal.

    Parameters
    ----------
    data: Tsd, TsdFrame or TsdTensor

    freq: float or tuple of float
        Cutting frequency in Hz. Single float for 'lowpass' and 'highpass'. Tuple of float for
        'bandpass' and 'bandstop'.
    sampling_frequency: float
        Sampling frequency in Hz.
    filter_type: str
        Either 'lowpass', 'highpass', 'bandstop' or 'bandpass'.
    transition_bandwidth: float
        Percentage between 0 and 0.5
    Returns
    -------
    Tsd, TsdFrame or TsdTensor
    """
    kernel = _get_windowed_sinc_kernel(
        freq, filter_type, sampling_frequency, transition_bandwidth
    )
    return data.convolve(kernel)


@_validate_filtering_inputs
def _compute_filter(
    data,
    cutoff,
    fs=None,
    mode="butter",
    order=4,
    transition_bandwidth=0.02,
    filter_type="bandpass",
):
    """
    Filter the signal.
    """
    if not isinstance(data, nap.time_series._BaseTsd):
        raise ValueError(
            f"Invalid value: {data}. First argument should be of type Tsd, TsdFrame or TsdTensor"
        )

    if np.any(np.isnan(data)):
        raise ValueError(
            "The input signal contains NaN values, which are not supported for filtering. "
            "Please remove or handle NaNs before applying the filter. "
            "You can use the `dropna()` method to drop all NaN values."
        )

    if fs is None:
        fs = data.rate

    cutoff = np.array(cutoff, dtype=float)

    if mode == "butter":
        return _compute_butterworth_filter(
            data, cutoff, fs, filter_type=filter_type, order=order
        )
    if mode == "sinc":
        return _compute_windowed_sinc_filter(
            data, cutoff, filter_type, fs, transition_bandwidth=transition_bandwidth
        )
    else:
        raise ValueError("Unrecognized filter mode. Choose either 'butter' or 'sinc'")


[docs] def apply_bandpass_filter( data, cutoff, fs=None, mode="butter", order=4, transition_bandwidth=0.02 ): """ Apply a band-pass filter to the provided signal. Mode can be : - `"butter"` for Butterworth filter. In this case, `order` determines the order of the filter. - `"sinc"` for Windowed-Sinc convolution. `transition_bandwidth` determines the transition bandwidth. Parameters ---------- data : Tsd, TsdFrame, or TsdTensor The signal to be filtered. cutoff : (Numeric, Numeric) Cutoff frequencies in Hz. fs : float, optional The sampling frequency of the signal in Hz. If not provided, it will be inferred from the time axis of the data. mode : {'butter', 'sinc'}, optional Filtering mode. Default is 'butter'. order : int, optional The order of the Butterworth filter. Higher values result in sharper frequency cutoffs. Default is 4. transition_bandwidth : float, optional The transition bandwidth. 0.2 corresponds to 20% of the frequency band between 0 and the sampling frequency. The smaller the transition bandwidth, the larger the windowed-sinc kernel. Default is 0.02. Returns ------- filtered_data : Tsd, TsdFrame, or TsdTensor The filtered signal, with the same data type as the input. Raises ------ ValueError If `data` is not a Tsd, TsdFrame, or TsdTensor. If `cutoff` is not a tuple of two floats for "bandpass" and "bandstop" filters. If `fs` is not float or None. If `mode` is not "butter" or "sinc". If `order` is not an int. If "transition_bandwidth" is not a float. Notes ----- For the Butterworth filter, the cutoff frequency is defined as the frequency at which the amplitude of the signal is reduced by -3 dB (decibels). """ return _compute_filter( data, cutoff, fs=fs, mode=mode, order=order, transition_bandwidth=transition_bandwidth, filter_type="bandpass", )
[docs] def apply_bandstop_filter( data, cutoff, fs=None, mode="butter", order=4, transition_bandwidth=0.02 ): """ Apply a band-stop filter to the provided signal. Mode can be : - `"butter"` for Butterworth filter. In this case, `order` determines the order of the filter. - `"sinc"` for Windowed-Sinc convolution. `transition_bandwidth` determines the transition bandwidth. Parameters ---------- data : Tsd, TsdFrame, or TsdTensor The signal to be filtered. cutoff : (Numeric, Numeric) Cutoff frequencies in Hz. fs : float, optional The sampling frequency of the signal in Hz. If not provided, it will be inferred from the time axis of the data. mode : {'butter', 'sinc'}, optional Filtering mode. Default is 'butter'. order : int, optional The order of the Butterworth filter. Higher values result in sharper frequency cutoffs. Default is 4. transition_bandwidth : float, optional The transition bandwidth. 0.2 corresponds to 20% of the frequency band between 0 and the sampling frequency. The smaller the transition bandwidth, the larger the windowed-sinc kernel. Default is 0.02. Returns ------- filtered_data : Tsd, TsdFrame, or TsdTensor The filtered signal, with the same data type as the input. Raises ------ ValueError If `data` is not a Tsd, TsdFrame, or TsdTensor. If `cutoff` is not a tuple of two floats for "bandpass" and "bandstop" filters. If `fs` is not float or None. If `mode` is not "butter" or "sinc". If `order` is not an int. If "transition_bandwidth" is not a float. Notes ----- For the Butterworth filter, the cutoff frequency is defined as the frequency at which the amplitude of the signal is reduced by -3 dB (decibels). """ return _compute_filter( data, cutoff, fs=fs, mode=mode, order=order, transition_bandwidth=transition_bandwidth, filter_type="bandstop", )
[docs] def apply_highpass_filter( data, cutoff, fs=None, mode="butter", order=4, transition_bandwidth=0.02 ): """ Apply a high-pass filter to the provided signal. Mode can be : - `"butter"` for Butterworth filter. In this case, `order` determines the order of the filter. - `"sinc"` for Windowed-Sinc convolution. `transition_bandwidth` determines the transition bandwidth. Parameters ---------- data : Tsd, TsdFrame, or TsdTensor The signal to be filtered. cutoff : Numeric Cutoff frequency in Hz. fs : float, optional The sampling frequency of the signal in Hz. If not provided, it will be inferred from the time axis of the data. mode : {'butter', 'sinc'}, optional Filtering mode. Default is 'butter'. order : int, optional The order of the Butterworth filter. Higher values result in sharper frequency cutoffs. Default is 4. transition_bandwidth : float, optional The transition bandwidth. 0.2 corresponds to 20% of the frequency band between 0 and the sampling frequency. The smaller the transition bandwidth, the larger the windowed-sinc kernel. Default is 0.02. Returns ------- filtered_data : Tsd, TsdFrame, or TsdTensor The filtered signal, with the same data type as the input. Raises ------ ValueError If `data` is not a Tsd, TsdFrame, or TsdTensor. If `cutoff` is not a number. If `fs` is not float or None. If `mode` is not "butter" or "sinc". If `order` is not an int. If "transition_bandwidth" is not a float. Notes ----- For the Butterworth filter, the cutoff frequency is defined as the frequency at which the amplitude of the signal is reduced by -3 dB (decibels). """ return _compute_filter( data, cutoff, fs=fs, mode=mode, order=order, transition_bandwidth=transition_bandwidth, filter_type="highpass", )
[docs] def apply_lowpass_filter( data, cutoff, fs=None, mode="butter", order=4, transition_bandwidth=0.02 ): """ Apply a low-pass filter to the provided signal. Mode can be : - `"butter"` for Butterworth filter. In this case, `order` determines the order of the filter. - `"sinc"` for Windowed-Sinc convolution. `transition_bandwidth` determines the transition bandwidth. Parameters ---------- data : Tsd, TsdFrame, or TsdTensor The signal to be filtered. cutoff : Numeric Cutoff frequency in Hz. fs : float, optional The sampling frequency of the signal in Hz. If not provided, it will be inferred from the time axis of the data. mode : {'butter', 'sinc'}, optional Filtering mode. Default is 'butter'. order : int, optional The order of the Butterworth filter. Higher values result in sharper frequency cutoffs. Default is 4. transition_bandwidth : float, optional The transition bandwidth. 0.2 corresponds to 20% of the frequency band between 0 and the sampling frequency. The smaller the transition bandwidth, the larger the windowed-sinc kernel. Default is 0.02. Returns ------- filtered_data : Tsd, TsdFrame, or TsdTensor The filtered signal, with the same data type as the input. Raises ------ ValueError If `data` is not a Tsd, TsdFrame, or TsdTensor. If `cutoff` is not a number. If `fs` is not float or None. If `mode` is not "butter" or "sinc". If `order` is not an int. If "transition_bandwidth" is not a float. Notes ----- For the Butterworth filter, the cutoff frequency is defined as the frequency at which the amplitude of the signal is reduced by -3 dB (decibels). """ return _compute_filter( data, cutoff, fs=fs, mode=mode, order=order, transition_bandwidth=transition_bandwidth, filter_type="lowpass", )
[docs] @_validate_filtering_inputs def get_filter_frequency_response( cutoff, fs, filter_type, mode, order=4, transition_bandwidth=0.02 ): """ Utility function to evaluate the frequency response of a particular type of filter. The arguments are the same as the function `apply_lowpass_filter`, `apply_highpass_filter`, `apply_bandpass_filter` and `apply_bandstop_filter`. This function returns a pandas Series object with the index as frequencies. Parameters ---------- cutoff : Numeric or tuple of Numeric Cutoff frequency in Hz. fs : float The sampling frequency of the signal in Hz. filter_type: str Can be "lowpass", "highpass", "bandpass" or "bandstop" mode: str Can be "butter" or "sinc". order : int, optional The order of the Butterworth filter. Higher values result in sharper frequency cutoffs. Default is 4. transition_bandwidth : float, optional The transition bandwidth. 0.2 corresponds to 20% of the frequency band between 0 and the sampling frequency. The smaller the transition bandwidth, the larger the windowed-sinc kernel. Default is 0.02. Returns ------- pandas.Series """ cutoff = np.array(cutoff) if mode == "butter": sos = _get_butter_coefficients(cutoff, filter_type, fs, order) w, h = sosfreqz(sos, worN=1024, fs=fs) return pd.Series(index=w, data=np.abs(h)) if mode == "sinc": kernel = _get_windowed_sinc_kernel( cutoff, filter_type, fs, transition_bandwidth ) fft_result = np.fft.fft(kernel) fft_result = np.fft.fftshift(fft_result) fft_freq = np.fft.fftfreq(n=len(kernel), d=1 / fs) fft_freq = np.fft.fftshift(fft_freq) return pd.Series( index=fft_freq[fft_freq >= 0], data=np.abs(fft_result[fft_freq >= 0]) ) else: raise ValueError("Unrecognized filter mode. Choose either 'butter' or 'sinc'")