Source code for vitalDSP.physiological_features.waveform

"""
Physiological Features Module for Physiological Signal Processing

This module provides comprehensive capabilities for physiological
signal processing including ECG, PPG, EEG, and other vital signs.

Author: vitalDSP Team
Date: 2025-01-27
Version: 1.0.0

Key Features:
- Object-oriented design with comprehensive classes
- Multiple processing methods and functions
- NumPy integration for numerical computations
- SciPy integration for advanced signal processing
- Interactive visualization capabilities

Examples:
--------
Basic usage:
    >>> import numpy as np
    >>> from vitalDSP.physiological_features.waveform import Waveform
    >>> signal = np.random.randn(1000)
    >>> processor = Waveform(signal)
    >>> result = processor.process()
    >>> print(f'Processing result: {result}')
"""

import numpy as np
from vitalDSP.utils.signal_processing.peak_detection import PeakDetection
from vitalDSP.preprocess.preprocess_operations import estimate_baseline
from scipy.stats import skew, linregress
from scipy import integrate
import logging as logger
from scipy.signal import savgol_filter
from vitalDSP.transforms.vital_transformation import VitalTransformation


[docs] class WaveformMorphology: """ A class for computing morphological features from physiological waveforms (ECG, PPG, EEG). Attributes: waveform (np.array): The waveform signal (ECG, PPG, EEG). fs (int): The sampling frequency of the signal in Hz. Default is 1000 Hz. signal_type (str): The type of signal ('ECG', 'PPG', 'EEG'). simple_mode (bool, optional): If True, uses simplified diastolic peak detection (midpoint-based). Default is True. Examples -------- >>> import numpy as np >>> from vitalDSP.physiological_features.waveform import WaveformMorphology >>> >>> # Example 1: ECG waveform analysis >>> ecg_signal = np.random.randn(1000) # Simulated ECG signal >>> wm_ecg = WaveformMorphology(ecg_signal, fs=256, signal_type="ECG") >>> r_peaks = wm_ecg.r_peaks >>> q_valleys = wm_ecg.detect_q_valley() >>> print(f"Detected {len(r_peaks)} R-peaks") >>> >>> # Example 2: PPG waveform analysis >>> ppg_signal = np.random.randn(2000) # Simulated PPG signal >>> wm_ppg = WaveformMorphology(ppg_signal, fs=128, signal_type="PPG") >>> systolic_peaks = wm_ppg.detect_systolic_peaks() >>> diastolic_peaks = wm_ppg.detect_diastolic_peaks() >>> print(f"Detected {len(systolic_peaks)} systolic peaks") >>> >>> # Example 3: Custom peak detection configuration >>> custom_config = { ... "distance": 30, ... "window_size": 5, ... "threshold_factor": 1.2, ... "search_window": 4 ... } >>> wm_custom = WaveformMorphology(ecg_signal, fs=256, signal_type="ECG", peak_config=custom_config) >>> r_peaks_custom = wm_custom.r_peaks >>> print(f"Custom detection found {len(r_peaks_custom)} R-peaks") """ def __init__( self, waveform, fs=256, qrs_ratio=0.05, signal_type="ECG", peak_config=None, simple_mode=True, options=None, ): """ Initializes the WaveformMorphology object. Args: waveform (np.array): The waveform signal. fs (int): The sampling frequency of the signal in Hz. Default is 256 Hz. qrs_ratio (float): The ratio for QRS detection. Default is 0.05. signal_type (str): The type of signal. Options: 'ECG', 'PPG', 'EEG'. peak_config : dict, optional Configuration for peak detection parameters. If None, default settings are used. For ECG: { "distance": 50, "window_size": 7, "threshold_factor": 1.6, "search_window": 6, } For PPG: { "distance": 50, "window_size": 7, "threshold_factor": 0.8, "search_window": 6, "fs": fs, "dicrotic_config": { "slope_quartile_factor": 0.25, "search_range_start": 0.1, "search_range_end": 0.3, }, "diastolic_config": { # New settings for diastolic peaks "search_range_start": 0.2, # Start 20% after notch "search_range_end": 0.8, # End 80% before trough "min_height_factor": 0.1, # Min height as fraction of notch-to-trough amp "min_distance": 5 # Min samples from notch } } """ self.waveform = np.array(waveform) self.fs = fs # Sampling frequency self.signal_type = signal_type # 'ECG', 'PPG', 'EEG' self.qrs_ratio = qrs_ratio self.simple_mode = simple_mode self._cache = {} # Cache for computed results # Precompute derivatives for reuse self._signal_derivative = np.diff(self.waveform) self._signal_second_derivative = np.diff(self._signal_derivative) # self._smoothed_signal = savgol_filter(self.waveform, window_length=5, polyorder=2) if options is not None: # options = { # 'artifact_removal': 'baseline_correction', # 'artifact_removal_options': {'cutoff': 0.5}, # 'bandpass_filter': {'lowcut': 0.2, 'highcut': 10, 'filter_order': 4, 'filter_type': 'butter'}, # 'detrending': {'detrend_type': 'linear'}, # 'normalization': {'normalization_range': (0, 20)}, # "smoothing": { # "smoothing_method": "moving_average", # "window_size": 5, # "iterations": 2, # }, # 'enhancement': {'enhance_method': 'square'}, # 'advanced_filtering': {'filter_type': 'kalman_filter', 'options': {'R': 0.1, 'Q': 0.01}}, # } method_order = options.keys() transformer = VitalTransformation( self.waveform, fs=fs, signal_type=signal_type ) self._smoothed_signal = transformer.apply_transformations( options, method_order ) else: self._smoothed_signal = savgol_filter( self.waveform, window_length=5, polyorder=2 ) # Default peak detection configurations default_ecg_config = { "distance": 50, "window_size": 7, "threshold_factor": 1.6, "search_window": 6, } default_ppg_config = { "distance": 50, "window_size": 7, "threshold_factor": 0.8, "search_window": 6, "fs": self.fs, "dicrotic_config": { "slope_quartile_factor": 0.25, "search_range_start": 0.1, "search_range_end": 0.3, }, "diastolic_config": { "search_range_start": 0.2, "search_range_end": 0.8, "min_height_factor": 0.1, "min_distance": 5, "min_distance_to_trough": 5, # New: Min distance from trough "slope_window": 3, # New: Window for slope calculation }, } # Store the configuration for later use self.peak_config = ( peak_config if peak_config is not None else (default_ecg_config if signal_type == "ECG" else default_ppg_config) ) if signal_type == "ECG": detector = PeakDetection(self.waveform, "ecg_r_peak", **(self.peak_config)) self.r_peaks = detector.detect_peaks() if len(self.r_peaks) == 0: logger.warning( "No R peaks detected in ECG signal. This may cause issues with feature extraction." ) elif signal_type == "PPG": detector = PeakDetection( self.waveform, "ppg_systolic_peaks", **(self.peak_config) ) self.systolic_peaks = detector.detect_peaks() if len(self.systolic_peaks) == 0: logger.warning( "No systolic peaks detected in PPG signal. This may cause issues with feature extraction." ) elif signal_type == "EEG": detector = PeakDetection(self.waveform) self.eeg_peaks = detector.detect_peaks() else: raise ValueError("Invalid signal type. Supported types are ECG, PPG, EEG.") self.q_valleys = None self.s_valleys = None self.diastolic_troughs = None self.diastolic_peaks = None self.t_peaks = None self.p_peaks = None self.dicrotic_notches = None def _cache_result(self, key, method, *args, **kwargs): """Cache the result of a method call.""" cache_key = (key, tuple(args), frozenset(kwargs.items())) if cache_key not in self._cache: self._cache[cache_key] = method(*args, **kwargs) return self._cache[cache_key]
[docs] def detect_troughs(self, systolic_peaks=None): """ Detects the troughs (valleys) in the PPG waveform between systolic peaks. In simple_mode, uses the minimum value between adjacent peaks; otherwise, uses flat segment detection. Parameters ---------- systolic_peaks : np.array Indices of detected systolic peaks in the PPG waveform. Returns ------- troughs : np.array Indices of the detected troughs between the systolic peaks. Example ------- >>> waveform = np.sin(np.linspace(0, 2*np.pi, 100)) # Simulated PPG signal >>> wm = WaveformMorphology(waveform, signal_type="PPG") >>> peaks = PeakDetection(waveform).detect_peaks() >>> troughs = wm.detect_troughs(peaks) >>> print(f"Troughs: {troughs}") """ if self.signal_type != "PPG": logger.warning( "This function is designed to work with PPG signals. " "Other signal types may result in unexpected behaviour" ) if systolic_peaks is None: systolic_peaks = self.systolic_peaks diastolic_troughs = [] if self.simple_mode: # Simple mode: Find the minimum value between adjacent systolic peaks for i in range(len(systolic_peaks) - 1): peak_start = systolic_peaks[i] peak_end = systolic_peaks[i + 1] if peak_end <= peak_start: continue segment = self.waveform[peak_start:peak_end] trough_idx = peak_start + np.argmin(segment) # Index of minimum value diastolic_troughs.append(trough_idx) else: # Original mode: Use derivative and flat segments signal_derivative = np.diff(self.waveform) for i in range(len(systolic_peaks) - 1): peak_start = systolic_peaks[i] peak_end = systolic_peaks[i + 1] search_start = peak_start + (peak_end - peak_start) // 2 search_end = peak_end if search_start >= search_end or search_start < 0: continue # Calculate adaptive flatness threshold based on MAD of the derivative local_derivative = signal_derivative[search_start:search_end] mad_derivative = np.median( np.abs(local_derivative - np.median(local_derivative)) ) adaptive_threshold = 0.5 * mad_derivative # 50% of local MAD # Identify flat segments flat_segment = [ j for j in range(search_start, search_end - 1) if abs(signal_derivative[j]) < adaptive_threshold ] # Find midpoint of the longest flat segment if flat_segment: flat_segment_groups = np.split( flat_segment, np.where(np.diff(flat_segment) != 1)[0] + 1 ) longest_flat_segment = max(flat_segment_groups, key=len) trough_index = longest_flat_segment[len(longest_flat_segment) // 2] diastolic_troughs.append(trough_index) diastolic_troughs = np.array(diastolic_troughs) self.diastolic_troughs = diastolic_troughs return diastolic_troughs
# # Calculate the derivative of the signal for slope information # signal_derivative = np.diff(self.waveform) # for i in range(len(systolic_peaks) - 1): # # Define the search range as the midpoint between two adjacent systolic peaks # peak_start = systolic_peaks[i] # peak_end = systolic_peaks[i + 1] # search_start = peak_start + (peak_end - peak_start) // 2 # search_end = peak_end # # Ensure search range is valid # if search_start >= search_end or search_start < 0: # continue # # Narrow down to the section within the search range # search_section = self.waveform[search_start:search_end] # search_derivative = signal_derivative[search_start:search_end-1] # # Find the index where the slope is closest to zero with a preference for a negative slope # candidate_troughs = [ # idx for idx, slope in enumerate(search_derivative) if slope <= 0 # ] # if candidate_troughs: # trough_index = candidate_troughs[np.argmin(np.abs(search_derivative[candidate_troughs]))] # diastolic_troughs.append(search_start + trough_index) # return np.array(diastolic_troughs)
[docs] def detect_dicrotic_notches(self, systolic_peaks=None, diastolic_troughs=None): """ Detects the dicrotic notches in a PPG waveform using second derivative. Parameters ---------- systolic_peaks : np.array Indices of detected systolic peaks in the PPG waveform. diastolic_troughs : np.array Indices of the detected troughs between the systolic peaks. Returns ------- notches : np.array Indices of detected dicrotic notches in the PPG waveform. """ if self.signal_type != "PPG": raise ValueError("Notches can only be detected for PPG signals.") if systolic_peaks is None: systolic_peaks = self.systolic_peaks if diastolic_troughs is None: diastolic_troughs = self.detect_troughs(systolic_peaks=systolic_peaks) notches = [] signal_derivative = np.diff(self.waveform) min_length = min(len(systolic_peaks) - 1, len(diastolic_troughs)) systolic_peaks = systolic_peaks[: min_length + 1] diastolic_troughs = diastolic_troughs[:min_length] if self.simple_mode: # Simple mode: Full range, closest to zero derivative for i in range(min_length): peak = systolic_peaks[i] trough = diastolic_troughs[i] if trough <= peak: notches.append(peak + (trough - peak) // 2) # Fallback to midpoint continue # search_start = peak # search_end = trough # search_deriv = signal_derivative[search_start:search_end] interval_length = trough - peak search_start = peak + int( interval_length * 0.15 ) # Start of 2nd quarter search_end = peak + int(interval_length * 0.45) # End of 3rd quarter search_start = min(max(search_start, peak + 1), trough - 1) search_end = min(max(search_end, search_start + 1), trough) search_deriv = signal_derivative[search_start:search_end] if search_deriv.size > 0: notch_idx = search_start + np.argmin( np.abs(search_deriv) ) # Closest to zero notches.append( max(peak + 1, min(notch_idx, trough - 1) - 2) ) # Ensure bounds else: notches.append(peak + (trough - peak) // 2) # Fallback if empty notches = np.array(notches) target_length = len(diastolic_troughs) if len(notches) < target_length and target_length - len(notches) <= 1: # Impute missing notches with derivative closest to zero for i in range(len(notches), target_length): peak = systolic_peaks[i] trough = diastolic_troughs[i] if trough <= peak: imputed_notch = peak + (trough - peak) // 3 else: search_deriv = signal_derivative[peak:trough] min_deriv_idx = np.argmin(np.abs(search_deriv)) imputed_notch = peak + min_deriv_idx imputed_notch = max(peak + 1, min(imputed_notch, trough - 1)) notches = np.append(notches, imputed_notch) else: # Non-simple mode: Search 2nd and 3rd quarters (25% to 75%) of [peak, trough] for i in range(min_length): peak = systolic_peaks[i] trough = diastolic_troughs[i] if trough <= peak: notches.append(peak + (trough - peak) // 2) continue # Divide [peak, trough] into 4 parts, take 2nd and 3rd (25% to 75%) interval_length = trough - peak search_start = peak + int( interval_length * 0.25 ) # Start of 2nd quarter search_end = peak + int(interval_length * 0.75) # End of 3rd quarter search_start = min(max(search_start, peak + 1), trough - 1) search_end = min(max(search_end, search_start + 1), trough) search_deriv = signal_derivative[search_start:search_end] if search_deriv.size > 0: notch_idx = search_start + np.argmin( np.abs(search_deriv) ) # Closest to zero notches.append(max(search_start, min(notch_idx, search_end - 1))) else: notches.append(search_start + (search_end - search_start) // 2) # Sort and ensure length consistency notches = np.sort(notches)[: len(diastolic_troughs)] self.dicrotic_notches = notches return notches
[docs] def detect_diastolic_peak(self, notches=None, diastolic_troughs=None): """ Detect diastolic peaks in PPG signals based on notches and diastolic troughs. Parameters ---------- notches : list of int Indices of detected notches in the PPG signal. diastolic_troughs : list of int Indices of diastolic troughs in the PPG signal. Returns ------- diastolic_peaks : list of int Indices of diastolic peaks detected in the PPG signal. """ if self.signal_type != "PPG": raise ValueError("Diastolic peaks can only be detected for PPG signals.") if diastolic_troughs is None: diastolic_troughs = self.detect_troughs(systolic_peaks=self.systolic_peaks) if notches is None: notches = self.detect_dicrotic_notches( systolic_peaks=self.systolic_peaks, diastolic_troughs=diastolic_troughs ) # Get diastolic config from peak_config # diastolic_config = self.peak_config.get( # "diastolic_config", # { # "search_range_start": 0.2, # "search_range_end": 0.4, # "min_height_factor": 0.1, # "min_distance": 8, # "min_distance_to_trough": 8, # "slope_window": 5, # }, # ) diastolic_peaks = [] min_length = min(len(notches), len(diastolic_troughs)) notches = notches[:min_length] diastolic_troughs = diastolic_troughs[:min_length] signal_derivative = np.diff(self.waveform) signal_second_derivative = np.diff(signal_derivative) diastolic_peaks = [] if self.simple_mode: # Simple mode: Full range [notch, trough], find flattest derivative for i in range(min_length): notch = notches[i] trough = diastolic_troughs[i] if trough <= notch + 2: # Need at least 3 points for derivative diastolic_peaks.append(notch + (trough - notch) // 2) continue # search_start = notch # search_end = trough # Divide [notch, trough] into parts, take 2nd and 3rd (20% to 45%) interval_length = trough - notch search_start = notch + int( interval_length * 0.1 ) # Start of 2nd quarter search_end = notch + int(interval_length * 0.45) # End of 3rd quarter search_start = min(max(search_start, notch + 1), trough - 1) search_end = min(max(search_end, search_start + 1), trough) search_deriv = signal_derivative[search_start:search_end] if search_deriv.size > 0: min_deriv_idx = np.argmin(np.abs(search_deriv)) peak_idx = search_start + min_deriv_idx + 2 # Shift forth 2 units peak_idx = max( notch + 1, min(peak_idx, trough - 1) ) # Ensure bounds diastolic_peaks.append(peak_idx) else: diastolic_peaks.append(notch + (trough - notch) // 3) else: # Non-simple mode: Search 2nd and 3rd quarters (25% to 75%) of [notch, trough] for i in range(min_length): notch = notches[i] trough = diastolic_troughs[i] if trough <= notch + 2: # Need at least 3 points for derivative diastolic_peaks.append(notch + (trough - notch) // 2) continue # Divide [notch, trough] into 4 parts, take 2nd and 3rd (25% to 75%) interval_length = trough - notch search_start = notch + int( interval_length * 0.25 ) # Start of 2nd quarter search_end = notch + int(interval_length * 0.75) # End of 3rd quarter search_start = min(max(search_start, notch + 1), trough - 1) search_end = min(max(search_end, search_start + 1), trough) search_deriv = signal_second_derivative[search_start:search_end] if search_deriv.size > 0: min_deriv_idx = np.argmin(np.abs(search_deriv)) peak_idx = search_start + min_deriv_idx + 2 # Shift forth 3 units peak_idx = max( search_start, min(peak_idx, search_end - 1) ) # Ensure bounds diastolic_peaks.append(peak_idx) else: diastolic_peaks.append( search_start + (search_end - search_start) // 2 ) diastolic_peaks = np.array(diastolic_peaks) self.diastolic_peaks = diastolic_peaks return diastolic_peaks
# # signal_derivative = np.diff(self.waveform) # # Light smoothing to stabilize slope detection # # smoothed_signal = savgol_filter(self.waveform, window_length=5, polyorder=2) # smoothed_signal = self._smoothed_signal # for i in range(min_length): # notch = notches[i] # trough = diastolic_troughs[i] # min_dist = diastolic_config["min_distance"] # min_dist_trough = diastolic_config["min_distance_to_trough"] # if trough <= notch + min_dist + min_dist_trough: # diastolic_peaks.append(notch + (trough - notch) // 2) # continue # search_start = notch + min_dist # search_end = trough - min_dist_trough # if ( # search_end <= search_start + 2 # ): # Ensure at least 3 samples for slope calc # diastolic_peaks.append(notch + (trough - notch) // 2) # continue # if self.simple_mode: # # Simple mode: Find point with smallest slope # search_segment = smoothed_signal[search_start:search_end] # slopes = np.abs(np.diff(search_segment)) # Absolute slope # if len(slopes) > 0: # min_slope_idx = np.argmin(slopes) # peak_idx = search_start + min_slope_idx # # Ensure it's not too close to boundaries # if peak_idx < search_start + 1 or peak_idx > search_end - 1: # peak_idx = search_start + (search_end - search_start) // 2 # diastolic_peaks.append(peak_idx) # else: # diastolic_peaks.append(notch + (trough - notch) // 2) # else: # # Non-simple mode: Use find_peaks # search_segment = self.waveform[search_start:search_end] # notch_value = self.waveform[notch] # trough_value = self.waveform[trough] # min_height = (notch_value - trough_value) * diastolic_config[ # "min_height_factor" # ] + trough_value # peaks, _ = find_peaks( # search_segment, # height=min_height, # distance=diastolic_config["min_distance"], # ) # if peaks.size > 0: # peak_idx = search_start + peaks[0] # diastolic_peaks.append(peak_idx) # else: # peak_idx = search_start + np.argmax(search_segment) # diastolic_peaks.append(peak_idx) # # min_length = min(len(notches), len(diastolic_troughs)) # # notches = notches[:min_length] # # diastolic_troughs = diastolic_troughs[:min_length] # # for i in range(min_length): # # notch = notches[i] # # trough = diastolic_troughs[i] # # if trough <= notch: # # diastolic_peaks.append(notch) # Fallback to notch # # continue # # # Search full range from notch to trough # # search_segment = self.waveform[notch:trough] # # peak_idx = notch + np.argmax(search_segment) # Find max value # # diastolic_peaks.append(peak_idx) # diastolic_peaks = np.array(diastolic_peaks) # self.diastolic_peaks = diastolic_peaks # return diastolic_peaks # for i in range(len(notches)): # notch = notches[i] # trough = ( # diastolic_troughs[i] # if i < len(diastolic_troughs) # else len(self.waveform) - 1 # ) # # Define the initial search range: notch to halfway to the trough # search_start = notch # search_end = notch + (trough - notch) // 2 # if search_end <= search_start or search_end > len(self.waveform): # diastolic_peaks.append(notch) # continue # search_segment = self.waveform[search_start:search_end] # segment_derivative = np.diff(search_segment) # candidate_peaks = [ # idx # for idx in range(1, len(segment_derivative)) # if segment_derivative[idx - 1] < 0 and segment_derivative[idx] >= 0 # ] # # Convert candidates to absolute indices and select the most prominent peak if available # if candidate_peaks: # candidate_indices = [search_start + idx for idx in candidate_peaks] # diastolic_peak_idx = max( # candidate_indices, key=lambda x: self.waveform[x] # ) # diastolic_peaks.append(diastolic_peak_idx) # else: # # Fallback assignment: if no diastolic peak is detected, use the notch as the diastolic peak # diastolic_peaks.append(notch) # # min_length = min(len(notches), len(diastolic_troughs)) # # notches = notches[:min_length] # # diastolic_troughs = diastolic_troughs[:min_length] # # for i in range(min_length): # # notch = notches[i] # # trough = diastolic_troughs[i] # # if trough <= notch: # # diastolic_peaks.append(notch) # Fallback if order is wrong # # continue # # search_segment = self.waveform[notch:trough] # # peak_idx = notch + np.argmax(search_segment) # Find max in full range # # diastolic_peaks.append(peak_idx) # diastolic_peaks = np.array(diastolic_peaks) # self.diastolic_peaks = diastolic_peaks # return diastolic_peaks
[docs] def detect_q_valley(self, r_peaks=None): """ Detects the Q valley (local minimum) in the ECG waveform just before each R peak. Parameters ---------- r_peaks : np.array Indices of detected R peaks in the ECG waveform. Returns ------- q_valleys : list of int Indices of the Q valley (local minimum) for each R peak. """ if self.signal_type != "ECG": raise ValueError("Q valleys can only be detected for ECG signals.") if r_peaks is None: r_peaks = self.r_peaks if len(r_peaks) == 0: logger.warning("No R peaks available for Q valley detection.") return np.array([]) q_valleys = [] for i, r_peak in enumerate(r_peaks): # Set the end of the search range to be the R peak search_end = r_peak # Determine the start of the search range if i == 0: # For the first R peak, start from the beginning of the signal search_start = max( 0, search_end - int(self.fs * self.qrs_ratio) ) # Approx 200ms window else: # For subsequent R peaks, set the start as the midpoint to the previous R peak # search_start = (r_peaks[i - 1] + r_peak) // 2 search_start = r_peak - int(self.qrs_ratio * self.fs) # Ensure the search range is valid if search_start < search_end: # Extract the signal segment within the search range signal_segment = self.waveform[search_start:search_end] # Detect the Q valley as the minimum point in the segment q_valley_idx = np.argmin(signal_segment) + search_start q_valleys.append(q_valley_idx) q_valleys = np.array(q_valleys) self.q_valleys = q_valleys return q_valleys
[docs] def detect_s_valley(self, r_peaks=None): """ Detects the S valleys (local minima after each R peak). Parameters ---------- r_peaks : list of int Indices of detected R peaks in the ECG waveform. Returns ------- s_valleys : list of int Indices of the S valleys for each R peak. """ if self.signal_type != "ECG": raise ValueError("S valleys can only be detected for ECG signals.") if r_peaks is None: r_peaks = self.r_peaks if len(r_peaks) == 0: logger.warning("No R peaks available for S valley detection.") return np.array([]) s_valleys = [] for i, r_peak in enumerate(r_peaks): # Set the start of the search range to be the R peak search_start = r_peak # Determine the end of the search range if i == len(r_peaks) - 1: # For the last R peak, set the end to the end of the signal or a 200ms window after the R peak search_end = min( len(self.waveform) - 1, search_start + int(self.fs * 0.1) ) else: # For other R peaks, set the end as the midpoint to the next R peak # search_end = (r_peak + r_peaks[i + 1]) // 2 search_end = r_peak + int(self.qrs_ratio * self.fs) # Ensure the search range is valid if search_start < search_end: # Extract the signal segment within the search range signal_segment = self.waveform[search_start:search_end] # Detect the S valley as the minimum point in the segment s_valley_idx = np.argmin(signal_segment) + search_start s_valleys.append(s_valley_idx) s_valleys = np.array(s_valleys) self.s_valleys = s_valleys return s_valleys
[docs] def detect_p_peak(self, r_peaks=None, q_valleys=None): """ Detects the P peak (local maximum) in the ECG waveform just before each Q valley. Parameters ---------- q_valleys : list of int Indices of detected Q valleys in the ECG waveform. Returns ------- p_peaks : list of int Indices of the P peaks (local maximum) for each Q valley. """ if self.signal_type != "ECG": raise ValueError("P peaks can only be detected for ECG signals.") if r_peaks is None: r_peaks = self.r_peaks if len(r_peaks) == 0: logger.warning("No R peaks available for P peak detection.") return np.array([]) if q_valleys is None: q_valleys = self.detect_q_valley(r_peaks=r_peaks) if len(q_valleys) == 0: logger.warning("No Q valleys available for P peak detection.") return np.array([]) p_peaks = [] for i in range(len(q_valleys)): # Define search range with midpoint as start and Q valley as end if i == 0: # For the first Q valley, use the beginning of the signal as start search_start = max( 0, q_valleys[i] - int(self.fs * 0.2) ) # 200ms before Q valley else: # For subsequent Q valleys, use midpoint between previous and current R peaks midpoint = (r_peaks[i - 1] + r_peaks[i]) // 2 search_start = midpoint q_valley = q_valleys[i] # Ensure search range has positive length if search_start < q_valley: search_end = q_valley else: # Skip if search range is invalid continue # Extract signal segment within search range signal_segment = self.waveform[search_start:search_end] # Detect the maximum in this segment p_peak_idx = np.argmax(signal_segment) + search_start p_peaks.append(p_peak_idx) p_peaks = np.array(p_peaks) self.p_peaks = p_peaks return p_peaks
[docs] def detect_t_peak(self, r_peaks=None, s_valleys=None): """ Detect T peaks (local maxima) between the S valley and the midpoint to the next R peak. Parameters ---------- r_peaks : list of int Indices of detected R peaks in the ECG waveform. s_valleys : list of int Indices of detected S valleys in the ECG waveform. Returns ------- t_peaks : list of int Indices of the T peaks for each S valley. """ if self.signal_type != "ECG": raise ValueError("T peaks can only be detected for ECG signals.") if r_peaks is None: r_peaks = self.r_peaks if len(r_peaks) == 0: logger.warning("No R peaks available for T peak detection.") return np.array([]) if s_valleys is None: s_valleys = self.detect_s_valley(r_peaks=r_peaks) if len(s_valleys) == 0: logger.warning("No S valleys available for T peak detection.") return np.array([]) t_peaks = [] for i in range(len(s_valleys)): # Define S valley as the start of the search range s_valley = s_valleys[i] # Determine the end of the search range # For the last S valley, restrict the end point within signal bounds if i < len(r_peaks) - 1: midpoint = (r_peaks[i] + r_peaks[i + 1]) // 2 else: # For the last R peak, limit the midpoint to signal length midpoint = len(self.waveform) - 1 # Check if search range is valid if s_valley < midpoint: search_start = s_valley search_end = midpoint # Extract the signal segment within the search range signal_segment = self.waveform[search_start:search_end] # Detect the T peak as the maximum point in the segment t_peak_idx = np.argmax(signal_segment) + search_start t_peaks.append(t_peak_idx) t_peaks = np.array(t_peaks) self.t_peaks = t_peaks # Store the detected T peaks for future use return t_peaks
[docs] def detect_q_session(self, p_peaks=None, q_valleys=None, r_peaks=None): """ Detects the Q sessions (start and end) in the ECG waveform based on R peaks. Parameters ---------- r_peaks : np.array Indices of detected R peaks in the ECG waveform. Returns ------- q_sessions : list of tuples Each tuple contains the start and end index of a Q session. Example ------- >>> waveform = np.sin(np.linspace(0, 2*np.pi, 100)) # Simulated ECG signal >>> wm = WaveformMorphology(waveform, signal_type="ECG") >>> r_peaks = PeakDetection(waveform).detect_peaks() >>> q_sessions = wm.detect_q_session(r_peaks) >>> print(f"Q Sessions: {q_sessions}") """ if self.signal_type != "ECG": raise ValueError("Q sessions can only be detected for ECG signals.") if r_peaks is None: r_peaks = self.r_peaks if q_valleys is None: q_valleys = self.detect_q_valley(r_peaks=r_peaks) if p_peaks is None: p_peaks = self.detect_p_peak(r_peaks=r_peaks, q_valleys=q_valleys) q_sessions = [] if len(q_valleys) == 0 or len(p_peaks) == 0: return np.array([]) # Trick to ensure that P peak comes before Q valley at the start if q_valleys[0] < p_peaks[0]: q_valleys = q_valleys[ 1: ] # Skip the first Q valley if it comes before the first P peak r_peaks = r_peaks[1:] # Also skip the corresponding first R peak # Iterate over detected peaks and valleys for i in range(len(q_valleys)): # Ensure indices are within valid bounds if i < len(p_peaks) and i < len(r_peaks): # Define start of Q session as the midpoint between P peak and Q valley start = (p_peaks[i] + q_valleys[i]) // 2 # Set search range from Q valley to R peak search_start = q_valleys[i] search_end = r_peaks[i] # Check for valid search range if search_end > search_start: # Find the end of Q session: closest point to start signal value within search range start_value = self.waveform[start] end = search_start + np.argmin( np.abs(self.waveform[search_start:search_end] - start_value) ) # Append the session to the q_sessions list q_sessions.append((start, end)) q_sessions = np.array(q_sessions) self.q_sessions = q_sessions # Store the detected Q sessions for future use return q_sessions
[docs] def detect_s_session(self, t_peaks=None, s_valleys=None, r_peaks=None): """ Detects the S sessions (start and end) in the ECG waveform based on R peaks. Parameters ---------- r_peaks : np.array Indices of detected R peaks in the ECG waveform. Returns ------- s_sessions : list of tuples Each tuple contains the start and end index of an S session. Example ------- >>> waveform = np.sin(np.linspace(0, 2*np.pi, 100)) # Simulated ECG signal >>> wm = WaveformMorphology(waveform, signal_type="ECG") >>> r_peaks = PeakDetection(waveform).detect_peaks() >>> s_sessions = wm.detect_s_session(r_peaks) >>> print(f"S Sessions: {s_sessions}") """ if self.signal_type != "ECG": raise ValueError("S sessions can only be detected for ECG signals.") if r_peaks is None: r_peaks = self.r_peaks if s_valleys is None: s_valleys = self.detect_s_valley(r_peaks=r_peaks) if t_peaks is None: t_peaks = self.detect_t_peak(r_peaks=r_peaks, s_valleys=s_valleys) s_sessions = [] # Ensure all input arrays are aligned in size for i in range(1, min(len(r_peaks), len(s_valleys), len(t_peaks))): r_peak = r_peaks[i] s_valley = s_valleys[i] t_peak = t_peaks[i] # Calculate the midpoint between S valley and T peak for the end point s_end = s_valley + (t_peak - s_valley) // 2 # Determine the start point within the range from R peak to S valley, # choosing the point closest in value to s_end # Ensure the search range is valid and not empty if r_peak < s_valley: search_range = self.waveform[r_peak:s_valley] if len(search_range) > 0: # Find the start point within the search range closest to s_end's value start_index_within_range = np.argmin( np.abs(search_range - self.waveform[s_end]) ) s_start = r_peak + start_index_within_range # Only add the session if s_start is before s_end if s_start < s_end: s_sessions.append((s_start, s_end)) s_sessions = np.array(s_sessions) self.s_sessions = s_sessions # Store the detected S sessions for future use return s_sessions
[docs] def detect_r_session(self, rpeaks=None, q_sessions=None, s_sessions=None): """ Detects the R session (start and end) in the ECG waveform based on Q and S sessions. Parameters ---------- q_session : tuple The start and end index of the Q session. s_session : tuple The start and end index of the S session. Returns ------- r_sessions : list of tuples Each tuple contains the start and end index of an R session. """ if rpeaks is None: rpeaks = self.r_peaks # If q_sessions or s_sessions are explicitly provided as empty arrays, respect that if (q_sessions is not None and len(q_sessions) == 0) or ( s_sessions is not None and len(s_sessions) == 0 ): return np.array([]) # Only try to get sessions from instance if they weren't provided if q_sessions is None: q_sessions = self.q_sessions if s_sessions is None: s_sessions = self.s_sessions # If sessions are None or empty, try to detect them if q_sessions is None or len(q_sessions) == 0: q_sessions = self.detect_q_session(r_peaks=rpeaks) if s_sessions is None or len(s_sessions) == 0: s_sessions = self.detect_s_session(r_peaks=rpeaks) # If still empty after detection, return empty array if len(q_sessions) == 0 or len(s_sessions) == 0: return np.array([]) r_sessions = [] # Ensure all input arrays are aligned in size and skip the first item to avoid boundary issues for i in range(min(len(q_sessions), len(s_sessions))): q_end = q_sessions[i][1] s_start = s_sessions[i][0] # Ensure there is a valid interval for the R session if q_end < s_start: r_sessions.append((q_end, s_start)) r_sessions = np.array(r_sessions) self.r_sessions = r_sessions # Store the detected R sessions for future use return r_sessions
[docs] def detect_qrs_session(self, rpeaks=None, q_session=None, s_session=None): """ Detects the QRS complex sessions (start and end) in the ECG waveform. Parameters ---------- r_peaks : np.array Indices of detected R peaks in the ECG waveform. Returns ------- qrs_sessions : np.ndarray Each tuple contains the start and end index of a QRS session. Example ------- >>> waveform = np.sin(np.linspace(0, 2*np.pi, 100)) # Simulated ECG signal >>> wm = WaveformMorphology(waveform, signal_type="ECG") >>> r_peaks = PeakDetection(waveform).detect_peaks() >>> qrs_sessions = wm.detect_qrs_session(r_peaks) >>> print(f"QRS Sessions: {qrs_sessions}") """ if rpeaks is None: rpeaks = self.r_peaks if len(rpeaks) == 0: logger.warning("No R peaks available for QRS session detection.") return np.array([]) if q_session is None: q_session = self.detect_q_session(r_peaks=rpeaks) if s_session is None: s_session = self.detect_s_session(r_peaks=rpeaks) # Ensure all input arrays are aligned in size if len(q_session) == 0 or len(s_session) == 0: logger.warning("No Q or S sessions detected for QRS session detection.") return np.array([]) qrs_sessions = [(q[0], s[1]) for q, s in zip(q_session, s_session)] return np.array(qrs_sessions)
[docs] def detect_ppg_session(self, troughs=None): """ Detects PPG sessions between consecutive troughs. Parameters ---------- troughs : np.array, optional Indices of detected troughs in the PPG waveform. Returns ------- np.ndarray Array of tuples containing start and end indices of PPG sessions. """ if troughs is None: troughs = self.detect_troughs(systolic_peaks=self.systolic_peaks) ppg_sessions = [ (trough_start, trough_end) for trough_start, trough_end in zip(troughs[:-1], troughs[1:]) ] return np.array(ppg_sessions)
[docs] def detect_ecg_session(self, p_peaks=None, t_peaks=None): """ Detects the ECG session (start and end) based on flat lines before the P peak and after the T peak. Parameters ---------- p_peaks : np.array, optional Indices of detected P peaks in the ECG waveform. t_peaks : np.array, optional Indices of detected T peaks in the ECG waveform. Returns ------- ecg_sessions : list of tuples Each tuple contains the start and end index of an ECG session. """ if self.signal_type != "ECG": raise ValueError("ECG sessions can only be detected for ECG signals.") if p_peaks is None: p_peaks = self.detect_p_peak() if t_peaks is None: t_peaks = self.detect_t_peak() ecg_sessions = [] # Calculate the derivative of the signal to identify flat regions signal_derivative = np.diff(self.waveform) threshold = 0.15 * np.std(signal_derivative) # Adaptive threshold for flatness for p, t in zip(p_peaks, t_peaks): # Detect flat line before the P peak start = p for i in range(p, 0, -1): if abs(signal_derivative[i - 1]) > threshold: start = i break # Detect flat line after the T peak end = t for i in range(t, len(signal_derivative) - 1): if abs(signal_derivative[i]) > threshold: end = i break ecg_sessions.append((start, end)) ecg_sessions = np.array(ecg_sessions) self.ecg_sessions = ecg_sessions return ecg_sessions
[docs] def compute_amplitude( self, interval_type="R-to-S", baseline_method="moving_average", compare_to_baseline=False, signal_type="ECG", ): """ Computes the amplitude (max-min) of the waveform for specified intervals. Parameters ---------- interval_type : str, optional The interval type to calculate the amplitude for: - For ECG: - "R-to-S": Between R peak and S valley. - "R-to-Q": Between Q valley and R peak. - "P-to-Q": Between P peak and Q valley. - "T-to-S": Between S valley and T peak. - "T-to-Baseline": Between T peak and baseline. - "R-to-Baseline": Between R peak and baseline. - "S-to-Baseline": Between S valley and baseline. - For PPG: - "Sys-to-Notch": Between systolic peak and dicrotic notch. - "Notch-to-Dia": Between dicrotic notch and diastolic peak. - "Sys-to-Dia": Between systolic and diastolic peaks. - "Sys-to-Baseline": Between systolic peak and baseline. - "Notch-to-Baseline": Between dicrotic notch and baseline. - "Dia-to-Baseline": Between diastolic peak and baseline. signal_type : str, optional The type of signal: "ECG" or "PPG". Default is "ECG". compare_to_baseline : bool, optional If True, compute amplitudes relative to a baseline. Returns ------- List[float]: A list of amplitude values for each interval within a single complex. Examples -------- >>> amplitude_values = wm.compute_amplitude(interval_type="R-to-S", signal_type="ECG") >>> print(f"Amplitude values for each complex in R-to-S interval: {amplitude_values}") """ # Baseline calculation baseline = estimate_baseline(self.waveform, self.fs, method=baseline_method) # Automatically compute peaks and valleys if they are not already computed if signal_type == "ECG": if interval_type == "R-to-S": if self.s_valleys is None: self.detect_s_valley() peaks, valleys, require_peak_first = self.r_peaks, self.s_valleys, True elif interval_type == "R-to-Q": if self.q_valleys is None: self.detect_q_valley() # For R-to-Q, we want Q valley to start and R peak to end peaks, valleys, require_peak_first = self.q_valleys, self.r_peaks, False elif interval_type == "P-to-Q": if self.q_valleys is None: self.detect_q_valley() if self.p_peaks is None: self.detect_p_peak() peaks, valleys, require_peak_first = self.p_peaks, self.q_valleys, True elif interval_type == "T-to-S": if self.s_valleys is None: self.detect_s_valley() if self.t_peaks is None: self.detect_t_peak() # For T-to-S, we want S valley to start and T peak to end peaks, valleys, require_peak_first = self.s_valleys, self.t_peaks, False elif interval_type == "T-to-Baseline": if self.t_peaks is None: self.detect_t_peak() peaks, valleys, require_peak_first = self.t_peaks, None, False elif interval_type == "R-to-Baseline": peaks, valleys, require_peak_first = self.r_peaks, None, False elif interval_type == "S-to-Baseline": if self.s_valleys is None: self.detect_s_valley() peaks, valleys, require_peak_first = self.s_valleys, None, False else: raise ValueError("Invalid interval_type for ECG.") elif signal_type == "PPG": if interval_type == "Sys-to-Notch": if self.dicrotic_notches is None: self.detect_dicrotic_notches() peaks, valleys, require_peak_first = ( self.systolic_peaks, self.dicrotic_notches, True, ) elif interval_type == "Notch-to-Dia": if self.dicrotic_notches is None: self.detect_dicrotic_notches() if self.diastolic_peaks is None: self.detect_diastolic_peak() peaks, valleys, require_peak_first = ( self.dicrotic_notches, self.diastolic_peaks, False, # Keep as False since we want volume when notch comes before diastolic peak ) elif interval_type == "Sys-to-Dia": if self.diastolic_peaks is None: self.detect_diastolic_peak() peaks, valleys, require_peak_first = ( self.systolic_peaks, self.diastolic_peaks, True, ) elif interval_type == "Sys-to-Baseline": peaks, valleys, require_peak_first = self.systolic_peaks, None, False elif interval_type == "Notch-to-Baseline": if self.dicrotic_notches is None: self.detect_dicrotic_notches() peaks, valleys, require_peak_first = self.dicrotic_notches, None, False elif interval_type == "Dia-to-Baseline": if self.diastolic_peaks is None: self.detect_diastolic_peak() peaks, valleys, require_peak_first = self.diastolic_peaks, None, False else: raise ValueError("Invalid interval_type for PPG.") else: raise ValueError("Invalid signal type. Supported types are ECG, PPG, EEG.") # Compute amplitude for each interval or baseline comparison amplitudes = [] for i, peak in enumerate(peaks): if valleys is None: # Baseline comparison only amplitude = abs(self.waveform[peak] - baseline[peak]) amplitudes.append(amplitude) elif i < len(valleys): valley = valleys[i] # Always check that peak < valley for valid intervals if peak < valley: amplitude = abs(self.waveform[peak] - self.waveform[valley]) amplitudes.append(amplitude) return np.array(amplitudes)
[docs] def compute_volume(self, interval_type="P-to-T", signal_type="ECG", mode="peak"): """ Compute the area under the curve between two sets of peaks and valleys for specified intervals. Parameters ---------- interval_type : str, optional The interval type to calculate the volume for: - For ECG: - "P-to-T": Entire complex from P peak to T peak. - "R-to-S": Between R peak and S valley. - "R-to-Q": Between Q valley and R peak. - "P-to-Q": Between P peak and Q valley. - "T-to-S": Between S valley and T peak. - For PPG: - "Sys-to-Notch": Between systolic peak and dicrotic notch. - "Notch-to-Dia": Between dicrotic notch and diastolic peak. - "Sys-to-Dia": Between systolic and diastolic peaks. - "Sys-to-Sys": Between consecutive systolic peaks (full PPG complex). Default is "P-to-T" for ECG and "Sys-to-Notch" for PPG. signal_type : str, optional The type of signal: "ECG" or "PPG". Default is "ECG". mode : str, optional The area computation method ("peak" or "trough"). Default is "peak". - "peak": Computes the area under the curve. - "trough": Computes the area bounded by troughs. Returns ------- List[float]: A list of volume values, each representing the area for the specified interval within a single complex. Examples -------- >>> volume_values = wm.compute_volume(interval_type="R-to-S", signal_type="ECG") >>> print(f"Volume values for each complex in R-to-S interval: {volume_values}") """ # Automatically compute peaks and valleys if they are not already computed if signal_type == "ECG": if interval_type == "R-to-S": if self.s_valleys is None: self.detect_s_valley() peaks, valleys, require_peak_first = self.r_peaks, self.s_valleys, True elif interval_type == "R-to-Q": if self.q_valleys is None: self.detect_q_valley() # For R-to-Q, we want Q valley to start and R peak to end peaks, valleys, require_peak_first = self.q_valleys, self.r_peaks, False elif interval_type == "P-to-Q": if self.q_valleys is None: self.detect_q_valley() if self.p_peaks is None: self.detect_p_peak() peaks, valleys, require_peak_first = self.p_peaks, self.q_valleys, True elif interval_type == "T-to-S": if self.s_valleys is None: self.detect_s_valley() if self.t_peaks is None: self.detect_t_peak() # For T-to-S, we want S valley to start and T peak to end peaks, valleys, require_peak_first = self.s_valleys, self.t_peaks, False else: raise ValueError("Invalid interval_type for ECG.") elif signal_type == "PPG": if interval_type == "Sys-to-Notch": if self.dicrotic_notches is None: self.detect_dicrotic_notches() peaks, valleys, require_peak_first = ( self.systolic_peaks, self.dicrotic_notches, True, ) elif interval_type == "Notch-to-Dia": if self.dicrotic_notches is None: self.detect_dicrotic_notches() if self.diastolic_peaks is None: self.detect_diastolic_peak() peaks, valleys, require_peak_first = ( self.dicrotic_notches, self.diastolic_peaks, False, ) elif interval_type == "Sys-to-Dia": if self.diastolic_peaks is None: self.detect_diastolic_peak() peaks, valleys, require_peak_first = ( self.systolic_peaks, self.diastolic_peaks, True, ) elif interval_type == "Sys-to-Sys": # Full PPG complex: consecutive systolic peaks if self.systolic_peaks is None or len(self.systolic_peaks) < 2: raise ValueError( "At least two systolic peaks required for Sys-to-Sys interval." ) peaks = self.systolic_peaks[:-1] # Start points valleys = self.systolic_peaks[1:] # End points require_peak_first = True else: raise ValueError("Invalid interval_type for PPG.") else: raise ValueError("signal_type must be 'ECG' or 'PPG'.") # Check if peaks and valleys are empty if len(peaks) == 0 or len(valleys) == 0: logger.warning( f"No peaks or valleys available for {interval_type} volume computation." ) return np.array([]) # Compute area for each interval volumes = [] for start, end in zip(peaks, valleys): # Always check that start < end for valid intervals if start < end: if mode == "peak": volume = integrate.trapezoid( self.waveform[start : end + 1] ) # Integrate over interval elif mode == "trough": volume = integrate.trapezoid( self.waveform[min(start, end) : max(start, end) + 1] ) else: raise ValueError("Volume mode must be 'peak' or 'trough'.") volumes.append(volume) # Return empty array if no valid volumes were computed if len(volumes) == 0: logger.warning(f"No valid volumes computed for {interval_type}.") return np.array([]) return np.array(volumes)
[docs] def compute_skewness(self, signal_type="ECG"): """ Compute the skewness of each complex in the signal. Parameters ---------- signal_type : str, optional The type of signal, either "ECG" or "PPG". Default is "ECG". Returns ------- List[float]: A list of skewness values, one for each complex. Examples -------- >>> signal = np.random.randn(1000) >>> extractor = PhysiologicalFeatureExtractor(signal) >>> skewness_values = extractor.compute_skewness() >>> print(f"Skewness values for each complex: {skewness_values}") """ # Define complex points based on signal type if signal_type == "ECG": sessions = self.detect_ecg_session() elif signal_type == "PPG": sessions = self.detect_ppg_session() else: raise ValueError("signal_type must be 'ECG' or 'PPG'.") # Check if sessions are empty if len(sessions) == 0: logger.warning( f"No sessions detected for {signal_type} skewness computation." ) return np.array([]) # Compute skewness for each complex skewness_values = [] for session in sessions: # Ensure valid intervals start = session[0] end = session[1] # Compute skewness for the complex segment if valid intervals are found if end > start: complex_segment = self.waveform[start : end + 1] # Suppress scipy warnings for edge cases import warnings with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=RuntimeWarning) try: skewness_val = skew(complex_segment) if np.isfinite(skewness_val): skewness_values.append(skewness_val) except (ValueError, RuntimeWarning): # Skip invalid skewness calculations continue # Return empty array if no valid skewness values were computed if len(skewness_values) == 0: logger.warning(f"No valid skewness values computed for {signal_type}.") return np.array([]) return np.array(skewness_values)
[docs] def compute_duration(self, sessions=None, mode="Custom"): """ Computes the duration of the QRS complex in an ECG waveform. Returns: float: The QRS duration in milliseconds. Example: >>> ecg_signal = [...] # Sample ECG signal >>> wm = WaveformMorphology(ecg_signal, signal_type="ECG") >>> qrs_duration = wm.compute_qrs_duration() >>> print(f"QRS Duration: {qrs_duration} ms") """ if mode not in ["ECG", "PPG", "QRS", "Custom"]: raise ValueError( "Duration can only be computed for ECG signals, PPG signals, QRS complexes, or Custom sessions." ) if sessions is None: if mode == "ECG": sessions = self.detect_ecg_session() elif mode == "PPG": sessions = self.detect_ppg_session() elif mode == "QRS": sessions = self.detect_qrs_session() session_durations = [ (session[1] - session[0]) / self.fs for session in sessions ] return np.array(session_durations)
[docs] def compute_ppg_dicrotic_notch(self): """ Detects the dicrotic notch in a PPG waveform and computes its timing. Returns: float: The timing of the dicrotic notch in milliseconds. Example: >>> ppg_signal = [...] # Sample PPG signal >>> wm = WaveformMorphology(ppg_signal, signal_type="PPG") >>> notch_timing = wm.compute_ppg_dicrotic_notch() >>> print(f"Dicrotic Notch Timing: {notch_timing} ms") """ if self.signal_type != "PPG": raise ValueError("Dicrotic notch can only be computed for PPG signals.") # Detect peaks and dicrotic notches in the PPG signal peak_detector = PeakDetection(self.waveform, method="ppg_first_derivative") systolic_peaks = peak_detector.detect_peaks() # dicrotic_notch_detector = PeakDetection( # self.waveform, method="ppg_second_derivative" # ) # dicrotic_notches = dicrotic_notch_detector.detect_peaks() dicrotic_notches = self.detect_dicrotic_notches(systolic_peaks=systolic_peaks) # Compute the time difference between systolic peak and dicrotic notch notch_timing = 0.0 # for peak, notch in zip(systolic_peaks, dicrotic_notches): # if notch > peak: # notch_timing += ( # (notch - peak) * 1000 / self.fs # ) # Convert to milliseconds # return notch_timing / len(systolic_peaks) if len(systolic_peaks) > 0 else 0.0 valid_pairs = 0 for peak, notch in zip(systolic_peaks, dicrotic_notches): if notch > peak: notch_timing += ( (notch - peak) * 1000 / self.fs ) # Convert to milliseconds valid_pairs += 1 return notch_timing / valid_pairs if valid_pairs > 0 else 0.0
[docs] def compute_eeg_wavelet_features(self): """ Computes EEG wavelet features by applying a wavelet transform and extracting relevant frequency bands. Returns: dict: A dictionary of extracted EEG wavelet features (e.g., delta, theta, alpha). Example: >>> eeg_signal = [...] # Sample EEG signal >>> wm = WaveformMorphology(eeg_signal, signal_type="EEG") >>> wavelet_features = wm.compute_eeg_wavelet_features() >>> print(wavelet_features) """ if self.signal_type != "EEG": raise ValueError("EEG wavelet features can only be computed for EEG signals.") signal = self.waveform n = len(signal) fft_vals = np.fft.rfft(signal) freqs = np.fft.rfftfreq(n, d=1.0 / self.fs) def bandpower(fmin, fmax): mask = (freqs >= fmin) & (freqs <= fmax) return np.sum(np.abs(fft_vals[mask]) ** 2) / n delta = bandpower(0.5, 4.0) theta = bandpower(4.0, 8.0) alpha = bandpower(8.0, 13.0) beta = bandpower(13.0, 30.0) gamma = bandpower(30.0, 100.0) return {"delta": delta, "theta": theta, "alpha": alpha, "beta": beta, "gamma": gamma}
[docs] def compute_slope(self, points=None, option=None, window=3, slope_unit="radians"): """ Compute the slope of the waveform at specified points or critical points. Parameters ---------- points : list or np.array, optional Indices of the points where the slope is to be calculated. Ignored if `option` is specified. option : str, optional Specifies the critical points to use for slope calculation. Options for ECG are "p_peaks", "q_valleys", "r_peaks", "s_valleys", "t_peaks". Options for PPG are "troughs", "systolic_peaks", "diastolic_peaks". window : int, optional The number of points before and after each point to consider for slope calculation. Returns ------- slopes : np.array Slope values at the specified points or critical points. Example ------- >>> waveform = np.sin(np.linspace(0, 2 * np.pi, 100)) >>> wm = WaveformMorphology(waveform, signal_type="ECG") >>> slopes = wm.compute_slope(option="r_peaks") >>> print(f"Slopes at R peaks: {slopes}") """ if option: if option == "p_peaks": points = ( self.p_peaks if self.p_peaks is not None else self.detect_p_peak() ) elif option == "q_valleys": points = ( self.q_valleys if self.q_valleys is not None else self.detect_q_valley() ) elif option == "r_peaks": points = self.r_peaks elif option == "s_valleys": points = ( self.s_valleys if self.s_valleys is not None else self.detect_s_valley() ) elif option == "t_peaks": points = ( self.t_peaks if self.t_peaks is not None else self.detect_t_peak() ) elif option == "troughs": points = ( self.diastolic_troughs if self.diastolic_troughs is not None else self.detect_troughs() ) elif option == "systolic_peaks": points = self.systolic_peaks elif option == "diastolic_peaks": points = ( self.diastolic_peaks if self.diastolic_peaks is not None else self.detect_diastolic_peak() ) # from plotly import graph_objects as go # fig = go.Figure() # fig.add_trace(go.Scatter(x=np.arange(len(self.waveform)), y=self.waveform, mode='lines')) # fig.add_trace(go.Scatter(x=self.dicrotic_notches, y=self.waveform[self.dicrotic_notches], mode='markers', name='Dicrotic Notches')) # fig.add_trace(go.Scatter(x=self.diastolic_peaks, y=self.waveform[self.diastolic_peaks], mode='markers', name='Diastolic Peaks')) # fig.add_trace(go.Scatter(x=self.systolic_peaks, y=self.waveform[self.systolic_peaks], mode='markers', name='Systolic Peaks')) # fig.add_trace(go.Scatter(x=self.diastolic_troughs, y=self.waveform[self.diastolic_troughs], mode='markers', name='Diastolic Troughs')) # fig.show() else: raise ValueError(f"Invalid option '{option}' for critical points.") if points is None: raise ValueError("No points specified for slope calculation.") slopes = [] for point in points: # Define the window around the point start = max(0, point - window) end = min(len(self.waveform) - 1, point + window) if end <= start + 1: # Need at least 2 points for a slope slopes.append(0.0) # Flat slope if window is too small continue # Extract segment and corresponding x-axis (in samples) x = np.arange(start, end + 1) y = self.waveform[start : end + 1] # Fit a line using least-squares (polyfit degree 1) coeffs = np.polyfit(x, y, 1) # coeffs[0] is slope, coeffs[1] is intercept raw_slope = coeffs[0] # Slope in signal units per sample # Convert to angle based on sampling frequency-adjusted x-axis # dx = (end - start) / self.fs # Time span in seconds # dy = self.waveform[end] - self.waveform[start] # Amplitude change # raw_slope = dy / dx # Slope in signal units per second if slope_unit == "degrees": slope = np.degrees(np.arctan(raw_slope)) # Convert to degrees elif slope_unit == "radians": slope = np.arctan(raw_slope) # Convert to radians elif slope_unit == "raw": slope = raw_slope # Keep as signal units per sample else: raise ValueError("slope_unit must be 'degrees', 'radians', or 'raw'.") slopes.append(slope) return np.array(slopes)
[docs] def compute_curvature(self, points=None, option=None, window=3): """ Compute the curvature of the waveform at specified points or critical points. Parameters ---------- points : list or np.array, optional Indices of the points where the curvature is to be calculated. Ignored if `option` is specified. option : str, optional Specifies the critical points to use for curvature calculation. Options for ECG are "p_peaks", "q_valleys", "r_peaks", "s_valleys", "t_peaks". Options for PPG are "troughs", "systolic_peaks", "diastolic_peaks". window : int, optional The number of points before and after each point to consider for curvature calculation. Returns ------- curvatures : np.array Curvature values at the specified points or critical points. Example ------- >>> waveform = np.sin(np.linspace(0, 2 * np.pi, 100)) >>> wm = WaveformMorphology(waveform, signal_type="PPG") >>> curvatures = wm.compute_curvature(option="systolic_peaks") >>> print(f"Curvatures at systolic peaks: {curvatures}") """ if option: if option == "p_peaks": points = ( self.p_peaks if self.p_peaks is not None else self.detect_p_peak() ) elif option == "q_valleys": points = ( self.q_valleys if self.q_valleys is not None else self.detect_q_valley() ) elif option == "r_peaks": points = ( self.r_peaks if self.r_peaks is not None else self.detect_r_peaks() ) elif option == "s_valleys": points = ( self.s_valleys if self.s_valleys is not None else self.detect_s_valley() ) elif option == "t_peaks": points = ( self.t_peaks if self.t_peaks is not None else self.detect_t_peak() ) elif option == "troughs": points = ( self.diastolic_troughs if self.diastolic_troughs is not None else self.detect_troughs() ) elif option == "systolic_peaks": points = ( self.systolic_peaks if self.systolic_peaks is not None else self.detect_systolic_peaks() ) elif option == "diastolic_peaks": points = ( self.diastolic_peaks if self.diastolic_peaks is not None else self.detect_diastolic_peak() ) else: raise ValueError(f"Invalid option '{option}' for critical points.") if points is None: raise ValueError("No points specified for curvature calculation.") curvatures = [] for point in points: start = max(0, point - window) end = min(len(self.waveform) - 1, point + window) dy_dx = np.gradient(self.waveform[start : end + 1]) d2y_dx2 = np.gradient(dy_dx) if len(d2y_dx2) > 1: curvature = d2y_dx2[len(d2y_dx2) // 2] else: curvature = 0 curvatures.append(curvature) return np.array(curvatures)
[docs] def get_duration( self, start_points=None, end_points=None, session_type="systolic", summary_type="mean", ): """ Computes the duration based on detected start and end points for a specified session type. Parameters ---------- start_points : list, optional List of detected start indices in the signal. If None, defaults based on `session_type`. end_points : list, optional List of detected end indices in the signal. If None, defaults based on `session_type`. session_type : str, optional Specifies the type of session: "systolic", "diastolic", "qrs", or "Custom". - For "systolic" and "diastolic", appropriate start and end points will be detected if not provided. - For "qrs", QRS sessions are automatically detected and do not require start or end points. - For "Custom", start and end points must be provided as arguments. summary_type : str, optional Type of summary to apply to the computed durations. Options are 'mean', 'median', '2nd_quartile', '3rd_quartile', or 'full' (returns all durations). Returns ------- float or list The summarized duration based on `summary_type`, or a list of durations if `summary_type` is 'full'. Notes ----- Logs an error and returns NaN if no valid sessions are detected or if an invalid summary type is provided. """ valid_summary_types = ["mean", "median", "sum"] if summary_type not in valid_summary_types: raise ValueError( "Invalid summary_type. Supported types are 'mean', 'median', and 'sum'." ) try: # Handle each session type to determine start and end points if session_type == "systolic": start_points = ( start_points if start_points is not None else self.detect_troughs() ) end_points = ( end_points if end_points is not None else self.detect_dicrotic_notches(diastolic_troughs=start_points) ) elif session_type == "diastolic": start_points = ( start_points if start_points is not None else self.detect_dicrotic_notches() ) end_points = ( end_points if end_points is not None else self.detect_troughs() ) elif session_type == "qrs": sessions = self.detect_qrs_session() if sessions is None or ( hasattr(sessions, "__len__") and len(sessions) == 0 ): raise ValueError("No valid QRS sessions for duration calculation.") return self._summarize_list( self.compute_duration(sessions), summary_type ) elif session_type == "Custom": if start_points is None or end_points is None: raise ValueError( "For 'Custom' session type, start_points and end_points must be provided." ) else: raise ValueError(f"Unsupported session type: {session_type}") # Validate start and end points if ( start_points is None or end_points is None or len(start_points) == 0 or len(end_points) == 0 ): raise ValueError( "Start or end points detection returned an empty list." ) # Adjust lists for alignment (e.g., if the first end point is before the first start point) if session_type in ["systolic", "diastolic", "Custom"]: if end_points[0] < start_points[0]: end_points = end_points[1:] min_length = min(len(start_points), len(end_points)) if min_length == 0: raise ValueError( f"No valid pairs of start and end points for {session_type} duration calculation." ) # Compute durations based on aligned start and end points sessions = [ (start, end) for start, end in zip( start_points[:min_length], end_points[:min_length] ) if end > start ] if sessions is None or ( hasattr(sessions, "__len__") and len(sessions) == 0 ): raise ValueError( f"No valid durations computed for {session_type} sessions." ) durations = self.compute_duration(sessions) return self._summarize_list(durations, summary_type) except ValueError as e: logger.error(f"Value error in get_duration for {session_type} session: {e}") return np.nan except Exception as e: logger.error( f"Unexpected error in get_duration for {session_type} session: {e}" ) return np.nan
def _summarize_list(self, values, summary_type): """ Helper function to apply summary type to a list of values. Parameters ---------- values : list List of values to summarize. summary_type : str Summary type: 'mean', 'median', 'q1', 'q3', or 'full'. Returns ------- float or list The summarized value or the original list if `summary_type` is 'full'. """ if summary_type == "mean": return np.mean(values) elif summary_type == "median": return np.median(values) elif summary_type == "q1": return np.percentile(values, 25) elif summary_type == "q3": return np.percentile(values, 75) elif summary_type == "full": return values else: raise ValueError(f"Unsupported summary type: {summary_type}")
[docs] def get_area(self, interval_type, signal_type="PPG", summary_type="mean"): """ Computes the area of the specified interval in the signal with the chosen summary type. Parameters ---------- interval_type : str Type of interval to compute the area for. Options include: - For PPG: "Sys-to-Notch", "Notch-to-Dia", "Sys-to-Dia" - For ECG: "R-to-Q", "R-to-S", "QRS", "T-to-S" signal_type : str, optional The type of signal: "PPG" or "ECG". Default is "PPG". summary_type : str, optional Specifies the summary statistic to return: - 'mean': Mean of all areas - 'median': Median of all areas - '2nd_quartile': 25th percentile (1st quartile) - '3rd_quartile': 75th percentile (3rd quartile) - 'full': Returns all areas as a list Default is 'mean'. Returns ------- float or list The summary statistic for the area, or the list of all areas if `summary_type` is 'full'. Returns NaN if an error occurs. Notes ----- Logs an error and returns NaN if the area computation fails or if an invalid summary type is provided. """ valid_summary_types = ["mean", "median", "sum"] if summary_type not in valid_summary_types: raise ValueError( "Invalid summary_type. Supported types are 'mean', 'median', and 'sum'." ) try: # If interval type is "QRS", compute combined area in a single step if interval_type == "QRS" and signal_type == "ECG": areas_r_to_q = self.compute_volume( interval_type="R-to-Q", signal_type=signal_type ) areas_r_to_s = self.compute_volume( interval_type="R-to-S", signal_type=signal_type ) if areas_r_to_q is None or areas_r_to_s is None: raise ValueError( "QRS area computation failed due to invalid sub-interval areas." ) min_length = min(len(areas_r_to_q), len(areas_r_to_s)) if len(areas_r_to_q) != len(areas_r_to_s): logger.warning( "Mismatch in lengths for R-to-Q and R-to-S areas. Truncating to minimum length." ) areas_r_to_q = areas_r_to_q[:min_length] areas_r_to_s = areas_r_to_s[:min_length] areas = np.array(areas_r_to_q) + np.array(areas_r_to_s) elif interval_type == "Notch-to-Dia" and signal_type == "PPG": areas_sys_to_sys = self.compute_volume( interval_type="Sys-to-Sys", signal_type=signal_type ) areas_sys_to_notch = self.compute_volume( interval_type="Sys-to-Notch", signal_type=signal_type ) if areas_sys_to_sys is None or areas_sys_to_notch is None: raise ValueError( "Notch-to-Dia computation failed due to invalid Sys-to-Sys or Sys-to-Notch areas." ) min_length = min(len(areas_sys_to_sys), len(areas_sys_to_notch)) if len(areas_sys_to_sys) != len(areas_sys_to_notch): areas_sys_to_sys = areas_sys_to_sys[:min_length] areas_sys_to_notch = areas_sys_to_notch[:min_length] areas = np.array(areas_sys_to_sys) - np.array(areas_sys_to_notch) else: areas = self.compute_volume( interval_type=interval_type, signal_type=signal_type ) # Validate computed areas if ( areas is None or not isinstance(areas, (list, np.ndarray)) or len(areas) == 0 ): raise ValueError( f"Computed areas for {interval_type} are None or empty." ) # Normalize by sampling frequency to get area in amplitude × seconds areas = areas / self.fs return self._summarize_list(areas, summary_type) except ValueError as e: logger.error(f"Value error in get_area for {interval_type} interval: {e}") return np.nan except Exception as e: logger.error( f"Unexpected error in get_area for {interval_type} interval: {e}" ) return np.nan
[docs] def get_slope( self, slope_type="systolic", window=5, summary_type="mean", slope_unit="radians" ): """ Computes the slope of the specified type (systolic, diastolic, QRS) using the chosen summary type. Parameters ---------- slope_type : str, optional The type of slope to calculate. Options are: - "systolic": Slope from systolic peaks. - "diastolic": Slope from diastolic peaks. - "qrs": Slope from QRS R peaks. Default is "systolic". window : int, optional The window size for slope calculation. Default is 5. summary_type : str, optional Specifies the summary statistic to return: - 'mean': Mean of all slopes - 'median': Median of all slopes - '2nd_quartile': 25th percentile (1st quartile) - '3rd_quartile': 75th percentile (3rd quartile) - 'full': Returns all slopes as a list Default is 'mean'. Returns ------- float or list The summary statistic for the slopes, or the list of slopes if `summary_type` is 'full'. Returns NaN if an error occurs. """ try: # Determine the option based on slope_type options_map = { "systolic": "systolic_peaks", "diastolic": "diastolic_peaks", "qrs": "r_peaks", } if slope_type not in options_map: raise ValueError(f"Unsupported slope type: {slope_type}") # Compute slopes based on specified option slopes = self.compute_slope( option=options_map[slope_type], window=window, slope_unit=slope_unit ) # Validate slopes array if ( slopes is None or not isinstance(slopes, (list, np.ndarray)) or len(slopes) == 0 ): raise ValueError(f"Computed slopes for {slope_type} are None or empty.") # Apply the specified summary type return self._summarize_list(slopes, summary_type) except ValueError as e: logger.error(f"Value error in get_slope for {slope_type}: {e}") return np.nan except Exception as e: logger.error(f"Unexpected error in get_slope for {slope_type}: {e}") return np.nan
[docs] def get_signal_skewness(self, signal_type="PPG", summary_type="mean"): """ Computes the skewness of the signal based on the specified signal type and summary type. Parameters ---------- signal_type : str, optional Type of signal to compute skewness for (e.g., 'PPG' or 'ECG'). Default is 'PPG'. summary_type : str, optional Type of summary to apply to the computed skewness values. Options are 'mean', 'median', '2nd_quartile', '3rd_quartile', or 'full' (returns all skewness values). Returns ------- float or list The summarized skewness value, or a list of skewness values if summary_type is 'full'. """ try: # Calculate skewness based on the signal type skewness_list = self.compute_skewness(signal_type=signal_type) # Ensure skewness_list is a valid, non-empty list or array if skewness_list is None or not isinstance( skewness_list, (list, np.ndarray) ): raise ValueError("Computed skewness is None or invalid.") if len(skewness_list) == 0: raise ValueError("Computed skewness is an empty array.") return self._summarize_list(skewness_list, summary_type) except ValueError as e: logger.error(f"Value error in get_signal_skewness: {e}") return np.nan except Exception as e: logger.error(f"Unexpected error in get_signal_skewness: {e}") return np.nan
[docs] def get_peak_trend_slope( self, peaks=None, method="linear_regression", window_size=5 ): """ Calculate the trend slope of peak values using specified method. Parameters ---------- peaks : list or np.ndarray The y-values of detected peaks (peak amplitudes). method : str, optional The method to calculate trend slope. Options are 'linear_regression', 'moving_average', and 'rate_of_change'. Default is 'linear_regression'. window_size : int, optional The window size for moving average calculation. Used only when method='moving_average'. Default is 5. Returns ------- float or np.ndarray The calculated trend slope. If 'moving_average' method is selected, returns an array of slopes. """ try: if peaks is None: if self.signal_type == "ECG": peaks = self.r_peaks if self.signal_type == "PPG": peaks = self.systolic_peaks if peaks.size == 0: return 0.0 # Check if peaks is a valid array-like input if not isinstance(peaks, (list, np.ndarray)) or len(peaks) == 0: raise ValueError("Peaks data is None, invalid, or empty.") peaks = np.array(peaks) # Ensure peaks is an np.ndarray if method == "linear_regression": # Use linear regression to compute the slope of the trend x = np.arange(len(peaks)) slope, _, _, _, _ = linregress(x, peaks) return slope elif method == "moving_average": # Compute the slope based on the moving average trend if len(peaks) < window_size: raise ValueError( "Window size is greater than the number of peak values." ) moving_averages = np.convolve( peaks, np.ones(window_size) / window_size, mode="valid" ) slopes = np.diff(moving_averages) / window_size return np.mean(slopes) elif method == "rate_of_change": # Calculate the overall rate of change across the entire signal if len(peaks) < 2: raise ValueError( "Rate of change requires at least two peak values." ) overall_slope = (peaks[-1] - peaks[0]) / (len(peaks) - 1) return overall_slope else: raise ValueError(f"Unsupported method: {method}") except ValueError as e: logger.error(f"Value error in get_peak_trend_slope: {e}") return np.nan except Exception as e: logger.error(f"Unexpected error in get_peak_trend_slope: {e}") return np.nan
[docs] def get_amplitude_variability( self, interval_type="Sys-to-Baseline", baseline_method="moving_average", signal_type="PPG", method="std_dev", ): """ Calculates the variability in amplitude over the specified interval or baseline comparison. Parameters ---------- interval_type : str The interval type for amplitude calculation, with default "Sys-to-Baseline". signal_type : str The type of signal: "PPG" or "ECG". method : str, optional The method to calculate variability: "std" for standard deviation or "cv" for coefficient of variation. Returns ------- float : Variability of the amplitudes. Examples -------- >>> variability = wm.get_amplitude_variability(interval_type="Sys-to-Baseline", signal_type="PPG") >>> print(f"Amplitude variability (Sys-to-Baseline): {variability}") """ # Early validation for method valid_methods = ["std_dev", "cv", "interquartile_range"] if method not in valid_methods: raise ValueError(f"Unsupported variability method: {method}") try: # Compute amplitudes based on the interval and signal type amplitudes = self.compute_amplitude( interval_type=interval_type, signal_type=signal_type, baseline_method=baseline_method, ) # Flatten amplitudes if needed if isinstance(amplitudes, np.ndarray) and amplitudes.ndim > 1: amplitudes = amplitudes.flatten() # Ensure amplitudes is a valid, non-empty array if amplitudes is None or amplitudes.size == 0: raise ValueError( "No amplitudes calculated; ensure peaks and baselines are detected properly." ) if method == "std_dev": # Standard deviation of amplitude variability variability = np.std(amplitudes) elif method == "cv": # Coefficient of Variation (CV) mean_amplitude = np.mean(amplitudes) if mean_amplitude == 0: raise ValueError( "Mean amplitude is zero, cannot compute coefficient of variation." ) variability = np.std(amplitudes) / mean_amplitude elif method == "interquartile_range": # Interquartile Range (IQR) q1 = np.percentile(amplitudes, 25) q3 = np.percentile(amplitudes, 75) variability = q3 - q1 return variability except ValueError as e: logger.error(f"Value error in get_systolic_amplitude_variability: {e}") return np.nan except Exception as e: logger.error(f"Unexpected error in get_systolic_amplitude_variability: {e}") return np.nan
[docs] def get_qrs_amplitude(self, summary_type="mean"): """ Calculates the QRS amplitude by finding the maximum amplitude between the R-to-S and R-to-Q intervals. Parameters ---------- summary_type : str, optional Specifies the summary statistic to return: - 'mean': Mean of all QRS amplitudes - 'median': Median of all QRS amplitudes - '2nd_quartile': 25th percentile (1st quartile) - '3rd_quartile': 75th percentile (3rd quartile) - 'full': Returns all QRS amplitudes as a list Default is 'mean'. Returns ------- float or list The summary statistic for QRS amplitude, or the list of amplitudes if `summary_type` is 'full'. Returns NaN if an error occurs. Notes ----- Logs an error and returns NaN if an invalid summary type is provided or if amplitude computation fails. """ if np.all(self.waveform == 0): return 0 try: # Compute R-to-S and R-to-Q amplitudes rs_amplitudes = np.array(self.compute_amplitude(interval_type="R-to-S")) qr_amplitudes = np.array(self.compute_amplitude(interval_type="R-to-Q")) # Verify amplitude arrays are valid and non-empty if rs_amplitudes.size == 0 or qr_amplitudes.size == 0: raise ValueError( "R-to-S or R-to-Q amplitude array is empty or invalid." ) # Check if the lengths of the two areas are equal min_length = min(len(rs_amplitudes), len(qr_amplitudes)) if len(rs_amplitudes) != len(qr_amplitudes): # Log a warning if lengths differ logger.warning( "Mismatch in lengths for R-to-Q and R-to-S areas. Truncating to minimum length." ) # Truncate both arrays to the minimum length to align them rs_amplitudes = rs_amplitudes[:min_length] qr_amplitudes = qr_amplitudes[:min_length] # Calculate maximum amplitude between R-S and R-Q intervals using vectorized operation qrs_amplitudes = np.maximum(rs_amplitudes, qr_amplitudes) return self._summarize_list(qrs_amplitudes, summary_type) except ValueError as e: logger.error(f"Value error in get_qrs_amplitude: {e}") return np.nan except Exception as e: logger.error(f"Unexpected error in get_qrs_amplitude: {e}") return np.nan
[docs] def get_heart_rate(self, summary_type="mean"): """ Computes the heart rate based on R-R intervals. Parameters ---------- summary_type : str, optional Specifies the summary statistic to return: - 'mean': Mean heart rate - 'median': Median heart rate - '2nd_quartile': 25th percentile (1st quartile) - '3rd_quartile': 75th percentile (3rd quartile) - 'full': Returns all computed heart rates as a list Default is 'mean'. Returns ------- float or list The summary statistic for heart rates, or a list of all heart rates if `summary_type` is 'full'. Returns NaN if an error occurs. Notes ----- Logs an error and returns NaN if an invalid summary type is provided or if heart rate computation fails. """ try: # Compute R-R intervals in seconds if self.signal_type == "ECG": rr_intervals = np.diff(self.r_peaks) / self.fs else: rr_intervals = np.diff(self.systolic_peaks) / self.fs # Ensure rr_intervals is valid and non-empty if rr_intervals.size == 0: raise ValueError("No R-R intervals found; cannot compute heart rate.") # Calculate heart rates from R-R intervals (in beats per minute) heart_rates = 60 / rr_intervals # Dictionary to map summary types to computations summary_methods = { "mean": np.mean(heart_rates), "median": np.median(heart_rates), "2nd_quartile": np.percentile(heart_rates, 25), "3rd_quartile": np.percentile(heart_rates, 75), "full": heart_rates.tolist(), } # Retrieve and return the computed summary based on the selected type if summary_type in summary_methods: return summary_methods[summary_type] else: raise ValueError(f"Unsupported summary type: {summary_type}") except ValueError as e: logger.error(f"Value error in get_heart_rate: {e}") return np.nan except Exception as e: logger.error(f"Unexpected error in get_heart_rate: {e}") return np.nan