Source code for vitalDSP.respiratory_analysis.estimate_rr.frequency_domain_rr

"""
Respiratory Analysis Module for Physiological Signal Processing

This module provides comprehensive capabilities for physiological
signal processing including ECG, PPG, EEG, and other vital signs.

Author: vitalDSP Team
Date: 2025-01-27
Version: 1.0.0

Key Features:
- Multiple processing methods and functions
- NumPy integration for numerical computations
- SciPy integration for advanced signal processing

Examples:
--------
Basic usage:
    >>> import numpy as np
    >>> from vitalDSP.estimate_rr.frequency_domain_rr import FrequencyDomainRr
    >>> signal = np.random.randn(1000)
    >>> processor = FrequencyDomainRr(signal)
    >>> result = processor.process()
    >>> print(f'Processing result: {result}')
"""

import numpy as np
import warnings
import logging
from scipy.signal import welch
from vitalDSP.preprocess.preprocess_operations import preprocess_signal

# Set up logger for this module
logger = logging.getLogger(__name__)


[docs] def frequency_domain_rr( signal, sampling_rate, preprocess=None, nperseg=None, freq_min=0.1, freq_max=0.5, **preprocess_kwargs, ): """ Estimate respiratory rate using frequency-domain Welch method with respiratory band filtering. This method computes the power spectral density (PSD) using Welch's method, which provides better noise reduction than standard FFT through averaging of overlapping segments. The search is restricted to the physiological respiratory frequency range (0.1-0.5 Hz or 6-30 BPM). Parameters ---------- signal : numpy.ndarray The input respiratory signal. sampling_rate : float The sampling rate of the signal in Hz. preprocess : str, optional The preprocessing method to apply before estimation (e.g., "bandpass", "wavelet"). nperseg : int, optional Length of each segment for Welch method. If None, automatically computed to achieve ~0.05 Hz frequency resolution (3 BPM discrimination). Larger nperseg provides better frequency resolution but requires longer signals. freq_min : float, optional (default=0.1) Minimum respiratory frequency in Hz (6 BPM). freq_max : float, optional (default=0.5) Maximum respiratory frequency in Hz (30 BPM). Can be increased to 0.67 Hz (40 BPM) for tachypnea or exercise. preprocess_kwargs : dict, optional Additional arguments for the preprocessing function. Returns ------- rr : float Estimated respiratory rate in breaths per minute. Examples -------- >>> signal = np.sin(2 * np.pi * 0.2 * np.arange(0, 10, 0.01)) >>> rr = frequency_domain_rr(signal, sampling_rate=100, preprocess='bandpass', lowcut=0.1, highcut=0.5) >>> print(rr) >>> # Specify custom nperseg for better frequency resolution >>> rr = frequency_domain_rr(signal, sampling_rate=100, nperseg=512) Notes ----- This implementation fixes two critical bugs in the original version: 1. Missing respiratory band filtering (searched entire spectrum, could pick cardiac) 2. Inappropriate nperseg default (256 samples → only 0.5 Hz resolution at 128 Hz sampling) The corrected version: - Automatically sets nperseg to achieve 0.05 Hz resolution (3 BPM discrimination) - Restricts search to respiratory frequency band - Includes SNR validation for quality assessment - Provides comprehensive warnings for edge cases Frequency Resolution: Δf = sampling_rate / nperseg For respiratory analysis, we need Δf ≤ 0.05 Hz to discriminate between different breathing rates (e.g., 12 BPM vs 15 BPM = 0.2 Hz vs 0.25 Hz). References ---------- .. [1] Welch, P. (1967). The use of fast Fourier transform for the estimation of power spectra: a method based on time averaging over short, modified periodograms. IEEE Transactions on audio and electroacoustics, 15(2), 70-73. .. [2] Charlton, P.H., et al. (2018). Breathing rate estimation from the electrocardiogram and photoplethysmogram: A review. IEEE Reviews in Biomedical Engineering, 11, 2-20. """ logger.debug("=" * 80) logger.debug("FREQUENCY-DOMAIN RR (Welch PSD) - Starting estimation") logger.debug( f"Input signal: {len(signal)} samples, {sampling_rate} Hz, {len(signal)/sampling_rate:.2f}s" ) logger.debug(f"Signal stats: mean={np.mean(signal):.4f}, std={np.std(signal):.4f}") logger.debug( f"Respiratory range: {freq_min}-{freq_max} Hz ({freq_min*60:.0f}-{freq_max*60:.0f} BPM)" ) # Apply preprocessing if specified if preprocess: logger.debug(f"Preprocessing: {preprocess}") signal = preprocess_signal( signal, sampling_rate, filter_type=preprocess, **preprocess_kwargs ) # Validate signal if len(signal) < 10: logger.error("Signal too short") warnings.warn("Signal too short for reliable Welch PSD analysis") return 0.0 # Set appropriate nperseg for respiratory frequency resolution # Target: 0.05 Hz resolution (3 BPM discrimination capability) # Frequency resolution = sampling_rate / nperseg # So: nperseg = sampling_rate / 0.05 if nperseg is None: target_resolution = 0.05 # Hz nperseg = int(sampling_rate / target_resolution) nperseg = max(256, nperseg) if nperseg % 2 != 0: nperseg += 1 # This MUST be last to ensure nperseg <= signal length nperseg = min(nperseg, len(signal)) # Compute the power spectral density using the Welch method logger.debug(f"Computing Welch PSD with nperseg={nperseg}") freqs, psd = welch(signal, fs=sampling_rate, nperseg=nperseg) freq_resolution = freqs[1] - freqs[0] if len(freqs) > 1 else 0 logger.debug( f"PSD computed: {len(freqs)} frequency bins, resolution={freq_resolution:.4f} Hz" ) # Filter to respiratory frequency range ONLY (critical fix) respiratory_mask = (freqs >= freq_min) & (freqs <= freq_max) resp_freqs = freqs[respiratory_mask] resp_psd = psd[respiratory_mask] logger.debug(f"Frequencies in respiratory band: {len(resp_psd)}") if len(resp_psd) == 0: logger.error(f"No frequencies in range {freq_min}-{freq_max} Hz") raise ValueError( f"No frequencies found in respiratory range ({freq_min}-{freq_max} Hz). " f"Signal may be too short, sampling rate too low, or nperseg too large. " f"Frequency resolution: {freq_resolution:.4f} Hz. " f"Minimum signal length: {int(1/freq_min * sampling_rate)} samples." ) # Find peak in respiratory band peak_idx = np.argmax(resp_psd) peak_freq = resp_freqs[peak_idx] peak_power = resp_psd[peak_idx] logger.debug(f"Peak: {peak_freq:.4f} Hz, Power: {peak_power:.4e}") # Show top 5 peaks sorted_indices = np.argsort(resp_psd)[::-1][:5] logger.debug("Top 5 PSD peaks:") for i, idx in enumerate(sorted_indices): freq = resp_freqs[idx] power = resp_psd[idx] logger.debug(f" {i+1}. {freq:.4f} Hz ({freq*60:.1f} BPM) - Power: {power:.4e}") # SNR validation noise_floor = np.median(resp_psd) snr = peak_power / (noise_floor + 1e-10) logger.debug(f"SNR: {snr:.2f} (noise floor: {noise_floor:.4e})") if snr < 3.0: logger.warning(f"Low SNR: {snr:.2f}") warnings.warn( f"Low SNR ({snr:.2f}) in Welch PSD peak detection. " f"Peak power: {peak_power:.2e}, Noise floor: {noise_floor:.2e}. " f"Result may be unreliable due to poor signal quality or high variability." ) # Convert frequency to BPM rr = float(peak_freq * 60) logger.debug(f"Computed RR: {rr:.1f} BPM from {peak_freq:.4f} Hz") # Validation - check if result is within expected range expected_min = freq_min * 60 expected_max = freq_max * 60 if rr < expected_min or rr > expected_max: logger.warning( f"RR outside range: {rr:.1f} not in [{expected_min:.0f}, {expected_max:.0f}]" ) warnings.warn( f"Estimated RR ({rr:.1f} BPM) outside expected range " f"({expected_min:.0f}-{expected_max:.0f} BPM)" ) else: logger.debug(f"✓ FINAL RR ESTIMATE: {rr:.1f} BPM") # Additional diagnostic info if freq_resolution > 0.1: logger.warning(f"Coarse frequency resolution: {freq_resolution:.3f} Hz") warnings.warn( f"Frequency resolution ({freq_resolution:.3f} Hz) may be too coarse " f"for accurate RR discrimination. Consider longer signal or smaller nperseg." ) logger.debug("=" * 80) return rr