Source code for vitalDSP.signal_quality_assessment.adaptive_snr_estimation

"""
Signal Quality Assessment Module for Physiological Signal Processing

This module provides comprehensive capabilities for physiological
signal processing including ECG, PPG, EEG, and other vital signs.

Author: vitalDSP Team
Date: 2025-01-27
Version: 1.0.0

Key Features:
- Multiple processing methods and functions
- NumPy integration for numerical computations

Examples:
--------
Basic usage:
    >>> import numpy as np
    >>> from vitalDSP.signal_quality_assessment.adaptive_snr_estimation import AdaptiveSnrEstimation
    >>> signal = np.random.randn(1000)
    >>> processor = AdaptiveSnrEstimation(signal)
    >>> result = processor.process()
    >>> print(f'Processing result: {result}')
"""

import numpy as np


[docs] def sliding_window_snr(signal, window_size=100, step_size=50): """ Estimate SNR adaptively using a sliding window approach. Parameters ---------- signal : numpy.ndarray The input signal. window_size : int, optional (default=100) The size of the sliding window. step_size : int, optional (default=50) The step size for moving the window. Returns ------- snr_estimates : numpy.ndarray Array of SNR estimates for each window. Examples -------- >>> signal = np.sin(2 * np.pi * 0.2 * np.arange(0, 10, 0.01)) + 0.1 * np.random.normal(size=1000) >>> snr_estimates = sliding_window_snr(signal) >>> print(snr_estimates) """ from scipy.ndimage import uniform_filter1d snr_estimates = [] for i in range(0, len(signal) - window_size + 1, step_size): window_signal = signal[i : i + window_size] smoothed = uniform_filter1d(window_signal.astype(float), size=max(3, len(window_signal) // 5)) signal_power = np.mean(smoothed ** 2) noise_power = np.mean((window_signal - smoothed) ** 2) if noise_power == 0: snr_estimates.append(float('inf')) continue snr = 10 * np.log10(signal_power / noise_power) snr_estimates.append(snr) return np.array(snr_estimates)
[docs] def adaptive_threshold_snr(signal, threshold=0.5): """ Estimate SNR adaptively by applying a threshold to segment the signal. Parameters ---------- signal : numpy.ndarray The input signal. threshold : float, optional (default=0.5) The amplitude threshold for detecting noise segments. Returns ------- snr_estimate : float The estimated SNR. Examples -------- >>> signal = np.sin(2 * np.pi * 0.2 * np.arange(0, 10, 0.01)) + 0.1 * np.random.normal(size=1000) >>> snr_estimate = adaptive_threshold_snr(signal, threshold=0.3) >>> print(snr_estimate) """ noise_segments = signal[np.abs(signal) < threshold] signal_segments = signal[np.abs(signal) >= threshold] if len(signal_segments) == 0 or len(noise_segments) == 0: return -float("inf") if len(signal_segments) == 0 else float("inf") signal_power = np.mean(signal_segments**2) noise_power = np.mean(noise_segments**2) if noise_power == 0: return float("inf") # Infinite SNR due to no noise if signal_power == 0: return -float("inf") # No signal means undefined SNR (set to -inf) snr = 10 * np.log10(signal_power / noise_power) return snr
[docs] def recursive_snr_estimation(signal, alpha=0.9): """ Estimate SNR recursively using an exponential moving average. Parameters ---------- signal : numpy.ndarray The input signal. alpha : float, optional (default=0.9) Smoothing factor for the exponential moving average. Returns ------- snr_estimates : numpy.ndarray Array of SNR estimates for each point in the signal. Examples -------- >>> signal = np.sin(2 * np.pi * 0.2 * np.arange(0, 10, 0.01)) + 0.1 * np.random.normal(size=1000) >>> snr_estimates = recursive_snr_estimation(signal) >>> print(snr_estimates) """ snr_estimates = [] signal_estimate = 0 noise_estimate = 0 avg_mean = signal[0] for i, x in enumerate(signal): avg_mean = alpha * avg_mean + (1 - alpha) * x noise_estimate = (1 - alpha) * noise_estimate + alpha * (x - avg_mean) ** 2 signal_estimate = (1 - alpha) * signal_estimate + alpha * x ** 2 if noise_estimate == 0: snr = float("inf") else: snr = 10 * np.log10(signal_estimate / noise_estimate) snr_estimates.append(snr) return np.array(snr_estimates)