Source code for vitalDSP.physiological_features.symbolic_dynamics

"""
Symbolic Dynamics Analysis Module
==================================

This module provides symbolic dynamics methods for analyzing physiological signals
by transforming continuous signals into discrete symbol sequences for pattern analysis.

Implemented Methods:
--------------------
1. Symbolic Transformation (0V, 1V, 2LV, 2UV patterns)
2. Shannon Entropy of Symbol Distribution
3. Word Distribution Analysis
4. Forbidden Words Detection
5. Pattern Transition Analysis
6. Renyi Entropy
7. Permutation Entropy

Clinical Applications:
----------------------
- Cardiac autonomic function assessment
- Arrhythmia detection and classification
- Sleep stage classification
- Fetal heart rate monitoring
- Blood pressure variability analysis
- Seizure prediction

Mathematical Background:
------------------------
Symbolic dynamics transforms a continuous-valued time series into a sequence of
discrete symbols based on pattern recognition. This approach reduces noise sensitivity
and reveals underlying regulatory patterns.

The transformation captures important dynamical features while being robust to:
- Measurement noise
- Non-stationarity
- Missing data
- Computational complexity

References:
-----------
1. Voss, A., Schulz, S., Schroeder, R., Baumert, M., & Caminal, P. (2009).
   Methods derived from nonlinear dynamics for analysing heart rate variability.
   Philosophical Transactions of the Royal Society A, 367(1887), 277-296.

2. Porta, A., Guzzetti, S., Montano, N., Furlan, R., Pagani, M., Malliani, A.,
   & Cerutti, S. (2001). Entropy, entropy rate, and pattern classification as
   tools to typify complexity in short heart period variability series.
   IEEE Transactions on Biomedical Engineering, 48(11), 1282-1291.

3. Bandt, C., & Pompe, B. (2002). Permutation entropy: a natural complexity
   measure for time series. Physical review letters, 88(17), 174102.


Physiological Features Module for Physiological Signal Processing

This module provides comprehensive capabilities for physiological
signal processing including ECG, PPG, EEG, and other vital signs.

Author: vitalDSP Team
Date: 2025-01-27
Version: 1.0.0

Key Features:
- Object-oriented design with comprehensive classes
- Multiple processing methods and functions
- NumPy integration for numerical computations
- SciPy integration for advanced signal processing
- Interactive visualization capabilities

Examples:
---------
Basic usage:
    >>> import numpy as np
    >>> from vitalDSP.physiological_features.symbolic_dynamics import SymbolicDynamics
    >>> rr_intervals = np.random.normal(0.8, 0.05, 200)
    >>> sd = SymbolicDynamics(rr_intervals, n_symbols=4, word_length=3, method='0V')
    >>> shannon = sd.compute_shannon_entropy()   # returns float
    >>> forbidden = sd.detect_forbidden_words()  # returns list of str
    >>> print(f'Shannon Entropy: {shannon:.3f}, Forbidden words: {len(forbidden)}')
"""

import numpy as np
import math
from collections import Counter
from itertools import permutations, product
from typing import List, Dict, Tuple, Optional
import warnings


