Advanced Computation

This notebook demonstrates advanced signal processing techniques including anomaly detection, real-time monitoring, harmonic-percussive separation, and Empirical Mode Decomposition (EMD) using vitalDSP.

Setup

import numpy as np
import plotly.io as pio
pio.renderers.default = "sphinx_gallery"
from plotly import graph_objects as go
from vitalDSP.notebooks import load_sample_ecg_small, load_sample_ppg, plot_trace

fs = 128
signal_col, date_col = load_sample_ecg_small()
signal_col = np.array(signal_col)

print(f"ECG signal: {len(signal_col)} samples at {fs} Hz")
plot_trace(signal_col)

Anomaly Detection

from vitalDSP.advanced_computation.anomaly_detection import AnomalyDetection

ad = AnomalyDetection(signal_col)

# Z-score based anomaly detection
anomalies_z = ad.detect_anomalies(method='z_score', threshold=3.0)
print(f"Z-score anomalies detected: {len(anomalies_z)}")

# IQR based
anomalies_iqr = ad.detect_anomalies(method='iqr')
print(f"IQR anomalies detected    : {len(anomalies_iqr)}")

# Visualize anomalies
fig = go.Figure()
fig.add_trace(go.Scatter(y=signal_col, mode='lines', name='ECG Signal', opacity=0.7))
if len(anomalies_z) > 0:
    fig.add_trace(go.Scatter(
        x=anomalies_z,
        y=signal_col[anomalies_z],
        mode='markers', marker=dict(color='red', size=8, symbol='circle-open'),
        name='Anomalies (Z-score)'
    ))
fig.update_layout(title='ECG Anomaly Detection', xaxis_title='Sample', yaxis_title='Amplitude')
fig.show()

Real-Time Anomaly Detection

from vitalDSP.advanced_computation.real_time_anomaly_detection import RealTimeAnomalyDetection

# Initialize with a sliding window size
rtad = RealTimeAnomalyDetection(window_size=64)

# Train autoencoder on the first 512 samples as normal baseline
training_data = signal_col[:512]
rtad.train_autoencoder(training_data, encoding_dim=8)

# Stream test samples and detect anomalies
test_segment = signal_col[512:640]
anomaly_flags = [rtad.detect_autoencoder(np.array([s]), threshold=0.1) for s in test_segment]
stat_flags    = [rtad.detect_statistical(np.array([s]), method='z_score', threshold=2.5) for s in test_segment]

print(f"Autoencoder anomalies in test segment: {sum(anomaly_flags)} / {len(test_segment)} samples")
print(f"Statistical anomalies                : {sum(stat_flags)} / {len(test_segment)} samples")

Harmonic-Percussive Source Separation

from vitalDSP.advanced_computation.harmonic_percussive_separation import HarmonicPercussiveSeparation

hps = HarmonicPercussiveSeparation(signal_col)
harmonic, percussive = hps.separate()

print(f"Harmonic component  : shape={harmonic.shape}, power={np.mean(harmonic**2):.4f}")
print(f"Percussive component: shape={percussive.shape}, power={np.mean(percussive**2):.4f}")

fig = go.Figure()
fig.add_trace(go.Scatter(y=signal_col[:512], mode='lines', name='Original', opacity=0.4))
fig.add_trace(go.Scatter(y=harmonic[:512], mode='lines', name='Harmonic (smooth)'))
fig.add_trace(go.Scatter(y=percussive[:512], mode='lines', name='Percussive (transients)'))
fig.update_layout(
    title='Harmonic-Percussive Separation',
    xaxis_title='Sample', yaxis_title='Amplitude'
)
fig.show()

Empirical Mode Decomposition (EMD)

from vitalDSP.advanced_computation.emd import EMD

# Use a shorter segment for EMD (computationally intensive)
short_signal = signal_col[:512]

emd = EMD(short_signal)
imfs = emd.emd()

print(f"Number of IMFs extracted: {len(imfs)}")
for i, imf in enumerate(imfs):
    print(f"  IMF {i+1}: mean={np.mean(imf):.4f}, std={np.std(imf):.4f}")

