Advanced Computation

This section covers the advanced computational techniques provided by the VitalDSP library. These methods are designed to handle complex signal processing tasks, including machine learning, anomaly detection, Bayesian analysis, and advanced signal processing algorithms.

Overview

The advanced computation module provides cutting-edge algorithms for:

  • Machine Learning: Neural networks, reinforcement learning, and adaptive filtering

  • Anomaly Detection: Real-time and batch anomaly detection algorithms

  • Bayesian Analysis: Probabilistic modeling and optimization

  • Signal Decomposition: EMD, harmonic separation, and sparse processing

  • Nonlinear Analysis: Complex system analysis and chaos theory

  • Multimodal Fusion: Multi-signal integration and analysis

Machine Learning and AI

Neural Network Filtering

Advanced filtering techniques leveraging neural networks to process and enhance biomedical signals.

Advanced Computation Module for Physiological Signal Processing

This module provides comprehensive capabilities for physiological signal processing including ECG, PPG, EEG, and other vital signs.

Author: vitalDSP Team Date: 2025-01-27 Version: 1.0.0

Key Features: - Object-oriented design with comprehensive classes - Multiple processing methods and functions - NumPy integration for numerical computations - Configurable parameters and settings - Advanced filtering algorithms

Examples:

Basic usage:
>>> import numpy as np
>>> from vitalDSP.advanced_computation.neural_network_filtering import NeuralNetworkFiltering
>>> signal = np.random.randn(1000)
>>> nn = NeuralNetworkFiltering(input_size=100)
>>> filtered = nn.apply_filter(signal)
class vitalDSP.advanced_computation.neural_network_filtering.ConvolutionalNetwork(dropout_rate, batch_norm)[source]

Bases: object

predict(X)[source]
train(X, y, learning_rate, epochs, batch_size)[source]
class vitalDSP.advanced_computation.neural_network_filtering.FeedforwardNetwork(hidden_layers, dropout_rate, batch_norm)[source]

Bases: object

predict(X)[source]
train(X, y, learning_rate, epochs, batch_size)[source]
class vitalDSP.advanced_computation.neural_network_filtering.NeuralNetworkFiltering(signal, network_type='feedforward', hidden_layers=[64, 64], learning_rate=0.001, epochs=100, batch_size=32, dropout_rate=0.5, batch_norm=True, recurrent_type='lstm')[source]

Bases: object

A comprehensive neural network-based filtering approach for adaptive signal processing.

This class supports various neural network architectures, including feedforward, convolutional, and recurrent networks. It includes advanced features such as dropout for regularization, batch normalization for faster convergence, and customizable training options.

train : method

Trains the neural network on the given signal.

apply_filter : method

Applies the trained neural network to filter the signal.

evaluate : method

Evaluates the performance of the neural network on a test signal.

Example Usage
-------------
signal = np.sin(np.linspace(0, 10, 100)) + np.random.normal(0, 0.1, 100)
nn_filter = NeuralNetworkFiltering(signal, network_type='feedforward', hidden_layers=[64, 64], epochs=100)
# Train the neural network filter
nn_filter.train()
# Apply the trained filter
filtered_signal = nn_filter.apply_filter()
print("Filtered Signal:", filtered_signal)
# Evaluate the filter on a test signal
test_signal = np.sin(np.linspace(0, 10, 100)) + np.random.normal(0, 0.1, 100)
mse = nn_filter.evaluate(test_signal)
print("Mean Squared Error on Test Signal:", mse)
apply_filter()[source]

Apply the trained neural network to filter the signal.

The trained neural network is used to predict and filter the input signal.

Returns:

The filtered signal after applying the neural network.

Return type:

numpy.ndarray

evaluate(test_signal)[source]

Evaluate the performance of the neural network on a test signal.

The method calculates the Mean Squared Error (MSE) between the predicted and actual values of the test signal.

Parameters:

test_signal (numpy.ndarray) – The test signal to evaluate the neural network’s performance.

Returns:

The mean squared error (MSE) of the neural network on the test signal.

Return type:

float

train()[source]

Train the neural network on the given signal.

The signal is divided into input-output pairs for supervised training. This method prepares the data, then trains the network using the specified learning rate, number of epochs, and batch size.

class vitalDSP.advanced_computation.neural_network_filtering.RecurrentNetwork(recurrent_type='lstm', dropout_rate=0.5, batch_norm=True)[source]

Bases: object

predict(X)[source]
train(X, y, learning_rate, epochs, batch_size)[source]

Reinforcement Learning Filter

Filtering methods that use reinforcement learning to adaptively improve signal quality based on feedback loops.

Advanced Computation Module for Physiological Signal Processing

This module provides comprehensive capabilities for physiological signal processing including ECG, PPG, EEG, and other vital signs.

Author: vitalDSP Team Date: 2025-01-27 Version: 1.0.0

Key Features: - Object-oriented design with comprehensive classes - Multiple processing methods and functions - NumPy integration for numerical computations - Performance optimization - Advanced filtering algorithms

Examples:

Basic usage:
>>> import numpy as np
>>> from vitalDSP.advanced_computation.reinforcement_learning_filter import ReinforcementLearningFilter
>>> signal = np.random.randn(1000)
>>> rlf = ReinforcementLearningFilter(signal)
>>> filtered = rlf.apply_filter(signal)
class vitalDSP.advanced_computation.reinforcement_learning_filter.ReinforcementLearningFilter(signal, action_space, state_space=None)[source]

Bases: object

A comprehensive Reinforcement Learning Filter for adaptive signal processing. This class provides methods to train filters using reinforcement learning algorithms such as Q-learning, Deep Q-Networks (DQN), and Proximal Policy Optimization (PPO), and to apply these trained filters to signals.

train_q_learning(episodes=1000, alpha=0.1, gamma=0.99, epsilon=0.1)[source]

Trains a filter using Q-learning.

train_dqn(episodes=1000, batch_size=32, gamma=0.99, epsilon=0.1, target_update=10)[source]

Trains a filter using Deep Q-Networks (DQN).

train_ppo(epochs=1000, gamma=0.99, lambda_=0.95, clip_ratio=0.2)[source]

Trains a filter using Proximal Policy Optimization (PPO).

apply_filter()[source]

Applies the trained filter to the signal.

Example Usage
-------------
signal = np.sin(np.linspace(0, 10, 100)) + np.random.normal(0, 0.1, 100)
rl_filter = ReinforcementLearningFilter(signal, action_space=[-1, 0, 1])
# Train Q-learning based filter
rl_filter.train_q_learning(episodes=500, alpha=0.1, gamma=0.99, epsilon=0.1)
# Apply the trained filter
filtered_signal = rl_filter.apply_filter()
print("Filtered Signal (Q-Learning):", filtered_signal)
# Train DQN-based filter
rl_filter.train_dqn(episodes=500, batch_size=32, gamma=0.99, epsilon=0.1, target_update=10)
# Apply the trained filter
filtered_signal_dqn = rl_filter.apply_filter()
print("Filtered Signal (DQN):", filtered_signal_dqn)
# Train PPO-based filter
rl_filter.train_ppo(epochs=500, gamma=0.99, lambda_=0.95, clip_ratio=0.2)
# Apply the trained filter
filtered_signal_ppo = rl_filter.apply_filter()
print("Filtered Signal (PPO):", filtered_signal_ppo)
apply_filter()[source]

Apply the trained filter to the signal using the learned policy or Q-values.

Returns:

The filtered signal.

Return type:

numpy.ndarray

train_dqn(episodes=1000, batch_size=32, gamma=0.99, epsilon=0.1, target_update=10)[source]

