"""
Advanced Computation Module for Physiological Signal Processing
This module provides Bayesian inference and optimization capabilities for physiological
signal processing including ECG, PPG, EEG, and other vital signs.
Author: vitalDSP Team
Date: 2025-01-27
Version: 1.0.1
Key Features:
- Gaussian Process modeling for signal prediction
- Bayesian Optimization for hyperparameter tuning
- Expected Improvement acquisition function
- Numerical stability improvements
- Graceful error handling
Classes:
--------
GaussianProcess
Implements Gaussian Process regression for probabilistic modeling.
BayesianOptimization
Implements Bayesian Optimization using Gaussian Processes.
Examples:
---------
**Example 1: Gaussian Process Prediction**
>>> import numpy as np
>>> from vitalDSP.advanced_computation.bayesian_analysis import GaussianProcess
>>>
>>> # Create GP model
>>> gp = GaussianProcess(length_scale=1.0, noise=1e-5)
>>>
>>> # Training data
>>> X_train = np.array([[0.1], [0.4], [0.7]])
>>> y_train = np.sin(3 * X_train.flatten())
>>> gp.update(X_train, y_train)
>>>
>>> # Predict at new points
>>> X_new = np.array([[0.2], [0.5]])
>>> mean, variance = gp.predict(X_new)
>>> print(f"Predicted mean: {mean}")
>>> print(f"Predicted variance: {variance}")
**Example 2: Bayesian Optimization for Hyperparameter Tuning**
>>> from vitalDSP.advanced_computation.bayesian_analysis import BayesianOptimization
>>>
>>> # Define objective function to maximize
>>> def filter_performance(cutoff_freq):
... # Simulate filter performance metric
... cutoff = np.atleast_1d(cutoff_freq)[0]
... return -(cutoff - 0.5)**2 + 1.0 # Peak at 0.5
>>>
>>> # Optimize cutoff frequency in range [0, 1]
>>> optimizer = BayesianOptimization(
... func=filter_performance,
... bounds=(0, 1),
... length_scale=0.1,
... noise=1e-5
... )
>>>
>>> # Run optimization
>>> best_cutoff, best_performance = optimizer.optimize(n_iter=20, random_seed=42)
>>> print(f"Best cutoff frequency: {best_cutoff[0]:.4f}")
>>> print(f"Best performance: {best_performance:.4f}")
**Example 3: ECG Filter Parameter Optimization**
>>> from vitalDSP.filtering.signal_filtering import SignalFiltering
>>>
>>> # Simulated ECG signal
>>> ecg_signal = np.sin(2*np.pi*1.2*np.linspace(0, 10, 1000)) + 0.1*np.random.randn(1000)
>>>
>>> # Define objective: minimize noise while preserving signal
>>> def filter_quality(params):
... cutoff = np.atleast_1d(params)[0]
... sf = SignalFiltering(ecg_signal)
... filtered = sf.butterworth_lowpass(cutoff_freq=cutoff, fs=100, order=4)
... # Quality metric: SNR-like measure
... signal_power = np.var(filtered)
... noise_power = np.var(ecg_signal - filtered)
... return signal_power / (noise_power + 1e-10)
>>>
>>> # Optimize filter cutoff
>>> optimizer = BayesianOptimization(filter_quality, bounds=(0.1, 10))
>>> best_cutoff, best_snr = optimizer.optimize(n_iter=15, random_seed=42)
>>> print(f"Optimal cutoff: {best_cutoff[0]:.2f} Hz, SNR: {best_snr:.2f}")
Notes:
------
- **Numerical Stability**: The default noise parameter (1e-5) provides good numerical
stability. Avoid using values smaller than 1e-7 as they can cause singular matrix errors.
- **Initialization**: BayesianOptimization starts with 3 random samples to bootstrap
the Gaussian Process before beginning optimization.
- **Fallback**: If numerical issues occur during optimization, the algorithm falls
back to random sampling to ensure it always returns a result.
"""
import numpy as np
from scipy.optimize import minimize
from scipy.special import erf
[docs]
class GaussianProcess:
"""
A class implementing Gaussian Process (GP) for Bayesian Optimization.
Gaussian Process models are used to predict the mean and variance of an unknown function based on observed data. This is particularly useful in Bayesian Optimization, where the GP helps in selecting the next sample point to evaluate.
Methods
-------
predict(X)
Predict the mean and variance of the objective function at new points X.
update(X_train, y_train)
Update the GP model with new observations.
Example Usage
-------------
>>> gp = GaussianProcess(length_scale=1.0, noise=1e-10)
>>> X_train = np.array([[0.1], [0.4], [0.7]])
>>> y_train = np.sin(3 * X_train) - X_train ** 2 + 0.7 * X_train
>>> gp.update(X_train, y_train)
>>> X_new = np.array([[0.2], [0.5]])
>>> mean, variance = gp.predict(X_new)
>>> print("Predicted Mean:", mean)
>>> print("Predicted Variance:", variance)
"""
def __init__(self, length_scale=1.0, noise=1e-10):
"""
Initialize the Gaussian Process model.
Parameters
----------
length_scale : float, optional
The length scale parameter for the RBF kernel. Controls the smoothness of the function (default is 1.0).
noise : float, optional
The noise level in the observations. Used to regularize the covariance matrix (default is 1e-10).
"""
self.length_scale = length_scale
self.noise = noise
self.X_train = None
self.y_train = None
self.K = None
def _rbf_kernel(self, X1, X2):
"""
Radial Basis Function (RBF) kernel, also known as the Gaussian kernel.
Parameters
----------
X1 : numpy.ndarray
The first set of input points.
X2 : numpy.ndarray
The second set of input points.
Returns
-------
numpy.ndarray
The kernel matrix computed between X1 and X2.
"""
sqdist = (
np.sum(X1**2, 1).reshape(-1, 1) + np.sum(X2**2, 1) - 2 * np.dot(X1, X2.T)
)
return np.exp(-0.5 / self.length_scale**2 * sqdist)
[docs]
def predict(self, X):
"""
Predict the mean and variance of the objective function at new points.
Parameters
----------
X : numpy.ndarray
The points at which to predict the mean and variance.
Returns
-------
tuple
The predicted mean and variance at the input points X.
Raises
------
ValueError
If the GP model has not been updated with any training data.
"""
if self.X_train is None or self.y_train is None:
raise ValueError(
"The GP model has not been updated with any training data."
)
K_s = self._rbf_kernel(self.X_train, X)
K_ss = self._rbf_kernel(X, X)
K_inv = np.linalg.inv(self.K + self.noise**2 * np.eye(len(self.X_train)))
mu_s = K_s.T.dot(K_inv).dot(self.y_train)
cov_s = K_ss - K_s.T.dot(K_inv).dot(K_s)
return mu_s.ravel(), np.diag(cov_s)
[docs]
def update(self, X_train, y_train):
"""
Update the GP model with new observations.
Parameters
----------
X_train : numpy.ndarray
The observed input points.
y_train : numpy.ndarray
The observed output values corresponding to X_train.
"""
self.X_train = X_train
self.y_train = y_train
self.K = self._rbf_kernel(X_train, X_train)
[docs]
class BayesianOptimization:
"""
A class implementing Bayesian Optimization for parameter tuning.
Bayesian Optimization is an efficient method for finding the minimum or maximum of an objective function, especially when the function is expensive to evaluate. It uses a Gaussian Process model to predict the function's behavior and an acquisition function to determine the next point to sample.
Methods
-------
optimize(n_iter=10, random_seed=None)
Perform Bayesian optimization to find the best parameters.
acquisition(X, xi=0.01)
Compute the Expected Improvement (EI) at new points.
propose_location(n_restarts=10)
Propose the next sampling location based on the acquisition function.
Example Usage
-------------
>>> def objective_function(x):
>>> return -np.sin(3 * x) - x ** 2 + 0.7 * x
>>>
>>> bayesian_optimizer = BayesianOptimization(objective_function, bounds=(0, 2))
>>> best_x, best_y = bayesian_optimizer.optimize(n_iter=10)
>>> print("Best X:", best_x)
>>> print("Best Y:", best_y)
"""
def __init__(self, func, bounds, length_scale=1.0, noise=1e-5):
"""
Initialize BayesianOptimization with the objective function and bounds.
Parameters
----------
func : callable
The objective function to be optimized.
bounds : tuple
The bounds within which to search for the optimal parameters.
length_scale : float, optional
Length scale parameter for the Gaussian Process kernel (default is 1.0).
noise : float, optional
Noise level in the observations for numerical stability (default is 1e-5).
Larger values (1e-5 to 1e-3) provide better numerical stability.
"""
self.func = func
self.bounds = bounds
self.gp = GaussianProcess(length_scale, noise)
self.X_samples = []
self.Y_samples = []
[docs]
def acquisition(self, X, xi=0.01):
"""
Expected Improvement (EI) acquisition function.
The EI acquisition function is used to balance exploration and exploitation in Bayesian Optimization. It selects points that have the potential to improve upon the best observed value.
Parameters
----------
X : numpy.ndarray
The points at which to evaluate the acquisition function.
xi : float, optional
Exploration-exploitation trade-off parameter (default is 0.01).
Returns
-------
numpy.ndarray
The EI values at the provided points.
"""
mu, sigma = self.gp.predict(X)
mu_sample = np.max(self.Y_samples)
imp = mu - mu_sample - xi
# Avoid divide by zero
sigma_safe = np.where(sigma == 0.0, 1e-10, sigma)
Z = imp / sigma_safe
ei = imp * self._cdf(Z) + sigma * self._pdf(Z)
ei[sigma == 0.0] = 0.0
return ei
def _cdf(self, x):
"""
Cumulative Distribution Function (CDF) for the standard normal distribution.
Parameters
----------
x : numpy.ndarray
The input array.
Returns
-------
numpy.ndarray
The CDF values.
"""
return 0.5 * (1 + erf(x / np.sqrt(2)))
def _pdf(self, x):
"""
Probability Density Function (PDF) for the standard normal distribution.
Parameters
----------
x : numpy.ndarray
The input array.
Returns
-------
numpy.ndarray
The PDF values.
"""
return np.exp(-0.5 * x**2) / np.sqrt(2 * np.pi)
[docs]
def propose_location(self, n_restarts=10):
"""
Propose the next sampling location by optimizing the acquisition function.
Parameters
----------
n_restarts : int, optional
Number of restarts for the optimization to avoid local optima (default is 10).
Returns
-------
numpy.ndarray
The proposed next sampling point.
"""
best_value = None
best_x = None
for _ in range(n_restarts):
x_init = np.random.uniform(
self.bounds[0], self.bounds[1], size=(1,)
).flatten() # Flatten x_init
res = minimize(
lambda x: -self.acquisition(np.atleast_2d(x)),
x_init,
bounds=[self.bounds],
)
if best_value is None or res.fun < best_value:
best_value = res.fun
best_x = res.x
return best_x
[docs]
def optimize(self, n_iter=10, random_seed=None):
"""
Perform Bayesian optimization to find the best parameters.
Parameters
----------
n_iter : int, optional
Number of iterations for the optimization (default is 10).
random_seed : int or None, optional
Random seed for reproducibility (default is None).
Returns
-------
tuple
The best parameters and the corresponding function value.
Notes
-----
The optimization process starts with a random sample to initialize the Gaussian Process,
then iteratively selects new points using the acquisition function to balance exploration
and exploitation.
Examples
--------
>>> def objective(x):
... return -np.sin(3 * x) - x ** 2 + 0.7 * x
>>> optimizer = BayesianOptimization(objective, bounds=(0, 2))
>>> best_x, best_y = optimizer.optimize(n_iter=10, random_seed=42)
>>> print(f"Best X: {best_x}, Best Y: {best_y}")
"""
if random_seed is not None:
np.random.seed(random_seed)
# Initialize with 2-3 random samples to bootstrap the GP
# This prevents singular matrix issues
# Normalize Y_samples to ensure all values are scalars
# This handles cases where Y_samples might contain arrays from test setup
normalized_Y_samples = []
for y in self.Y_samples:
if isinstance(y, (list, np.ndarray)):
y = float(np.atleast_1d(y).flatten()[0])
normalized_Y_samples.append(float(y))
self.Y_samples = normalized_Y_samples
if len(self.X_samples) == 0:
n_init = 3 # Start with 3 random samples for stability
for _ in range(n_init):
X_init = np.random.uniform(
self.bounds[0], self.bounds[1], size=(1,)
).reshape(-1, 1)
Y_init = self.func(X_init)
self.X_samples.append(X_init)
self.Y_samples.append(float(np.atleast_1d(Y_init).flatten()[0]))
for _ in range(n_iter):
try:
# Convert samples to proper arrays for GP update
X_array = np.vstack(self.X_samples)
# Ensure all Y_samples are scalars before converting to array
Y_scalars = []
for y in self.Y_samples:
if isinstance(y, (list, np.ndarray)):
y = float(np.atleast_1d(y).flatten()[0])
Y_scalars.append(float(y))
Y_array = np.array(Y_scalars).flatten()
# Update GP with current samples
self.gp.update(X_array, Y_array)
# Propose next location using acquisition function
X_next = self.propose_location()
Y_next = self.func(X_next)
# Append new sample to the lists
self.X_samples.append(X_next.reshape(-1, 1))
self.Y_samples.append(float(np.atleast_1d(Y_next).flatten()[0]))
except np.linalg.LinAlgError:
# If numerical issues occur, fall back to random sampling
X_random = np.random.uniform(
self.bounds[0], self.bounds[1], size=(1,)
).reshape(-1, 1)
Y_random = self.func(X_random)
self.X_samples.append(X_random)
self.Y_samples.append(float(np.atleast_1d(Y_random).flatten()[0]))
# Find and return the best sample
best_index = np.argmax(self.Y_samples)
best_x = self.X_samples[best_index]
best_y = self.Y_samples[best_index]
# Ensure best_y is a numpy scalar (not Python float) for consistency with test expectations
best_y = np.float64(best_y)
return best_x.flatten(), best_y