# Plot IMFs
n_plot = min(4, len(imfs))
fig = go.Figure()
fig.add_trace(go.Scatter(y=short_signal, mode='lines', name='Original', opacity=0.4))
for i in range(n_plot):
    fig.add_trace(go.Scatter(y=imfs[i], mode='lines', name=f'IMF {i+1}'))
fig.update_layout(
    title='EMD: Intrinsic Mode Functions',
    xaxis_title='Sample', yaxis_title='Amplitude'
)
fig.show()

Advanced Kalman Filter

from vitalDSP.advanced_computation.kalman_filter import KalmanFilter

# 1-D Kalman filter with identity transition / measurement matrices
kf = KalmanFilter(
    initial_state=np.array([[signal_col[0]]]),
    initial_covariance=np.array([[1.0]]),
    process_covariance=np.array([[1e-3]]),
    measurement_covariance=np.array([[0.1]])
)

H = np.array([[1.0]])  # measurement matrix
F = np.array([[1.0]])  # state transition matrix

kf_filtered = kf.filter(signal_col, measurement_matrix=H, transition_matrix=F)

plot_trace(signal_col, kf_filtered)

Sparse Signal Processing

from vitalDSP.advanced_computation.sparse_signal_processing import SparseSignalProcessing

ssp = SparseSignalProcessing(signal_col)

# Build a DCT basis for sparse representation
N = len(signal_col)
k = np.arange(N)
basis = np.cos(np.pi * k[:, None] * (2 * k[None, :] + 1) / (2 * N)) * np.sqrt(2 / N)
basis[0] *= 1 / np.sqrt(2)

sparse_repr = ssp.sparse_representation(basis)

# Threshold: keep only 5% of coefficients
threshold = np.percentile(np.abs(sparse_repr), 95)
sparse_thresh = ssp.thresholding(sparse_repr, threshold=threshold)

reconstructed = ssp.reconstruction(sparse_thresh, inverse_basis=basis.T)
print(f"Sparse reconstruction MSE (5% kept): {np.mean((signal_col - reconstructed)**2):.6f}")

plot_trace(signal_col, reconstructed)

Gaussian Process Regression

from vitalDSP.advanced_computation.bayesian_analysis import GaussianProcess

# Fit a GP to a short ECG segment as a smooth interpolator
n_train = 40
X_train = np.arange(n_train).reshape(-1, 1).astype(float)
y_train = signal_col[:n_train]

gp = GaussianProcess(length_scale=5.0, noise=0.05)
gp.update(X_train, y_train)

X_test = np.linspace(0, n_train - 1, 200).reshape(-1, 1)
mean_pred, var_pred = gp.predict(X_test)
std_pred = np.sqrt(var_pred)

fig = go.Figure()
fig.add_trace(go.Scatter(x=X_train.ravel(), y=y_train,
                         mode='markers', name='Training points'))
fig.add_trace(go.Scatter(x=X_test.ravel(), y=mean_pred,
                         mode='lines', name='GP mean prediction'))
fig.add_trace(go.Scatter(
    x=np.concatenate([X_test.ravel(), X_test.ravel()[::-1]]),
    y=np.concatenate([mean_pred + 2*std_pred, (mean_pred - 2*std_pred)[::-1]]),
    fill='toself', opacity=0.2, name='95% confidence'
))
fig.update_layout(title='Gaussian Process Regression on ECG Segment',
                  xaxis_title='Sample', yaxis_title='Amplitude')
fig.show()

Bayesian Optimization

from vitalDSP.advanced_computation.bayesian_analysis import BayesianOptimization

# Optimize a synthetic objective (e.g., finding best filter cutoff)
def signal_snr_objective(cutoff):
    from vitalDSP.filtering.signal_filtering import SignalFiltering
    filtered = SignalFiltering(signal_col).lowpass(cutoff=cutoff, fs=fs, order=4)
    noise = signal_col - filtered
    snr = 10 * np.log10(np.mean(filtered**2) / (np.mean(noise**2) + 1e-8))
    return snr