Train the filter using Deep Q-Networks (DQN).

DQN approximates the Q-function using a neural network and uses experience replay to stabilize learning.

Parameters:
  • episodes (int, optional) – The number of training episodes (default is 1000).

  • batch_size (int, optional) – The batch size for experience replay (default is 32).

  • gamma (float, optional) – The discount factor (default is 0.99).

  • epsilon (float, optional) – The exploration rate for epsilon-greedy policy (default is 0.1).

  • target_update (int, optional) – The frequency of updating the target network (default is 10 episodes).

train_ppo(epochs=1000, gamma=0.99, lambda_=0.95, clip_ratio=0.2)[source]

Train the filter using Proximal Policy Optimization (PPO).

PPO is a policy gradient method that seeks to optimize policies while ensuring the updates don’t deviate too much from the previous policy.

Parameters:
  • epochs (int, optional) – The number of training epochs (default is 1000).

  • gamma (float, optional) – The discount factor (default is 0.99).

  • lambda (float, optional) – The GAE-lambda parameter (default is 0.95).

  • clip_ratio (float, optional) – The clipping parameter for PPO (default is 0.2).

train_q_learning(episodes=1000, alpha=0.1, gamma=0.99, epsilon=0.1)[source]

Train the filter using Q-learning.

Q-learning updates a Q-table where each state-action pair has a value. The algorithm iterates over episodes and updates the Q-values based on the reward received for each action taken.

Parameters:
  • episodes (int, optional) – The number of training episodes (default is 1000).

  • alpha (float, optional) – The learning rate (default is 0.1).

  • gamma (float, optional) – The discount factor (default is 0.99).

  • epsilon (float, optional) – The exploration rate for epsilon-greedy policy (default is 0.1).

class vitalDSP.advanced_computation.reinforcement_learning_filter.SimpleNeuralNetwork(input_size, output_size)[source]

Bases: object

get_weights()[source]
predict(state)[source]
set_weights(weights)[source]
train(state, action, target)[source]
class vitalDSP.advanced_computation.reinforcement_learning_filter.SimplePolicyNetwork(input_size, output_size)[source]

Bases: object

predict(state)[source]
sample_action(state)[source]
train(state, action, advantage)[source]
update(state, action, objective)[source]
class vitalDSP.advanced_computation.reinforcement_learning_filter.SimpleValueNetwork(input_size)[source]

Bases: object

predict(state)[source]

Predict the value of a given state.

Parameters:

state (numpy.ndarray) – The input state vector.

Returns:

The predicted value of the state.

Return type:

float

train(state, reward)[source]

Train the value network by updating weights based on the reward.

Parameters:
  • state (numpy.ndarray) – The input state vector.

  • reward (float) – The reward value used for training.

Anomaly Detection

Anomaly Detection

Techniques for identifying anomalies in biomedical signals, crucial for detecting irregularities and potential issues in real-time signal monitoring.

Anomaly Detection Module for Physiological Signal Processing

This module provides comprehensive anomaly detection capabilities for physiological signals including ECG, PPG, EEG, and other vital signs. It implements multiple detection methods including statistical approaches, machine learning techniques, and frequency-domain analysis for identifying unusual patterns and artifacts.

Author: vitalDSP Team Date: 2025-01-27 Version: 1.0.0

Key Features: - Statistical anomaly detection (Z-score, IQR) - Moving average and rolling window methods - Local Outlier Factor (LOF) for density-based detection - Fourier-based frequency domain analysis - Real-time streaming anomaly detection - Performance monitoring and optimization

Examples:

Basic Z-score anomaly detection:
>>> import numpy as np
>>> from vitalDSP.advanced_computation.anomaly_detection import AnomalyDetection
>>> signal = np.sin(np.linspace(0, 10, 100)) + np.random.normal(0, 0.1, 100)
>>> anomaly_detector = AnomalyDetection(signal)
>>> anomalies = anomaly_detector.detect_anomalies(method="z_score", threshold=2.0)
>>> print(f"Detected {len(anomalies)} anomalies")
Moving average detection:
>>> anomalies_ma = anomaly_detector.detect_anomalies(method="moving_average", window_size=5, threshold=0.5)
>>> print(f"Moving average anomalies: {len(anomalies_ma)}")
LOF-based detection:
>>> anomalies_lof = anomaly_detector.detect_anomalies(method="lof", n_neighbors=20)
>>> print(f"LOF anomalies: {len(anomalies_lof)}")
FFT-based detection:
>>> anomalies_fft = anomaly_detector.detect_anomalies(method="fft", threshold=0.1)
>>> print(f"FFT anomalies: {len(anomalies_fft)}")
class vitalDSP.advanced_computation.anomaly_detection.AnomalyDetection(signal)[source]

Bases: object

Comprehensive Anomaly Detection for detecting anomalies in real-time from streaming data.

This class offers multiple methods to detect anomalies in a given signal, including statistical methods, moving averages, Local Outlier Factor (LOF), and Fourier-based methods.

detect_anomalies : function

Detects anomalies using various methods including z-score, moving average, custom LOF, and more.

Examples

>>> import numpy as np
>>> from vitalDSP.advanced_computation.anomaly_detection import AnomalyDetection
>>>
>>> # Example 1: Z-score anomaly detection
>>> signal = np.sin(np.linspace(0, 10, 100)) + np.random.normal(0, 0.1, 100)
>>> anomaly_detector = AnomalyDetection(signal)
>>> anomalies_z_score = anomaly_detector.detect_anomalies(method="z_score", threshold=2.0)
>>> print(f"Z-score anomalies: {len(anomalies_z_score)}")
>>>
>>> # Example 2: Moving average anomaly detection
>>> anomalies_moving_avg = anomaly_detector.detect_anomalies(method="moving_average", window_size=5, threshold=0.5)
>>> print(f"Moving average anomalies: {len(anomalies_moving_avg)}")
>>>
>>> # Example 3: LOF anomaly detection
>>> anomalies_lof = anomaly_detector.detect_anomalies(method="lof", n_neighbors=20)
>>> print(f"LOF anomalies: {len(anomalies_lof)}")
>>>
>>> # Example 4: FFT-based anomaly detection
>>> anomalies_fft = anomaly_detector.detect_anomalies(method="fft", threshold=0.1)
>>> print(f"FFT anomalies: {len(anomalies_fft)}")
detect_anomalies(method='z_score', **kwargs)[source]

Detect anomalies in the signal using the specified method.

Parameters:
  • method (str, optional) – The method to use for detecting anomalies. Options: ‘z_score’, ‘moving_average’, ‘lof’, ‘fft’, ‘threshold’. Default is ‘z_score’.

  • **kwargs (additional arguments) – Additional parameters depending on the chosen method.

Returns:

Indices of the detected anomalies.

Return type:

numpy.ndarray

Raises:

ValueError – If the specified method is unknown.

Examples

>>> anomalies_z_score = anomaly_detector.detect_anomalies(method="z_score", threshold=2.0)
>>> anomalies_moving_avg = anomaly_detector.detect_anomalies(method="moving_average", window_size=5, threshold=0.5)

Real-Time Anomaly Detection

Real-time methods for detecting anomalies in streaming biomedical signals, crucial for monitoring and alert systems.

Advanced Computation Module for Physiological Signal Processing

This module provides comprehensive capabilities for physiological signal processing including ECG, PPG, EEG, and other vital signs.

Author: vitalDSP Team Date: 2025-01-27 Version: 1.0.0

Key Features: - Object-oriented design with comprehensive classes - Multiple processing methods and functions - NumPy integration for numerical computations - Pattern and anomaly detection

