Source code for vitalDSP.physiological_features.nonlinear

"""
Physiological Features Module for Physiological Signal Processing

This module provides comprehensive capabilities for physiological
signal processing including ECG, PPG, EEG, and other vital signs.

Author: vitalDSP Team
Date: 2025-01-27
Version: 1.0.0

Key Features:
- Object-oriented design with comprehensive classes
- Multiple processing methods and functions
- NumPy integration for numerical computations
- SciPy integration for advanced signal processing
- Performance optimization

Examples:
---------
Basic usage:
    >>> import numpy as np
    >>> from vitalDSP.physiological_features.nonlinear import NonlinearFeatures
    >>> signal = np.random.randn(1000)
    >>> nf = NonlinearFeatures(signal)
    >>> sample_ent = nf.compute_sample_entropy()
    >>> dfa_alpha = nf.compute_dfa()
    >>> poincare = nf.compute_poincare_features()
    >>> print(f'Sample Entropy: {sample_ent}, DFA: {dfa_alpha}')
"""

import numpy as np
import pandas as pd
import os
import warnings

# from scipy.spatial.distance import cdist
from scipy.spatial import KDTree
from scipy.spatial import distance as sp_distance
from vitalDSP.utils.quality_performance.performance_monitoring import (
    monitor_feature_extraction_operation,
)


