Source code for vitalDSP.feature_engineering.ppg_light_features

"""
Feature Engineering Module for Physiological Signal Processing

This module provides comprehensive capabilities for physiological
signal processing including ECG, PPG, EEG, and other vital signs.

Author: vitalDSP Team
Date: 2025-01-27
Version: 1.0.0

Key Features:
- Object-oriented design with comprehensive classes
- Multiple processing methods and functions
- NumPy integration for numerical computations
- SciPy integration for advanced signal processing
- Feature extraction capabilities

Examples:
--------
Basic usage:
    >>> import numpy as np
    >>> from vitalDSP.feature_engineering.ppg_light_features import PpgLightFeatures
    >>> signal = np.random.randn(1000)
    >>> processor = PpgLightFeatures(signal)
    >>> result = processor.process()
    >>> print(f'Processing result: {result}')
"""

import numpy as np
from vitalDSP.utils.config_utilities.common import find_peaks


[docs] class PPGLightFeatureExtractor: """ A class to extract physiological features from PPG signals based on raw data from infrared (IR) and red light sources. This includes SpO2, Perfusion Index (PI), Respiratory Rate (RR), and Photoplethysmogram Ratio (PPR). Parameters: ----------- ir_signal : np.array The infrared light PPG signal. red_signal : np.array The red light PPG signal (optional for features like PI and RR). sampling_freq : int The sampling frequency of the signals in Hz. Example usage: -------------- ppg_extractor = PPGLightFeatureExtractor(ir_signal, red_signal, sampling_freq) spo2, times_spo2 = ppg_extractor.calculate_spo2() pi, times_pi = ppg_extractor.calculate_perfusion_index() rr, times_rr = ppg_extractor.calculate_respiratory_rate() ppr, times_ppr = ppg_extractor.calculate_ppr() """ def __init__(self, ir_signal, red_signal=None, sampling_freq=100): self.ir_signal = ir_signal self.red_signal = red_signal self.sampling_freq = sampling_freq
[docs] def calculate_spo2(self, window_seconds=1): """ Calculate SpO2 based on infrared (IR) and red light PPG signals. Parameters: ----------- window_seconds : int, optional The window length in seconds to calculate SpO2 (default is 1 second). Returns: -------- spo2_values : np.array Calculated SpO2 values for each window of the signal. timestamps : np.array Time (in seconds) for each SpO2 value. """ if self.red_signal is None: raise ValueError("Red signal is required to compute SpO2.") window_size = int(window_seconds * self.sampling_freq) spo2_values = [] timestamps = [] for start in range(0, len(self.ir_signal), window_size): end = start + window_size if end > len(self.ir_signal): break ir_segment = self.ir_signal[start:end] red_segment = self.red_signal[start:end] # AC and DC components ir_ac = np.max(ir_segment) - np.min(ir_segment) red_ac = np.max(red_segment) - np.min(red_segment) ir_dc = np.mean(ir_segment) red_dc = np.mean(red_segment) # SpO2 calculation — skip segment if AC or DC component is zero if ir_ac == 0 or ir_dc == 0 or red_dc == 0: continue ratio = (red_ac / red_dc) / (ir_ac / ir_dc) spo2 = np.clip(110 - 25 * ratio, 0, 100) # Clip between 0 and 100 # spo2 = 110 - 25 * ratio # Empirical constants (A=110, B=25) spo2_values.append(spo2) timestamps.append(start / self.sampling_freq) return np.array(spo2_values), np.array(timestamps)
[docs] def calculate_perfusion_index(self, window_seconds=1): """ Calculate the Perfusion Index (PI) from the infrared (IR) PPG signal. Parameters: ----------- window_seconds : int, optional The window length in seconds to calculate PI (default is 1 second). Returns: -------- pi_values : np.array Calculated perfusion index values for each window. timestamps : np.array Time (in seconds) for each PI value. """ window_size = int(window_seconds * self.sampling_freq) pi_values = [] timestamps = [] for start in range(0, len(self.ir_signal), window_size): end = start + window_size if end > len(self.ir_signal): break segment = self.ir_signal[start:end] # AC and DC components ac_component = np.max(segment) - np.min(segment) dc_component = np.mean(segment) if dc_component > 0: pi = ac_component / dc_component else: pi = 0 # Avoid division by negative or zero pi_values.append(pi) timestamps.append(start / self.sampling_freq) return np.array(pi_values), np.array(timestamps)
[docs] def calculate_respiratory_rate(self, window_seconds=60): """ Calculate the Respiratory Rate (RR) from a PPG signal by isolating the respiratory modulation via bandpass filtering in the respiratory frequency band (0.1-0.5 Hz / 6-30 breaths per minute). Parameters: ----------- window_seconds : int, optional The window length in seconds to calculate RR (default is 60 seconds). Returns: -------- rr_values : np.array Calculated respiratory rate (in breaths per minute) for each window. timestamps : np.array Time (in seconds) for each RR value. """ window_size = int(window_seconds * self.sampling_freq) rr_values = [] timestamps = [] for start in range(0, len(self.ir_signal), window_size): end = start + window_size if end > len(self.ir_signal): break segment = self.ir_signal[start:end] # FFT-based bandpass filter to isolate respiratory component (0.1-0.5 Hz) n = len(segment) fft_vals = np.fft.rfft(segment - np.mean(segment)) freqs = np.fft.rfftfreq(n, d=1.0 / self.sampling_freq) resp_mask = (freqs >= 0.1) & (freqs <= 0.5) fft_filtered = np.zeros_like(fft_vals) fft_filtered[resp_mask] = fft_vals[resp_mask] respiratory_signal = np.fft.irfft(fft_filtered, n=n) # Minimum distance between respiratory peaks: ~1.2 seconds (50 bpm max) min_dist = max(1, int(1.2 * self.sampling_freq)) peaks = find_peaks(respiratory_signal, distance=min_dist) if len(peaks) > 1: breaths_per_minute = len(peaks) * (60 / window_seconds) else: breaths_per_minute = 0 rr_values.append(breaths_per_minute) timestamps.append(start / self.sampling_freq) if len(rr_values) == 0: rr_values.append(0) timestamps.append(0) return np.array(rr_values), np.array(timestamps)
[docs] def calculate_ppr(self, window_seconds=1): """ Calculate the Photoplethysmogram Ratio (PPR) between infrared (IR) and red light PPG signals. Parameters: ----------- window_seconds : int, optional The window length in seconds to calculate PPR (default is 1 second). Returns: -------- ppr_values : np.array Calculated PPR values for each window. timestamps : np.array Time (in seconds) for each PPR value. """ if self.red_signal is None: raise ValueError("Red signal is required to compute PPR.") window_size = int(window_seconds * self.sampling_freq) ppr_values = [] timestamps = [] for start in range(0, len(self.ir_signal), window_size): end = start + window_size if end > len(self.ir_signal): break ir_segment = self.ir_signal[start:end] red_segment = self.red_signal[start:end] ir_ac = np.max(ir_segment) - np.min(ir_segment) red_ac = np.max(red_segment) - np.min(red_segment) ir_dc = np.mean(ir_segment) red_dc = np.mean(red_segment) if ir_dc > 0 and red_dc > 0: ppr = np.clip( (red_ac / red_dc) / (ir_ac / ir_dc), 0, None ) # Clip to non-negative values else: ppr = 0 # Set to zero if DC components are invalid ppr_values.append(ppr) timestamps.append(start / self.sampling_freq) return np.array(ppr_values), np.array(timestamps)