Examples:

Basic usage:
>>> import numpy as np
>>> from vitalDSP.advanced_computation.real_time_anomaly_detection import RealTimeAnomalyDetection
>>> signal = np.random.randn(1000)
>>> rtad = RealTimeAnomalyDetection(signal)
>>> anomalies = rtad.detect_statistical()
class vitalDSP.advanced_computation.real_time_anomaly_detection.RealTimeAnomalyDetection(window_size=10)[source]

Bases: object

Comprehensive Real-Time Anomaly Detection for detecting anomalies in streaming data.

This class supports multiple anomaly detection techniques including statistical methods, machine learning models, and deep learning models. It is designed for use in real-time environments with online learning capabilities.

detect_statistical : method

Detects anomalies using statistical methods like Z-score and moving average.

detect_knn : method

Detects anomalies using k-Nearest Neighbors (k-NN).

detect_svm : method

Detects anomalies using Support Vector Machine (SVM).

detect_autoencoder : method

Detects anomalies using Autoencoders.

detect_lstm : method

Detects anomalies using LSTM-based models.

detect_wavelet : method

Detects anomalies using wavelet transforms.

update_model : method

Updates the model with new data for online learning.

evaluate : method

Evaluates the performance of the anomaly detection method on a test dataset.

Example Usage
-------------
data_stream = np.sin(np.linspace(0, 10, 100)) + np.random.normal(0, 0.1, 100)
anomaly_detector = RealTimeAnomalyDetection(window_size=10)
# Detect anomalies using Z-score
for data_point in data_stream:

anomaly = anomaly_detector.detect_statistical(data_point, method=’z_score’, threshold=2.0) print(“Anomaly (Z-Score):”, anomaly)

# Train and detect anomalies using k-NN
anomaly_detector.train_knn(data_stream[:50])
for data_point in data_stream[50:]:

anomaly = anomaly_detector.detect_knn(data_point) print(“Anomaly (k-NN):”, anomaly)

detect_autoencoder(data_point, threshold=0.1)[source]

Detect anomalies using the Autoencoder method.

Parameters:
  • data_point (float) – The new data point to be analyzed.

  • threshold (float, optional) – The reconstruction error threshold for detecting anomalies. Default is 0.1.

Returns:

True if the data point is an anomaly, False otherwise.

Return type:

bool

detect_knn(data_point)[source]

Detect anomalies using the k-Nearest Neighbors (k-NN) method.

Parameters:

data_point (float) – The new data point to be analyzed.

Returns:

True if the data point is an anomaly, False otherwise.

Return type:

bool

detect_lstm(data_point, threshold=0.1)[source]

Detect anomalies using the LSTM-based model.

Parameters:
  • data_point (float) – The new data point to be analyzed.

  • threshold (float, optional) – The prediction error threshold for detecting anomalies. Default is 0.1.

Returns:

True if the data point is an anomaly, False otherwise.

Return type:

bool

detect_statistical(data_point, method='z_score', threshold=2.0, **kwargs)[source]

Detect anomalies using statistical methods like Z-score, moving average, etc.

Parameters:
  • data_point (float) – The new data point to be analyzed.

  • method (str, optional) – The statistical method to use (‘z_score’ or ‘moving_average’). Default is ‘z_score’.

  • threshold (float, optional) – The threshold value for detecting anomalies. Default is 2.0.

Returns:

True if the data point is an anomaly, False otherwise.

Return type:

bool

detect_svm(data_point)[source]

Detect anomalies using the Support Vector Machine (SVM) method.

Parameters:

data_point (float) – The new data point to be analyzed.

Returns:

True if the data point is an anomaly, False otherwise.

Return type:

bool

detect_wavelet(data_point, wavelet_name='haar', level=1, threshold=0.1)[source]

Detect anomalies using Wavelet Transform.

Parameters:
  • data_point (float) – The new data point to be analyzed.

  • wavelet_name (str, optional) – The name of the wavelet to use for the transform (default is ‘haar’).

  • level (int, optional) – The number of decomposition levels in the wavelet transform (default is 1).

  • threshold (float, optional) – The threshold for detecting anomalies in the wavelet coefficients (default is 0.1).

Returns:

True if the data point is an anomaly, False otherwise.

Return type:

bool

evaluate(test_data, model_type='knn')[source]

Evaluate the performance of the anomaly detection method on a test dataset.

Parameters:
  • test_data (numpy.ndarray) – The test dataset.

  • model_type (str, optional) – The type of model to evaluate (‘knn’, ‘svm’, ‘autoencoder’, ‘lstm’). Default is ‘knn’.

Returns:

The accuracy of the anomaly detection method on the test dataset.

Return type:

float

train_autoencoder(training_data, encoding_dim=3)[source]

Train an Autoencoder model on the training data.

Parameters:
  • training_data (numpy.ndarray) – The training dataset.

  • encoding_dim (int, optional) – The dimension of the encoding layer. Default is 3.

Return type:

None

train_knn(training_data, k=5)[source]

Train a k-Nearest Neighbors (k-NN) model on the training data.

Parameters:
  • training_data (numpy.ndarray) – The training dataset.

  • k (int, optional) – The number of nearest neighbors to consider. Default is 5.

Return type:

None

train_lstm(training_data, hidden_units=50)[source]

Train an LSTM-based model on the training data.

Parameters:
  • training_data (numpy.ndarray) – The training dataset.

  • hidden_units (int, optional) – The number of hidden units in the LSTM. Default is 50.

Return type:

None

train_svm(training_data, kernel='rbf')[source]

Train a Support Vector Machine (SVM) model on the training data.

Parameters:
  • training_data (numpy.ndarray) – The training dataset.

  • kernel (str, optional) – The kernel type for SVM (‘linear’, ‘poly’, ‘rbf’). Default is ‘rbf’.

Return type:

None

update_model(data_point, model_type='knn')[source]

Update the model with new data for online learning.

Parameters:
  • data_point (float) – The new data point to update the model.

  • model_type (str, optional) – The type of model to update (‘knn’, ‘svm’, ‘autoencoder’, ‘lstm’). Default is ‘knn’.

Return type:

None

class vitalDSP.advanced_computation.real_time_anomaly_detection.SimpleAutoencoder(training_data, encoding_dim=3)[source]

Bases: object

reconstruction_error(data_point)[source]

Compute the reconstruction error for a given data point.

Parameters:

data_point (float or np.array) – The data point to compute the reconstruction error for.

Returns:

The mean squared reconstruction error.

Return type:

float

Example

>>> error = model.reconstruction_error(np.array([1, 2, 3]))
update(data_point)[source]

Update the autoencoder with a new data point.

Parameters:

data_point (float or np.array) – The new data point to be added for online learning.

Example

>>> model.update(np.array([4, 5, 6]))
class vitalDSP.advanced_computation.real_time_anomaly_detection.SimpleLSTM(training_data, hidden_units=50)[source]

Bases: object

prediction_error(data_point)[source]
update(data_point)[source]
class vitalDSP.advanced_computation.real_time_anomaly_detection.SimpleSVM(training_data, kernel='rbf')[source]

Bases: object

predict(data_point)[source]
update(data_point)[source]

Bayesian Analysis

Bayesian Analysis

Bayesian methods applied to signal processing, offering a probabilistic approach to model and interpret signal data.

Advanced Computation Module for Physiological Signal Processing

This module provides Bayesian inference and optimization capabilities for physiological signal processing including ECG, PPG, EEG, and other vital signs.

Author: vitalDSP Team Date: 2025-01-27 Version: 1.0.1

