Source code for vitalDSP.physiological_features.trend_analysis

"""
Physiological Features Module for Physiological Signal Processing

This module provides comprehensive capabilities for physiological
signal processing including ECG, PPG, EEG, and other vital signs.

Author: vitalDSP Team
Date: 2025-01-27
Version: 1.0.0

Key Features:
- Object-oriented design with comprehensive classes
- Multiple processing methods and functions
- NumPy integration for numerical computations
- Comprehensive signal analysis

Examples:
--------
Basic usage:
    >>> import numpy as np
    >>> from vitalDSP.physiological_features.trend_analysis import TrendAnalysis
    >>> signal = np.random.randn(1000)
    >>> processor = TrendAnalysis(signal)
    >>> result = processor.process()
    >>> print(f'Processing result: {result}')
"""

import numpy as np


[docs] class TrendAnalysis: """ A comprehensive class to track and analyze long-term trends in physiological signals. This class provides various methods to compute trends, moving averages, momentum, and decompose the signal into its components for in-depth trend analysis. Methods ------- compute_moving_average : function Computes the moving average of the signal. compute_weighted_moving_average : function Computes the weighted moving average of the signal. compute_exponential_smoothing : function Computes exponential smoothing of the signal. compute_linear_trend : function Computes the linear trend using least squares regression. compute_polynomial_trend : function Computes a polynomial trend. compute_momentum : function Computes the momentum of the signal. compute_seasonal_decomposition : function Decomposes the signal into trend, seasonal, and residual components. compute_trend_strength : function Calculates the strength of the trend. detect_trend_reversal : function Detects trend reversal points in the signal. """ def __init__(self, signal): """ Initialize the TrendAnalysis class with the signal. Parameters ---------- signal : numpy.ndarray The input physiological signal to be analyzed. """ if len(signal) == 0: raise ValueError("Signal cannot be empty.") self.signal = signal
[docs] def compute_moving_average(self, window_size): """ Compute the moving average of the signal. This method smooths the signal by averaging data points within a moving window. Parameters ---------- window_size : int The size of the moving average window. Returns ------- moving_avg : numpy.ndarray The moving average of the signal. Examples -------- >>> signal = np.array([1, 2, 3, 4, 5, 6, 7]) >>> ta = TrendAnalysis(signal) >>> ta.compute_moving_average(3) array([2., 3., 4., 5., 6.]) """ if window_size > len(self.signal): raise ValueError( "Window size cannot be greater than the length of the signal." ) cumsum = np.cumsum(np.insert(self.signal, 0, 0)) moving_avg = (cumsum[window_size:] - cumsum[:-window_size]) / float(window_size) return moving_avg
[docs] def compute_weighted_moving_average(self, window_size, weights=None): """ Compute the weighted moving average of the signal. This method smooths the signal by averaging data points within a moving window, with each point weighted by the given weights. Parameters ---------- window_size : int The size of the moving average window. weights : numpy.ndarray or None, optional Weights for the moving average. If None, uniform weights are used. Returns ------- weighted_avg : numpy.ndarray The weighted moving average of the signal. Examples -------- >>> signal = np.array([1, 2, 3, 4, 5, 6, 7]) >>> ta = TrendAnalysis(signal) >>> ta.compute_weighted_moving_average(3, weights=[0.1, 0.3, 0.6]) array([2.2, 3.2, 4.2, 5.2, 6.2]) """ if weights is None: weights = np.ones(window_size) / window_size else: weights = np.array(weights) weights /= np.sum(weights) weighted_avg = np.convolve(self.signal, weights, mode="valid") return weighted_avg
[docs] def compute_exponential_smoothing(self, alpha): """ Compute exponential smoothing of the signal. Exponential smoothing applies a decay factor to smooth the signal, giving more weight to recent observations while maintaining a smooth trend. Parameters ---------- alpha : float Smoothing factor (0 < alpha <= 1). Returns ------- smoothed_signal : numpy.ndarray The exponentially smoothed signal. Examples -------- >>> signal = np.array([1, 2, 3, 4, 5, 6, 7]) >>> ta = TrendAnalysis(signal) >>> ta.compute_exponential_smoothing(0.3) array([1., 1.3, 1.81, 2.467, 3.227, 4.059, 4.941]) """ if not (0 < alpha <= 1): raise ValueError("Alpha must be between 0 and 1.") smoothed_signal = np.zeros_like(self.signal) smoothed_signal[0] = self.signal[0] for t in range(1, len(self.signal)): smoothed_signal[t] = ( alpha * self.signal[t] + (1 - alpha) * smoothed_signal[t - 1] ) return smoothed_signal
[docs] def compute_linear_trend(self): """ Compute the linear trend of the signal using least squares regression. This method fits a straight line to the signal data, representing the overall direction of the signal over time. Returns ------- linear_trend : numpy.ndarray The linear trend of the signal. Examples -------- >>> signal = np.array([1, 2, 3, 5, 8, 13, 21]) >>> ta = TrendAnalysis(signal) >>> ta.compute_linear_trend() array([ 0.6 , 2.45714286, 4.31428571, 6.17142857, 8.02857143, 9.88571429, 11.74285714]) """ x = np.arange(len(self.signal)) A = np.vstack([x, np.ones(len(x))]).T m, c = np.linalg.lstsq(A, self.signal, rcond=None)[0] linear_trend = m * x + c return linear_trend
[docs] def compute_polynomial_trend(self, degree): """ Compute a polynomial trend of the signal. This method fits a polynomial of the specified degree to the signal data, providing a more flexible trend line that can account for curvature in the data. Parameters ---------- degree : int The degree of the polynomial to fit. Returns ------- polynomial_trend : numpy.ndarray The polynomial trend of the signal. Examples -------- >>> signal = np.array([1, 2, 3, 5, 8, 13, 21]) >>> ta = TrendAnalysis(signal) >>> ta.compute_polynomial_trend(degree=2) array([ 1.05714286, 1.84285714, 3.05714286, 4.7 , 6.77142857, 9.27142857, 12.2 ]) """ x = np.arange(len(self.signal)) coeffs = np.polyfit(x, self.signal, degree) polynomial_trend = np.polyval(coeffs, x) return polynomial_trend
[docs] def compute_momentum(self, window_size): """ Compute the momentum of the signal. Momentum measures the rate of change of the signal over a specified window size, indicating the strength and direction of the trend. Parameters ---------- window_size : int The size of the window to calculate momentum. Returns ------- momentum : numpy.ndarray The momentum of the signal. Examples -------- >>> signal = np.array([1, 2, 3, 5, 8, 13, 21]) >>> ta = TrendAnalysis(signal) >>> ta.compute_momentum(window_size=2) array([ 0, 0, 1, 2, 3, 5, 8]) """ momentum = np.zeros_like(self.signal) momentum[window_size:] = self.signal[window_size:] - self.signal[:-window_size] return momentum
[docs] def compute_seasonal_decomposition(self, period): """ Decompose the signal into trend, seasonal, and residual components using seasonal decomposition. This method separates the signal into its underlying trend, seasonal variations, and random noise components, providing a clearer view of each aspect of the signal. Parameters ---------- period : int The period of the seasonal component. Returns ------- decomposition : dict A dictionary containing 'trend', 'seasonal', and 'residual' components. Examples -------- >>> signal = np.array([1, 2, 3, 4, 5, 6, 7]) >>> ta = TrendAnalysis(signal) >>> decomposition = ta.compute_seasonal_decomposition(period=2) >>> decomposition['trend'] array([1.5, 2.5, 3.5, 4.5, 5.5, 6.5]) >>> decomposition['seasonal'] array([ 0. , 0. , -0.5, 0.5, -0.5, 0.5]) >>> decomposition['residual'] array([-0.5, 0.5, 0. , 0. , 0. , 0. ]) """ n = len(self.signal) half = period // 2 trend = np.full(n, np.nan) for i in range(half, n - half): end_offset = 1 if period % 2 == 1 else 0 trend[i] = np.mean(self.signal[i - half : i + half + end_offset]) trend[:half] = trend[half] trend[n - half:] = trend[n - half - 1] detrended = self.signal - trend seasonal = np.array([np.mean(detrended[i::period]) for i in range(period)]) seasonal = np.tile(seasonal, n // period + 1)[:n] residual = self.signal - trend - seasonal return {"trend": trend, "seasonal": seasonal, "residual": residual}
[docs] def compute_trend_strength(self): """ Calculate the strength of the trend in the signal. Trend strength is determined by comparing the variance of the detrended signal to the variance of the original signal, with a value close to 1 indicating a strong trend. Returns ------- trend_strength : float The strength of the trend (between 0 and 1). Examples -------- >>> signal = np.array([1, 2, 3, 5, 8, 13, 21]) >>> ta = TrendAnalysis(signal) >>> ta.compute_trend_strength() 0.9571428571428573 """ linear_trend = self.compute_linear_trend() detrended_signal = self.signal - linear_trend signal_var = np.var(self.signal) if signal_var == 0: return 1.0 trend_strength = 1 - (np.var(detrended_signal) / signal_var) return trend_strength
[docs] def detect_trend_reversal(self, window_size): """ Detect trend reversal points in the signal. This method identifies points in the signal where the trend changes direction, which can be critical in understanding shifts in physiological signals. Parameters ---------- window_size : int The window size to evaluate trend reversals. Returns ------- reversals : numpy.ndarray Indices of the detected trend reversal points. Examples -------- >>> signal = np.array([1, 2, 3, 2, 1, 2, 3]) >>> ta = TrendAnalysis(signal) >>> ta.detect_trend_reversal(window_size=2) array([2, 4]) """ trend = self.compute_moving_average(window_size) diff_trend = np.diff(trend) reversals = np.where(np.diff(np.sign(diff_trend)))[0] + 1 return reversals