bo = BayesianOptimization(func=signal_snr_objective, bounds=(5.0, 45.0))
best_cutoff, best_snr = bo.optimize(n_iter=8)
print(f'Optimal lowpass cutoff: {float(best_cutoff):.2f} Hz (SNR={best_snr:.2f} dB)')

Generative Signal Synthesis

from vitalDSP.advanced_computation.generative_signal_synthesis import GenerativeSignalSynthesis

gss = GenerativeSignalSynthesis()

synth_ecg  = gss.generate(method='random_noise', length=512)
synth_ar   = gss.generate(method='autoregressive', length=512, coeffs=[0.9, -0.2])

print(f'Synthetic noise signal : {synth_ecg.shape}')
print(f'Synthetic sine signal  : {synth_ar.shape}')

fig = go.Figure()
fig.add_trace(go.Scatter(y=synth_ecg, mode='lines', name='Random noise synthesis'))
fig.add_trace(go.Scatter(y=synth_ar, mode='lines', name='Autoregressive synthesis'))
fig.update_layout(title='Generative Signal Synthesis',
                  xaxis_title='Sample', yaxis_title='Amplitude')
fig.show()

Multimodal Signal Fusion

from vitalDSP.advanced_computation.multimodal_fusion import MultimodalFusion
from vitalDSP.notebooks import load_sample_ppg

ppg_col, _ = load_sample_ppg()
ppg_col = np.array(ppg_col)
n = min(len(signal_col), len(ppg_col))

# Normalize both signals before fusion
ecg_norm = (signal_col[:n] - signal_col[:n].mean()) / (signal_col[:n].std() + 1e-8)
ppg_norm = (ppg_col[:n] - ppg_col[:n].mean()) / (ppg_col[:n].std() + 1e-8)

mf = MultimodalFusion(signals=[ecg_norm, ppg_norm])

fused_ws  = mf.fuse(strategy='weighted_sum', weights=[0.6, 0.4])
fused_pca = mf.fuse(strategy='pca', n_components=1)
fused_max = mf.fuse(strategy='maximum')

print(f'Weighted sum fusion : {fused_ws.shape}')
print(f'PCA fusion          : {fused_pca.shape}')
print(f'Maximum fusion      : {fused_max.shape}')

fig = go.Figure()
fig.add_trace(go.Scatter(y=ecg_norm[:512], mode='lines', name='ECG (norm)', opacity=0.5))
fig.add_trace(go.Scatter(y=ppg_norm[:512], mode='lines', name='PPG (norm)', opacity=0.5))
fig.add_trace(go.Scatter(y=fused_ws[:512], mode='lines', name='Weighted Sum (0.6/0.4)'))
fig.add_trace(go.Scatter(y=fused_pca[:512], mode='lines', name='PCA Fusion'))
fig.update_layout(title='Multimodal ECG-PPG Fusion',
                  xaxis_title='Sample', yaxis_title='Amplitude')
fig.show()

Neural Network Filtering

from vitalDSP.advanced_computation.neural_network_filtering import NeuralNetworkFiltering

# Train a feedforward network to learn the signal structure
# and apply it as a learned filter
nnf = NeuralNetworkFiltering(
    signal_col[:512],
    network_type='feedforward',
    hidden_layers=[32, 16],
    epochs=5,
    batch_size=32
)
nnf.train()
filtered_nn = nnf.apply_filter().ravel()

print(f'NN filtered signal shape: {filtered_nn.shape}')

fig = go.Figure()
fig.add_trace(go.Scatter(y=signal_col[1:512], mode='lines', name='Original ECG', opacity=0.5))
fig.add_trace(go.Scatter(y=filtered_nn, mode='lines', name='NN Filtered'))
fig.update_layout(title='Neural Network Filtering',
                  xaxis_title='Sample', yaxis_title='Amplitude')
fig.show()