Key Features: - Gaussian Process modeling for signal prediction - Bayesian Optimization for hyperparameter tuning - Expected Improvement acquisition function - Numerical stability improvements - Graceful error handling

Classes:

GaussianProcess

Implements Gaussian Process regression for probabilistic modeling.

BayesianOptimization

Implements Bayesian Optimization using Gaussian Processes.

Examples:

Example 1: Gaussian Process Prediction

>>> import numpy as np
>>> from vitalDSP.advanced_computation.bayesian_analysis import GaussianProcess
>>>
>>> # Create GP model
>>> gp = GaussianProcess(length_scale=1.0, noise=1e-5)
>>>
>>> # Training data
>>> X_train = np.array([[0.1], [0.4], [0.7]])
>>> y_train = np.sin(3 * X_train.flatten())
>>> gp.update(X_train, y_train)
>>>
>>> # Predict at new points
>>> X_new = np.array([[0.2], [0.5]])
>>> mean, variance = gp.predict(X_new)
>>> print(f"Predicted mean: {mean}")
>>> print(f"Predicted variance: {variance}")

Example 2: Bayesian Optimization for Hyperparameter Tuning

>>> from vitalDSP.advanced_computation.bayesian_analysis import BayesianOptimization
>>>
>>> # Define objective function to maximize
>>> def filter_performance(cutoff_freq):
...     # Simulate filter performance metric
...     cutoff = np.atleast_1d(cutoff_freq)[0]
...     return -(cutoff - 0.5)**2 + 1.0  # Peak at 0.5
>>>
>>> # Optimize cutoff frequency in range [0, 1]
>>> optimizer = BayesianOptimization(
...     func=filter_performance,
...     bounds=(0, 1),
...     length_scale=0.1,
...     noise=1e-5
... )
>>>
>>> # Run optimization
>>> best_cutoff, best_performance = optimizer.optimize(n_iter=20, random_seed=42)
>>> print(f"Best cutoff frequency: {best_cutoff[0]:.4f}")
>>> print(f"Best performance: {best_performance:.4f}")

Example 3: ECG Filter Parameter Optimization

>>> from vitalDSP.filtering.signal_filtering import SignalFiltering
>>>
>>> # Simulated ECG signal
>>> ecg_signal = np.sin(2*np.pi*1.2*np.linspace(0, 10, 1000)) + 0.1*np.random.randn(1000)
>>>
>>> # Define objective: minimize noise while preserving signal
>>> def filter_quality(params):
...     cutoff = np.atleast_1d(params)[0]
...     sf = SignalFiltering(ecg_signal)
...     filtered = sf.butterworth_lowpass(cutoff_freq=cutoff, fs=100, order=4)
...     # Quality metric: SNR-like measure
...     signal_power = np.var(filtered)
...     noise_power = np.var(ecg_signal - filtered)
...     return signal_power / (noise_power + 1e-10)
>>>
>>> # Optimize filter cutoff
>>> optimizer = BayesianOptimization(filter_quality, bounds=(0.1, 10))
>>> best_cutoff, best_snr = optimizer.optimize(n_iter=15, random_seed=42)
>>> print(f"Optimal cutoff: {best_cutoff[0]:.2f} Hz, SNR: {best_snr:.2f}")

Notes:

  • Numerical Stability: The default noise parameter (1e-5) provides good numerical stability. Avoid using values smaller than 1e-7 as they can cause singular matrix errors.

  • Initialization: BayesianOptimization starts with 3 random samples to bootstrap the Gaussian Process before beginning optimization.

  • Fallback: If numerical issues occur during optimization, the algorithm falls back to random sampling to ensure it always returns a result.

class vitalDSP.advanced_computation.bayesian_analysis.BayesianOptimization(func, bounds, length_scale=1.0, noise=1e-05)[source]

Bases: object

A class implementing Bayesian Optimization for parameter tuning.

Bayesian Optimization is an efficient method for finding the minimum or maximum of an objective function, especially when the function is expensive to evaluate. It uses a Gaussian Process model to predict the function’s behavior and an acquisition function to determine the next point to sample.

optimize(n_iter=10, random_seed=None)[source]

Perform Bayesian optimization to find the best parameters.

acquisition(X, xi=0.01)[source]

Compute the Expected Improvement (EI) at new points.

propose_location(n_restarts=10)[source]

Propose the next sampling location based on the acquisition function.

Example Usage
-------------
>>> def objective_function(x):
>>>     return -np.sin(3 * x) - x ** 2 + 0.7 * x
>>>
>>> bayesian_optimizer = BayesianOptimization(objective_function, bounds=(0, 2))
>>> best_x, best_y = bayesian_optimizer.optimize(n_iter=10)
>>> print("Best X:", best_x)
>>> print("Best Y:", best_y)
acquisition(X, xi=0.01)[source]

Expected Improvement (EI) acquisition function.

The EI acquisition function is used to balance exploration and exploitation in Bayesian Optimization. It selects points that have the potential to improve upon the best observed value.

Parameters:
  • X (numpy.ndarray) – The points at which to evaluate the acquisition function.

  • xi (float, optional) – Exploration-exploitation trade-off parameter (default is 0.01).

Returns:

The EI values at the provided points.

Return type:

numpy.ndarray

optimize(n_iter=10, random_seed=None)[source]

Perform Bayesian optimization to find the best parameters.

Parameters:
  • n_iter (int, optional) – Number of iterations for the optimization (default is 10).

  • random_seed (int or None, optional) – Random seed for reproducibility (default is None).

Returns:

The best parameters and the corresponding function value.

Return type:

tuple

Notes

The optimization process starts with a random sample to initialize the Gaussian Process, then iteratively selects new points using the acquisition function to balance exploration and exploitation.

Examples

>>> def objective(x):
...     return -np.sin(3 * x) - x ** 2 + 0.7 * x
>>> optimizer = BayesianOptimization(objective, bounds=(0, 2))
>>> best_x, best_y = optimizer.optimize(n_iter=10, random_seed=42)
>>> print(f"Best X: {best_x}, Best Y: {best_y}")
propose_location(n_restarts=10)[source]

Propose the next sampling location by optimizing the acquisition function.

Parameters:

n_restarts (int, optional) – Number of restarts for the optimization to avoid local optima (default is 10).

Returns:

The proposed next sampling point.

Return type:

numpy.ndarray

class vitalDSP.advanced_computation.bayesian_analysis.GaussianProcess(length_scale=1.0, noise=1e-10)[source]

Bases: object

A class implementing Gaussian Process (GP) for Bayesian Optimization.

Gaussian Process models are used to predict the mean and variance of an unknown function based on observed data. This is particularly useful in Bayesian Optimization, where the GP helps in selecting the next sample point to evaluate.

predict(X)[source]

Predict the mean and variance of the objective function at new points X.

update(X_train, y_train)[source]

Update the GP model with new observations.

Example Usage
-------------
>>> gp = GaussianProcess(length_scale=1.0, noise=1e-10)
>>> X_train = np.array([[0.1], [0.4], [0.7]])
>>> y_train = np.sin(3 * X_train) - X_train ** 2 + 0.7 * X_train
>>> gp.update(X_train, y_train)
>>> X_new = np.array([[0.2], [0.5]])
>>> mean, variance = gp.predict(X_new)
>>> print("Predicted Mean:", mean)
>>> print("Predicted Variance:", variance)
predict(X)[source]

Predict the mean and variance of the objective function at new points.

Parameters:

X (numpy.ndarray) – The points at which to predict the mean and variance.

Returns:

The predicted mean and variance at the input points X.

Return type:

tuple

Raises:

