"""
Transfer Entropy Module for Coupling Analysis
==============================================
This module provides transfer entropy methods for analyzing directional information
flow and coupling between physiological signals.
Implemented Methods:
--------------------
1. Transfer Entropy (TE)
2. Conditional Transfer Entropy
3. Time-Delayed Transfer Entropy
4. Normalized Transfer Entropy
5. Effective Transfer Entropy
Clinical Applications:
----------------------
- Cardio-respiratory coupling analysis
- Brain-heart interaction
- Autonomic nervous system assessment
- Multi-organ system dynamics
- Baroreflex sensitivity
- Neurovascular coupling
Mathematical Background:
------------------------
Transfer entropy quantifies the directed (causal) information flow from one
time series to another. Unlike correlation, it captures nonlinear relationships
and distinguishes the direction of influence.
TE from X to Y measures how much uncertainty about the future of Y is reduced
by knowing the past of X, given the past of Y.
References:
-----------
1. Schreiber, T. (2000). Measuring information transfer. Physical review letters,
85(2), 461.
2. Faes, L., Nollo, G., & Porta, A. (2011). Information-based detection of
nonlinear Granger causality in multivariate processes via a nonuniform
embedding technique. Physical Review E, 83(5), 051112.
3. Barnett, L., Barrett, A. B., & Seth, A. K. (2009). Granger causality and
transfer entropy are equivalent for Gaussian variables. Physical review
letters, 103(23), 238701.
4. Vakorin, V. A., Krakovska, O. A., & McIntosh, A. R. (2009). Confounding
effects of indirect connections on causality estimation. Journal of
neuroscience methods, 184(1), 152-160.
Date: October 10, 2025
Version: 1.0
"""
"""
Physiological Features Module for Physiological Signal Processing
This module provides comprehensive capabilities for physiological
signal processing including ECG, PPG, EEG, and other vital signs.
Author: vitalDSP Team
Date: 2025-01-27
Version: 1.0.0
Key Features:
- Object-oriented design with comprehensive classes
- Multiple processing methods and functions
- NumPy integration for numerical computations
- SciPy integration for advanced signal processing
- Interactive visualization capabilities
Examples:
---------
Basic usage:
>>> import numpy as np
>>> from vitalDSP.physiological_features.transfer_entropy import TransferEntropy
>>> signal = np.random.randn(1000)
>>> te = TransferEntropy(source, target, k_coef=1, l_coef=1)
>>> te_val = te.compute_transfer_entropy()
>>> te_fwd, te_bwd = te.compute_bidirectional_te()
>>> print(f'Processing result: {result}')
"""
import numpy as np
from scipy.spatial import cKDTree
from typing import Tuple, Optional, List
import warnings
[docs]
class TransferEntropy:
"""
Transfer Entropy analysis for directional coupling between signals.
Transfer Entropy (TE) quantifies the directional information flow from
a source signal to a target signal, revealing causal relationships.
Parameters
----------
source : numpy.ndarray
Source time series (potential driver)
target : numpy.ndarray
Target time series (potentially driven)
k : int, optional
History length (embedding dimension) for target (default: 1)
l : int, optional
History length for source (default: 1)
delay : int, optional
Time delay for embedding (default: 1)
n_bins : int, optional
Number of bins for histogram estimation (default: None, uses KNN)
k_neighbors : int, optional
Number of nearest neighbors for KNN estimation (default: 3)
Attributes
----------
source : numpy.ndarray
Source signal
target : numpy.ndarray
Target signal
k : int
Target history length
l : int
Source history length
delay : int
Embedding delay
Methods
-------
compute_transfer_entropy()
Compute TE from source to target
compute_bidirectional_te()
Compute TE in both directions
compute_time_delayed_te(max_delay)
TE across multiple time delays
compute_effective_te()
Normalized effective TE
test_significance(n_surrogates)
Statistical significance testing
Examples
--------
>>> # Analyze cardio-respiratory coupling
>>> from vitalDSP.physiological_features.transfer_entropy import TransferEntropy
>>> import numpy as np
>>>
>>> # Heart rate (BPM) and respiration rate
>>> heart_rate = np.array([...]) # Time series of HR
>>> resp_rate = np.array([...]) # Time series of respiration
>>>
>>> # Compute transfer entropy
>>> te = TransferEntropy(resp_rate, heart_rate, k=1, l=1)
>>>
>>> # Respiratory influence on heart rate
>>> te_resp_to_hr = te.compute_transfer_entropy()
>>> print(f"TE(Resp → HR): {te_resp_to_hr:.4f}")
>>>
>>> # Bidirectional coupling
>>> te_forward, te_backward = te.compute_bidirectional_te()
>>> print(f"TE(Resp → HR): {te_forward:.4f}")
>>> print(f"TE(HR → Resp): {te_backward:.4f}")
>>>
>>> # Net directional influence
>>> net_te = te_forward - te_backward
>>> if net_te > 0:
... print("Respiration drives heart rate")
>>> else:
... print("Heart rate drives respiration")
Notes
-----
**Interpretation:**
- **TE > 0:** Information flows from source to target
- **TE ≈ 0:** No directional coupling detected
- **TE < 0:** Should not occur (implementation error)
**Comparison with Bidirectional TE:**
- If TE(X→Y) > TE(Y→X): X predominantly drives Y
- If TE(X→Y) ≈ TE(Y→X): Bidirectional coupling or common drive
- Significance testing required to confirm non-zero values
**Parameter Guidelines:**
- **k, l:** Start with 1, increase if signals have memory
- **delay:** Typically 1 for high sampling rate, larger for slower dynamics
- **k_neighbors:** 3-5 for most applications
**Computational Considerations:**
- Uses KNN estimation (Kraskov method) for continuous signals
- Time complexity: O(N log N) with KD-trees
- Requires signals of same length
- Stationary signals recommended
"""
def __init__(
self,
source: np.ndarray,
target: np.ndarray,
k_coef: int = 1,
l_coef: int = 1,
delay: int = 1,
n_bins: Optional[int] = None,
k_neighbors: int = 3,
k: Optional[int] = None,
l: Optional[int] = None,
):
"""
Initialize Transfer Entropy analyzer.
Parameters
----------
source : numpy.ndarray
Source signal
target : numpy.ndarray
Target signal
k_coef : int
Target history length (also accepted as ``k``).
l_coef : int
Source history length (also accepted as ``l``).
delay : int
Time delay
n_bins : int, optional
Binning (None for KNN)
k_neighbors : int
Neighbors for KNN
k : int, optional
Alias for ``k_coef``.
l : int, optional
Alias for ``l_coef``.
"""
# Support legacy/shorthand keyword aliases
if k is not None:
k_coef = k
if l is not None:
l_coef = l
# Input validation
if not isinstance(source, np.ndarray):
source = np.array(source)
if not isinstance(target, np.ndarray):
target = np.array(target)
if len(source) != len(target):
raise ValueError(
f"Source and target must have same length. "
f"Got {len(source)} and {len(target)}."
)
if len(source) < max(k_coef, l_coef) * delay + 10:
raise ValueError(
f"Signals too short ({len(source)} samples) for "
f"k={k_coef}, l={l_coef}, delay={delay}."
)
if k_coef < 1 or l_coef < 1:
raise ValueError(f"k and l must be >= 1. Got k={k_coef}, l={l_coef}")
if delay < 1:
raise ValueError(f"delay must be >= 1. Got {delay}")
if k_neighbors < 1:
raise ValueError(f"k_neighbors must be >= 1. Got {k_neighbors}")
self.source = source
self.target = target
self.k = k_coef
self.l = l_coef
self.delay = delay
self.n_bins = n_bins
self.k_neighbors = k_neighbors
def _create_embedding(
self, signal: np.ndarray, dimension: int, delay: int
) -> np.ndarray:
"""
Create time-delay embedding (phase space reconstruction).
Parameters
----------
signal : numpy.ndarray
Input signal
dimension : int
Embedding dimension
delay : int
Time delay
Returns
-------
embedded : numpy.ndarray
Embedded vectors (n_vectors x dimension)
Mathematical Background:
------------------------
Time-delay embedding reconstructs the state space from a scalar time series:
X(t) = [x(t), x(t-τ), x(t-2τ), ..., x(t-(d-1)τ)]
where:
- d = dimension
- τ = delay
- t = time index
This is based on Takens' embedding theorem.
References:
-----------
Takens, F. (1981). Detecting strange attractors in turbulence.
In Dynamical systems and turbulence, Warwick 1980 (pp. 366-381).
"""
n = len(signal)
n_vectors = n - (dimension - 1) * delay
if n_vectors < 1:
raise ValueError(
f"Signal too short for embedding. Need at least "
f"{(dimension - 1) * delay + 1} samples."
)
# Create embedded vectors
embedded = np.zeros((n_vectors, dimension))
for i in range(dimension):
start_idx = i * delay
end_idx = start_idx + n_vectors
embedded[:, i] = signal[start_idx:end_idx]
return embedded
def _estimate_entropy_knn(self, data: np.ndarray, k: int) -> float:
"""
Estimate entropy using k-nearest neighbors (Kraskov method).
Parameters
----------
data : numpy.ndarray
Data points (n_samples x n_dimensions)
k : int
Number of nearest neighbors
Returns
-------
entropy : float
Estimated entropy in nats (natural log)
Algorithm:
----------
Uses Kraskov-Stögbauer-Grassberger (KSG) estimator:
H(X) = ψ(N) - ψ(k) + ψ(k_i)
where:
- ψ = digamma function
- N = number of samples
- k = number of neighbors
- k_i = number of points within k-th neighbor distance
References:
-----------
Kraskov, A., Stögbauer, H., & Grassberger, P. (2004). Estimating
mutual information. Physical review E, 69(6), 066138.
"""
from scipy.special import digamma
n_samples, n_dims = data.shape
if n_samples < k + 1:
warnings.warn(
f"Too few samples ({n_samples}) for k={k}. Returning 0.", UserWarning
)
return 0.0
# Build KD-tree
tree = cKDTree(data)
# Find k-nearest neighbors for each point
distances, _ = tree.query(data, k=k + 1) # +1 to exclude self
# Distance to k-th neighbor
epsilon = distances[:, -1]
# Add small constant to avoid log(0)
epsilon = np.maximum(epsilon, 1e-10)
# Estimate entropy using KSG estimator
# H(X) ≈ -ψ(k) + ψ(N) + log(c_d) + (d/N) * Σ log(ε_i)
# where c_d is the volume of d-dimensional unit ball
# Volume of d-dimensional unit ball
from scipy.special import gamma
c_d = (np.pi ** (n_dims / 2)) / gamma(n_dims / 2 + 1)
# Average log distance
avg_log_dist = np.mean(np.log(epsilon))
# KSG entropy estimate
entropy = -digamma(k) + digamma(n_samples) + np.log(c_d) + n_dims * avg_log_dist
return entropy
def _estimate_mutual_information_knn(
self, x: np.ndarray, y: np.ndarray, k: int
) -> float:
"""
Estimate mutual information using KNN.
Parameters
----------
x : numpy.ndarray
First variable (n_samples x dim_x)
y : numpy.ndarray
Second variable (n_samples x dim_y)
k : int
Number of neighbors
Returns
-------
mi : float
Mutual information estimate
Formula:
--------
I(X;Y) = H(X) + H(Y) - H(X,Y)
where H is entropy and (X,Y) is joint distribution.
"""
# Reshape if 1D
if x.ndim == 1:
x = x.reshape(-1, 1)
if y.ndim == 1:
y = y.reshape(-1, 1)
# Joint distribution
xy = np.hstack([x, y])
# Estimate entropies
h_x = self._estimate_entropy_knn(x, k)
h_y = self._estimate_entropy_knn(y, k)
h_xy = self._estimate_entropy_knn(xy, k)
# Mutual information
mi = h_x + h_y - h_xy
# Ensure non-negative (numerical precision)
mi = max(0.0, mi)
return mi
[docs]
def compute_transfer_entropy(self) -> float:
"""
Compute transfer entropy from source to target.
Returns
-------
te : float
Transfer entropy value in nats
Formula:
--------
TE(X→Y) = I(Y_future; X_past | Y_past)
More formally:
TE(X→Y) = H(Y_t | Y_past) - H(Y_t | Y_past, X_past)
where:
- Y_t = target at time t
- Y_past = k past values of target
- X_past = l past values of source
Algorithm Steps:
----------------
1. Create embeddings for target history (k values)
2. Create embeddings for source history (l values)
3. Extract future target values
4. Compute conditional mutual information
5. Return TE estimate
Examples:
---------
>>> te_analyzer = TransferEntropy(x, y, k=1, l=1)
>>> te_value = te_analyzer.compute_transfer_entropy()
>>>
>>> # Convert nats to bits
>>> te_bits = te_value / np.log(2)
>>> print(f"TE: {te_bits:.4f} bits")
Clinical Interpretation:
------------------------
- **Cardio-respiratory:**
- Healthy: Moderate bidirectional coupling
- Sleep apnea: Reduced respiratory → cardiac TE
- Heart failure: Altered coupling patterns
- **Brain-heart:**
- Mental stress: Increased brain → heart TE
- Relaxation: Reduced directional coupling
Notes:
------
- Returns value in nats (natural logarithm base)
- Convert to bits by dividing by ln(2)
- Significance should be tested with surrogate data
"""
# Create embeddings
# Target history: Y(t-delay), Y(t-2*delay), ..., Y(t-k*delay)
target_past = self._create_embedding(self.target[:-1], self.k, self.delay)
# Source history: X(t-delay), X(t-2*delay), ..., X(t-l*delay)
source_past = self._create_embedding(self.source[:-1], self.l, self.delay)
# Align to same time points — clamp to 0 to guard when k > l or l > k
max_lookback = max(self.k, self.l) * self.delay
target_past = target_past[max(0, max_lookback - self.k * self.delay) :]
source_past = source_past[max(0, max_lookback - self.l * self.delay) :]
# Future target: Y(t)
target_future = self.target[max_lookback:].reshape(-1, 1)
# Ensure same length
min_len = min(len(target_past), len(source_past), len(target_future))
target_past = target_past[:min_len]
source_past = source_past[:min_len]
target_future = target_future[:min_len]
# Compute Transfer Entropy
# TE = I(Y_future; X_past | Y_past)
# = I(Y_future, X_past; Y_past) - I(X_past; Y_past)
# Method 1: Using conditional MI
# TE = H(Y_future | Y_past) - H(Y_future | Y_past, X_past)
# Joint of target past and future
y_past_future = np.hstack([target_past, target_future])
# Joint of target past, source past, and target future
y_x_past_future = np.hstack([target_past, source_past, target_future])
# Estimate entropies
h_y_future_given_y_past = self._estimate_entropy_knn(
y_past_future, self.k_neighbors
) - self._estimate_entropy_knn(target_past, self.k_neighbors)
h_y_future_given_y_x_past = self._estimate_entropy_knn(
y_x_past_future, self.k_neighbors
) - self._estimate_entropy_knn(
np.hstack([target_past, source_past]), self.k_neighbors
)
# Transfer Entropy
te = h_y_future_given_y_past - h_y_future_given_y_x_past
# Ensure non-negative
te = max(0.0, te)
return te
[docs]
def compute_bidirectional_te(self) -> Tuple[float, float]:
"""
Compute transfer entropy in both directions.
Returns
-------
te_forward : float
TE from source to target
te_backward : float
TE from target to source
Examples:
---------
>>> te = TransferEntropy(resp, hr)
>>> te_resp_hr, te_hr_resp = te.compute_bidirectional_te()
>>>
>>> # Net directional coupling
>>> net_coupling = te_resp_hr - te_hr_resp
>>> dominant_direction = "Resp → HR" if net_coupling > 0 else "HR → Resp"
>>> print(f"Dominant direction: {dominant_direction}")
>>> print(f"Coupling asymmetry: {abs(net_coupling):.4f}")
Interpretation:
---------------
Comparing bidirectional TE reveals:
1. **Dominant Direction:**
- TE(X→Y) >> TE(Y→X): X drives Y
- TE(X→Y) << TE(Y→X): Y drives X
- TE(X→Y) ≈ TE(Y→X): Bidirectional or common drive
2. **Coupling Strength:**
- Sum = TE(X→Y) + TE(Y→X): Total coupling
- Difference = abs(TE(X→Y) - TE(Y→X)): Directional asymmetry
"""
# Forward: source → target
te_forward = self.compute_transfer_entropy()
# Backward: target → source (swap signals)
te_backward_analyzer = TransferEntropy(
self.target,
self.source,
k_coef=self.l,
l_coef=self.k, # Swap k and l
delay=self.delay,
k_neighbors=self.k_neighbors,
)
te_backward = te_backward_analyzer.compute_transfer_entropy()
net_te = te_forward - te_backward
total = te_forward + te_backward
ratio = (te_forward / te_backward) if te_backward > 0 else float("inf")
if net_te > 0.01 * total:
interpretation = "Source predominantly drives target"
elif net_te < -0.01 * total:
interpretation = "Target predominantly drives source"
else:
interpretation = "Bidirectional coupling or common drive"
return {
"te_forward": float(te_forward),
"te_backward": float(te_backward),
"net_te": float(net_te),
"net_coupling": float(net_te),
"ratio": float(ratio),
"dominant_direction": "source→target" if net_te >= 0 else "target→source",
"interpretation": interpretation,
}
[docs]
def compute_time_delayed_te(self, max_delay: int = 10) -> np.ndarray:
"""
Compute transfer entropy across multiple time delays.
Parameters
----------
max_delay : int
Maximum time delay to test
Returns
-------
te_values : numpy.ndarray
TE values for each delay (length: max_delay)
Purpose:
--------
Different physiological processes operate at different time scales.
Time-delayed TE reveals the temporal dynamics of coupling.
Examples:
---------
>>> te = TransferEntropy(source, target)
>>> te_delays = te.compute_time_delayed_te(max_delay=20)
>>>
>>> # Find optimal delay
>>> optimal_delay = np.argmax(te_delays) + 1
>>> print(f"Peak coupling at delay: {optimal_delay}")
>>>
>>> # Plot delay profile
>>> import matplotlib.pyplot as plt
>>> delays = np.arange(1, 21)
>>> plt.plot(delays, te_delays, 'o-')
>>> plt.xlabel('Time Delay')
>>> plt.ylabel('Transfer Entropy')
>>> plt.title('TE vs Time Delay')
>>> plt.grid(True)
Clinical Significance:
----------------------
- **Short delays (1-3):** Immediate physiological responses
- **Medium delays (5-10):** Regulatory mechanisms
- **Long delays (>10):** Slow adaptive processes
"""
te_values = []
delays = list(range(1, max_delay + 1))
for delay_val in delays:
# Create TE analyzer with this delay
te_analyzer = TransferEntropy(
self.source,
self.target,
k_coef=self.k,
l_coef=self.l,
delay=delay_val,
k_neighbors=self.k_neighbors,
)
try:
te = te_analyzer.compute_transfer_entropy()
te_values.append(te)
except Exception as e:
warnings.warn(
f"Failed to compute TE at delay {delay_val}: {str(e)}. Using 0.",
UserWarning,
)
te_values.append(0.0)
te_array = np.array(te_values)
optimal_idx = int(np.argmax(te_array))
return {
"te_values": te_array,
"delays": delays,
"optimal_delay": delays[optimal_idx],
"optimal_te": float(te_array[optimal_idx]),
}
[docs]
def compute_effective_te(self) -> float:
"""
Compute normalized effective transfer entropy.
Returns
-------
effective_te : float
Normalized TE in range [0, 1]
Formula:
--------
Effective TE = TE / H(target_future | target_past)
Normalization provides:
- Scale-independent measure
- Interpretability as fraction of uncertainty reduced
- Easier comparison across different signal pairs
Examples:
---------
>>> te_analyzer = TransferEntropy(x, y)
>>> eff_te = te_analyzer.compute_effective_te()
>>> print(f"Effective TE: {eff_te:.2%}")
"""
# Compute TE
te = self.compute_transfer_entropy()
# Compute H(Y_future | Y_past) for normalization
target_past = self._create_embedding(self.target[:-1], self.k, self.delay)
max_lookback = self.k * self.delay
target_future = self.target[max_lookback:].reshape(-1, 1)
min_len = min(len(target_past), len(target_future))
target_past = target_past[:min_len]
target_future = target_future[:min_len]
y_past_future = np.hstack([target_past, target_future])
h_y_future_given_y_past = self._estimate_entropy_knn(
y_past_future, self.k_neighbors
) - self._estimate_entropy_knn(target_past, self.k_neighbors)
# Normalize
if h_y_future_given_y_past > 0:
effective_te = te / h_y_future_given_y_past
else:
effective_te = 0.0
# Clip to [0, 1]
effective_te = np.clip(effective_te, 0.0, 1.0)
return effective_te
[docs]
def test_significance(
self, n_surrogates: int = 100, method: str = "shuffle"
) -> Tuple[float, float]:
"""
Test statistical significance of transfer entropy.
Parameters
----------
n_surrogates : int
Number of surrogate datasets
method : str
Surrogate generation method
- 'shuffle': Random permutation (destroys temporal structure)
- 'phase': Phase randomization (preserves power spectrum)
Returns
-------
p_value : float
Statistical significance (0-1)
te_original : float
Original TE value
Algorithm:
----------
1. Compute TE for original data
2. Generate n_surrogates by shuffling source signal
3. Compute TE for each surrogate
4. p-value = fraction of surrogates with TE >= original TE
Examples:
---------
>>> te = TransferEntropy(x, y)
>>> p_value, te_value = te.test_significance(n_surrogates=1000)
>>>
>>> if p_value < 0.05:
... print(f"Significant coupling (p={p_value:.4f})")
>>> else:
... print(f"No significant coupling (p={p_value:.4f})")
Notes:
------
- p < 0.05: Significant coupling
- p < 0.01: Highly significant
- More surrogates = more reliable p-value
- Computationally expensive for large n_surrogates
"""
# Compute original TE
te_original = self.compute_transfer_entropy()
# Generate surrogates and compute TE
surrogate_tes = []
for _ in range(n_surrogates):
if method == "shuffle":
# Shuffle source signal
surrogate_source = np.random.permutation(self.source)
elif method == "phase":
# Phase randomization (preserves power spectrum and conjugate symmetry)
fft = np.fft.fft(self.source)
n = len(fft)
phases = np.random.uniform(0, 2 * np.pi, n // 2 - 1)
fft_randomized = fft.copy()
fft_randomized[1 : n // 2] = np.abs(fft[1 : n // 2]) * np.exp(
1j * phases
)
if n % 2 == 0:
fft_randomized[n // 2] = np.abs(fft[n // 2])
fft_randomized[n // 2 + 1 :] = np.conj(
fft_randomized[1 : n - n // 2][::-1]
)
surrogate_source = np.real(np.fft.ifft(fft_randomized))
else:
raise ValueError(f"Unknown method: {method}")
# Compute TE for surrogate
te_surrogate_analyzer = TransferEntropy(
surrogate_source,
self.target,
k_coef=self.k,
l_coef=self.l,
delay=self.delay,
k_neighbors=self.k_neighbors,
)
try:
te_surrogate = te_surrogate_analyzer.compute_transfer_entropy()
surrogate_tes.append(te_surrogate)
except Exception as e:
warnings.warn(
f"Failed to compute TE for surrogate: {str(e)}. Using 0.",
UserWarning,
)
continue
# Compute p-value and summary statistics
if surrogate_tes:
surrogate_arr = np.array(surrogate_tes)
p_value = float(np.mean(surrogate_arr >= te_original))
surr_mean = float(np.mean(surrogate_arr))
surr_std = float(np.std(surrogate_arr))
effect_size = (te_original - surr_mean) / surr_std if surr_std > 0 else 0.0
else:
warnings.warn(
"All surrogate calculations failed. Cannot compute p-value.",
UserWarning,
)
p_value = 1.0
surr_mean = 0.0
surr_std = 0.0
effect_size = 0.0
if p_value < 0.01:
sig_label = "Highly significant (p < 0.01)"
elif p_value < 0.05:
sig_label = "Significant (p < 0.05)"
else:
sig_label = "Not significant (p >= 0.05)"
return {
"p_value": p_value,
"te_original": float(te_original),
"te_surrogates_mean": surr_mean,
"te_surrogates_std": surr_std,
"effect_size": float(effect_size),
"significance": sig_label,
}
# Export main class
__all__ = ["TransferEntropy"]