Source code for vitalDSP.respiratory_analysis.estimate_rr.time_domain_rr

"""
Respiratory Analysis Module for Physiological Signal Processing

This module provides comprehensive capabilities for physiological
signal processing including ECG, PPG, EEG, and other vital signs.

Author: vitalDSP Team
Date: 2025-01-27
Version: 1.0.0

Key Features:
- Multiple processing methods and functions
- NumPy integration for numerical computations

Examples:
--------
Basic usage:
    >>> import numpy as np
    >>> from vitalDSP.estimate_rr.time_domain_rr import TimeDomainRr
    >>> signal = np.random.randn(1000)
    >>> processor = TimeDomainRr(signal)
    >>> result = processor.process()
    >>> print(f'Processing result: {result}')
"""

import numpy as np
import warnings
import logging
from scipy.signal import find_peaks
from vitalDSP.preprocess.preprocess_operations import preprocess_signal

logger = logging.getLogger(__name__)


[docs] def time_domain_rr(signal, sampling_rate, preprocess=None, **preprocess_kwargs): """ Estimate respiratory rate using time-domain autocorrelation method. This method computes the autocorrelation of the respiratory signal and finds the dominant periodicity within the physiological respiratory range (1.5-10 seconds corresponding to 40-6 breaths/min). Parameters ---------- signal : numpy.ndarray The input respiratory signal. sampling_rate : float The sampling rate of the signal in Hz. preprocess : str, optional The preprocessing method to apply before estimation (e.g., "bandpass", "wavelet"). preprocess_kwargs : dict, optional Additional arguments for the preprocessing function. Returns ------- rr : float Estimated respiratory rate in breaths per minute. Examples -------- >>> signal = np.sin(2 * np.pi * 0.2 * np.arange(0, 10, 0.01)) >>> rr = time_domain_rr(signal, sampling_rate=100, preprocess='bandpass', lowcut=0.1, highcut=0.5) >>> print(rr) Notes ----- This implementation fixes the critical bug in the original version that used np.diff() followed by np.argmax(), which finds maximum slope instead of maximum peak. The corrected version uses scipy.signal.find_peaks() to properly identify autocorrelation peaks within the valid respiratory range. References ---------- .. [1] Box, G.E., Jenkins, G.M. and Reinsel, G.C., 1994. Time series analysis: forecasting and control. Prentice Hall. """ logger.debug("=" * 80) logger.debug("TIME DOMAIN (AUTOCORRELATION) RR - Starting estimation") logger.debug( f"Input signal: {len(signal)} samples at {sampling_rate} Hz ({len(signal)/sampling_rate:.2f}s duration)" ) logger.debug( f"Signal statistics: range=[{np.min(signal):.4f}, {np.max(signal):.4f}], mean={np.mean(signal):.4f}, std={np.std(signal):.4f}" ) # Apply preprocessing if specified if preprocess: logger.debug(f"Preprocessing: {preprocess} with params {preprocess_kwargs}") original_signal = signal.copy() signal = preprocess_signal( signal, sampling_rate, filter_type=preprocess, **preprocess_kwargs ) logger.debug( f"After preprocessing: mean={np.mean(signal):.4f}, std={np.std(signal):.4f}" ) # Validate signal if len(signal) < 10: warnings.warn("Signal too short for reliable autocorrelation analysis") return 0.0 # Normalize the signal (zero mean, unit variance) signal_mean = np.mean(signal) signal_std = np.std(signal) logger.debug(f"Normalizing signal: mean={signal_mean:.4f}, std={signal_std:.4f}") if signal_std < 1e-10: logger.warning("Signal has zero or near-zero variance") warnings.warn("Signal has zero or near-zero variance") return 0.0 signal = (signal - signal_mean) / signal_std # Compute the autocorrelation of the signal logger.debug("Computing autocorrelation...") autocorr = np.correlate(signal, signal, mode="full") autocorr = autocorr[len(autocorr) // 2 :] # Keep only positive lags logger.debug(f"Autocorrelation computed: {len(autocorr)} lag points") # Normalize autocorrelation to [0, 1] range if autocorr[0] > 0: autocorr = autocorr / autocorr[0] logger.debug(f"Autocorrelation normalized: max={np.max(autocorr):.4f}") else: logger.warning("Invalid autocorrelation computation") warnings.warn("Invalid autocorrelation computation") return 0.0 # Define valid lag range for respiratory signals # Normal breathing: 6-40 BPM → 1.5-10 seconds period min_lag = int(1.5 * sampling_rate) # 40 BPM max max_lag = int(10 * sampling_rate) # 6 BPM min logger.debug( f"Respiratory lag range: {min_lag}-{max_lag} samples ({min_lag/sampling_rate:.2f}-{max_lag/sampling_rate:.2f}s)" ) logger.debug(f"Corresponding to: 40-6 BPM") # Ensure we have valid search range if min_lag >= len(autocorr): logger.warning( f"Signal too short ({len(signal)/sampling_rate:.1f}s) for minimum respiratory period (1.5s)" ) warnings.warn( f"Signal too short ({len(signal)/sampling_rate:.1f}s) for minimum respiratory period (1.5s)" ) return 0.0 max_lag = min(max_lag, len(autocorr) - 1) if min_lag >= max_lag: logger.warning("Invalid lag range for autocorrelation search") warnings.warn("Invalid lag range for autocorrelation search") return 0.0 # Only search for peaks within valid respiratory range search_range = autocorr[min_lag:max_lag] logger.debug(f"Search range: {len(search_range)} lag points") logger.debug( f"Search range autocorr values: min={np.min(search_range):.4f}, max={np.max(search_range):.4f}, mean={np.mean(search_range):.4f}" ) # Find peaks in autocorrelation with minimum prominence logger.debug("Finding peaks in autocorrelation with prominence >= 0.1...") peaks, properties = find_peaks(search_range, prominence=0.1) if len(peaks) == 0: # No clear peaks found - use global maximum in search range logger.warning("No clear autocorrelation peaks found with prominence >= 0.1") logger.debug("Falling back to global maximum in search range") warnings.warn("No clear autocorrelation peak found, using global maximum") peak_lag = np.argmax(search_range) + min_lag peak_value = search_range[np.argmax(search_range)] logger.debug( f"Global max: lag={peak_lag} samples ({peak_lag/sampling_rate:.3f}s), value={peak_value:.4f}" ) else: # Take the strongest peak (highest prominence) logger.debug(f"Found {len(peaks)} autocorrelation peaks") logger.debug("Top 5 peaks by prominence:") sorted_indices = np.argsort(properties["prominences"])[::-1][:5] for i, idx in enumerate(sorted_indices): if idx < len(peaks): peak_pos = peaks[idx] actual_lag = peak_pos + min_lag period = actual_lag / sampling_rate rr_candidate = 60 / period prominence = properties["prominences"][idx] autocorr_value = search_range[peak_pos] logger.debug( f" {i+1}. Lag={actual_lag} samples ({period:.3f}s) → {rr_candidate:.1f} BPM, prominence={prominence:.4f}, autocorr={autocorr_value:.4f}" ) strongest_peak_idx = peaks[np.argmax(properties["prominences"])] peak_lag = strongest_peak_idx + min_lag logger.debug( f"Selected strongest peak: lag={peak_lag} samples ({peak_lag/sampling_rate:.3f}s)" ) # Convert lag to respiratory period (in seconds) rr_interval = peak_lag / sampling_rate # Calculate respiratory rate rr = 60 / rr_interval logger.debug(f"Respiratory interval: {rr_interval:.3f}s") logger.debug(f"Calculated RR: {rr:.1f} BPM") # Validate result is within physiological range if rr < 6 or rr > 40: logger.warning( f"RR estimate ({rr:.1f} BPM) outside physiological range (6-40 BPM)" ) warnings.warn( f"Estimated RR ({rr:.1f} BPM) outside physiological range (6-40 BPM). " f"Result may be unreliable." ) else: logger.debug(f"✓ RR estimate within physiological range") logger.debug(f"✓ FINAL RR ESTIMATE: {rr:.1f} BPM") logger.debug("=" * 80) return float(rr)