ValueError – If the GP model has not been updated with any training data.

update(X_train, y_train)[source]

Update the GP model with new observations.

Parameters:
  • X_train (numpy.ndarray) – The observed input points.

  • y_train (numpy.ndarray) – The observed output values corresponding to X_train.

Empirical Mode Decomposition (EMD)

Decomposing signals into intrinsic mode functions using Empirical Mode Decomposition, useful in analyzing non-linear and non-stationary signals.

Empirical Mode Decomposition (EMD) Module for Physiological Signal Processing

This module provides comprehensive Empirical Mode Decomposition capabilities for physiological signals including ECG, PPG, EEG, and other vital signs. EMD is particularly effective for analyzing non-linear and non-stationary signals by decomposing them into Intrinsic Mode Functions (IMFs).

Author: vitalDSP Team Date: 2025-01-27 Version: 1.0.0

Key Features: - Empirical Mode Decomposition (EMD) implementation - Intrinsic Mode Functions (IMFs) extraction - Non-linear and non-stationary signal analysis - Customizable stop criteria and IMF limits - Signal decomposition and reconstruction - Advanced signal analysis capabilities

Examples:

Basic EMD decomposition:
>>> import numpy as np
>>> from vitalDSP.advanced_computation.emd import EMD
>>> signal = np.sin(np.linspace(0, 10, 100)) + 0.5 * np.random.normal(size=100)
>>> emd = EMD(signal)
>>> imfs = emd.emd()
>>> print(f"Number of IMFs: {len(imfs)}")
Limited IMF decomposition:
>>> emd_limited = EMD(signal)
>>> imfs_limited = emd_limited.emd(max_imfs=3)
>>> print(f"Limited IMFs: {len(imfs_limited)}")
Custom stop criterion:
>>> emd_custom = EMD(signal)
>>> imfs_custom = emd_custom.emd(stop_criterion=0.01)
>>> print(f"Custom stop criterion IMFs: {len(imfs_custom)}")
Signal reconstruction:
>>> reconstructed = np.sum(imfs, axis=0)
>>> print(f"Reconstruction error: {np.mean((signal - reconstructed)**2):.6f}")
class vitalDSP.advanced_computation.emd.EMD(signal)[source]

Bases: object

Empirical Mode Decomposition (EMD) for decomposing non-linear and non-stationary signals into Intrinsic Mode Functions (IMFs).

EMD is particularly useful for analyzing signals that are non-linear and non-stationary, where traditional methods like Fourier Transform may not be effective.

emd : method

Performs the EMD on the input signal and returns the IMFs.

Examples

>>> import numpy as np
>>> from vitalDSP.advanced_computation.emd import EMD
>>>
>>> # Example 1: Basic EMD decomposition
>>> signal = np.sin(np.linspace(0, 10, 100)) + 0.5 * np.random.normal(size=100)
>>> emd = EMD(signal)
>>> imfs = emd.emd()
>>> print(f"Number of IMFs: {len(imfs)}")
>>>
>>> # Example 2: EMD with limited number of IMFs
>>> emd_limited = EMD(signal)
>>> imfs_limited = emd_limited.emd(max_imfs=3)
>>> print(f"Limited IMFs: {len(imfs_limited)}")
>>>
>>> # Example 3: EMD with custom stop criterion
>>> emd_custom = EMD(signal)
>>> imfs_custom = emd_custom.emd(stop_criterion=0.01)
>>> print(f"Custom stop criterion IMFs: {len(imfs_custom)}")
emd(max_imfs=None, stop_criterion=0.05, max_sifting_iterations=20, max_decomposition_iterations=10)[source]

Perform Empirical Mode Decomposition (EMD) on the input signal.

Parameters:
  • max_imfs (int or None, optional) – Maximum number of IMFs to extract. If None, extract all possible IMFs (default is None).

  • stop_criterion (float, optional) – The stopping criterion for the sifting process, which controls how close the IMF is to an ideal IMF (default is 0.05).

  • max_sifting_iterations (int, optional) – Maximum number of sifting iterations per IMF to prevent infinite loops (default is 20).

  • max_decomposition_iterations (int, optional) – Maximum number of decomposition iterations to prevent excessive computation (default is 10).

Returns:

imfs – A list of IMFs (Intrinsic Mode Functions) extracted from the signal.

Return type:

list of numpy.ndarray

Notes

Each IMF represents a simple oscillatory mode embedded in the signal. The sum of all IMFs plus the final residual will reconstruct the original signal.

OPTIMIZATION: Added convergence limits to prevent infinite loops and improve reliability.

Examples

>>> signal = np.sin(np.linspace(0, 10, 100)) + 0.5 * np.random.normal(size=100)
>>> emd = EMD(signal)
>>> imfs = emd.emd()
>>> print("IMFs:", imfs)

Generative Signal Synthesis

Methods for generating synthetic signals that mimic real-world biomedical signals, useful in training and testing algorithms.

Advanced Computation Module for Physiological Signal Processing

This module provides comprehensive capabilities for physiological signal processing including ECG, PPG, EEG, and other vital signs.

Author: vitalDSP Team Date: 2025-01-27 Version: 1.0.0

Key Features: - Object-oriented design with comprehensive classes - Multiple processing methods and functions - NumPy integration for numerical computations

Examples:

Basic usage:
>>> import numpy as np
>>> from vitalDSP.advanced_computation.generative_signal_synthesis import GenerativeSignalSynthesis
>>> signal = np.random.randn(1000)
>>> gen = GenerativeSignalSynthesis()
>>> signal = gen.generate()
class vitalDSP.advanced_computation.generative_signal_synthesis.GenerativeSignalSynthesis[source]

Bases: object

Generative Signal Synthesis for creating synthetic signal data using various methods.

This class provides methods to generate synthetic signals using techniques such as random noise, Gaussian processes, autoregressive (AR) models, Markov chains, and custom-defined functions.

generate(method="random_noise", length=100, \*\*kwargs) :

Generates synthetic signals using the specified method.

Example Usage
-------------
signal_synthesizer = GenerativeSignalSynthesis()
# Generate random noise
random_signal = signal_synthesizer.generate(method="random_noise", length=100)
print("Random Noise Signal:", random_signal)
# Generate Gaussian process
gp_signal = signal_synthesizer.generate(method="gaussian_process", length=100, mean=0, std_dev=1, correlation=0.9)
print("Gaussian Process Signal:", gp_signal)
# Generate AR model signal
ar_signal = signal_synthesizer.generate(method="autoregressive", length=100, coeffs=[0.9, -0.5])
print("AR Model Signal:", ar_signal)
# Generate Markov chain signal
markov_signal = signal_synthesizer.generate(method="markov_chain", length=100, states=[-1, 1], transition_matrix=[[0.9, 0.1], [0.1, 0.9]])
print("Markov Chain Signal:", markov_signal)
# Generate custom function signal
custom_signal = signal_synthesizer.generate(method="custom_function", length=100, func=lambda x: np.sin(x))
print("Custom Function Signal:", custom_signal)
generate(method='random_noise', length=100, **kwargs)[source]

Generate synthetic signals.

Parameters:
  • method (str, optional) – The method to use for generating the signal. Options include “random_noise”, “gaussian_process”, “autoregressive”, “markov_chain”, “custom_function”. Default is “random_noise”.

  • length (int, optional) – The length of the generated signal. Default is 100.

  • kwargs (dict) – Additional parameters depending on the generation method. These include: - mean (float): Mean of the noise or process (for “random_noise” and “gaussian_process”). - std_dev (float): Standard deviation (for “random_noise” and “gaussian_process”). - correlation (float): Correlation coefficient between successive points (for “gaussian_process”). - coeffs (list of float): Coefficients for the AR model (for “autoregressive”). - states (list): Possible states of the Markov chain (for “markov_chain”). - transition_matrix (list of list of float): State transition matrix (for “markov_chain”). - func (callable): Function to generate the signal (for “custom_function”).

