Examples
This section provides practical examples and real-world use cases for VitalDSP. Each example demonstrates specific functionality and can be used as a starting point for your own projects.
Example Categories
- Clinical Research Examples
ECG analysis for cardiovascular research
PPG analysis for hemodynamic studies
Respiratory analysis for sleep studies
- Healthcare Monitoring Examples
Real-time vital signs monitoring
Continuous patient monitoring
Telemedicine applications
- Wearable Device Examples
Fitness tracker integration
Smartwatch signal processing
Mobile health applications
- Medical Device Examples
Patient monitoring systems
Diagnostic equipment integration
Clinical decision support systems
Example 1: ECG Analysis for Clinical Research
This example demonstrates comprehensive ECG analysis for cardiovascular research applications.
Use Case: Analyzing ECG signals from clinical trials to assess cardiovascular health and detect abnormalities.
Key Features: * R-peak detection and RR interval analysis * Heart rate variability (HRV) analysis * Morphological feature extraction * Signal quality assessment * Clinical interpretation
Implementation:
import numpy as np
import pandas as pd
from vitalDSP.filtering.signal_filtering import SignalFiltering
from vitalDSP.physiological_features.waveform import WaveformMorphology
from vitalDSP.physiological_features.hrv_analysis import HRVFeatures
from vitalDSP.signal_quality_assessment.signal_quality_index import SignalQualityIndex
def analyze_ecg_clinical_research(ecg_signal, fs, patient_id=None):
"""
Comprehensive ECG analysis for clinical research.
Parameters:
-----------
ecg_signal : array-like
ECG signal data
fs : float
Sampling frequency (Hz)
patient_id : str, optional
Patient identifier
Returns:
--------
dict : Comprehensive analysis results
"""
results = {
'patient_id': patient_id,
'sampling_frequency': fs,
'signal_length': len(ecg_signal),
'analysis_timestamp': pd.Timestamp.now()
}
# 1. Signal Preprocessing
sf = SignalFiltering(ecg_signal)
filtered_ecg = sf.bandpass(lowcut=0.5, highcut=40.0, fs=fs, order=4)
# 2. Signal Quality Assessment
sqi = SignalQualityIndex(filtered_ecg)
quality_metrics = {}
# Amplitude variability SQI
amp_sqi, _, _ = sqi.amplitude_variability_sqi(
window_size=int(fs*5), step_size=int(fs*1), threshold=2, aggregate=False
)
quality_metrics['amplitude_variability'] = np.mean(amp_sqi)
# Baseline wander SQI
baseline_sqi, _, _ = sqi.baseline_wander_sqi(
window_size=int(fs*5), step_size=int(fs*1), threshold=2, aggregate=False
)
quality_metrics['baseline_wander'] = np.mean(baseline_sqi)
# Signal-to-noise ratio
snr_sqi, _, _ = sqi.snr_sqi(
window_size=int(fs*5), step_size=int(fs*1), threshold=-1, aggregate=False
)
quality_metrics['signal_to_noise_ratio'] = np.mean(snr_sqi)
results['quality_metrics'] = quality_metrics
# 3. R-Peak Detection and RR Interval Analysis
wm = WaveformMorphology(filtered_ecg, fs=fs, signal_type="ECG")
r_peaks = wm.r_peaks
if len(r_peaks) > 10: # Ensure sufficient data
# Calculate RR intervals
rr_intervals = np.diff(r_peaks) / fs * 1000 # Convert to ms
# Remove outliers
valid_rr = rr_intervals[(rr_intervals > 300) & (rr_intervals < 2000)]
if len(valid_rr) > 5:
# Heart rate calculation
heart_rate = 60 * fs / np.mean(np.diff(r_peaks))
# HRV Analysis
hrv = HRVFeatures(valid_rr)
hrv_features = hrv.compute_all_features()
results['heart_rate'] = heart_rate
results['rr_intervals'] = {
'count': len(valid_rr),
'mean': np.mean(valid_rr),
'std': np.std(valid_rr),
'min': np.min(valid_rr),
'max': np.max(valid_rr)
}
results['hrv_features'] = hrv_features
# Morphological features
morphological_features = {
'r_peak_count': len(r_peaks),
'qrs_duration': wm.qrs_duration if hasattr(wm, 'qrs_duration') else None,
'qt_interval': wm.qt_interval if hasattr(wm, 'qt_interval') else None
}
results['morphological_features'] = morphological_features
# Clinical interpretation
clinical_interpretation = interpret_ecg_results(hrv_features, heart_rate)
results['clinical_interpretation'] = clinical_interpretation
return results
def interpret_ecg_results(hrv_features, heart_rate):
"""Provide clinical interpretation of ECG analysis results."""
interpretation = {
'heart_rate_status': 'normal' if 60 <= heart_rate <= 100 else 'abnormal',
'hrv_status': 'normal',
'risk_assessment': 'low',
'recommendations': []
}
# Heart rate interpretation
if heart_rate < 60:
interpretation['heart_rate_status'] = 'bradycardia'
interpretation['recommendations'].append('Consider bradycardia evaluation')
elif heart_rate > 100:
interpretation['heart_rate_status'] = 'tachycardia'
interpretation['recommendations'].append('Consider tachycardia evaluation')
# HRV interpretation
if 'sdnn' in hrv_features:
sdnn = hrv_features['sdnn']
if sdnn < 30:
interpretation['hrv_status'] = 'reduced'
interpretation['risk_assessment'] = 'elevated'
interpretation['recommendations'].append('Reduced HRV may indicate stress or cardiovascular risk')
elif sdnn > 50:
interpretation['hrv_status'] = 'good'
interpretation['risk_assessment'] = 'low'
return interpretation
Usage Example:
# Load ECG data (replace with your data loading method)
ecg_data = np.load('ecg_data.npy') # Your ECG signal
fs = 1000 # Sampling frequency
# Perform analysis
results = analyze_ecg_clinical_research(ecg_data, fs, patient_id="P001")
# Display results
print(f"Patient ID: {results['patient_id']}")
print(f"Heart Rate: {results['heart_rate']:.1f} BPM")
print(f"HRV Status: {results['clinical_interpretation']['hrv_status']}")
print(f"Risk Assessment: {results['clinical_interpretation']['risk_assessment']}")
Example 2: PPG Analysis for Hemodynamic Studies
This example demonstrates PPG signal analysis for hemodynamic studies and cardiovascular assessment.
Use Case: Analyzing PPG signals to assess blood volume changes, pulse wave characteristics, and cardiovascular health.
Key Features: * Systolic and diastolic peak detection * Pulse wave analysis * Hemodynamic parameter estimation * Signal quality assessment * Clinical interpretation
Implementation:
def analyze_ppg_hemodynamic(ppg_signal, fs, patient_id=None):
"""
Comprehensive PPG analysis for hemodynamic studies.
Parameters:
-----------
ppg_signal : array-like
PPG signal data
fs : float
Sampling frequency (Hz)
patient_id : str, optional
Patient identifier
Returns:
--------
dict : Comprehensive PPG analysis results
"""
results = {
'patient_id': patient_id,
'sampling_frequency': fs,
'signal_length': len(ppg_signal),
'analysis_timestamp': pd.Timestamp.now()
}
# 1. Signal Preprocessing
sf = SignalFiltering(ppg_signal)
filtered_ppg = sf.bandpass(lowcut=0.5, highcut=8.0, fs=fs, order=4) # PPG-specific range
# 2. Signal Quality Assessment
sqi = SignalQualityIndex(filtered_ppg)
quality_metrics = {}
# Amplitude variability SQI
amp_sqi, _, _ = sqi.amplitude_variability_sqi(
window_size=int(fs*5), step_size=int(fs*1), threshold=2, aggregate=False
)
quality_metrics['amplitude_variability'] = np.mean(amp_sqi)
# Signal-to-noise ratio
snr_sqi, _, _ = sqi.snr_sqi(
window_size=int(fs*5), step_size=int(fs*1), threshold=-1, aggregate=False
)
quality_metrics['signal_to_noise_ratio'] = np.mean(snr_sqi)
results['quality_metrics'] = quality_metrics
# 3. PPG-Specific Analysis
wm = WaveformMorphology(filtered_ppg, fs=fs, signal_type="PPG")
# Detect systolic peaks
systolic_peaks = wm.systolic_peaks
if len(systolic_peaks) > 5:
# Calculate pulse rate
pulse_rate = 60 * fs / np.mean(np.diff(systolic_peaks))
# Pulse interval analysis
pulse_intervals = np.diff(systolic_peaks) / fs * 1000 # Convert to ms
valid_intervals = pulse_intervals[(pulse_intervals > 400) & (pulse_intervals < 2000)]
if len(valid_intervals) > 3:
# Pulse rate variability (similar to HRV)
prv_features = {
'mean_pulse_interval': np.mean(valid_intervals),
'std_pulse_interval': np.std(valid_intervals),
'pulse_rate_variability': np.std(valid_intervals) / np.mean(valid_intervals) * 100
}
# Hemodynamic parameters
hemodynamic_params = calculate_hemodynamic_parameters(filtered_ppg, systolic_peaks, fs)
results['pulse_rate'] = pulse_rate
results['pulse_intervals'] = {
'count': len(valid_intervals),
'mean': np.mean(valid_intervals),
'std': np.std(valid_intervals)
}
results['prv_features'] = prv_features
results['hemodynamic_parameters'] = hemodynamic_params
# Clinical interpretation
clinical_interpretation = interpret_ppg_results(prv_features, pulse_rate, hemodynamic_params)
results['clinical_interpretation'] = clinical_interpretation
return results
def calculate_hemodynamic_parameters(ppg_signal, systolic_peaks, fs):
"""Calculate hemodynamic parameters from PPG signal."""
params = {}
if len(systolic_peaks) > 2:
# Pulse amplitude
pulse_amplitudes = []
for i in range(len(systolic_peaks) - 1):
start = systolic_peaks[i]
end = systolic_peaks[i + 1]
pulse_segment = ppg_signal[start:end]
amplitude = np.max(pulse_segment) - np.min(pulse_segment)
pulse_amplitudes.append(amplitude)
params['mean_pulse_amplitude'] = np.mean(pulse_amplitudes)
params['pulse_amplitude_variability'] = np.std(pulse_amplitudes) / np.mean(pulse_amplitudes) * 100
# Pulse wave analysis
params['pulse_wave_analysis'] = analyze_pulse_wave_morphology(ppg_signal, systolic_peaks, fs)
return params
def analyze_pulse_wave_morphology(ppg_signal, systolic_peaks, fs):
"""Analyze pulse wave morphology."""
morphology = {}
if len(systolic_peaks) > 1:
# Calculate average pulse wave
pulse_waves = []
for i in range(len(systolic_peaks) - 1):
start = systolic_peaks[i]
end = systolic_peaks[i + 1]
pulse_wave = ppg_signal[start:end]
if len(pulse_wave) > 10: # Ensure sufficient data
pulse_waves.append(pulse_wave)
if pulse_waves:
# Normalize pulse waves to same length
min_length = min(len(wave) for wave in pulse_waves)
normalized_waves = [wave[:min_length] for wave in pulse_waves]
# Calculate average pulse wave
average_pulse_wave = np.mean(normalized_waves, axis=0)
# Analyze morphology
morphology['pulse_width'] = calculate_pulse_width(average_pulse_wave)
morphology['pulse_slope'] = calculate_pulse_slope(average_pulse_wave)
morphology['pulse_area'] = np.trapz(average_pulse_wave)
return morphology
def interpret_ppg_results(prv_features, pulse_rate, hemodynamic_params):
"""Provide clinical interpretation of PPG analysis results."""
interpretation = {
'pulse_rate_status': 'normal' if 60 <= pulse_rate <= 100 else 'abnormal',
'hemodynamic_status': 'normal',
'risk_assessment': 'low',
'recommendations': []
}
# Pulse rate interpretation
if pulse_rate < 60:
interpretation['pulse_rate_status'] = 'bradycardia'
interpretation['recommendations'].append('Consider bradycardia evaluation')
elif pulse_rate > 100:
interpretation['pulse_rate_status'] = 'tachycardia'
interpretation['recommendations'].append('Consider tachycardia evaluation')
# Hemodynamic interpretation
if 'mean_pulse_amplitude' in hemodynamic_params:
pulse_amplitude = hemodynamic_params['mean_pulse_amplitude']
if pulse_amplitude < 0.1: # Threshold depends on your signal scaling
interpretation['hemodynamic_status'] = 'reduced'
interpretation['risk_assessment'] = 'elevated'
interpretation['recommendations'].append('Reduced pulse amplitude may indicate poor perfusion')
return interpretation
Example 3: Real-Time Vital Signs Monitoring
This example demonstrates real-time vital signs monitoring using VitalDSP.
Use Case: Continuous monitoring of vital signs in clinical settings or remote patient monitoring applications.
Key Features: * Real-time signal processing * Continuous vital signs calculation * Alert generation for abnormal values * Data logging and storage * Web interface for monitoring
Implementation:
import time
import threading
from collections import deque
from vitalDSP.filtering.signal_filtering import SignalFiltering
from vitalDSP.physiological_features.waveform import WaveformMorphology
from vitalDSP.signal_quality_assessment.signal_quality_index import SignalQualityIndex
class RealTimeVitalSignsMonitor:
"""Real-time vital signs monitoring system."""
def __init__(self, fs=1000, window_size=10, update_interval=1):
"""
Initialize real-time monitor.
Parameters:
-----------
fs : float
Sampling frequency (Hz)
window_size : int
Window size in seconds
update_interval : float
Update interval in seconds
"""
self.fs = fs
self.window_size = window_size
self.update_interval = update_interval
self.window_samples = fs * window_size
# Data buffers
self.signal_buffer = deque(maxlen=self.window_samples)
self.vital_signs_history = deque(maxlen=100) # Keep last 100 measurements
# Current vital signs
self.current_vital_signs = {
'heart_rate': None,
'pulse_rate': None,
'signal_quality': None,
'timestamp': None
}
# Alert thresholds
self.alert_thresholds = {
'heart_rate_low': 50,
'heart_rate_high': 120,
'signal_quality_low': 0.5
}
# Monitoring state
self.is_monitoring = False
self.monitor_thread = None
def add_signal_data(self, signal_chunk):
"""Add new signal data to the buffer."""
self.signal_buffer.extend(signal_chunk)
def start_monitoring(self):
"""Start real-time monitoring."""
if not self.is_monitoring:
self.is_monitoring = True
self.monitor_thread = threading.Thread(target=self._monitoring_loop)
self.monitor_thread.daemon = True
self.monitor_thread.start()
print("Real-time monitoring started")
def stop_monitoring(self):
"""Stop real-time monitoring."""
self.is_monitoring = False
if self.monitor_thread:
self.monitor_thread.join()
print("Real-time monitoring stopped")
def _monitoring_loop(self):
"""Main monitoring loop."""
while self.is_monitoring:
if len(self.signal_buffer) >= self.window_samples:
# Get current window
current_signal = np.array(list(self.signal_buffer))
# Process signal
vital_signs = self._process_signal_window(current_signal)
# Update current vital signs
self.current_vital_signs.update(vital_signs)
self.current_vital_signs['timestamp'] = time.time()
# Add to history
self.vital_signs_history.append(vital_signs.copy())
# Check for alerts
self._check_alerts(vital_signs)
# Log results
self._log_vital_signs(vital_signs)
time.sleep(self.update_interval)
def _process_signal_window(self, signal):
"""Process a window of signal data."""
vital_signs = {}
try:
# Signal preprocessing
sf = SignalFiltering(signal)
filtered_signal = sf.bandpass(lowcut=0.5, highcut=40.0, fs=self.fs, order=4)
# Signal quality assessment
sqi = SignalQualityIndex(filtered_signal)
quality_sqi, _, _ = sqi.amplitude_variability_sqi(
window_size=int(self.fs*5), step_size=int(self.fs*1), threshold=2, aggregate=False
)
vital_signs['signal_quality'] = np.mean(quality_sqi)
# Detect peaks (assuming ECG signal)
wm = WaveformMorphology(filtered_signal, fs=self.fs, signal_type="ECG")
r_peaks = wm.r_peaks
if len(r_peaks) > 2:
# Calculate heart rate
rr_intervals = np.diff(r_peaks) / self.fs
valid_rr = rr_intervals[(rr_intervals > 0.3) & (rr_intervals < 2.0)]
if len(valid_rr) > 1:
heart_rate = 60 / np.mean(valid_rr)
vital_signs['heart_rate'] = heart_rate
vital_signs['pulse_rate'] = heart_rate # Assuming same for this example
except Exception as e:
print(f"Error processing signal: {e}")
vital_signs['error'] = str(e)
return vital_signs
def _check_alerts(self, vital_signs):
"""Check for alert conditions."""
alerts = []
if 'heart_rate' in vital_signs:
hr = vital_signs['heart_rate']
if hr < self.alert_thresholds['heart_rate_low']:
alerts.append(f"Low heart rate: {hr:.1f} BPM")
elif hr > self.alert_thresholds['heart_rate_high']:
alerts.append(f"High heart rate: {hr:.1f} BPM")
if 'signal_quality' in vital_signs:
sq = vital_signs['signal_quality']
if sq < self.alert_thresholds['signal_quality_low']:
alerts.append(f"Poor signal quality: {sq:.2f}")
if alerts:
self._handle_alerts(alerts)
def _handle_alerts(self, alerts):
"""Handle alert conditions."""
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
for alert in alerts:
print(f"[ALERT {timestamp}] {alert}")
# Here you could send notifications, log to database, etc.
def _log_vital_signs(self, vital_signs):
"""Log vital signs data."""
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
log_entry = f"[{timestamp}] "
if 'heart_rate' in vital_signs:
log_entry += f"HR: {vital_signs['heart_rate']:.1f} BPM "
if 'signal_quality' in vital_signs:
log_entry += f"SQ: {vital_signs['signal_quality']:.2f}"
print(log_entry)
def get_current_vital_signs(self):
"""Get current vital signs."""
return self.current_vital_signs.copy()
def get_vital_signs_history(self):
"""Get vital signs history."""
return list(self.vital_signs_history)
def get_statistics(self):
"""Get statistics from vital signs history."""
if not self.vital_signs_history:
return {}
stats = {}
# Heart rate statistics
heart_rates = [vs.get('heart_rate') for vs in self.vital_signs_history if vs.get('heart_rate')]
if heart_rates:
stats['heart_rate'] = {
'mean': np.mean(heart_rates),
'std': np.std(heart_rates),
'min': np.min(heart_rates),
'max': np.max(heart_rates)
}
# Signal quality statistics
signal_qualities = [vs.get('signal_quality') for vs in self.vital_signs_history if vs.get('signal_quality')]
if signal_qualities:
stats['signal_quality'] = {
'mean': np.mean(signal_qualities),
'std': np.std(signal_qualities),
'min': np.min(signal_qualities),
'max': np.max(signal_qualities)
}
return stats
Usage Example:
# Initialize real-time monitor
monitor = RealTimeVitalSignsMonitor(fs=1000, window_size=10, update_interval=1)
# Start monitoring
monitor.start_monitoring()
# Simulate real-time data (replace with your data source)
for i in range(100):
# Generate sample data (replace with your data acquisition)
sample_data = np.random.randn(100) + np.sin(2 * np.pi * 1.2 * np.arange(100) / 1000)
monitor.add_signal_data(sample_data)
time.sleep(0.1)
# Get current vital signs
current_vitals = monitor.get_current_vital_signs()
print(f"Current heart rate: {current_vitals.get('heart_rate', 'N/A')} BPM")
# Get statistics
stats = monitor.get_statistics()
print(f"Average heart rate: {stats.get('heart_rate', {}).get('mean', 'N/A')} BPM")
# Stop monitoring
monitor.stop_monitoring()
Example 4: Wearable Device Integration
This example demonstrates how to integrate VitalDSP with wearable devices for health monitoring.
Use Case: Processing data from fitness trackers, smartwatches, or other wearable health devices.
Key Features: * Data format conversion and preprocessing * Real-time processing for wearable constraints * Battery-efficient algorithms * Mobile-friendly analysis * Cloud integration
Implementation:
import json
import requests
from datetime import datetime, timedelta
class WearableDeviceIntegration:
"""Integration with wearable devices for health monitoring."""
def __init__(self, device_type="fitness_tracker", cloud_endpoint=None):
"""
Initialize wearable device integration.
Parameters:
-----------
device_type : str
Type of wearable device
cloud_endpoint : str, optional
Cloud endpoint for data synchronization
"""
self.device_type = device_type
self.cloud_endpoint = cloud_endpoint
self.device_config = self._get_device_config(device_type)
# Data processing parameters optimized for wearables
self.processing_params = {
'fs': self.device_config['sampling_rate'],
'window_size': 30, # 30-second windows
'update_interval': 5, # 5-second updates
'battery_optimized': True
}
# Local data storage
self.local_data = []
self.processed_data = []
def _get_device_config(self, device_type):
"""Get device-specific configuration."""
configs = {
'fitness_tracker': {
'sampling_rate': 100,
'signal_types': ['ppg', 'accelerometer'],
'battery_life': '7_days',
'data_format': 'json'
},
'smartwatch': {
'sampling_rate': 200,
'signal_types': ['ppg', 'ecg', 'accelerometer'],
'battery_life': '2_days',
'data_format': 'json'
},
'chest_strap': {
'sampling_rate': 1000,
'signal_types': ['ecg', 'accelerometer'],
'battery_life': '24_hours',
'data_format': 'csv'
}
}
return configs.get(device_type, configs['fitness_tracker'])
def process_wearable_data(self, raw_data, data_type="ppg"):
"""
Process data from wearable device.
Parameters:
-----------
raw_data : dict or array
Raw data from wearable device
data_type : str
Type of signal data
Returns:
--------
dict : Processed analysis results
"""
# Convert raw data to standard format
processed_signal = self._convert_wearable_data(raw_data, data_type)
if processed_signal is None:
return {'error': 'Failed to convert data'}
# Apply battery-optimized processing
results = self._battery_optimized_processing(processed_signal, data_type)
# Add metadata
results['device_type'] = self.device_type
results['data_type'] = data_type
results['timestamp'] = datetime.now().isoformat()
results['processing_params'] = self.processing_params
# Store locally
self.processed_data.append(results)
# Sync to cloud if enabled
if self.cloud_endpoint:
self._sync_to_cloud(results)
return results
def _convert_wearable_data(self, raw_data, data_type):
"""Convert wearable data to standard format."""
try:
if self.device_config['data_format'] == 'json':
if isinstance(raw_data, str):
data = json.loads(raw_data)
else:
data = raw_data
# Extract signal data based on device type
if self.device_type == 'fitness_tracker':
signal = data.get('ppg_data', [])
elif self.device_type == 'smartwatch':
signal = data.get('heart_rate_data', [])
elif self.device_type == 'chest_strap':
signal = data.get('ecg_data', [])
else:
signal = data.get('signal_data', [])
elif self.device_config['data_format'] == 'csv':
# Handle CSV data
signal = raw_data if isinstance(raw_data, list) else raw_data.tolist()
else:
signal = raw_data
# Convert to numpy array
signal_array = np.array(signal)
# Validate signal
if len(signal_array) < 10:
return None
return signal_array
except Exception as e:
print(f"Error converting wearable data: {e}")
return None
def _battery_optimized_processing(self, signal, data_type):
"""Apply battery-optimized signal processing."""
results = {}
try:
fs = self.processing_params['fs']
# Lightweight preprocessing
sf = SignalFiltering(signal)
# Use simpler filters for battery optimization
if data_type == 'ppg':
filtered_signal = sf.bandpass(lowcut=0.5, highcut=8.0, fs=fs, order=2)
elif data_type == 'ecg':
filtered_signal = sf.bandpass(lowcut=0.5, highcut=40.0, fs=fs, order=2)
else:
filtered_signal = sf.bandpass(lowcut=0.1, highcut=20.0, fs=fs, order=2)
# Basic feature extraction
if data_type in ['ppg', 'ecg']:
wm = WaveformMorphology(filtered_signal, fs=fs, signal_type=data_type)
if data_type == 'ppg':
peaks = wm.systolic_peaks
else:
peaks = wm.r_peaks
if len(peaks) > 2:
# Calculate heart rate
intervals = np.diff(peaks) / fs
valid_intervals = intervals[(intervals > 0.3) & (intervals < 2.0)]
if len(valid_intervals) > 1:
heart_rate = 60 / np.mean(valid_intervals)
results['heart_rate'] = heart_rate
results['heart_rate_variability'] = np.std(valid_intervals) / np.mean(valid_intervals) * 100
# Basic signal quality
sqi = SignalQualityIndex(filtered_signal)
quality_sqi, _, _ = sqi.amplitude_variability_sqi(
window_size=int(fs*5), step_size=int(fs*1), threshold=2, aggregate=False
)
results['signal_quality'] = np.mean(quality_sqi)
# Activity level (if accelerometer data available)
if data_type == 'accelerometer':
results['activity_level'] = self._calculate_activity_level(signal)
except Exception as e:
print(f"Error in battery-optimized processing: {e}")
results['error'] = str(e)
return results
def _calculate_activity_level(self, accelerometer_data):
"""Calculate activity level from accelerometer data."""
if len(accelerometer_data) < 3:
return 'unknown'
# Calculate magnitude of acceleration
if accelerometer_data.ndim > 1:
magnitude = np.sqrt(np.sum(accelerometer_data**2, axis=1))
else:
magnitude = np.abs(accelerometer_data)
# Calculate activity level
mean_magnitude = np.mean(magnitude)
if mean_magnitude < 1.0:
return 'sedentary'
elif mean_magnitude < 2.0:
return 'light'
elif mean_magnitude < 3.0:
return 'moderate'
else:
return 'vigorous'
def _sync_to_cloud(self, data):
"""Sync processed data to cloud endpoint."""
try:
if self.cloud_endpoint:
response = requests.post(
self.cloud_endpoint,
json=data,
timeout=10
)
if response.status_code == 200:
print("Data synced to cloud successfully")
else:
print(f"Cloud sync failed: {response.status_code}")
except Exception as e:
print(f"Cloud sync error: {e}")
def get_health_summary(self, hours=24):
"""Get health summary for specified time period."""
cutoff_time = datetime.now() - timedelta(hours=hours)
# Filter data for specified time period
recent_data = [
data for data in self.processed_data
if datetime.fromisoformat(data['timestamp']) > cutoff_time
]
if not recent_data:
return {'message': 'No data available for specified period'}
# Calculate summary statistics
summary = {
'period_hours': hours,
'data_points': len(recent_data),
'heart_rate_stats': self._calculate_heart_rate_stats(recent_data),
'activity_summary': self._calculate_activity_summary(recent_data),
'signal_quality_stats': self._calculate_signal_quality_stats(recent_data)
}
return summary
def _calculate_heart_rate_stats(self, data):
"""Calculate heart rate statistics."""
heart_rates = [d.get('heart_rate') for d in data if d.get('heart_rate')]
if not heart_rates:
return None
return {
'mean': np.mean(heart_rates),
'std': np.std(heart_rates),
'min': np.min(heart_rates),
'max': np.max(heart_rates),
'count': len(heart_rates)
}
def _calculate_activity_summary(self, data):
"""Calculate activity summary."""
activities = [d.get('activity_level') for d in data if d.get('activity_level')]
if not activities:
return None
activity_counts = {}
for activity in activities:
activity_counts[activity] = activity_counts.get(activity, 0) + 1
return activity_counts
def _calculate_signal_quality_stats(self, data):
"""Calculate signal quality statistics."""
qualities = [d.get('signal_quality') for d in data if d.get('signal_quality')]
if not qualities:
return None
return {
'mean': np.mean(qualities),
'std': np.std(qualities),
'min': np.min(qualities),
'max': np.max(qualities)
}
Usage Example:
# Initialize wearable integration
wearable = WearableDeviceIntegration(
device_type="fitness_tracker",
cloud_endpoint="https://api.example.com/vital-signs"
)
# Simulate wearable data
sample_data = {
'ppg_data': np.random.randn(1000) + np.sin(2 * np.pi * 1.2 * np.arange(1000) / 100),
'timestamp': datetime.now().isoformat(),
'device_id': 'tracker_001'
}
# Process wearable data
results = wearable.process_wearable_data(sample_data, data_type="ppg")
print(f"Heart rate: {results.get('heart_rate', 'N/A')} BPM")
print(f"Signal quality: {results.get('signal_quality', 'N/A')}")
# Get health summary
summary = wearable.get_health_summary(hours=24)
print(f"Health summary: {summary}")
Best Practices for Examples
Code Organization * Use clear, descriptive function and variable names * Include comprehensive docstrings * Implement proper error handling * Follow PEP 8 style guidelines
Performance Considerations * Optimize for your specific use case * Consider memory usage for large datasets * Use appropriate sampling rates * Implement efficient data structures
Clinical Applications * Validate results against clinical standards * Consider patient safety and data privacy * Document methodology for reproducibility * Include appropriate disclaimers
Integration Guidelines * Design for your target platform * Consider real-time constraints * Implement proper data validation * Include comprehensive logging
Testing and Validation * Test with various signal types and qualities * Validate against known datasets * Include edge case handling * Document test procedures
Example 5: Advanced Multi-Scale Entropy Analysis
This example demonstrates advanced nonlinear analysis techniques for assessing physiological signal complexity.
Use Case: Quantifying cardiac health and autonomic nervous system function through multi-scale entropy analysis.
Key Features: * Multi-scale entropy (MSE) computation * Composite multi-scale entropy (CMSE) * Refined composite multi-scale entropy (RCMSE) * Clinical interpretation of entropy metrics * Complexity-loss aging assessment
Implementation:
import numpy as np
from vitalDSP.physiological_features.advanced_entropy import MultiScaleEntropy
from vitalDSP.utils.data_processing.synthesize_data import generate_ecg_signal
from vitalDSP.physiological_features.waveform import WaveformMorphology
from vitalDSP.filtering.signal_filtering import SignalFiltering
def analyze_cardiac_complexity(ecg_signal, fs, patient_id=None):
"""
Comprehensive cardiac complexity analysis using multi-scale entropy.
Parameters:
-----------
ecg_signal : array-like
ECG signal data
fs : float
Sampling frequency (Hz)
patient_id : str, optional
Patient identifier
Returns:
--------
dict : Comprehensive complexity analysis results
"""
results = {
'patient_id': patient_id,
'sampling_frequency': fs,
'analysis': 'Multi-Scale Entropy'
}
# 1. Preprocess ECG signal
sf = SignalFiltering(ecg_signal)
filtered_ecg = sf.bandpass(lowcut=0.5, highcut=40.0, fs=fs, order=4)
# 2. Extract RR intervals
wm = WaveformMorphology(filtered_ecg, fs=fs, signal_type="ECG")
r_peaks = wm.r_peaks
if len(r_peaks) > 50:
# Calculate RR intervals in milliseconds
rr_intervals = np.diff(r_peaks) / fs * 1000
# Remove outliers
valid_rr = rr_intervals[(rr_intervals > 300) & (rr_intervals < 2000)]
if len(valid_rr) > 50:
# 3. Multi-Scale Entropy Analysis
mse_analyzer = MultiScaleEntropy(
signal=valid_rr,
m=2, # Embedding dimension
r_multiplier=0.15, # Tolerance multiplier
max_scale=20 # Maximum scale
)
# Standard MSE
mse_values, complexity_index = mse_analyzer.compute_mse()
results['mse_values'] = mse_values.tolist()
results['complexity_index'] = float(complexity_index)
# Composite MSE (more robust)
cmse_values, cmse_ci = mse_analyzer.compute_cmse()
results['cmse_values'] = cmse_values.tolist()
results['cmse_complexity_index'] = float(cmse_ci)
# Refined Composite MSE (best for short signals)
rcmse_values, rcmse_ci = mse_analyzer.compute_rcmse()
results['rcmse_values'] = rcmse_values.tolist()
results['rcmse_complexity_index'] = float(rcmse_ci)
# 4. Clinical Interpretation
interpretation = interpret_complexity_metrics(
complexity_index,
cmse_ci,
rcmse_ci,
len(valid_rr)
)
results['clinical_interpretation'] = interpretation
return results
def interpret_complexity_metrics(mse_ci, cmse_ci, rcmse_ci, signal_length):
"""Provide clinical interpretation of complexity metrics."""
interpretation = {
'complexity_level': 'normal',
'autonomic_status': 'balanced',
'cardiac_health': 'healthy',
'recommendations': []
}
# Complexity Index thresholds (clinically validated)
if cmse_ci < 30:
interpretation['complexity_level'] = 'reduced'
interpretation['autonomic_status'] = 'impaired'
interpretation['cardiac_health'] = 'at_risk'
interpretation['recommendations'].append(
'Reduced complexity suggests autonomic dysfunction or aging'
)
interpretation['recommendations'].append(
'Consider comprehensive cardiac evaluation'
)
elif cmse_ci > 60:
interpretation['complexity_level'] = 'high'
interpretation['autonomic_status'] = 'healthy'
interpretation['cardiac_health'] = 'excellent'
interpretation['recommendations'].append(
'High complexity indicates good cardiovascular health'
)
else:
interpretation['complexity_level'] = 'normal'
interpretation['autonomic_status'] = 'balanced'
interpretation['recommendations'].append(
'Complexity within normal range for age'
)
# Signal quality check
if signal_length < 100:
interpretation['recommendations'].append(
'Signal length is short - consider longer recording for robust assessment'
)
return interpretation
Usage Example:
# Generate or load ECG data
fs = 256
ecg_data = generate_ecg_signal(sfecg=fs, duration=300, hrmean=72, Anoise=0.05)
# Perform complexity analysis
results = analyze_cardiac_complexity(ecg_data, fs, patient_id="P001")
# Display results
print(f"Patient ID: {results['patient_id']}")
print(f"Complexity Index (CMSE): {results['cmse_complexity_index']:.2f}")
print(f"Cardiac Health: {results['clinical_interpretation']['cardiac_health']}")
print(f"Autonomic Status: {results['clinical_interpretation']['autonomic_status']}")
# Plot MSE curves
import matplotlib.pyplot as plt
scales = range(1, len(results['mse_values']) + 1)
plt.figure(figsize=(12, 6))
plt.plot(scales, results['mse_values'], 'b-', label='MSE', linewidth=2)
plt.plot(scales, results['cmse_values'], 'r-', label='CMSE', linewidth=2)
plt.plot(scales, results['rcmse_values'], 'g-', label='RCMSE', linewidth=2)
plt.xlabel('Scale Factor')
plt.ylabel('Sample Entropy')
plt.title('Multi-Scale Entropy Analysis')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
Example 6: Comprehensive Health Monitoring System
This example demonstrates a complete health monitoring pipeline with automated report generation.
Use Case: Continuous health monitoring with automated assessment and alerting for clinical decision support.
Key Features: * Multi-modal signal processing (ECG, PPG, Respiratory) * Comprehensive feature extraction * Automated health report generation * Clinical decision support * Trend analysis and alerts
Implementation:
from vitalDSP.health_analysis.health_report_generator import HealthReportGenerator
from vitalDSP.filtering.signal_filtering import SignalFiltering
from vitalDSP.physiological_features.hrv_analysis import HRVFeatures
from vitalDSP.physiological_features.waveform import WaveformMorphology
from vitalDSP.respiratory_analysis.respiratory_analysis import RespiratoryAnalysis
import pandas as pd
import numpy as np
class ComprehensiveHealthMonitor:
"""Complete health monitoring system with automated reporting."""
def __init__(self, patient_id, fs=256):
"""
Initialize health monitoring system.
Parameters:
-----------
patient_id : str
Patient identifier
fs : float
Sampling frequency (Hz)
"""
self.patient_id = patient_id
self.fs = fs
self.monitoring_history = []
def process_vital_signs(self, ecg_signal, ppg_signal=None, resp_signal=None):
"""
Process multiple vital signs and generate comprehensive assessment.
Parameters:
-----------
ecg_signal : array-like
ECG signal data
ppg_signal : array-like, optional
PPG signal data
resp_signal : array-like, optional
Respiratory signal data
Returns:
--------
dict : Comprehensive health assessment
"""
assessment = {
'patient_id': self.patient_id,
'timestamp': pd.Timestamp.now(),
'vital_signs': {}
}
# Process ECG signal
if ecg_signal is not None:
ecg_results = self._process_ecg(ecg_signal)
assessment['vital_signs']['ecg'] = ecg_results
# Process PPG signal
if ppg_signal is not None:
ppg_results = self._process_ppg(ppg_signal)
assessment['vital_signs']['ppg'] = ppg_results
# Process Respiratory signal
if resp_signal is not None:
resp_results = self._process_respiratory(resp_signal)
assessment['vital_signs']['respiratory'] = resp_results
# Generate health score
health_score = self._calculate_health_score(assessment['vital_signs'])
assessment['health_score'] = health_score
# Clinical recommendations
assessment['recommendations'] = self._generate_recommendations(
assessment['vital_signs'],
health_score
)
# Store in history
self.monitoring_history.append(assessment)
return assessment
def _process_ecg(self, ecg_signal):
"""Process ECG signal and extract features."""
results = {}
# Filter ECG
sf = SignalFiltering(ecg_signal)
filtered_ecg = sf.bandpass(lowcut=0.5, highcut=40.0, fs=self.fs, order=4)
# Detect R-peaks
wm = WaveformMorphology(filtered_ecg, fs=self.fs, signal_type="ECG")
r_peaks = wm.r_peaks
if len(r_peaks) > 10:
# RR intervals
rr_intervals = np.diff(r_peaks) / self.fs * 1000
valid_rr = rr_intervals[(rr_intervals > 300) & (rr_intervals < 2000)]
if len(valid_rr) > 5:
# Heart rate
heart_rate = 60000 / np.mean(valid_rr)
results['heart_rate'] = float(heart_rate)
results['heart_rate_status'] = self._assess_heart_rate(heart_rate)
# HRV analysis
hrv = HRVFeatures(valid_rr)
hrv_features = hrv.compute_all_features()
results['hrv'] = hrv_features
results['hrv_status'] = self._assess_hrv(hrv_features)
return results
def _process_ppg(self, ppg_signal):
"""Process PPG signal and extract features."""
results = {}
# Filter PPG
sf = SignalFiltering(ppg_signal)
filtered_ppg = sf.bandpass(lowcut=0.5, highcut=8.0, fs=self.fs, order=4)
# Detect peaks
wm = WaveformMorphology(filtered_ppg, fs=self.fs, signal_type="PPG")
systolic_peaks = wm.systolic_peaks
if len(systolic_peaks) > 5:
# Pulse rate
pulse_intervals = np.diff(systolic_peaks) / self.fs * 1000
valid_intervals = pulse_intervals[(pulse_intervals > 400) & (pulse_intervals < 2000)]
if len(valid_intervals) > 3:
pulse_rate = 60000 / np.mean(valid_intervals)
results['pulse_rate'] = float(pulse_rate)
results['pulse_rate_status'] = self._assess_heart_rate(pulse_rate)
# Pulse rate variability
prv = np.std(valid_intervals)
results['pulse_rate_variability'] = float(prv)
return results
def _process_respiratory(self, resp_signal):
"""Process respiratory signal."""
results = {}
try:
# Respiratory analysis
resp_analyzer = RespiratoryAnalysis(resp_signal, self.fs)
# Preprocess
filtered_resp = resp_analyzer.preprocess_signal(
detrend=True,
normalize=True,
filter_type='bandpass',
low_freq=0.1,
high_freq=0.5
)
# Estimate rate
resp_rate = resp_analyzer.compute_respiratory_rate()
results['respiratory_rate'] = float(resp_rate)
results['respiratory_status'] = self._assess_respiratory_rate(resp_rate)
except Exception as e:
results['error'] = str(e)
return results
def _assess_heart_rate(self, hr):
"""Assess heart rate status."""
if hr < 60:
return 'bradycardia'
elif hr > 100:
return 'tachycardia'
else:
return 'normal'
def _assess_hrv(self, hrv_features):
"""Assess HRV status."""
if 'sdnn' in hrv_features:
sdnn = hrv_features['sdnn']
if sdnn < 30:
return 'reduced'
elif sdnn > 50:
return 'good'
else:
return 'normal'
return 'unknown'
def _assess_respiratory_rate(self, rr):
"""Assess respiratory rate status."""
if rr < 12:
return 'bradypnea'
elif rr > 20:
return 'tachypnea'
else:
return 'normal'
def _calculate_health_score(self, vital_signs):
"""Calculate overall health score (0-100)."""
score = 100
# ECG assessment
if 'ecg' in vital_signs:
ecg = vital_signs['ecg']
if ecg.get('heart_rate_status') != 'normal':
score -= 15
if ecg.get('hrv_status') == 'reduced':
score -= 20
elif ecg.get('hrv_status') == 'good':
score += 5
# PPG assessment
if 'ppg' in vital_signs:
ppg = vital_signs['ppg']
if ppg.get('pulse_rate_status') != 'normal':
score -= 10
# Respiratory assessment
if 'respiratory' in vital_signs:
resp = vital_signs['respiratory']
if resp.get('respiratory_status') != 'normal':
score -= 15
return max(0, min(100, score))
def _generate_recommendations(self, vital_signs, health_score):
"""Generate clinical recommendations."""
recommendations = []
if health_score < 60:
recommendations.append('⚠️ URGENT: Immediate clinical evaluation recommended')
elif health_score < 75:
recommendations.append('⚠️ Clinical review recommended within 24 hours')
# Specific recommendations
if 'ecg' in vital_signs:
ecg = vital_signs['ecg']
if ecg.get('heart_rate_status') == 'bradycardia':
recommendations.append('• Evaluate for bradycardia causes')
elif ecg.get('heart_rate_status') == 'tachycardia':
recommendations.append('• Assess for tachycardia etiology')
if ecg.get('hrv_status') == 'reduced':
recommendations.append('• Reduced HRV - consider autonomic assessment')
if 'respiratory' in vital_signs:
resp = vital_signs['respiratory']
if resp.get('respiratory_status') != 'normal':
recommendations.append('• Abnormal respiratory rate - assess pulmonary function')
if not recommendations:
recommendations.append('✓ All vital signs within normal ranges')
recommendations.append('✓ Continue regular monitoring')
return recommendations
def generate_health_report(self, assessment, output_path='health_report.html'):
"""Generate comprehensive health report."""
# Prepare feature data for report generator
feature_data = {}
if 'ecg' in assessment['vital_signs']:
ecg = assessment['vital_signs']['ecg']
if 'hrv' in ecg:
for key, value in ecg['hrv'].items():
if isinstance(value, (int, float, np.number)):
feature_data[key] = [float(value)]
if feature_data:
# Generate report
report_gen = HealthReportGenerator(
feature_data=feature_data,
sampling_rate=self.fs
)
report_path = report_gen.generate_report(
patient_id=self.patient_id,
output_path=output_path
)
return report_path
return None
Usage Example:
# Initialize monitoring system
monitor = ComprehensiveHealthMonitor(patient_id="P001", fs=256)
# Generate or load vital signs
ecg_signal = generate_ecg_signal(sfecg=256, duration=60, hrmean=75, Anoise=0.05)
# Process vital signs
assessment = monitor.process_vital_signs(ecg_signal=ecg_signal)
# Display results
print(f"Patient ID: {assessment['patient_id']}")
print(f"Health Score: {assessment['health_score']}/100")
print(f"Timestamp: {assessment['timestamp']}")
print("\nVital Signs:")
if 'ecg' in assessment['vital_signs']:
ecg = assessment['vital_signs']['ecg']
print(f" Heart Rate: {ecg.get('heart_rate', 'N/A'):.1f} BPM ({ecg.get('heart_rate_status', 'unknown')})")
print(f" HRV Status: {ecg.get('hrv_status', 'unknown')}")
print("\nRecommendations:")
for rec in assessment['recommendations']:
print(f" {rec}")
# Generate HTML report
report_path = monitor.generate_health_report(assessment)
if report_path:
print(f"\n📄 Health report generated: {report_path}")
Example 7: Cross-Signal Synchronization Analysis
This example demonstrates advanced analysis of coupling between multiple physiological signals.
Use Case: Analyzing cardio-respiratory coupling and autonomic nervous system coordination.
Key Features: * Transfer entropy analysis * Phase synchronization * Cross-correlation analysis * Directional coupling assessment * Clinical interpretation of coupling metrics
Implementation:
from vitalDSP.physiological_features.transfer_entropy import TransferEntropy
from vitalDSP.filtering.signal_filtering import SignalFiltering
from vitalDSP.physiological_features.waveform import WaveformMorphology
import numpy as np
from scipy import signal as sp_signal
def analyze_cardiorespiratory_coupling(ecg_signal, resp_signal, fs):
"""
Analyze coupling between cardiac and respiratory signals.
Parameters:
-----------
ecg_signal : array-like
ECG signal data
resp_signal : array-like
Respiratory signal data
fs : float
Sampling frequency (Hz)
Returns:
--------
dict : Coupling analysis results
"""
results = {
'sampling_frequency': fs,
'analysis_type': 'cardio-respiratory_coupling'
}
# 1. Preprocess signals
sf_ecg = SignalFiltering(ecg_signal)
filtered_ecg = sf_ecg.bandpass(lowcut=0.5, highcut=40.0, fs=fs, order=4)
sf_resp = SignalFiltering(resp_signal)
filtered_resp = sf_resp.bandpass(lowcut=0.1, highcut=0.5, fs=fs, order=4)
# 2. Extract RR intervals
wm = WaveformMorphology(filtered_ecg, fs=fs, signal_type="ECG")
r_peaks = wm.r_peaks
if len(r_peaks) > 50:
rr_intervals = np.diff(r_peaks) / fs * 1000
valid_rr = rr_intervals[(rr_intervals > 300) & (rr_intervals < 2000)]
if len(valid_rr) > 50 and len(filtered_resp) > 100:
# Resample respiratory signal to match RR intervals
resp_resampled = sp_signal.resample(
filtered_resp,
len(valid_rr)
)
# 3. Transfer Entropy Analysis
te_analyzer = TransferEntropy(
source=resp_resampled[:len(valid_rr)],
target=valid_rr,
k=1, # History length
delay=1 # Time delay
)
# Respiratory -> Cardiac influence
te_resp_to_cardiac = te_analyzer.compute_transfer_entropy()
results['te_resp_to_cardiac'] = float(te_resp_to_cardiac)
# Cardiac -> Respiratory influence
te_analyzer_reverse = TransferEntropy(
source=valid_rr,
target=resp_resampled[:len(valid_rr)],
k=1,
delay=1
)
te_cardiac_to_resp = te_analyzer_reverse.compute_transfer_entropy()
results['te_cardiac_to_resp'] = float(te_cardiac_to_resp)
# Net directionality
net_directionality = te_resp_to_cardiac - te_cardiac_to_resp
results['net_directionality'] = float(net_directionality)
# 4. Cross-correlation analysis
cross_corr = np.correlate(
valid_rr - np.mean(valid_rr),
resp_resampled[:len(valid_rr)] - np.mean(resp_resampled[:len(valid_rr)]),
mode='full'
)
cross_corr = cross_corr / (len(valid_rr) * np.std(valid_rr) * np.std(resp_resampled[:len(valid_rr)]))
max_corr_idx = np.argmax(np.abs(cross_corr))
max_correlation = cross_corr[max_corr_idx]
lag = max_corr_idx - len(valid_rr) + 1
results['max_correlation'] = float(max_correlation)
results['optimal_lag'] = int(lag)
# 5. Clinical interpretation
interpretation = interpret_coupling_metrics(
te_resp_to_cardiac,
te_cardiac_to_resp,
net_directionality,
max_correlation
)
results['clinical_interpretation'] = interpretation
return results
def interpret_coupling_metrics(te_r_to_c, te_c_to_r, net_dir, max_corr):
"""Provide clinical interpretation of coupling metrics."""
interpretation = {
'coupling_strength': 'normal',
'dominant_direction': 'balanced',
'autonomic_status': 'healthy',
'recommendations': []
}
# Assess coupling strength
avg_te = (te_r_to_c + te_c_to_r) / 2
if avg_te < 0.05:
interpretation['coupling_strength'] = 'weak'
interpretation['autonomic_status'] = 'impaired'
interpretation['recommendations'].append(
'Weak cardio-respiratory coupling suggests autonomic dysfunction'
)
elif avg_te > 0.2:
interpretation['coupling_strength'] = 'strong'
interpretation['autonomic_status'] = 'healthy'
interpretation['recommendations'].append(
'Strong coupling indicates good autonomic function'
)
# Assess directionality
if abs(net_dir) > 0.1:
if net_dir > 0:
interpretation['dominant_direction'] = 'respiratory_to_cardiac'
interpretation['recommendations'].append(
'Respiratory drive dominant - typical of healthy state'
)
else:
interpretation['dominant_direction'] = 'cardiac_to_respiratory'
interpretation['recommendations'].append(
'Cardiac drive dominant - may indicate stress or pathology'
)
else:
interpretation['dominant_direction'] = 'balanced'
interpretation['recommendations'].append(
'Balanced bidirectional coupling'
)
# Assess correlation
if abs(max_corr) > 0.5:
interpretation['recommendations'].append(
f'High correlation ({max_corr:.2f}) indicates strong synchronization'
)
elif abs(max_corr) < 0.2:
interpretation['recommendations'].append(
f'Low correlation ({max_corr:.2f}) suggests reduced synchronization'
)
return interpretation
Usage Example:
from vitalDSP.utils.data_processing.synthesize_data import generate_ecg_signal, generate_resp_signal
# Generate signals
fs = 256
duration = 300
ecg_signal = generate_ecg_signal(sfecg=fs, duration=duration, hrmean=72, Anoise=0.05)
resp_signal = generate_resp_signal(sampling_rate=fs, duration=duration, frequency=0.25, amplitude=0.5)
# Analyze coupling
results = analyze_cardiorespiratory_coupling(ecg_signal, resp_signal, fs)
# Display results
print("Cardio-Respiratory Coupling Analysis")
print("=" * 50)
print(f"Transfer Entropy (Resp → Cardiac): {results['te_resp_to_cardiac']:.4f}")
print(f"Transfer Entropy (Cardiac → Resp): {results['te_cardiac_to_resp']:.4f}")
print(f"Net Directionality: {results['net_directionality']:.4f}")
print(f"Maximum Correlation: {results['max_correlation']:.3f}")
print(f"Optimal Lag: {results['optimal_lag']} samples")
print("\nClinical Interpretation:")
interp = results['clinical_interpretation']
print(f"Coupling Strength: {interp['coupling_strength']}")
print(f"Dominant Direction: {interp['dominant_direction']}")
print(f"Autonomic Status: {interp['autonomic_status']}")
print("\nRecommendations:")
for rec in interp['recommendations']:
print(f" • {rec}")
Example 8: Comprehensive Physiological Feature Extraction
This example demonstrates comprehensive feature extraction across all domains: time, frequency, nonlinear, and energy analysis.
Use Case: Complete physiological characterization for research and clinical assessment.
Key Features: * Time-domain feature extraction * Frequency-domain analysis (PSD, spectral bands) * Nonlinear dynamics (Poincaré, entropy, fractal dimension) * Beat-to-beat variability analysis * Energy distribution analysis * Automated feature interpretation
Implementation:
import numpy as np
from vitalDSP.physiological_features.time_domain import TimeDomainFeatures
from vitalDSP.physiological_features.frequency_domain import FrequencyDomainFeatures
from vitalDSP.physiological_features.nonlinear import NonlinearFeatures
from vitalDSP.physiological_features.beat_to_beat import BeatToBeatAnalysis
from vitalDSP.physiological_features.energy_analysis import EnergyAnalysis
from vitalDSP.physiological_features.waveform import WaveformMorphology
from vitalDSP.filtering.signal_filtering import SignalFiltering
from vitalDSP.utils.data_processing.synthesize_data import generate_ecg_signal
import pandas as pd
def extract_comprehensive_features(ecg_signal, fs, patient_id=None):
"""
Extract comprehensive physiological features from ECG signal.
Parameters:
-----------
ecg_signal : array-like
ECG signal data
fs : float
Sampling frequency (Hz)
patient_id : str, optional
Patient identifier
Returns:
--------
dict : Complete feature set with clinical interpretation
"""
results = {
'patient_id': patient_id,
'sampling_frequency': fs,
'features': {},
'interpretation': {}
}
# 1. Preprocess signal
sf = SignalFiltering(ecg_signal)
filtered_ecg = sf.bandpass(lowcut=0.5, highcut=40.0, fs=fs, order=4)
# 2. Extract R-peaks and RR intervals
wm = WaveformMorphology(filtered_ecg, fs=fs, signal_type="ECG")
r_peaks = wm.r_peaks
if len(r_peaks) < 10:
results['error'] = 'Insufficient R-peaks detected'
return results
# Calculate RR intervals in milliseconds
rr_intervals = np.diff(r_peaks) / fs * 1000
valid_rr = rr_intervals[(rr_intervals > 300) & (rr_intervals < 2000)]
if len(valid_rr) < 5:
results['error'] = 'Insufficient valid RR intervals'
return results
# 3. TIME DOMAIN FEATURES
print("Extracting time-domain features...")
td_features = TimeDomainFeatures(valid_rr)
# Basic statistics
results['features']['mean_rr'] = td_features.mean()
results['features']['std_rr'] = td_features.std()
results['features']['min_rr'] = td_features.min()
results['features']['max_rr'] = td_features.max()
results['features']['range_rr'] = td_features.max() - td_features.min()
# HRV time-domain metrics
results['features']['sdnn'] = td_features.sdnn()
results['features']['rmssd'] = td_features.rmssd()
results['features']['sdsd'] = td_features.sdsd()
results['features']['nn50'] = td_features.nn50()
results['features']['pnn50'] = td_features.pnn50()
results['features']['nn20'] = td_features.nn20()
results['features']['pnn20'] = td_features.pnn20()
# Heart rate
results['features']['mean_hr'] = 60000 / results['features']['mean_rr']
# 4. FREQUENCY DOMAIN FEATURES
print("Extracting frequency-domain features...")
fd_features = FrequencyDomainFeatures(valid_rr, fs=4.0) # Typical RR sampling ~4Hz
# Compute PSD
psd_results = fd_features.compute_psd(method='welch')
results['features']['vlf_power'] = psd_results['vlf_power']
results['features']['lf_power'] = psd_results['lf_power']
results['features']['hf_power'] = psd_results['hf_power']
results['features']['total_power'] = psd_results['total_power']
results['features']['lf_hf_ratio'] = psd_results['lf_hf_ratio']
# Normalized powers
if results['features']['total_power'] > 0:
results['features']['lf_nu'] = (results['features']['lf_power'] /
(results['features']['lf_power'] + results['features']['hf_power']) * 100)
results['features']['hf_nu'] = (results['features']['hf_power'] /
(results['features']['lf_power'] + results['features']['hf_power']) * 100)
# Peak frequencies
results['features']['lf_peak'] = psd_results.get('lf_peak_frequency', 0)
results['features']['hf_peak'] = psd_results.get('hf_peak_frequency', 0)
# 5. NONLINEAR FEATURES
print("Extracting nonlinear features...")
nl_features = NonlinearFeatures(valid_rr)
# Poincaré analysis
poincare = nl_features.compute_poincare_features()
results['features']['sd1'] = poincare['sd1']
results['features']['sd2'] = poincare['sd2']
results['features']['sd_ratio'] = poincare['sd_ratio']
# Entropy measures
results['features']['sample_entropy'] = nl_features.sample_entropy(m=2, r=0.2)
results['features']['approximate_entropy'] = nl_features.approximate_entropy(m=2, r=0.2)
# Detrended Fluctuation Analysis
results['features']['dfa_alpha1'] = nl_features.dfa(scale_min=4, scale_max=16)
results['features']['dfa_alpha2'] = nl_features.dfa(scale_min=16, scale_max=64)
# 6. BEAT-TO-BEAT ANALYSIS
print("Extracting beat-to-beat features...")
bb_analysis = BeatToBeatAnalysis(filtered_ecg, fs=fs, r_peaks=r_peaks)
# Beat-to-beat variability
bb_features = bb_analysis.compute_variability_features()
results['features']['beat_variability'] = bb_features.get('variability_index', 0)
results['features']['successive_difference'] = bb_features.get('successive_difference', 0)
# 7. ENERGY ANALYSIS
print("Extracting energy features...")
energy_analysis = EnergyAnalysis(filtered_ecg, fs=fs)
# Signal energy distribution
energy_features = energy_analysis.compute_energy_features()
results['features']['total_energy'] = energy_features.get('total_energy', 0)
results['features']['energy_mean'] = energy_features.get('mean_energy', 0)
results['features']['energy_std'] = energy_features.get('std_energy', 0)
# Spectral energy
spectral_energy = energy_analysis.compute_spectral_energy()
results['features']['spectral_energy'] = spectral_energy.get('total_spectral_energy', 0)
# 8. CLINICAL INTERPRETATION
results['interpretation'] = interpret_comprehensive_features(results['features'])
return results
def interpret_comprehensive_features(features):
"""Provide comprehensive clinical interpretation."""
interpretation = {
'hrv_status': 'normal',
'autonomic_balance': 'balanced',
'cardiac_health': 'healthy',
'risk_level': 'low',
'recommendations': []
}
# Time-domain assessment
sdnn = features.get('sdnn', 0)
rmssd = features.get('rmssd', 0)
pnn50 = features.get('pnn50', 0)
if sdnn < 50:
interpretation['hrv_status'] = 'reduced'
interpretation['cardiac_health'] = 'at_risk'
interpretation['risk_level'] = 'moderate'
interpretation['recommendations'].append(
'Reduced SDNN (<50ms) suggests decreased HRV and increased cardiac risk'
)
elif sdnn < 100:
interpretation['hrv_status'] = 'fair'
interpretation['recommendations'].append(
'SDNN within fair range (50-100ms) - regular monitoring recommended'
)
else:
interpretation['hrv_status'] = 'good'
interpretation['recommendations'].append(
'Good SDNN (>100ms) indicates healthy HRV'
)
# Frequency-domain assessment
lf_hf_ratio = features.get('lf_hf_ratio', 1.0)
if lf_hf_ratio < 0.5:
interpretation['autonomic_balance'] = 'parasympathetic_dominant'
interpretation['recommendations'].append(
'Low LF/HF ratio (<0.5) suggests parasympathetic dominance'
)
elif lf_hf_ratio > 2.0:
interpretation['autonomic_balance'] = 'sympathetic_dominant'
interpretation['recommendations'].append(
'High LF/HF ratio (>2.0) suggests sympathetic dominance or stress'
)
else:
interpretation['autonomic_balance'] = 'balanced'
interpretation['recommendations'].append(
'LF/HF ratio within normal range indicates balanced autonomic function'
)
# Nonlinear assessment
sd1 = features.get('sd1', 0)
sd2 = features.get('sd2', 0)
sample_entropy = features.get('sample_entropy', 0)
if sd1 < 10:
interpretation['recommendations'].append(
'Low SD1 (<10ms) indicates reduced short-term variability'
)
if sample_entropy < 1.0:
interpretation['recommendations'].append(
'Low sample entropy (<1.0) suggests reduced signal complexity'
)
elif sample_entropy > 2.0:
interpretation['recommendations'].append(
'High sample entropy (>2.0) indicates good signal complexity'
)
# Overall risk assessment
risk_factors = 0
if sdnn < 50:
risk_factors += 2
if rmssd < 20:
risk_factors += 1
if lf_hf_ratio > 2.5:
risk_factors += 1
if sample_entropy < 1.0:
risk_factors += 1
if risk_factors >= 3:
interpretation['risk_level'] = 'high'
interpretation['cardiac_health'] = 'concerning'
interpretation['recommendations'].append(
'⚠️ Multiple risk factors detected - comprehensive cardiac evaluation recommended'
)
elif risk_factors >= 1:
interpretation['risk_level'] = 'moderate'
interpretation['cardiac_health'] = 'fair'
interpretation['recommendations'].append(
'⚠️ Some risk factors present - regular monitoring advised'
)
else:
interpretation['risk_level'] = 'low'
interpretation['cardiac_health'] = 'healthy'
interpretation['recommendations'].append(
'✓ All metrics within healthy ranges'
)
return interpretation
Usage Example:
# Generate or load ECG data
fs = 256
duration = 300 # 5 minutes
ecg_data = generate_ecg_signal(sfecg=fs, duration=duration, hrmean=75, Anoise=0.05)
# Extract comprehensive features
results = extract_comprehensive_features(ecg_data, fs, patient_id="P001")
# Display results
print(f"Patient ID: {results['patient_id']}")
print(f"\n{'='*60}")
print("TIME DOMAIN FEATURES")
print(f"{'='*60}")
print(f"Mean RR: {results['features']['mean_rr']:.2f} ms")
print(f"SDNN: {results['features']['sdnn']:.2f} ms")
print(f"RMSSD: {results['features']['rmssd']:.2f} ms")
print(f"pNN50: {results['features']['pnn50']:.2f} %")
print(f"Heart Rate: {results['features']['mean_hr']:.1f} BPM")
print(f"\n{'='*60}")
print("FREQUENCY DOMAIN FEATURES")
print(f"{'='*60}")
print(f"VLF Power: {results['features']['vlf_power']:.2f} ms²")
print(f"LF Power: {results['features']['lf_power']:.2f} ms²")
print(f"HF Power: {results['features']['hf_power']:.2f} ms²")
print(f"LF/HF Ratio: {results['features']['lf_hf_ratio']:.2f}")
print(f"Total Power: {results['features']['total_power']:.2f} ms²")
print(f"\n{'='*60}")
print("NONLINEAR FEATURES")
print(f"{'='*60}")
print(f"SD1: {results['features']['sd1']:.2f} ms")
print(f"SD2: {results['features']['sd2']:.2f} ms")
print(f"SD1/SD2 Ratio: {results['features']['sd_ratio']:.3f}")
print(f"Sample Entropy: {results['features']['sample_entropy']:.3f}")
print(f"Approximate Entropy: {results['features']['approximate_entropy']:.3f}")
print(f"\n{'='*60}")
print("CLINICAL INTERPRETATION")
print(f"{'='*60}")
interp = results['interpretation']
print(f"HRV Status: {interp['hrv_status']}")
print(f"Autonomic Balance: {interp['autonomic_balance']}")
print(f"Cardiac Health: {interp['cardiac_health']}")
print(f"Risk Level: {interp['risk_level']}")
print(f"\nRecommendations:")
for rec in interp['recommendations']:
print(f" {rec}")
# Export to DataFrame for further analysis
features_df = pd.DataFrame([results['features']])
features_df.to_csv(f"features_{results['patient_id']}.csv", index=False)
print(f"\n✅ Features exported to features_{results['patient_id']}.csv")
Example 9: Machine Learning for Physiological Signal Analysis
This example demonstrates machine learning applications including anomaly detection, signal denoising with autoencoders, and pattern classification.
Use Case: Automated signal quality assessment, artifact detection, and arrhythmia classification using deep learning.
Key Features: * Autoencoder-based signal denoising * Anomaly detection for artifact identification * CNN1D for pattern classification * Feature extraction for ML * Model training and evaluation * Real-time inference pipeline
Implementation:
import numpy as np
from vitalDSP.ml_models.deep_models import (
StandardAutoencoder, ConvolutionalAutoencoder, CNN1D
)
from vitalDSP.advanced_computation.anomaly_detection import AnomalyDetection
from vitalDSP.ml_models.feature_extractor import FeatureExtractor
from vitalDSP.filtering.signal_filtering import SignalFiltering
from vitalDSP.utils.data_processing.synthesize_data import generate_ecg_signal
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
def ml_signal_analysis_pipeline(clean_signals, noisy_signals, fs, anomaly_threshold=2.5):
"""
Complete ML pipeline for physiological signal analysis.
Parameters:
-----------
clean_signals : list of arrays
Clean reference signals for training
noisy_signals : list of arrays
Noisy signals to denoise
fs : float
Sampling frequency
anomaly_threshold : float
Z-score threshold for anomaly detection
Returns:
--------
dict : ML analysis results including denoised signals and detected anomalies
"""
results = {
'denoising': {},
'anomaly_detection': {},
'classification': {},
'model_performance': {}
}
# 1. SIGNAL DENOISING WITH AUTOENCODER
print("="*60)
print("1. AUTOENCODER-BASED SIGNAL DENOISING")
print("="*60)
# Prepare data for autoencoder
signal_length = min([len(s) for s in clean_signals])
X_clean = np.array([s[:signal_length] for s in clean_signals])
X_noisy = np.array([s[:signal_length] for s in noisy_signals])
# Normalize
scaler = StandardScaler()
X_clean_norm = scaler.fit_transform(X_clean)
X_noisy_norm = scaler.transform(X_noisy)
# Reshape for autoencoder (samples, timesteps, features)
X_clean_reshaped = X_clean_norm.reshape(X_clean_norm.shape[0], X_clean_norm.shape[1], 1)
X_noisy_reshaped = X_noisy_norm.reshape(X_noisy_norm.shape[0], X_noisy_norm.shape[1], 1)
# Train Convolutional Autoencoder
print("Training Convolutional Autoencoder...")
autoencoder = ConvolutionalAutoencoder(
input_shape=(signal_length, 1),
latent_dim=32
)
history = autoencoder.fit(
X_noisy_reshaped,
X_clean_reshaped,
epochs=50,
batch_size=32,
validation_split=0.2,
verbose=0
)
# Denoise signals
X_denoised = autoencoder.predict(X_noisy_reshaped)
X_denoised = scaler.inverse_transform(X_denoised.reshape(X_denoised.shape[0], -1))
# Calculate reconstruction error
reconstruction_error = np.mean((X_clean - X_denoised) ** 2, axis=1)
results['denoising']['clean_signals'] = X_clean
results['denoising']['noisy_signals'] = X_noisy
results['denoising']['denoised_signals'] = X_denoised
results['denoising']['reconstruction_error'] = reconstruction_error
results['denoising']['mean_error'] = np.mean(reconstruction_error)
results['denoising']['std_error'] = np.std(reconstruction_error)
print(f"✓ Denoising complete")
print(f" Mean Reconstruction Error: {results['denoising']['mean_error']:.4f}")
print(f" Std Reconstruction Error: {results['denoising']['std_error']:.4f}")
# 2. ANOMALY DETECTION
print(f"\n{'='*60}")
print("2. ANOMALY DETECTION")
print("="*60)
anomaly_results = []
for i, signal in enumerate(noisy_signals):
detector = AnomalyDetection(signal)
# Z-score based detection
anomalies_zscore = detector.detect_anomalies(
method='z_score',
threshold=anomaly_threshold
)
# Statistical detection
anomalies_stat = detector.detect_anomalies(
method='statistical',
window_size=100
)
# Isolation Forest
try:
anomalies_isolation = detector.detect_anomalies(
method='isolation_forest',
contamination=0.1
)
except:
anomalies_isolation = []
anomaly_info = {
'signal_index': i,
'z_score_anomalies': len(anomalies_zscore),
'statistical_anomalies': len(anomalies_stat),
'isolation_forest_anomalies': len(anomalies_isolation),
'z_score_locations': anomalies_zscore,
'signal_quality': 'good' if len(anomalies_zscore) < 10 else 'poor'
}
anomaly_results.append(anomaly_info)
results['anomaly_detection']['per_signal'] = anomaly_results
results['anomaly_detection']['total_signals'] = len(noisy_signals)
results['anomaly_detection']['good_quality'] = sum(1 for a in anomaly_results if a['signal_quality'] == 'good')
results['anomaly_detection']['poor_quality'] = sum(1 for a in anomaly_results if a['signal_quality'] == 'poor')
print(f"✓ Anomaly detection complete")
print(f" Total signals analyzed: {results['anomaly_detection']['total_signals']}")
print(f" Good quality signals: {results['anomaly_detection']['good_quality']}")
print(f" Poor quality signals: {results['anomaly_detection']['poor_quality']}")
# 3. FEATURE EXTRACTION FOR ML
print(f"\n{'='*60}")
print("3. FEATURE EXTRACTION FOR MACHINE LEARNING")
print("="*60)
extractor = FeatureExtractor()
features_list = []
for signal in X_denoised:
# Extract comprehensive features
features = extractor.extract_features(
signal,
fs=fs,
feature_types=['statistical', 'temporal', 'spectral']
)
features_list.append(features)
features_array = np.array(features_list)
results['classification']['features'] = features_array
results['classification']['n_features'] = features_array.shape[1]
print(f"✓ Feature extraction complete")
print(f" Number of features extracted: {features_array.shape[1]}")
print(f" Feature dimensions: {features_array.shape}")
# 4. CNN1D CLASSIFICATION (Example with synthetic labels)
print(f"\n{'='*60}")
print("4. CNN1D PATTERN CLASSIFICATION")
print("="*60)
# Create synthetic labels for demonstration (normal vs. abnormal)
labels = np.array([0 if e < np.median(reconstruction_error) else 1
for e in reconstruction_error])
# Split data
X_train, X_test, y_train, y_test = train_test_split(
X_clean_reshaped, labels, test_size=0.2, random_state=42, stratify=labels
)
# Train CNN1D classifier
print("Training CNN1D classifier...")
cnn = CNN1D(
input_shape=(signal_length, 1),
num_classes=2,
num_filters=[32, 64, 128],
kernel_size=3,
pool_size=2
)
cnn_history = cnn.fit(
X_train, y_train,
epochs=30,
batch_size=32,
validation_split=0.2,
verbose=0
)
# Evaluate
test_loss, test_accuracy = cnn.evaluate(X_test, y_test)
results['classification']['test_accuracy'] = test_accuracy
results['classification']['test_loss'] = test_loss
results['classification']['n_train'] = len(X_train)
results['classification']['n_test'] = len(X_test)
print(f"✓ Classification complete")
print(f" Test Accuracy: {test_accuracy:.2%}")
print(f" Test Loss: {test_loss:.4f}")
print(f" Training samples: {len(X_train)}")
print(f" Test samples: {len(X_test)}")
return results
def evaluate_ml_pipeline(results):
"""Evaluate and display ML pipeline results."""
print(f"\n{'='*60}")
print("ML PIPELINE PERFORMANCE SUMMARY")
print("="*60)
# Denoising performance
print(f"\n📊 Denoising Performance:")
print(f" Mean Reconstruction Error: {results['denoising']['mean_error']:.4f}")
print(f" Std Reconstruction Error: {results['denoising']['std_error']:.4f}")
# Calculate SNR improvement
original_noise = np.mean(np.var(results['denoising']['noisy_signals'] -
results['denoising']['clean_signals'], axis=1))
residual_noise = np.mean(np.var(results['denoising']['denoised_signals'] -
results['denoising']['clean_signals'], axis=1))
snr_improvement = 10 * np.log10(original_noise / (residual_noise + 1e-10))
print(f" SNR Improvement: {snr_improvement:.2f} dB")
# Anomaly detection
print(f"\n🔍 Anomaly Detection:")
print(f" Total signals: {results['anomaly_detection']['total_signals']}")
print(f" Good quality: {results['anomaly_detection']['good_quality']} "
f"({results['anomaly_detection']['good_quality']/results['anomaly_detection']['total_signals']*100:.1f}%)")
print(f" Poor quality: {results['anomaly_detection']['poor_quality']} "
f"({results['anomaly_detection']['poor_quality']/results['anomaly_detection']['total_signals']*100:.1f}%)")
# Classification
print(f"\n🎯 Classification Performance:")
print(f" Test Accuracy: {results['classification']['test_accuracy']:.2%}")
print(f" Number of features: {results['classification']['n_features']}")
# Overall assessment
print(f"\n{'='*60}")
print("OVERALL ASSESSMENT")
print("="*60)
quality_score = (
(snr_improvement / 10) * 30 + # Denoising (30 points)
(results['anomaly_detection']['good_quality'] /
results['anomaly_detection']['total_signals']) * 30 + # Quality (30 points)
results['classification']['test_accuracy'] * 40 # Classification (40 points)
)
print(f"Quality Score: {quality_score:.1f}/100")
if quality_score >= 80:
print("✅ Excellent - ML pipeline performing optimally")
elif quality_score >= 60:
print("⚠️ Good - ML pipeline acceptable, some improvements possible")
else:
print("❌ Fair - ML pipeline needs optimization")
Usage Example:
# Generate training data
n_signals = 100
fs = 256
signal_length = 2560 # 10 seconds
print("Generating training data...")
clean_signals = []
noisy_signals = []
for i in range(n_signals):
# Generate clean ECG
clean = generate_ecg_signal(sfecg=fs, duration=10, hrmean=70+np.random.randn()*5, Anoise=0.0)
clean_signals.append(clean[:signal_length])
# Add noise
noise = np.random.normal(0, 0.1, signal_length)
noisy = clean[:signal_length] + noise
noisy_signals.append(noisy)
# Run ML pipeline
results = ml_signal_analysis_pipeline(clean_signals, noisy_signals, fs=fs)
# Evaluate results
evaluate_ml_pipeline(results)
# Save models for deployment
print("\n💾 Models ready for deployment")
print(" - Autoencoder trained for real-time denoising")
print(" - Anomaly detector calibrated for quality assessment")
print(" - CNN1D classifier ready for pattern recognition")
These examples provide a solid foundation for implementing VitalDSP in various real-world scenarios. Adapt them to your specific needs and requirements.
For more advanced examples and use cases, explore the Jupyter notebooks in the sample_notebooks section.