[docs] class SymbolicDynamics: """ Symbolic Dynamics Analysis for physiological signals. Transforms continuous time series into symbolic sequences and analyzes the distribution and patterns of symbols. Parameters ---------- signal : numpy.ndarray Input time series signal (1D array) n_symbols : int, optional Number of symbols to use (default: 4) Common choices: 3, 4, 6 word_length : int, optional Length of words to analyze (default: 3) Typical range: 2-5 method : str, optional Symbolization method (default: '0V') Options: '0V' (variations), 'quantile', 'SAX', 'threshold' Attributes ---------- signal : numpy.ndarray Original signal n_symbols : int Number of symbols word_length : int Word length for pattern analysis method : str Symbolization method symbols : numpy.ndarray Symbolic sequence Methods ------- symbolize() Transform signal to symbol sequence compute_shannon_entropy() Shannon entropy of symbol distribution compute_word_distribution() Distribution of words detect_forbidden_words() Find patterns that never occur compute_transition_matrix() Symbol transition probabilities compute_renyi_entropy(alpha) Generalized Renyi entropy compute_permutation_entropy() Permutation entropy Examples -------- >>> # Analyze heart rate variability >>> from vitalDSP.physiological_features.symbolic_dynamics import SymbolicDynamics >>> import numpy as np >>> >>> # RR intervals (seconds) >>> rr = np.array([1.0, 0.95, 1.02, 0.98, 1.01, 0.96, ...]) >>> >>> # Create symbolic representation >>> sd = SymbolicDynamics(rr, n_symbols=4, word_length=3) >>> symbols = sd.symbolize() >>> >>> # Compute Shannon entropy >>> h = sd.compute_shannon_entropy() >>> print(f"Shannon Entropy: {h:.4f}") >>> >>> # Analyze word distribution >>> word_dist = sd.compute_word_distribution() >>> >>> # Find forbidden words (never occurring patterns) >>> forbidden = sd.detect_forbidden_words() >>> print(f"Forbidden words: {len(forbidden)}") Notes ----- **Symbol Interpretation (0V method):** - **0V (no variation):** Three consecutive values are approximately equal Represents stable regulation - **1V (one variation):** Two values equal, one different Represents small perturbations - **2LV (two variations, low first):** Low-High-Low or similar Represents oscillatory pattern with deceleration - **2UV (two variations, high first):** High-Low-High or similar Represents oscillatory pattern with acceleration **Clinical Interpretation:** - **Healthy:** Balanced distribution of symbols, few forbidden words - **Disease:** Skewed distribution, many forbidden words - **Atrial Fibrillation:** Very high entropy, nearly uniform distribution - **Heart Failure:** Low entropy, many forbidden words **Parameter Recommendations:** - **n_symbols:** 4-6 for HRV analysis - **word_length:** 3 for balance of detail and statistics - **method:** '0V' for HRV, 'quantile' for general signals """ def __init__( self, signal: np.ndarray, n_symbols: int = 4, word_length: int = 3, method: str = "0V", ): """ Initialize Symbolic Dynamics analyzer. Parameters ---------- signal : numpy.ndarray Input time series n_symbols : int Number of symbols (3-10) word_length : int Word length (2-5) method : str Symbolization method """ # Input validation if not isinstance(signal, np.ndarray): signal = np.array(signal) if len(signal) < word_length: raise ValueError( f"Signal too short ({len(signal)} samples). " f"Need at least {word_length} samples." ) if n_symbols < 2 or n_symbols > 26: raise ValueError(f"n_symbols must be 2-26, got {n_symbols}") if word_length < 2 or word_length > 10: raise ValueError(f"word_length must be 2-10, got {word_length}") valid_methods = ["0V", "quantile", "SAX", "threshold"] if method not in valid_methods: raise ValueError(f"method must be one of {valid_methods}, got {method}") self.signal = signal self.n_symbols = n_symbols self.word_length = word_length self.method = method self.symbols = None
[docs] def symbolize(self) -> np.ndarray: """ Transform continuous signal to symbolic sequence. Returns ------- symbols : numpy.ndarray Array of symbol indices (integers 0 to n_symbols-1) Methods: -------- 1. **0V Method (Variations):** Classifies triplets based on pattern variations: - 0V: all approximately equal (|a-b|<δ, |b-c|<δ, |a-c|<δ) - 1V: two equal, one different - 2LV: two variations with low-high-low pattern - 2UV: two variations with high-low-high pattern 2. **Quantile Method:** Divides signal into n_symbols quantiles. 3. **SAX (Symbolic Aggregate approXimation):** Uses Gaussian quantiles for symbolization. 4. **Threshold Method:** Simple thresholding based on percentiles. Examples: --------- >>> sd = SymbolicDynamics(signal, n_symbols=4, method='0V') >>> symbols = sd.symbolize() >>> >>> # Convert to letter representation >>> letters = ''.join([chr(65+s) for s in symbols]) # A, B, C, D... >>> print(f"Symbolic sequence: {letters[:50]}...") """ if self.method == "0V": symbols = self._symbolize_0v() elif self.method == "quantile": symbols = self._symbolize_quantile() elif self.method == "SAX": symbols = self._symbolize_sax() elif self.method == "threshold": symbols = self._symbolize_threshold() self.symbols = symbols return symbols
def _symbolize_0v(self) -> np.ndarray: """ 0V symbolization method for HRV analysis. Classifies consecutive triplets into 4 categories: 0V, 1V, 2LV, 2UV Returns: -------- symbols : numpy.ndarray Symbol sequence (0-3) """ # Calculate tolerance (typically 0.5% of signal range) delta = 0.005 * (np.max(self.signal) - np.min(self.signal)) symbols = [] for i in range(len(self.signal) - 2): a, b, c = self.signal[i : i + 3] d1 = b - a d2 = c - b v1 = abs(d1) > delta v2 = abs(d2) > delta if not v1 and not v2: symbol = 0 # 0V: no significant variation elif v1 != v2: symbol = 1 # 1V: one significant variation elif d1 * d2 > 0: symbol = 2 # 2LV: two like variations (monotonic) else: symbol = 3 # 2UV: two unlike variations (oscillatory) symbols.append(symbol) if self.n_symbols != 4: import warnings warnings.warn( f"0V symbolization always produces exactly 4 symbols (0-3). " f"n_symbols={self.n_symbols} is ignored for this method.", UserWarning, ) return np.array(symbols) def _symbolize_quantile(self) -> np.ndarray: """ Quantile-based symbolization. Divides signal range into equal-frequency bins. """ # Calculate quantile thresholds quantiles = np.linspace(0, 100, self.n_symbols + 1) thresholds = np.percentile(self.signal, quantiles[1:-1]) # Assign symbols based on quantile symbols = np.digitize(self.signal, thresholds) return symbols def _symbolize_sax(self) -> np.ndarray: """ SAX (Symbolic Aggregate approXimation) symbolization. Uses Gaussian quantiles assuming normalized signal. """ # Normalize signal (z-score) std_val = np.std(self.signal) if std_val == 0: return np.zeros(len(self.signal), dtype=int) normalized = (self.signal - np.mean(self.signal)) / std_val # Calculate Gaussian quantile breakpoints from scipy import stats quantiles = np.linspace(0, 1, self.n_symbols + 1)[1:-1] thresholds = stats.norm.ppf(quantiles) # Assign symbols symbols = np.digitize(normalized, thresholds) return symbols def _symbolize_threshold(self) -> np.ndarray: """Simple threshold-based symbolization.""" thresholds = np.linspace( np.min(self.signal), np.max(self.signal), self.n_symbols + 1 )[1:-1] symbols = np.digitize(self.signal, thresholds) return symbols
[docs] def compute_shannon_entropy(self) -> float: """ Compute Shannon entropy of symbol distribution. Shannon entropy quantifies the average information content or unpredictability of the symbol sequence. Returns ------- entropy : float Shannon entropy in bits (log base 2) Formula: -------- H = -Σ p(i) * log2(p(i)) where p(i) is the probability of symbol i. Interpretation: --------------- - ``H = 0``: Completely predictable (only one symbol appears) - ``H = log2(n_symbols)``: Maximum entropy (uniform distribution) - In between: Degree of predictability/complexity Clinical Significance: ---------------------- - **Low H:** Regular, predictable rhythm (may indicate reduced adaptability) - **High H:** Variable, unpredictable rhythm (healthy variability) - **Very High H:** Chaotic, random (e.g., atrial fibrillation) Examples: --------- >>> sd = SymbolicDynamics(signal) >>> sd.symbolize() >>> h = sd.compute_shannon_entropy() >>> >>> # Normalize by maximum possible entropy >>> h_max = np.log2(sd.n_symbols) >>> h_norm = h / h_max >>> print(f"Normalized entropy: {h_norm:.4f}") """ if self.symbols is None: self.symbolize() # Count symbol frequencies symbol_counts = Counter(self.symbols) total_symbols = len(self.symbols) # Symbol probability distribution (keyed by symbol value) symbol_distribution = { sym: count / total_symbols for sym, count in symbol_counts.items() } # Calculate probabilities array for entropy computation probabilities = np.array(list(symbol_distribution.values())) # Shannon entropy: -Σ p*log2(p), ignoring zero probabilities nonzero_probs = probabilities[probabilities > 0] entropy = -np.sum(nonzero_probs * np.log2(nonzero_probs)) max_entropy = np.log2(self.n_symbols) if self.n_symbols > 1 else 0.0 normalized_entropy = entropy / max_entropy if max_entropy > 0 else 0.0 return { "entropy": float(entropy), "normalized_entropy": float(normalized_entropy), "max_entropy": float(max_entropy), "symbol_distribution": symbol_distribution, }
[docs] def compute_word_distribution(self) -> Dict[str, float]: """ Compute distribution of words (symbol patterns). Returns ------- word_dist : dict Dictionary mapping words to their probabilities Keys: words (strings of symbols) Values: probabilities (0-1) Examples: --------- >>> sd = SymbolicDynamics(signal, word_length=3) >>> sd.symbolize() >>> word_dist = sd.compute_word_distribution() >>> >>> # Most common words >>> sorted_words = sorted(word_dist.items(), key=lambda x: x[1], reverse=True) >>> print("Top 5 most common words:") >>> for word, prob in sorted_words[:5]: ... print(f"{word}: {prob:.4f}") """ if self.symbols is None: self.symbolize() # Extract words words = [] for i in range(len(self.symbols) - self.word_length + 1): word = tuple(self.symbols[i : i + self.word_length]) words.append(word) # Count word frequencies word_counts = Counter(words) total_words = len(words) # Convert to probabilities with full metadata per word word_distribution = { "".join(str(s) for s in word): { "probability": count / total_words, "count": int(count), } for word, count in sorted( word_counts.items(), key=lambda kv: kv[1], reverse=True ) } return { "word_distribution": word_distribution, "total_words": total_words, "unique_words": len(word_counts), }
[docs] def detect_forbidden_words(self) -> List[str]: """ Detect forbidden words (patterns that never occur). Returns ------- forbidden_words : list of str List of words that never appear in the sequence Significance: ------------- Forbidden words indicate deterministic constraints or regulatory mechanisms that prevent certain patterns from occurring. - **Many forbidden words:** Strong regulatory constraints (often pathological) - **Few forbidden words:** Flexible regulation (typically healthy) - **No forbidden words:** Complete randomness (e.g., atrial fibrillation) Examples: --------- >>> sd = SymbolicDynamics(signal, n_symbols=4, word_length=3) >>> sd.symbolize() >>> forbidden = sd.detect_forbidden_words() >>> >>> total_possible = sd.n_symbols ** sd.word_length >>> forbidden_ratio = len(forbidden) / total_possible >>> print(f"Forbidden word ratio: {forbidden_ratio:.2%}") """ if self.symbols is None: self.symbolize() # Get observed words (compute_word_distribution now returns a dict with nested keys) word_dist_result = self.compute_word_distribution() observed_words = set(word_dist_result["word_distribution"].keys()) # Generate all possible words all_possible_words = set() for perm in product(range(self.n_symbols), repeat=self.word_length): word = "".join(str(s) for s in perm) all_possible_words.add(word) # Find forbidden words forbidden_words = sorted(all_possible_words - observed_words) n_possible = len(all_possible_words) n_forbidden = len(forbidden_words) forbidden_pct = 100.0 * n_forbidden / n_possible if n_possible > 0 else 0.0 if forbidden_pct > 50: interpretation = "Many forbidden words — strong regulatory constraints (likely pathological)" elif forbidden_pct > 20: interpretation = "Moderate forbidden words — some regulatory constraints" elif forbidden_pct > 0: interpretation = ( "Few forbidden words — flexible regulation (typically healthy)" ) else: interpretation = ( "No forbidden words — complete randomness (e.g., atrial fibrillation)" ) return { "forbidden_words": forbidden_words, "n_forbidden": n_forbidden, "n_possible": n_possible, "forbidden_percentage": forbidden_pct, "interpretation": interpretation, }
[docs] def compute_transition_matrix(self) -> np.ndarray: """ Compute symbol transition probability matrix. Returns ------- transition_matrix : numpy.ndarray Matrix of transition probabilities (n_symbols x n_symbols) Element [i,j] = P(next symbol is j | current symbol is i) Examples: --------- >>> sd = SymbolicDynamics(signal, n_symbols=4) >>> sd.symbolize() >>> trans = sd.compute_transition_matrix() >>> >>> # Visualize transition matrix >>> import matplotlib.pyplot as plt >>> plt.imshow(trans, cmap='hot', interpolation='nearest') >>> plt.colorbar(label='Transition Probability') >>> plt.xlabel('Next Symbol') >>> plt.ylabel('Current Symbol') >>> plt.title('Symbol Transition Matrix') """ if self.symbols is None: self.symbolize() # Initialize transition count matrix trans_counts = np.zeros((self.n_symbols, self.n_symbols)) # Count transitions for i in range(len(self.symbols) - 1): current_symbol = self.symbols[i] next_symbol = self.symbols[i + 1] trans_counts[current_symbol, next_symbol] += 1 # Convert to probabilities row_sums = trans_counts.sum(axis=1, keepdims=True) row_sums[row_sums == 0] = 1 # Avoid division by zero transition_matrix = trans_counts / row_sums return transition_matrix
[docs] def compute_renyi_entropy(self, alpha: float = 2.0) -> float: """ Compute Renyi entropy (generalized entropy measure). Parameters ---------- alpha : float Order parameter - alpha=0: Hartley entropy (log of number of distinct symbols) - alpha=1: Shannon entropy (limit as alpha→1) - alpha=2: Collision entropy - alpha=∞: Min-entropy Returns ------- renyi_entropy : float Renyi entropy value Formula: -------- H_α = (1/(1-α)) * log2(Σ p_i^α) where p_i are symbol probabilities. Clinical Use: ------------- Different alpha values emphasize different aspects: - α < 1: Emphasizes rare events - α > 1: Emphasizes common events - α = 2: Good balance, computationally efficient """ if self.symbols is None: self.symbolize() # Count symbol frequencies symbol_counts = Counter(self.symbols) total_symbols = len(self.symbols) # Calculate probabilities probabilities = np.array( [count / total_symbols for count in symbol_counts.values()] ) # Handle special cases if alpha == 1.0: # Shannon entropy (alpha→1 limit of Renyi entropy) result = self.compute_shannon_entropy() return ( float(result["entropy"]) if isinstance(result, dict) else float(result) ) if alpha == 0.0: # Hartley entropy return np.log2(len(probabilities)) if np.isinf(alpha): # Min-entropy return -np.log2(np.max(probabilities)) # General Renyi entropy renyi = (1 / (1 - alpha)) * np.log2(np.sum(probabilities**alpha)) return renyi
[docs] def compute_permutation_entropy(self, order: int = 3) -> float: """ Compute Permutation Entropy. Permutation entropy analyzes the order relationships between consecutive values, making it robust to noise and monotonic transformations. Parameters ---------- order : int Order of permutation patterns (default: 3) Typical range: 3-7 Returns ------- perm_entropy : float Permutation entropy value Algorithm: ---------- 1. Extract overlapping windows of length 'order' 2. Determine ranking permutation for each window 3. Count frequency of each permutation pattern 4. Calculate Shannon entropy of permutation distribution Advantages: ----------- - Robust to noise - Fast computation - Conceptually simple - Good for nonlinear signals References: ----------- Bandt, C., & Pompe, B. (2002). Permutation entropy: a natural complexity measure for time series. Physical review letters, 88(17), 174102. Examples: --------- >>> sd = SymbolicDynamics(signal) >>> pe = sd.compute_permutation_entropy(order=3) >>> print(f"Permutation Entropy: {pe:.4f}") """ signal = self.signal n = len(signal) if n < order: raise ValueError(f"Signal length ({n}) < order ({order})") # Extract permutation patterns patterns = [] for i in range(n - order + 1): # Get window window = signal[i : i + order] # Determine permutation (argsort gives ranking) perm = tuple(np.argsort(window)) patterns.append(perm) # Count pattern frequencies pattern_counts = Counter(patterns) total_patterns = len(patterns) # Calculate probabilities probabilities = np.array( [count / total_patterns for count in pattern_counts.values()] ) # Shannon entropy of permutations — guard zeros before log, not inside it safe_probs = np.where(probabilities > 0, probabilities, 1.0) perm_entropy = -np.sum(probabilities * np.log2(safe_probs)) # Normalize by maximum possible entropy max_entropy = np.log2(math.factorial(order)) normalized_pe = float(perm_entropy / max_entropy) if max_entropy > 0 else 0.0 return { "permutation_entropy": float(perm_entropy), "normalized_pe": normalized_pe, "max_entropy": float(max_entropy), "n_patterns": len(pattern_counts), "order": order, }
[docs] def compute_symbolic_features(self) -> Dict[str, float]: """ Convenience method that computes all symbolic dynamics features. Returns: dict: Dictionary containing all symbolic dynamics metrics: - 'shannon_entropy': Shannon entropy of symbol distribution - 'renyi_entropy': Renyi entropy (alpha=2) - 'permutation_entropy': Permutation entropy (order=3) - 'num_words': Total number of words in symbol sequence - 'num_forbidden_words': Number of forbidden word patterns Example: >>> nn_intervals = [800, 810, 790, 805, 795, 820, 780, 815] >>> sd = SymbolicDynamics(nn_intervals) >>> features = sd.compute_symbolic_features() >>> print(f"Shannon Entropy: {features['shannon_entropy']:.3f}") """ # Ensure symbolization is performed if ( not hasattr(self, "symbols") or self.symbols is None or len(self.symbols) == 0 ): self.symbolize() # Compute all features (methods now return dicts; extract key values) shannon_result = self.compute_shannon_entropy() perm_result = self.compute_permutation_entropy(order=3) word_result = self.compute_word_distribution() forbidden_result = self.detect_forbidden_words() features = { "shannon_entropy": shannon_result["entropy"], "normalized_shannon_entropy": shannon_result["normalized_entropy"], "renyi_entropy": self.compute_renyi_entropy(alpha=2.0), "permutation_entropy": perm_result["permutation_entropy"], "normalized_permutation_entropy": perm_result["normalized_pe"], "num_words": word_result["unique_words"], "num_forbidden_words": forbidden_result["n_forbidden"], "forbidden_percentage": forbidden_result["forbidden_percentage"], } return features
# Export main class __all__ = ["SymbolicDynamics"]