Returns:

The generated synthetic signal.

Return type:

numpy.ndarray

Raises:

ValueError – If the specified generation method is unknown.

Harmonic/Percussive Separation

Techniques for separating harmonic and percussive components of signals, particularly useful in audio signal processing.

Advanced Computation Module for Physiological Signal Processing

This module provides comprehensive capabilities for physiological signal processing including ECG, PPG, EEG, and other vital signs.

Author: vitalDSP Team Date: 2025-01-27 Version: 1.0.0

Key Features: - Object-oriented design with comprehensive classes - Multiple processing methods and functions - NumPy integration for numerical computations

Examples:

Basic usage:
>>> import numpy as np
>>> from vitalDSP.advanced_computation.harmonic_percussive_separation import HarmonicPercussiveSeparation
>>> signal = np.random.randn(1000)
>>> hps = HarmonicPercussiveSeparation(signal)
>>> harmonic, percussive = hps.separate()
class vitalDSP.advanced_computation.harmonic_percussive_separation.HarmonicPercussiveSeparation(signal)[source]

Bases: object

Harmonic-Percussive Separation for separating vocal and noise components in respiratory analysis.

This class separates a signal into its harmonic (sustained, tonal) and percussive (transient, noisy) components, which can be particularly useful in respiratory analysis to distinguish between vocal and noise components.

separate : method

Separates the harmonic and percussive components of the signal using median filtering.

Example Usage
-------------
signal = np.sin(np.linspace(0, 10, 100)) + 0.5 * np.random.normal(size=100)
hps = HarmonicPercussiveSeparation(signal)
harmonic, percussive = hps.separate()
print("Harmonic Component:", harmonic)
print("Percussive Component:", percussive)
separate(kernel_size=31)[source]

Separate the harmonic and percussive components of the signal using median filtering.

The harmonic component is obtained by applying a median filter across the time axis, while the percussive component is obtained by applying a median filter across the frequency axis.

Parameters:

kernel_size (int, optional) – Size of the median filter kernel (default is 31).

Returns:

  • harmonic (numpy.ndarray) – The harmonic component of the signal.

  • percussive (numpy.ndarray) – The percussive component of the signal.

Example

>>> signal = np.sin(np.linspace(0, 10, 100)) + 0.5 * np.random.normal(size=100)
>>> hps = HarmonicPercussiveSeparation(signal)
>>> harmonic, percussive = hps.separate(kernel_size=31)
>>> print(harmonic)
>>> print(percussive)

Kalman Filter

Implementing Kalman Filters for optimal estimation of system states, widely used in tracking and predictive modeling.

Advanced Computation Module for Physiological Signal Processing

This module provides comprehensive capabilities for physiological signal processing including ECG, PPG, EEG, and other vital signs.

Author: vitalDSP Team Date: 2025-01-27 Version: 1.0.0

Key Features: - Object-oriented design with comprehensive classes - Multiple processing methods and functions - NumPy integration for numerical computations - Signal validation and error handling - Advanced filtering algorithms

Examples:

Basic usage:
>>> import numpy as np
>>> from vitalDSP.advanced_computation.kalman_filter import KalmanFilter
>>> signal = np.random.randn(1000)
>>> kf = KalmanFilter(signal)
>>> filtered = kf.filter()
class vitalDSP.advanced_computation.kalman_filter.KalmanFilter(initial_state, initial_covariance, process_covariance, measurement_covariance)[source]

Bases: object

A Kalman Filter for real-time filtering and continuous monitoring of signals.

The Kalman Filter is an optimal recursive data processing algorithm that estimates the state of a dynamic system from a series of noisy measurements. It is widely used in control systems, signal processing, and time series analysis.

Parameters:
  • initial_state (numpy.ndarray) – The initial state estimate of the system.

  • initial_covariance (numpy.ndarray) – The initial covariance estimate of the state.

  • process_covariance (numpy.ndarray) – The process noise covariance matrix, representing the uncertainty in the system dynamics.

  • measurement_covariance (numpy.ndarray) – The measurement noise covariance matrix, representing the uncertainty in the measurements.

filter(signal, measurement_matrix, transition_matrix, control_input=None, control_matrix=None)[source]

Apply the Kalman filter to the input signal to obtain a filtered estimate of the state.

Examples

>>> signal = np.sin(np.linspace(0, 10, 100)) + 0.5 * np.random.normal(size=100)
>>> initial_state = np.array([0])
>>> initial_covariance = np.array([[1]])
>>> process_covariance = np.array([[1e-5]])
>>> measurement_covariance = np.array([[1e-1]])
>>> measurement_matrix = np.array([[1]])
>>> transition_matrix = np.array([[1]])
>>> kalman_filter = KalmanFilter(initial_state, initial_covariance, process_covariance, measurement_covariance)
>>> filtered_signal = kalman_filter.filter(signal, measurement_matrix, transition_matrix)
>>> print("Filtered Signal:", filtered_signal)
filter(signal, measurement_matrix, transition_matrix, control_input=None, control_matrix=None)[source]

Apply the Kalman filter to the input signal.

The Kalman filter recursively estimates the state of a dynamic system from noisy measurements. At each time step, it predicts the next state, updates the estimate based on the actual measurement, and provides a filtered signal as output.

Parameters:
  • signal (numpy.ndarray) – The input signal to be filtered.

  • measurement_matrix (numpy.ndarray) – The measurement matrix that maps the true state space into the observed space.

  • transition_matrix (numpy.ndarray) – The state transition matrix that describes how the state evolves from one time step to the next.

  • control_input (numpy.ndarray or None, optional) – An optional control input to the system (default is None).

  • control_matrix (numpy.ndarray or None, optional) – An optional control matrix that maps the control input to the state space (default is None).

Returns:

The filtered signal, where each element corresponds to the estimated state at each time step.

Return type:

numpy.ndarray

Examples

>>> signal = np.sin(np.linspace(0, 10, 100)) + 0.5 * np.random.normal(size=100)
>>> measurement_matrix = np.array([[1]])
>>> transition_matrix = np.array([[1]])
>>> kalman_filter = KalmanFilter(initial_state, initial_covariance, process_covariance, measurement_covariance)
>>> filtered_signal = kalman_filter.filter(signal, measurement_matrix, transition_matrix)
>>> print(filtered_signal)

Multimodal Fusion

Combining information from multiple signal sources to improve the accuracy and robustness of signal analysis.

Advanced Computation Module for Physiological Signal Processing

This module provides comprehensive capabilities for physiological signal processing including ECG, PPG, EEG, and other vital signs.

Author: vitalDSP Team Date: 2025-01-27 Version: 1.0.0

Key Features: - Object-oriented design with comprehensive classes - Multiple processing methods and functions - NumPy integration for numerical computations

Examples:

Basic usage:
>>> import numpy as np
>>> from vitalDSP.advanced_computation.multimodal_fusion import MultimodalFusion
>>> signal = np.random.randn(1000)
>>> mf = MultimodalFusion(signal)
>>> fused = mf.fuse()
class vitalDSP.advanced_computation.multimodal_fusion.MultimodalFusion(signals)[source]

Bases: object

Multimodal Fusion class for combining multiple signals to improve health assessments.

The class supports various fusion strategies, including weighted sum, concatenation, PCA-based fusion, maximum, and minimum.

