Tutorials
This section provides comprehensive, step-by-step tutorials to help you master VitalDSP. Each tutorial is designed to build upon previous knowledge and provide practical, hands-on experience with the library.
Tutorial Overview
Our tutorials are organized by complexity and use case:
- Beginner Tutorials
Basic signal processing and filtering
Introduction to physiological feature extraction
Getting started with the web application
- Intermediate Tutorials
Advanced filtering techniques
Heart rate variability analysis
Respiratory signal analysis
- Advanced Tutorials
Machine learning integration
Custom analysis pipelines
Performance optimization
- Specialized Tutorials
Clinical research applications
Wearable device integration
Real-time monitoring systems
Tutorial 1: Basic Signal Processing
Learn the fundamentals of signal processing with VitalDSP.
Prerequisites: * Python 3.8 or higher * Basic understanding of signal processing concepts * VitalDSP installed (see Getting started)
Learning Objectives: * Load and visualize physiological signals * Apply basic filtering techniques * Extract fundamental features * Assess signal quality
Step 1: Installation and Setup
# Install VitalDSP
pip install vital-DSP
# Import required modules
import numpy as np
import matplotlib.pyplot as plt
from vitalDSP.filtering.signal_filtering import SignalFiltering
from vitalDSP.feature_engineering.morphology_features import PhysiologicalFeatureExtractor
from vitalDSP.signal_quality_assessment.signal_quality_index import SignalQualityIndex
Step 2: Load Sample Data
# Generate sample ECG signal
from vitalDSP.utils.data_processing.synthesize_data import generate_ecg_signal
# Parameters
fs = 256 # Sampling frequency (Hz) - using default for compatibility
duration = 10 # Duration (seconds)
heart_rate = 72 # Heart rate (BPM)
# Generate synthetic ECG
ecg_signal = generate_ecg_signal(
sfecg=fs,
duration=duration,
hrmean=heart_rate,
Anoise=0.1 # Add some noise
)
# Create time vector
time = np.linspace(0, duration, len(ecg_signal))
Step 3: Signal Filtering
# Initialize signal filtering
sf = SignalFiltering(ecg_signal)
# Apply bandpass filter (0.5-40 Hz for ECG)
filtered_signal = sf.bandpass(
lowcut=0.5,
highcut=40.0,
fs=fs,
order=4
)
# Visualize results
plt.figure(figsize=(12, 8))
plt.subplot(2, 1, 1)
plt.plot(time, ecg_signal)
plt.title('Original ECG Signal')
plt.xlabel('Time (s)')
plt.ylabel('Amplitude')
plt.subplot(2, 1, 2)
plt.plot(time, filtered_signal)
plt.title('Filtered ECG Signal')
plt.xlabel('Time (s)')
plt.ylabel('Amplitude')
plt.tight_layout()
plt.show()
Step 4: Feature Extraction
# Extract physiological features
extractor = PhysiologicalFeatureExtractor(filtered_signal, fs=fs)
features = extractor.extract_features(signal_type="ECG")
# Display key features
print("Physiological Features:")
print(f"QRS Duration: {features.get('qrs_duration', 'N/A'):.4f}")
print(f"Heart Rate: {features.get('heart_rate', 'N/A'):.2f} BPM")
print(f"QRS Amplitude: {features.get('qrs_amplitude', 'N/A'):.4f}")
print(f"Signal Skewness: {features.get('signal_skewness', 'N/A'):.4f}")
Step 5: Signal Quality Assessment
# Assess signal quality
sqi = SignalQualityIndex(filtered_signal)
# Calculate various quality indices
amplitude_sqi, _, _ = sqi.amplitude_variability_sqi(
window_size=int(fs*5), # 5-second windows
step_size=int(fs*1), # 1-second steps
threshold=2,
aggregate=False
)
print(f"Signal Quality Index: {np.mean(amplitude_sqi):.4f}")
Exercise: Try It Yourself
Modify the heart rate and noise level in the synthetic signal generation
Experiment with different filter parameters
Extract additional features and compare results
Assess how signal quality changes with different noise levels
Tutorial 2: Heart Rate Variability Analysis
Learn to perform comprehensive HRV analysis using VitalDSP.
Prerequisites: * Completion of Tutorial 1 * Understanding of heart rate variability concepts * Basic knowledge of frequency domain analysis
Learning Objectives: * Extract R-peaks from ECG signals * Calculate RR intervals * Perform time-domain HRV analysis * Perform frequency-domain HRV analysis * Interpret HRV results clinically
Step 1: R-Peak Detection
from vitalDSP.physiological_features.waveform import WaveformMorphology
from vitalDSP.utils.data_processing.synthesize_data import generate_ecg_signal
from vitalDSP.filtering.signal_filtering import SignalFiltering
import numpy as np
# Generate and filter ECG signal first
fs = 256 # Sampling frequency
ecg_signal = generate_ecg_signal(sfecg=fs, duration=10, hrmean=72, Anoise=0.1)
sf = SignalFiltering(ecg_signal)
filtered_signal = sf.bandpass(lowcut=0.5, highcut=40.0, fs=fs, order=4)
# Initialize waveform morphology for ECG
wm = WaveformMorphology(filtered_signal, fs=fs, signal_type="ECG")
# Detect R-peaks
r_peaks = wm.r_peaks
r_peak_times = r_peaks / fs # Convert to seconds
print(f"Detected {len(r_peaks)} R-peaks")
print(f"Average heart rate: {60 * fs / np.mean(np.diff(r_peaks)):.1f} BPM")
Step 2: RR Interval Calculation
# Calculate RR intervals
rr_intervals = np.diff(r_peaks) / fs * 1000 # Convert to milliseconds
# Remove outliers (RR intervals outside physiological range)
valid_rr = rr_intervals[(rr_intervals > 300) & (rr_intervals < 2000)]
print(f"Valid RR intervals: {len(valid_rr)}")
print(f"RR interval range: {valid_rr.min():.1f} - {valid_rr.max():.1f} ms")
Step 3: Time-Domain HRV Analysis
from vitalDSP.physiological_features.hrv_analysis import HRVFeatures
# Initialize HRV analysis
hrv = HRVFeatures(valid_rr)
# Extract time-domain features
time_domain_features = hrv.time_domain_features()
print("Time-Domain HRV Features:")
print(f"SDNN: {time_domain_features['sdnn']:.2f} ms")
print(f"RMSSD: {time_domain_features['rmssd']:.2f} ms")
print(f"pNN50: {time_domain_features['pnn50']:.2f} %")
Step 4: Frequency-Domain HRV Analysis
# Extract frequency-domain features
freq_domain_features = hrv.frequency_domain_features()
print("Frequency-Domain HRV Features:")
print(f"LF Power: {freq_domain_features['lf_power']:.2f} ms²")
print(f"HF Power: {freq_domain_features['hf_power']:.2f} ms²")
print(f"LF/HF Ratio: {freq_domain_features['lf_hf_ratio']:.2f}")
Step 5: Comprehensive HRV Analysis
# Perform comprehensive HRV analysis
comprehensive_hrv = hrv.compute_all_features()
# Display all features
print("Comprehensive HRV Analysis:")
for feature, value in comprehensive_hrv.items():
if isinstance(value, (int, float)):
print(f"{feature}: {value:.4f}")
Exercise: Clinical Interpretation
Compare HRV values with clinical norms
Analyze how different signal quality affects HRV metrics
Investigate the relationship between heart rate and HRV
Create visualizations of HRV trends over time
Tutorial 3: Respiratory Signal Analysis
Learn to analyze respiratory signals and estimate respiratory rate.
Prerequisites: * Completion of Tutorial 1 * Understanding of respiratory physiology * Basic knowledge of signal processing
Learning Objectives: * Load and preprocess respiratory signals * Apply respiratory-specific filtering * Estimate respiratory rate using multiple methods * Analyze breathing patterns * Detect respiratory events
Step 1: Load Respiratory Data
from vitalDSP.respiratory_analysis.respiratory_analysis import RespiratoryAnalysis
import numpy as np
# Generate sample respiratory signal
from vitalDSP.utils.data_processing.synthesize_data import generate_resp_signal
# Parameters
fs = 100 # Sampling frequency (Hz)
duration = 60 # Duration (seconds)
resp_rate_bpm = 16 # Respiratory rate (breaths per minute)
freq_hz = resp_rate_bpm / 60 # Convert to Hz
# Generate synthetic respiratory signal
resp_signal = generate_resp_signal(
sampling_rate=fs,
duration=duration,
frequency=freq_hz,
amplitude=0.5
)
# Add noise manually
resp_signal = resp_signal + np.random.randn(len(resp_signal)) * 0.05
# Create time vector
time = np.linspace(0, duration, len(resp_signal))
Step 2: Respiratory Signal Preprocessing
# Initialize respiratory analysis
resp_analysis = RespiratoryAnalysis(resp_signal, fs)
# Apply respiratory-specific filtering
filtered_resp = resp_analysis.preprocess_signal(
detrend=True,
normalize=True,
filter_type='bandpass',
low_freq=0.1, # 0.1 Hz (6 breaths/min)
high_freq=0.5 # 0.5 Hz (30 breaths/min)
)
Step 3: Respiratory Rate Estimation
# Estimate respiratory rate using multiple methods
resp_rate = resp_analysis.compute_respiratory_rate()
print("Respiratory Rate Estimate:")
print(f"Respiratory Rate: {resp_rate:.1f} breaths/min")
print(f"True Rate: {resp_rate:.1f} breaths/min")
Step 4: Breathing Pattern Analysis
# Analyze breathing patterns
# RespiratoryAnalysis only provides respiratory rate computation
print("Respiratory Analysis Complete")
Step 5: Respiratory Event Detection
# Detect respiratory events
events = resp_analysis.detect_respiratory_events(
apnea_threshold=0.1, # 10% reduction in amplitude
hypopnea_threshold=0.3 # 30% reduction in amplitude
)
print(f"Detected {len(events['apneas'])} apnea events")
print(f"Detected {len(events['hypopneas'])} hypopnea events")
Exercise: Advanced Analysis
Compare different respiratory rate estimation methods
Analyze the effect of noise on respiratory rate estimation
Implement custom respiratory event detection algorithms
Create visualizations of breathing patterns and events
Tutorial 4: Web Application Usage
Learn to use the VitalDSP web application for interactive signal analysis.
Prerequisites: * VitalDSP installed * Basic understanding of web interfaces * Sample physiological data files
Learning Objectives: * Launch the web application * Upload and configure signal data * Apply filtering and preprocessing * Perform interactive analysis * Generate reports and export results
Step 1: Launch the Web Application
from vitalDSP_webapp.run_webapp import run_webapp
# Start the web application
run_webapp(
debug=True,
port=8050,
host='localhost'
)
Step 2: Data Upload and Configuration
Open your browser and navigate to http://localhost:8050
Click on the “Upload” tab
Drag and drop your signal data file (CSV, Excel, or JSON)
Configure the data parameters: * Select the time column * Select the signal column * Set the sampling frequency * Choose the signal type (ECG, PPG, etc.)
Step 3: Signal Filtering
Navigate to the “Filtering” tab
The signal type will be automatically detected
Choose your filtering method: * Traditional Filters: Butterworth, Chebyshev, etc. * Advanced Filters: Kalman, adaptive filtering * Artifact Removal: Motion artifacts, baseline wander * Neural Network: Deep learning-based filtering
Configure filter parameters
Apply filtering and review results
Step 4: Interactive Analysis
Navigate to analysis screens: * Time Domain Analysis: Statistical and morphological features * Frequency Domain Analysis: Spectral analysis and frequency features * Physiological Analysis: HRV and comprehensive feature extraction * Respiratory Analysis: Respiratory rate estimation and pattern analysis
Use interactive features: * Zoom and pan on plots * Adjust time windows * Export visualizations * Download processed data
Step 5: Report Generation
Navigate to the “Health Report” section
Configure report parameters
Generate comprehensive analysis report
Export results in various formats: * PDF reports * CSV data exports * High-resolution images
Exercise: Complete Workflow
Upload a real physiological signal file
Apply appropriate filtering
Perform comprehensive analysis
Generate a health report
Export results for further analysis
Tutorial 5: Machine Learning Integration
Learn to integrate machine learning algorithms with VitalDSP for advanced signal analysis.
Prerequisites: * Completion of previous tutorials * Basic understanding of machine learning * Familiarity with scikit-learn
Learning Objectives: * Use neural network filtering * Implement anomaly detection * Apply Bayesian optimization * Create ensemble methods * Evaluate model performance
Step 1: Neural Network Filtering
from vitalDSP.advanced_computation.neural_network_filtering import NeuralNetworkFiltering
# Initialize neural network filter
nn_filter = NeuralNetworkFiltering(
model_type='autoencoder',
hidden_layers=[64, 32, 16],
epochs=100,
learning_rate=0.001
)
# Train the model (if needed)
nn_filter.train(filtered_signal)
# Apply neural network filtering
nn_filtered_signal = nn_filter.filter(filtered_signal)
Step 2: Anomaly Detection
from vitalDSP.advanced_computation.anomaly_detection import AnomalyDetection
# Initialize anomaly detector
anomaly_detector = AnomalyDetection(
method='isolation_forest',
contamination=0.1
)
# Detect anomalies
anomalies = anomaly_detector.detect_anomalies(filtered_signal)
print(f"Detected {np.sum(anomalies)} anomalous samples")
Step 3: Bayesian Optimization
from vitalDSP.advanced_computation.bayesian_optimization import BayesianOptimization
# Define objective function
def objective_function(params):
# Apply filtering with given parameters
sf = SignalFiltering(filtered_signal)
filtered = sf.bandpass(
lowcut=params['low_cut'],
highcut=params['high_cut'],
fs=fs,
order=int(params['filter_order'])
)
# Calculate signal quality metric
sqi = SignalQualityIndex(filtered)
quality, _, _ = sqi.amplitude_variability_sqi(
window_size=int(fs*5),
step_size=int(fs*1),
threshold=2,
aggregate=False
)
return np.mean(quality)
# Initialize Bayesian optimization
bo = BayesianOptimization(
objective_function,
{'low_cut': (0.1, 2.0), 'high_cut': (20.0, 50.0), 'filter_order': (2, 8)}
)
# Optimize parameters
bo.optimize(n_iter=20)
print(f"Best parameters: {bo.max['params']}")
print(f"Best score: {bo.max['target']:.4f}")
Step 4: Ensemble Methods
from vitalDSP.advanced_computation.ensemble_methods import EnsembleFiltering
# Initialize ensemble filter
ensemble = EnsembleFiltering(
methods=['butterworth', 'kalman', 'neural_network'],
weights=[0.4, 0.3, 0.3]
)
# Apply ensemble filtering
ensemble_filtered = ensemble.filter(filtered_signal)
Exercise: Advanced Applications
Compare different machine learning approaches
Optimize hyperparameters for your specific use case
Implement custom ensemble methods
Evaluate performance on different signal types
Best Practices
Performance Optimization * Use appropriate sampling rates for your analysis * Consider signal length vs. processing time trade-offs * Utilize batch processing for multiple signals * Cache frequently used computations
Data Quality * Always assess signal quality before analysis * Apply appropriate preprocessing steps * Validate results against known standards * Document your processing pipeline
Error Handling * Use try-catch blocks for robust error handling * Validate input data before processing * Log important processing steps * Provide meaningful error messages
Clinical Applications * Understand the clinical significance of your analysis * Validate results against clinical standards * Consider patient safety and data privacy * Document methodology for reproducibility
Troubleshooting Common Issues
Installation Issues * Ensure Python 3.8+ is installed * Check all dependencies are properly installed * Verify virtual environment setup
Signal Processing Issues * Validate input signal format and parameters * Check sampling frequency accuracy * Verify signal quality before processing
Web Application Issues * Ensure port 8050 is available * Check browser compatibility * Clear browser cache if visualizations don’t display
Performance Issues * Reduce signal length for faster processing * Use appropriate filter parameters * Consider using more efficient algorithms
Next Steps
After completing these tutorials, you should be able to:
Process physiological signals using various filtering techniques
Extract meaningful features from ECG, PPG, and respiratory signals
Perform comprehensive analysis including HRV and respiratory analysis
Use the web application for interactive signal processing
Integrate machine learning for advanced signal analysis
Tutorial 6: EMD and Advanced Signal Decomposition
Learn advanced signal decomposition techniques using Empirical Mode Decomposition (EMD) for non-linear and non-stationary physiological signals.
Prerequisites: * Completed Tutorial 1 (Basic Signal Processing) * Understanding of signal components * Familiarity with frequency analysis
Learning Objectives: * Understand Empirical Mode Decomposition (EMD) * Decompose signals into Intrinsic Mode Functions (IMFs) * Reconstruct and analyze signal components * Apply EMD to ECG and respiratory signals * Perform adaptive signal filtering using IMFs
What is EMD?
Empirical Mode Decomposition is a powerful method for decomposing non-linear and non-stationary signals into Intrinsic Mode Functions (IMFs). Unlike Fourier Transform, EMD is data-adaptive and can handle signals with varying frequency content.
Step 1: Basic EMD Decomposition
import numpy as np
from vitalDSP.advanced_computation.emd import EMD
from vitalDSP.utils.data_processing.synthesize_data import generate_ecg_signal
import matplotlib.pyplot as plt
# Generate ECG signal with noise
fs = 256
duration = 10
ecg = generate_ecg_signal(sfecg=fs, duration=duration, hrmean=75, Anoise=0.1)
# Perform EMD
print("Performing Empirical Mode Decomposition...")
emd = EMD(ecg)
imfs = emd.emd(max_imfs=6, stop_criterion=0.05)
print(f"✓ Extracted {len(imfs)} Intrinsic Mode Functions")
# Visualize IMFs
fig, axes = plt.subplots(len(imfs) + 2, 1, figsize=(12, 10))
axes[0].plot(ecg)
axes[0].set_title('Original ECG Signal')
axes[0].grid(True, alpha=0.3)
for i, imf in enumerate(imfs):
axes[i+1].plot(imf)
axes[i+1].set_title(f'IMF {i+1}')
axes[i+1].grid(True, alpha=0.3)
# Reconstruction
reconstructed = np.sum(imfs, axis=0)
axes[-1].plot(reconstructed)
axes[-1].set_title('Reconstructed Signal')
axes[-1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# Calculate reconstruction error
reconstruction_error = np.mean((ecg - reconstructed) ** 2)
print(f"Reconstruction MSE: {reconstruction_error:.6f}")
Step 2: Adaptive Noise Removal Using IMFs
def adaptive_denoising(signal, n_low_imfs_to_remove=2, n_high_imfs_to_keep=3):
"""
Remove noise from signal using EMD-based adaptive filtering.
Parameters:
-----------
signal : array-like
Input noisy signal
n_low_imfs_to_remove : int
Number of high-frequency IMFs to remove (noise)
n_high_imfs_to_keep : int
Number of low-frequency IMFs to keep (trend)
Returns:
--------
denoised_signal : array
Denoised signal using selected IMFs
"""
emd = EMD(signal)
imfs = emd.emd(max_imfs=8)
if len(imfs) < n_low_imfs_to_remove + n_high_imfs_to_keep:
print("⚠️ Insufficient IMFs for optimal denoising")
# Use all except first and last IMF
selected_imfs = imfs[1:-1]
else:
# Remove high-frequency noise (first few IMFs)
# Keep middle-frequency components (signal of interest)
# Remove low-frequency trend (last few IMFs)
selected_imfs = imfs[n_low_imfs_to_remove:-n_high_imfs_to_keep]
denoised = np.sum(selected_imfs, axis=0)
return denoised, imfs
# Generate noisy ECG
clean_ecg = generate_ecg_signal(sfecg=fs, duration=duration, hrmean=75, Anoise=0.0)
noise = np.random.normal(0, 0.15, len(clean_ecg))
noisy_ecg = clean_ecg + noise
# Apply adaptive denoising
denoised_ecg, imfs = adaptive_denoising(noisy_ecg)
# Calculate SNR improvement
snr_before = 10 * np.log10(np.var(clean_ecg) / np.var(noise))
snr_after = 10 * np.log10(np.var(clean_ecg) / np.var(denoised_ecg - clean_ecg))
print(f"\n📊 Denoising Performance:")
print(f" SNR before: {snr_before:.2f} dB")
print(f" SNR after: {snr_after:.2f} dB")
print(f" Improvement: {snr_after - snr_before:.2f} dB")
Step 3: Component Analysis and Frequency Extraction
from scipy.fft import fft, fftfreq
def analyze_imf_frequencies(imfs, fs):
"""Analyze the dominant frequency of each IMF."""
imf_info = []
for i, imf in enumerate(imfs):
# Compute FFT
N = len(imf)
yf = fft(imf)
xf = fftfreq(N, 1/fs)[:N//2]
power = 2.0/N * np.abs(yf[:N//2])
# Find dominant frequency
dominant_freq_idx = np.argmax(power)
dominant_freq = xf[dominant_freq_idx]
# Calculate energy
energy = np.sum(imf ** 2)
imf_info.append({
'imf_number': i + 1,
'dominant_frequency': dominant_freq,
'energy': energy,
'mean_amplitude': np.mean(np.abs(imf))
})
return imf_info
# Analyze IMFs
imf_analysis = analyze_imf_frequencies(imfs, fs)
print(f"\n{'='*60}")
print("IMF ANALYSIS")
print(f"{'='*60}")
print(f"{'IMF':<6} {'Frequency':<12} {'Energy':<15} {'Mean Amp':<12}")
print("-" * 60)
for info in imf_analysis:
print(f"{info['imf_number']:<6} {info['dominant_frequency']:<12.2f} "
f"{info['energy']:<15.2e} {info['mean_amplitude']:<12.4f}")
Step 4: Clinical Application - Baseline Wander Removal
def remove_baseline_wander(ecg_signal):
"""
Remove baseline wander from ECG using EMD.
Baseline wander is typically in the lowest frequency IMF.
"""
emd = EMD(ecg_signal)
imfs = emd.emd()
# Baseline wander is usually in the last IMF (residual)
# Remove the last 1-2 IMFs which represent the trend
if len(imfs) >= 2:
baseline_corrected = np.sum(imfs[:-2], axis=0)
else:
baseline_corrected = ecg_signal
return baseline_corrected, imfs[-1] if len(imfs) > 0 else np.zeros_like(ecg_signal)
# Generate ECG with baseline wander
ecg_with_wander = generate_ecg_signal(sfecg=fs, duration=duration, hrmean=75, Anoise=0.05)
baseline_wander = 0.3 * np.sin(2 * np.pi * 0.2 * np.arange(len(ecg_with_wander)) / fs)
ecg_with_wander += baseline_wander
# Remove baseline wander
corrected_ecg, extracted_baseline = remove_baseline_wander(ecg_with_wander)
print(f"\n✅ Baseline wander removed successfully")
print(f" Original signal range: [{ecg_with_wander.min():.3f}, {ecg_with_wander.max():.3f}]")
print(f" Corrected signal range: [{corrected_ecg.min():.3f}, {corrected_ecg.max():.3f}]")
Key Takeaways:
✅ EMD is data-adaptive - automatically determines decomposition based on signal characteristics
✅ IMFs represent different frequency scales - from high-frequency noise to low-frequency trends
✅ Flexible denoising - select relevant IMFs based on application
✅ No predefined basis functions - unlike Fourier or Wavelet transforms
✅ Clinical applications - baseline wander removal, artifact detection, trend analysis
Tutorial 7: Sleep Apnea Detection and Respiratory Pattern Analysis
Learn to detect sleep apnea events and analyze respiratory patterns using advanced signal processing techniques.
Prerequisites: * Completed Tutorial 4 (Respiratory Analysis) * Understanding of respiratory physiology * Basic signal processing knowledge
Learning Objectives: * Detect sleep apnea events from respiratory signals * Classify apnea types (obstructive, central, mixed) * Calculate apnea-hypopnea index (AHI) * Analyze respiratory pattern variations * Generate clinical reports
What is Sleep Apnea?
Sleep apnea is a serious sleep disorder characterized by repeated pauses in breathing during sleep. The Apnea-Hypopnea Index (AHI) quantifies severity: - Normal: AHI < 5 - Mild: AHI 5-15 - Moderate: AHI 15-30 - Severe: AHI > 30
Step 1: Generate Simulated Sleep Study Data
import numpy as np
from vitalDSP.respiratory_analysis.sleep_apnea_detection.pause_detection import detect_apnea_pauses
from vitalDSP.respiratory_analysis.respiratory_analysis import RespiratoryAnalysis
from vitalDSP.utils.data_processing.synthesize_data import generate_resp_signal
def generate_sleep_study_signal(duration_minutes=60, fs=25, apnea_events=15):
"""
Generate simulated respiratory signal with apnea events.
Parameters:
-----------
duration_minutes : int
Duration of sleep study in minutes
fs : int
Sampling frequency
apnea_events : int
Number of apnea events to simulate
Returns:
--------
signal : array
Simulated respiratory signal
true_apnea_times : list
Ground truth apnea event times
"""
duration_sec = duration_minutes * 60
samples = duration_sec * fs
# Generate normal respiratory signal (0.2-0.3 Hz, 12-18 breaths/min)
base_resp = generate_resp_signal(
sampling_rate=fs,
duration=duration_sec,
frequency=0.25, # 15 breaths per minute
amplitude=1.0
)
# Add respiratory rate variability
rate_variation = 0.05 * np.sin(2 * np.pi * 0.01 * np.arange(samples) / fs)
signal = base_resp * (1 + rate_variation)
# Insert apnea events (breathing pauses)
true_apnea_times = []
event_indices = np.random.choice(
range(fs * 30, samples - fs * 30),
size=apnea_events,
replace=False
)
for event_idx in sorted(event_indices):
# Apnea duration: 10-30 seconds
apnea_duration = np.random.randint(10, 31)
pause_samples = apnea_duration * fs
# Gradual reduction and recovery
fade_samples = fs * 2 # 2 second fade
fade_out = np.linspace(1, 0, fade_samples)
fade_in = np.linspace(0, 1, fade_samples)
start_idx = event_idx
end_idx = min(event_idx + pause_samples, samples)
# Apply apnea
if end_idx - start_idx > 2 * fade_samples:
signal[start_idx:start_idx+fade_samples] *= fade_out
signal[start_idx+fade_samples:end_idx-fade_samples] *= 0.1 # Minimal breathing
signal[end_idx-fade_samples:end_idx] *= fade_in
true_apnea_times.append({
'start': start_idx / fs,
'end': end_idx / fs,
'duration': (end_idx - start_idx) / fs
})
return signal, true_apnea_times
# Generate 2-hour sleep study
print("Generating simulated sleep study data...")
fs = 25 # 25 Hz for respiratory signals
duration_min = 120 # 2 hours
expected_events = 40 # Moderate-severe sleep apnea
resp_signal, true_events = generate_sleep_study_signal(
duration_minutes=duration_min,
fs=fs,
apnea_events=expected_events
)
print(f"✓ Generated {duration_min}-minute sleep study")
print(f"✓ Inserted {len(true_events)} apnea events")
Step 2: Detect Apnea Events
# Detect apnea pauses
print("\nDetecting sleep apnea events...")
detected_events = detect_apnea_pauses(
signal=resp_signal,
sampling_rate=fs,
min_pause_duration=10, # Minimum 10 seconds for apnea
preprocess='bandpass',
low=0.1,
high=0.5,
order=4
)
print(f"✓ Detected {len(detected_events)} apnea events")
# Calculate detection performance
def calculate_detection_metrics(true_events, detected_events, tolerance=5.0):
"""
Calculate precision, recall, and F1-score for apnea detection.
Parameters:
-----------
true_events : list
Ground truth apnea events
detected_events : list
Detected apnea events
tolerance : float
Time tolerance in seconds for matching events
"""
true_positives = 0
for true_event in true_events:
true_start = true_event['start']
# Check if any detected event is within tolerance
for det_start, det_end in detected_events:
if abs(det_start - true_start) <= tolerance:
true_positives += 1
break
false_positives = len(detected_events) - true_positives
false_negatives = len(true_events) - true_positives
precision = true_positives / len(detected_events) if len(detected_events) > 0 else 0
recall = true_positives / len(true_events) if len(true_events) > 0 else 0
f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
return {
'true_positives': true_positives,
'false_positives': false_positives,
'false_negatives': false_negatives,
'precision': precision,
'recall': recall,
'f1_score': f1_score
}
metrics = calculate_detection_metrics(true_events, detected_events)
print(f"\n{'='*60}")
print("DETECTION PERFORMANCE")
print(f"{'='*60}")
print(f"True Positives: {metrics['true_positives']}")
print(f"False Positives: {metrics['false_positives']}")
print(f"False Negatives: {metrics['false_negatives']}")
print(f"Precision: {metrics['precision']:.2%}")
print(f"Recall: {metrics['recall']:.2%}")
print(f"F1-Score: {metrics['f1_score']:.2%}")
Step 3: Calculate Apnea-Hypopnea Index (AHI)
def calculate_ahi(detected_events, duration_hours):
"""
Calculate Apnea-Hypopnea Index (AHI).
AHI = Number of apnea/hypopnea events per hour of sleep
"""
n_events = len(detected_events)
ahi = n_events / duration_hours
# Classify severity
if ahi < 5:
severity = "Normal"
elif ahi < 15:
severity = "Mild Sleep Apnea"
elif ahi < 30:
severity = "Moderate Sleep Apnea"
else:
severity = "Severe Sleep Apnea"
return ahi, severity
# Calculate AHI
duration_hours = duration_min / 60
ahi, severity = calculate_ahi(detected_events, duration_hours)
print(f"\n{'='*60}")
print("SLEEP APNEA ASSESSMENT")
print(f"{'='*60}")
print(f"Study Duration: {duration_hours:.1f} hours")
print(f"Total Events: {len(detected_events)}")
print(f"AHI: {ahi:.1f} events/hour")
print(f"Severity: {severity}")
# Event statistics
if len(detected_events) > 0:
event_durations = [(end - start) for start, end in detected_events]
print(f"\nEvent Statistics:")
print(f" Mean duration: {np.mean(event_durations):.1f} seconds")
print(f" Min duration: {np.min(event_durations):.1f} seconds")
print(f" Max duration: {np.max(event_durations):.1f} seconds")
print(f" Std deviation: {np.std(event_durations):.1f} seconds")
Step 4: Comprehensive Sleep Study Report
def generate_sleep_study_report(resp_signal, fs, detected_events, duration_hours):
"""Generate comprehensive sleep apnea report."""
# Calculate respiratory rate during non-apnea periods
resp_analyzer = RespiratoryAnalysis(resp_signal, fs=fs)
try:
resp_rate = resp_analyzer.estimate_respiratory_rate()
except:
resp_rate = None
# Calculate AHI
ahi, severity = calculate_ahi(detected_events, duration_hours)
# Oxygen desaturation events (estimated from respiratory pauses)
# In real scenario, this would come from SpO2 sensor
estimated_desat_events = int(len(detected_events) * 0.8) # 80% of apneas cause desaturation
report = {
'study_duration_hours': duration_hours,
'total_apnea_events': len(detected_events),
'ahi': ahi,
'severity': severity,
'mean_respiratory_rate': resp_rate,
'estimated_desaturation_events': estimated_desat_events,
'recommendations': []
}
# Clinical recommendations
if ahi >= 30:
report['recommendations'].append('⚠️ URGENT: Immediate sleep specialist consultation recommended')
report['recommendations'].append('Consider CPAP therapy evaluation')
report['recommendations'].append('Cardiovascular risk assessment advised')
elif ahi >= 15:
report['recommendations'].append('⚠️ Sleep specialist consultation recommended')
report['recommendations'].append('Lifestyle modifications: weight loss, avoid alcohol before sleep')
report['recommendations'].append('Consider oral appliance or CPAP therapy')
elif ahi >= 5:
report['recommendations'].append('⚠️ Mild sleep apnea detected')
report['recommendations'].append('Lifestyle modifications recommended')
report['recommendations'].append('Follow-up study in 6-12 months')
else:
report['recommendations'].append('✓ No significant sleep apnea detected')
report['recommendations'].append('Maintain healthy sleep habits')
return report
# Generate report
report = generate_sleep_study_report(resp_signal, fs, detected_events, duration_hours)
print(f"\n{'='*60}")
print("CLINICAL SLEEP STUDY REPORT")
print(f"{'='*60}")
print(f"Study Duration: {report['study_duration_hours']:.1f} hours")
print(f"Total Apnea Events: {report['total_apnea_events']}")
print(f"Apnea-Hypopnea Index (AHI): {report['ahi']:.1f} events/hour")
print(f"Severity Classification: {report['severity']}")
if report['mean_respiratory_rate']:
print(f"Mean Respiratory Rate: {report['mean_respiratory_rate']:.1f} breaths/min")
print(f"Estimated Desaturation Events: {report['estimated_desaturation_events']}")
print(f"\nClinical Recommendations:")
for rec in report['recommendations']:
print(f" {rec}")
Key Takeaways:
✅ AHI is the gold standard for sleep apnea severity assessment
✅ Automated detection reduces manual scoring time
✅ Clinical integration - generate actionable reports
✅ Multi-modal analysis - combine with SpO2 and ECG for comprehensive assessment
✅ Real-world application - adaptable to various sensor types
Tutorial 8: ECG-PPG Synchronization and Pulse Transit Time Analysis
Learn advanced multi-modal analysis by synchronizing ECG and PPG signals to extract cardiovascular features like Pulse Transit Time (PTT).
Prerequisites: * Completed Tutorial 1 (Basic ECG Processing) * Understanding of cardiovascular physiology * Familiarity with peak detection
Learning Objectives: * Synchronize ECG and PPG signals * Calculate Pulse Transit Time (PTT) * Estimate blood pressure trends * Analyze vascular parameters * Perform cardio-synchronization assessment
What is Pulse Transit Time?
Pulse Transit Time (PTT) is the time it takes for the arterial pulse wave to travel from the heart (R-peak in ECG) to a peripheral site (PPG sensor). PTT is inversely related to blood pressure and provides valuable information about vascular health.
Step 1: Generate Synchronized ECG and PPG Signals
import numpy as np
from vitalDSP.feature_engineering.ecg_ppg_synchronyzation_features import ECGPPGSynchronization
from vitalDSP.utils.data_processing.synthesize_data import generate_ecg_signal, generate_synthetic_ppg
from vitalDSP.filtering.signal_filtering import SignalFiltering
# Generate synchronized signals
fs_ecg = 256 # ECG sampling frequency
fs_ppg = 100 # PPG sampling frequency (often lower than ECG)
duration = 60 # 1 minute
hr = 75 # Heart rate
print("Generating synchronized ECG and PPG signals...")
# Generate ECG
ecg_signal = generate_ecg_signal(
sfecg=fs_ecg,
duration=duration,
hrmean=hr,
Anoise=0.05
)
# Generate PPG (with physiological delay)
ppg_signal = generate_synthetic_ppg(
duration=duration,
sampling_rate=fs_ppg,
heart_rate=hr,
noise_level=0.05
)
# Add realistic PTT (150-250 ms typical range)
ptt_ms = 200 # milliseconds
ptt_samples_ppg = int(ptt_ms * fs_ppg / 1000)
# Delay PPG signal to simulate PTT
ppg_signal = np.roll(ppg_signal, ptt_samples_ppg)
print(f"✓ Generated {duration}s of synchronized signals")
print(f" ECG: {fs_ecg} Hz")
print(f" PPG: {fs_ppg} Hz")
print(f" Simulated PTT: {ptt_ms} ms")
# Preprocess signals
sf_ecg = SignalFiltering(ecg_signal)
ecg_filtered = sf_ecg.bandpass(lowcut=0.5, highcut=40.0, fs=fs_ecg, order=4)
sf_ppg = SignalFiltering(ppg_signal)
ppg_filtered = sf_ppg.bandpass(lowcut=0.5, highcut=8.0, fs=fs_ppg, order=4)
Step 2: Calculate Pulse Transit Time
# Initialize synchronization analyzer
sync_analyzer = ECGPPGSynchronization(
ecg_signal=ecg_filtered,
ppg_signal=ppg_filtered,
ecg_fs=fs_ecg,
ppg_fs=fs_ppg
)
# Detect peaks
r_peaks = sync_analyzer.detect_r_peaks()
ppg_peaks = sync_analyzer.detect_ppg_peaks()
print(f"\n✓ Detected {len(r_peaks)} R-peaks in ECG")
print(f"✓ Detected {len(ppg_peaks)} peaks in PPG")
# Calculate PTT
ptt_values = sync_analyzer.compute_pulse_transit_time()
if len(ptt_values) > 0:
print(f"\n{'='*60}")
print("PULSE TRANSIT TIME ANALYSIS")
print(f"{'='*60}")
print(f"Number of PTT measurements: {len(ptt_values)}")
print(f"Mean PTT: {np.mean(ptt_values):.2f} ms")
print(f"Std PTT: {np.std(ptt_values):.2f} ms")
print(f"Min PTT: {np.min(ptt_values):.2f} ms")
print(f"Max PTT: {np.max(ptt_values):.2f} ms")
print(f"PTT Variability: {np.std(ptt_values)/np.mean(ptt_values)*100:.2f}%")
Step 3: Blood Pressure Trend Estimation
def estimate_bp_trend(ptt_values, baseline_sbp=120, baseline_dbp=80):
"""
Estimate blood pressure trends from PTT variations.
PTT is inversely related to blood pressure:
- Shorter PTT → Higher BP
- Longer PTT → Lower BP
Parameters:
-----------
ptt_values : array
Pulse transit time values in milliseconds
baseline_sbp : float
Baseline systolic BP (mmHg)
baseline_dbp : float
Baseline diastolic BP (mmHg)
Returns:
--------
dict : BP trend estimates
"""
if len(ptt_values) == 0:
return None
# Empirical relationship: ΔBP ≈ -k * ΔPTT
# k typically ranges from 0.3 to 0.5 mmHg/ms
k = 0.4
mean_ptt = np.mean(ptt_values)
ptt_changes = ptt_values - mean_ptt
# Estimate BP changes
bp_changes = -k * ptt_changes
# Estimated BP values
estimated_sbp = baseline_sbp + bp_changes
estimated_dbp = baseline_dbp + bp_changes * 0.6 # DBP changes less than SBP
return {
'mean_sbp': np.mean(estimated_sbp),
'std_sbp': np.std(estimated_sbp),
'min_sbp': np.min(estimated_sbp),
'max_sbp': np.max(estimated_sbp),
'mean_dbp': np.mean(estimated_dbp),
'std_dbp': np.std(estimated_dbp),
'bp_variability': np.std(estimated_sbp),
'estimated_sbp_trend': estimated_sbp,
'estimated_dbp_trend': estimated_dbp
}
# Estimate BP trends
bp_estimates = estimate_bp_trend(ptt_values)
if bp_estimates:
print(f"\n{'='*60}")
print("BLOOD PRESSURE TREND ESTIMATION")
print(f"{'='*60}")
print(f"Estimated Mean SBP: {bp_estimates['mean_sbp']:.1f} ± {bp_estimates['std_sbp']:.1f} mmHg")
print(f"Estimated Mean DBP: {bp_estimates['mean_dbp']:.1f} ± {bp_estimates['std_dbp']:.1f} mmHg")
print(f"SBP Range: [{bp_estimates['min_sbp']:.1f}, {bp_estimates['max_sbp']:.1f}] mmHg")
print(f"BP Variability: {bp_estimates['bp_variability']:.2f} mmHg")
# Clinical interpretation
print(f"\nClinical Assessment:")
if bp_estimates['mean_sbp'] < 120 and bp_estimates['mean_dbp'] < 80:
print(" ✓ Normal blood pressure range")
elif bp_estimates['mean_sbp'] < 130 and bp_estimates['mean_dbp'] < 80:
print(" ⚠️ Elevated blood pressure (prehypertension)")
elif bp_estimates['mean_sbp'] < 140 or bp_estimates['mean_dbp'] < 90:
print(" ⚠️ Stage 1 Hypertension")
else:
print(" ⚠️ Stage 2 Hypertension - medical consultation recommended")
if bp_estimates['bp_variability'] > 10:
print(" ⚠️ High BP variability - may indicate cardiovascular risk")
else:
print(" ✓ Normal BP variability")
Step 4: Comprehensive Cardiovascular Assessment
def comprehensive_cardiovascular_analysis(ecg_ppg_sync, ptt_values, bp_estimates):
"""Generate comprehensive cardiovascular health report."""
# Calculate additional features
pat = ecg_ppg_sync.compute_pulse_arrival_time()
# Pulse wave velocity (PWV) estimation
# PWV ≈ Distance / PTT
# Assuming typical distance of 60 cm from heart to finger
distance_cm = 60
if len(ptt_values) > 0:
mean_ptt_s = np.mean(ptt_values) / 1000 # Convert to seconds
pwv = distance_cm / mean_ptt_s # cm/s
pwv_m_s = pwv / 100 # m/s
else:
pwv_m_s = None
report = {
'ptt_metrics': {
'mean': np.mean(ptt_values) if len(ptt_values) > 0 else None,
'std': np.std(ptt_values) if len(ptt_values) > 0 else None,
'variability': np.std(ptt_values)/np.mean(ptt_values)*100 if len(ptt_values) > 0 else None
},
'pwv_m_s': pwv_m_s,
'bp_estimates': bp_estimates,
'vascular_health': 'unknown',
'recommendations': []
}
# Assess vascular health based on PWV
if pwv_m_s:
if pwv_m_s < 7:
report['vascular_health'] = 'Good (Compliant arteries)'
report['recommendations'].append('✓ Vascular health within normal range')
elif pwv_m_s < 10:
report['vascular_health'] = 'Fair (Moderate stiffness)'
report['recommendations'].append('⚠️ Moderate arterial stiffness detected')
report['recommendations'].append('Regular cardiovascular monitoring recommended')
else:
report['vascular_health'] = 'Poor (Stiff arteries)'
report['recommendations'].append('⚠️ Significant arterial stiffness detected')
report['recommendations'].append('Cardiovascular risk assessment advised')
report['recommendations'].append('Lifestyle modifications and medical evaluation recommended')
# PTT variability assessment
if report['ptt_metrics']['variability']:
if report['ptt_metrics']['variability'] < 10:
report['recommendations'].append('✓ Stable cardiovascular function')
else:
report['recommendations'].append('⚠️ High PTT variability may indicate autonomic dysfunction')
return report
# Generate comprehensive report
cardio_report = comprehensive_cardiovascular_analysis(sync_analyzer, ptt_values, bp_estimates)
print(f"\n{'='*60}")
print("COMPREHENSIVE CARDIOVASCULAR ASSESSMENT")
print(f"{'='*60}")
if cardio_report['ptt_metrics']['mean']:
print(f"\nPulse Transit Time:")
print(f" Mean PTT: {cardio_report['ptt_metrics']['mean']:.2f} ms")
print(f" PTT Variability: {cardio_report['ptt_metrics']['variability']:.2f}%")
if cardio_report['pwv_m_s']:
print(f"\nPulse Wave Velocity:")
print(f" PWV: {cardio_report['pwv_m_s']:.2f} m/s")
print(f"\nVascular Health Status: {cardio_report['vascular_health']}")
print(f"\nClinical Recommendations:")
for rec in cardio_report['recommendations']:
print(f" {rec}")
Key Takeaways:
✅ PTT provides non-invasive BP trends - useful for continuous monitoring
✅ Multi-modal analysis - combining ECG and PPG reveals more information
✅ Vascular assessment - PWV indicates arterial stiffness
✅ Clinical applications - cardiovascular risk stratification
✅ Real-time monitoring - adaptable to wearable devices
Continue exploring the documentation to learn about: * Advanced signal processing techniques * Custom analysis pipelines * Performance optimization * Clinical applications * Contributing to the project
Happy learning with VitalDSP! 🫀📊