Examples
This section provides practical examples and real-world use cases for VitalDSP. Each example demonstrates specific functionality and can be used as a starting point for your own projects.
Example Categories
- Clinical Research Examples
ECG analysis for cardiovascular research
PPG analysis for hemodynamic studies
Respiratory analysis for sleep studies
- Healthcare Monitoring Examples
Real-time vital signs monitoring
Continuous patient monitoring
Telemedicine applications
- Wearable Device Examples
Fitness tracker integration
Multi-signal synchronization
- Advanced Analysis
Multi-scale entropy for cardiac complexity
Comprehensive feature extraction (time, frequency, nonlinear)
Machine learning pipeline for signal classification
Example 1: ECG Analysis for Clinical Research
Comprehensive ECG analysis including bandpass filtering, signal quality assessment, waveform morphology, and HRV feature extraction.
import numpy as np
import pandas as pd
from vitalDSP.filtering.signal_filtering import SignalFiltering
from vitalDSP.physiological_features.waveform import WaveformMorphology
from vitalDSP.physiological_features.hrv_analysis import HRVFeatures
from vitalDSP.signal_quality_assessment.signal_quality_index import SignalQualityIndex
from vitalDSP.utils.data_processing.synthesize_data import generate_ecg_signal
def analyze_ecg_clinical_research(ecg_signal, fs, patient_id=None):
"""
Comprehensive ECG analysis for clinical research.
Parameters
----------
ecg_signal : numpy.ndarray
Raw ECG signal.
fs : float
Sampling frequency in Hz.
patient_id : str, optional
Patient identifier for logging.
Returns
-------
dict : Analysis results including quality metrics, heart rate, and HRV.
"""
results = {
'patient_id': patient_id,
'sampling_frequency': fs,
'signal_length': len(ecg_signal),
}
# 1. Bandpass filter (0.5–40 Hz for ECG)
sf = SignalFiltering(ecg_signal)
filtered_ecg = sf.bandpass(lowcut=0.5, highcut=40.0, fs=fs, order=4)
# 2. Signal Quality Assessment
sqi = SignalQualityIndex(filtered_ecg)
results['snr_sqi'] = sqi.snr_sqi(window_size=int(fs * 5), step_size=int(fs), aggregate=True)
results['baseline_sqi'] = sqi.baseline_wander_sqi(window_size=int(fs * 5), step_size=int(fs), aggregate=True)
# 3. Waveform Morphology — detects R-peaks, computes heart rate
wm = WaveformMorphology(filtered_ecg, fs=fs, signal_type='ECG')
heart_rate = wm.get_heart_rate(summary_type='mean')
results['heart_rate'] = heart_rate
results['heart_rate_status'] = (
'bradycardia' if heart_rate < 60 else
'tachycardia' if heart_rate > 100 else 'normal'
)
# 4. HRV analysis from RR intervals
# BeatToBeatAnalysis gives us cleaned RR intervals
from vitalDSP.physiological_features.beat_to_beat import BeatToBeatAnalysis
bb = BeatToBeatAnalysis(filtered_ecg, fs=fs, signal_type='ECG')
rr_intervals = bb.compute_rr_intervals() # ms, outlier-corrected
if len(rr_intervals) > 10:
hrv = HRVFeatures(signals=filtered_ecg, nn_intervals=rr_intervals, fs=fs)
hrv_features = hrv.compute_all_features()
results['hrv'] = hrv_features
sdnn = hrv_features.get('sdnn', 0)
results['hrv_status'] = (
'reduced' if sdnn < 30 else
'good' if sdnn > 50 else 'normal'
)
return results
# --- Usage ---
fs = 256
ecg_data = generate_ecg_signal(sfecg=fs, duration=60, hrmean=72, Anoise=0.05)
results = analyze_ecg_clinical_research(ecg_data, fs, patient_id='P001')
print(f"Patient : {results['patient_id']}")
print(f"Heart rate: {results['heart_rate']:.1f} BPM ({results['heart_rate_status']})")
print(f"SNR SQI : {results['snr_sqi']:.3f}")
if 'hrv_status' in results:
print(f"HRV status: {results['hrv_status']}")
Example 2: PPG Analysis for Hemodynamic Studies
Analyzes PPG signals for pulse rate, pulse amplitude, and signal quality — using real data from the vitalDSP sample loader.
import numpy as np
from vitalDSP.filtering.signal_filtering import SignalFiltering
from vitalDSP.physiological_features.waveform import WaveformMorphology
from vitalDSP.physiological_features.beat_to_beat import BeatToBeatAnalysis
from vitalDSP.signal_quality_assessment.signal_quality_index import SignalQualityIndex
from vitalDSP.notebooks import load_sample_ppg
def analyze_ppg_hemodynamic(ppg_signal, fs, patient_id=None):
"""
PPG analysis for hemodynamic characterization.
Parameters
----------
ppg_signal : numpy.ndarray
Raw PPG signal.
fs : float
Sampling frequency in Hz.
Returns
-------
dict : Pulse rate, variability, and signal quality metrics.
"""
results = {'patient_id': patient_id, 'sampling_frequency': fs}
# 1. Bandpass filter (0.5–8 Hz for PPG)
sf = SignalFiltering(ppg_signal)
filtered_ppg = sf.bandpass(lowcut=0.5, highcut=8.0, fs=fs, order=4)
# 2. Signal quality
sqi = SignalQualityIndex(filtered_ppg)
results['snr_sqi'] = sqi.snr_sqi(
window_size=int(fs * 5), step_size=int(fs), aggregate=True
)
# 3. Pulse rate via WaveformMorphology
wm = WaveformMorphology(filtered_ppg, fs=fs, signal_type='PPG')
pulse_rate = wm.get_heart_rate(summary_type='mean')
results['pulse_rate'] = pulse_rate
results['pulse_rate_status'] = (
'bradycardia' if pulse_rate < 60 else
'tachycardia' if pulse_rate > 100 else 'normal'
)
# 4. Beat-to-beat pulse interval variability
bb = BeatToBeatAnalysis(filtered_ppg, fs=fs, signal_type='PPG')
pi_intervals = bb.compute_rr_intervals()
if len(pi_intervals) > 5:
results['mean_pulse_interval'] = float(np.mean(pi_intervals))
results['std_pulse_interval'] = float(np.std(pi_intervals))
results['pulse_rate_variability'] = float(
np.std(pi_intervals) / np.mean(pi_intervals) * 100
)
# 5. Pulse amplitude variability
results['amplitude_sqi'] = sqi.amplitude_variability_sqi(
window_size=int(fs * 5), step_size=int(fs), aggregate=True
)
return results
# --- Usage ---
ppg_col, _ = load_sample_ppg()
ppg_col = np.array(ppg_col)
fs = 128
results = analyze_ppg_hemodynamic(ppg_col, fs=fs, patient_id='P001')
print(f"Pulse rate : {results['pulse_rate']:.1f} BPM ({results['pulse_rate_status']})")
print(f"SNR SQI : {results['snr_sqi']:.3f}")
print(f"Amplitude SQI: {results['amplitude_sqi']:.3f}")
if 'pulse_rate_variability' in results:
print(f"PRV (CV) : {results['pulse_rate_variability']:.2f} %")
Example 3: Real-Time Vital Signs Monitoring
A sliding-window monitoring system that continuously processes incoming signal chunks and logs vital signs.
import time
import threading
import numpy as np
from collections import deque
from vitalDSP.filtering.signal_filtering import SignalFiltering
from vitalDSP.physiological_features.waveform import WaveformMorphology
from vitalDSP.signal_quality_assessment.signal_quality_index import SignalQualityIndex
class RealTimeVitalSignsMonitor:
"""Sliding-window real-time vital signs monitor."""
def __init__(self, fs=128, window_size=10, update_interval=1):
self.fs = fs
self.window_samples = fs * window_size
self.update_interval = update_interval
self.signal_buffer = deque(maxlen=self.window_samples)
self.history = deque(maxlen=100)
self.current = {}
self.is_running = False
self.thresholds = {'hr_low': 50, 'hr_high': 120, 'sqi_low': 0.3}
def add_data(self, chunk):
"""Push a new signal chunk into the sliding buffer."""
self.signal_buffer.extend(chunk)
def start(self):
self.is_running = True
t = threading.Thread(target=self._loop, daemon=True)
t.start()
print("Monitoring started.")
def stop(self):
self.is_running = False
print("Monitoring stopped.")
def _loop(self):
while self.is_running:
if len(self.signal_buffer) >= self.window_samples:
window = np.array(list(self.signal_buffer))
vitals = self._process(window)
self.current = vitals
self.history.append(vitals)
self._check_alerts(vitals)
self._log(vitals)
time.sleep(self.update_interval)
def _process(self, signal):
vitals = {}
try:
sf = SignalFiltering(signal)
filtered = sf.bandpass(lowcut=0.5, highcut=40.0, fs=self.fs, order=4)
sqi = SignalQualityIndex(filtered)
vitals['sqi'] = sqi.snr_sqi(
window_size=int(self.fs * 5), step_size=int(self.fs), aggregate=True
)
wm = WaveformMorphology(filtered, fs=self.fs, signal_type='ECG')
vitals['heart_rate'] = wm.get_heart_rate(summary_type='mean')
except Exception as e:
vitals['error'] = str(e)
return vitals
def _check_alerts(self, vitals):
hr = vitals.get('heart_rate', None)
if hr is not None:
if hr < self.thresholds['hr_low']:
print(f"[ALERT] Low HR: {hr:.1f} BPM")
elif hr > self.thresholds['hr_high']:
print(f"[ALERT] High HR: {hr:.1f} BPM")
if vitals.get('sqi', 1.0) < self.thresholds['sqi_low']:
print(f"[ALERT] Poor signal quality: SQI={vitals['sqi']:.2f}")
def _log(self, vitals):
hr = vitals.get('heart_rate', float('nan'))
sqi = vitals.get('sqi', float('nan'))
print(f"HR={hr:.1f} BPM SQI={sqi:.3f}")
def get_stats(self):
hrs = [v['heart_rate'] for v in self.history if 'heart_rate' in v]
if not hrs:
return {}
return {'mean_hr': np.mean(hrs), 'std_hr': np.std(hrs),
'min_hr': np.min(hrs), 'max_hr': np.max(hrs)}
# --- Usage ---
from vitalDSP.utils.data_processing.synthesize_data import generate_ecg_signal
fs = 128
monitor = RealTimeVitalSignsMonitor(fs=fs, window_size=10, update_interval=1)
monitor.start()
ecg_stream = generate_ecg_signal(sfecg=fs, duration=15, hrmean=72, Anoise=0.05)
chunk_size = fs # 1 second per push
for start in range(0, len(ecg_stream), chunk_size):
monitor.add_data(ecg_stream[start:start + chunk_size])
time.sleep(0.1) # simulate real-time
monitor.stop()
stats = monitor.get_stats()
print(f"Session mean HR: {stats.get('mean_hr', float('nan')):.1f} BPM")
Example 4: Wearable Device Integration
Processes data from wearable devices (fitness tracker, smartwatch, chest strap) with device-specific filter settings.
import numpy as np
from datetime import datetime
from vitalDSP.filtering.signal_filtering import SignalFiltering
from vitalDSP.physiological_features.waveform import WaveformMorphology
from vitalDSP.signal_quality_assessment.signal_quality_index import SignalQualityIndex
DEVICE_CONFIGS = {
'fitness_tracker': {'fs': 100, 'signal': 'PPG', 'lowcut': 0.5, 'highcut': 8.0},
'smartwatch': {'fs': 200, 'signal': 'PPG', 'lowcut': 0.5, 'highcut': 8.0},
'chest_strap': {'fs': 256, 'signal': 'ECG', 'lowcut': 0.5, 'highcut': 40.0},
}
def process_wearable_data(raw_signal, device_type='fitness_tracker'):
"""
Process a wearable signal chunk and return vital signs.
Parameters
----------
raw_signal : numpy.ndarray
Raw signal from the wearable sensor.
device_type : str
One of 'fitness_tracker', 'smartwatch', 'chest_strap'.
Returns
-------
dict : Heart rate, signal quality, device metadata, timestamp.
"""
cfg = DEVICE_CONFIGS.get(device_type, DEVICE_CONFIGS['fitness_tracker'])
fs = cfg['fs']
sf = SignalFiltering(raw_signal)
filtered = sf.bandpass(lowcut=cfg['lowcut'], highcut=cfg['highcut'], fs=fs, order=2)
result = {
'device_type': device_type,
'signal_type': cfg['signal'],
'timestamp': datetime.now().isoformat(),
}
# Signal quality
sqi = SignalQualityIndex(filtered)
result['sqi'] = sqi.snr_sqi(
window_size=min(int(fs * 5), len(filtered) - 1),
step_size=int(fs),
aggregate=True
)
# Heart / pulse rate
wm = WaveformMorphology(filtered, fs=fs, signal_type=cfg['signal'])
result['heart_rate'] = wm.get_heart_rate(summary_type='mean')
return result
# --- Usage ---
np.random.seed(42)
for device in ['fitness_tracker', 'smartwatch', 'chest_strap']:
cfg = DEVICE_CONFIGS[device]
t = np.arange(2000) / cfg['fs']
sim_signal = np.sin(2 * np.pi * 1.2 * t) + 0.1 * np.random.randn(len(t))
r = process_wearable_data(sim_signal, device_type=device)
print(f"{device}: HR={r['heart_rate']:.1f} BPM SQI={r['sqi']:.3f}")
Example 5: Advanced Multi-Scale Entropy Analysis
Quantifies cardiac signal complexity using Multi-Scale Entropy (MSE), Composite MSE, and Refined Composite MSE.
import numpy as np
from vitalDSP.physiological_features.advanced_entropy import MultiScaleEntropy
from vitalDSP.physiological_features.beat_to_beat import BeatToBeatAnalysis
from vitalDSP.filtering.signal_filtering import SignalFiltering
from vitalDSP.utils.data_processing.synthesize_data import generate_ecg_signal
def analyze_cardiac_complexity(ecg_signal, fs, patient_id=None):
"""
Multi-scale entropy analysis of cardiac dynamics.
Parameters
----------
ecg_signal : numpy.ndarray
ECG signal.
fs : float
Sampling frequency in Hz.
Returns
-------
dict : MSE, CMSE, RCMSE values and complexity indices.
"""
results = {'patient_id': patient_id, 'fs': fs}
# Preprocess
sf = SignalFiltering(ecg_signal)
filtered = sf.bandpass(lowcut=0.5, highcut=40.0, fs=fs, order=4)
# Extract RR intervals
bb = BeatToBeatAnalysis(filtered, fs=fs, signal_type='ECG')
rr = bb.compute_rr_intervals()
if len(rr) < 50:
results['error'] = f'Only {len(rr)} RR intervals — need ≥50 for MSE.'
return results
# Multi-Scale Entropy
mse_analyzer = MultiScaleEntropy(signal=rr, max_scale=20, m=2, r=0.15)
mse_values = mse_analyzer.compute_mse()
cmse_values = mse_analyzer.compute_cmse()
rcmse_values = mse_analyzer.compute_rcmse()
results['mse'] = mse_values.tolist()
results['cmse'] = cmse_values.tolist()
results['rcmse'] = rcmse_values.tolist()
results['complexity_index'] = float(mse_analyzer.get_complexity_index(mse_values))
results['cmse_ci'] = float(mse_analyzer.get_complexity_index(cmse_values))
results['rcmse_ci'] = float(mse_analyzer.get_complexity_index(rcmse_values))
# Clinical interpretation
cmse_ci = results['cmse_ci']
results['interpretation'] = (
'reduced complexity — possible autonomic dysfunction' if cmse_ci < 30 else
'excellent complexity — healthy autonomic function' if cmse_ci > 60 else
'normal complexity'
)
return results
# --- Usage ---
fs = 256
ecg_data = generate_ecg_signal(sfecg=fs, duration=120, hrmean=72, Anoise=0.03)
results = analyze_cardiac_complexity(ecg_data, fs=fs, patient_id='P001')
if 'error' not in results:
print(f"Patient : {results['patient_id']}")
print(f"MSE CI : {results['complexity_index']:.2f}")
print(f"CMSE CI : {results['cmse_ci']:.2f}")
print(f"RCMSE CI : {results['rcmse_ci']:.2f}")
print(f"Interpretation : {results['interpretation']}")
else:
print(results['error'])
Example 6: Comprehensive Health Monitoring System
Multi-modal vital signs assessment (ECG + PPG + respiratory) with automated health scoring and clinical recommendations.
import numpy as np
import pandas as pd
from vitalDSP.filtering.signal_filtering import SignalFiltering
from vitalDSP.physiological_features.waveform import WaveformMorphology
from vitalDSP.physiological_features.hrv_analysis import HRVFeatures
from vitalDSP.physiological_features.beat_to_beat import BeatToBeatAnalysis
from vitalDSP.respiratory_analysis.respiratory_analysis import RespiratoryAnalysis
from vitalDSP.utils.data_processing.synthesize_data import generate_ecg_signal
class ComprehensiveHealthMonitor:
"""Multi-modal health monitoring with automated assessment."""
def __init__(self, patient_id, fs=256):
self.patient_id = patient_id
self.fs = fs
def process_vital_signs(self, ecg_signal, ppg_signal=None, resp_signal=None):
assessment = {
'patient_id': self.patient_id,
'timestamp': pd.Timestamp.now().isoformat(),
'vital_signs': {},
}
if ecg_signal is not None:
assessment['vital_signs']['ecg'] = self._process_ecg(ecg_signal)
if ppg_signal is not None:
assessment['vital_signs']['ppg'] = self._process_ppg(ppg_signal)
if resp_signal is not None:
assessment['vital_signs']['respiratory'] = self._process_resp(resp_signal)
assessment['health_score'] = self._health_score(assessment['vital_signs'])
assessment['recommendations'] = self._recommendations(
assessment['vital_signs'], assessment['health_score']
)
return assessment
def _process_ecg(self, signal):
sf = SignalFiltering(signal)
filtered = sf.bandpass(lowcut=0.5, highcut=40.0, fs=self.fs, order=4)
wm = WaveformMorphology(filtered, fs=self.fs, signal_type='ECG')
hr = wm.get_heart_rate(summary_type='mean')
bb = BeatToBeatAnalysis(filtered, fs=self.fs, signal_type='ECG')
rr = bb.compute_rr_intervals()
res = {'heart_rate': float(hr),
'heart_rate_status': self._hr_status(hr)}
if len(rr) > 10:
hrv = HRVFeatures(signals=filtered, nn_intervals=rr, fs=self.fs)
hrv_feats = hrv.compute_all_features()
res['hrv'] = hrv_feats
sdnn = hrv_feats.get('sdnn', 0)
res['hrv_status'] = ('reduced' if sdnn < 30 else
'good' if sdnn > 50 else 'normal')
return res
def _process_ppg(self, signal):
sf = SignalFiltering(signal)
filtered = sf.bandpass(lowcut=0.5, highcut=8.0, fs=self.fs, order=4)
wm = WaveformMorphology(filtered, fs=self.fs, signal_type='PPG')
pr = wm.get_heart_rate(summary_type='mean')
bb = BeatToBeatAnalysis(filtered, fs=self.fs, signal_type='PPG')
pi = bb.compute_rr_intervals()
res = {'pulse_rate': float(pr), 'pulse_rate_status': self._hr_status(pr)}
if len(pi) > 3:
res['prv'] = float(np.std(pi))
return res
def _process_resp(self, signal):
try:
ra = RespiratoryAnalysis(signal, fs=self.fs)
rr = ra.compute_respiratory_rate(method='fft')
return {
'respiratory_rate': float(rr),
'status': ('bradypnea' if rr < 12 else
'tachypnea' if rr > 20 else 'normal'),
}
except Exception as e:
return {'error': str(e)}
def _hr_status(self, hr):
return 'bradycardia' if hr < 60 else 'tachycardia' if hr > 100 else 'normal'
def _health_score(self, vitals):
score = 100
ecg = vitals.get('ecg', {})
if ecg.get('heart_rate_status') != 'normal':
score -= 15
if ecg.get('hrv_status') == 'reduced':
score -= 20
if vitals.get('ppg', {}).get('pulse_rate_status') != 'normal':
score -= 10
if vitals.get('respiratory', {}).get('status') not in ('normal', None):
score -= 15
return max(0, min(100, score))
def _recommendations(self, vitals, score):
recs = []
if score < 60:
recs.append('URGENT: immediate clinical evaluation recommended')
elif score < 75:
recs.append('Clinical review recommended within 24 hours')
ecg = vitals.get('ecg', {})
if ecg.get('hrv_status') == 'reduced':
recs.append('Reduced HRV — consider autonomic assessment')
if ecg.get('heart_rate_status') == 'bradycardia':
recs.append('Bradycardia detected — evaluate underlying cause')
if not recs:
recs.append('All vital signs within normal ranges')
return recs
# --- Usage ---
fs = 256
ecg = generate_ecg_signal(sfecg=fs, duration=60, hrmean=75, Anoise=0.05)
monitor = ComprehensiveHealthMonitor(patient_id='P001', fs=fs)
assessment = monitor.process_vital_signs(ecg_signal=ecg)
print(f"Patient : {assessment['patient_id']}")
print(f"Health Score : {assessment['health_score']}/100")
ecg_vs = assessment['vital_signs']['ecg']
print(f"Heart Rate : {ecg_vs['heart_rate']:.1f} BPM ({ecg_vs['heart_rate_status']})")
if 'hrv_status' in ecg_vs:
print(f"HRV Status : {ecg_vs['hrv_status']}")
print("Recommendations:")
for r in assessment['recommendations']:
print(f" • {r}")
Example 7: Cross-Signal Synchronization Analysis
Analyzes directional coupling between cardiac and respiratory signals using Transfer Entropy and cross-correlation.
import numpy as np
from scipy import signal as sp_signal
from vitalDSP.filtering.signal_filtering import SignalFiltering
from vitalDSP.physiological_features.beat_to_beat import BeatToBeatAnalysis
from vitalDSP.physiological_features.transfer_entropy import TransferEntropy
from vitalDSP.utils.data_processing.synthesize_data import generate_ecg_signal
def analyze_cardiorespiratory_coupling(ecg_signal, resp_signal, fs):
"""
Directional coupling between cardiac and respiratory signals.
Parameters
----------
ecg_signal : numpy.ndarray
ECG signal.
resp_signal : numpy.ndarray
Respiratory signal (same length, same fs).
fs : float
Sampling frequency.
Returns
-------
dict : Transfer entropy, cross-correlation, and clinical interpretation.
"""
results = {'fs': fs}
# Filter ECG
sf_ecg = SignalFiltering(ecg_signal)
filtered_ecg = sf_ecg.bandpass(lowcut=0.5, highcut=40.0, fs=fs, order=4)
# Filter respiratory signal to the respiratory band
sf_resp = SignalFiltering(resp_signal)
filtered_resp = sf_resp.bandpass(lowcut=0.1, highcut=0.5, fs=fs, order=4)
# Extract RR intervals
bb = BeatToBeatAnalysis(filtered_ecg, fs=fs, signal_type='ECG')
rr = bb.compute_rr_intervals()
if len(rr) < 50:
return {'error': f'Need ≥50 RR intervals, got {len(rr)}'}
# Resample respiratory signal to length of RR series
resp_resampled = sp_signal.resample(filtered_resp, len(rr))
# Transfer Entropy: respiratory → cardiac
te_r2c = TransferEntropy(
source=resp_resampled, target=rr,
k_coef=1, l_coef=1, delay=1
).compute_transfer_entropy()
# Transfer Entropy: cardiac → respiratory
te_c2r = TransferEntropy(
source=rr, target=resp_resampled,
k_coef=1, l_coef=1, delay=1
).compute_transfer_entropy()
results['te_resp_to_cardiac'] = float(te_r2c)
results['te_cardiac_to_resp'] = float(te_c2r)
results['net_directionality'] = float(te_r2c - te_c2r)
# Normalized cross-correlation
x = rr - np.mean(rr)
y = resp_resampled - np.mean(resp_resampled)
denom = len(rr) * (np.std(x) * np.std(y) + 1e-9)
xcorr = np.correlate(x, y, mode='full') / denom
idx = np.argmax(np.abs(xcorr))
results['max_correlation'] = float(xcorr[idx])
results['optimal_lag'] = int(idx - len(rr) + 1)
# Interpretation
avg_te = (te_r2c + te_c2r) / 2
results['coupling_strength'] = ('weak' if avg_te < 0.05 else
'strong' if avg_te > 0.2 else 'moderate')
if abs(results['net_directionality']) > 0.1:
results['dominant_direction'] = (
'respiratory→cardiac' if results['net_directionality'] > 0
else 'cardiac→respiratory'
)
else:
results['dominant_direction'] = 'balanced'
return results
# --- Usage ---
fs = 256
ecg = generate_ecg_signal(sfecg=fs, duration=120, hrmean=72, Anoise=0.03)
t = np.arange(len(ecg)) / fs
resp = 0.5 * np.sin(2 * np.pi * 0.25 * t) + 0.05 * np.random.randn(len(t))
results = analyze_cardiorespiratory_coupling(ecg, resp, fs)
if 'error' not in results:
print(f"TE (Resp→Cardiac): {results['te_resp_to_cardiac']:.4f}")
print(f"TE (Cardiac→Resp): {results['te_cardiac_to_resp']:.4f}")
print(f"Net directionality: {results['net_directionality']:.4f}")
print(f"Coupling strength : {results['coupling_strength']}")
print(f"Dominant direction: {results['dominant_direction']}")
print(f"Max cross-corr : {results['max_correlation']:.3f} (lag={results['optimal_lag']})")
Example 8: Comprehensive Physiological Feature Extraction
Extracts time-domain, frequency-domain, nonlinear, and energy features from a single ECG recording.
import numpy as np
import pandas as pd
from vitalDSP.filtering.signal_filtering import SignalFiltering
from vitalDSP.physiological_features.time_domain import TimeDomainFeatures
from vitalDSP.physiological_features.frequency_domain import FrequencyDomainFeatures
from vitalDSP.physiological_features.nonlinear import NonlinearFeatures
from vitalDSP.physiological_features.beat_to_beat import BeatToBeatAnalysis
from vitalDSP.physiological_features.energy_analysis import EnergyAnalysis
from vitalDSP.utils.data_processing.synthesize_data import generate_ecg_signal
def extract_comprehensive_features(ecg_signal, fs, patient_id=None):
"""
Full-spectrum feature extraction: time, frequency, nonlinear, energy.
Parameters
----------
ecg_signal : numpy.ndarray
ECG signal.
fs : float
Sampling frequency in Hz.
Returns
-------
dict : Feature dictionary with clinical interpretation.
"""
results = {'patient_id': patient_id, 'features': {}, 'interpretation': {}}
# Preprocessing
sf = SignalFiltering(ecg_signal)
filtered = sf.bandpass(lowcut=0.5, highcut=40.0, fs=fs, order=4)
# RR intervals
bb = BeatToBeatAnalysis(filtered, fs=fs, signal_type='ECG')
rr = bb.compute_rr_intervals()
if len(rr) < 5:
results['error'] = 'Insufficient RR intervals'
return results
# 1. TIME DOMAIN
td = TimeDomainFeatures(rr)
results['features'].update({
'mean_rr' : td.compute_mean_nn(),
'sdnn' : td.compute_sdnn(),
'rmssd' : td.compute_rmssd(),
'sdsd' : td.compute_sdsd(),
'nn50' : td.compute_nn50(),
'pnn50' : td.compute_pnn50(),
'pnn20' : td.compute_pnn20(),
'mean_hr' : 60000 / max(td.compute_mean_nn(), 1),
})
# 2. FREQUENCY DOMAIN (nn_intervals as input, fs=4 Hz = typical RR series rate)
fd = FrequencyDomainFeatures(rr, fs=4)
psd = fd.compute_psd()
lf = psd['lf_power']
hf = psd['hf_power']
results['features'].update({
'vlf_power' : psd['vlf_power'],
'lf_power' : lf,
'hf_power' : hf,
'total_power' : psd['total_power'],
'lf_hf_ratio' : psd['lf_hf_ratio'],
'lf_nu' : lf / (lf + hf + 1e-9) * 100,
'hf_nu' : hf / (lf + hf + 1e-9) * 100,
})
# 3. NONLINEAR
nl = NonlinearFeatures(rr)
poincare = nl.compute_poincare_features()
results['features'].update({
'sd1' : poincare['sd1'],
'sd2' : poincare['sd2'],
'sd_ratio' : poincare['sd_ratio'],
'sample_entropy' : nl.compute_sample_entropy(m=2, r=0.2),
'approx_entropy' : nl.compute_approximate_entropy(m=2, r=0.2),
'dfa_alpha' : nl.compute_dfa(),
})
# 4. ENERGY (on the filtered ECG signal)
ea = EnergyAnalysis(filtered, fs=fs)
results['features'].update({
'total_energy' : ea.compute_total_energy(),
'band_energy_lf' : ea.compute_band_energy(low_freq=0.5, high_freq=5.0),
'spectral_energy' : ea.compute_spectral_energy(),
})
# 5. INTERPRETATION
feats = results['features']
sdnn = feats.get('sdnn', 0)
lf_hf = feats.get('lf_hf_ratio', 1.0)
samp_ent = feats.get('sample_entropy', 1.0)
interp = {}
interp['hrv_status'] = ('reduced' if sdnn < 50 else 'good' if sdnn > 100 else 'normal')
interp['autonomic_balance'] = ('parasympathetic' if lf_hf < 0.5 else
'sympathetic' if lf_hf > 2.0 else 'balanced')
risk = sum([sdnn < 50, feats.get('rmssd', 50) < 20,
lf_hf > 2.5, samp_ent < 1.0])
interp['risk_level'] = ('high' if risk >= 3 else 'moderate' if risk >= 1 else 'low')
interp['cardiac_health'] = ('concerning' if risk >= 3 else 'fair' if risk >= 1 else 'healthy')
results['interpretation'] = interp
return results
# --- Usage ---
fs = 256
ecg = generate_ecg_signal(sfecg=fs, duration=120, hrmean=75, Anoise=0.05)
res = extract_comprehensive_features(ecg, fs=fs, patient_id='P001')
if 'error' not in res:
f = res['features']
print(f"=== Time Domain ===")
print(f"Mean RR : {f['mean_rr']:.1f} ms Mean HR : {f['mean_hr']:.1f} BPM")
print(f"SDNN : {f['sdnn']:.1f} ms RMSSD : {f['rmssd']:.1f} ms")
print(f"pNN50 : {f['pnn50']:.2f} %")
print(f"\n=== Frequency Domain ===")
print(f"LF : {f['lf_power']:.2f} HF : {f['hf_power']:.2f}")
print(f"LF/HF : {f['lf_hf_ratio']:.2f}")
print(f"\n=== Nonlinear ===")
print(f"SD1 : {f['sd1']:.2f} SD2 : {f['sd2']:.2f}")
print(f"SampEn : {f['sample_entropy']:.3f} DFA α: {f['dfa_alpha']:.3f}")
i = res['interpretation']
print(f"\n=== Interpretation ===")
print(f"HRV : {i['hrv_status']} ANS : {i['autonomic_balance']}")
print(f"Risk : {i['risk_level']} Health: {i['cardiac_health']}")
else:
print(res['error'])
# Export features to CSV
features_df = pd.DataFrame([res['features']])
features_df.to_csv(f"features_{res['patient_id']}.csv", index=False)
Example 9: Machine Learning for Physiological Signal Analysis
Anomaly detection, feature extraction with FeatureExtractor, and CNN1D classification on synthetic ECG signals.
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from vitalDSP.advanced_computation.anomaly_detection import AnomalyDetection
from vitalDSP.ml_models.feature_extractor import FeatureExtractor
from vitalDSP.ml_models.deep_models import CNN1D
def ml_signal_analysis_pipeline(clean_signals, noisy_signals, fs, anomaly_threshold=2.5):
"""
ML pipeline: anomaly detection → feature extraction → CNN1D classification.
Parameters
----------
clean_signals : list of numpy.ndarray
Reference (normal) signals for training.
noisy_signals : list of numpy.ndarray
Signals to screen for anomalies and classify.
fs : float
Sampling frequency.
anomaly_threshold : float
Z-score threshold for anomaly flagging.
Returns
-------
dict : Anomaly counts, extracted features, and CNN1D training history.
"""
results = {'anomaly': {}, 'classification': {}}
sig_len = min(len(s) for s in clean_signals)
# 1. ANOMALY DETECTION
anomaly_info = []
for i, sig in enumerate(noisy_signals):
det = AnomalyDetection(sig)
z_anomalies = det.detect_anomalies(method='z_score', threshold=anomaly_threshold)
iqr_anomalies = det.detect_anomalies(method='iqr')
anomaly_info.append({
'signal_index' : i,
'z_score_anomalies' : len(z_anomalies),
'iqr_anomalies' : len(iqr_anomalies),
'quality' : 'good' if len(z_anomalies) < 10 else 'poor',
})
results['anomaly']['per_signal'] = anomaly_info
results['anomaly']['good'] = sum(1 for a in anomaly_info if a['quality'] == 'good')
results['anomaly']['poor'] = sum(1 for a in anomaly_info if a['quality'] == 'poor')
print(f"Good-quality signals: {results['anomaly']['good']}/{len(noisy_signals)}")
# 2. FEATURE EXTRACTION (sklearn-compatible transformer)
X_clean = np.array([s[:sig_len] for s in clean_signals])
extractor = FeatureExtractor(signal_type='ecg', sampling_rate=float(fs))
X_features = extractor.fit_transform(X_clean)
results['classification']['n_features'] = X_features.shape[1]
print(f"Features extracted : {X_features.shape[1]}")
# 3. CNN1D CLASSIFICATION
scaler = StandardScaler()
X_norm = scaler.fit_transform(X_clean)
X_3d = X_norm.reshape(X_norm.shape[0], sig_len, 1)
# Synthetic binary labels: high-noise vs low-noise
noise_std = np.array([np.std(n[:sig_len] - c[:sig_len])
for n, c in zip(noisy_signals, clean_signals)])
labels = (noise_std > np.median(noise_std)).astype(int)
X_train, X_test, y_train, y_test = train_test_split(
X_3d, labels, test_size=0.2, random_state=42
)
cnn = CNN1D(
input_shape=(sig_len, 1),
n_classes=2,
n_filters=[32, 64],
kernel_sizes=[3],
pool_sizes=[2],
)
history = cnn.train(X_train, y_train, X_val=X_test, y_val=y_test,
epochs=10, batch_size=16)
results['classification']['history'] = history
results['classification']['n_train'] = len(X_train)
results['classification']['n_test'] = len(X_test)
print(f"CNN1D trained: {len(X_train)} train / {len(X_test)} test samples")
return results
# --- Usage ---
np.random.seed(42)
n_signals = 30
fs = 128
sig_len = 640 # 5 seconds
clean = [np.sin(2 * np.pi * 1.2 * np.arange(sig_len) / fs) +
0.05 * np.random.randn(sig_len) for _ in range(n_signals)]
noisy = [c + np.random.normal(0, np.random.uniform(0.05, 0.3), sig_len)
for c in clean]
results = ml_signal_analysis_pipeline(clean, noisy, fs=fs)
print(f"Features: {results['classification']['n_features']}")
print(f"Anomaly good/poor: {results['anomaly']['good']}/{results['anomaly']['poor']}")
These examples provide a solid foundation for implementing VitalDSP in real-world scenarios. Refer to the sample_notebooks section for interactive Jupyter notebook versions.