Filtering

This section documents the filtering techniques in the VitalDSP library. Each submodule provides specialized functionality for processing and improving the quality of physiological signals, with a particular focus on healthcare and biomedical applications.

Overview

The VitalDSP filtering module provides a suite of signal processing techniques designed for physiological signals such as ECG, PPG, EEG, and respiratory signals. The capabilities range from traditional digital filters to machine-learning-inspired techniques, all aimed at healthcare applications.

Key Features

  • Multi-Type Filtering: Traditional, advanced, artifact removal, neural network, and ensemble filtering

  • Signal-Specific Optimization: Specialized algorithms for ECG, PPG, and other physiological signals

  • Real-Time Processing: Optimized for live signal processing and monitoring applications

  • Quality Assessment: Built-in signal quality metrics and validation

  • Clinical Validation: Methods validated on clinical datasets and real-world applications

Signal Filtering

The core signal filtering module provides essential preprocessing techniques for physiological signals. This module implements traditional digital signal processing filters that are fundamental to signal preprocessing and noise reduction.

Key Capabilities

  • Traditional Filters: Butterworth, Chebyshev (Type I & II), Elliptic, and Bessel filters

  • Filter Types: Low-pass, high-pass, band-pass, and band-stop filters

  • Adaptive Parameters: Automatic parameter adjustment based on signal characteristics

  • Multi-Channel Support: Processing of multi-channel physiological signals

  • Real-Time Optimization: Optimized for streaming and real-time applications

Filter Families

Butterworth Filters

Provides a maximally flat magnitude response in the passband, ideal for general-purpose filtering of physiological signals.

Chebyshev Type I Filters

Offers steeper roll-off than Butterworth with passband ripple, suitable for applications requiring sharp frequency cutoffs.

Chebyshev Type II Filters

Provides steeper roll-off with stopband ripple, ideal for applications where stopband attenuation is critical.

Elliptic Filters

Combines the steepest roll-off for a given filter order with ripple in both the passband and the stopband, making it the most order-efficient choice when a narrow transition band matters more than a flat response.

Bessel Filters

Maintains a nearly linear phase response (maximally flat group delay) in the passband, which is crucial for preserving signal timing in physiological measurements.
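
The trade-offs among these families can be checked numerically. The sketch below is not VitalDSP code: it assumes SciPy's `scipy.signal` design functions as stand-ins and compares order-4 low-pass designs. The 250 Hz sampling rate, 40 Hz cutoff, 1 dB passband ripple, and 40 dB stopband attenuation are illustrative assumptions.

```python
import numpy as np
from scipy import signal

fs = 250.0      # sampling rate in Hz (illustrative assumption)
cutoff = 40.0   # low-pass cutoff in Hz (illustrative assumption)
order = 4

# Order-4 low-pass designs from each family (SciPy stand-ins, not VitalDSP calls).
designs = {
    "butter": signal.butter(order, cutoff, btype="low", fs=fs),
    "cheby1": signal.cheby1(order, 1, cutoff, btype="low", fs=fs),    # 1 dB passband ripple
    "cheby2": signal.cheby2(order, 40, cutoff, btype="low", fs=fs),   # 40 dB stopband attenuation
    "ellip": signal.ellip(order, 1, 40, cutoff, btype="low", fs=fs),  # ripple in both bands
    "bessel": signal.bessel(order, cutoff, btype="low", fs=fs),
}


def gain_db(b, a, freq_hz):
    """Magnitude response in dB at a single frequency."""
    _, h = signal.freqz(b, a, worN=[freq_hz], fs=fs)
    # Guard against log10(0) if the frequency lands exactly on a transmission zero.
    return float(20 * np.log10(max(abs(h[0]), 1e-12)))


# For each family: gain at the cutoff and one octave above it.
gains = {
    name: (gain_db(b, a, cutoff), gain_db(b, a, 2 * cutoff))
    for name, (b, a) in designs.items()
}
for name, (at_cutoff, at_octave) in gains.items():
    print(f"{name:7s} {cutoff:.0f} Hz: {at_cutoff:7.1f} dB   {2 * cutoff:.0f} Hz: {at_octave:7.1f} dB")
```

Note that each family interprets the cutoff argument differently in SciPy (the -3 dB point for Butterworth, the passband edge for Chebyshev Type I and Elliptic, the stopband edge for Chebyshev Type II), so the numbers are a qualitative comparison rather than a like-for-like benchmark.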

Clinical Applications

  • ECG Processing: Removal of powerline interference, muscle artifacts, and baseline wander

  • PPG Enhancement: Filtering of motion artifacts and ambient light interference

  • EEG Preprocessing: Removal of eye movement artifacts and electrical interference

  • Respiratory Signal Processing: Extraction of breathing patterns from chest movement or airflow signals

Signal Filtering Module for Physiological Signal Processing

This module provides comprehensive signal filtering capabilities for physiological signals including ECG, PPG, EEG, and other vital signs. It implements various filtering techniques including bandpass, lowpass, highpass, and notch filters with multiple filter types and adaptive parameter optimization.

Author: vitalDSP Team
Date: 2025-01-27
Version: 1.0.0

Key Features:

  • Multiple filter types (Butterworth, Chebyshev, Elliptic, Bessel)

  • Bandpass, lowpass, highpass, and notch filtering

  • Adaptive parameter optimization

  • Signal validation and error handling

  • Real-time filtering capabilities

  • Comprehensive filter design options

Examples:

Basic bandpass filtering:
>>> import numpy as np
>>> from vitalDSP.filtering.signal_filtering import SignalFiltering, BandpassFilter
>>> signal = np.random.randn(1000) + np.sin(np.linspace(0, 10, 1000))
>>> filter_obj = SignalFiltering(signal)
>>> filtered = filter_obj.bandpass(lowcut=0.5, highcut=40, fs=250)
>>> print(f"Filtered signal shape: {filtered.shape}")
Advanced filtering with different types:
>>> bp_filter = BandpassFilter(band_type="butter", fs=250)
>>> butter_filtered = bp_filter.filter(signal, lowcut=0.5, highcut=40)
>>> cheby_filter = BandpassFilter(band_type="cheby1", fs=250)
>>> cheby_filtered = cheby_filter.filter(signal, lowcut=0.5, highcut=40)
Notch filtering for power line interference:
>>> notch_filtered = filter_obj.notch_filter(freq=50, quality_factor=30)
>>> print(f"Notch filtered signal shape: {notch_filtered.shape}")

class vitalDSP.filtering.signal_filtering.BandpassFilter(band_type='butter', fs=100)

Bases: object

signal_bypass(cutoff, order, a_pass=3, rp=4, rs=40, btype='high')

Generate the filter coefficients for the specified filter type and parameters.

Parameters:
  • cutoff (float) – Cutoff frequency of the filter.

  • order (int) – Order of the filter.

  • a_pass (float) – Passband ripple for Chebyshev and Elliptic filters.

  • rp (float) – Passband ripple for Elliptic filters.

  • rs (float) – Stopband attenuation for Elliptic filters.

  • btype (str) – Type of filter (‘high’, ‘low’, ‘bandpass’).

Returns:

b, a – Numerator (b) and denominator (a) polynomials of the IIR filter.

Return type:

tuple

Examples

>>> bp_filter = BandpassFilter("butter", fs=100)
>>> b, a = bp_filter.signal_bypass(cutoff=0.3, order=4, a_pass=3, rp=4, rs=40, btype='low')
>>> print(b, a)

signal_highpass_filter(data, cutoff, order=5, a_pass=3, rp=4, rs=40)

Apply a high-pass filter to the data.

Parameters:
  • data (numpy.ndarray) – The input signal.

  • cutoff (float) – Cutoff frequency of the filter.

  • order (int, optional) – Order of the filter. Default is 5.

  • a_pass (float, optional) – Passband ripple for Chebyshev and Elliptic filters. Default is 3.

  • rp (float, optional) – Passband ripple for Elliptic filters. Default is 4.

  • rs (float, optional) – Stopband attenuation for Elliptic filters. Default is 40.

Returns:

y – The filtered signal.

Return type:

numpy.ndarray

Raises:

ValueError – If the length of the input signal is too short for the specified filter.

Examples

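>>> import numpy as np
>>> signal = np.sin(2 * np.pi * 1.0 * np.arange(0, 10, 0.01))  # 10 s at fs=100; long enough to avoid the ValueError above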
>>> bp_filter = BandpassFilter("butter", fs=100)
>>> filtered_signal = bp_filter.signal_highpass_filter(signal, cutoff=0.3, order=4)
>>> print(filtered_signal)

signal_lowpass_filter(data, cutoff, order=3, a_pass=3, rp=4, rs=40)

Apply a low-pass filter to the data.

Parameters:
  • data (numpy.ndarray) – The input signal.

  • cutoff (float) – Cutoff frequency of the filter.

  • order (int, optional) – Order of the filter. Default is 3.

  • a_pass (float, optional) – Passband ripple for Chebyshev and Elliptic filters. Default is 3.

  • rp (float, optional) – Passband ripple for Elliptic filters. Default is 4.

  • rs (float, optional) – Stopband attenuation for Elliptic filters. Default is 40.

Returns:

y – The filtered signal.

Return type:

numpy.ndarray

Examples

>>> import numpy as np
>>> signal = np.sin(2 * np.pi * 1.0 * np.arange(0, 10, 0.01))  # 10 s at fs=100; very short inputs may be rejected
>>> bp_filter = BandpassFilter("butter", fs=100)
>>> filtered_signal = bp_filter.signal_lowpass_filter(signal, cutoff=0.3, order=4)
>>> print(filtered_signal)

class vitalDSP.filtering.signal_filtering.SignalFiltering(signal)

Bases: object

A class for applying various filtering techniques to signals.

This class provides methods for common signal filtering tasks, such as applying moving averages, Gaussian filters, Butterworth filters, and median filters. These techniques are essential for preprocessing signals in various fields, including biomedical signal processing (e.g., ECG, EEG).

savgol_filter : static method

Applies a Savitzky-Golay filter.

moving_average : function

Applies a moving average filter.

gaussian : function

Applies a Gaussian filter.

butterworth : function

Applies a Butterworth filter.

chebyshev : function

Applies a Chebyshev filter.

elliptic : function

Applies an Elliptic filter.

bandpass : function

Applies a bandpass filter using a selected filter type.

median : function

Applies a median filter.

_apply_iir_filter : function

Internal method to apply IIR filters.

Examples

>>> import numpy as np
>>> from vitalDSP.filtering.signal_filtering import SignalFiltering
>>>
>>> # Example 1: Basic signal filtering
>>> signal = np.random.randn(1000)  # Simulated signal
>>> sf = SignalFiltering(signal)
>>> filtered_signal = sf.bandpass(lowcut=0.5, highcut=30, fs=256, order=4)
>>> print(f"Filtered signal shape: {filtered_signal.shape}")
>>>
>>> # Example 2: Moving average filtering
>>> ma_filtered = sf.moving_average(window_size=5)
>>> print(f"Moving average filtered: {ma_filtered.shape}")
>>>
>>> # Example 3: Gaussian filtering
>>> gaussian_filtered = sf.gaussian(sigma=1.0)
>>> print(f"Gaussian filtered: {gaussian_filtered.shape}")
>>>
>>> # Example 4: Savitzky-Golay filtering
>>> sg_filtered = SignalFiltering.savgol_filter(signal, window_length=5, polyorder=2)
>>> print(f"Savitzky-Golay filtered: {sg_filtered.shape}")

bandpass(lowcut, highcut, fs, order=4, filter_type='butter', iterations=1)

Apply a bandpass filter using the selected filter type.

Parameters:
  • lowcut (float) – The lower cutoff frequency.

  • highcut (float) – The upper cutoff frequency.

  • fs (float) – The sampling frequency.

  • order (int, optional) – The order of the filter. Default is 4.

  • filter_type (str, optional) – Type of filter to apply (‘butter’, ‘cheby’, ‘elliptic’). Default is ‘butter’.

  • iterations (int, optional) – The number of times to apply the filter. Default is 1.

Returns:

filtered_signal – The filtered signal.

Return type:

numpy.ndarray

Examples

>>> import numpy as np
>>> fs = 1000  # Sampling frequency
>>> lowcut = 0.5  # Lower cutoff frequency
>>> highcut = 50  # Upper cutoff frequency
>>> signal = np.sin(2 * np.pi * 10 * np.arange(0, 10, 1/fs)) + np.random.randn(10 * fs) * 0.1
>>> sf = SignalFiltering(signal)
>>> filtered_signal = sf.bandpass(lowcut, highcut, fs, order=4, filter_type='butter')
>>> print(filtered_signal)

bessel(cutoff, fs, order=4, btype='low', iterations=1)

Bessel (Thomson) filter implementation.

The Bessel filter has a maximally flat group delay, which means it preserves the waveform shape of filtered signals in the passband. This makes it ideal for applications where maintaining pulse shape is critical, such as ECG and PPG signal processing.

Parameters:
  • cutoff (float or list) – Cutoff frequency of the filter. For bandpass/bandstop, provide [low, high].

  • fs (float) – Sampling frequency of the signal.

  • order (int, optional) – Order of the Bessel filter. Default is 4.

  • btype (str, optional) – Type of filter - ‘low’, ‘high’, ‘band’, or ‘bandstop’. Default is ‘low’.

  • iterations (int, optional) – The number of times to apply the filter. Default is 1.

Returns:

filtered_signal – The filtered signal.

Return type:

numpy.ndarray

Notes

The Bessel filter is particularly useful for:

  • ECG signal processing (preserves QRS complex shape)

  • PPG signal processing (preserves pulse waveform)

  • Applications requiring a near-linear phase response

  • Situations where overshoot and ringing must be minimized

Examples

>>> import numpy as np
>>> from vitalDSP.filtering.signal_filtering import SignalFiltering
>>> # ECG-like signal with sharp peaks
>>> t = np.linspace(0, 1, 1000)
>>> ecg_signal = np.sin(2 * np.pi * 1.2 * t) + 0.5 * np.sin(2 * np.pi * 60 * t)
>>> sf = SignalFiltering(ecg_signal)
>>> # Apply Bessel lowpass to remove high-frequency noise while preserving peak shape
>>> filtered = sf.bessel(cutoff=50, fs=1000, order=4, btype='low')
>>> print(f"Filtered signal shape: {filtered.shape}")
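
The "maximally flat group delay" property can be observed directly. The sketch below assumes SciPy's `bessel`, `butter`, and `group_delay` as stand-ins for the library's own design code and compares how much the group delay varies across the passband for the two families. The 1-40 Hz evaluation range is an illustrative choice.

```python
import numpy as np
from scipy import signal

fs = 1000.0
cutoff = 50.0
order = 4

# Same order and nominal cutoff for both families (SciPy stand-ins).
b_bessel, a_bessel = signal.bessel(order, cutoff, btype="low", fs=fs)
b_butter, a_butter = signal.butter(order, cutoff, btype="low", fs=fs)

# Group delay (in samples) across most of the passband.
freqs = np.linspace(1, 0.8 * cutoff, 200)
_, gd_bessel = signal.group_delay((b_bessel, a_bessel), w=freqs, fs=fs)
_, gd_butter = signal.group_delay((b_butter, a_butter), w=freqs, fs=fs)


def relative_spread(gd):
    """Peak-to-peak variation of the group delay relative to its mean."""
    return (gd.max() - gd.min()) / gd.mean()


spread_bessel = relative_spread(gd_bessel)
spread_butter = relative_spread(gd_butter)
print(f"Relative group-delay spread: Bessel={spread_bessel:.3f}, Butterworth={spread_butter:.3f}")
```

A smaller spread means that all passband frequencies are delayed by nearly the same amount, which is why pulse shapes such as the QRS complex are distorted less.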

butter(order, cutoff, btype='low', fs=1.0)

Custom implementation of the Butterworth filter design using bilinear transformation.

Parameters:
  • order (int) – The order of the filter.

  • cutoff (float or list of float) – The critical frequency or frequencies.

  • btype (str) – The type of filter (‘low’, ‘high’, ‘band’).

  • fs (float) – The sampling frequency.

Returns:

b, a – Numerator (b) and denominator (a) polynomials of the IIR filter.

Return type:

tuple

Examples

>>> import numpy as np
>>> sf = SignalFiltering(np.zeros(10))  # the constructor requires a signal
>>> b, a = sf.butter(4, 0.3, btype='low', fs=1.0)
>>> print(b, a)
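
For readers unfamiliar with the bilinear-transform route, the sketch below walks through the usual steps for a low-pass design: analog prototype, pre-warped cutoff, frequency scaling, and mapping to the z-plane. It uses SciPy building blocks and a hypothetical helper name, `butter_lowpass_bilinear`; it illustrates the general technique and is not a copy of the library's `butter` method.

```python
import numpy as np
from scipy import signal


def butter_lowpass_bilinear(order, cutoff, fs):
    """Low-pass Butterworth via analog prototype + bilinear transform (sketch)."""
    z, p, k = signal.buttap(order)                   # analog prototype with cutoff 1 rad/s
    warped = 2 * fs * np.tan(np.pi * cutoff / fs)    # pre-warp so the digital cutoff lands on `cutoff`
    z, p, k = signal.lp2lp_zpk(z, p, k, wo=warped)   # scale the prototype to the warped cutoff
    z, p, k = signal.bilinear_zpk(z, p, k, fs=fs)    # map the s-plane to the z-plane
    return signal.zpk2tf(z, p, k)                    # numerator b, denominator a


b, a = butter_lowpass_bilinear(order=4, cutoff=30.0, fs=250.0)

# Cross-check against SciPy's one-call design.
b_ref, a_ref = signal.butter(4, 30.0, btype="low", fs=250.0)
print("max |b - b_ref| =", np.max(np.abs(b - b_ref)))
print("max |a - a_ref| =", np.max(np.abs(a - a_ref)))
```

The pre-warping step compensates for the frequency compression that the bilinear transform introduces near the Nyquist frequency.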

butterworth(cutoff, fs, order=4, btype='low', iterations=1, adaptive=True)

Apply a Butterworth filter to the signal.

Parameters:
  • cutoff (float) – Cutoff frequency of the filter.

  • fs (float) – Sampling frequency of the signal.

  • order (int, optional) – Order of the Butterworth filter. Default is 4.

  • btype (str, optional) – Type of filter - ‘low’ or ‘high’. Default is ‘low’.

  • iterations (int, optional) – The number of times to apply the Butterworth filter for additional filtering. Default is 1.

  • adaptive (bool, optional) – Whether to use adaptive parameter adjustment. Default is True.

Returns:

filtered_signal – The filtered signal.

Return type:

numpy.ndarray

Examples

>>> import numpy as np
>>> fs = 1000  # Sampling frequency
>>> cutoff = 0.5  # Cutoff frequency
>>> signal = np.sin(2 * np.pi * 0.1 * np.arange(0, 10, 1/fs)) + np.random.randn(10 * fs) * 0.1
>>> sf = SignalFiltering(signal)
>>> filtered_signal = sf.butterworth(cutoff, fs, order=4)
>>> print(filtered_signal)

chebyshev(cutoff, fs, order=4, btype='low', ripple=0.05, iterations=1)

Custom implementation of the Chebyshev Type I filter.

Parameters:
  • cutoff (float) – Cutoff frequency of the filter.

  • fs (float) – Sampling frequency of the signal.

  • order (int, optional) – Order of the Chebyshev filter. Default is 4.

  • btype (str, optional) – Type of filter - ‘low’ or ‘high’. Default is ‘low’.

  • ripple (float, optional) – The maximum ripple allowed in the passband. Default is 0.05.

  • iterations (int, optional) – The number of times to apply the Chebyshev filter for additional filtering. Default is 1.

Returns:

filtered_signal – The filtered signal.

Return type:

numpy.ndarray

chebyshev2(cutoff, fs, order=4, btype='low', stopband_attenuation=40, iterations=1)

Chebyshev Type II filter implementation.

The Chebyshev Type II filter has a monotonic (ripple-free) passband and an equiripple stopband. Unlike Type I, it places transmission zeros in the stopband, which gives a sharper roll-off than a Butterworth filter of the same order without introducing passband ripple. This filter is useful when stopband attenuation matters more than the steepest possible transition.

Parameters:
  • cutoff (float or list) – Cutoff frequency of the filter. For bandpass/bandstop, provide [low, high].

  • fs (float) – Sampling frequency of the signal.

  • order (int, optional) – Order of the Chebyshev Type II filter. Default is 4.

  • btype (str, optional) – Type of filter - ‘low’, ‘high’, ‘band’, or ‘bandstop’. Default is ‘low’.

  • stopband_attenuation (float, optional) – Minimum attenuation required in the stopband in dB. Default is 40 dB.

  • iterations (int, optional) – The number of times to apply the filter. Default is 1.

Returns:

filtered_signal – The filtered signal.

Return type:

numpy.ndarray

Examples

>>> import numpy as np
>>> from vitalDSP.filtering.signal_filtering import SignalFiltering
>>> signal = np.random.randn(1000)
>>> sf = SignalFiltering(signal)
>>> filtered = sf.chebyshev2(cutoff=50, fs=250, order=4, btype='low', stopband_attenuation=40)
>>> print(f"Filtered signal shape: {filtered.shape}")

elliptic(cutoff, fs, order=4, btype='low', ripple=0.05, stopband_attenuation=40, iterations=1)

Custom implementation of the Elliptic filter.

Parameters:
  • cutoff (float) – Cutoff frequency of the filter.

  • fs (float) – Sampling frequency of the signal.

  • order (int, optional) – Order of the Elliptic filter. Default is 4.

  • btype (str, optional) – Type of filter - ‘low’ or ‘high’. Default is ‘low’.

  • ripple (float, optional) – The maximum ripple allowed in the passband. Default is 0.05.

  • stopband_attenuation (float, optional) – Minimum attenuation in the stopband. Default is 40 dB.

  • iterations (int, optional) – The number of times to apply the Elliptic filter for additional filtering. Default is 1.

Returns:

filtered_signal – The filtered signal.

Return type:

numpy.ndarray

gaussian(sigma=1.0, iterations=1)

Applies a Gaussian filter to the signal.

Parameters:
  • sigma (float) – The standard deviation of the Gaussian kernel.

  • iterations (int, optional) – The number of times to apply the Gaussian filter for additional smoothing. Default is 1.

Returns:

filtered_signal – The filtered signal.

Return type:

numpy.ndarray

Examples

>>> import numpy as np
>>> signal = np.array([1, 2, 3, 4, 5])
>>> sf = SignalFiltering(signal)
>>> filtered_signal = sf.gaussian(1.5)
>>> print(filtered_signal)

static gaussian_filter1d(signal, sigma)

Custom implementation of a 1D Gaussian filter.

Parameters:
  • signal (numpy.ndarray) – The input signal.

  • sigma (float) – The standard deviation of the Gaussian kernel.

Returns:

smoothed_signal – The smoothed signal.

Return type:

numpy.ndarray

Examples

>>> import numpy as np
>>> signal = np.array([1, 2, 3, 4, 5])
>>> smoothed_signal = SignalFiltering.gaussian_filter1d(signal, 1.0)
>>> print(smoothed_signal)
[1.14285714 2.14285714 3. 4. 5.]

static gaussian_kernel(size, sigma)

Generate a Gaussian kernel.

Parameters:
  • size (int) – The size of the kernel (must be odd).

  • sigma (float) – Standard deviation of the Gaussian distribution.

Returns:

kernel – The Gaussian kernel.

Return type:

numpy.ndarray

Examples

>>> kernel = SignalFiltering.gaussian_kernel(5, sigma=1.0)
>>> print(kernel)
[0.05448868 0.24420134 0.40261995 0.24420134 0.05448868]
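
The kernel values printed above follow from sampling a Gaussian at integer offsets and normalising the weights to sum to 1. The sketch below reproduces them with plain NumPy and then uses the kernel for smoothing. The function names are hypothetical, and the edge-padding strategy in `gaussian_smooth_sketch` is an assumption; the library's own boundary handling may differ.

```python
import numpy as np


def gaussian_kernel_sketch(size, sigma):
    """Sampled Gaussian, normalised so the weights sum to 1 (size assumed odd)."""
    x = np.arange(size) - size // 2               # sample positions centred on zero
    kernel = np.exp(-(x ** 2) / (2 * sigma ** 2))
    return kernel / kernel.sum()


def gaussian_smooth_sketch(x, size, sigma):
    """Convolve with the kernel after edge-padding so the output keeps the input length."""
    half = size // 2
    padded = np.pad(np.asarray(x, dtype=float), half, mode="edge")
    return np.convolve(padded, gaussian_kernel_sketch(size, sigma), mode="valid")


kernel = gaussian_kernel_sketch(5, sigma=1.0)
smoothed = gaussian_smooth_sketch([1, 2, 3, 100, 5, 6, 7], size=5, sigma=1.0)
print(kernel)
print(smoothed)
```

Because the weights sum to 1, a constant signal passes through unchanged, while an isolated spike is spread over its neighbours and reduced in height.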

median(kernel_size=3, iterations=1, method='edge')

Apply a median filter to the signal with optional repeated filtering.

Parameters:
  • kernel_size (int, optional) – Size of the median filter kernel. Default is 3.

  • iterations (int, optional) – The number of times to apply the median filter for additional filtering. Default is 1.

  • method (str, optional) – Padding method: ‘edge’ (default), ‘reflect’, or ‘constant’.

Returns:

filtered_signal – The filtered signal.

Return type:

numpy.ndarray

Examples

>>> import numpy as np
>>> signal = np.array([1, 2, 3, 100, 5, 6, 7, 8, 9, 10])  # Note the spike at value 100
>>> sf = SignalFiltering(signal)
>>> filtered_signal = sf.median(kernel_size=3)
>>> print(filtered_signal)
[1 2 3 5 6 6 7 8 9 10]

moving_average(window_size, iterations=1, method='edge')

Applies a moving average filter to the signal with optional repeated scanning.

This method smooths the signal by averaging neighboring data points within a defined window size. Optionally, the smoothing can be repeated multiple times for enhanced effect. This technique is commonly used to reduce random noise and reveal trends in signals like EEG, ECG, and PPG.

Parameters:
  • window_size (int) – The size of the moving window.

  • iterations (int, optional) – The number of times to apply the moving average for additional smoothing. Default is 1.

  • method (str, optional) – Padding method: ‘edge’ (default), ‘reflect’, or ‘constant’. Different methods may yield better results for specific types of signals (e.g., vital signals like EEG, ECG, PPG).

Returns:

filtered_signal – The filtered signal.

Return type:

numpy.ndarray

Examples

>>> import numpy as np
>>> signal = np.array([1, 2, 3, 4, 5])
>>> sf = SignalFiltering(signal)
>>> filtered_signal = sf.moving_average(3, iterations=2, method="reflect")
>>> print(filtered_signal)
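
The interaction of window size, padding method, and repeated passes can be sketched in a few lines of NumPy. This is an illustrative reimplementation under assumptions (a centred window and `numpy.pad` for the boundary), with a hypothetical function name; the library's exact boundary handling is not documented here.

```python
import numpy as np


def moving_average_sketch(x, window_size, iterations=1, method="edge"):
    """Centred moving average with padding, optionally applied several times."""
    out = np.asarray(x, dtype=float)
    kernel = np.ones(window_size) / window_size
    left = window_size // 2
    right = window_size - 1 - left
    for _ in range(iterations):
        # Pad so that the 'valid' convolution returns the original length.
        padded = np.pad(out, (left, right), mode=method)
        out = np.convolve(padded, kernel, mode="valid")
    return out


once = moving_average_sketch([1, 2, 3, 4, 5], window_size=3)
twice = moving_average_sketch([1, 2, 3, 4, 5], window_size=3, iterations=2, method="reflect")
print(once)
print(twice)
```

With 'edge' padding the first and last samples are repeated, so the end points are pulled toward the interior values; 'reflect' mirrors the interior instead, which tends to suit signals that oscillate around a trend.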

static savgol_filter(signal, window_length, polyorder)

Apply a Savitzky-Golay filter to smooth the signal.

Parameters:
  • signal (numpy.ndarray) – The input signal.

  • window_length (int) – The length of the filter window (must be odd).

  • polyorder (int) – The order of the polynomial to fit.

Returns:

smoothed_signal – The smoothed signal.

Return type:

numpy.ndarray

Examples

>>> import numpy as np
>>> signal = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9])
>>> smoothed_signal = SignalFiltering.savgol_filter(signal, 3, 2)  # static method; no instance needed
>>> print(smoothed_signal)
[1. 2. 3. 4. 5. 6. 7. 8. 9.]
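
The unchanged output above is expected: a Savitzky-Golay filter fits a polynomial of degree `polyorder` in each window, so any input that is itself such a polynomial is reproduced exactly. The sketch below shows this together with a noisy case, using SciPy's `savgol_filter` as a stand-in for the library's implementation. The window length of 21 and the noise level are illustrative assumptions.

```python
import numpy as np
from scipy.signal import savgol_filter

rng = np.random.default_rng(0)
t = np.linspace(0, 1, 200)
clean = np.sin(2 * np.pi * 2 * t)
noisy = clean + 0.2 * rng.standard_normal(t.size)

# Local cubic fits over 21-sample windows smooth the noise but follow the sine.
smoothed = savgol_filter(noisy, window_length=21, polyorder=3)

# A straight line is a polynomial of degree <= polyorder, so it passes through unchanged.
line = np.arange(1.0, 10.0)
line_out = savgol_filter(line, window_length=3, polyorder=2)

mse_noisy = np.mean((noisy - clean) ** 2)
mse_smoothed = np.mean((smoothed - clean) ** 2)
print(f"MSE before: {mse_noisy:.4f}, after: {mse_smoothed:.4f}")
```

Longer windows remove more noise but flatten narrow peaks, so for signals such as ECG the window is usually kept short relative to the features of interest.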

Artifact Removal

The artifact removal module specializes in identifying and removing various types of artifacts that commonly affect physiological signals. This module uses advanced signal processing techniques to preserve the underlying physiological information while removing unwanted components.

Key Capabilities

  • Motion Artifact Removal: Detection and removal of movement-related artifacts in wearable devices

  • Powerline Interference: Removal of 50/60 Hz electrical interference

  • Baseline Wander: Correction of slow baseline variations in ECG and PPG signals

  • Muscle Artifacts: Identification and removal of electromyographic (EMG) interference

  • Eye Movement Artifacts: Specialized removal for EEG signals

Advanced Techniques

Adaptive Thresholding

Dynamically adjusts detection thresholds based on signal characteristics and noise levels.

Wavelet-Based Removal

Uses wavelet transforms to identify and remove artifacts in specific frequency bands while preserving physiological content.

Iterative Techniques

Employs iterative algorithms to progressively refine artifact detection and removal.

Machine Learning Integration

Uses trained models to identify artifact patterns and improve removal accuracy.

Clinical Applications

  • Wearable Device Data: Processing of signals from smartwatches, fitness trackers, and other wearable devices

  • ICU Monitoring: Real-time artifact removal in critical care settings

  • Sleep Studies: Processing of overnight physiological recordings

  • Ambulatory Monitoring: Long-term signal processing for outpatient monitoring

Artifact Removal Module for Physiological Signal Processing

This module provides artifact removal capabilities for physiological signals including ECG, PPG, EEG, and other vital signs.

Author: vitalDSP Team
Date: 2025-01-27
Version: 1.0.0

Key Features:

  • Object-oriented design with comprehensive classes

  • Multiple processing methods and functions

  • NumPy integration for numerical computations

  • SciPy integration for advanced signal processing

Examples:

Basic usage:
>>> import numpy as np
>>> from vitalDSP.filtering.artifact_removal import ArtifactRemoval
>>> signal = np.random.randn(1000)
>>> ar = ArtifactRemoval(signal)
>>> cleaned = ar.baseline_correction(cutoff=0.5, fs=256)
>>> print(f'Processing result: {cleaned.shape}')

class vitalDSP.filtering.artifact_removal.ArtifactRemoval(signal)

Bases: object

A class for removing various types of artifacts from signals.

mean_subtraction : function

Removes artifacts by subtracting the mean of the signal.

baseline_correction : function

Corrects baseline drift by applying a high-pass filter.

median_filter_removal : function

Removes spike artifacts using a median filter.

wavelet_denoising : function

Removes noise using wavelet-based denoising with various mother wavelets.

adaptive_filtering : function

Uses an adaptive filter to remove artifacts correlated with reference signals.

notch_filter : function

Removes powerline interference using a notch filter.

pca_artifact_removal : function

Uses Principal Component Analysis (PCA) to remove artifacts.

ica_artifact_removal : function

Uses Independent Component Analysis (ICA) to remove artifacts using NumPy.

adaptive_filtering(reference_signal=None, learning_rate=0.01, num_iterations=100)

Use an adaptive filter to remove artifacts correlated with a reference signal.

This method uses Least Mean Squares (LMS) adaptive filtering to iteratively adjust the signal to minimize the error between the filtered signal and a reference signal. It is particularly useful for removing artifacts that are correlated with another signal, such as EOG artifacts in EEG recordings, motion artifacts in PPG, or respiratory artifacts.

If no reference signal is provided, the filter adapts towards zero (artifact removal/denoising).

Parameters:
  • reference_signal (numpy.ndarray, optional) – The reference signal correlated with the artifact. If None, adapts towards zero (removes DC offset and baseline drift). Must have the same length as the input signal.

  • learning_rate (float, default=0.01) – The learning rate (step size) for the adaptive filter, which controls convergence speed. Typical range: 0.001 to 0.5. Lower values (0.001-0.01) converge more slowly but are more stable; higher values (0.1-0.5) converge faster but may oscillate.

  • num_iterations (int, default=100) – The number of iterations for adaptation. More iterations = better convergence.

Returns:

clean_signal – The artifact-removed signal with the same length as the input.

Return type:

numpy.ndarray

Raises:

ValueError – If reference_signal length doesn’t match signal length.

Notes

Algorithm: Least Mean Squares (LMS) adaptive filtering

  • The filter iteratively adjusts the signal based on the error

  • Error = filtered_signal - reference_signal

  • Update rule: filtered_signal -= learning_rate * error

Use Cases:

  1. With reference signal: Remove correlated artifacts (EOG from EEG, motion from PPG)

  2. Without reference signal: General denoising and DC offset removal

Convergence:

  • Monitor the error reduction to ensure proper convergence

  • If the filter diverges (error increases), reduce the learning rate

  • Typical convergence: 50-200 iterations with learning_rate=0.01
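
The update rule in these Notes can be traced by hand. The sketch below applies it literally, sample by sample, with a zero target when no reference is given. `iterative_update_sketch` is a hypothetical name, and whether the library's implementation matches this line for line is an assumption; the point is only to show how `learning_rate` and `num_iterations` interact.

```python
import numpy as np


def iterative_update_sketch(x, reference=None, learning_rate=0.01, num_iterations=100):
    """Apply the rule from the Notes: filtered -= learning_rate * (filtered - reference)."""
    filtered = np.asarray(x, dtype=float).copy()
    # With no reference the target is zero, as the method description states.
    target = np.zeros_like(filtered) if reference is None else np.asarray(reference, dtype=float)
    if target.shape != filtered.shape:
        raise ValueError("reference_signal must have the same length as the signal")
    for _ in range(num_iterations):
        error = filtered - target
        filtered -= learning_rate * error
    return filtered


x = np.array([2.0, 4.0, 6.0])
out = iterative_update_sketch(x, learning_rate=0.1, num_iterations=10)

# Each pass scales the remaining error by (1 - learning_rate), so after n passes the
# distance to the target is (1 - learning_rate) ** n times the initial distance.
expected = x * (1 - 0.1) ** 10
print(out, expected)
```

Under this reading, the two parameters matter only through the factor (1 - learning_rate) ** num_iterations, and a learning rate above 2 would make that factor grow instead of shrink, which matches the divergence warning above.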

See also

baseline_correction

For simple baseline drift removal

mean_subtraction

For DC offset removal

Examples

Example 1: Remove EOG artifacts from EEG using reference EOG signal

>>> import numpy as np
>>> # Simulate EEG contaminated with EOG
>>> eeg_clean = np.sin(2*np.pi*10*np.linspace(0, 1, 1000))  # 10 Hz alpha wave
>>> eog_artifact = 2 * np.sin(2*np.pi*2*np.linspace(0, 1, 1000))  # 2 Hz eye movement
>>> eeg_contaminated = eeg_clean + 0.5 * eog_artifact
>>>
>>> # Remove EOG artifact using reference EOG channel
>>> ar = ArtifactRemoval(eeg_contaminated)
>>> eeg_cleaned = ar.adaptive_filtering(
...     reference_signal=eog_artifact,
...     learning_rate=0.05,
...     num_iterations=150
... )
>>> print(f"Original SNR: {10*np.log10(np.var(eeg_clean)/np.var(eeg_contaminated-eeg_clean)):.2f} dB")
>>> print(f"Cleaned SNR: {10*np.log10(np.var(eeg_clean)/np.var(eeg_cleaned-eeg_clean)):.2f} dB")

Example 2: Remove motion artifacts from PPG using accelerometer reference

>>> # Simulate PPG with motion artifact
>>> ppg_signal = np.sin(2*np.pi*1.2*np.linspace(0, 10, 1000))  # Heart rate ~72 bpm
>>> motion_artifact = 0.8 * np.sin(2*np.pi*0.5*np.linspace(0, 10, 1000))  # Motion
>>> ppg_contaminated = ppg_signal + motion_artifact
>>>
>>> # Remove motion using accelerometer as reference
>>> ar = ArtifactRemoval(ppg_contaminated)
>>> ppg_cleaned = ar.adaptive_filtering(
...     reference_signal=motion_artifact,
...     learning_rate=0.02,
...     num_iterations=100
... )

Example 3: General denoising without reference signal

>>> # Noisy signal
>>> signal = np.sin(2*np.pi*5*np.linspace(0, 2, 500)) + 0.3*np.random.randn(500)
>>> ar = ArtifactRemoval(signal)
>>> denoised = ar.adaptive_filtering(learning_rate=0.01, num_iterations=100)
>>> print(f"DC offset removed: {np.mean(signal):.3f} -> {np.mean(denoised):.3f}")

Example 4: Respiratory artifact removal from ECG

>>> # ECG with respiratory baseline wander
>>> ecg = np.sin(2*np.pi*1.2*np.linspace(0, 5, 1000))  # Heart rate
>>> respiratory = 0.4 * np.sin(2*np.pi*0.3*np.linspace(0, 5, 1000))  # Breathing
>>> ecg_with_resp = ecg + respiratory
>>>
>>> # Remove using respiratory belt signal as reference
>>> ar = ArtifactRemoval(ecg_with_resp)
>>> ecg_cleaned = ar.adaptive_filtering(
...     reference_signal=respiratory,
...     learning_rate=0.03,
...     num_iterations=120
... )

baseline_correction(cutoff=0.5, fs=1000)

Correct baseline drift by applying a high-pass filter.

This method is particularly effective for removing low-frequency baseline wander in signals such as ECG or PPG, where baseline drift can obscure important features.

Parameters:
  • cutoff (float) – The cutoff frequency for the high-pass filter.

  • fs (float) – The sampling frequency of the signal.

Returns:

clean_signal – The baseline-corrected signal.

Return type:

numpy.ndarray

Examples

>>> signal = np.array([1, 2, 3, 4, 5])
>>> ar = ArtifactRemoval(signal)
>>> clean_signal = ar.baseline_correction(cutoff=0.5, fs=1000)
>>> print(clean_signal)
[-0.4995 -0.4995 -0.4995 -0.4995 -0.4995]
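
A five-sample array does not show much, so the sketch below applies the same idea to a longer synthetic recording: a 1.2 Hz cardiac-like component plus a slow 0.05 Hz drift. It uses a zero-phase SciPy Butterworth high-pass as a stand-in. The helper name, filter order, and signal parameters are assumptions for illustration, not details of the VitalDSP implementation.

```python
import numpy as np
from scipy import signal

fs = 250.0
t = np.arange(0, 20, 1 / fs)
cardiac = np.sin(2 * np.pi * 1.2 * t)         # about 72 beats per minute
drift = 0.8 * np.sin(2 * np.pi * 0.05 * t)    # slow baseline wander
recorded = cardiac + drift


def highpass_baseline_sketch(x, cutoff, fs, order=2):
    """Zero-phase Butterworth high-pass (hypothetical helper, SciPy stand-in)."""
    sos = signal.butter(order, cutoff, btype="high", fs=fs, output="sos")
    return signal.sosfiltfilt(sos, x)


corrected = highpass_baseline_sketch(recorded, cutoff=0.5, fs=fs)

rms_before = np.sqrt(np.mean((recorded - cardiac) ** 2))
rms_after = np.sqrt(np.mean((corrected - cardiac) ** 2))
print(f"RMS deviation from the cardiac component: before={rms_before:.3f}, after={rms_after:.3f}")
```

The cutoff has to sit between the drift frequency and the lowest frequency of interest; setting it too high starts to attenuate the heart-rate fundamental itself.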

ica_artifact_removal(num_components=1, max_iterations=1000, tol=1e-05, seed=23, window_size=None, step_size=None, batch_size=1000)

Use Independent Component Analysis (ICA) to remove artifacts.

This enhanced version automatically handles 1D signals by creating synthetic components. ICA separates the signal into independent components and allows for the removal of specific components identified as artifacts.

For 1D signals (single channel), synthetic components are automatically generated using derivatives and delayed versions, enabling ICA to separate artifacts from the underlying physiological signal.

Parameters:
  • num_components (int) – The number of independent components to retain. For 1D signals, this determines how many synthetic components to generate. Recommended: 3-5 for good artifact separation.

  • max_iterations (int) – The maximum number of iterations for convergence.

  • tol (float) – The tolerance level for convergence.

  • seed (int) – The seed for random number generation to ensure reproducibility.

  • window_size (int, optional) – The size of the sliding window to create a multi-dimensional signal. If None, automatic synthetic component generation is used for 1D signals.

  • step_size (int, optional) – The step size for the sliding window. Must be used with window_size.

  • batch_size (int, optional) – The batch size for IncrementalPCA to manage memory usage (legacy parameter).

Returns:

clean_signal – The artifact-removed signal (same shape as input).

Return type:

numpy.ndarray

Notes

For 1D Signals (Single Channel): The method automatically creates synthetic components from:

  • Original signal

  • First derivative (captures rapid changes/spikes)

  • Delayed version (captures temporal patterns)

  • Second derivative (captures acceleration/motion artifacts)

  • Smoothed version (captures baseline trends)

For Multi-Channel Signals: Uses traditional windowing approach if window_size is specified.

Examples

>>> # Example 1: 1D signal (most common case)
>>> import numpy as np
>>> signal_1d = np.sin(2*np.pi*np.linspace(0,10,1000)) + 0.1*np.random.randn(1000)
>>> ar = ArtifactRemoval(signal_1d)
>>> clean = ar.ica_artifact_removal(num_components=3)
>>> print(clean.shape)  # (1000,)
>>>
>>> # Example 2: With windowing (for backward compatibility)
>>> signal = np.array([1, 2, 3, 4, 5, 6, 7, 8])
>>> ar = ArtifactRemoval(signal)
>>> clean = ar.ica_artifact_removal(num_components=1, window_size=4, step_size=2)
>>> print(clean.shape)  # (8,)
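
The Notes list five synthetic components that are derived from a single channel before ICA is applied. One plausible way to build such a matrix is sketched below with NumPy. The function name, the one-sample delay, and the five-sample smoothing window are hypothetical choices; the library's actual construction is not shown in this documentation.

```python
import numpy as np


def synthetic_components_sketch(x, delay=1, smooth_window=5):
    """Stack a 1D signal with derived versions of itself, one row per component."""
    x = np.asarray(x, dtype=float)
    if delay < 1:
        raise ValueError("delay must be at least 1 sample")
    first_derivative = np.gradient(x)                  # rapid changes and spikes
    second_derivative = np.gradient(first_derivative)  # acceleration-like content
    # Shift right by `delay` samples, repeating the first sample to keep the length.
    delayed = np.concatenate([np.full(delay, x[0]), x[:-delay]])
    kernel = np.ones(smooth_window) / smooth_window
    smoothed = np.convolve(x, kernel, mode="same")     # slow baseline trend
    return np.vstack([x, first_derivative, delayed, second_derivative, smoothed])


rng = np.random.default_rng(0)
signal_1d = np.sin(2 * np.pi * np.linspace(0, 10, 1000)) + 0.1 * rng.standard_normal(1000)
components = synthetic_components_sketch(signal_1d)
print(components.shape)
```

The resulting matrix has one row per component and can be handed to any ICA routine that expects multi-channel input.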

mean_subtraction()

Remove artifacts by subtracting the mean of the signal.

This method removes a constant (DC) offset, which is common in many physiological signals like ECG or EEG. It does not correct a baseline that varies over time; for slow drift, see baseline_correction.

Returns:

clean_signal – The artifact-removed signal.

Return type:

numpy.ndarray

Examples

>>> signal = np.array([1, 2, 3, 4, 5])
>>> ar = ArtifactRemoval(signal)
>>> clean_signal = ar.mean_subtraction()
>>> print(clean_signal)
[-2. -1.  0.  1.  2.]

median_filter_removal(kernel_size=3)

Remove spike artifacts using a median filter.

This method is particularly useful for removing sharp spikes or noise in the signal, such as motion artifacts in PPG or EOG signals.

Parameters:

kernel_size (int) – The size of the median filter kernel. A larger kernel size will smooth more but may remove important signal features.

Returns:

clean_signal – The artifact-removed signal.

Return type:

numpy.ndarray

Examples

>>> signal = np.array([1, 100, 3, 4, 5])
>>> ar = ArtifactRemoval(signal)
>>> clean_signal = ar.median_filter_removal(kernel_size=3)
>>> print(clean_signal)
[1 3 4 4 5]
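
A sliding median can be sketched in a few lines of NumPy. The version below assumes the signal is padded by repeating its edge values, which is one padding choice that reproduces the output shown above; whether vitalDSP pads this way is an assumption, and median_filter_edge is a hypothetical helper name.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

def median_filter_edge(signal, kernel_size=3):
    """Sliding median with edge-value padding (kernel_size must be odd)."""
    if kernel_size % 2 == 0:
        raise ValueError("kernel_size must be odd")
    x = np.asarray(signal, dtype=float)
    half = kernel_size // 2
    padded = np.pad(x, half, mode="edge")            # repeat first/last sample
    windows = sliding_window_view(padded, kernel_size)
    return np.median(windows, axis=1)                # one median per window

spiky = np.array([1, 100, 3, 4, 5])
filtered_spiky = median_filter_edge(spiky, kernel_size=3)
print(filtered_spiky)  # [1. 3. 4. 4. 5.]
```

The spike at index 1 is replaced by the median of its neighbourhood, while the surrounding samples change little.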
notch_filter(freq=50, fs=1000, Q=30)[source]

Remove powerline interference using a notch filter.

This method is effective for removing specific frequency artifacts like powerline interference (50/60 Hz) from physiological signals.

Parameters:
  • freq (float) – The frequency to be removed (e.g., 50 Hz for powerline interference).

  • fs (float) – The sampling frequency of the signal.

  • Q (float) – The quality factor of the notch filter, which controls the bandwidth of the filter.

Returns:

clean_signal – The artifact-removed signal.

Return type:

numpy.ndarray

Examples

>>> signal = np.array([1, 2, 3, 4, 5])
>>> ar = ArtifactRemoval(signal)
>>> clean_signal = ar.notch_filter(freq=50, fs=1000, Q=30)
>>> print(clean_signal)
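
To show how freq, fs, and Q interact, here is a minimal second-order notch written directly in NumPy. It uses the widely published biquad notch design, where the notch bandwidth is roughly freq / Q. The function names are hypothetical and this is a sketch under those assumptions, not the filter design vitalDSP uses internally.

```python
import numpy as np

def notch_coefficients(freq, fs, Q):
    """Second-order notch (biquad) coefficients, normalized so a[0] == 1."""
    w0 = 2 * np.pi * freq / fs            # notch frequency in rad/sample
    alpha = np.sin(w0) / (2 * Q)          # larger Q -> narrower notch
    b = np.array([1.0, -2 * np.cos(w0), 1.0])
    a = np.array([1.0 + alpha, -2 * np.cos(w0), 1.0 - alpha])
    return b / a[0], a / a[0]

def apply_biquad(b, a, x):
    """Evaluate the difference equation sample by sample (zero initial state)."""
    y = np.zeros(len(x))
    for n in range(len(x)):
        y[n] = b[0] * x[n]
        if n >= 1:
            y[n] += b[1] * x[n - 1] - a[1] * y[n - 1]
        if n >= 2:
            y[n] += b[2] * x[n - 2] - a[2] * y[n - 2]
    return y

fs = 1000
t = np.arange(4000) / fs
slow = np.sin(2 * np.pi * 5 * t)          # 5 Hz component to keep
hum = 0.5 * np.sin(2 * np.pi * 50 * t)    # 50 Hz powerline interference
b, a = notch_coefficients(freq=50, fs=fs, Q=30)
notched = apply_biquad(b, a, slow + hum)
```

The filter needs a few hundred samples to settle at this Q, so a five-sample input like the one in the example above is too short to show meaningful attenuation.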
pca_artifact_removal(num_components=1, window_size=100, overlap=50)[source]

Use Principal Component Analysis (PCA) to remove artifacts.

This method removes artifacts by reconstructing the signal with a reduced number of principal components, which can be particularly useful for signals with multiple overlapping noise sources.

Parameters:
  • num_components (int) – The number of principal components to retain.

  • window_size (int, optional) – The size of each window used to segment the signal (default is 100).

  • overlap (int, optional) – The number of samples that each window should overlap (default is 50).

Returns:

clean_signal – The artifact-removed signal.

Return type:

numpy.ndarray

Examples

>>> signal = np.array([1, 2, 3, 4, 5])
>>> ar = ArtifactRemoval(signal)
>>> clean_signal = ar.pca_artifact_removal(num_components=1, window_size=2, overlap=1)
>>> print(clean_signal)
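
The windowed PCA idea can be illustrated as follows: cut the signal into overlapping windows, keep the strongest principal components of the window matrix, and average the overlapping reconstructions. This is a hedged sketch using an SVD; pca_window_reconstruct is a hypothetical name, and the handling of short signals and uncovered tail samples is an assumption rather than documented library behaviour.

```python
import numpy as np

def pca_window_reconstruct(signal, num_components=1, window_size=100, overlap=50):
    """Rebuild a 1D signal from the top principal components of its windows."""
    if not 0 <= overlap < window_size:
        raise ValueError("overlap must satisfy 0 <= overlap < window_size")
    x = np.asarray(signal, dtype=float)
    if len(x) < window_size:
        return x.copy()                               # too short to segment
    step = window_size - overlap
    starts = np.arange(0, len(x) - window_size + 1, step)
    windows = np.stack([x[s:s + window_size] for s in starts])

    mean = windows.mean(axis=0)
    u, s, vt = np.linalg.svd(windows - mean, full_matrices=False)
    k = min(num_components, len(s))
    approx = (u[:, :k] * s[:k]) @ vt[:k] + mean       # rank-k reconstruction

    # Overlap-add: average every window's estimate of each sample.
    total = np.zeros(len(x))
    count = np.zeros(len(x))
    for row, start in zip(approx, starts):
        total[start:start + window_size] += row
        count[start:start + window_size] += 1
    out = x.copy()                                    # uncovered samples stay as-is
    covered = count > 0
    out[covered] = total[covered] / count[covered]
    return out

rng = np.random.default_rng(1)
t = np.linspace(0, 10, 1000)
noisy = np.sin(2 * np.pi * t) + 0.3 * rng.standard_normal(t.size)
cleaned = pca_window_reconstruct(noisy, num_components=2, window_size=100, overlap=50)
print(cleaned.shape)  # (1000,)
```

Keeping every component returns the input unchanged, so num_components controls how aggressively variation is discarded.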
wavelet_denoising(wavelet_type='db', level=1, order=4, custom_wavelet=None, smoothing='lowpass', **smoothing_params)[source]

Remove noise using wavelet-based denoising with various mother wavelets.

This method decomposes the signal into approximation and detail coefficients using wavelets, thresholds the detail coefficients, and reconstructs the signal. It is effective for denoising signals where noise is present at multiple scales.

Parameters:
  • wavelet_type (str, optional) – The type of wavelet to use (‘haar’, ‘db’, ‘sym’, ‘coif’, ‘custom’). Default is ‘db’.

  • level (int, optional) – The level of decomposition. Higher levels capture more global features. Default is 1.

  • order (int, optional) – The order of the wavelet (used for ‘db’, ‘sym’, and ‘coif’ wavelets). Default is 4.

  • custom_wavelet (numpy.ndarray, optional) – A custom wavelet provided by the user if wavelet_type is ‘custom’. Default is None.

Returns:

clean_signal – The denoised signal with the same length as the original signal.

Return type:

numpy.ndarray

Examples

>>> signal = np.array([1, 2, 3, 4, 5])
>>> ar = ArtifactRemoval(signal)
>>> clean_signal = ar.wavelet_denoising(wavelet_type='db', level=2, order=4)
>>> print(clean_signal)
>>> # Example using a custom wavelet
>>> custom_wavelet = np.array([0.2, 0.5, 0.2])
>>> clean_signal = ar.wavelet_denoising(wavelet_type='custom', custom_wavelet=custom_wavelet)
>>> print(clean_signal)
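
The decompose, threshold, reconstruct cycle described above can be sketched with a one-level Haar transform. The soft-threshold rule and the padding of odd-length inputs are assumptions made for this illustration; haar_denoise is a hypothetical helper and does not reflect how vitalDSP selects thresholds or handles the 'db', 'sym', and 'coif' families.

```python
import numpy as np

def haar_denoise(signal, threshold):
    """One-level Haar transform, soft-threshold the details, then invert."""
    x = np.asarray(signal, dtype=float)
    n = len(x)
    if n % 2:                                   # pad odd lengths by repeating the last sample
        x = np.append(x, x[-1])
    approx = (x[0::2] + x[1::2]) / np.sqrt(2)   # approximation (low-pass) coefficients
    detail = (x[0::2] - x[1::2]) / np.sqrt(2)   # detail (high-pass) coefficients
    # Soft threshold: shrink details toward zero, zeroing the small ones.
    detail = np.sign(detail) * np.maximum(np.abs(detail) - threshold, 0.0)
    out = np.empty_like(x)
    out[0::2] = (approx + detail) / np.sqrt(2)
    out[1::2] = (approx - detail) / np.sqrt(2)
    return out[:n]                              # same length as the input

ramp = np.array([1, 2, 3, 4, 5])
unchanged = haar_denoise(ramp, threshold=0.0)   # zero threshold: exact reconstruction
flattened = haar_denoise(ramp, threshold=10.0)  # all details removed: pairwise averages
```

With a zero threshold the transform is perfectly invertible; raising the threshold removes progressively more fine-scale detail.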

Advanced Signal Filtering

The advanced signal filtering module implements sophisticated filtering techniques that go beyond traditional DSP methods. These techniques are particularly suited for dynamic, non-linear physiological systems and challenging signal processing scenarios.

Key Capabilities

  • Adaptive Filtering: Self-adjusting filters that adapt to changing signal characteristics

  • Kalman Filtering: Optimal state estimation for dynamic systems with noise

  • Particle Filtering: Non-linear filtering for complex physiological models

  • Ensemble Methods: Combination of multiple filtering approaches for robust performance

  • Neural Network Filters: Deep learning-based filtering for complex artifact patterns

Advanced Techniques

Adaptive Filters

Automatically adjust filter parameters based on real-time signal analysis and noise characteristics.

Kalman Filters

Provide optimal state estimation for linear systems with Gaussian noise, ideal for tracking physiological parameters.

Particle Filters

Handle non-linear and non-Gaussian systems, suitable for complex physiological modeling.

Ensemble Filtering

Combines multiple filtering approaches to achieve robust performance across diverse signal conditions.

Neural Network Integration

Uses deep learning models trained on large datasets to identify and remove complex artifact patterns.

Clinical Applications

  • Critical Care Monitoring: Advanced filtering for ICU and emergency room applications

  • Research Applications: High-precision signal processing for clinical research

  • Diagnostic Support: Enhanced signal quality for automated diagnosis systems

  • Long-Term Monitoring: Robust filtering for extended patient monitoring periods

Performance Optimization

  • GPU Acceleration: CUDA support for high-performance computing applications

  • Parallel Processing: Multi-threaded implementations for real-time applications

  • Memory Optimization: Efficient memory usage for large datasets and long-term monitoring

  • Adaptive Complexity: Dynamic adjustment of algorithm complexity based on available computational resources

Advanced Signal Filtering Module for Physiological Signal Processing

This module implements advanced filtering techniques for physiological signals including Kalman filtering, optimization-based filtering, gradient descent filtering, ensemble filtering, convolution-based filtering, and attention-based filtering. These methods are designed for complex scenarios such as real-time filtering, adaptive filtering, and filtering in noisy environments.

Author: vitalDSP Team

Date: 2025-01-27

Version: 1.0.0

Key Features:

  • Kalman filtering for real-time signal estimation

  • Optimization-based filtering with custom loss functions

  • Gradient descent adaptive filtering

  • Ensemble filtering combining multiple methods

  • Convolution-based filtering with custom kernels

  • Attention-based dynamic filtering

Examples:

Basic Kalman filtering:
>>> import numpy as np
>>> from vitalDSP.filtering.advanced_signal_filtering import AdvancedSignalFiltering
>>> signal = np.random.randn(1000) + np.sin(np.linspace(0, 10, 1000))
>>> af = AdvancedSignalFiltering(signal)
>>> filtered = af.kalman_filter(R=0.1, Q=0.01)
Optimization-based filtering:
>>> target = np.sin(np.linspace(0, 10, 1000))
>>> filtered = af.optimization_based_filtering(target, loss_type="mse", iterations=50)
Ensemble filtering:
>>> filters = [af.kalman_filter, af.kalman_filter]  # callables, as the filters parameter requires
>>> ensemble_result = af.ensemble_filtering(filters, method="mean", num_iterations=5)
class vitalDSP.filtering.advanced_signal_filtering.AdvancedSignalFiltering(signal)[source]

Bases: object

A class for applying advanced filtering techniques to signals.

This class implements a variety of advanced filtering methods, including Kalman filters, optimization-based filtering, gradient descent filtering, ensemble filtering, convolution-based filtering, and attention-based filtering. These methods are designed to enhance signal processing in complex scenarios such as real-time filtering, adaptive filtering, and filtering in noisy environments.

kalman_filter(signal=None, R=1, Q=1)[source]

Applies a Kalman filter for real-time signal estimation and noise reduction.

optimization_based_filtering(target, loss_type='mse', custom_loss_func=None, initial_guess=0, learning_rate=0.01, iterations=100)[source]

Applies filtering based on a custom or predefined loss function using optimization techniques.

gradient_descent_filter(target=None, learning_rate=0.01, iterations=100)[source]

Uses gradient descent for adaptive filtering to reduce the difference between the signal and a target.

ensemble_filtering(filters, method='mean', weights=None, num_iterations=10, learning_rate=0.01)[source]

Combines multiple filters using ensemble methods such as mean, weighted mean, bagging, or boosting.

convolution_based_filter(kernel_type='smoothing', custom_kernel=None, kernel_size=3)[source]

Applies convolutional filtering using predefined or custom kernels.

attention_based_filter(attention_type='uniform', custom_weights=None, size=5, **kwargs)[source]

Uses attention mechanisms for dynamic filtering based on various attention schemes.

adaptive_filtering(desired_signal, mu=0.01, filter_order=4)[source]

Apply an adaptive filter to the signal using the Least Mean Squares (LMS) algorithm.

The LMS adaptive filter is useful for real-time signal processing tasks where the characteristics of the signal may change over time. The filter coefficients are updated iteratively to minimize the error between the desired signal and the filter output.

Parameters:
  • desired_signal (numpy.ndarray) – The desired signal that the filter output should approximate.

  • mu (float, optional) – The step size or learning rate for the LMS algorithm. Default is 0.01.

  • filter_order (int, optional) – The order of the adaptive filter. Default is 4.

Returns:

filtered_signal – The filtered signal after applying the LMS adaptive filter.

Return type:

numpy.ndarray

Examples

>>> signal = np.array([1, 2, 3, 4, 5, 6])
>>> desired_signal = np.array([1.1, 1.9, 3.1, 4.1, 5.0, 5.9])
>>> af = AdvancedSignalFiltering(signal)
>>> filtered_signal = af.adaptive_filtering(desired_signal, mu=0.01, filter_order=4)
>>> print(filtered_signal)
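
The LMS update described above can be written out in a few lines. In this sketch filter_order is taken to mean the number of filter taps and the weights start at zero; both are assumptions, as is the helper name lms_filter. The six-sample example above is too short for the weights to settle, so the demonstration uses a longer synthetic signal.

```python
import numpy as np

def lms_filter(x, desired, mu=0.01, filter_order=4):
    """LMS: y[n] = w . [x[n], x[n-1], ...], then w += 2 * mu * e[n] * taps."""
    x = np.asarray(x, dtype=float)
    d = np.asarray(desired, dtype=float)
    w = np.zeros(filter_order)               # filter weights, start at zero
    y = np.zeros(len(x))
    for n in range(len(x)):
        taps = np.zeros(filter_order)
        m = min(filter_order, n + 1)
        taps[:m] = x[n::-1][:m]              # newest sample first; zeros before the start
        y[n] = w @ taps                      # filter output
        error = d[n] - y[n]
        w += 2 * mu * error * taps           # gradient step on the squared error
    return y, w

# System identification demo: the desired signal is x passed through a known FIR filter.
rng = np.random.default_rng(2)
x_in = rng.standard_normal(5000)
h_true = np.array([0.5, -0.3, 0.2, 0.1])
desired = np.convolve(x_in, h_true)[:len(x_in)]
y_out, w_final = lms_filter(x_in, desired, mu=0.01, filter_order=4)
print(np.round(w_final, 3))
```

The learned weights approach h_true, and the error early in the run is much larger than at the end, which is the adaptation the mu parameter controls.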
attention_based_filter(attention_type='uniform', custom_weights=None, size=5, **kwargs)[source]

Apply an attention-based filter to the signal using predefined or custom attention weights.

Attention-based filtering uses an attention mechanism to dynamically adjust the influence of different parts of the signal. This method supports various types of attention mechanisms, including uniform, linear, gaussian, and exponential, as well as custom-defined attention weights.

Parameters:
  • attention_type (str) – The type of attention weights to use (‘uniform’, ‘linear’, ‘gaussian’, ‘exponential’, ‘custom’).

  • custom_weights (numpy.ndarray, optional) – The custom attention weights to use if ‘custom’ is selected.

  • size (int, optional) – The size of the attention window for predefined attention weights. Default is 5.

  • **kwargs (dict, optional) – Additional parameters for specific attention types (e.g., ‘ascending’ for linear, ‘sigma’ for gaussian, ‘base’ for exponential).

Returns:

filtered_signal – The filtered signal after applying the attention-based filter.

Return type:

numpy.ndarray

Examples

>>> signal = np.array([1, 2, 3, 4, 5, 6])
>>> af = AdvancedSignalFiltering(signal)
>>> filtered_signal = af.attention_based_filter(attention_type='gaussian', size=5, sigma=1.0)
>>> print(filtered_signal)
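
One way to picture the predefined attention types is as normalized weight windows that are slid across the signal. The weight formulas below are plausible textbook choices, not confirmed vitalDSP definitions, and attention_weights is a hypothetical helper; only the keyword names sigma, base, and ascending come from the parameter description above.

```python
import numpy as np

def attention_weights(attention_type="uniform", size=5, sigma=1.0, base=2.0, ascending=True):
    """Return a length-`size` weight vector that sums to 1 (illustrative formulas)."""
    idx = np.arange(size, dtype=float)
    if attention_type == "uniform":
        w = np.ones(size)
    elif attention_type == "linear":
        w = (idx + 1.0) if ascending else (size - idx)
    elif attention_type == "gaussian":
        w = np.exp(-0.5 * ((idx - (size - 1) / 2) / sigma) ** 2)  # centred bell
    elif attention_type == "exponential":
        w = base ** idx
    else:
        raise ValueError(f"unknown attention_type: {attention_type}")
    return w / w.sum()

signal = np.array([1, 2, 3, 4, 5, 6], dtype=float)
weights = attention_weights("gaussian", size=5, sigma=1.0)
# np.convolve flips the kernel; that makes no difference for a symmetric window.
smoothed = np.convolve(signal, weights, mode="same")
```

Because the Gaussian window is symmetric and sums to one, samples whose window lies fully inside a straight ramp are left unchanged; only the edges are affected by the implicit zero padding.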
convolution_based_filter(kernel_type='smoothing', custom_kernel=None, kernel_size=3)[source]

Apply a convolution-based filter to the signal using predefined or custom kernels.

Convolution-based filtering is a powerful technique that applies a kernel to the signal to achieve various effects such as smoothing, sharpening, and edge detection. This method allows for the use of standard convolutional kernels or custom kernels provided by the user.

Parameters:
  • kernel_type (str) – The type of kernel to use (‘smoothing’, ‘sharpening’, ‘edge_detection’, ‘custom’).

  • custom_kernel (numpy.ndarray, optional) – The custom kernel to use if ‘custom’ is selected. This must be provided by the user.

  • kernel_size (int, optional) – The size of the kernel for predefined kernels. Default is 3.

Returns:

filtered_signal – The filtered signal after convolution.

Return type:

numpy.ndarray

Examples

>>> signal = np.array([1, 2, 3, 4, 5, 6])
>>> af = AdvancedSignalFiltering(signal)
>>> filtered_signal = af.convolution_based_filter(kernel_type='sharpening')
>>> print(filtered_signal)
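
The three predefined kernel types can be illustrated with common 1D kernels. The coefficients below are textbook choices picked for this sketch, not values taken from vitalDSP, and make_kernel is a hypothetical helper.

```python
import numpy as np

def make_kernel(kernel_type="smoothing", kernel_size=3):
    """Common 1D kernels (illustrative coefficients, not the library's)."""
    if kernel_type == "smoothing":
        return np.ones(kernel_size) / kernel_size   # moving average
    if kernel_type == "sharpening":
        return np.array([-1.0, 3.0, -1.0])          # identity minus discrete Laplacian
    if kernel_type == "edge_detection":
        return np.array([1.0, 0.0, -1.0])           # x[n+1] - x[n-1] after np.convolve's flip
    raise ValueError(f"unknown kernel_type: {kernel_type}")

ramp = np.array([1, 2, 3, 4, 5, 6], dtype=float)
smooth = np.convolve(ramp, make_kernel("smoothing"), mode="same")
sharp = np.convolve(ramp, make_kernel("sharpening"), mode="same")
edges = np.convolve(ramp, make_kernel("edge_detection"), mode="same")
print(edges[1:-1])  # [2. 2. 2. 2.]
```

On a straight ramp the smoothing and sharpening kernels leave interior samples unchanged, while the edge kernel reports the constant slope; the first and last samples differ because mode="same" pads with zeros.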
ensemble_filtering(filters, method='mean', weights=None, num_iterations=10, learning_rate=0.01)[source]

Apply ensemble filtering by combining the results of multiple filters using various ensemble techniques.

This method aggregates the results of different filtering techniques to achieve a more robust and reliable filtered signal. Ensemble methods like mean, weighted mean, bagging, and boosting are implemented to leverage the strengths of multiple filters.

Parameters:
  • filters (list of callable) – A list of filtering functions to apply. Each function should return a filtered signal.

  • method (str) – The ensemble method to use (‘mean’, ‘weighted_mean’, ‘bagging’, ‘boosting’).

  • weights (list of float, optional) – Weights for weighted mean if ‘weighted_mean’ is chosen. Should match the number of filters.

  • num_iterations (int, optional) – Number of iterations for bagging/boosting. Default is 10.

  • learning_rate (float, optional) – Learning rate for boosting. Default is 0.01.

Returns:

filtered_signal – The ensemble filtered signal.

Return type:

numpy.ndarray

Examples

>>> signal = np.array([1, 2, 3, 4, 5, 6])
>>> af = AdvancedSignalFiltering(signal)
>>> filters = [af.kalman_filter, af.kalman_filter]  # Example with the same filter twice
>>> filtered_signal = af.ensemble_filtering(filters, method='mean')
>>> print(filtered_signal)
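
The 'mean' and 'weighted_mean' combinations amount to averaging the outputs of the individual filters sample by sample. The sketch below assumes each filter is a callable that takes no arguments and returns an array of the same length; combine_filters is a hypothetical helper, and bagging and boosting are deliberately left out.

```python
import numpy as np

def combine_filters(filters, method="mean", weights=None):
    """Average the outputs of zero-argument filter callables."""
    outputs = np.stack([np.asarray(f(), dtype=float) for f in filters])
    if method == "mean":
        return outputs.mean(axis=0)
    if method == "weighted_mean":
        w = np.asarray(weights, dtype=float)
        if w.shape != (len(filters),):
            raise ValueError("weights must have one entry per filter")
        return np.average(outputs, axis=0, weights=w)   # normalizes by sum(w)
    raise ValueError(f"unsupported method: {method}")

signal = np.array([1, 2, 3, 4, 5, 6], dtype=float)
identity = lambda: signal
moving_avg = lambda: np.convolve(signal, np.ones(3) / 3, mode="same")

mean_out = combine_filters([identity, moving_avg])
weighted_out = combine_filters([identity, moving_avg], "weighted_mean", weights=[3, 1])
```

Passing the same filter twice, as in the example above, returns that filter's output unchanged under 'mean'; the ensemble only helps when the members differ.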
gradient_descent_filter(target=None, learning_rate=0.01, iterations=100)[source]

Apply a gradient descent adaptive filter to the signal.

This method uses gradient descent to iteratively reduce the difference between the signal and a target signal. It is useful when the signal needs to be progressively adjusted toward a desired outcome. If no target is provided, the filter adapts toward zero (used for denoising).

Adaptive Filtering: The filter adjusts the signal iteratively based on the gradient of the error between the filtered signal and the target. The learning rate controls how quickly the filter adapts, and the number of iterations determines how far it converges.

Parameters:
  • target (numpy.ndarray, optional) – The target signal to adapt to. If None, adapts towards zero (removes DC offset and trends). Must have the same length as the input signal.

  • learning_rate (float, default=0.01) – Learning rate for the gradient descent, controlling how much the signal is adjusted at each step. Typical range: 0.001 to 0.5. Lower values (0.001-0.01) converge more slowly but are more stable; higher values (0.1-0.5) converge faster but may oscillate.

  • iterations (int, default=100) – Number of iterations for the gradient descent process. More iterations give better convergence to the target at the cost of slower processing.

Returns:

filtered_signal – The filtered signal after applying gradient descent adaptive filtering.

Return type:

numpy.ndarray

Raises:

ValueError – If target is provided but has a different length than the signal.

Examples

Example 1: Adaptive filtering towards a target (basic)

>>> import numpy as np
>>> from vitalDSP.filtering.advanced_signal_filtering import AdvancedSignalFiltering
>>>
>>> # Create noisy ECG-like signal
>>> t = np.linspace(0, 2, 200)
>>> clean_signal = np.sin(2 * np.pi * 1.2 * t)  # Simulated ECG
>>> noisy_signal = clean_signal + 0.3 * np.random.randn(len(t))  # Add noise
>>>
>>> # Apply adaptive filtering
>>> af = AdvancedSignalFiltering(noisy_signal)
>>> filtered = af.gradient_descent_filter(target=clean_signal, learning_rate=0.1, iterations=100)
>>>
>>> print(f"Original signal std: {np.std(noisy_signal):.3f}")
>>> print(f"Filtered signal std: {np.std(filtered):.3f}")
>>> print(f"MSE reduction: {np.mean((noisy_signal - clean_signal)**2) / np.mean((filtered - clean_signal)**2):.2f}x")

Example 2: Denoising without target (remove DC and trends)

>>> # Signal with DC offset and noise
>>> signal = np.sin(2 * np.pi * 0.5 * np.linspace(0, 4, 200)) + 2.0  # DC offset = 2.0
>>> signal += 0.2 * np.random.randn(len(signal))  # Add noise
>>>
>>> af = AdvancedSignalFiltering(signal)
>>> denoised = af.gradient_descent_filter(learning_rate=0.05, iterations=200)
>>>
>>> print(f"Original mean (DC): {np.mean(signal):.3f}")
>>> print(f"Filtered mean (DC removed): {np.mean(denoised):.3f}")

Example 3: PPG signal with baseline wander

>>> # Simulated PPG signal with baseline wander
>>> t = np.linspace(0, 10, 1000)
>>> ppg_clean = np.sin(2 * np.pi * 1.0 * t)  # Heart rate ~60 BPM
>>> baseline_wander = 0.3 * np.sin(2 * np.pi * 0.1 * t)  # Slow drift
>>> ppg_noisy = ppg_clean + baseline_wander + 0.1 * np.random.randn(len(t))
>>>
>>> # Remove baseline using adaptive filter
>>> af = AdvancedSignalFiltering(ppg_noisy)
>>> ppg_filtered = af.gradient_descent_filter(
...     target=ppg_clean,
...     learning_rate=0.08,
...     iterations=150
... )
>>>
>>> # Calculate improvement
>>> snr_before = 10 * np.log10(np.var(ppg_clean) / np.var(ppg_noisy - ppg_clean))
>>> snr_after = 10 * np.log10(np.var(ppg_clean) / np.var(ppg_filtered - ppg_clean))
>>> print(f"SNR improvement: {snr_after - snr_before:.2f} dB")

Example 4: Respiratory signal adaptive filtering

>>> # Respiratory signal from PPG/ECG amplitude modulation
>>> t = np.linspace(0, 30, 3000)  # 30 seconds at 100 Hz
>>> respiratory = 0.2 * np.sin(2 * np.pi * 0.25 * t)  # 15 breaths/min
>>> noise = 0.1 * np.random.randn(len(t))
>>> signal = respiratory + noise
>>>
>>> # Adaptive filtering to extract respiratory pattern
>>> af = AdvancedSignalFiltering(signal)
>>> filtered = af.gradient_descent_filter(
...     target=respiratory,
...     learning_rate=0.05,
...     iterations=100
... )
>>>
>>> print(f"Mean difference from target: {abs(np.mean(filtered) - np.mean(respiratory)):.4f}")

Notes

  • The gradient descent filter is a simple adaptive filter that works well for gradual changes

  • For rapid signal changes, use lower learning rates and more iterations

  • This implementation uses the sign of the gradient (sign function) which makes it robust to outliers

  • For better results with complex signals, consider using Kalman filtering or ensemble methods

  • The filter converges to the target signal as iterations increase

  • If target is None, the filter will remove trends and DC offset (adapt towards zero)
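
The sign-based update mentioned in the notes can be sketched as follows. This assumes the update is exactly learning_rate times the sign of the error at each sample, which is an interpretation of the notes rather than a copy of the library code; sign_gradient_filter is a hypothetical name.

```python
import numpy as np

def sign_gradient_filter(signal, target=None, learning_rate=0.01, iterations=100):
    """Move each sample toward the target by a fixed step per iteration."""
    x = np.asarray(signal, dtype=float).copy()
    goal = np.zeros_like(x) if target is None else np.asarray(target, dtype=float)
    if goal.shape != x.shape:
        raise ValueError("target must have the same length as signal")
    for _ in range(iterations):
        # The step size ignores the error magnitude, so outliers get no extra pull.
        x -= learning_rate * np.sign(x - goal)
    return x

start = np.array([1.0, -0.5, 0.2])
out = sign_gradient_filter(start, learning_rate=0.1, iterations=5)
print(np.round(out, 3))
```

Under this assumption a sample can travel at most learning_rate * iterations in total, and once it reaches the target it hovers within one step of it. That is why the first sample above stops at 0.5 rather than reaching zero.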

See also

kalman_filter

For optimal estimation with known noise characteristics

ensemble_filtering

For combining multiple adaptive filters

optimization_based_filtering

For custom loss functions

kalman_filter(signal=None, R=1, Q=1)[source]

Apply a Kalman filter to the signal.

The Kalman filter provides real-time signal estimation and noise reduction, making it suitable for scenarios where signal measurements are noisy and need continuous estimation.

Parameters:
  • signal (numpy.ndarray, optional) – The signal to be filtered. If not provided, self.signal will be used.

  • R (float) – Measurement noise covariance. A lower value assumes more trust in the measurements.

  • Q (float) – Process noise covariance. A lower value assumes less uncertainty in the model.

Returns:

filtered_signal – The filtered signal.

Return type:

numpy.ndarray

Examples

>>> signal = np.array([1, 2, 3, 2, 1, 2, 3, 4, 3, 2])
>>> af = AdvancedSignalFiltering(signal)
>>> filtered_signal = af.kalman_filter(R=0.1, Q=0.01)
>>> print(filtered_signal)
[1.  1.75 2.5  2.12 1.56 1.94 2.7  3.5  3.12 2.56]
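
To make the roles of R and Q concrete, here is a minimal scalar Kalman filter with a random-walk state model. The initial state (the first measurement) and initial covariance (1.0) are assumptions, so this sketch is not expected to reproduce the library's numerical output; kalman_1d is a hypothetical name.

```python
import numpy as np

def kalman_1d(signal, R=1.0, Q=1.0):
    """Scalar Kalman filter assuming the true value follows a random walk."""
    z = np.asarray(signal, dtype=float)
    x_est = z[0]       # assumed initial state: the first measurement
    p_est = 1.0        # assumed initial error covariance
    out = np.empty_like(z)
    for n, meas in enumerate(z):
        p_pred = p_est + Q                  # predict: uncertainty grows by Q
        gain = p_pred / (p_pred + R)        # small R -> gain near 1 -> trust measurements
        x_est = x_est + gain * (meas - x_est)
        p_est = (1.0 - gain) * p_pred       # update the error covariance
        out[n] = x_est
    return out

rng = np.random.default_rng(3)
noisy_const = 2.0 + rng.standard_normal(2000)
smoothed_const = kalman_1d(noisy_const, R=1.0, Q=0.01)
print(f"{np.var(noisy_const[500:]):.3f} -> {np.var(smoothed_const[500:]):.3f}")
```

Raising R or lowering Q reduces the gain and smooths more strongly, at the cost of a slower response to genuine changes in the signal.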
optimization_based_filtering(target, loss_type='mse', custom_loss_func=None, initial_guess=0, learning_rate=0.01, iterations=100)[source]

Apply an optimization-based filter using a custom or predefined loss function.

This method uses optimization techniques to filter the signal by minimizing a specified loss function. It can be customized to use different types of loss functions, such as mean squared error (MSE), mean absolute error (MAE), Huber loss, and more, depending on the specific needs of the signal processing task.

Parameters:
  • target (numpy.ndarray) – The target signal to compare against. The filter will attempt to make the signal closer to this target.

  • loss_type (str) – The type of loss function to use (‘mse’, ‘mae’, ‘huber’, ‘smooth_l1’, ‘log_cosh’, ‘quantile’, or ‘custom’).

  • custom_loss_func (callable, optional) – A custom loss function provided by the user if ‘custom’ is selected for loss_type.

  • initial_guess (float) – Initial guess for the filter parameter, used as a starting point in optimization.

  • learning_rate (float) – Learning rate for the optimization, determining the step size of each iteration.

  • iterations (int) – Number of iterations for the optimization process.

Returns:

filtered_signal – The filtered signal after optimization.

Return type:

numpy.ndarray

Examples

>>> signal = np.array([1, 2, 3, 2, 1, 2, 3, 4, 3, 2])
>>> target = np.array([1, 1.5, 2.5, 3, 2.5, 2, 2.5, 3.5, 3, 2.5])
>>> af = AdvancedSignalFiltering(signal)
>>> filtered_signal = af.optimization_based_filtering(target, loss_type='mse', initial_guess=2, learning_rate=0.01, iterations=100)
>>> print(filtered_signal)
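
The named loss types differ mainly in how strongly they penalize large residuals. The sketch below evaluates four of them using their standard definitions; the helper loss_value and its delta parameter for the Huber loss are hypothetical, and how vitalDSP applies the loss inside its optimization loop is not shown here.

```python
import numpy as np

def loss_value(residual, loss_type="mse", delta=1.0):
    """Standard definitions of common regression losses, averaged over samples."""
    r = np.asarray(residual, dtype=float)
    a = np.abs(r)
    if loss_type == "mse":
        return np.mean(r ** 2)
    if loss_type == "mae":
        return np.mean(a)
    if loss_type == "huber":
        # Quadratic near zero, linear beyond delta.
        return np.mean(np.where(a <= delta, 0.5 * r ** 2, delta * (a - 0.5 * delta)))
    if loss_type == "log_cosh":
        # Numerically stable form of log(cosh(r)).
        return np.mean(a + np.log1p(np.exp(-2 * a)) - np.log(2))
    raise ValueError(f"unsupported loss_type: {loss_type}")

signal = np.array([1, 2, 3, 2, 1, 2, 3, 4, 3, 2], dtype=float)
target = np.array([1, 1.5, 2.5, 3, 2.5, 2, 2.5, 3.5, 3, 2.5])
residual = signal - target
for name in ("mse", "mae", "huber", "log_cosh"):
    print(name, round(float(loss_value(residual, name)), 4))
```

A single large outlier dominates 'mse' far more than 'mae' or 'huber', which is the usual reason to choose a robust loss for signals with spike artifacts.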