Example Usage

signal1 = np.sin(np.linspace(0, 10, 100)) signal2 = np.cos(np.linspace(0, 10, 100)) fusion = MultimodalFusion([signal1, signal2])

# Weighted Sum Fusion fused_signal_weighted = fusion.fuse(strategy=”weighted_sum”, weights=[0.6, 0.4]) print(“Fused Signal (Weighted Sum):”, fused_signal_weighted)

# Concatenation Fusion fused_signal_concat = fusion.fuse(strategy=”concatenation”) print(“Fused Signal (Concatenation):”, fused_signal_concat)

# PCA-based Fusion fused_signal_pca = fusion.fuse(strategy=”pca”, n_components=1) print(“Fused Signal (PCA):”, fused_signal_pca)

# Maximum Fusion fused_signal_max = fusion.fuse(strategy=”maximum”) print(“Fused Signal (Maximum):”, fused_signal_max)

# Minimum Fusion fused_signal_min = fusion.fuse(strategy=”minimum”) print(“Fused Signal (Minimum):”, fused_signal_min)

fuse(strategy='weighted_sum', **kwargs)[source]

Fuse multiple signals using the specified strategy.

Parameters:
  • strategy (str, optional) – The fusion strategy to use. Options include “weighted_sum”, “concatenation”, “pca”, “maximum”, “minimum” (default is “weighted_sum”).

  • kwargs (dict, optional) – Additional arguments depending on the fusion strategy. - weights (list of float): Weights for the weighted sum strategy (required if strategy=”weighted_sum”). - n_components (int): Number of principal components to keep for PCA (optional, default=1).

Returns:

The fused signal based on the chosen strategy.

Return type:

numpy.ndarray

Raises:

ValueError – If an unknown fusion strategy is specified or if required arguments are missing.

Examples

>>> signal1 = np.sin(np.linspace(0, 10, 100))
>>> signal2 = np.cos(np.linspace(0, 10, 100))
>>> fusion = MultimodalFusion([signal1, signal2])
>>> fused_signal = fusion.fuse(strategy="weighted_sum", weights=[0.6, 0.4])
>>> print(fused_signal)

Non-Linear Analysis

Analyzing signals using non-linear methods, essential for understanding complex systems where traditional linear methods fall short.

Advanced Computation Module for Physiological Signal Processing

This module provides comprehensive capabilities for physiological signal processing including ECG, PPG, EEG, and other vital signs.

Author: vitalDSP Team Date: 2025-01-27 Version: 1.0.0

Key Features: - Object-oriented design with comprehensive classes - Multiple processing methods and functions - NumPy integration for numerical computations - Interactive visualization capabilities - Comprehensive signal analysis

Examples:

Basic usage:
>>> import numpy as np
>>> from vitalDSP.advanced_computation.non_linear_analysis import NonlinearAnalysis
>>> signal = np.random.randn(1000)
>>> nla = NonlinearAnalysis(signal)
>>> le = nla.lyapunov_exponent()
class vitalDSP.advanced_computation.non_linear_analysis.NonlinearAnalysis(signal)[source]

Bases: object

Nonlinear Analysis for examining chaotic signals, such as Heart Rate Variability (HRV).

This class provides methods for analyzing the chaotic behavior of signals using techniques like the estimation of Lyapunov exponents, generation of Poincaré plots, and calculation of correlation dimensions.

lyapunov_exponent(max_iter=1000, epsilon=1e-8)[source]

Estimates the largest Lyapunov exponent to assess chaos in the signal.

poincare_plot()[source]

Generates a Poincaré plot to visualize the dynamics of the signal.

correlation_dimension(radius=0.1)[source]

Estimates the correlation dimension of the signal.

Example Usage
-------------
>>> signal = np.sin(np.linspace(0, 10, 100)) + 0.5 * np.random.normal(size=100)
>>> nonlinear_analysis = NonlinearAnalysis(signal)
>>> lyapunov = nonlinear_analysis.lyapunov_exponent()
>>> print("Lyapunov Exponent:", lyapunov)
>>> nonlinear_analysis.poincare_plot()
>>> correlation_dim = nonlinear_analysis.correlation_dimension()
>>> print("Correlation Dimension:", correlation_dim)
correlation_dimension(radius=0.1, normalize=True)[source]

Estimate the correlation dimension of the signal using the Grassberger-Procaccia method.

The correlation dimension is a measure of the fractal dimension of the attractor in the phase space of the signal. It provides insight into the complexity of the signal.

IMPORTANT: Signal normalization is highly recommended for physiological signals to ensure the radius parameter is appropriate for the signal’s scale.

Parameters:
  • radius (float, optional) – Radius within which points are considered neighbors (default is 0.1). For normalized signals, typical values are 0.1-2.0. For raw signals, choose radius based on signal scale.

  • normalize (bool, optional) – If True, automatically normalize the signal before computation (default is True). Normalization ensures radius is appropriate regardless of signal amplitude.

Returns:

The estimated correlation dimension.

Return type:

float

Raises:
  • ValueError – If radius is 1.0 (causes divide-by-zero), or if no point pairs found within radius.

  • Warning – If correlation dimension is negative (suggests inappropriate radius selection).

Examples

>>> signal = np.sin(np.linspace(0, 10, 100)) + 0.5 * np.random.normal(size=100)
>>> nonlinear_analysis = NonlinearAnalysis(signal)
>>> correlation_dim = nonlinear_analysis.correlation_dimension(radius=0.5)
>>> print("Correlation Dimension:", correlation_dim)

Notes

For physiological signals (ECG, PPG): 1. Always normalize the signal (normalize=True, default) 2. Use radius in range [0.1, 2.0] for normalized signals 3. Avoid radius = 1.0 exactly (causes log(1.0) = 0) 4. If negative result, try different radius or check signal quality

The correlation dimension should typically be between 0 and the embedding dimension. Values outside this range suggest inappropriate parameters or unsuitable signal.

lyapunov_exponent(max_iter=1000, epsilon=1e-08, normalize=True)[source]

Estimate the largest Lyapunov exponent of the signal.

The Lyapunov exponent is a measure of the rate of separation of infinitesimally close trajectories in a chaotic system. A positive Lyapunov exponent indicates chaos.

IMPORTANT: For reliable results with physiological signals (ECG, PPG), signal normalization is highly recommended to avoid numerical instabilities.

Parameters:
  • max_iter (int, optional) – Maximum number of iterations to compute the exponent (default is 1000).

  • epsilon (float, optional) – Small perturbation used to calculate divergence, to avoid division by zero (default is 1e-8).

  • normalize (bool, optional) – If True, automatically normalize the signal before computation (default is True). Normalization prevents divide-by-zero errors with signals containing flat segments.

Returns:

The estimated largest Lyapunov exponent.

Return type:

float

Raises:
  • ValueError – If signal is too short for the specified max_iter.

  • Warning – If computation results in NaN or inf values.

Examples

>>> signal = np.sin(np.linspace(0, 10, 100)) + 0.5 * np.random.normal(size=100)
>>> nonlinear_analysis = NonlinearAnalysis(signal)
>>> lyapunov = nonlinear_analysis.lyapunov_exponent()
>>> print("Lyapunov Exponent:", lyapunov)

Notes

Uses the Rosenstein et al. (1993) algorithm: delay-embeds the signal into a dim-dimensional phase space, finds the nearest neighbor for each reference point (with Theiler window to exclude temporal neighbors), tracks trajectory divergence, and fits the slope of the mean log-divergence curve.

poincare_plot()[source]

Generate a Poincaré plot to visualize the dynamics of the signal.

