Source code for vitalDSP.transforms.vital_transformation

"""
Signal Transforms Module for Physiological Signal Processing

This module provides comprehensive capabilities for physiological
signal processing including ECG, PPG, EEG, and other vital signs.

Author: vitalDSP Team
Date: 2025-01-27
Version: 1.0.0

Key Features:
- Object-oriented design with comprehensive classes
- Multiple processing methods and functions
- NumPy integration for numerical computations
- SciPy integration for advanced signal processing
- Performance optimization

Examples:
--------
Basic usage:
    >>> import numpy as np
    >>> from vitalDSP.transforms.vital_transformation import VitalTransformation
    >>> signal = np.random.randn(1000)
    >>> processor = VitalTransformation(signal)
    >>> result = processor.process()
    >>> print(f'Processing result: {result}')
"""

import numpy as np
from scipy import signal
from vitalDSP.filtering.artifact_removal import ArtifactRemoval
from vitalDSP.filtering.advanced_signal_filtering import AdvancedSignalFiltering
from vitalDSP.filtering.signal_filtering import BandpassFilter, SignalFiltering


[docs] class VitalTransformation: """ A class to perform comprehensive signal processing on ECG and PPG signals using advanced filtering and artifact removal techniques. This class provides a series of transformations aimed at enhancing the quality of ECG and PPG signals by eliminating noise, detrending, normalizing, and enhancing critical points for easier detection. Each transformation step is modular and customizable. Parameters ---------- signal : numpy.ndarray The input ECG or PPG signal to be transformed. fs : int, optional Sampling frequency of the signal. Default is 256 Hz. signal_type : str, optional Type of signal. Options: 'ECG', 'PPG', 'EEG'. Default is 'ECG'. Methods ------- apply_transformations(options=None, method_order=None) Apply a sequence of transformations to process the signal, with options for customization. apply_artifact_removal(method='baseline_correction', options=None) Apply artifact removal using various techniques such as mean subtraction, baseline correction, or wavelet denoising. apply_bandpass_filter(options=None) Apply bandpass filtering with customizable filter type, cutoff frequencies, and order. apply_detrending(options=None) Detrend the signal with customizable options. apply_normalization(options=None) Normalize the signal to a specified range. apply_smoothing(options=None) Smooth the signal using different methods such as moving average or Gaussian. apply_enhancement(options=None) Enhance critical points in the signal using methods such as squaring or absolute value. apply_advanced_filtering(options=None) Apply advanced signal filtering techniques using the AdvancedSignalFiltering class. Examples -------- >>> import numpy as np >>> from vitalDSP.transforms.vital_transformation import VitalTransformation >>> >>> # Example 1: Basic ECG signal transformation >>> ecg_signal = np.random.randn(1000) # Simulated ECG signal >>> transformer = VitalTransformation(ecg_signal, fs=256, signal_type="ECG") >>> transformed_signal = transformer.apply_transformations() >>> print(f"Transformed signal shape: {transformed_signal.shape}") >>> >>> # Example 2: PPG signal with custom options >>> ppg_signal = np.random.randn(2000) # Simulated PPG signal >>> transformer_ppg = VitalTransformation(ppg_signal, fs=128, signal_type="PPG") >>> options = { ... 'artifact_removal': 'baseline_correction', ... 'artifact_removal_options': {'cutoff': 0.5}, ... 'bandpass_filter': {'lowcut': 0.5, 'highcut': 8.0, 'filter_order': 4, 'filter_type': 'butter'}, ... 'detrending': {'detrend_type': 'linear'}, ... 'normalization': {'normalization_range': (0, 1)} ... } >>> transformed_ppg = transformer_ppg.apply_transformations(options=options) >>> print(f"Transformed PPG shape: {transformed_ppg.shape}") >>> >>> # Example 3: Individual transformation methods >>> transformer = VitalTransformation(ecg_signal, fs=256, signal_type="ECG") >>> detrended = transformer.apply_detrending(method='linear') >>> normalized = transformer.apply_normalization(normalization_range=(0, 1)) >>> filtered = transformer.apply_bandpass_filter(options={'lowcut': 0.5, 'highcut': 40, 'order': 4}) >>> print(f"Detrended signal range: [{detrended.min():.3f}, {detrended.max():.3f}]") >>> print(f"Normalized signal range: [{normalized.min():.3f}, {normalized.max():.3f}]") """ def __init__(self, signal, fs=256, signal_type="ECG"): self.signal = signal self.fs = fs self.signal_type = signal_type
[docs] def apply_transformations(self, options=None, method_order=None): """ Apply a sequence of transformations to process the signal. Parameters ---------- options : dict, optional A dictionary of options to customize each step in the transformation process. Default options will be used if not provided. method_order : list, optional A list specifying the order in which to apply the methods. Default is the order defined in the class. Returns ------- transformed_signal : numpy.ndarray The fully transformed signal. Examples -------- >>> signal = np.random.randn(1000) # Example signal >>> options = { >>> 'artifact_removal': 'baseline_correction', >>> 'artifact_removal_options': {'cutoff': 0.5}, >>> 'bandpass_filter': {'lowcut': 0.5, 'highcut': 30, 'filter_order': 4, 'filter_type': 'butter'}, >>> 'detrending': {'detrend_type': 'linear'}, >>> 'normalization': {'normalization_range': (0, 1)}, >>> 'smoothing': {'smoothing_method': 'moving_average', 'window_size': 5, 'iterations': 2}, >>> 'enhancement': {'enhance_method': 'square'}, >>> 'advanced_filtering': {'filter_type': 'kalman_filter', 'options': {'R': 0.1, 'Q': 0.01}}, >>> } >>> method_order = ['artifact_removal', 'bandpass_filter', 'detrending', 'normalization', 'smoothing', 'enhancement', 'advanced_filtering'] >>> transformer = ECG_PPG_Transformation(signal, fs=256, signal_type='ECG') >>> transformed_signal = transformer.apply_transformations(options, method_order) >>> print(transformed_signal) """ if options is None: options = { "artifact_removal": "baseline_correction", "artifact_removal_options": {"cutoff": 0.5}, "bandpass_filter": { "lowcut": 0.5, "highcut": 30, "filter_order": 4, "filter_type": "butter", }, "detrending": {"detrend_type": "linear"}, "normalization": {"normalization_range": (0, 1)}, "smoothing": { "smoothing_method": "moving_average", "window_size": 5, "iterations": 2, }, "enhancement": {"enhance_method": "square"}, "advanced_filtering": { "filter_type": "kalman_filter", "options": {"R": 0.1, "Q": 0.01}, }, } if method_order is None: method_order = [ "artifact_removal", "bandpass_filter", "detrending", "normalization", "smoothing", "enhancement", "advanced_filtering", ] # Filter out options that aren't actual methods method_order = [m for m in method_order if not m.endswith("_options")] for method in method_order: if method == "artifact_removal": self.apply_artifact_removal( options.get("artifact_removal", "baseline_correction"), options.get("artifact_removal_options", {}), ) elif method == "bandpass_filter": self.apply_bandpass_filter(options.get("bandpass_filter", {})) elif method == "detrending": self.apply_detrending(options.get("detrending", {})) elif method == "normalization": self.apply_normalization(options.get("normalization", {})) elif method == "smoothing": self.apply_smoothing(options.get("smoothing", {})) elif method == "enhancement": self.apply_enhancement(options.get("enhancement", {})) elif method == "advanced_filtering": self.apply_advanced_filtering(options.get("advanced_filtering", {})) else: raise ValueError(f"Unknown method: {method}") return self.signal
[docs] def apply_artifact_removal(self, method="baseline_correction", options=None): """ Apply artifact removal to the signal. Parameters ---------- method : str, optional The artifact removal method to use. Default is 'baseline_correction'. options : dict, optional Additional options specific to the selected method. Returns ------- None Examples -------- >>> signal = np.random.randn(1000) # Example signal >>> transformer = ECG_PPG_Transformation(signal, fs=256, signal_type='ECG') >>> transformer.apply_artifact_removal(method='baseline_correction', options={'cutoff': 0.5}) >>> print(transformer.signal) """ if options is None: options = {"lowcut": 0.5, "highcut": 30, "filter_order": 4, "filter_type": "butter"} artifact_removal = ArtifactRemoval(self.signal) if method == "mean_subtraction": self.signal = artifact_removal.mean_subtraction() elif method == "baseline_correction": self.signal = artifact_removal.baseline_correction( cutoff=options.get("cutoff", 0.5), fs=self.fs ) elif method == "median_filter_removal": self.signal = artifact_removal.median_filter_removal( kernel_size=options.get("kernel_size", 3) ) elif method == "wavelet_denoising": self.signal = artifact_removal.wavelet_denoising( wavelet_type=options.get("wavelet_type", "db"), level=options.get("level", 1), order=options.get("order", 4), ) elif method == "adaptive_filtering": reference_signal = options.get("reference_signal", None) if reference_signal is None: raise ValueError("Reference signal is required for adaptive filtering.") self.signal = artifact_removal.adaptive_filtering( reference_signal, learning_rate=options.get("learning_rate", 0.01), num_iterations=options.get("num_iterations", 100), ) elif method == "notch_filter": self.signal = artifact_removal.notch_filter( freq=options.get("freq", 50), fs=self.fs, Q=options.get("Q", 30) ) elif method == "pca_artifact_removal": self.signal = artifact_removal.pca_artifact_removal( num_components=options.get("num_components", 1) ) elif method == "ica_artifact_removal": self.signal = artifact_removal.ica_artifact_removal( num_components=options.get("num_components", 1), max_iterations=options.get("max_iterations", 1000), tol=options.get("tol", 1e-5), seed=options.get("seed", 23), ) else: raise ValueError(f"Unknown artifact removal method: {method}")
[docs] def apply_bandpass_filter(self, options=None): """ Apply bandpass filtering with customizable filter type, cutoff frequencies, and order. Parameters ---------- options : dict, optional A dictionary of filter options. Returns ------- None Examples -------- >>> signal = np.random.randn(1000) # Example signal >>> transformer = ECG_PPG_Transformation(signal, fs=256, signal_type='ECG') >>> transformer.apply_bandpass_filter(options={'lowcut': 0.5, 'highcut': 30, 'filter_order': 4, 'filter_type': 'butter'}) >>> print(transformer.signal) """ # Ensure the signal length is long enough for the filter if options is None: options = { "lowcut": 0.5, "highcut": 30, "filter_order": 4, "filter_type": "butter", } if len(self.signal) < 3 * options.get("filter_order", 4): raise ValueError("Signal too short for the specified filter order.") lowcut = options.get("lowcut", 0.2) highcut = options.get("highcut", 3.0) filter_order = options.get("filter_order", 4) filter_type = options.get("filter_type", "butter") bandpass_filter = BandpassFilter(band_type=filter_type, fs=self.fs) # Check signal length and adjust filter order if necessary padlen = 3 * max( len(bandpass_filter.signal_bypass(lowcut, filter_order, btype="low")[0]), len(bandpass_filter.signal_bypass(lowcut, filter_order, btype="low")[1]), ) if len(self.signal) <= padlen: filter_order = max( 1, int(len(self.signal) / 3) ) # Reduce filter order dynamically # Apply bandpass filter self.signal = bandpass_filter.signal_highpass_filter( data=self.signal, cutoff=lowcut, order=filter_order, a_pass=options.get("a_pass", 3), rp=options.get("rp", 4), rs=options.get("rs", 40), ) self.signal = bandpass_filter.signal_lowpass_filter( data=self.signal, cutoff=highcut, order=filter_order, a_pass=options.get("a_pass", 3), rp=options.get("rp", 4), rs=options.get("rs", 40), )
[docs] def apply_detrending(self, options=None): """ Detrend the signal with customizable options using pure NumPy implementation. This method removes trends from the signal, which is crucial for physiological signal processing. Detrending eliminates baseline wander and drift that can interfere with feature extraction and analysis. Parameters ---------- options : dict, optional A dictionary of options for detrending. Default options: { 'detrend_type': 'linear', # 'linear', 'constant', or 'polynomial' 'polynomial_order': 3, # Used when detrend_type='polynomial' 'break_points': None, # For piecewise detrending } Returns ------- None Modifies self.signal in place. Notes ----- Detrending Types: - **linear**: Removes linear trend using least squares fitting - **constant**: Removes DC component (mean value) - **polynomial**: Removes polynomial trend of specified order The implementation uses pure NumPy for better performance and removes the scipy dependency. Clinical Applications: - ECG: Removes baseline wander due to respiration or movement - PPG: Eliminates DC shifts and slow baseline drift - General: Ensures zero-mean signal for accurate feature extraction Examples -------- >>> import numpy as np >>> from vitalDSP.transforms.vital_transformation import VitalTransformation >>> >>> # Example 1: Linear detrending (default) >>> signal_with_drift = np.random.randn(1000) + np.linspace(0, 10, 1000) >>> transformer = VitalTransformation(signal_with_drift, fs=256, signal_type='ECG') >>> transformer.apply_detrending(options={'detrend_type': 'linear'}) >>> print(f"Signal mean after detrending: {np.mean(transformer.signal):.6f}") >>> >>> # Example 2: Constant (mean) detrending >>> ecg_signal = np.random.randn(1000) + 5.0 # Signal with DC offset >>> transformer2 = VitalTransformation(ecg_signal, fs=256, signal_type='ECG') >>> transformer2.apply_detrending(options={'detrend_type': 'constant'}) >>> print(f"Signal mean after constant detrend: {np.mean(transformer2.signal):.6f}") >>> >>> # Example 3: Polynomial detrending >>> ppg_signal = np.random.randn(1000) + 0.001 * np.arange(1000)**2 >>> transformer3 = VitalTransformation(ppg_signal, fs=128, signal_type='PPG') >>> transformer3.apply_detrending(options={ ... 'detrend_type': 'polynomial', ... 'polynomial_order': 2 ... }) >>> print(f"Detrended PPG signal length: {len(transformer3.signal)}") """ if options is None: options = { "detrend_type": "linear", "polynomial_order": 3, "break_points": None, } detrend_type = options.get("detrend_type", "linear") # Get signal length N = len(self.signal) if detrend_type == "constant": # Remove mean (DC component) self.signal = self.signal - np.mean(self.signal) elif detrend_type == "linear": # Remove linear trend using least squares # Create time vector x = np.arange(N) # Fit linear model: y = mx + b # Using normal equations: [m, b] = (X^T X)^-1 X^T y # where X = [x, ones] X = np.vstack([x, np.ones(N)]).T # Solve least squares coeffs = np.linalg.lstsq(X, self.signal, rcond=None)[0] # Compute and subtract trend trend = coeffs[0] * x + coeffs[1] self.signal = self.signal - trend elif detrend_type == "polynomial": # Remove polynomial trend polynomial_order = options.get("polynomial_order", 3) # Create time vector x = np.arange(N) # Fit polynomial coeffs = np.polyfit(x, self.signal, polynomial_order) # Compute and subtract polynomial trend trend = np.polyval(coeffs, x) self.signal = self.signal - trend else: raise ValueError( f"Unknown detrending method: {detrend_type}. " f"Supported methods: 'constant', 'linear', 'polynomial'" )
[docs] def apply_normalization(self, options=None): """ Normalize the signal to a specified range. Parameters ---------- options : dict, optional A dictionary of normalization options. Returns ------- None Examples -------- >>> signal = np.random.randn(1000) # Example signal >>> transformer = ECG_PPG_Transformation(signal, fs=256, signal_type='ECG') >>> transformer.apply_normalization(options={'normalization_range': (0, 1)}) >>> print(transformer.signal) """ if options is None: options = {"normalization_range": (0, 1)} normalization_range = options.get("normalization_range", (0, 1)) min_val = np.min(self.signal) max_val = np.max(self.signal) # Avoid divide by zero warning if max_val - min_val > 0: self.signal = (self.signal - min_val) / (max_val - min_val) * ( normalization_range[1] - normalization_range[0] ) + normalization_range[0] else: # If signal is constant, set to middle of range self.signal = np.full_like( self.signal, (normalization_range[0] + normalization_range[1]) / 2 )
[docs] def apply_smoothing(self, options=None): """ Smooth the signal using different methods such as moving average or Gaussian. Parameters ---------- options : dict, optional Options to customize the smoothing process. Returns ------- None Examples -------- >>> signal = np.random.randn(1000) # Example signal >>> transformer = ECG_PPG_Transformation(signal, fs=256, signal_type='ECG') >>> transformer.apply_smoothing(options={'smoothing_method': 'moving_average', 'window_size': 5, 'iterations': 2}) >>> print(transformer.signal) """ if options is None: options = { "smoothing_method": "moving_average", "window_size": 5, "iterations": 2, } smoothing_method = options.get("smoothing_method", "moving_average") signal_filtering = SignalFiltering(self.signal) if smoothing_method == "moving_average": window_size = options.get("window_size", 5) iterations = options.get("iterations", 1) self.signal = signal_filtering.moving_average( window_size, iterations=iterations ) elif smoothing_method == "gaussian": sigma = options.get("sigma", 1.0) iterations = options.get("iterations", 1) self.signal = signal_filtering.gaussian(sigma=sigma, iterations=iterations) else: raise ValueError(f"Unknown smoothing method: {smoothing_method}")
[docs] def apply_enhancement(self, options=None): """ Enhance critical points in the signal using methods such as squaring or absolute value. Parameters ---------- options : dict, optional Options to customize the enhancement process. Returns ------- None Examples -------- >>> signal = np.random.randn(1000) # Example signal >>> transformer = ECG_PPG_Transformation(signal, fs=256, signal_type='ECG') >>> transformer.apply_enhancement(options={'enhance_method': 'square'}) >>> print(transformer.signal) """ if options is None: options = {"enhance_method": "square"} enhance_method = options.get("enhance_method", "square") if enhance_method == "square": self.signal = np.square(self.signal) elif enhance_method == "abs": self.signal = np.abs(self.signal) elif enhance_method == "gradient": # Use the gradient to enhance rapid changes in the signal self.signal = np.gradient(self.signal) else: raise ValueError(f"Unknown enhancement method: {enhance_method}")
[docs] def apply_advanced_filtering(self, options=None): """ Apply advanced signal filtering using the AdvancedSignalFiltering class. Parameters ---------- options : dict, optional Options to customize the advanced filtering process. Returns ------- None Examples -------- >>> signal = np.random.randn(1000) # Example signal >>> transformer = ECG_PPG_Transformation(signal, fs=256, signal_type='ECG') >>> transformer.apply_advanced_filtering(options={'filter_type': 'kalman_filter', 'R': 0.1, 'Q': 0.01}) >>> print(transformer.signal) """ if options is None: options = {"filter_type": "kalman_filter", "R": 0.1, "Q": 0.01} filter_type = options.get("filter_type", "kalman_filter") advanced_filtering = AdvancedSignalFiltering( self.signal ) # Initialize with the signal if filter_type == "kalman_filter": R = options.get("R", 0.1) Q = options.get("Q", 0.01) self.signal = advanced_filtering.kalman_filter( R=R, Q=Q ) # Call method from instance elif filter_type == "optimization_based_filtering": target_signal = options.get("target_signal", self.signal) loss_type = options.get("loss_type", "mse") learning_rate = options.get("learning_rate", 0.01) iterations = options.get("iterations", 100) self.signal = advanced_filtering.optimization_based_filtering( target=target_signal, loss_type=loss_type, learning_rate=learning_rate, iterations=iterations, ) elif filter_type == "gradient_descent_filter": target_signal = options.get("target_signal", self.signal) learning_rate = options.get("learning_rate", 0.01) iterations = options.get("iterations", 100) self.signal = advanced_filtering.gradient_descent_filter( target=target_signal, learning_rate=learning_rate, iterations=iterations ) elif filter_type == "ensemble_filtering": filters = options.get("filters", [advanced_filtering.kalman_filter]) method = options.get("method", "mean") weights = options.get("weights", None) num_iterations = options.get("num_iterations", 10) learning_rate = options.get("learning_rate", 0.01) self.signal = advanced_filtering.ensemble_filtering( filters=filters, method=method, weights=weights, num_iterations=num_iterations, learning_rate=learning_rate, ) elif filter_type == "convolution_based_filter": kernel_type = options.get("kernel_type", "smoothing") kernel_size = options.get("kernel_size", 3) self.signal = advanced_filtering.convolution_based_filter( kernel_type=kernel_type, kernel_size=kernel_size ) elif filter_type == "attention_based_filter": attention_type = options.get("attention_type", "uniform") size = options.get("size", 5) self.signal = advanced_filtering.attention_based_filter( attention_type=attention_type, size=size, **options.get("kwargs", {}) ) elif filter_type == "adaptive_filtering": desired_signal = options.get("desired_signal", self.signal) mu = options.get("mu", 0.5) filter_order = options.get("filter_order", 4) self.signal = advanced_filtering.adaptive_filtering( desired_signal=desired_signal, mu=mu, filter_order=filter_order ) else: raise ValueError(f"Unknown advanced filtering method: {filter_type}")