Source code for vitalDSP.preprocess.preprocess_operations

"""
Preprocessing Operations Module for Physiological Signal Processing

This module provides comprehensive preprocessing capabilities for physiological
signals including ECG, PPG, EEG, and other vital signs. It implements various
filtering techniques, noise reduction methods, and signal conditioning operations
to prepare signals for analysis.

Author: vitalDSP Team
Date: 2025-01-27
Version: 1.0.0

Key Features:
- Multiple filtering types (bandpass, Butterworth, Chebyshev, elliptic)
- Advanced noise reduction methods (wavelet, Savitzky-Golay, median, Gaussian)
- Signal-specific preprocessing configurations
- Respiratory signal optimization
- Comprehensive parameter configuration
- Integration with signal filtering modules

Examples:
---------
Basic ECG preprocessing:
    >>> import numpy as np
    >>> from vitalDSP.preprocess.preprocess_operations import PreprocessConfig, preprocess_signal
    >>> config = PreprocessConfig(
    ...     filter_type="bandpass",
    ...     lowcut=0.5,
    ...     highcut=40.0,
    ...     noise_reduction_method="wavelet"
    ... )
    >>> processed_signal = preprocess_signal(ecg_signal, fs=250, config=config)

PPG preprocessing with Savitzky-Golay:
    >>> ppg_config = PreprocessConfig(
    ...     filter_type="bandpass",
    ...     lowcut=0.5,
    ...     highcut=8.0,
    ...     noise_reduction_method="savgol",
    ...     window_length=21,
    ...     polyorder=3
    ... )
    >>> processed_ppg = preprocess_signal(ppg_signal, fs=128, config=ppg_config)

Respiratory signal preprocessing:
    >>> resp_config = PreprocessConfig(
    ...     filter_type="bandpass",
    ...     lowcut=0.1,
    ...     highcut=2.0,
    ...     respiratory_mode=True
    ... )
    >>> processed_resp = preprocess_signal(resp_signal, fs=64, config=resp_config)
"""

from vitalDSP.preprocess.noise_reduction import (
    wavelet_denoising,
    savgol_denoising,
    median_denoising,
    gaussian_denoising,
    moving_average_denoising,
)
from vitalDSP.filtering.signal_filtering import SignalFiltering
from scipy.signal import butter, filtfilt, medfilt
import numpy as np


