"""
Respiratory Analysis Module for Physiological Signal Processing
This module provides comprehensive capabilities for physiological
signal processing including ECG, PPG, EEG, and other vital signs.
Author: vitalDSP Team
Date: 2025-01-27
Version: 1.0.0
Key Features:
- Multiple processing methods and functions
- NumPy integration for numerical computations
Examples:
---------
Basic usage:
>>> import numpy as np
>>> from vitalDSP.estimate_rr.fft_based_rr import FftBasedRr
>>> signal = np.random.randn(1000)
>>> processor = FftBasedRr(signal)
>>> result = processor.process()
>>> print(f'Processing result: {result}')
"""
import numpy as np
import warnings
import logging
from vitalDSP.preprocess.preprocess_operations import preprocess_signal
# Set up logger for this module
logger = logging.getLogger(__name__)
[docs]
def fft_based_rr(
signal,
sampling_rate,
preprocess=None,
freq_min=0.1,
freq_max=0.5,
**preprocess_kwargs,
):
"""
Estimate respiratory rate using the FFT (Fast Fourier Transform) method
with respiratory band filtering.
This method computes the FFT power spectrum and identifies the dominant
frequency within the physiological respiratory range (0.1-0.5 Hz or 6-30 BPM).
The respiratory band filtering prevents false detection of cardiac frequencies
or high-frequency noise artifacts.
Parameters
----------
signal : numpy.ndarray
The input respiratory signal.
sampling_rate : float
The sampling rate of the signal in Hz.
preprocess : str, optional
The preprocessing method to apply before estimation (e.g., "bandpass", "wavelet").
freq_min : float, optional (default=0.1)
Minimum respiratory frequency in Hz (6 BPM). Can be adjusted for different
populations (e.g., 0.05 Hz for very slow breathing).
freq_max : float, optional (default=0.5)
Maximum respiratory frequency in Hz (30 BPM). Can be increased to 0.67 Hz
(40 BPM) for tachypnea or exercise conditions.
preprocess_kwargs : dict, optional
Additional arguments for the preprocessing function.
Returns
-------
rr : float
Estimated respiratory rate in breaths per minute.
Examples
--------
>>> signal = np.sin(2 * np.pi * 0.2 * np.arange(0, 10, 0.01))
>>> rr = fft_based_rr(signal, sampling_rate=100, preprocess='bandpass', lowcut=0.1, highcut=0.5)
>>> print(rr)
>>> # For exercise/tachypnea (up to 40 BPM)
>>> rr = fft_based_rr(signal, sampling_rate=100, freq_max=0.67)
Notes
-----
This implementation fixes the critical bug in the original version that searched
the entire frequency spectrum, often picking cardiac frequencies (1-2 Hz / 60-120 BPM)
instead of respiratory frequencies. The corrected version restricts the search to
the physiological respiratory band.
The method includes SNR validation to warn if the detected peak has low prominence,
which may indicate poor signal quality or unreliable estimation.
References
----------
.. [charlton2018] Charlton, P.H., et al. (2018). Breathing rate estimation from the
electrocardiogram and photoplethysmogram: A review. IEEE Reviews in
Biomedical Engineering, 11, 2-20.
"""
logger.debug("=" * 80)
logger.debug("FFT-BASED RR - Starting estimation")
logger.debug(f"Input signal length: {len(signal)} samples")
logger.debug(f"Sampling rate: {sampling_rate} Hz")
logger.debug(f"Signal duration: {len(signal)/sampling_rate:.2f} seconds")
logger.debug(f"Signal range: [{np.min(signal):.4f}, {np.max(signal):.4f}]")
logger.debug(f"Signal mean: {np.mean(signal):.4f}, std: {np.std(signal):.4f}")
logger.debug(
f"Respiratory frequency range: {freq_min}-{freq_max} Hz ({freq_min*60:.0f}-{freq_max*60:.0f} BPM)"
)
if preprocess:
logger.debug(f"Applying preprocessing: {preprocess}")
signal_before = signal.copy()
signal = preprocess_signal(
signal, sampling_rate, filter_type=preprocess, **preprocess_kwargs
)
logger.debug(
f"After preprocessing - mean: {np.mean(signal):.4f}, std: {np.std(signal):.4f}"
)
# Validate signal
if len(signal) < 10:
logger.error("Signal too short for reliable FFT analysis")
warnings.warn("Signal too short for reliable FFT analysis")
return 0.0
# Perform FFT on the signal
fft_result = np.fft.fft(signal)
freqs = np.fft.fftfreq(len(signal), d=1 / sampling_rate)
logger.debug(f"FFT computed - {len(freqs)} frequency bins")
logger.debug(f"Frequency resolution: {freqs[1]-freqs[0]:.4f} Hz")
# Filter to respiratory frequency range ONLY (critical fix)
respiratory_mask = (freqs >= freq_min) & (freqs <= freq_max)
resp_freqs = freqs[respiratory_mask]
resp_fft = np.abs(fft_result[respiratory_mask])
logger.debug(f"Frequencies in respiratory band: {len(resp_fft)}")
if len(resp_fft) == 0:
logger.error(f"No frequencies in respiratory range ({freq_min}-{freq_max} Hz)")
raise ValueError(
f"No frequencies found in respiratory range ({freq_min}-{freq_max} Hz). "
f"Signal may be too short or sampling rate too low. "
f"Minimum signal length: {int(1/freq_min * sampling_rate)} samples."
)
# Find peak in respiratory band
peak_idx = np.argmax(resp_fft)
peak_freq = resp_freqs[peak_idx]
peak_power = resp_fft[peak_idx]
logger.debug(f"Peak found at frequency: {peak_freq:.4f} Hz")
logger.debug(f"Peak power: {peak_power:.4e}")
# Show top 5 peaks for comparison
sorted_indices = np.argsort(resp_fft)[::-1][:5]
logger.debug("Top 5 frequency peaks in respiratory band:")
for i, idx in enumerate(sorted_indices):
freq = resp_freqs[idx]
power = resp_fft[idx]
bpm = freq * 60
logger.debug(f" {i+1}. {freq:.4f} Hz ({bpm:.1f} BPM) - Power: {power:.4e}")
# Validate peak prominence (SNR check)
# Compare peak power to median power (noise floor estimate)
noise_floor = np.median(resp_fft)
snr = peak_power / (noise_floor + 1e-10)
logger.debug(f"Noise floor (median): {noise_floor:.4e}")
logger.debug(f"SNR: {snr:.2f}")
if snr < 2.0:
logger.warning(f"Low SNR ({snr:.2f}) - result may be unreliable")
warnings.warn(
f"Low SNR ({snr:.2f}) in FFT peak detection. "
f"Peak power: {peak_power:.2e}, Noise floor: {noise_floor:.2e}. "
f"Result may be unreliable due to poor signal quality."
)
# Convert frequency to BPM
rr = float(peak_freq * 60)
logger.debug(f"Computed RR: {rr:.1f} BPM from frequency {peak_freq:.4f} Hz")
# Sanity check - validate result is within expected range
expected_min = freq_min * 60
expected_max = freq_max * 60
if rr < expected_min or rr > expected_max:
logger.warning(
f"RR ({rr:.1f} BPM) outside expected range ({expected_min:.0f}-{expected_max:.0f} BPM)"
)
warnings.warn(
f"Estimated RR ({rr:.1f} BPM) outside expected range "
f"({expected_min:.0f}-{expected_max:.0f} BPM)"
)
else:
logger.debug(f"✓ FINAL RR ESTIMATE: {rr:.1f} BPM (within expected range)")
logger.debug("=" * 80)
return rr