[docs] class NonlinearFeatures: """ A class for computing nonlinear (geometric) features from physiological signals (ECG, PPG, EEG). Attributes ---------- signal : np.array The physiological signal (e.g., ECG, PPG, EEG). fs : int The sampling frequency of the signal in Hz. Default is 1000 Hz. Methods ------- compute_sample_entropy(m=2, r=0.2) Computes the sample entropy of the signal, measuring its complexity. compute_approximate_entropy(m=2, r=0.2) Computes the approximate entropy of the signal, quantifying its unpredictability. compute_fractal_dimension(kmax=10) Computes the fractal dimension of the signal using Higuchi's method. compute_lyapunov_exponent() Computes the largest Lyapunov exponent, indicating the presence of chaos in the signal. compute_dfa(order=1) Computes the detrended fluctuation analysis (DFA) for assessing fractal scaling. compute_poincare_features() Computes Poincaré plot features (SD1 and SD2) to assess short- and long-term HRV variability. compute_recurrence_features(threshold=0.2) Computes features from the recurrence plot, including recurrence rate, determinism, and laminarity. """ def __init__(self, signal, fs=1000): """ Initializes the NonlinearFeatures object. Args: signal (np.array): The physiological signal. fs (int): The sampling frequency of the signal in Hz. Default is 1000 Hz. """ self.signal = np.array(signal) self.fs = fs # Sampling frequency
[docs] def compute_sample_entropy(self, m=2, r=0.2): """ Computes the sample entropy of the signal. Sample entropy is a measure of signal complexity, specifically used for detecting the regularity and unpredictability of fluctuations in a signal. Args: m (int): Embedding dimension (default is 2). r (float): Tolerance (default is 0.2). Returns: float: The computed sample entropy of the signal. Example: >>> ecg_signal = [...] # Sample ECG signal >>> nf = NonlinearFeatures(ecg_signal) >>> sample_entropy = nf.compute_sample_entropy() >>> print(f"Sample Entropy: {sample_entropy}") """ signal = np.asarray(self.signal) N = len(signal) if np.all(signal == 0) or np.std(signal) == 0: return 0 # Return 0 for constant or zero signals if N < m + 1: return 0 # Return 0 for signals too short for meaningful entropy # Normalize the signal to have zero mean and unit variance original_std = np.std(signal) signal = (signal - np.mean(signal)) / original_std r *= original_std # Scale tolerance by original std before normalization # Create embedded vectors for dimensions m and m+1 embedded_m = np.array([signal[i : i + m] for i in range(N - m + 1)]) embedded_m1 = np.array([signal[i : i + m + 1] for i in range(N - m)]) def _phi(embedded, tol): n = len(embedded) if n <= 1: return 0.0 tree = KDTree(embedded) # Use <= tol; adjust tol slightly if strict < is needed, but for practicality, use <= counts = tree.query_ball_point( embedded, r=tol, p=np.inf, return_length=True ) total_double = np.sum(counts) - n # Subtract self-matches num_pairs = n * (n - 1) / 2.0 return total_double / (2.0 * num_pairs) if num_pairs > 0 else 0.0 phi_m = _phi(embedded_m, r) phi_m1 = _phi(embedded_m1, r) if phi_m == 0 or phi_m1 == 0: return 0 # Avoid log of zero return -np.log(phi_m1 / phi_m)
[docs] def compute_approximate_entropy(self, m=2, r=0.2): """ Computes the approximate entropy of the signal. Approximate entropy quantifies the unpredictability and regularity of signal patterns. Args: m (int): Embedding dimension (default is 2). r (float): Tolerance (default is 0.2). Returns: float: The computed approximate entropy of the signal. Example: >>> ppg_signal = [...] # Sample PPG signal >>> nf = NonlinearFeatures(ppg_signal) >>> approx_entropy = nf.compute_approximate_entropy() >>> print(f"Approximate Entropy: {approx_entropy}") """ signal = np.asarray(self.signal) N = len(signal) if np.all(signal == 0) or np.std(signal) == 0: return 0 # Return 0 for constant or zero signals if N <= m + 1: return 0 # Return 0 for signals too short for meaningful entropy # Normalize the signal to have zero mean and unit variance signal = (signal - np.mean(signal)) / np.std(signal) r *= np.std(signal) # Scale tolerance to the signal's standard deviation def _phi(m): # Create embedded vectors for dimension m embedded = np.array([signal[i : i + m] for i in range(N - m + 1)]) # Build a KDTree for efficient neighbor searches tree = KDTree(embedded) # Query neighbors within radius r using Chebyshev (max) distance counts = tree.query_ball_point(embedded, r, p=np.inf) # Compute C_i values C = np.array([len(c) / (N - m + 1) for c in counts]) # Handle zero counts to avoid log(0) C = np.where(C == 0, np.finfo(float).eps, C) # Compute phi phi = np.sum(np.log(C)) / (N - m + 1) return phi # Compute phi for m and m+1 phi_m = _phi(m) phi_m1 = _phi(m + 1) # Approximate entropy is the difference between the two phi values return phi_m - phi_m1
[docs] def compute_fractal_dimension(self, kmax=10): """ Computes the fractal dimension of the signal using Higuchi's method. Fractal dimension is a measure of complexity, reflecting how the signal fills space as its scale changes. Returns: float: The fractal dimension of the signal. Example: >>> eeg_signal = [...] # Sample EEG signal >>> nf = NonlinearFeatures(eeg_signal) >>> fractal_dimension = nf.compute_fractal_dimension() >>> print(f"Fractal Dimension: {fractal_dimension}") """ if len(self.signal) < kmax: return 0.0 # Return 0.0 for signals too short for the given kmax def _higuchi_fd(signal, kmax): """ OPTIMIZED: Vectorized Higuchi fractal dimension computation """ Lmk = np.zeros((kmax, kmax)) N = len(signal) # OPTIMIZATION: Vectorized computation for all k and m values for k in range(1, kmax + 1): for m in range(0, k): # OPTIMIZATION: Vectorized curve length computation indices = np.arange(m, N, k) if len(indices) > 1: # Compute differences vectorized diffs = np.abs(np.diff(signal[indices])) Lm = np.sum(diffs) # Normalize by curve length curve_length = len(indices) - 1 if curve_length > 0: Lmk[m, k - 1] = Lm * (N - 1) / (curve_length * k * k) k_counts = np.arange(1, kmax + 1) Lk = np.sum(Lmk, axis=0) / k_counts # OPTIMIZATION: Vectorized log computation log_range = np.log(np.arange(1, kmax + 1)) if np.any(Lk == 0): return 0.0 # Return 0.0 to avoid division by zero in polyfit return -np.polyfit(log_range, np.log(Lk), 1)[0] return _higuchi_fd(self.signal, kmax)
[docs] def compute_lyapunov_exponent(self): """ Computes the largest Lyapunov exponent (LLE) of the signal. LLE measures the rate at which nearby trajectories in phase space diverge, indicating chaotic behavior in the signal. Returns: float: The largest Lyapunov exponent of the signal. Example: >>> ecg_signal = [...] # Sample ECG signal >>> nf = NonlinearFeatures(ecg_signal) >>> lyapunov_exponent = nf.compute_lyapunov_exponent() >>> print(f"Largest Lyapunov Exponent: {lyapunov_exponent}") """ N = len(self.signal) if N < 3: return 0 # Not enough data for meaningful computation epsilon = np.std(self.signal) * 0.1 max_t = min( int(N / 10), N - 3 ) # Ensure max_t is not larger than the length of the phase space def _distance(x, y): return np.sqrt(np.sum((x - y) ** 2)) def _lyapunov(time_delay, dim, max_t): """ OPTIMIZED: Vectorized Lyapunov exponent computation with spatial indexing """ if max_t <= 1: return 0 # Prevent division errors with too short signals # OPTIMIZATION: Vectorized phase space creation phase_space = np.array([self.signal[i::time_delay] for i in range(dim)]).T # OPTIMIZATION: Use spatial data structure for nearest neighbor search try: from scipy.spatial import cKDTree tree = cKDTree(phase_space) query_points = phase_space[: len(phase_space) - max_t - 1] all_distances, all_indices = tree.query(query_points, k=2) divergences = [] for i, (distances, indices) in enumerate( zip(all_distances, all_indices) ): if distances[1] > epsilon: neighbor_idx = indices[1] if neighbor_idx + max_t < len(phase_space): d0 = distances[1] d1 = np.linalg.norm( phase_space[i + max_t] - phase_space[neighbor_idx + max_t] ) if d1 > epsilon: divergences.append(np.log(d1 / d0)) except ImportError: # Fallback to original implementation if scipy not available divergences = [] for i in range(len(phase_space) - max_t - 1): d0 = _distance(phase_space[i], phase_space[i + 1]) d1 = _distance(phase_space[i + max_t], phase_space[i + max_t + 1]) if d0 > epsilon and d1 > epsilon: divergences.append(np.log(d1 / d0)) if len(divergences) == 0: return 0 # Return 0 if no valid divergences were found return np.mean(divergences) return _lyapunov(time_delay=5, dim=2, max_t=max_t)
[docs] @monitor_feature_extraction_operation def compute_dfa(self, order=1): """ Computes the Detrended Fluctuation Analysis (DFA) of the signal. DFA is used to assess the fractal scaling properties of time-series data, especially in physiological signals. Args: order (int): The order of the polynomial fit for detrending. Default is 1 (linear detrending). Returns: float: The DFA scaling exponent (α). Example: >>> ecg_signal = [...] # Sample ECG signal >>> nf = NonlinearFeatures(ecg_signal) >>> dfa = nf.compute_dfa(order=1) >>> print(f"DFA Scaling Exponent: {dfa}") """ import numpy as np signal = np.asarray(self.signal) N = len(signal) if N < 4: return 0 # Not enough data for DFA computation # Step 1: Compute the integrated signal integrated_signal = np.cumsum(signal - np.mean(signal)) # Step 2: Define scales (segment lengths) scales = np.unique( np.floor(np.logspace(np.log10(4), np.log10(N // 4), num=20)) ).astype(int) fluctuation_sizes = [] for scale in scales: # Number of segments n_segments = N // scale if n_segments < 2: continue # Need at least 2 segments for reliable estimation # Truncate the integrated signal to make it divisible by scale truncated_length = n_segments * scale integrated_truncated = integrated_signal[:truncated_length] # Reshape into segments segments = integrated_truncated.reshape((n_segments, scale)) # Create the time vector for polynomial fitting x = np.arange(scale) if order == 1: # OPTIMIZED: Vectorized linear detrending for order 1 X = np.vstack([x, np.ones_like(x)]).T # Design matrix # Precompute pseudoinverse of X for efficiency XtX_inv_Xt = np.linalg.pinv(X) # Compute coefficients for all segments at once coeffs = XtX_inv_Xt @ segments.T # Shape: (2, n_segments) # Compute trends efficiently trends = X @ coeffs # Shape: (scale, n_segments) trends = trends.T # Shape: (n_segments, scale) elif order == 2: # OPTIMIZED: Vectorized quadratic detrending for order 2 X = np.vstack([x**2, x, np.ones_like(x)]).T # Design matrix XtX_inv_Xt = np.linalg.pinv(X) coeffs = XtX_inv_Xt @ segments.T # Shape: (3, n_segments) trends = X @ coeffs # Shape: (scale, n_segments) trends = trends.T # Shape: (n_segments, scale) else: # OPTIMIZED: Batch processing for higher orders # Process segments in batches to reduce overhead batch_size = min(100, n_segments) trends = np.zeros_like(segments) for batch_start in range(0, n_segments, batch_size): batch_end = min(batch_start + batch_size, n_segments) batch_segments = segments[batch_start:batch_end] # Vectorized polynomial fitting for batch for i, segment in enumerate(batch_segments): coeffs = np.polyfit(x, segment, order) trends[batch_start + i] = np.polyval(coeffs, x) # Compute fluctuations residuals = segments - trends rms = np.sqrt(np.mean(residuals**2, axis=1)) # Append the mean fluctuation for this scale fluctuation_sizes.append(np.mean(rms)) # Convert scales and fluctuation sizes to numpy arrays fluctuation_sizes = np.array(fluctuation_sizes) scales = scales[: len(fluctuation_sizes)] # Logarithms of scales and fluctuation sizes with safety checks log_scales = np.log(np.maximum(scales, 1e-10)) # Avoid log(0) log_fluctuation_sizes = np.log( np.maximum(fluctuation_sizes, 1e-10) ) # Avoid log(0) # Linear regression to find the scaling exponent (alpha) dfa_alpha = np.polyfit(log_scales, log_fluctuation_sizes, 1)[0] return dfa_alpha
[docs] def compute_poincare_features(self): """ Computes the SD1 and SD2 features from the Poincaré plot of the NN intervals. SD1 reflects short-term HRV, while SD2 reflects long-term HRV. Returns: dict: Dictionary containing Poincaré plot features: - 'sd1': Short-term HRV variability - 'sd2': Long-term HRV variability - 'sd_ratio': SD1/SD2 ratio Example: >>> nn_intervals = [800, 810, 790, 805, 795] >>> nf = NonlinearFeatures(nn_intervals) >>> poincare = nf.compute_poincare_features() >>> print(f"SD1: {poincare['sd1']}, SD2: {poincare['sd2']}") """ nn_intervals = np.asarray(self.signal) x1 = nn_intervals[:-1] x2 = nn_intervals[1:] # Compute the differences and sums diff = x2 - x1 # summ = x2 + x1 # Compute variances var_diff = np.var(diff, ddof=1) var_nn = np.var(nn_intervals, ddof=1) # Calculate SD1 and SD2 with safety checks sd1_arg = var_diff / 2 sd1 = np.sqrt(np.maximum(sd1_arg, 0)) # Ensure non-negative sd2_arg = 2 * var_nn - var_diff / 2 sd2 = np.sqrt(np.maximum(sd2_arg, 0)) # Ensure non-negative # Calculate SD1/SD2 ratio sd_ratio = sd1 / sd2 if sd2 > 0 else 0 return {"sd1": sd1, "sd2": sd2, "sd_ratio": sd_ratio}
[docs] def compute_recurrence_features(self, threshold=0.2, sample_size=10000): """ Computes approximate recurrence features by sampling point pairs. This approach significantly reduces computation time for large datasets by avoiding the full pairwise distance calculations. Args: threshold (float): The threshold to define recurrences. Default is 0.2. sample_size (int): The number of point pairs to sample. Default is 10,000. Returns: dict: A dictionary containing approximate recurrence rate, determinism, and laminarity. Example: >>> ecg_signal = [...] # Sample ECG signal >>> nf = NonlinearFeatures(ecg_signal) >>> rqa_features = nf.compute_recurrence_features(threshold=0.2, sample_size=10000) >>> print(rqa_features) """ signal = np.asarray(self.signal) N = len(signal) if N < 2: return { "recurrence_rate": 0, "determinism": 0, "laminarity": 0, } # Normalize the signal to zero mean and unit variance with safety checks signal_mean = np.mean(signal) signal_std = np.std(signal) if signal_std > 0: signal = (signal - signal_mean) / signal_std else: # If signal is constant, return zero recurrence features return { "recurrence_rate": 0, "determinism": 0, "laminarity": 0, } # Create phase space (embedding dimension = 2) phase_space = np.column_stack((signal[:-1], signal[1:])) M = len(phase_space) # Total number of possible pairs (excluding self-pairs) total_pairs = M * (M - 1) // 2 # Adjust sample size if necessary sample_size = min(sample_size, total_pairs) # Sample random indices without replacement idx_pairs = np.random.choice(total_pairs, size=sample_size, replace=False) idx1 = idx_pairs // M idx2 = idx_pairs % M # Ensure idx1 < idx2 to avoid duplicate pairs mask = idx1 < idx2 idx1 = idx1[mask] idx2 = idx2[mask] sample_size = len(idx1) # Compute distances between sampled pairs diffs = phase_space[idx1] - phase_space[idx2] distances = np.sqrt(np.sum(diffs**2, axis=1)) # Compute approximate recurrence rate with safety check recurrences = distances < threshold if sample_size > 0: recurrence_rate = np.sum(recurrences) / sample_size else: recurrence_rate = 0 # Approximate determinism and laminarity # Sort indices to detect line structures sorted_indices = np.argsort(idx1) idx1_sorted = idx1[sorted_indices] idx2_sorted = idx2[sorted_indices] recurrences_sorted = recurrences[sorted_indices] # Initialize variables for line detection diag_lengths = [] vert_lengths = [] current_diag_len = 1 current_vert_len = 1 for i in range(1, sample_size): if recurrences_sorted[i]: # Check for diagonal lines (idx1 and idx2 increase by 1) if ( idx1_sorted[i] - idx1_sorted[i - 1] == 1 and idx2_sorted[i] - idx2_sorted[i - 1] == 1 ): current_diag_len += 1 else: if current_diag_len > 1: diag_lengths.append(current_diag_len) current_diag_len = 1 # Check for vertical lines (idx1 increases by 1, idx2 same) if ( idx1_sorted[i] - idx1_sorted[i - 1] == 1 and idx2_sorted[i] == idx2_sorted[i - 1] ): current_vert_len += 1 else: if current_vert_len > 1: vert_lengths.append(current_vert_len) current_vert_len = 1 else: if current_diag_len > 1: diag_lengths.append(current_diag_len) current_diag_len = 1 if current_vert_len > 1: vert_lengths.append(current_vert_len) current_vert_len = 1 # Append the last lengths if needed if current_diag_len > 1: diag_lengths.append(current_diag_len) if current_vert_len > 1: vert_lengths.append(current_vert_len) # Calculate determinism if diag_lengths: det = np.sum(diag_lengths) / np.sum(recurrences) else: det = 0 # Calculate laminarity if vert_lengths: laminarity = np.sum(vert_lengths) / np.sum(recurrences) else: laminarity = 0 return { "recurrence_rate": recurrence_rate, "determinism": det, "laminarity": laminarity, }
if __name__ == "__main__": ppg_signal = np.random.rand(1000) fname = "20190109T151032.026+0700_1050000_1080000.csv" PATH = r"D:\Workspace\Data\24EIa\output\sample" ppg_signal = pd.read_csv(os.path.join(PATH, fname))["PLETH"].values fs = 100 features = NonlinearFeatures(ppg_signal, fs) sample_entropy = features.compute_sample_entropy() approximate_entropy = features.compute_approximate_entropy() fractal_dimension = features.compute_fractal_dimension() lyapunov_exponent = features.compute_lyapunov_exponent() dfa = features.compute_dfa() poincare_features = features.compute_poincare_features() recurrence_features = features.compute_recurrence_features() print( f"Sample Entropy: {sample_entropy}, Approximate Entropy: {approximate_entropy}, Fractal Dimension: {fractal_dimension}, Lyapunov Exponent: {lyapunov_exponent}, DFA: {dfa}, Poincaré Features: {poincare_features}, Recurrence Features: {recurrence_features}" )