The Poincaré plot is a scatter plot of the signal against its delayed version. It is often used to visualize periodic and chaotic dynamics in the signal.

Returns:

The generated Poincaré plot.

Return type:

matplotlib.figure.Figure

Examples

>>> signal = np.sin(np.linspace(0, 10, 100)) + 0.5 * np.random.normal(size=100)
>>> nonlinear_analysis = NonlinearAnalysis(signal)
>>> nonlinear_analysis.poincare_plot()

Pitch Shift

Techniques for shifting the pitch of audio signals, often used in music signal processing and speech modulation.

Advanced Computation Module for Physiological Signal Processing

This module provides comprehensive capabilities for physiological signal processing including ECG, PPG, EEG, and other vital signs.

Author: vitalDSP Team Date: 2025-01-27 Version: 1.0.0

Key Features: - Object-oriented design with comprehensive classes - Multiple processing methods and functions - NumPy integration for numerical computations

Examples:

Basic usage:
>>> import numpy as np
>>> from vitalDSP.advanced_computation.pitch_shift import PitchShift
>>> signal = np.random.randn(1000)
>>> ps = PitchShift(signal, fs=256)
>>> shifted = ps.shift_pitch(semitones=2)
class vitalDSP.advanced_computation.pitch_shift.PitchShift(signal, sampling_rate=1000)[source]

Bases: object

A class for pitch shifting and pitch detection, primarily used in the analysis of vocal pitch. This is particularly useful in diagnosing speech disorders and other vocal characteristics.

shift_pitch : method

Shifts the pitch of the input signal by a specified number of semitones.

detect_pitch : method

Detects the pitch of the input signal using the autocorrelation method.

Example Usage
-------------
signal = np.sin(2 * np.pi * 440 * np.linspace(0, 1, 1000))  # A simple sine wave at 440 Hz (A4)
pitch_shift = PitchShift(signal, sampling_rate=1000)
shifted_signal = pitch_shift.shift_pitch(semitones=2)
print("Shifted Signal:", shifted_signal)
detected_pitch = pitch_shift.detect_pitch()
print("Detected Pitch:", detected_pitch, "Hz")
detect_pitch()[source]

Detect the pitch of the input signal using the autocorrelation method.

The autocorrelation method is commonly used to estimate the fundamental frequency (pitch) of a signal. This method is particularly effective for monophonic signals like speech or musical notes.

Returns:

pitch – The detected pitch of the signal in Hertz (Hz).

Return type:

float

Examples

>>> signal = np.sin(2 * np.pi * 440 * np.linspace(0, 1, 1000))  # 440 Hz signal
>>> pitch_shift = PitchShift(signal, sampling_rate=1000)
>>> detected_pitch = pitch_shift.detect_pitch()
>>> print("Detected Pitch:", detected_pitch, "Hz")
shift_pitch(semitones=1)[source]

Shift the pitch of the input signal by a given number of semitones.

This method shifts the pitch of the signal without altering its duration, which is useful for modifying vocal pitch in a controlled manner.

Parameters:

semitones (int, optional) – The number of semitones to shift the pitch. A positive value raises the pitch, while a negative value lowers it. Default is 1 semitone.

Returns:

shifted_signal – The pitch-shifted signal.

Return type:

numpy.ndarray

Examples

>>> signal = np.sin(2 * np.pi * 440 * np.linspace(0, 1, 1000))  # 440 Hz signal
>>> pitch_shift = PitchShift(signal, sampling_rate=1000)
>>> shifted_signal = pitch_shift.shift_pitch(semitones=3)
>>> print("Shifted Signal:", shifted_signal)

Sparse Signal Processing

Techniques focused on processing and analyzing sparse signals, which are common in compressed sensing and other applications.

Advanced Computation Module for Physiological Signal Processing

This module provides comprehensive capabilities for physiological signal processing including ECG, PPG, EEG, and other vital signs.

Author: vitalDSP Team Date: 2025-01-27 Version: 1.0.0

Key Features: - Object-oriented design with comprehensive classes - Multiple processing methods and functions - NumPy integration for numerical computations

Examples:

Basic usage:
>>> import numpy as np
>>> from vitalDSP.advanced_computation.sparse_signal_processing import SparseSignalProcessing
>>> signal = np.random.randn(1000)
>>> ssp = SparseSignalProcessing(signal)
>>> coeffs = ssp.sparse_representation()
class vitalDSP.advanced_computation.sparse_signal_processing.SparseSignalProcessing(signal)[source]

Bases: object

Sparse Signal Processing for efficient representation and processing of signals.

This class provides methods for transforming a signal into a sparse domain, applying thresholding to remove noise, and reconstructing the signal from its sparse representation.

sparse_representation : method

Represents the signal using a specified sparse basis (e.g., wavelets, DCT, FFT).

thresholding : method

Applies a threshold to the sparse representation to denoise the signal.

reconstruction : method

Reconstructs the signal from its sparse representation.

Example Usage
-------------
signal = np.sin(np.linspace(0, 10, 100)) + 0.5 * np.random.normal(size=100)
sparse_processing = SparseSignalProcessing(signal)
sparse_rep = sparse_processing.sparse_representation(np.fft.fft)
thresholded_sparse = sparse_processing.thresholding(sparse_rep, threshold=0.1)
reconstructed_signal = sparse_processing.reconstruction(thresholded_sparse, np.fft.ifft)
print("Reconstructed Signal:", reconstructed_signal)
reconstruction(sparse_rep, inverse_basis)[source]

Reconstruct the signal from its sparse representation.

This method applies the inverse transform to the sparse representation to reconstruct the signal in its original domain.

Parameters:
  • sparse_rep (numpy.ndarray) – The sparse representation of the signal.

  • inverse_basis (callable) – A function or method to inverse-transform the signal from the sparse domain (e.g., np.fft.ifft for inverse FFT, or wavelet_transform.perform_inverse_wavelet_transform for inverse wavelet transform).

Returns:

The reconstructed signal.

Return type:

numpy.ndarray

Examples

>>> reconstructed_signal = sparse_processing.reconstruction(thresholded_sparse, np.fft.ifft)
>>> print(reconstructed_signal)
sparse_representation(basis)[source]

Represent the signal using a sparse basis.

This method transforms the signal into a sparse domain, such as using wavelets, DCT (Discrete Cosine Transform), or FFT (Fast Fourier Transform), allowing for efficient processing and manipulation.

Parameters:

basis (callable) – A function or method to transform the signal to the sparse domain (e.g., np.fft.fft for FFT, or wavelet_transform.perform_wavelet_transform for wavelet transform).

Returns:

The sparse representation of the signal.

Return type:

numpy.ndarray

Examples

>>> signal = np.sin(np.linspace(0, 10, 100)) + 0.5 * np.random.normal(size=100)
>>> sparse_processing = SparseSignalProcessing(signal)
>>> sparse_rep = sparse_processing.sparse_representation(np.fft.fft)
>>> print(sparse_rep)
thresholding(sparse_rep, threshold)[source]

Apply a threshold to the sparse representation to denoise the signal.

This method sets values in the sparse representation that are below the threshold to zero, effectively removing small coefficients that may correspond to noise.

Parameters:
  • sparse_rep (numpy.ndarray) – The sparse representation of the signal.

  • threshold (float) – The threshold value. Coefficients with absolute values below this threshold will be set to zero.

Returns:

The thresholded sparse representation.

Return type:

numpy.ndarray

Examples

>>> thresholded_sparse = sparse_processing.thresholding(sparse_rep, threshold=0.1)
>>> print(thresholded_sparse)