Advanced Computation
This notebook demonstrates advanced signal processing techniques including anomaly detection, real-time monitoring, harmonic-percussive separation, and Empirical Mode Decomposition (EMD) using vitalDSP.
Setup
import numpy as np
import plotly.io as pio
pio.renderers.default = "sphinx_gallery"
from plotly import graph_objects as go
from vitalDSP.notebooks import load_sample_ecg_small, load_sample_ppg, plot_trace
fs = 128
signal_col, date_col = load_sample_ecg_small()
signal_col = np.array(signal_col)
print(f"ECG signal: {len(signal_col)} samples at {fs} Hz")
plot_trace(signal_col)
Anomaly Detection
from vitalDSP.advanced_computation.anomaly_detection import AnomalyDetection
ad = AnomalyDetection(signal_col)
# Z-score based anomaly detection
anomalies_z = ad.detect_anomalies(method='z_score', threshold=3.0)
print(f"Z-score anomalies detected: {len(anomalies_z)}")
# IQR based
anomalies_iqr = ad.detect_anomalies(method='iqr')
print(f"IQR anomalies detected : {len(anomalies_iqr)}")
# Visualize anomalies
fig = go.Figure()
fig.add_trace(go.Scatter(y=signal_col, mode='lines', name='ECG Signal', opacity=0.7))
if len(anomalies_z) > 0:
fig.add_trace(go.Scatter(
x=anomalies_z,
y=signal_col[anomalies_z],
mode='markers', marker=dict(color='red', size=8, symbol='circle-open'),
name='Anomalies (Z-score)'
))
fig.update_layout(title='ECG Anomaly Detection', xaxis_title='Sample', yaxis_title='Amplitude')
fig.show()
Real-Time Anomaly Detection
from vitalDSP.advanced_computation.real_time_anomaly_detection import RealTimeAnomalyDetection
# Initialize with a sliding window size
rtad = RealTimeAnomalyDetection(window_size=64)
# Train autoencoder on the first 512 samples as normal baseline
training_data = signal_col[:512]
rtad.train_autoencoder(training_data, encoding_dim=8)
# Stream test samples and detect anomalies
test_segment = signal_col[512:640]
anomaly_flags = [rtad.detect_autoencoder(np.array([s]), threshold=0.1) for s in test_segment]
stat_flags = [rtad.detect_statistical(np.array([s]), method='z_score', threshold=2.5) for s in test_segment]
print(f"Autoencoder anomalies in test segment: {sum(anomaly_flags)} / {len(test_segment)} samples")
print(f"Statistical anomalies : {sum(stat_flags)} / {len(test_segment)} samples")
Harmonic-Percussive Source Separation
from vitalDSP.advanced_computation.harmonic_percussive_separation import HarmonicPercussiveSeparation
hps = HarmonicPercussiveSeparation(signal_col)
harmonic, percussive = hps.separate()
print(f"Harmonic component : shape={harmonic.shape}, power={np.mean(harmonic**2):.4f}")
print(f"Percussive component: shape={percussive.shape}, power={np.mean(percussive**2):.4f}")
fig = go.Figure()
fig.add_trace(go.Scatter(y=signal_col[:512], mode='lines', name='Original', opacity=0.4))
fig.add_trace(go.Scatter(y=harmonic[:512], mode='lines', name='Harmonic (smooth)'))
fig.add_trace(go.Scatter(y=percussive[:512], mode='lines', name='Percussive (transients)'))
fig.update_layout(
title='Harmonic-Percussive Separation',
xaxis_title='Sample', yaxis_title='Amplitude'
)
fig.show()
Empirical Mode Decomposition (EMD)
from vitalDSP.advanced_computation.emd import EMD
# Use a shorter segment for EMD (computationally intensive)
short_signal = signal_col[:512]
emd = EMD(short_signal)
imfs = emd.emd()
print(f"Number of IMFs extracted: {len(imfs)}")
for i, imf in enumerate(imfs):
print(f" IMF {i+1}: mean={np.mean(imf):.4f}, std={np.std(imf):.4f}")
# Plot IMFs
n_plot = min(4, len(imfs))
fig = go.Figure()
fig.add_trace(go.Scatter(y=short_signal, mode='lines', name='Original', opacity=0.4))
for i in range(n_plot):
fig.add_trace(go.Scatter(y=imfs[i], mode='lines', name=f'IMF {i+1}'))
fig.update_layout(
title='EMD: Intrinsic Mode Functions',
xaxis_title='Sample', yaxis_title='Amplitude'
)
fig.show()
Advanced Kalman Filter
from vitalDSP.advanced_computation.kalman_filter import KalmanFilter
# 1-D Kalman filter with identity transition / measurement matrices
kf = KalmanFilter(
initial_state=np.array([[signal_col[0]]]),
initial_covariance=np.array([[1.0]]),
process_covariance=np.array([[1e-3]]),
measurement_covariance=np.array([[0.1]])
)
H = np.array([[1.0]]) # measurement matrix
F = np.array([[1.0]]) # state transition matrix
kf_filtered = kf.filter(signal_col, measurement_matrix=H, transition_matrix=F)
plot_trace(signal_col, kf_filtered)
Sparse Signal Processing
from vitalDSP.advanced_computation.sparse_signal_processing import SparseSignalProcessing
ssp = SparseSignalProcessing(signal_col)
# Build a DCT basis for sparse representation
N = len(signal_col)
k = np.arange(N)
basis = np.cos(np.pi * k[:, None] * (2 * k[None, :] + 1) / (2 * N)) * np.sqrt(2 / N)
basis[0] *= 1 / np.sqrt(2)
sparse_repr = ssp.sparse_representation(basis)
# Threshold: keep only 5% of coefficients
threshold = np.percentile(np.abs(sparse_repr), 95)
sparse_thresh = ssp.thresholding(sparse_repr, threshold=threshold)
reconstructed = ssp.reconstruction(sparse_thresh, inverse_basis=basis.T)
print(f"Sparse reconstruction MSE (5% kept): {np.mean((signal_col - reconstructed)**2):.6f}")
plot_trace(signal_col, reconstructed)
Gaussian Process Regression
from vitalDSP.advanced_computation.bayesian_analysis import GaussianProcess
# Fit a GP to a short ECG segment as a smooth interpolator
n_train = 40
X_train = np.arange(n_train).reshape(-1, 1).astype(float)
y_train = signal_col[:n_train]
gp = GaussianProcess(length_scale=5.0, noise=0.05)
gp.update(X_train, y_train)
X_test = np.linspace(0, n_train - 1, 200).reshape(-1, 1)
mean_pred, var_pred = gp.predict(X_test)
std_pred = np.sqrt(var_pred)
fig = go.Figure()
fig.add_trace(go.Scatter(x=X_train.ravel(), y=y_train,
mode='markers', name='Training points'))
fig.add_trace(go.Scatter(x=X_test.ravel(), y=mean_pred,
mode='lines', name='GP mean prediction'))
fig.add_trace(go.Scatter(
x=np.concatenate([X_test.ravel(), X_test.ravel()[::-1]]),
y=np.concatenate([mean_pred + 2*std_pred, (mean_pred - 2*std_pred)[::-1]]),
fill='toself', opacity=0.2, name='95% confidence'
))
fig.update_layout(title='Gaussian Process Regression on ECG Segment',
xaxis_title='Sample', yaxis_title='Amplitude')
fig.show()
Bayesian Optimization
from vitalDSP.advanced_computation.bayesian_analysis import BayesianOptimization
# Optimize a synthetic objective (e.g., finding best filter cutoff)
def signal_snr_objective(cutoff):
from vitalDSP.filtering.signal_filtering import SignalFiltering
filtered = SignalFiltering(signal_col).lowpass(cutoff=cutoff, fs=fs, order=4)
noise = signal_col - filtered
snr = 10 * np.log10(np.mean(filtered**2) / (np.mean(noise**2) + 1e-8))
return snr
bo = BayesianOptimization(func=signal_snr_objective, bounds=(5.0, 45.0))
best_cutoff, best_snr = bo.optimize(n_iter=8)
print(f'Optimal lowpass cutoff: {float(best_cutoff):.2f} Hz (SNR={best_snr:.2f} dB)')
Generative Signal Synthesis
from vitalDSP.advanced_computation.generative_signal_synthesis import GenerativeSignalSynthesis
gss = GenerativeSignalSynthesis()
synth_ecg = gss.generate(method='random_noise', length=512)
synth_ar = gss.generate(method='autoregressive', length=512, coeffs=[0.9, -0.2])
print(f'Synthetic noise signal : {synth_ecg.shape}')
print(f'Synthetic sine signal : {synth_ar.shape}')
fig = go.Figure()
fig.add_trace(go.Scatter(y=synth_ecg, mode='lines', name='Random noise synthesis'))
fig.add_trace(go.Scatter(y=synth_ar, mode='lines', name='Autoregressive synthesis'))
fig.update_layout(title='Generative Signal Synthesis',
xaxis_title='Sample', yaxis_title='Amplitude')
fig.show()
Multimodal Signal Fusion
from vitalDSP.advanced_computation.multimodal_fusion import MultimodalFusion
from vitalDSP.notebooks import load_sample_ppg
ppg_col, _ = load_sample_ppg()
ppg_col = np.array(ppg_col)
n = min(len(signal_col), len(ppg_col))
# Normalize both signals before fusion
ecg_norm = (signal_col[:n] - signal_col[:n].mean()) / (signal_col[:n].std() + 1e-8)
ppg_norm = (ppg_col[:n] - ppg_col[:n].mean()) / (ppg_col[:n].std() + 1e-8)
mf = MultimodalFusion(signals=[ecg_norm, ppg_norm])
fused_ws = mf.fuse(strategy='weighted_sum', weights=[0.6, 0.4])
fused_pca = mf.fuse(strategy='pca', n_components=1)
fused_max = mf.fuse(strategy='maximum')
print(f'Weighted sum fusion : {fused_ws.shape}')
print(f'PCA fusion : {fused_pca.shape}')
print(f'Maximum fusion : {fused_max.shape}')
fig = go.Figure()
fig.add_trace(go.Scatter(y=ecg_norm[:512], mode='lines', name='ECG (norm)', opacity=0.5))
fig.add_trace(go.Scatter(y=ppg_norm[:512], mode='lines', name='PPG (norm)', opacity=0.5))
fig.add_trace(go.Scatter(y=fused_ws[:512], mode='lines', name='Weighted Sum (0.6/0.4)'))
fig.add_trace(go.Scatter(y=fused_pca[:512], mode='lines', name='PCA Fusion'))
fig.update_layout(title='Multimodal ECG-PPG Fusion',
xaxis_title='Sample', yaxis_title='Amplitude')
fig.show()
Neural Network Filtering
from vitalDSP.advanced_computation.neural_network_filtering import NeuralNetworkFiltering
# Train a feedforward network to learn the signal structure
# and apply it as a learned filter
nnf = NeuralNetworkFiltering(
signal_col[:512],
network_type='feedforward',
hidden_layers=[32, 16],
epochs=5,
batch_size=32
)
nnf.train()
filtered_nn = nnf.apply_filter().ravel()
print(f'NN filtered signal shape: {filtered_nn.shape}')
fig = go.Figure()
fig.add_trace(go.Scatter(y=signal_col[1:512], mode='lines', name='Original ECG', opacity=0.5))
fig.add_trace(go.Scatter(y=filtered_nn, mode='lines', name='NN Filtered'))
fig.update_layout(title='Neural Network Filtering',
xaxis_title='Sample', yaxis_title='Amplitude')
fig.show()