[docs] class PreprocessConfig: """ Configuration class for signal preprocessing, which includes filtering and noise reduction parameters. Attributes ---------- filter_type : str The type of filtering to apply. Options: 'bandpass', 'butterworth', 'chebyshev', 'elliptic', 'ignore'. noise_reduction_method : str The noise reduction method to apply. Options: 'wavelet', 'savgol', 'median', 'gaussian', 'moving_average', 'ignore'. lowcut : float The lower cutoff frequency for filtering. highcut : float The upper cutoff frequency for filtering. order : int The order of the filter. wavelet_name : str The name of the wavelet to use for wavelet-based noise reduction. level : int The level of wavelet decomposition. window_length : int The window length for Savitzky-Golay filtering. polyorder : int The polynomial order for Savitzky-Golay filtering. kernel_size : int The kernel size for median filtering. sigma : float The standard deviation for Gaussian filtering. respiratory_mode: bool Apply the preprocessing function specifically for respiratory signals (e.g., PPG or ECG-derived respiration). repreprocess: bool Re preprocessing function Examples -------- >>> # Basic configuration for ECG preprocessing >>> config = PreprocessConfig( ... filter_type="bandpass", ... noise_reduction_method="wavelet", ... lowcut=0.5, ... highcut=40.0, ... order=4 ... ) >>> >>> # Configuration for PPG preprocessing with respiratory analysis >>> config_ppg = PreprocessConfig( ... filter_type="bandpass", ... noise_reduction_method="savgol", ... lowcut=0.5, ... highcut=8.0, ... respiratory_mode=True, ... window_length=5, ... polyorder=2 ... ) >>> >>> # Configuration for noise reduction only >>> config_denoise = PreprocessConfig( ... filter_type="ignore", ... noise_reduction_method="median", ... kernel_size=5 ... ) """ def __init__( self, filter_type="bandpass", noise_reduction_method="wavelet", lowcut=0.1, highcut=10, order=4, wavelet_name="haar", level=1, window_length=5, polyorder=2, kernel_size=3, sigma=1.0, respiratory_mode=False, repreprocess=False, ): self.filter_type = filter_type self.noise_reduction_method = noise_reduction_method self.lowcut = lowcut self.highcut = highcut self.order = order self.wavelet_name = wavelet_name self.level = level self.window_length = window_length self.polyorder = polyorder self.kernel_size = kernel_size self.sigma = sigma self.respiratory_mode = respiratory_mode self.repreprocess = repreprocess
[docs] def preprocess_signal( signal, sampling_rate, filter_type="bandpass", lowcut=0.1, highcut=4.5, order=4, noise_reduction_method="wavelet", wavelet_name="haar", level=1, window_length=5, polyorder=2, kernel_size=3, sigma=1.0, respiratory_mode=False, repreprocess=False, ): """ Preprocesses the signal by applying bandpass filtering and noise reduction. Parameters ---------- signal : numpy.ndarray The input signal to be preprocessed. sampling_rate : float The sampling rate of the signal in Hz. filter_type : str, optional (default="bandpass") The type of filtering to apply. Options: 'bandpass', 'butterworth', 'chebyshev', 'elliptic', 'ignore'. lowcut : float, optional (default=0.1) The lower cutoff frequency for bandpass filtering. highcut : float, optional (default=4.5) The upper cutoff frequency for bandpass filtering. order : int, optional (default=4) The order of the filter. noise_reduction_method : str, optional (default="wavelet") The noise reduction method to apply. Options: 'wavelet', 'savgol', 'median', 'gaussian', 'moving_average', 'ignore'. wavelet_name : str, optional (default="db") The type of wavelet to use for wavelet denoising. level : int, optional (default=1) The level of wavelet decomposition for wavelet denoising. window_length : int, optional (default=5) The window length for Savitzky-Golay filtering. polyorder : int, optional (default=2) The polynomial order for Savitzky-Golay filtering. kernel_size : int, optional (default=3) The kernel size for median filtering. sigma : float, optional (default=1.0) The standard deviation for Gaussian filtering. respiratory_mode : bool, optional (default=False) If True, apply preprocessing specifically for respiratory signals (e.g., PPG or ECG-derived respiration). Returns ------- preprocessed_signal : numpy.ndarray The preprocessed signal. Examples -------- >>> signal = np.sin(np.linspace(0, 10, 100)) + np.random.normal(0, 0.2, 100) >>> sampling_rate = 1000 >>> preprocessed_signal = preprocess_signal(signal, sampling_rate, filter_type='bandpass', noise_reduction_method='wavelet') >>> print(preprocessed_signal) """ # Ensure signal is in float64 format signal = np.asarray(signal, dtype=np.float64) # Optionally, clip large signal values to avoid overflow signal = np.clip(signal, -1e10, 1e10) # Check if signal is too short for filtering if len(signal) < 30: # Minimum length for reliable filtering import warnings warnings.warn( f"Signal length ({len(signal)}) is too short for reliable filtering. Skipping filtering step." ) return signal # Apply bandpass filtering or other filter types signal_filter = SignalFiltering(signal) if filter_type == "bandpass": filtered_signal = signal_filter.bandpass( lowcut, highcut, sampling_rate, order=order ) elif filter_type == "butterworth": filtered_signal = signal_filter.butterworth( cutoff=highcut, fs=sampling_rate, order=order, btype="low" ) elif filter_type == "chebyshev": filtered_signal = signal_filter.chebyshev( cutoff=highcut, fs=sampling_rate, order=order, btype="low" ) elif filter_type == "elliptic": filtered_signal = signal_filter.elliptic( cutoff=highcut, fs=sampling_rate, order=order, btype="low" ) elif filter_type == "ignore": filtered_signal = signal.copy() else: raise ValueError( "Unsupported filter type. Choose from 'bandpass', 'butterworth', 'chebyshev', or 'elliptic'." ) # Additional preprocessing for respiratory signals (if respiratory_mode is True) if respiratory_mode: filtered_signal = respiratory_filtering( filtered_signal, sampling_rate, lowcut, highcut, order ) # Apply noise reduction if noise_reduction_method == "wavelet": preprocessed_signal = wavelet_denoising( filtered_signal, wavelet_name=wavelet_name, level=level ) elif noise_reduction_method == "savgol": preprocessed_signal = savgol_denoising( filtered_signal, window_length=window_length, polyorder=polyorder ) elif noise_reduction_method == "median": preprocessed_signal = median_denoising(filtered_signal, kernel_size=kernel_size) elif noise_reduction_method == "gaussian": preprocessed_signal = gaussian_denoising(filtered_signal, sigma=sigma) elif noise_reduction_method == "moving_average": preprocessed_signal = moving_average_denoising( filtered_signal, window_size=window_length ) elif noise_reduction_method == "ignore": preprocessed_signal = filtered_signal else: raise ValueError( "Unsupported noise reduction method. Choose from 'wavelet', 'savgol', 'median', 'gaussian', or 'moving_average'." ) return preprocessed_signal
[docs] def respiratory_filtering(signal, sampling_rate, lowcut=0.1, highcut=0.5, order=4): """ Filters the signal specifically for respiratory-related frequency bands. Parameters ---------- signal : numpy.ndarray The input signal to be filtered. sampling_rate : float The sampling rate of the signal in Hz. lowcut : float, optional (default=0.1) The lower cutoff frequency for respiratory filtering. highcut : float, optional (default=0.5) The upper cutoff frequency for respiratory filtering. order : int, optional (default=4) The order of the filter. Returns ------- filtered_signal : numpy.ndarray The filtered respiratory signal. Examples -------- >>> signal = np.sin(np.linspace(0, 10, 100)) + np.random.normal(0, 0.2, 100) >>> sampling_rate = 1000 >>> filtered_signal = respiratory_filtering(signal, sampling_rate) >>> print(filtered_signal) """ # Validate input parameters if sampling_rate <= 0: raise ValueError(f"Sampling rate must be positive, got {sampling_rate}") if lowcut >= highcut: raise ValueError(f"lowcut ({lowcut}) must be less than highcut ({highcut})") if highcut >= sampling_rate / 2: raise ValueError( f"highcut ({highcut}) must be less than Nyquist frequency ({sampling_rate/2})" ) # Calculate normalized frequencies correctly nyquist = sampling_rate / 2.0 low_norm = lowcut / nyquist high_norm = highcut / nyquist # Ensure frequencies are within valid range [0, 1] low_norm = max(0.001, min(0.999, low_norm)) # Avoid 0 and 1 high_norm = max(0.001, min(0.999, high_norm)) # Avoid 0 and 1 if low_norm >= high_norm: # If frequencies are too close, adjust them center = (low_norm + high_norm) / 2 bandwidth = 0.1 # 10% bandwidth low_norm = max(0.001, center - bandwidth / 2) high_norm = min(0.999, center + bandwidth / 2) try: b, a = butter(order, [low_norm, high_norm], btype="band") filtered_signal = filtfilt(b, a, signal) # Check for extreme values (indicates filter instability) if np.any(np.isnan(filtered_signal)) or np.any(np.isinf(filtered_signal)): raise ValueError( "Filter produced NaN or Inf values - filter may be unstable" ) if np.max(np.abs(filtered_signal)) > 1e6: # Filter is unstable, try with lower order if order > 1: return respiratory_filtering( signal, sampling_rate, lowcut, highcut, order - 1 ) else: std_val = np.std(signal) if std_val == 0: return signal - np.mean(signal) return (signal - np.mean(signal)) / std_val return filtered_signal except Exception as e: std_val = np.std(signal) if std_val == 0: return signal - np.mean(signal) return (signal - np.mean(signal)) / std_val
[docs] def estimate_baseline(signal, fs, method="moving_average", window_size=5): """ Estimate a stable baseline for a physiological signal. Parameters ---------- signal : np.ndarray The physiological signal (e.g., PPG or ECG). fs : float Sampling frequency of the signal in Hz. method : str, optional Method for baseline estimation. Options are 'moving_average', 'low_pass', 'polynomial_fit', 'median_filter', and 'wavelet'. Default is 'moving_average'. window_size : int, optional Window size in seconds for moving average or median filter. Default is 5. Returns ------- baseline : np.ndarray The estimated baseline of the signal. """ if method == "moving_average": baseline = np.convolve( signal, np.ones(int(fs * window_size)) / (fs * window_size), mode="same" ) elif method == "low_pass": cutoff = 0.5 # Adjust cutoff frequency as needed (0.5 Hz for baseline trends) b, a = butter(2, cutoff / (0.5 * fs), btype="low") baseline = filtfilt(b, a, signal) elif method == "polynomial_fit": x = np.arange(len(signal)) poly_coeff = np.polyfit(x, signal, 2) # 2nd-degree polynomial baseline = np.polyval(poly_coeff, x) elif method == "median_filter": kernel_size = int(fs * window_size) if kernel_size % 2 == 0: kernel_size += 1 kernel_size = max(3, kernel_size) baseline = medfilt(signal, kernel_size=kernel_size) else: raise ValueError(f"Unsupported baseline estimation method: {method}") return baseline