Source code for vitalDSP.advanced_computation.non_linear_analysis

"""
Advanced Computation Module for Physiological Signal Processing

This module provides comprehensive capabilities for physiological
signal processing including ECG, PPG, EEG, and other vital signs.

Author: vitalDSP Team
Date: 2025-01-27
Version: 1.0.0

Key Features:
- Object-oriented design with comprehensive classes
- Multiple processing methods and functions
- NumPy integration for numerical computations
- Interactive visualization capabilities
- Comprehensive signal analysis

Examples:
--------
Basic usage:
    >>> import numpy as np
    >>> from vitalDSP.advanced_computation.non_linear_analysis import NonLinearAnalysis
    >>> signal = np.random.randn(1000)
    >>> processor = NonLinearAnalysis(signal)
    >>> result = processor.process()
    >>> print(f'Processing result: {result}')
"""

import numpy as np
import matplotlib.pyplot as plt


[docs] class NonlinearAnalysis: """ Nonlinear Analysis for examining chaotic signals, such as Heart Rate Variability (HRV). This class provides methods for analyzing the chaotic behavior of signals using techniques like the estimation of Lyapunov exponents, generation of Poincaré plots, and calculation of correlation dimensions. Methods ------- lyapunov_exponent(max_iter=1000, epsilon=1e-8) Estimates the largest Lyapunov exponent to assess chaos in the signal. poincare_plot() Generates a Poincaré plot to visualize the dynamics of the signal. correlation_dimension(radius=0.1) Estimates the correlation dimension of the signal. Example Usage ------------- >>> signal = np.sin(np.linspace(0, 10, 100)) + 0.5 * np.random.normal(size=100) >>> nonlinear_analysis = NonlinearAnalysis(signal) >>> lyapunov = nonlinear_analysis.lyapunov_exponent() >>> print("Lyapunov Exponent:", lyapunov) >>> nonlinear_analysis.poincare_plot() >>> correlation_dim = nonlinear_analysis.correlation_dimension() >>> print("Correlation Dimension:", correlation_dim) """ def __init__(self, signal): """ Initialize the NonlinearAnalysis class with the provided signal. Parameters ---------- signal : numpy.ndarray The input signal to be analyzed for nonlinear dynamics. """ self.signal = signal
[docs] def lyapunov_exponent(self, max_iter=1000, epsilon=1e-8, normalize=True): """ Estimate the largest Lyapunov exponent of the signal. The Lyapunov exponent is a measure of the rate of separation of infinitesimally close trajectories in a chaotic system. A positive Lyapunov exponent indicates chaos. **IMPORTANT:** For reliable results with physiological signals (ECG, PPG), signal normalization is highly recommended to avoid numerical instabilities. Parameters ---------- max_iter : int, optional Maximum number of iterations to compute the exponent (default is 1000). epsilon : float, optional Small perturbation used to calculate divergence, to avoid division by zero (default is 1e-8). normalize : bool, optional If True, automatically normalize the signal before computation (default is True). Normalization prevents divide-by-zero errors with signals containing flat segments. Returns ------- float The estimated largest Lyapunov exponent. Raises ------ ValueError If signal is too short for the specified max_iter. Warning If computation results in NaN or inf values. Examples -------- >>> signal = np.sin(np.linspace(0, 10, 100)) + 0.5 * np.random.normal(size=100) >>> nonlinear_analysis = NonlinearAnalysis(signal) >>> lyapunov = nonlinear_analysis.lyapunov_exponent() >>> print("Lyapunov Exponent:", lyapunov) Notes ----- Uses the Rosenstein et al. (1993) algorithm: delay-embeds the signal into a dim-dimensional phase space, finds the nearest neighbor for each reference point (with Theiler window to exclude temporal neighbors), tracks trajectory divergence, and fits the slope of the mean log-divergence curve. """ import warnings n = len(self.signal) # Validate signal length if n <= max_iter: raise ValueError( f"Signal length ({n}) must be greater than max_iter ({max_iter}). " f"Either reduce max_iter or provide a longer signal." ) # Normalize signal if requested (recommended for physiological signals) signal = self.signal.copy() if normalize: signal_mean = np.mean(signal) signal_std = np.std(signal) if signal_std > epsilon: signal = (signal - signal_mean) / signal_std else: warnings.warn( "Signal has zero or very low variance. Lyapunov exponent may not be meaningful." ) # Phase-space embedding parameters (Rosenstein algorithm) tau = 1 # Time delay dim = 2 # Embedding dimension min_sep = max(1, int(0.1 * n)) # Theiler window (minimum temporal separation) # Create delay-embedded phase space n_emb = n - (dim - 1) * tau if n_emb < max_iter + min_sep + 1: return np.nan embedded = np.array([signal[i:i + dim * tau:tau] for i in range(n_emb)]) all_log_ratios = [] skipped_points = 0 for i in range(n_emb - max_iter): # Find nearest neighbor with Theiler exclusion best_dist = np.inf nn_idx = -1 for j in range(n_emb): if abs(i - j) <= min_sep: continue d = np.linalg.norm(embedded[i] - embedded[j]) if d < best_dist and d > 0: best_dist = d nn_idx = j if nn_idx == -1 or best_dist < epsilon: skipped_points += 1 continue # Track divergence of nearest-neighbor trajectory log_ratio = [] for t in range(1, max_iter): if i + t >= n_emb or nn_idx + t >= n_emb: break d_t = np.linalg.norm(embedded[i + t] - embedded[nn_idx + t]) if d_t > 0: log_ratio.append(np.log(d_t / best_dist)) if len(log_ratio) > 0 and np.all(np.isfinite(log_ratio)): all_log_ratios.append(log_ratio) if len(all_log_ratios) == 0: warnings.warn( "No valid distance calculations obtained. Signal may have too many " "repeated values or be unsuitable for Lyapunov exponent estimation. " "Try normalizing the signal or using a different analysis method." ) return np.nan if skipped_points > (n_emb - max_iter) * 0.5: warnings.warn( f"Skipped {skipped_points}/{n_emb - max_iter} points due to near-zero distances. " f"Results may be unreliable. Consider signal preprocessing." ) # Pad shorter arrays with NaN for consistent shape max_len = max(len(r) for r in all_log_ratios) padded = np.full((len(all_log_ratios), max_len), np.nan) for i, r in enumerate(all_log_ratios): padded[i, :len(r)] = r avg_log_div = np.nanmean(padded, axis=0) time_indices = np.arange(1, len(avg_log_div) + 1) valid = ~np.isnan(avg_log_div) & ~np.isinf(avg_log_div) if np.sum(valid) < 2: return 0.0 coeffs = np.polyfit(time_indices[valid], avg_log_div[valid], 1) lyapunov = coeffs[0] if not np.isfinite(lyapunov): warnings.warn( f"Lyapunov exponent is {lyapunov}. This indicates numerical issues. " f"Try signal normalization or different epsilon value." ) return lyapunov
[docs] def poincare_plot(self): """ Generate a Poincaré plot to visualize the dynamics of the signal. The Poincaré plot is a scatter plot of the signal against its delayed version. It is often used to visualize periodic and chaotic dynamics in the signal. Returns ------- matplotlib.figure.Figure The generated Poincaré plot. Examples -------- >>> signal = np.sin(np.linspace(0, 10, 100)) + 0.5 * np.random.normal(size=100) >>> nonlinear_analysis = NonlinearAnalysis(signal) >>> nonlinear_analysis.poincare_plot() """ x = self.signal[:-1] y = self.signal[1:] fig, ax = plt.subplots() ax.scatter(x, y) ax.set_xlabel(r"$x_{n}$") ax.set_ylabel(r"$x_{n+1}$") ax.set_title("Poincaré Plot") return fig
[docs] def correlation_dimension(self, radius=0.1, normalize=True): """ Estimate the correlation dimension of the signal using the Grassberger-Procaccia method. The correlation dimension is a measure of the fractal dimension of the attractor in the phase space of the signal. It provides insight into the complexity of the signal. **IMPORTANT:** Signal normalization is highly recommended for physiological signals to ensure the radius parameter is appropriate for the signal's scale. Parameters ---------- radius : float, optional Radius within which points are considered neighbors (default is 0.1). For normalized signals, typical values are 0.1-2.0. For raw signals, choose radius based on signal scale. normalize : bool, optional If True, automatically normalize the signal before computation (default is True). Normalization ensures radius is appropriate regardless of signal amplitude. Returns ------- float The estimated correlation dimension. Raises ------ ValueError If radius is 1.0 (causes divide-by-zero), or if no point pairs found within radius. Warning If correlation dimension is negative (suggests inappropriate radius selection). Examples -------- >>> signal = np.sin(np.linspace(0, 10, 100)) + 0.5 * np.random.normal(size=100) >>> nonlinear_analysis = NonlinearAnalysis(signal) >>> correlation_dim = nonlinear_analysis.correlation_dimension(radius=0.5) >>> print("Correlation Dimension:", correlation_dim) Notes ----- For physiological signals (ECG, PPG): 1. Always normalize the signal (normalize=True, default) 2. Use radius in range [0.1, 2.0] for normalized signals 3. Avoid radius = 1.0 exactly (causes log(1.0) = 0) 4. If negative result, try different radius or check signal quality The correlation dimension should typically be between 0 and the embedding dimension. Values outside this range suggest inappropriate parameters or unsuitable signal. """ # Validate radius if np.abs(radius - 1.0) < 1e-10: raise ValueError( "Radius cannot be 1.0 because log(1.0) = 0, causing division by zero. " "Use radius != 1.0 (e.g., 0.9 or 1.1)." ) if radius <= 0: raise ValueError(f"Radius must be positive, got {radius}") n = len(self.signal) signal = self.signal.copy() if normalize: signal_mean = np.mean(signal) signal_std = np.std(signal) if signal_std > 1e-10: signal = (signal - signal_mean) / signal_std else: import warnings warnings.warn( "Signal has zero or very low variance. " "Correlation dimension may not be meaningful." ) # Time-delay embedding tau = 1 m = 2 if n <= (m - 1) * tau: return 0.0 embedded = np.array([signal[i:i + (m-1)*tau + 1:tau] for i in range(n - (m-1)*tau)]) n_emb = len(embedded) # Compute C(r) at multiple radii and use slope in log-log space sig_std = np.std(signal) if sig_std < 1e-10: sig_std = 1.0 radii = np.logspace(np.log10(0.1 * sig_std), np.log10(2 * sig_std), 10) log_C = [] log_r = [] for r in radii: count = 0 for i in range(n_emb): for j in range(i + 1, n_emb): if np.linalg.norm(embedded[i] - embedded[j]) < r: count += 1 C = 2.0 * count / (n_emb * (n_emb - 1)) if n_emb > 1 else 0 if C > 0: log_C.append(np.log(C)) log_r.append(np.log(r)) if len(log_C) < 2: raise ValueError( f"No point pairs found within radius range. " f"Signal characteristics: mean={np.mean(signal):.4f}, std={np.std(signal):.4f}, " f"range=[{np.min(signal):.4f}, {np.max(signal):.4f}]. " f"Try: (1) Increasing radius, (2) Enabling normalization (normalize=True), " f"or (3) Using a longer signal segment." ) slope = np.polyfit(log_r, log_C, 1)[0] if not np.isfinite(slope): import warnings warnings.warn( f"Correlation dimension is {slope}. This indicates numerical issues." ) return slope