Advanced Computation
This section covers the advanced computational techniques provided by the VitalDSP library. These methods are designed to handle complex signal processing tasks, including machine learning, anomaly detection, Bayesian analysis, and advanced signal processing algorithms.
Overview
The advanced computation module provides cutting-edge algorithms for:
Machine Learning: Neural networks, reinforcement learning, and adaptive filtering
Anomaly Detection: Real-time and batch anomaly detection algorithms
Bayesian Analysis: Probabilistic modeling and optimization
Signal Decomposition: EMD, harmonic separation, and sparse processing
Nonlinear Analysis: Complex system analysis and chaos theory
Multimodal Fusion: Multi-signal integration and analysis
Machine Learning and AI
Neural Network Filtering
Advanced filtering techniques leveraging neural networks to process and enhance biomedical signals.
Advanced Computation Module for Physiological Signal Processing
This module provides comprehensive capabilities for physiological signal processing including ECG, PPG, EEG, and other vital signs.
Author: vitalDSP Team
Date: 2025-01-27
Version: 1.0.0
Key Features:
- Object-oriented design with comprehensive classes
- Multiple processing methods and functions
- NumPy integration for numerical computations
- Configurable parameters and settings
- Advanced filtering algorithms
Examples:
- Basic usage:
>>> import numpy as np
>>> from vitalDSP.advanced_computation.neural_network_filtering import NeuralNetworkFiltering
>>> signal = np.random.randn(1000)
>>> nn = NeuralNetworkFiltering(signal)
>>> nn.train()
>>> filtered = nn.apply_filter()
- class vitalDSP.advanced_computation.neural_network_filtering.ConvolutionalNetwork(dropout_rate, batch_norm)[source]
Bases: object
- predict(X)[source]
- train(X, y, learning_rate, epochs, batch_size)[source]
- class vitalDSP.advanced_computation.neural_network_filtering.FeedforwardNetwork(hidden_layers, dropout_rate, batch_norm)[source]
Bases: object
- predict(X)[source]
- train(X, y, learning_rate, epochs, batch_size)[source]
- class vitalDSP.advanced_computation.neural_network_filtering.NeuralNetworkFiltering(signal, network_type='feedforward', hidden_layers=[64, 64], learning_rate=0.001, epochs=100, batch_size=32, dropout_rate=0.5, batch_norm=True, recurrent_type='lstm')[source]
Bases: object
A comprehensive neural network-based filtering approach for adaptive signal processing.
This class supports various neural network architectures, including feedforward, convolutional, and recurrent networks. It includes advanced features such as dropout for regularization, batch normalization for faster convergence, and customizable training options.
- train : method
Trains the neural network on the given signal.
- apply_filter : method
Applies the trained neural network to filter the signal.
- evaluate : method
Evaluates the performance of the neural network on a test signal.
Example Usage
>>> import numpy as np
>>> from vitalDSP.advanced_computation.neural_network_filtering import NeuralNetworkFiltering
>>> signal = np.sin(np.linspace(0, 10, 100)) + np.random.normal(0, 0.1, 100)
>>> nn_filter = NeuralNetworkFiltering(signal, network_type='feedforward', hidden_layers=[64, 64], epochs=100)
>>> # Train the neural network filter
>>> nn_filter.train()
>>> # Apply the trained filter
>>> filtered_signal = nn_filter.apply_filter()
>>> print("Filtered Signal:", filtered_signal)
>>> # Evaluate the filter on a test signal
>>> test_signal = np.sin(np.linspace(0, 10, 100)) + np.random.normal(0, 0.1, 100)
>>> mse = nn_filter.evaluate(test_signal)
>>> print("Mean Squared Error on Test Signal:", mse)
- apply_filter()[source]
Apply the trained neural network to filter the signal.
The trained neural network is used to predict and filter the input signal.
- Returns:
The filtered signal after applying the neural network.
- Return type:
- evaluate(test_signal)[source]
Evaluate the performance of the neural network on a test signal.
The method calculates the Mean Squared Error (MSE) between the predicted and actual values of the test signal.
- Parameters:
test_signal (numpy.ndarray) – The test signal to evaluate the neural network’s performance.
- Returns:
The mean squared error (MSE) of the neural network on the test signal.
- Return type:
- train()[source]
Train the neural network on the given signal.
The signal is divided into input-output pairs for supervised training. This method prepares the data, then trains the network using the specified learning rate, number of epochs, and batch size.
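The input-output pairing described above can be sketched as a sliding window over the signal. This is a minimal illustration, not the library's internal code: the helper name `make_windows` and the `window_size` parameter are assumptions introduced here, and the target is assumed to be the sample that follows each window.

```python
import numpy as np

def make_windows(signal, window_size):
    """Split a 1-D signal into (input window, next sample) pairs.

    Hypothetical helper for illustration only.
    """
    signal = np.asarray(signal, dtype=float)
    n_pairs = len(signal) - window_size
    if n_pairs <= 0:
        raise ValueError("signal must be longer than window_size")
    # Row i holds signal[i : i + window_size]; its target is signal[i + window_size].
    X = np.stack([signal[i:i + window_size] for i in range(n_pairs)])
    y = signal[window_size:]
    return X, y

demo_signal = np.arange(10.0)
X, y = make_windows(demo_signal, window_size=3)
print(X.shape, y.shape)
```

Each row of `X` can then be fed to a network whose single output is compared against the matching entry of `y`.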
Reinforcement Learning Filter
Filtering methods that use reinforcement learning to adaptively improve signal quality based on feedback loops.
Key Features:
- Object-oriented design with comprehensive classes
- Multiple processing methods and functions
- NumPy integration for numerical computations
- Performance optimization
- Advanced filtering algorithms
Examples:
- Basic usage:
>>> import numpy as np
>>> from vitalDSP.advanced_computation.reinforcement_learning_filter import ReinforcementLearningFilter
>>> signal = np.random.randn(1000)
>>> rlf = ReinforcementLearningFilter(signal, action_space=[-1, 0, 1])
>>> rlf.train_q_learning(episodes=500)
>>> filtered = rlf.apply_filter()
- class vitalDSP.advanced_computation.reinforcement_learning_filter.ReinforcementLearningFilter(signal, action_space, state_space=None)[source]
Bases: object
A comprehensive Reinforcement Learning Filter for adaptive signal processing. This class provides methods to train filters using reinforcement learning algorithms such as Q-learning, Deep Q-Networks (DQN), and Proximal Policy Optimization (PPO), and to apply these trained filters to signals.
- train_q_learning(episodes=1000, alpha=0.1, gamma=0.99, epsilon=0.1)[source]
Trains a filter using Q-learning.
- train_dqn(episodes=1000, batch_size=32, gamma=0.99, epsilon=0.1, target_update=10)[source]
Trains a filter using Deep Q-Networks (DQN).
- train_ppo(epochs=1000, gamma=0.99, lambda_=0.95, clip_ratio=0.2)[source]
Trains a filter using Proximal Policy Optimization (PPO).
- apply_filter()[source]
Applies the trained filter to the signal.
Example Usage
>>> import numpy as np
>>> from vitalDSP.advanced_computation.reinforcement_learning_filter import ReinforcementLearningFilter
>>> signal = np.sin(np.linspace(0, 10, 100)) + np.random.normal(0, 0.1, 100)
>>> rl_filter = ReinforcementLearningFilter(signal, action_space=[-1, 0, 1])
>>> # Train Q-learning based filter
>>> rl_filter.train_q_learning(episodes=500, alpha=0.1, gamma=0.99, epsilon=0.1)
>>> # Apply the trained filter
>>> filtered_signal = rl_filter.apply_filter()
>>> print("Filtered Signal (Q-Learning):", filtered_signal)
>>> # Train DQN-based filter
>>> rl_filter.train_dqn(episodes=500, batch_size=32, gamma=0.99, epsilon=0.1, target_update=10)
>>> # Apply the trained filter
>>> filtered_signal_dqn = rl_filter.apply_filter()
>>> print("Filtered Signal (DQN):", filtered_signal_dqn)
>>> # Train PPO-based filter
>>> rl_filter.train_ppo(epochs=500, gamma=0.99, lambda_=0.95, clip_ratio=0.2)
>>> # Apply the trained filter
>>> filtered_signal_ppo = rl_filter.apply_filter()
>>> print("Filtered Signal (PPO):", filtered_signal_ppo)
- apply_filter()[source]
Apply the trained filter to the signal using the learned policy or Q-values.
- Returns:
The filtered signal.
- Return type:
- train_dqn(episodes=1000, batch_size=32, gamma=0.99, epsilon=0.1, target_update=10)[source]
Train the filter using Deep Q-Networks (DQN).
DQN approximates the Q-function using a neural network and uses experience replay to stabilize learning.
- Parameters:
episodes (int, optional) – The number of training episodes (default is 1000).
batch_size (int, optional) – The batch size for experience replay (default is 32).
gamma (float, optional) – The discount factor (default is 0.99).
epsilon (float, optional) – The exploration rate for epsilon-greedy policy (default is 0.1).
target_update (int, optional) – The frequency of updating the target network (default is 10 episodes).
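The two DQN ingredients named above, experience replay and bootstrapped Q-targets, can be sketched independently of any network. The `ReplayBuffer` class and `dqn_targets` function below are hypothetical names used only for this illustration; they are not part of the vitalDSP API, and the Q-values are supplied as a plain array rather than produced by a target network.

```python
import random
from collections import deque

import numpy as np

class ReplayBuffer:
    """Fixed-capacity store of past transitions (hypothetical helper)."""

    def __init__(self, capacity):
        # A deque with maxlen silently discards the oldest transition when full.
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        # Uniform sampling without replacement breaks temporal correlation.
        return random.sample(list(self.buffer), batch_size)

    def __len__(self):
        return len(self.buffer)

def dqn_targets(rewards, next_q_values, dones, gamma=0.99):
    """Compute r + gamma * max_a Q_target(s', a), with no bootstrap on terminal steps."""
    rewards = np.asarray(rewards, dtype=float)
    dones = np.asarray(dones, dtype=float)
    best_next = np.max(np.asarray(next_q_values, dtype=float), axis=1)
    return rewards + gamma * (1.0 - dones) * best_next

buffer = ReplayBuffer(capacity=3)
for step in range(5):
    buffer.push(step, 0, 1.0, step + 1, False)
batch = buffer.sample(2)
targets = dqn_targets([1.0, 1.0], [[0.0, 2.0], [5.0, 1.0]], [0, 1], gamma=0.5)
```

In a full DQN loop the `next_q_values` would come from the target network, which is refreshed from the online network every `target_update` episodes.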
- train_ppo(epochs=1000, gamma=0.99, lambda_=0.95, clip_ratio=0.2)[source]
Train the filter using Proximal Policy Optimization (PPO).
PPO is a policy gradient method that seeks to optimize policies while ensuring the updates don’t deviate too much from the previous policy.
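- Parameters:
epochs (int, optional) – The number of training epochs (default is 1000).
gamma (float, optional) – The discount factor (default is 0.99).
lambda_ (float, optional) – The lambda parameter, conventionally the decay used in advantage estimation for PPO (default is 0.95).
clip_ratio (float, optional) – The clipping range applied to the policy ratio (default is 0.2).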
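The idea of keeping updates close to the previous policy is usually expressed through PPO's clipped surrogate objective, combined with generalized advantage estimation. The sketch below shows both in NumPy. The function names are hypothetical, and the assumption that `lambda_` plays the role of the advantage-estimation decay follows common PPO practice rather than anything confirmed by the library.

```python
import numpy as np

def gae(rewards, values, gamma=0.99, lambda_=0.95):
    """Generalized advantage estimation.

    `values` must hold one more entry than `rewards` (the bootstrap value).
    """
    advantages = np.zeros(len(rewards))
    running = 0.0
    for t in reversed(range(len(rewards))):
        # One-step TD residual, then exponentially weighted accumulation.
        delta = rewards[t] + gamma * values[t + 1] - values[t]
        running = delta + gamma * lambda_ * running
        advantages[t] = running
    return advantages

def ppo_clip_objective(new_logp, old_logp, advantages, clip_ratio=0.2):
    """Mean clipped surrogate objective (to be maximized)."""
    ratio = np.exp(np.asarray(new_logp, dtype=float) - np.asarray(old_logp, dtype=float))
    advantages = np.asarray(advantages, dtype=float)
    clipped = np.clip(ratio, 1.0 - clip_ratio, 1.0 + clip_ratio)
    # Taking the minimum removes any incentive to push the ratio outside the clip range.
    return float(np.mean(np.minimum(ratio * advantages, clipped * advantages)))

adv = gae([1.0, 1.0], [0.0, 0.0, 0.0], gamma=1.0, lambda_=1.0)
objective = ppo_clip_objective(np.log([1.5, 0.5]), np.zeros(2), [1.0, -1.0])
```

With ratios of 1.5 and 0.5, the first term is clipped to 1.2 and the second to 0.8, which is how the objective limits the size of each policy step.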
- train_q_learning(episodes=1000, alpha=0.1, gamma=0.99, epsilon=0.1)[source]
Train the filter using Q-learning.
Q-learning updates a Q-table where each state-action pair has a value. The algorithm iterates over episodes and updates the Q-values based on the reward received for each action taken.
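- Parameters:
episodes (int, optional) – The number of training episodes (default is 1000).
alpha (float, optional) – The learning rate for Q-value updates (default is 0.1).
gamma (float, optional) – The discount factor (default is 0.99).
epsilon (float, optional) – The exploration rate for epsilon-greedy policy (default is 0.1).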
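The Q-table update described above follows the standard tabular rule Q(s, a) ← Q(s, a) + alpha * (r + gamma * max Q(s', ·) − Q(s, a)). The sketch below implements that rule and an epsilon-greedy action choice. The helper names and the integer state/action encoding are assumptions made for illustration; how vitalDSP maps signal samples to states is not shown here.

```python
import numpy as np

def q_update(Q, state, action, reward, next_state, alpha=0.1, gamma=0.99):
    """Apply one tabular Q-learning update in place and return the TD error."""
    td_target = reward + gamma * np.max(Q[next_state])
    td_error = td_target - Q[state, action]
    Q[state, action] += alpha * td_error
    return td_error

def epsilon_greedy(Q, state, epsilon, rng):
    """Explore with probability epsilon, otherwise pick the best known action."""
    if rng.random() < epsilon:
        return int(rng.integers(Q.shape[1]))
    return int(np.argmax(Q[state]))

rng = np.random.default_rng(0)
Q = np.zeros((4, 3))            # 4 states, 3 actions (e.g. action_space=[-1, 0, 1])
Q[1] = [0.0, 2.0, 1.0]          # pretend state 1 already has learned values
td = q_update(Q, state=0, action=2, reward=1.0, next_state=1)
greedy_action = epsilon_greedy(Q, state=1, epsilon=0.0, rng=rng)
```

Here the TD target is 1.0 + 0.99 * 2.0 = 2.98, so a learning rate of 0.1 moves `Q[0, 2]` from 0 to 0.298.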
- class vitalDSP.advanced_computation.reinforcement_learning_filter.SimpleNeuralNetwork(input_size, output_size)[source]
Bases: object
- get_weights()[source]
- predict(state)[source]
- set_weights(weights)[source]
- train(state, action, target)[source]
- class vitalDSP.advanced_computation.reinforcement_learning_filter.SimplePolicyNetwork(input_size, output_size)[source]
Bases: object
- predict(state)[source]
- sample_action(state)[source]
- train(state, action, advantage)[source]
- update(state, action, objective)[source]
- class vitalDSP.advanced_computation.reinforcement_learning_filter.SimpleValueNetwork(input_size)[source]
Bases: object
- predict(state)[source]
Predict the value of a given state.
- Parameters:
state (numpy.ndarray) – The input state vector.
- Returns:
The predicted value of the state.
- Return type:
- train(state, reward)[source]
Train the value network by updating weights based on the reward.
- Parameters:
state (numpy.ndarray) – The input state vector.
reward (float) – The reward value used for training.
Anomaly Detection
Techniques for identifying anomalies in biomedical signals, crucial for detecting irregularities and potential issues in real-time signal monitoring.
Anomaly Detection Module for Physiological Signal Processing
This module provides comprehensive anomaly detection capabilities for physiological signals including ECG, PPG, EEG, and other vital signs. It implements multiple detection methods including statistical approaches, machine learning techniques, and frequency-domain analysis for identifying unusual patterns and artifacts.
Author: vitalDSP Team
Date: 2025-01-27
Version: 1.0.0
Key Features:
- Statistical anomaly detection (Z-score, IQR)
- Moving average and rolling window methods
- Local Outlier Factor (LOF) for density-based detection
- Fourier-based frequency domain analysis
- Real-time streaming anomaly detection
- Performance monitoring and optimization
Examples:
- Basic Z-score anomaly detection:
>>> import numpy as np
>>> from vitalDSP.advanced_computation.anomaly_detection import AnomalyDetection
>>> signal = np.sin(np.linspace(0, 10, 100)) + np.random.normal(0, 0.1, 100)
>>> anomaly_detector = AnomalyDetection(signal)
>>> anomalies = anomaly_detector.detect_anomalies(method="z_score", threshold=2.0)
>>> print(f"Detected {len(anomalies)} anomalies")
- Moving average detection:
>>> anomalies_ma = anomaly_detector.detect_anomalies(method="moving_average", window_size=5, threshold=0.5)
>>> print(f"Moving average anomalies: {len(anomalies_ma)}")
- LOF-based detection:
>>> anomalies_lof = anomaly_detector.detect_anomalies(method="lof", n_neighbors=20)
>>> print(f"LOF anomalies: {len(anomalies_lof)}")
- FFT-based detection:
>>> anomalies_fft = anomaly_detector.detect_anomalies(method="fft", threshold=0.1)
>>> print(f"FFT anomalies: {len(anomalies_fft)}")
- class vitalDSP.advanced_computation.anomaly_detection.AnomalyDetection(signal)[source]
Bases: object
Comprehensive Anomaly Detection for detecting anomalies in real-time from streaming data.
This class offers multiple methods to detect anomalies in a given signal, including statistical methods, moving averages, Local Outlier Factor (LOF), and Fourier-based methods.
- detect_anomalies : function
Detects anomalies using various methods including z-score, moving average, custom LOF, and more.
Examples
>>> import numpy as np
>>> from vitalDSP.advanced_computation.anomaly_detection import AnomalyDetection
>>>
>>> # Example 1: Z-score anomaly detection
>>> signal = np.sin(np.linspace(0, 10, 100)) + np.random.normal(0, 0.1, 100)
>>> anomaly_detector = AnomalyDetection(signal)
>>> anomalies_z_score = anomaly_detector.detect_anomalies(method="z_score", threshold=2.0)
>>> print(f"Z-score anomalies: {len(anomalies_z_score)}")
>>>
>>> # Example 2: Moving average anomaly detection
>>> anomalies_moving_avg = anomaly_detector.detect_anomalies(method="moving_average", window_size=5, threshold=0.5)
>>> print(f"Moving average anomalies: {len(anomalies_moving_avg)}")
>>>
>>> # Example 3: LOF anomaly detection
>>> anomalies_lof = anomaly_detector.detect_anomalies(method="lof", n_neighbors=20)
>>> print(f"LOF anomalies: {len(anomalies_lof)}")
>>>
>>> # Example 4: FFT-based anomaly detection
>>> anomalies_fft = anomaly_detector.detect_anomalies(method="fft", threshold=0.1)
>>> print(f"FFT anomalies: {len(anomalies_fft)}")
- detect_anomalies(method='z_score', **kwargs)[source]
Detect anomalies in the signal using the specified method.
- Parameters:
method (str, optional) – The method to use for detecting anomalies. Options: ‘z_score’, ‘moving_average’, ‘lof’, ‘fft’, ‘threshold’. Default is ‘z_score’.
**kwargs (additional arguments) – Additional parameters depending on the chosen method.
- Returns:
Indices of the detected anomalies.
- Return type:
- Raises:
ValueError – If the specified method is unknown.
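To make the 'z_score' and 'moving_average' options concrete, the sketch below shows one common way each can be computed with NumPy. It is an independent illustration under stated assumptions, not the library's implementation: the function names are hypothetical, the z-score uses the global mean and standard deviation, and the moving average is a centered window whose edges are zero-padded by `np.convolve`.

```python
import numpy as np

def zscore_anomalies(signal, threshold=2.0):
    """Indices whose distance from the mean exceeds `threshold` standard deviations."""
    signal = np.asarray(signal, dtype=float)
    std = signal.std()
    if std == 0:
        return np.array([], dtype=int)
    z = np.abs(signal - signal.mean()) / std
    return np.flatnonzero(z > threshold)

def moving_average_anomalies(signal, window_size=5, threshold=0.5):
    """Indices whose deviation from a centered moving average exceeds `threshold`."""
    signal = np.asarray(signal, dtype=float)
    kernel = np.ones(window_size) / window_size
    # mode="same" keeps the output aligned with the input; edges are zero-padded.
    smooth = np.convolve(signal, kernel, mode="same")
    return np.flatnonzero(np.abs(signal - smooth) > threshold)

spike = np.zeros(50)
spike[20] = 10.0                      # single artificial outlier
z_idx = zscore_anomalies(spike, threshold=2.0)
ma_idx = moving_average_anomalies(spike, window_size=5, threshold=3.0)
```

Note that a spike also raises the moving average of its neighbours, so a threshold that is too low flags the samples around the outlier as well as the outlier itself.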
Examples
>>> anomalies_z_score = anomaly_detector.detect_anomalies(method="z_score", threshold=2.0)
>>> anomalies_moving_avg = anomaly_detector.detect_anomalies(method="moving_average", window_size=5, threshold=0.5)
Real-Time Anomaly Detection
Real-time methods for detecting anomalies in streaming biomedical signals, crucial for monitoring and alert systems.
Key Features:
- Object-oriented design with comprehensive classes
- Multiple processing methods and functions
- NumPy integration for numerical computations
- Pattern and anomaly detection
Examples:
- Basic usage:
>>> import numpy as np
>>> from vitalDSP.advanced_computation.real_time_anomaly_detection import RealTimeAnomalyDetection
>>> signal = np.random.randn(1000)
>>> rtad = RealTimeAnomalyDetection(window_size=10)
>>> anomalies = [rtad.detect_statistical(data_point) for data_point in signal]
- class vitalDSP.advanced_computation.real_time_anomaly_detection.RealTimeAnomalyDetection(window_size=10)[source]
Bases: object
Comprehensive Real-Time Anomaly Detection for detecting anomalies in streaming data.
This class supports multiple anomaly detection techniques including statistical methods, machine learning models, and deep learning models. It is designed for use in real-time environments with online learning capabilities.
- detect_statistical : method
Detects anomalies using statistical methods like Z-score and moving average.
- detect_knn : method
Detects anomalies using k-Nearest Neighbors (k-NN).
- detect_svm : method
Detects anomalies using Support Vector Machine (SVM).
- detect_autoencoder : method
Detects anomalies using Autoencoders.
- detect_lstm : method
Detects anomalies using LSTM-based models.
- detect_wavelet : method
Detects anomalies using wavelet transforms.
- update_model : method
Updates the model with new data for online learning.
- evaluate : method
Evaluates the performance of the anomaly detection method on a test dataset.
Example Usage
>>> import numpy as np
>>> from vitalDSP.advanced_computation.real_time_anomaly_detection import RealTimeAnomalyDetection
>>> data_stream = np.sin(np.linspace(0, 10, 100)) + np.random.normal(0, 0.1, 100)
>>> anomaly_detector = RealTimeAnomalyDetection(window_size=10)
>>> # Detect anomalies using Z-score
>>> for data_point in data_stream:
...     anomaly = anomaly_detector.detect_statistical(data_point, method='z_score', threshold=2.0)
...     print("Anomaly (Z-Score):", anomaly)
>>> # Train and detect anomalies using k-NN
>>> anomaly_detector.train_knn(data_stream[:50])
>>> for data_point in data_stream[50:]:
...     anomaly = anomaly_detector.detect_knn(data_point)
...     print("Anomaly (k-NN):", anomaly)
- detect_autoencoder(data_point, threshold=0.1)[source]
Detect anomalies using the Autoencoder method.
- detect_knn(data_point)[source]
Detect anomalies using the k-Nearest Neighbors (k-NN) method.
- detect_lstm(data_point, threshold=0.1)[source]
Detect anomalies using the LSTM-based model.
- detect_statistical(data_point, method='z_score', threshold=2.0, **kwargs)[source]
Detect anomalies using statistical methods like Z-score, moving average, etc.
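- Parameters:
data_point (float) – The new data point to be analyzed.
method (str, optional) – The statistical method to use (default is ‘z_score’).
threshold (float, optional) – The threshold for flagging an anomaly (default is 2.0).
**kwargs – Additional parameters depending on the chosen method.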
- Returns:
True if the data point is an anomaly, False otherwise.
- Return type:
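A per-sample statistical check of this kind is typically built on a fixed-length window of recent values. The sketch below is one minimal way to do it with the standard library. The class name `RollingZScore` is hypothetical, and the choices to score each point against the window before appending it, and to wait for two samples before scoring, are assumptions of this illustration.

```python
import math
from collections import deque

class RollingZScore:
    """Flag a new sample when it lies far from the recent window (hypothetical helper)."""

    def __init__(self, window_size=10):
        self.window = deque(maxlen=window_size)

    def detect(self, x, threshold=2.0):
        is_anomaly = False
        if len(self.window) >= 2:
            mean = sum(self.window) / len(self.window)
            var = sum((v - mean) ** 2 for v in self.window) / len(self.window)
            std = math.sqrt(var)
            # A zero spread gives no scale to compare against, so nothing is flagged.
            is_anomaly = std > 0 and abs(x - mean) / std > threshold
        # Append after scoring so the sample is judged against past data only.
        self.window.append(x)
        return is_anomaly

detector = RollingZScore(window_size=10)
stream = [1, 2, 1, 2, 1, 2, 1, 2, 50]
flags = [detector.detect(x) for x in stream]
```

Only the final value of the stream is flagged, because every earlier sample stays within two standard deviations of the values seen before it.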
- detect_svm(data_point)[source]
Detect anomalies using the Support Vector Machine (SVM) method.
- detect_wavelet(data_point, wavelet_name='haar', level=1, threshold=0.1)[source]
Detect anomalies using Wavelet Transform.
- Parameters:
data_point (float) – The new data point to be analyzed.
wavelet_name (str, optional) – The name of the wavelet to use for the transform (default is ‘haar’).
level (int, optional) – The number of decomposition levels in the wavelet transform (default is 1).
threshold (float, optional) – The threshold for detecting anomalies in the wavelet coefficients (default is 0.1).
- Returns:
True if the data point is an anomaly, False otherwise.
- Return type:
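For the default ‘haar’ wavelet at level 1, the detail coefficients are scaled differences of adjacent samples, so thresholding them amounts to flagging abrupt jumps. The sketch below computes them directly with NumPy. It is an assumption-laden illustration: the function names are hypothetical, the input is a short window of recent samples rather than a single point, and only the newest coefficient is tested.

```python
import numpy as np

def haar_detail(window):
    """Level-1 Haar detail coefficients: (x[2k] - x[2k+1]) / sqrt(2)."""
    window = np.asarray(window, dtype=float)
    if len(window) % 2:
        window = window[1:]   # drop the oldest sample so the newest stays paired
    pairs = window.reshape(-1, 2)
    return (pairs[:, 0] - pairs[:, 1]) / np.sqrt(2.0)

def wavelet_anomaly(window, threshold=0.1):
    """True when the most recent detail coefficient exceeds the threshold."""
    detail = haar_detail(window)
    return bool(np.abs(detail[-1]) > threshold)

coeffs = haar_detail([1.0, 1.0, 3.0, 1.0])
flat = wavelet_anomaly([1.0, 1.0, 1.0, 1.0])
jump = wavelet_anomaly([1.0, 1.0, 1.0, 5.0])
```

A constant window yields zero detail coefficients, while a sudden jump in the last sample produces a large coefficient and is flagged.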
- evaluate(test_data, model_type='knn')[source]
Evaluate the performance of the anomaly detection method on a test dataset.
- Parameters:
test_data (numpy.ndarray) – The test dataset.
model_type (str, optional) – The type of model to evaluate (‘knn’, ‘svm’, ‘autoencoder’, ‘lstm’). Default is ‘knn’.
- Returns:
The accuracy of the anomaly detection method on the test dataset.
- Return type:
- train_autoencoder(training_data, encoding_dim=3)[source]
Train an Autoencoder model on the training data.
- Parameters:
training_data (numpy.ndarray) – The training dataset.
encoding_dim (int, optional) – The dimension of the encoding layer. Default is 3.
- Return type:
None
- train_knn(training_data, k=5)[source]
Train a k-Nearest Neighbors (k-NN) model on the training data.
- Parameters:
training_data (numpy.ndarray) – The training dataset.
k (int, optional) – The number of nearest neighbors to consider. Default is 5.
- Return type:
None
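One simple way a k-NN model can score a new sample is by its mean distance to the k closest training values: points far from everything seen in training receive a high score. The sketch below assumes scalar data and a hypothetical function name `knn_score`; the library's actual scoring rule and decision threshold are not documented here.

```python
import numpy as np

def knn_score(training_data, data_point, k=5):
    """Mean absolute distance from `data_point` to its k nearest training values."""
    distances = np.sort(np.abs(np.asarray(training_data, dtype=float) - data_point))
    return float(distances[:k].mean())

training = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]
inlier_score = knn_score(training, 2.5, k=2)
outlier_score = knn_score(training, 100.0, k=2)
```

A threshold on this score, for instance a high percentile of the scores of the training points themselves, would turn it into a True/False decision.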
- train_lstm(training_data, hidden_units=50)[source]
Train an LSTM-based model on the training data.
- Parameters:
training_data (numpy.ndarray) – The training dataset.
hidden_units (int, optional) – The number of hidden units in the LSTM. Default is 50.
- Return type:
None
- train_svm(training_data, kernel='rbf')[source]
Train a Support Vector Machine (SVM) model on the training data.
- Parameters:
training_data (numpy.ndarray) – The training dataset.
kernel (str, optional) – The kernel type for SVM (‘linear’, ‘poly’, ‘rbf’). Default is ‘rbf’.
- Return type:
None
- update_model(data_point, model_type='knn')[source]
Update the model with new data for online learning.
- class vitalDSP.advanced_computation.real_time_anomaly_detection.SimpleAutoencoder(training_data, encoding_dim=3)[source]
Bases: object
- reconstruction_error(data_point)[source]
Compute the reconstruction error for a given data point.
- Parameters:
data_point (float or np.array) – The data point to compute the reconstruction error for.
- Returns:
The mean squared reconstruction error.
- Return type:
Example
>>> error = model.reconstruction_error(np.array([1, 2, 3]))
Bayesian Analysis
Bayesian methods applied to signal processing, offering a probabilistic approach to model and interpret signal data.
Advanced Computation Module for Physiological Signal Processing
This module provides Bayesian inference and optimization capabilities for physiological signal processing including ECG, PPG, EEG, and other vital signs.
Author: vitalDSP Team
Date: 2025-01-27
Version: 1.0.1
Key Features:
- Gaussian Process modeling for signal prediction
- Bayesian Optimization for hyperparameter tuning
- Expected Improvement acquisition function
- Numerical stability improvements
- Graceful error handling
Classes:
- GaussianProcess
Implements Gaussian Process regression for probabilistic modeling.
- BayesianOptimization
Implements Bayesian Optimization using Gaussian Processes.
Examples:
Example 1: Gaussian Process Prediction
>>> import numpy as np
>>> from vitalDSP.advanced_computation.bayesian_analysis import GaussianProcess
>>>
>>> # Create GP model
>>> gp = GaussianProcess(length_scale=1.0, noise=1e-5)
>>>
>>> # Training data
>>> X_train = np.array([[0.1], [0.4], [0.7]])
>>> y_train = np.sin(3 * X_train.flatten())
>>> gp.update(X_train, y_train)
>>>
>>> # Predict at new points
>>> X_new = np.array([[0.2], [0.5]])
>>> mean, variance = gp.predict(X_new)
>>> print(f"Predicted mean: {mean}")
>>> print(f"Predicted variance: {variance}")
Example 2: Bayesian Optimization for Hyperparameter Tuning
>>> from vitalDSP.advanced_computation.bayesian_analysis import BayesianOptimization
>>>
>>> # Define objective function to maximize
>>> def filter_performance(cutoff_freq):
...     # Simulate filter performance metric
...     cutoff = np.atleast_1d(cutoff_freq)[0]
...     return -(cutoff - 0.5)**2 + 1.0  # Peak at 0.5
>>>
>>> # Optimize cutoff frequency in range [0, 1]
>>> optimizer = BayesianOptimization(
...     func=filter_performance,
...     bounds=(0, 1),
...     length_scale=0.1,
...     noise=1e-5
... )
>>>
>>> # Run optimization
>>> best_cutoff, best_performance = optimizer.optimize(n_iter=20, random_seed=42)
>>> print(f"Best cutoff frequency: {best_cutoff[0]:.4f}")
>>> print(f"Best performance: {best_performance:.4f}")
Example 3: ECG Filter Parameter Optimization
>>> from vitalDSP.filtering.signal_filtering import SignalFiltering
>>>
>>> # Simulated ECG signal
>>> ecg_signal = np.sin(2*np.pi*1.2*np.linspace(0, 10, 1000)) + 0.1*np.random.randn(1000)
>>>
>>> # Define objective: minimize noise while preserving signal
>>> def filter_quality(params):
...     cutoff = np.atleast_1d(params)[0]
...     sf = SignalFiltering(ecg_signal)
...     filtered = sf.butterworth_lowpass(cutoff_freq=cutoff, fs=100, order=4)
...     # Quality metric: SNR-like measure
...     signal_power = np.var(filtered)
...     noise_power = np.var(ecg_signal - filtered)
...     return signal_power / (noise_power + 1e-10)
>>>
>>> # Optimize filter cutoff
>>> optimizer = BayesianOptimization(filter_quality, bounds=(0.1, 10))
>>> best_cutoff, best_snr = optimizer.optimize(n_iter=15, random_seed=42)
>>> print(f"Optimal cutoff: {best_cutoff[0]:.2f} Hz, SNR: {best_snr:.2f}")
Notes:
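Numerical Stability: BayesianOptimization defaults to noise=1e-5, which provides good numerical stability. Avoid values smaller than 1e-7, as they can cause singular matrix errors. Note that GaussianProcess, when constructed directly, defaults to noise=1e-10, which is below this limit, so pass noise explicitly in that case.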
Initialization: BayesianOptimization starts with 3 random samples to bootstrap the Gaussian Process before beginning optimization.
Fallback: If numerical issues occur during optimization, the algorithm falls back to random sampling to ensure it always returns a result.
- class vitalDSP.advanced_computation.bayesian_analysis.BayesianOptimization(func, bounds, length_scale=1.0, noise=1e-05)[source]
Bases: object
A class implementing Bayesian Optimization for parameter tuning.
Bayesian Optimization is an efficient method for finding the minimum or maximum of an objective function, especially when the function is expensive to evaluate. It uses a Gaussian Process model to predict the function’s behavior and an acquisition function to determine the next point to sample.
- optimize(n_iter=10, random_seed=None)[source]
Perform Bayesian optimization to find the best parameters.
- acquisition(X, xi=0.01)[source]
Compute the Expected Improvement (EI) at new points.
- propose_location(n_restarts=10)[source]
Propose the next sampling location based on the acquisition function.
Example Usage
>>> def objective_function(x):
...     return -np.sin(3 * x) - x ** 2 + 0.7 * x
>>>
>>> bayesian_optimizer = BayesianOptimization(objective_function, bounds=(0, 2))
>>> best_x, best_y = bayesian_optimizer.optimize(n_iter=10)
>>> print("Best X:", best_x)
>>> print("Best Y:", best_y)
- acquisition(X, xi=0.01)[source]
Expected Improvement (EI) acquisition function.
The EI acquisition function is used to balance exploration and exploitation in Bayesian Optimization. It selects points that have the potential to improve upon the best observed value.
- Parameters:
X (numpy.ndarray) – The points at which to evaluate the acquisition function.
xi (float, optional) – Exploration-exploitation trade-off parameter (default is 0.01).
- Returns:
The EI values at the provided points.
- Return type:
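For a maximization problem with a Gaussian posterior (mean μ, standard deviation σ) and best observed value f*, Expected Improvement has the closed form EI = (μ − f* − ξ)Φ(z) + σφ(z) with z = (μ − f* − ξ)/σ, where Φ and φ are the standard normal CDF and PDF. The scalar sketch below evaluates that formula with the standard library only. The function name and the argument `best_y` are assumptions for illustration, and the maximization convention is assumed from the examples in this section.

```python
import math

def expected_improvement(mean, std, best_y, xi=0.01):
    """Closed-form Expected Improvement for a maximization problem."""
    if std <= 0.0:
        # No posterior uncertainty means no expected gain from sampling here.
        return 0.0
    improvement = mean - best_y - xi
    z = improvement / std
    cdf = 0.5 * (1.0 + math.erf(z / math.sqrt(2.0)))          # standard normal CDF
    pdf = math.exp(-0.5 * z * z) / math.sqrt(2.0 * math.pi)   # standard normal PDF
    return improvement * cdf + std * pdf

ei_uncertain = expected_improvement(mean=1.01, std=1.0, best_y=1.0, xi=0.01)
ei_certain = expected_improvement(mean=5.0, std=0.0, best_y=1.0)
ei_promising = expected_improvement(mean=3.0, std=1.0, best_y=1.0)
```

Larger `xi` lowers the credit given to the predicted mean, which shifts sampling toward uncertain regions (exploration) rather than known good ones (exploitation).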
- optimize(n_iter=10, random_seed=None)[source]
Perform Bayesian optimization to find the best parameters.
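- Parameters:
n_iter (int, optional) – The number of optimization iterations (default is 10).
random_seed (int or None, optional) – Seed for reproducible sampling (default is None).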
- Returns:
The best parameters and the corresponding function value.
- Return type:
Notes
The optimization process starts with random samples to initialize the Gaussian Process (three, according to the module notes), then iteratively selects new points using the acquisition function to balance exploration and exploitation.
Examples
>>> def objective(x):
...     return -np.sin(3 * x) - x ** 2 + 0.7 * x
>>> optimizer = BayesianOptimization(objective, bounds=(0, 2))
>>> best_x, best_y = optimizer.optimize(n_iter=10, random_seed=42)
>>> print(f"Best X: {best_x}, Best Y: {best_y}")
- class vitalDSP.advanced_computation.bayesian_analysis.GaussianProcess(length_scale=1.0, noise=1e-10)[source]
Bases: object
A class implementing Gaussian Process (GP) for Bayesian Optimization.
Gaussian Process models are used to predict the mean and variance of an unknown function based on observed data. This is particularly useful in Bayesian Optimization, where the GP helps in selecting the next sample point to evaluate.
- predict(X)[source]
Predict the mean and variance of the objective function at new points X.
- update(X_train, y_train)[source]
Update the GP model with new observations.
Example Usage
>>> gp = GaussianProcess(length_scale=1.0, noise=1e-10)
>>> X_train = np.array([[0.1], [0.4], [0.7]])
>>> y_train = np.sin(3 * X_train) - X_train ** 2 + 0.7 * X_train
>>> gp.update(X_train, y_train)
>>> X_new = np.array([[0.2], [0.5]])
>>> mean, variance = gp.predict(X_new)
>>> print("Predicted Mean:", mean)
>>> print("Predicted Variance:", variance)
- predict(X)[source]
Predict the mean and variance of the objective function at new points.
- Parameters:
X (numpy.ndarray) – The points at which to predict the mean and variance.
- Returns:
The predicted mean and variance at the input points X.
- Return type:
- Raises:
ValueError – If the GP model has not been updated with any training data.
- update(X_train, y_train)[source]
Update the GP model with new observations.
- Parameters:
X_train (numpy.ndarray) – The observed input points.
y_train (numpy.ndarray) – The observed output values corresponding to X_train.
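What update and predict compute can be illustrated with textbook Gaussian Process regression. The sketch below assumes a squared-exponential (RBF) kernel parameterized by `length_scale`, with `noise` added to the diagonal of the training covariance; whether vitalDSP uses exactly this kernel and solver is an assumption, and the function names are hypothetical.

```python
import numpy as np

def rbf_kernel(A, B, length_scale=1.0):
    """Squared-exponential kernel between the rows of A and the rows of B."""
    sq_dist = (np.sum(A ** 2, axis=1)[:, None]
               + np.sum(B ** 2, axis=1)[None, :]
               - 2.0 * A @ B.T)
    return np.exp(-0.5 * np.maximum(sq_dist, 0.0) / length_scale ** 2)

def gp_posterior(X_train, y_train, X_new, length_scale=1.0, noise=1e-5):
    """Posterior mean and variance of a zero-mean GP at X_new."""
    K = rbf_kernel(X_train, X_train, length_scale) + noise * np.eye(len(X_train))
    K_s = rbf_kernel(X_train, X_new, length_scale)
    # Cholesky factorization is more stable than inverting K directly.
    L = np.linalg.cholesky(K)
    alpha = np.linalg.solve(L.T, np.linalg.solve(L, y_train))
    v = np.linalg.solve(L, K_s)
    mean = K_s.T @ alpha
    prior_var = np.ones(len(X_new))        # k(x, x) = 1 for the RBF kernel
    variance = np.maximum(prior_var - np.sum(v ** 2, axis=0), 0.0)
    return mean, variance

X_train = np.array([[0.1], [0.4], [0.7]])
y_train = np.sin(3 * X_train.ravel())
mean_at_train, var_at_train = gp_posterior(X_train, y_train, X_train)
mean_far, var_far = gp_posterior(X_train, y_train, np.array([[10.0]]))
```

At the training inputs the posterior reproduces the observations with near-zero variance, while far from the data it reverts to the prior (mean 0, variance 1). The `noise` term also explains the stability advice in this module: it keeps the covariance matrix safely positive definite for the Cholesky step.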
Empirical Mode Decomposition (EMD)
Decomposing signals into intrinsic mode functions using Empirical Mode Decomposition, useful in analyzing non-linear and non-stationary signals.
Empirical Mode Decomposition (EMD) Module for Physiological Signal Processing
This module provides comprehensive Empirical Mode Decomposition capabilities for physiological signals including ECG, PPG, EEG, and other vital signs. EMD is particularly effective for analyzing non-linear and non-stationary signals by decomposing them into Intrinsic Mode Functions (IMFs).
Author: vitalDSP Team
Date: 2025-01-27
Version: 1.0.0
Key Features:
- Empirical Mode Decomposition (EMD) implementation
- Intrinsic Mode Functions (IMFs) extraction
- Non-linear and non-stationary signal analysis
- Customizable stop criteria and IMF limits
- Signal decomposition and reconstruction
- Advanced signal analysis capabilities
Examples:
- Basic EMD decomposition:
>>> import numpy as np
>>> from vitalDSP.advanced_computation.emd import EMD
>>> signal = np.sin(np.linspace(0, 10, 100)) + 0.5 * np.random.normal(size=100)
>>> emd = EMD(signal)
>>> imfs = emd.emd()
>>> print(f"Number of IMFs: {len(imfs)}")
- Limited IMF decomposition:
>>> emd_limited = EMD(signal)
>>> imfs_limited = emd_limited.emd(max_imfs=3)
>>> print(f"Limited IMFs: {len(imfs_limited)}")
- Custom stop criterion:
>>> emd_custom = EMD(signal)
>>> imfs_custom = emd_custom.emd(stop_criterion=0.01)
>>> print(f"Custom stop criterion IMFs: {len(imfs_custom)}")
- Signal reconstruction:
>>> reconstructed = np.sum(imfs, axis=0)
>>> print(f"Reconstruction error: {np.mean((signal - reconstructed)**2):.6f}")
- class vitalDSP.advanced_computation.emd.EMD(signal)[source]
Bases: object
Empirical Mode Decomposition (EMD) for decomposing non-linear and non-stationary signals into Intrinsic Mode Functions (IMFs).
EMD is particularly useful for analyzing signals that are non-linear and non-stationary, where traditional methods like Fourier Transform may not be effective.
- emd : method
Performs the EMD on the input signal and returns the IMFs.
Examples
>>> import numpy as np
>>> from vitalDSP.advanced_computation.emd import EMD
>>>
>>> # Example 1: Basic EMD decomposition
>>> signal = np.sin(np.linspace(0, 10, 100)) + 0.5 * np.random.normal(size=100)
>>> emd = EMD(signal)
>>> imfs = emd.emd()
>>> print(f"Number of IMFs: {len(imfs)}")
>>>
>>> # Example 2: EMD with limited number of IMFs
>>> emd_limited = EMD(signal)
>>> imfs_limited = emd_limited.emd(max_imfs=3)
>>> print(f"Limited IMFs: {len(imfs_limited)}")
>>>
>>> # Example 3: EMD with custom stop criterion
>>> emd_custom = EMD(signal)
>>> imfs_custom = emd_custom.emd(stop_criterion=0.01)
>>> print(f"Custom stop criterion IMFs: {len(imfs_custom)}")
- emd(max_imfs=None, stop_criterion=0.05, max_sifting_iterations=20, max_decomposition_iterations=10)[source]
Perform Empirical Mode Decomposition (EMD) on the input signal.
- Parameters:
max_imfs (int or None, optional) – Maximum number of IMFs to extract. If None, extract all possible IMFs (default is None).
stop_criterion (float, optional) – The stopping criterion for the sifting process, which controls how close the IMF is to an ideal IMF (default is 0.05).
max_sifting_iterations (int, optional) – Maximum number of sifting iterations per IMF to prevent infinite loops (default is 20).
max_decomposition_iterations (int, optional) – Maximum number of decomposition iterations to prevent excessive computation (default is 10).
- Returns:
imfs – A list of IMFs (Intrinsic Mode Functions) extracted from the signal.
- Return type:
Notes
Each IMF represents a simple oscillatory mode embedded in the signal. The sum of all IMFs plus the final residual will reconstruct the original signal.
The convergence limits max_sifting_iterations and max_decomposition_iterations bound the sifting and decomposition loops, which prevents infinite loops and improves reliability.
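The sifting process at the heart of EMD repeatedly subtracts the mean of the upper and lower envelopes from the signal. The sketch below performs a single sifting pass. It is deliberately simplified and is not the library's algorithm: envelopes are built by linear interpolation with `np.interp` (EMD implementations commonly use cubic splines), endpoints are held constant, and the function names are hypothetical.

```python
import numpy as np

def local_extrema(x):
    """Indices of strict interior local maxima and minima."""
    d = np.diff(x)
    maxima = np.flatnonzero((d[:-1] > 0) & (d[1:] < 0)) + 1
    minima = np.flatnonzero((d[:-1] < 0) & (d[1:] > 0)) + 1
    return maxima, minima

def sift_once(x):
    """One sifting pass: subtract the mean envelope. Returns None if too few extrema."""
    x = np.asarray(x, dtype=float)
    maxima, minima = local_extrema(x)
    if len(maxima) < 2 or len(minima) < 2:
        return None   # nothing left to sift: x is (close to) a residual trend
    t = np.arange(len(x))
    upper = np.interp(t, maxima, x[maxima])   # simplified linear envelope
    lower = np.interp(t, minima, x[minima])
    return x - 0.5 * (upper + lower)

t = np.linspace(0.0, 1.0, 500)
fast = np.sin(2 * np.pi * 20 * t)     # oscillatory mode
x = fast + 2.0 * t                    # plus a slow trend
h = sift_once(x)
similarity = np.corrcoef(h, fast)[0, 1]
```

After one pass the candidate `h` already tracks the fast oscillation closely, because the mean envelope absorbs the slow trend. A full EMD repeats this pass until the stop criterion is met, stores the result as an IMF, subtracts it, and continues on the remainder.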
Examples
>>> signal = np.sin(np.linspace(0, 10, 100)) + 0.5 * np.random.normal(size=100)
>>> emd = EMD(signal)
>>> imfs = emd.emd()
>>> print("IMFs:", imfs)
Generative Signal Synthesis
Methods for generating synthetic signals that mimic real-world biomedical signals, useful in training and testing algorithms.
Key Features:
- Object-oriented design with comprehensive classes
- Multiple processing methods and functions
- NumPy integration for numerical computations
Examples:
- Basic usage:
>>> import numpy as np
>>> from vitalDSP.advanced_computation.generative_signal_synthesis import GenerativeSignalSynthesis
>>> gen = GenerativeSignalSynthesis()
>>> signal = gen.generate()
- class vitalDSP.advanced_computation.generative_signal_synthesis.GenerativeSignalSynthesis[source]
Bases: object
Generative Signal Synthesis for creating synthetic signal data using various methods.
This class provides methods to generate synthetic signals using techniques such as random noise, Gaussian processes, autoregressive (AR) models, Markov chains, and custom-defined functions.
- generate(method="random_noise", length=100, **kwargs) :
Generates synthetic signals using the specified method.
Example Usage
>>> signal_synthesizer = GenerativeSignalSynthesis()
>>> # Generate random noise
>>> random_signal = signal_synthesizer.generate(method="random_noise", length=100)
>>> print("Random Noise Signal:", random_signal)
>>> # Generate Gaussian process
>>> gp_signal = signal_synthesizer.generate(method="gaussian_process", length=100, mean=0, std_dev=1, correlation=0.9)
>>> print("Gaussian Process Signal:", gp_signal)
>>> # Generate AR model signal
>>> ar_signal = signal_synthesizer.generate(method="autoregressive", length=100, coeffs=[0.9, -0.5])
>>> print("AR Model Signal:", ar_signal)
>>> # Generate Markov chain signal
>>> markov_signal = signal_synthesizer.generate(method="markov_chain", length=100, states=[-1, 1], transition_matrix=[[0.9, 0.1], [0.1, 0.9]])
>>> print("Markov Chain Signal:", markov_signal)
>>> # Generate custom function signal
>>> custom_signal = signal_synthesizer.generate(method="custom_function", length=100, func=lambda x: np.sin(x))
>>> print("Custom Function Signal:", custom_signal)
- generate(method='random_noise', length=100, **kwargs)[source]
Generate synthetic signals.
- Parameters:
method (str, optional) – The method to use for generating the signal. Options include “random_noise”, “gaussian_process”, “autoregressive”, “markov_chain”, “custom_function”. Default is “random_noise”.
length (int, optional) – The length of the generated signal. Default is 100.
kwargs (dict) – Additional parameters depending on the generation method. These include:
- mean (float): Mean of the noise or process (for “random_noise” and “gaussian_process”).
- std_dev (float): Standard deviation (for “random_noise” and “gaussian_process”).
- correlation (float): Correlation coefficient between successive points (for “gaussian_process”).
- coeffs (list of float): Coefficients for the AR model (for “autoregressive”).
- states (list): Possible states of the Markov chain (for “markov_chain”).
- transition_matrix (list of list of float): State transition matrix (for “markov_chain”).
- func (callable): Function to generate the signal (for “custom_function”).
- Returns:
The generated synthetic signal.
- Return type:
- Raises:
ValueError – If the specified generation method is unknown.
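As an illustration of what the “autoregressive” and “markov_chain” methods compute conceptually, the following NumPy sketch simulates both models directly. The function names, the seed and noise_std arguments, the zero initial conditions, and the uniform initial state are assumptions made for this example; they are not taken from the GenerativeSignalSynthesis source.

```python
import numpy as np

def ar_signal(length, coeffs, noise_std=1.0, seed=None):
    """Simulate x[t] = sum_k coeffs[k] * x[t-1-k] + e[t] with Gaussian e[t]."""
    rng = np.random.default_rng(seed)
    p = len(coeffs)
    x = np.zeros(length + p)  # p leading zeros act as initial conditions
    noise = rng.normal(0.0, noise_std, size=length + p)
    for t in range(p, length + p):
        past = x[t - p:t][::-1]  # x[t-1], x[t-2], ..., x[t-p]
        x[t] = np.dot(coeffs, past) + noise[t]
    return x[p:]

def markov_chain_signal(length, states, transition_matrix, seed=None):
    """Sample a state sequence; row i holds P(next state | current = states[i])."""
    rng = np.random.default_rng(seed)
    P = np.asarray(transition_matrix, dtype=float)
    if not np.allclose(P.sum(axis=1), 1.0):
        raise ValueError("each row of transition_matrix must sum to 1")
    idx = np.empty(length, dtype=int)
    idx[0] = rng.integers(len(states))  # uniform initial state (assumption)
    for t in range(1, length):
        idx[t] = rng.choice(len(states), p=P[idx[t - 1]])
    return np.asarray(states)[idx]

ar = ar_signal(100, coeffs=[0.9, -0.5], seed=0)
mc = markov_chain_signal(100, states=[-1, 1],
                         transition_matrix=[[0.9, 0.1], [0.1, 0.9]], seed=0)
```

With coeffs=[0.9, -0.5] the AR(2) recursion is stable (its characteristic roots have modulus sqrt(0.5)), so the simulated series stays bounded.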
Harmonic/Percussive Separation
Techniques for separating harmonic and percussive components of signals, particularly useful in audio signal processing.
Key Features:
- Object-oriented design with comprehensive classes
- Multiple processing methods and functions
- NumPy integration for numerical computations
Examples:
- Basic usage:
>>> import numpy as np
>>> from vitalDSP.advanced_computation.harmonic_percussive_separation import HarmonicPercussiveSeparation
>>> signal = np.random.randn(1000)
>>> hps = HarmonicPercussiveSeparation(signal)
>>> harmonic, percussive = hps.separate()
- class vitalDSP.advanced_computation.harmonic_percussive_separation.HarmonicPercussiveSeparation(signal)[source]
Bases: object
Harmonic-Percussive Separation for separating vocal and noise components in respiratory analysis.
This class separates a signal into its harmonic (sustained, tonal) and percussive (transient, noisy) components, which can be particularly useful in respiratory analysis to distinguish between vocal and noise components.
- separate : method
Separates the harmonic and percussive components of the signal using median filtering.
Example Usage
>>> signal = np.sin(np.linspace(0, 10, 100)) + 0.5 * np.random.normal(size=100)
>>> hps = HarmonicPercussiveSeparation(signal)
>>> harmonic, percussive = hps.separate()
>>> print("Harmonic Component:", harmonic)
>>> print("Percussive Component:", percussive)
- separate(kernel_size=31)[source]
Separate the harmonic and percussive components of the signal using median filtering.
The harmonic component is obtained by applying a median filter across the time axis, while the percussive component is obtained by applying a median filter across the frequency axis.
- Parameters:
kernel_size (int, optional) – Size of the median filter kernel (default is 31).
- Returns:
harmonic (numpy.ndarray) – The harmonic component of the signal.
percussive (numpy.ndarray) – The percussive component of the signal.
Example
>>> signal = np.sin(np.linspace(0, 10, 100)) + 0.5 * np.random.normal(size=100)
>>> hps = HarmonicPercussiveSeparation(signal)
>>> harmonic, percussive = hps.separate(kernel_size=31)
>>> print(harmonic)
>>> print(percussive)
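The time-axis versus frequency-axis median filtering described for separate() is commonly applied to a magnitude spectrogram. The sketch below shows that idea with NumPy only; it is an assumption about the general technique, not a copy of how HarmonicPercussiveSeparation works internally. The helper names, frame and hop sizes, and the binary masking rule are all chosen for illustration.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

def stft_mag(x, frame=64, hop=16):
    """Magnitude spectrogram of shape (n_freq, n_frames), Hann-windowed."""
    window = np.hanning(frame)
    frames = sliding_window_view(x, frame)[::hop] * window
    return np.abs(np.fft.rfft(frames, axis=1)).T

def median_filter_1d(a, size, axis):
    """Median filter along one axis with edge padding (size must be odd)."""
    pad = [(0, 0)] * a.ndim
    pad[axis] = (size // 2, size // 2)
    padded = np.pad(a, pad, mode="edge")
    windows = sliding_window_view(padded, size, axis=axis)
    return np.median(windows, axis=-1)

def hpss_masks(mag, kernel_size=31):
    """Binary harmonic and percussive masks from the two median filters."""
    harmonic_enh = median_filter_1d(mag, kernel_size, axis=1)    # along time
    percussive_enh = median_filter_1d(mag, kernel_size, axis=0)  # along frequency
    harmonic_mask = harmonic_enh >= percussive_enh
    return harmonic_mask, ~harmonic_mask

# A steady 125 Hz tone (sustained) plus a single click (transient)
fs = 1000
t = np.arange(1000) / fs
x = np.sin(2 * np.pi * 125 * t)
x[500] += 20.0

mag = stft_mag(x)
harmonic_mask, percussive_mask = hpss_masks(mag)
```

Median filtering along time keeps energy that persists across frames (the tone), while median filtering along frequency keeps energy that is spread across bins within one frame (the click).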
Kalman Filter
Implementing Kalman Filters for optimal estimation of system states, widely used in tracking and predictive modeling.
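Key Features:
- Object-oriented design with comprehensive classes
- Multiple processing methods and functions
- NumPy integration for numerical computations
- Signal validation and error handling
- Advanced filtering algorithms
Examples:
- Basic usage:
>>> import numpy as np
>>> from vitalDSP.advanced_computation.kalman_filter import KalmanFilter
>>> signal = np.random.randn(1000)
>>> kf = KalmanFilter(
...     initial_state=np.array([0]),
...     initial_covariance=np.array([[1]]),
...     process_covariance=np.array([[1e-5]]),
...     measurement_covariance=np.array([[1e-1]]),
... )
>>> filtered = kf.filter(signal, measurement_matrix=np.array([[1]]), transition_matrix=np.array([[1]]))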
- class vitalDSP.advanced_computation.kalman_filter.KalmanFilter(initial_state, initial_covariance, process_covariance, measurement_covariance)[source]
Bases: object
A Kalman Filter for real-time filtering and continuous monitoring of signals.
The Kalman Filter is an optimal recursive data processing algorithm that estimates the state of a dynamic system from a series of noisy measurements. It is widely used in control systems, signal processing, and time series analysis.
- Parameters:
initial_state (numpy.ndarray) – The initial state estimate of the system.
initial_covariance (numpy.ndarray) – The initial covariance estimate of the state.
process_covariance (numpy.ndarray) – The process noise covariance matrix, representing the uncertainty in the system dynamics.
measurement_covariance (numpy.ndarray) – The measurement noise covariance matrix, representing the uncertainty in the measurements.
- filter(signal, measurement_matrix, transition_matrix, control_input=None, control_matrix=None)[source]
Apply the Kalman filter to the input signal to obtain a filtered estimate of the state.
Examples
>>> signal = np.sin(np.linspace(0, 10, 100)) + 0.5 * np.random.normal(size=100)
>>> initial_state = np.array([0])
>>> initial_covariance = np.array([[1]])
>>> process_covariance = np.array([[1e-5]])
>>> measurement_covariance = np.array([[1e-1]])
>>> measurement_matrix = np.array([[1]])
>>> transition_matrix = np.array([[1]])
>>> kalman_filter = KalmanFilter(initial_state, initial_covariance, process_covariance, measurement_covariance)
>>> filtered_signal = kalman_filter.filter(signal, measurement_matrix, transition_matrix)
>>> print("Filtered Signal:", filtered_signal)
- filter(signal, measurement_matrix, transition_matrix, control_input=None, control_matrix=None)[source]
Apply the Kalman filter to the input signal.
The Kalman filter recursively estimates the state of a dynamic system from noisy measurements. At each time step, it predicts the next state, updates the estimate based on the actual measurement, and provides a filtered signal as output.
- Parameters:
signal (numpy.ndarray) – The input signal to be filtered.
measurement_matrix (numpy.ndarray) – The measurement matrix that maps the true state space into the observed space.
transition_matrix (numpy.ndarray) – The state transition matrix that describes how the state evolves from one time step to the next.
control_input (numpy.ndarray or None, optional) – An optional control input to the system (default is None).
control_matrix (numpy.ndarray or None, optional) – An optional control matrix that maps the control input to the state space (default is None).
- Returns:
The filtered signal, where each element corresponds to the estimated state at each time step.
- Return type:
Examples
>>> signal = np.sin(np.linspace(0, 10, 100)) + 0.5 * np.random.normal(size=100)
>>> initial_state = np.array([0])
>>> initial_covariance = np.array([[1]])
>>> process_covariance = np.array([[1e-5]])
>>> measurement_covariance = np.array([[1e-1]])
>>> measurement_matrix = np.array([[1]])
>>> transition_matrix = np.array([[1]])
>>> kalman_filter = KalmanFilter(initial_state, initial_covariance, process_covariance, measurement_covariance)
>>> filtered_signal = kalman_filter.filter(signal, measurement_matrix, transition_matrix)
>>> print(filtered_signal)
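The predict and update steps described above can be sketched in a few lines of NumPy. This is a textbook linear Kalman recursion without a control input, written for illustration; the function name kalman_sketch and its argument names (x0, P0, F, H, Q, R) are assumptions and do not mirror the KalmanFilter class internals.

```python
import numpy as np

def kalman_sketch(measurements, x0, P0, F, H, Q, R):
    """Run predict/update over a measurement sequence; return state estimates."""
    x = np.asarray(x0, dtype=float).reshape(-1, 1)
    P = np.asarray(P0, dtype=float)
    estimates = []
    for z in measurements:
        # Predict: propagate the state and its covariance through the model
        x = F @ x
        P = F @ P @ F.T + Q
        # Update: correct the prediction with the new measurement
        z = np.atleast_1d(z).astype(float).reshape(-1, 1)
        innovation = z - H @ x
        S = H @ P @ H.T + R               # innovation covariance
        K = P @ H.T @ np.linalg.inv(S)    # Kalman gain
        x = x + K @ innovation
        P = (np.eye(P.shape[0]) - K @ H) @ P
        estimates.append(x.ravel().copy())
    return np.array(estimates)

# Noisy observations of a constant level of 1.0
rng = np.random.default_rng(0)
measurements = 1.0 + 0.5 * rng.normal(size=200)
I1 = np.array([[1.0]])
estimates = kalman_sketch(measurements, x0=[0.0], P0=I1,
                          F=I1, H=I1, Q=1e-5 * I1, R=0.25 * I1)
```

A small process covariance relative to the measurement covariance, as in the examples above, makes the filter trust its model and smooth heavily; on a fast-changing signal the same setting makes the estimate lag behind the data.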
Multimodal Fusion
Combining information from multiple signal sources to improve the accuracy and robustness of signal analysis.
Key Features:
- Object-oriented design with comprehensive classes
- Multiple processing methods and functions
- NumPy integration for numerical computations
Examples:
- Basic usage:
>>> import numpy as np
>>> from vitalDSP.advanced_computation.multimodal_fusion import MultimodalFusion
>>> signal1 = np.random.randn(1000)
>>> signal2 = np.random.randn(1000)
>>> mf = MultimodalFusion([signal1, signal2])
>>> fused = mf.fuse(strategy="weighted_sum", weights=[0.5, 0.5])
- class vitalDSP.advanced_computation.multimodal_fusion.MultimodalFusion(signals)[source]
Bases: object
Multimodal Fusion class for combining multiple signals to improve health assessments.
The class supports various fusion strategies, including weighted sum, concatenation, PCA-based fusion, maximum, and minimum.
Example Usage
>>> signal1 = np.sin(np.linspace(0, 10, 100))
>>> signal2 = np.cos(np.linspace(0, 10, 100))
>>> fusion = MultimodalFusion([signal1, signal2])
>>> # Weighted Sum Fusion
>>> fused_signal_weighted = fusion.fuse(strategy="weighted_sum", weights=[0.6, 0.4])
>>> print("Fused Signal (Weighted Sum):", fused_signal_weighted)
>>> # Concatenation Fusion
>>> fused_signal_concat = fusion.fuse(strategy="concatenation")
>>> print("Fused Signal (Concatenation):", fused_signal_concat)
>>> # PCA-based Fusion
>>> fused_signal_pca = fusion.fuse(strategy="pca", n_components=1)
>>> print("Fused Signal (PCA):", fused_signal_pca)
>>> # Maximum Fusion
>>> fused_signal_max = fusion.fuse(strategy="maximum")
>>> print("Fused Signal (Maximum):", fused_signal_max)
>>> # Minimum Fusion
>>> fused_signal_min = fusion.fuse(strategy="minimum")
>>> print("Fused Signal (Minimum):", fused_signal_min)
- fuse(strategy='weighted_sum', **kwargs)[source]
Fuse multiple signals using the specified strategy.
- Parameters:
strategy (str, optional) – The fusion strategy to use. Options include “weighted_sum”, “concatenation”, “pca”, “maximum”, “minimum” (default is “weighted_sum”).
kwargs (dict, optional) – Additional arguments depending on the fusion strategy.
- weights (list of float): Weights for the weighted sum strategy (required if strategy=”weighted_sum”).
- n_components (int): Number of principal components to keep for PCA (optional, default=1).
- Returns:
The fused signal based on the chosen strategy.
- Return type:
- Raises:
ValueError – If an unknown fusion strategy is specified or if required arguments are missing.
Examples
>>> signal1 = np.sin(np.linspace(0, 10, 100))
>>> signal2 = np.cos(np.linspace(0, 10, 100))
>>> fusion = MultimodalFusion([signal1, signal2])
>>> fused_signal = fusion.fuse(strategy="weighted_sum", weights=[0.6, 0.4])
>>> print(fused_signal)
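To show what each fusion strategy amounts to numerically, here is a small standalone NumPy sketch. The function fuse_sketch is hypothetical and only mirrors the documented strategy names. Two points are assumptions rather than documented behavior: concatenation is taken to mean joining the signals end to end, and PCA fusion is taken to mean projecting the centered samples onto the leading principal axes.

```python
import numpy as np

def fuse_sketch(signals, strategy="weighted_sum", weights=None, n_components=1):
    """Fuse equal-length 1-D signals; rows of X are signals, columns are samples."""
    X = np.asarray(signals, dtype=float)
    if strategy == "weighted_sum":
        if weights is None or len(weights) != len(X):
            raise ValueError("weighted_sum needs one weight per signal")
        return np.asarray(weights, dtype=float) @ X
    if strategy == "concatenation":
        return X.ravel()                       # signals joined end to end
    if strategy == "maximum":
        return X.max(axis=0)                   # sample-wise maximum
    if strategy == "minimum":
        return X.min(axis=0)                   # sample-wise minimum
    if strategy == "pca":
        samples = X.T                          # (n_samples, n_signals)
        centered = samples - samples.mean(axis=0)
        # Rows of Vt are the principal axes, ordered by explained variance
        _, _, Vt = np.linalg.svd(centered, full_matrices=False)
        return centered @ Vt[:n_components].T  # (n_samples, n_components)
    raise ValueError(f"unknown fusion strategy: {strategy}")

signal1 = np.sin(np.linspace(0, 10, 100))
signal2 = np.cos(np.linspace(0, 10, 100))
weighted = fuse_sketch([signal1, signal2], "weighted_sum", weights=[0.6, 0.4])
pca = fuse_sketch([signal1, signal2], "pca", n_components=1)
```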
Non-Linear Analysis
Analyzing signals using non-linear methods, essential for understanding complex systems where traditional linear methods fall short.
Key Features:
- Object-oriented design with comprehensive classes
- Multiple processing methods and functions
- NumPy integration for numerical computations
- Interactive visualization capabilities
- Comprehensive signal analysis
Examples:
- Basic usage:
>>> import numpy as np
>>> from vitalDSP.advanced_computation.non_linear_analysis import NonlinearAnalysis
>>> signal = np.random.randn(1000)
>>> nla = NonlinearAnalysis(signal)
>>> le = nla.lyapunov_exponent()
- class vitalDSP.advanced_computation.non_linear_analysis.NonlinearAnalysis(signal)[source]
Bases: object
Nonlinear Analysis for examining chaotic signals, such as Heart Rate Variability (HRV).
This class provides methods for analyzing the chaotic behavior of signals using techniques like the estimation of Lyapunov exponents, generation of Poincaré plots, and calculation of correlation dimensions.
- lyapunov_exponent(max_iter=1000, epsilon=1e-8, normalize=True)[source]
Estimates the largest Lyapunov exponent to assess chaos in the signal.
- poincare_plot()[source]
Generates a Poincaré plot to visualize the dynamics of the signal.
- correlation_dimension(radius=0.1, normalize=True)[source]
Estimates the correlation dimension of the signal.
Example Usage
>>> signal = np.sin(np.linspace(0, 10, 100)) + 0.5 * np.random.normal(size=100)
>>> nonlinear_analysis = NonlinearAnalysis(signal)
>>> lyapunov = nonlinear_analysis.lyapunov_exponent()
>>> print("Lyapunov Exponent:", lyapunov)
>>> nonlinear_analysis.poincare_plot()
>>> correlation_dim = nonlinear_analysis.correlation_dimension()
>>> print("Correlation Dimension:", correlation_dim)
- correlation_dimension(radius=0.1, normalize=True)[source]
Estimate the correlation dimension of the signal using the Grassberger-Procaccia method.
The correlation dimension is a measure of the fractal dimension of the attractor in the phase space of the signal. It provides insight into the complexity of the signal.
IMPORTANT: Signal normalization is highly recommended for physiological signals to ensure the radius parameter is appropriate for the signal’s scale.
- Parameters:
radius (float, optional) – Radius within which points are considered neighbors (default is 0.1). For normalized signals, typical values are 0.1-2.0. For raw signals, choose radius based on signal scale.
normalize (bool, optional) – If True, automatically normalize the signal before computation (default is True). Normalization ensures radius is appropriate regardless of signal amplitude.
- Returns:
The estimated correlation dimension.
- Return type:
- Raises:
ValueError – If radius is 1.0 (causes divide-by-zero), or if no point pairs are found within the radius.
Warning – If correlation dimension is negative (suggests inappropriate radius selection).
Examples
>>> signal = np.sin(np.linspace(0, 10, 100)) + 0.5 * np.random.normal(size=100)
>>> nonlinear_analysis = NonlinearAnalysis(signal)
>>> correlation_dim = nonlinear_analysis.correlation_dimension(radius=0.5)
>>> print("Correlation Dimension:", correlation_dim)
Notes
For physiological signals (ECG, PPG):
1. Always normalize the signal (normalize=True, the default).
2. Use a radius in the range [0.1, 2.0] for normalized signals.
3. Avoid a radius of exactly 1.0, since log(1.0) = 0 leads to a division by zero.
4. If the result is negative, try a different radius or check the signal quality.
The correlation dimension should typically lie between 0 and the embedding dimension. Values outside this range suggest inappropriate parameters or an unsuitable signal.
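For readers who want to see the Grassberger-Procaccia idea in code, the sketch below computes the correlation sum C(r), the fraction of point pairs closer than r, over several radii and takes the slope of log C(r) against log r. This multi-radius fit is a deliberate variation: the method documented above takes a single radius. The function names, the delay-embedding parameters dim and tau, and the default radii are assumptions for illustration.

```python
import numpy as np

def delay_embed(x, dim, tau):
    """Stack delayed copies of x into points of a dim-dimensional phase space."""
    n = len(x) - (dim - 1) * tau
    return np.column_stack([x[i * tau:i * tau + n] for i in range(dim)])

def correlation_sum(points, radius):
    """Fraction of distinct point pairs whose distance is below radius."""
    diffs = points[:, None, :] - points[None, :, :]
    dists = np.sqrt((diffs ** 2).sum(axis=-1))
    upper = np.triu_indices(len(points), k=1)  # each pair counted once
    return np.mean(dists[upper] < radius)

def correlation_dimension_sketch(x, dim=2, tau=1, radii=None):
    """Slope of log C(r) versus log r on the normalized, embedded signal."""
    x = np.asarray(x, dtype=float)
    x = (x - x.mean()) / x.std()               # normalize so radii are scale-free
    points = delay_embed(x, dim, tau)
    if radii is None:
        radii = np.logspace(-1.0, -0.3, 8)     # roughly 0.1 to 0.5
    c = np.array([correlation_sum(points, r) for r in radii])
    valid = c > 0                              # log is undefined for empty counts
    slope, _ = np.polyfit(np.log(radii[valid]), np.log(c[valid]), 1)
    return slope

# A pure sine traces a closed curve in phase space, so its dimension is near 1
n = np.arange(1000)
x = np.sin(0.1 * np.sqrt(2) * n)
d2 = correlation_dimension_sketch(x, dim=2, tau=11)
```

Fitting a slope over a range of radii avoids the special case at radius = 1.0, at the cost of having to choose a range in which log C(r) is approximately linear in log r.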
- lyapunov_exponent(max_iter=1000, epsilon=1e-08, normalize=True)[source]
Estimate the largest Lyapunov exponent of the signal.
The Lyapunov exponent is a measure of the rate of separation of infinitesimally close trajectories in a chaotic system. A positive Lyapunov exponent indicates chaos.
IMPORTANT: For reliable results with physiological signals (ECG, PPG), signal normalization is highly recommended to avoid numerical instabilities.
- Parameters:
max_iter (int, optional) – Maximum number of iterations to compute the exponent (default is 1000).
epsilon (float, optional) – Small perturbation used to calculate divergence, to avoid division by zero (default is 1e-8).
normalize (bool, optional) – If True, automatically normalize the signal before computation (default is True). Normalization prevents divide-by-zero errors with signals containing flat segments.
- Returns:
The estimated largest Lyapunov exponent.
- Return type:
- Raises:
ValueError – If signal is too short for the specified max_iter.
Warning – If computation results in NaN or inf values.
Examples
>>> signal = np.sin(np.linspace(0, 10, 100)) + 0.5 * np.random.normal(size=100)
>>> nonlinear_analysis = NonlinearAnalysis(signal)
>>> lyapunov = nonlinear_analysis.lyapunov_exponent()
>>> print("Lyapunov Exponent:", lyapunov)
Notes
Uses the Rosenstein et al. (1993) algorithm: the signal is delay-embedded into a dim-dimensional phase space, the nearest neighbor of each reference point is found (with a Theiler window to exclude temporal neighbors), the divergence of each pair of trajectories is tracked, and the exponent is taken as the slope of the mean log-divergence curve.
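The steps of the Rosenstein procedure can be sketched compactly in NumPy. This is an illustrative version written from the description above, not the library's code: the function name and the parameters dim, tau, theiler and n_steps are assumptions, and the result is expressed per sample rather than per unit time.

```python
import numpy as np

def rosenstein_sketch(x, dim=2, tau=1, theiler=5, n_steps=6):
    """Largest Lyapunov exponent estimate (per sample) from divergence slopes."""
    x = np.asarray(x, dtype=float)
    n = len(x) - (dim - 1) * tau
    # 1. Delay embedding into a dim-dimensional phase space
    emb = np.column_stack([x[i * tau:i * tau + n] for i in range(dim)])
    usable = n - n_steps                 # points that can be followed n_steps ahead
    ref = emb[:usable]
    # 2. Nearest neighbor of every point, excluding temporal neighbors
    dists = np.linalg.norm(ref[:, None, :] - ref[None, :, :], axis=-1)
    idx = np.arange(usable)
    dists[np.abs(idx[:, None] - idx[None, :]) <= theiler] = np.inf
    nn = np.argmin(dists, axis=1)
    # 3. Mean log distance between each pair after k steps
    log_div = np.empty(n_steps)
    for k in range(n_steps):
        d = np.linalg.norm(emb[idx + k] - emb[nn + k], axis=1)
        log_div[k] = np.mean(np.log(d[d > 0]))
    # 4. Slope of the mean log-divergence curve
    slope, _ = np.polyfit(np.arange(n_steps), log_div, 1)
    return slope

# Chaotic test series: logistic map x -> 4x(1 - x)
logistic = np.empty(1000)
logistic[0] = 0.4
for i in range(1, 1000):
    logistic[i] = 4.0 * logistic[i - 1] * (1.0 - logistic[i - 1])
lle_chaotic = rosenstein_sketch(logistic, dim=2, tau=1, theiler=5, n_steps=6)

# Periodic test series: a sine wave, whose neighboring trajectories do not diverge
sine = np.sin(0.1 * np.sqrt(2) * np.arange(1000))
lle_periodic = rosenstein_sketch(sine, dim=2, tau=11, theiler=50, n_steps=6)
```

The fit uses only the first few steps because divergence saturates once the separation reaches the size of the attractor; a clearly positive slope for the logistic map and a slope near zero for the sine reflect the chaotic versus periodic distinction described above.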
- poincare_plot()[source]
Generate a Poincaré plot to visualize the dynamics of the signal.
The Poincaré plot is a scatter plot of the signal against its delayed version. It is often used to visualize periodic and chaotic dynamics in the signal.
- Returns:
The generated Poincaré plot.
- Return type:
Examples
>>> signal = np.sin(np.linspace(0, 10, 100)) + 0.5 * np.random.normal(size=100)
>>> nonlinear_analysis = NonlinearAnalysis(signal)
>>> nonlinear_analysis.poincare_plot()
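The data behind such a plot is simply the signal paired with a delayed copy of itself. A minimal sketch, assuming a delay of one sample (the helper poincare_pairs and its lag parameter are hypothetical, not part of the class):

```python
import numpy as np

def poincare_pairs(x, lag=1):
    """Return (x[n], x[n + lag]) pairs as two aligned arrays."""
    x = np.asarray(x, dtype=float)
    if lag < 1 or lag >= len(x):
        raise ValueError("lag must satisfy 1 <= lag < len(x)")
    return x[:-lag], x[lag:]

rng = np.random.default_rng(0)
signal = np.sin(np.linspace(0, 10, 100)) + 0.5 * rng.normal(size=100)
current, delayed = poincare_pairs(signal, lag=1)
# Scatter-plotting `current` against `delayed` with any plotting library
# gives the Poincaré plot.
```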
Pitch Shift
Techniques for shifting the pitch of audio signals, often used in music signal processing and speech modulation.
Key Features:
- Object-oriented design with comprehensive classes
- Multiple processing methods and functions
- NumPy integration for numerical computations
Examples:
- Basic usage:
>>> import numpy as np
>>> from vitalDSP.advanced_computation.pitch_shift import PitchShift
>>> signal = np.random.randn(1000)
>>> ps = PitchShift(signal, sampling_rate=256)
>>> shifted = ps.shift_pitch(semitones=2)
- class vitalDSP.advanced_computation.pitch_shift.PitchShift(signal, sampling_rate=1000)[source]
Bases: object
A class for pitch shifting and pitch detection, primarily used in the analysis of vocal pitch. This is particularly useful in diagnosing speech disorders and other vocal characteristics.
- shift_pitch : method
Shifts the pitch of the input signal by a specified number of semitones.
- detect_pitch : method
Detects the pitch of the input signal using the autocorrelation method.
Example Usage
>>> signal = np.sin(2 * np.pi * 440 * np.linspace(0, 1, 1000))  # A simple sine wave at 440 Hz (A4)
>>> pitch_shift = PitchShift(signal, sampling_rate=1000)
>>> shifted_signal = pitch_shift.shift_pitch(semitones=2)
>>> print("Shifted Signal:", shifted_signal)
>>> detected_pitch = pitch_shift.detect_pitch()
>>> print("Detected Pitch:", detected_pitch, "Hz")
- detect_pitch()[source]
Detect the pitch of the input signal using the autocorrelation method.
The autocorrelation method is commonly used to estimate the fundamental frequency (pitch) of a signal. This method is particularly effective for monophonic signals like speech or musical notes.
- Returns:
pitch – The detected pitch of the signal in Hertz (Hz).
- Return type:
Examples
>>> signal = np.sin(2 * np.pi * 440 * np.linspace(0, 1, 1000))  # 440 Hz signal
>>> pitch_shift = PitchShift(signal, sampling_rate=1000)
>>> detected_pitch = pitch_shift.detect_pitch()
>>> print("Detected Pitch:", detected_pitch, "Hz")
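The autocorrelation approach can be sketched as follows: find the lag at which the signal best matches a shifted copy of itself, then convert that lag to a frequency. The function below is an illustrative stand-in, not the detect_pitch source; its name and the fmin/fmax search bounds are assumptions.

```python
import numpy as np

def detect_pitch_autocorr(x, sampling_rate, fmin=50.0, fmax=500.0):
    """Estimate the fundamental frequency (Hz) from the autocorrelation peak."""
    x = np.asarray(x, dtype=float)
    x = x - x.mean()                                     # remove any DC offset
    acf = np.correlate(x, x, mode="full")[len(x) - 1:]   # lags 0 .. N-1
    lag_min = int(sampling_rate / fmax)                  # shortest period searched
    lag_max = min(int(sampling_rate / fmin), len(acf) - 1)
    lag = lag_min + np.argmax(acf[lag_min:lag_max + 1])
    return sampling_rate / lag

fs = 8000
t = np.arange(2000) / fs
tone = np.sin(2 * np.pi * 200 * t)
pitch = detect_pitch_autocorr(tone, sampling_rate=fs)
```

Because the lag is an integer number of samples, the frequency resolution is coarse when the period spans only a few samples; a sampling rate well above the expected pitch gives a finer estimate.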
- shift_pitch(semitones=1)[source]
Shift the pitch of the input signal by a given number of semitones.
This method shifts the pitch of the signal without altering its duration, which is useful for modifying vocal pitch in a controlled manner.
- Parameters:
semitones (int, optional) – The number of semitones to shift the pitch. A positive value raises the pitch, while a negative value lowers it. Default is 1 semitone.
- Returns:
shifted_signal – The pitch-shifted signal.
- Return type:
Examples
>>> signal = np.sin(2 * np.pi * 440 * np.linspace(0, 1, 1000))  # 440 Hz signal
>>> pitch_shift = PitchShift(signal, sampling_rate=1000)
>>> shifted_signal = pitch_shift.shift_pitch(semitones=3)
>>> print("Shifted Signal:", shifted_signal)
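As a quick reference for choosing the semitones argument: in twelve-tone equal temperament, a shift of n semitones multiplies every frequency by 2^(n/12). The helper below only computes that ratio; the name semitone_ratio is hypothetical and it says nothing about how shift_pitch processes the signal.

```python
def semitone_ratio(semitones):
    """Frequency ratio for a shift of `semitones` in 12-tone equal temperament."""
    return 2.0 ** (semitones / 12.0)

up_two = 440.0 * semitone_ratio(2)       # A4 raised by two semitones
up_octave = 440.0 * semitone_ratio(12)   # twelve semitones is one octave
down_octave = 440.0 * semitone_ratio(-12)
```

Raising 440 Hz by two semitones gives about 493.88 Hz, and a shift of twelve semitones doubles the frequency.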
Sparse Signal Processing
Techniques focused on processing and analyzing sparse signals, which are common in compressed sensing and other applications.
Key Features:
- Object-oriented design with comprehensive classes
- Multiple processing methods and functions
- NumPy integration for numerical computations
Examples:
- Basic usage:
>>> import numpy as np
>>> from vitalDSP.advanced_computation.sparse_signal_processing import SparseSignalProcessing
>>> signal = np.random.randn(1000)
>>> ssp = SparseSignalProcessing(signal)
>>> coeffs = ssp.sparse_representation(np.fft.fft)
- class vitalDSP.advanced_computation.sparse_signal_processing.SparseSignalProcessing(signal)[source]
Bases: object
Sparse Signal Processing for efficient representation and processing of signals.
This class provides methods for transforming a signal into a sparse domain, applying thresholding to remove noise, and reconstructing the signal from its sparse representation.
- sparse_representation : method
Represents the signal using a specified sparse basis (e.g., wavelets, DCT, FFT).
- thresholding : method
Applies a threshold to the sparse representation to denoise the signal.
- reconstruction : method
Reconstructs the signal from its sparse representation.
Example Usage
>>> signal = np.sin(np.linspace(0, 10, 100)) + 0.5 * np.random.normal(size=100)
>>> sparse_processing = SparseSignalProcessing(signal)
>>> sparse_rep = sparse_processing.sparse_representation(np.fft.fft)
>>> thresholded_sparse = sparse_processing.thresholding(sparse_rep, threshold=0.1)
>>> reconstructed_signal = sparse_processing.reconstruction(thresholded_sparse, np.fft.ifft)
>>> print("Reconstructed Signal:", reconstructed_signal)
- reconstruction(sparse_rep, inverse_basis)[source]
Reconstruct the signal from its sparse representation.
This method applies the inverse transform to the sparse representation to reconstruct the signal in its original domain.
- Parameters:
sparse_rep (numpy.ndarray) – The sparse representation of the signal.
inverse_basis (callable) – A function or method to inverse-transform the signal from the sparse domain (e.g., np.fft.ifft for inverse FFT, or wavelet_transform.perform_inverse_wavelet_transform for inverse wavelet transform).
- Returns:
The reconstructed signal.
- Return type:
Examples
>>> reconstructed_signal = sparse_processing.reconstruction(thresholded_sparse, np.fft.ifft)
>>> print(reconstructed_signal)
- sparse_representation(basis)[source]
Represent the signal using a sparse basis.
This method transforms the signal into a sparse domain, such as using wavelets, DCT (Discrete Cosine Transform), or FFT (Fast Fourier Transform), allowing for efficient processing and manipulation.
- Parameters:
basis (callable) – A function or method to transform the signal to the sparse domain (e.g., np.fft.fft for FFT, or wavelet_transform.perform_wavelet_transform for wavelet transform).
- Returns:
The sparse representation of the signal.
- Return type:
Examples
>>> signal = np.sin(np.linspace(0, 10, 100)) + 0.5 * np.random.normal(size=100)
>>> sparse_processing = SparseSignalProcessing(signal)
>>> sparse_rep = sparse_processing.sparse_representation(np.fft.fft)
>>> print(sparse_rep)
- thresholding(sparse_rep, threshold)[source]
Apply a threshold to the sparse representation to denoise the signal.
This method sets values in the sparse representation that are below the threshold to zero, effectively removing small coefficients that may correspond to noise.
- Parameters:
sparse_rep (numpy.ndarray) – The sparse representation of the signal.
threshold (float) – The threshold value. Coefficients with absolute values below this threshold will be set to zero.
- Returns:
The thresholded sparse representation.
- Return type:
Examples
>>> thresholded_sparse = sparse_processing.thresholding(sparse_rep, threshold=0.1)
>>> print(thresholded_sparse)
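The transform, threshold, reconstruct pipeline can be tried end to end with NumPy alone. The sketch below is independent of the SparseSignalProcessing class: hard_threshold is a hypothetical helper, the threshold of half the largest coefficient magnitude is an arbitrary choice for this test signal, and np.fft.rfft / np.fft.irfft are used instead of fft / ifft so that the reconstruction is real-valued.

```python
import numpy as np

def hard_threshold(coeffs, threshold):
    """Zero every coefficient whose absolute value is below threshold."""
    out = coeffs.copy()
    out[np.abs(out) < threshold] = 0
    return out

rng = np.random.default_rng(0)
n = 256
t = np.arange(n) / n
clean = np.sin(2 * np.pi * 5 * t)             # exactly 5 cycles: one FFT bin
noisy = clean + 0.3 * rng.normal(size=n)

coeffs = np.fft.rfft(noisy)                   # sparse domain: frequency bins
kept = hard_threshold(coeffs, threshold=0.5 * np.abs(coeffs).max())
denoised = np.fft.irfft(kept, n=n)            # back to the time domain

mse_noisy = np.mean((noisy - clean) ** 2)
mse_denoised = np.mean((denoised - clean) ** 2)
```

The sine concentrates its energy in a single coefficient while the noise spreads thinly over all of them, which is why discarding small coefficients removes most of the noise and little of the signal.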