"""
Physiological Features Module for Physiological Signal Processing
This module provides comprehensive capabilities for physiological
signal processing including ECG, PPG, EEG, and other vital signs.
Author: vitalDSP Team
Date: 2025-01-27
Version: 1.0.0
Key Features:
- Object-oriented design with comprehensive classes
- Multiple processing methods and functions
- NumPy integration for numerical computations
Examples:
---------
Basic usage:
>>> import numpy as np
>>> from vitalDSP.physiological_features.cross_correlation import CrossCorrelationFeatures
>>> signal1 = np.random.rand(1000)
>>> signal2 = np.random.rand(1000)
>>> ccf = CrossCorrelationFeatures(signal1, signal2)
>>> cross_corr, lag = ccf.compute_cross_correlation()
>>> print(f'Lag: {lag}')
"""
import numpy as np
from vitalDSP.utils.signal_processing.peak_detection import PeakDetection
class CrossCorrelationFeatures:
    """
    Cross-correlation and timing features between two physiological signals
    (e.g., ECG and PPG).

    Attributes:
        signal1 (np.array): The first physiological signal (e.g., ECG).
        signal2 (np.array): The second physiological signal (e.g., PPG).
        fs (int): The sampling frequency of the signals in Hz. Default is 1000 Hz.
    """

    def __init__(self, signal1, signal2, fs=1000):
        """
        Initializes the CrossCorrelationFeatures object.

        Args:
            signal1 (np.array): The first physiological signal (e.g., ECG).
            signal2 (np.array): The second physiological signal (e.g., PPG).
            fs (int): The sampling frequency of the signals in Hz. Default is 1000 Hz.
        """
        self.signal1 = np.array(signal1)
        self.signal2 = np.array(signal2)
        self.fs = fs  # Sampling frequency in Hz; used to convert samples to ms

    @staticmethod
    def _zero_lag_index(len1, len2, mode):
        """
        Return the index of the zero-lag sample in ``np.correlate(a, v, mode)``.

        ``np.correlate`` computes ``c[k] = sum_n a[n + k] * v[n]``. In 'full'
        mode index 0 corresponds to ``k = -(len(v) - 1)``; 'same' and 'valid'
        return contiguous slices of that 'full' output, so the zero-lag sample
        sits at a different index for each mode.

        Args:
            len1 (int): Length of the first correlate argument (``a``).
            len2 (int): Length of the second correlate argument (``v``).
            mode (str): The mode that was passed to ``np.correlate``.

        Returns:
            int: Index of the zero-lag sample in the correlate output.
        """
        # np.correlate only inspects the first letter of a string mode
        # (case-insensitively) and also accepts the legacy integer codes.
        key = mode.lower()[:1] if isinstance(mode, str) else mode
        shorter = min(len1, len2)
        if key in ("v", 0):
            # 'valid' drops the (shorter - 1) partially-overlapping leading lags.
            return len2 - shorter
        if key in ("s", 1):
            # 'same' keeps max(len1, len2) samples. NumPy swaps the inputs
            # internally (and reverses the result) when a is shorter than v,
            # which mirrors where the slice starts for even-length inputs.
            if len1 >= len2:
                return len2 // 2
            return len2 - 1 - len1 // 2
        # 'full': the mode was already validated by np.correlate itself.
        return len2 - 1

    def compute_cross_correlation(self, mode="full"):
        """
        Computes the cross-correlation between the two mean-removed signals.

        Args:
            mode (str): Specifies the mode for cross-correlation. Can be 'full',
                'valid', or 'same'. Default is 'full'.

        Returns:
            tuple: ``(cross_corr, lag)`` where ``cross_corr`` is the array
            returned by ``np.correlate`` and ``lag`` is the integer lag (in
            samples) at which ``|cross_corr|`` is largest. A negative lag means
            the matching feature appears later in ``signal2`` than in
            ``signal1``.

        Raises:
            ValueError: Propagated from ``np.correlate`` (e.g. for an empty
                signal or an unsupported ``mode``).

        Example:
            >>> ecg_signal = [...]  # Sample ECG signal
            >>> ppg_signal = [...]  # Sample PPG signal
            >>> ccf = CrossCorrelationFeatures(ecg_signal, ppg_signal)
            >>> cross_corr, lag = ccf.compute_cross_correlation()
            >>> print(f"Cross-Correlation: {cross_corr}, Lag: {lag}")
        """
        cross_corr = np.correlate(
            self.signal1 - np.mean(self.signal1),
            self.signal2 - np.mean(self.signal2),
            mode=mode,
        )
        # The zero-lag position depends on the mode; using the 'full' offset
        # for every mode would report a shifted lag for 'same' and 'valid'.
        zero_index = self._zero_lag_index(len(self.signal1), len(self.signal2), mode)
        lag = int(np.argmax(np.abs(cross_corr)) - zero_index)
        return cross_corr, lag

    def compute_normalized_cross_correlation(self):
        """
        Computes the normalized cross-correlation between the two signals to
        account for differences in amplitude.

        The 'full' cross-correlation of the mean-removed signals is divided by
        ``std1 * std2 * sqrt(len1 * len2)``, i.e. the product of the signals'
        L2 norms, so the values lie in [-1, 1] even when the signals differ in
        length. For equal-length signals this equals ``std1 * std2 * len``.

        Returns:
            tuple: The normalized cross-correlation values and the corresponding
            lag. If either signal has zero standard deviation, an all-zero
            array is returned in place of the correlation values.

        Example:
            >>> ecg_signal = [...]  # Sample ECG signal
            >>> ppg_signal = [...]  # Sample PPG signal
            >>> ccf = CrossCorrelationFeatures(ecg_signal, ppg_signal)
            >>> norm_corr, lag = ccf.compute_normalized_cross_correlation()
            >>> print(f"Normalized Cross-Correlation: {norm_corr}, Lag: {lag}")
        """
        cross_corr, lag = self.compute_cross_correlation()
        std1 = np.std(self.signal1)
        std2 = np.std(self.signal2)
        if std1 == 0 or std2 == 0:
            # A constant signal has no variation to correlate against.
            return np.zeros_like(cross_corr), lag
        scale = std1 * std2 * np.sqrt(len(self.signal1) * len(self.signal2))
        return cross_corr / scale, lag

    def _mean_r_peak_to_foot_delay_ms(self, r_peaks):
        """
        Mean delay (ms) from each R-peak to the first later PPG foot point.

        Foot points are detected on ``signal2`` with
        ``PeakDetection(..., method="ppg_first_derivative")``. R-peaks with no
        later foot point are skipped.

        Args:
            r_peaks (np.array): Indices of R-peaks detected in the ECG signal.

        Returns:
            float: Mean delay in milliseconds, or 0.0 when no R-peak has a
            later foot point.
        """
        foot_detector = PeakDetection(self.signal2, method="ppg_first_derivative")
        # Sort so that searchsorted can find the nearest foot strictly after
        # each R-peak (detected indices are presumably already ascending).
        foot_points = np.sort(
            np.asarray(foot_detector.detect_peaks(), dtype=float).ravel()
        )
        r_peaks = np.asarray(r_peaks, dtype=float).ravel()
        if foot_points.size == 0 or r_peaks.size == 0:
            return 0.0

        # side="right" gives the first foot index strictly greater than r_peak.
        next_foot = np.searchsorted(foot_points, r_peaks, side="right")
        has_foot = next_foot < foot_points.size
        if not has_foot.any():
            return 0.0

        delays_ms = (
            (foot_points[next_foot[has_foot]] - r_peaks[has_foot]) * 1000 / self.fs
        )  # Convert samples to milliseconds
        return float(np.mean(delays_ms))

    def compute_pulse_transit_time(self, r_peaks):
        """
        Computes Pulse Transit Time (PTT) as the time difference between the ECG
        signal (R-peaks) and the corresponding PPG signal (foot of the waveform).
        PTT is used to estimate blood pressure and vascular health.

        Args:
            r_peaks (np.array): Indices of R-peaks detected in the ECG signal.

        Returns:
            float: The average Pulse Transit Time (PTT) in milliseconds, or 0.0
            if no R-peak is followed by a detected PPG foot.

        Example:
            >>> r_peaks = [50, 150, 250]  # Detected R-peaks
            >>> ptt = ccf.compute_pulse_transit_time(r_peaks)
            >>> print(f"Pulse Transit Time: {ptt} ms")
        """
        return self._mean_r_peak_to_foot_delay_ms(r_peaks)

    def compute_lag(self, r_peaks):
        """
        Computes the average time lag between the ECG signal (R-peaks) and the
        corresponding PPG signal (foot of the waveform).

        Note: this is a peak-to-foot measurement and returns the same value as
        ``compute_pulse_transit_time``; it does not use the cross-correlation.
        Use ``compute_cross_correlation`` for a correlation-based lag.

        Args:
            r_peaks (np.array): Indices of R-peaks detected in the ECG signal.

        Returns:
            float: The average time lag between the signals in milliseconds, or
            0.0 if no R-peak is followed by a detected PPG foot.

        Example:
            >>> r_peaks = [50, 150, 250]  # Detected R-peaks
            >>> lag = ccf.compute_lag(r_peaks)
            >>> print(f"Average Lag: {lag} ms")
        """
        return self._mean_r_peak_to_foot_delay_ms(r_peaks)