Examples

This section provides practical examples and real-world use cases for VitalDSP. Each example demonstrates specific functionality and can be used as a starting point for your own projects.

Example Categories

Clinical Research Examples
  • ECG analysis for cardiovascular research

  • PPG analysis for hemodynamic studies

  • Respiratory analysis for sleep studies

Healthcare Monitoring Examples
  • Real-time vital signs monitoring

  • Continuous patient monitoring

  • Telemedicine applications

Wearable Device Examples
  • Fitness tracker integration

  • Smartwatch signal processing

  • Mobile health applications

Medical Device Examples
  • Patient monitoring systems

  • Diagnostic equipment integration

  • Clinical decision support systems

Example 1: ECG Analysis for Clinical Research

This example demonstrates comprehensive ECG analysis for cardiovascular research applications.

Use Case: Analyzing ECG signals from clinical trials to assess cardiovascular health and detect abnormalities.

Key Features: * R-peak detection and RR interval analysis * Heart rate variability (HRV) analysis * Morphological feature extraction * Signal quality assessment * Clinical interpretation

Implementation:

import numpy as np
import pandas as pd
from vitalDSP.filtering.signal_filtering import SignalFiltering
from vitalDSP.physiological_features.waveform import WaveformMorphology
from vitalDSP.physiological_features.hrv_analysis import HRVFeatures
from vitalDSP.signal_quality_assessment.signal_quality_index import SignalQualityIndex

def analyze_ecg_clinical_research(ecg_signal, fs, patient_id=None):
    """
    Comprehensive ECG analysis for clinical research.

    Parameters:
    -----------
    ecg_signal : array-like
        ECG signal data
    fs : float
        Sampling frequency (Hz)
    patient_id : str, optional
        Patient identifier

    Returns:
    --------
    dict : Comprehensive analysis results
    """

    results = {
        'patient_id': patient_id,
        'sampling_frequency': fs,
        'signal_length': len(ecg_signal),
        'analysis_timestamp': pd.Timestamp.now()
    }

# 1. Signal Preprocessing
sf = SignalFiltering(ecg_signal)
filtered_ecg = sf.bandpass(lowcut=0.5, highcut=40.0, fs=fs, order=4)

    # 2. Signal Quality Assessment
    sqi = SignalQualityIndex(filtered_ecg)
    quality_metrics = {}

# Amplitude variability SQI
amp_sqi, _, _ = sqi.amplitude_variability_sqi(
    window_size=int(fs*5), step_size=int(fs*1), threshold=2, aggregate=False
)
quality_metrics['amplitude_variability'] = np.mean(amp_sqi)

# Baseline wander SQI
baseline_sqi, _, _ = sqi.baseline_wander_sqi(
    window_size=int(fs*5), step_size=int(fs*1), threshold=2, aggregate=False
)
quality_metrics['baseline_wander'] = np.mean(baseline_sqi)

# Signal-to-noise ratio
snr_sqi, _, _ = sqi.snr_sqi(
    window_size=int(fs*5), step_size=int(fs*1), threshold=-1, aggregate=False
)
quality_metrics['signal_to_noise_ratio'] = np.mean(snr_sqi)

    results['quality_metrics'] = quality_metrics

    # 3. R-Peak Detection and RR Interval Analysis
    wm = WaveformMorphology(filtered_ecg, fs=fs, signal_type="ECG")
    r_peaks = wm.r_peaks

    if len(r_peaks) > 10:  # Ensure sufficient data
        # Calculate RR intervals
        rr_intervals = np.diff(r_peaks) / fs * 1000  # Convert to ms

        # Remove outliers
        valid_rr = rr_intervals[(rr_intervals > 300) & (rr_intervals < 2000)]

        if len(valid_rr) > 5:
            # Heart rate calculation
            heart_rate = 60 * fs / np.mean(np.diff(r_peaks))

            # HRV Analysis
            hrv = HRVFeatures(valid_rr)
            hrv_features = hrv.compute_all_features()

            results['heart_rate'] = heart_rate
            results['rr_intervals'] = {
                'count': len(valid_rr),
                'mean': np.mean(valid_rr),
                'std': np.std(valid_rr),
                'min': np.min(valid_rr),
                'max': np.max(valid_rr)
            }
            results['hrv_features'] = hrv_features

            # Morphological features
            morphological_features = {
                'r_peak_count': len(r_peaks),
                'qrs_duration': wm.qrs_duration if hasattr(wm, 'qrs_duration') else None,
                'qt_interval': wm.qt_interval if hasattr(wm, 'qt_interval') else None
            }
            results['morphological_features'] = morphological_features

            # Clinical interpretation
            clinical_interpretation = interpret_ecg_results(hrv_features, heart_rate)
            results['clinical_interpretation'] = clinical_interpretation

    return results

def interpret_ecg_results(hrv_features, heart_rate):
    """Provide clinical interpretation of ECG analysis results."""

    interpretation = {
        'heart_rate_status': 'normal' if 60 <= heart_rate <= 100 else 'abnormal',
        'hrv_status': 'normal',
        'risk_assessment': 'low',
        'recommendations': []
    }

    # Heart rate interpretation
    if heart_rate < 60:
        interpretation['heart_rate_status'] = 'bradycardia'
        interpretation['recommendations'].append('Consider bradycardia evaluation')
    elif heart_rate > 100:
        interpretation['heart_rate_status'] = 'tachycardia'
        interpretation['recommendations'].append('Consider tachycardia evaluation')

    # HRV interpretation
    if 'sdnn' in hrv_features:
        sdnn = hrv_features['sdnn']
        if sdnn < 30:
            interpretation['hrv_status'] = 'reduced'
            interpretation['risk_assessment'] = 'elevated'
            interpretation['recommendations'].append('Reduced HRV may indicate stress or cardiovascular risk')
        elif sdnn > 50:
            interpretation['hrv_status'] = 'good'
            interpretation['risk_assessment'] = 'low'

    return interpretation

Usage Example:

# Load ECG data (replace with your data loading method)
ecg_data = np.load('ecg_data.npy')  # Your ECG signal
fs = 1000  # Sampling frequency

# Perform analysis
results = analyze_ecg_clinical_research(ecg_data, fs, patient_id="P001")

# Display results
print(f"Patient ID: {results['patient_id']}")
print(f"Heart Rate: {results['heart_rate']:.1f} BPM")
print(f"HRV Status: {results['clinical_interpretation']['hrv_status']}")
print(f"Risk Assessment: {results['clinical_interpretation']['risk_assessment']}")

Example 2: PPG Analysis for Hemodynamic Studies

This example demonstrates PPG signal analysis for hemodynamic studies and cardiovascular assessment.

Use Case: Analyzing PPG signals to assess blood volume changes, pulse wave characteristics, and cardiovascular health.

Key Features: * Systolic and diastolic peak detection * Pulse wave analysis * Hemodynamic parameter estimation * Signal quality assessment * Clinical interpretation

Implementation:

def analyze_ppg_hemodynamic(ppg_signal, fs, patient_id=None):
    """
    Comprehensive PPG analysis for hemodynamic studies.

    Parameters:
    -----------
    ppg_signal : array-like
        PPG signal data
    fs : float
        Sampling frequency (Hz)
    patient_id : str, optional
        Patient identifier

    Returns:
    --------
    dict : Comprehensive PPG analysis results
    """

    results = {
        'patient_id': patient_id,
        'sampling_frequency': fs,
        'signal_length': len(ppg_signal),
        'analysis_timestamp': pd.Timestamp.now()
    }

# 1. Signal Preprocessing
sf = SignalFiltering(ppg_signal)
filtered_ppg = sf.bandpass(lowcut=0.5, highcut=8.0, fs=fs, order=4)  # PPG-specific range

    # 2. Signal Quality Assessment
    sqi = SignalQualityIndex(filtered_ppg)
    quality_metrics = {}

# Amplitude variability SQI
amp_sqi, _, _ = sqi.amplitude_variability_sqi(
    window_size=int(fs*5), step_size=int(fs*1), threshold=2, aggregate=False
)
quality_metrics['amplitude_variability'] = np.mean(amp_sqi)

# Signal-to-noise ratio
snr_sqi, _, _ = sqi.snr_sqi(
    window_size=int(fs*5), step_size=int(fs*1), threshold=-1, aggregate=False
)
quality_metrics['signal_to_noise_ratio'] = np.mean(snr_sqi)

    results['quality_metrics'] = quality_metrics

    # 3. PPG-Specific Analysis
    wm = WaveformMorphology(filtered_ppg, fs=fs, signal_type="PPG")

    # Detect systolic peaks
    systolic_peaks = wm.systolic_peaks

    if len(systolic_peaks) > 5:
        # Calculate pulse rate
        pulse_rate = 60 * fs / np.mean(np.diff(systolic_peaks))

        # Pulse interval analysis
        pulse_intervals = np.diff(systolic_peaks) / fs * 1000  # Convert to ms
        valid_intervals = pulse_intervals[(pulse_intervals > 400) & (pulse_intervals < 2000)]

        if len(valid_intervals) > 3:
            # Pulse rate variability (similar to HRV)
            prv_features = {
                'mean_pulse_interval': np.mean(valid_intervals),
                'std_pulse_interval': np.std(valid_intervals),
                'pulse_rate_variability': np.std(valid_intervals) / np.mean(valid_intervals) * 100
            }

            # Hemodynamic parameters
            hemodynamic_params = calculate_hemodynamic_parameters(filtered_ppg, systolic_peaks, fs)

            results['pulse_rate'] = pulse_rate
            results['pulse_intervals'] = {
                'count': len(valid_intervals),
                'mean': np.mean(valid_intervals),
                'std': np.std(valid_intervals)
            }
            results['prv_features'] = prv_features
            results['hemodynamic_parameters'] = hemodynamic_params

            # Clinical interpretation
            clinical_interpretation = interpret_ppg_results(prv_features, pulse_rate, hemodynamic_params)
            results['clinical_interpretation'] = clinical_interpretation

    return results

def calculate_hemodynamic_parameters(ppg_signal, systolic_peaks, fs):
    """Calculate hemodynamic parameters from PPG signal."""

    params = {}

    if len(systolic_peaks) > 2:
        # Pulse amplitude
        pulse_amplitudes = []
        for i in range(len(systolic_peaks) - 1):
            start = systolic_peaks[i]
            end = systolic_peaks[i + 1]
            pulse_segment = ppg_signal[start:end]
            amplitude = np.max(pulse_segment) - np.min(pulse_segment)
            pulse_amplitudes.append(amplitude)

        params['mean_pulse_amplitude'] = np.mean(pulse_amplitudes)
        params['pulse_amplitude_variability'] = np.std(pulse_amplitudes) / np.mean(pulse_amplitudes) * 100

        # Pulse wave analysis
        params['pulse_wave_analysis'] = analyze_pulse_wave_morphology(ppg_signal, systolic_peaks, fs)

    return params

def analyze_pulse_wave_morphology(ppg_signal, systolic_peaks, fs):
    """Analyze pulse wave morphology."""

    morphology = {}

    if len(systolic_peaks) > 1:
        # Calculate average pulse wave
        pulse_waves = []
        for i in range(len(systolic_peaks) - 1):
            start = systolic_peaks[i]
            end = systolic_peaks[i + 1]
            pulse_wave = ppg_signal[start:end]
            if len(pulse_wave) > 10:  # Ensure sufficient data
                pulse_waves.append(pulse_wave)

        if pulse_waves:
            # Normalize pulse waves to same length
            min_length = min(len(wave) for wave in pulse_waves)
            normalized_waves = [wave[:min_length] for wave in pulse_waves]

            # Calculate average pulse wave
            average_pulse_wave = np.mean(normalized_waves, axis=0)

            # Analyze morphology
            morphology['pulse_width'] = calculate_pulse_width(average_pulse_wave)
            morphology['pulse_slope'] = calculate_pulse_slope(average_pulse_wave)
            morphology['pulse_area'] = np.trapz(average_pulse_wave)

    return morphology

def interpret_ppg_results(prv_features, pulse_rate, hemodynamic_params):
    """Provide clinical interpretation of PPG analysis results."""

    interpretation = {
        'pulse_rate_status': 'normal' if 60 <= pulse_rate <= 100 else 'abnormal',
        'hemodynamic_status': 'normal',
        'risk_assessment': 'low',
        'recommendations': []
    }

    # Pulse rate interpretation
    if pulse_rate < 60:
        interpretation['pulse_rate_status'] = 'bradycardia'
        interpretation['recommendations'].append('Consider bradycardia evaluation')
    elif pulse_rate > 100:
        interpretation['pulse_rate_status'] = 'tachycardia'
        interpretation['recommendations'].append('Consider tachycardia evaluation')

    # Hemodynamic interpretation
    if 'mean_pulse_amplitude' in hemodynamic_params:
        pulse_amplitude = hemodynamic_params['mean_pulse_amplitude']
        if pulse_amplitude < 0.1:  # Threshold depends on your signal scaling
            interpretation['hemodynamic_status'] = 'reduced'
            interpretation['risk_assessment'] = 'elevated'
            interpretation['recommendations'].append('Reduced pulse amplitude may indicate poor perfusion')

    return interpretation

Example 3: Real-Time Vital Signs Monitoring

This example demonstrates real-time vital signs monitoring using VitalDSP.

Use Case: Continuous monitoring of vital signs in clinical settings or remote patient monitoring applications.

Key Features: * Real-time signal processing * Continuous vital signs calculation * Alert generation for abnormal values * Data logging and storage * Web interface for monitoring

Implementation:

import time
import threading
from collections import deque
from vitalDSP.filtering.signal_filtering import SignalFiltering
from vitalDSP.physiological_features.waveform import WaveformMorphology
from vitalDSP.signal_quality_assessment.signal_quality_index import SignalQualityIndex

class RealTimeVitalSignsMonitor:
    """Real-time vital signs monitoring system."""

    def __init__(self, fs=1000, window_size=10, update_interval=1):
        """
        Initialize real-time monitor.

        Parameters:
        -----------
        fs : float
            Sampling frequency (Hz)
        window_size : int
            Window size in seconds
        update_interval : float
            Update interval in seconds
        """

        self.fs = fs
        self.window_size = window_size
        self.update_interval = update_interval
        self.window_samples = fs * window_size

        # Data buffers
        self.signal_buffer = deque(maxlen=self.window_samples)
        self.vital_signs_history = deque(maxlen=100)  # Keep last 100 measurements

        # Current vital signs
        self.current_vital_signs = {
            'heart_rate': None,
            'pulse_rate': None,
            'signal_quality': None,
            'timestamp': None
        }

        # Alert thresholds
        self.alert_thresholds = {
            'heart_rate_low': 50,
            'heart_rate_high': 120,
            'signal_quality_low': 0.5
        }

        # Monitoring state
        self.is_monitoring = False
        self.monitor_thread = None

    def add_signal_data(self, signal_chunk):
        """Add new signal data to the buffer."""
        self.signal_buffer.extend(signal_chunk)

    def start_monitoring(self):
        """Start real-time monitoring."""
        if not self.is_monitoring:
            self.is_monitoring = True
            self.monitor_thread = threading.Thread(target=self._monitoring_loop)
            self.monitor_thread.daemon = True
            self.monitor_thread.start()
            print("Real-time monitoring started")

    def stop_monitoring(self):
        """Stop real-time monitoring."""
        self.is_monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join()
            print("Real-time monitoring stopped")

    def _monitoring_loop(self):
        """Main monitoring loop."""
        while self.is_monitoring:
            if len(self.signal_buffer) >= self.window_samples:
                # Get current window
                current_signal = np.array(list(self.signal_buffer))

                # Process signal
                vital_signs = self._process_signal_window(current_signal)

                # Update current vital signs
                self.current_vital_signs.update(vital_signs)
                self.current_vital_signs['timestamp'] = time.time()

                # Add to history
                self.vital_signs_history.append(vital_signs.copy())

                # Check for alerts
                self._check_alerts(vital_signs)

                # Log results
                self._log_vital_signs(vital_signs)

            time.sleep(self.update_interval)

    def _process_signal_window(self, signal):
        """Process a window of signal data."""
        vital_signs = {}

    try:
        # Signal preprocessing
        sf = SignalFiltering(signal)
        filtered_signal = sf.bandpass(lowcut=0.5, highcut=40.0, fs=self.fs, order=4)

        # Signal quality assessment
        sqi = SignalQualityIndex(filtered_signal)
        quality_sqi, _, _ = sqi.amplitude_variability_sqi(
            window_size=int(self.fs*5), step_size=int(self.fs*1), threshold=2, aggregate=False
        )
        vital_signs['signal_quality'] = np.mean(quality_sqi)

            # Detect peaks (assuming ECG signal)
            wm = WaveformMorphology(filtered_signal, fs=self.fs, signal_type="ECG")
            r_peaks = wm.r_peaks

            if len(r_peaks) > 2:
                # Calculate heart rate
                rr_intervals = np.diff(r_peaks) / self.fs
                valid_rr = rr_intervals[(rr_intervals > 0.3) & (rr_intervals < 2.0)]

                if len(valid_rr) > 1:
                    heart_rate = 60 / np.mean(valid_rr)
                    vital_signs['heart_rate'] = heart_rate
                    vital_signs['pulse_rate'] = heart_rate  # Assuming same for this example

        except Exception as e:
            print(f"Error processing signal: {e}")
            vital_signs['error'] = str(e)

        return vital_signs

    def _check_alerts(self, vital_signs):
        """Check for alert conditions."""
        alerts = []

        if 'heart_rate' in vital_signs:
            hr = vital_signs['heart_rate']
            if hr < self.alert_thresholds['heart_rate_low']:
                alerts.append(f"Low heart rate: {hr:.1f} BPM")
            elif hr > self.alert_thresholds['heart_rate_high']:
                alerts.append(f"High heart rate: {hr:.1f} BPM")

        if 'signal_quality' in vital_signs:
            sq = vital_signs['signal_quality']
            if sq < self.alert_thresholds['signal_quality_low']:
                alerts.append(f"Poor signal quality: {sq:.2f}")

        if alerts:
            self._handle_alerts(alerts)

    def _handle_alerts(self, alerts):
        """Handle alert conditions."""
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        for alert in alerts:
            print(f"[ALERT {timestamp}] {alert}")
            # Here you could send notifications, log to database, etc.

    def _log_vital_signs(self, vital_signs):
        """Log vital signs data."""
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        log_entry = f"[{timestamp}] "

        if 'heart_rate' in vital_signs:
            log_entry += f"HR: {vital_signs['heart_rate']:.1f} BPM "

        if 'signal_quality' in vital_signs:
            log_entry += f"SQ: {vital_signs['signal_quality']:.2f}"

        print(log_entry)

    def get_current_vital_signs(self):
        """Get current vital signs."""
        return self.current_vital_signs.copy()

    def get_vital_signs_history(self):
        """Get vital signs history."""
        return list(self.vital_signs_history)

    def get_statistics(self):
        """Get statistics from vital signs history."""
        if not self.vital_signs_history:
            return {}

        stats = {}

        # Heart rate statistics
        heart_rates = [vs.get('heart_rate') for vs in self.vital_signs_history if vs.get('heart_rate')]
        if heart_rates:
            stats['heart_rate'] = {
                'mean': np.mean(heart_rates),
                'std': np.std(heart_rates),
                'min': np.min(heart_rates),
                'max': np.max(heart_rates)
            }

        # Signal quality statistics
        signal_qualities = [vs.get('signal_quality') for vs in self.vital_signs_history if vs.get('signal_quality')]
        if signal_qualities:
            stats['signal_quality'] = {
                'mean': np.mean(signal_qualities),
                'std': np.std(signal_qualities),
                'min': np.min(signal_qualities),
                'max': np.max(signal_qualities)
            }

        return stats

Usage Example:

# Initialize real-time monitor
monitor = RealTimeVitalSignsMonitor(fs=1000, window_size=10, update_interval=1)

# Start monitoring
monitor.start_monitoring()

# Simulate real-time data (replace with your data source)
for i in range(100):
    # Generate sample data (replace with your data acquisition)
    sample_data = np.random.randn(100) + np.sin(2 * np.pi * 1.2 * np.arange(100) / 1000)
    monitor.add_signal_data(sample_data)
    time.sleep(0.1)

# Get current vital signs
current_vitals = monitor.get_current_vital_signs()
print(f"Current heart rate: {current_vitals.get('heart_rate', 'N/A')} BPM")

# Get statistics
stats = monitor.get_statistics()
print(f"Average heart rate: {stats.get('heart_rate', {}).get('mean', 'N/A')} BPM")

# Stop monitoring
monitor.stop_monitoring()

Example 4: Wearable Device Integration

This example demonstrates how to integrate VitalDSP with wearable devices for health monitoring.

Use Case: Processing data from fitness trackers, smartwatches, or other wearable health devices.

Key Features: * Data format conversion and preprocessing * Real-time processing for wearable constraints * Battery-efficient algorithms * Mobile-friendly analysis * Cloud integration

Implementation:

import json
import requests
from datetime import datetime, timedelta

class WearableDeviceIntegration:
    """Integration with wearable devices for health monitoring."""

    def __init__(self, device_type="fitness_tracker", cloud_endpoint=None):
        """
        Initialize wearable device integration.

        Parameters:
        -----------
        device_type : str
            Type of wearable device
        cloud_endpoint : str, optional
            Cloud endpoint for data synchronization
        """

        self.device_type = device_type
        self.cloud_endpoint = cloud_endpoint
        self.device_config = self._get_device_config(device_type)

        # Data processing parameters optimized for wearables
        self.processing_params = {
            'fs': self.device_config['sampling_rate'],
            'window_size': 30,  # 30-second windows
            'update_interval': 5,  # 5-second updates
            'battery_optimized': True
        }

        # Local data storage
        self.local_data = []
        self.processed_data = []

    def _get_device_config(self, device_type):
        """Get device-specific configuration."""

        configs = {
            'fitness_tracker': {
                'sampling_rate': 100,
                'signal_types': ['ppg', 'accelerometer'],
                'battery_life': '7_days',
                'data_format': 'json'
            },
            'smartwatch': {
                'sampling_rate': 200,
                'signal_types': ['ppg', 'ecg', 'accelerometer'],
                'battery_life': '2_days',
                'data_format': 'json'
            },
            'chest_strap': {
                'sampling_rate': 1000,
                'signal_types': ['ecg', 'accelerometer'],
                'battery_life': '24_hours',
                'data_format': 'csv'
            }
        }

        return configs.get(device_type, configs['fitness_tracker'])

    def process_wearable_data(self, raw_data, data_type="ppg"):
        """
        Process data from wearable device.

        Parameters:
        -----------
        raw_data : dict or array
            Raw data from wearable device
        data_type : str
            Type of signal data

        Returns:
        --------
        dict : Processed analysis results
        """

        # Convert raw data to standard format
        processed_signal = self._convert_wearable_data(raw_data, data_type)

        if processed_signal is None:
            return {'error': 'Failed to convert data'}

        # Apply battery-optimized processing
        results = self._battery_optimized_processing(processed_signal, data_type)

        # Add metadata
        results['device_type'] = self.device_type
        results['data_type'] = data_type
        results['timestamp'] = datetime.now().isoformat()
        results['processing_params'] = self.processing_params

        # Store locally
        self.processed_data.append(results)

        # Sync to cloud if enabled
        if self.cloud_endpoint:
            self._sync_to_cloud(results)

        return results

    def _convert_wearable_data(self, raw_data, data_type):
        """Convert wearable data to standard format."""

        try:
            if self.device_config['data_format'] == 'json':
                if isinstance(raw_data, str):
                    data = json.loads(raw_data)
                else:
                    data = raw_data

                # Extract signal data based on device type
                if self.device_type == 'fitness_tracker':
                    signal = data.get('ppg_data', [])
                elif self.device_type == 'smartwatch':
                    signal = data.get('heart_rate_data', [])
                elif self.device_type == 'chest_strap':
                    signal = data.get('ecg_data', [])
                else:
                    signal = data.get('signal_data', [])

            elif self.device_config['data_format'] == 'csv':
                # Handle CSV data
                signal = raw_data if isinstance(raw_data, list) else raw_data.tolist()

            else:
                signal = raw_data

            # Convert to numpy array
            signal_array = np.array(signal)

            # Validate signal
            if len(signal_array) < 10:
                return None

            return signal_array

        except Exception as e:
            print(f"Error converting wearable data: {e}")
            return None

    def _battery_optimized_processing(self, signal, data_type):
        """Apply battery-optimized signal processing."""

        results = {}

        try:
            fs = self.processing_params['fs']

            # Lightweight preprocessing
            sf = SignalFiltering(signal)

            # Use simpler filters for battery optimization
            if data_type == 'ppg':
                filtered_signal = sf.bandpass(lowcut=0.5, highcut=8.0, fs=fs, order=2)
            elif data_type == 'ecg':
                filtered_signal = sf.bandpass(lowcut=0.5, highcut=40.0, fs=fs, order=2)
            else:
                filtered_signal = sf.bandpass(lowcut=0.1, highcut=20.0, fs=fs, order=2)

            # Basic feature extraction
            if data_type in ['ppg', 'ecg']:
                wm = WaveformMorphology(filtered_signal, fs=fs, signal_type=data_type)

                if data_type == 'ppg':
                    peaks = wm.systolic_peaks
                else:
                    peaks = wm.r_peaks

                if len(peaks) > 2:
                    # Calculate heart rate
                    intervals = np.diff(peaks) / fs
                    valid_intervals = intervals[(intervals > 0.3) & (intervals < 2.0)]

                    if len(valid_intervals) > 1:
                        heart_rate = 60 / np.mean(valid_intervals)
                        results['heart_rate'] = heart_rate
                        results['heart_rate_variability'] = np.std(valid_intervals) / np.mean(valid_intervals) * 100

        # Basic signal quality
        sqi = SignalQualityIndex(filtered_signal)
        quality_sqi, _, _ = sqi.amplitude_variability_sqi(
            window_size=int(fs*5), step_size=int(fs*1), threshold=2, aggregate=False
        )
        results['signal_quality'] = np.mean(quality_sqi)

            # Activity level (if accelerometer data available)
            if data_type == 'accelerometer':
                results['activity_level'] = self._calculate_activity_level(signal)

        except Exception as e:
            print(f"Error in battery-optimized processing: {e}")
            results['error'] = str(e)

        return results

    def _calculate_activity_level(self, accelerometer_data):
        """Calculate activity level from accelerometer data."""

        if len(accelerometer_data) < 3:
            return 'unknown'

        # Calculate magnitude of acceleration
        if accelerometer_data.ndim > 1:
            magnitude = np.sqrt(np.sum(accelerometer_data**2, axis=1))
        else:
            magnitude = np.abs(accelerometer_data)

        # Calculate activity level
        mean_magnitude = np.mean(magnitude)

        if mean_magnitude < 1.0:
            return 'sedentary'
        elif mean_magnitude < 2.0:
            return 'light'
        elif mean_magnitude < 3.0:
            return 'moderate'
        else:
            return 'vigorous'

    def _sync_to_cloud(self, data):
        """Sync processed data to cloud endpoint."""

        try:
            if self.cloud_endpoint:
                response = requests.post(
                    self.cloud_endpoint,
                    json=data,
                    timeout=10
                )

                if response.status_code == 200:
                    print("Data synced to cloud successfully")
                else:
                    print(f"Cloud sync failed: {response.status_code}")

        except Exception as e:
            print(f"Cloud sync error: {e}")

    def get_health_summary(self, hours=24):
        """Get health summary for specified time period."""

        cutoff_time = datetime.now() - timedelta(hours=hours)

        # Filter data for specified time period
        recent_data = [
            data for data in self.processed_data
            if datetime.fromisoformat(data['timestamp']) > cutoff_time
        ]

        if not recent_data:
            return {'message': 'No data available for specified period'}

        # Calculate summary statistics
        summary = {
            'period_hours': hours,
            'data_points': len(recent_data),
            'heart_rate_stats': self._calculate_heart_rate_stats(recent_data),
            'activity_summary': self._calculate_activity_summary(recent_data),
            'signal_quality_stats': self._calculate_signal_quality_stats(recent_data)
        }

        return summary

    def _calculate_heart_rate_stats(self, data):
        """Calculate heart rate statistics."""

        heart_rates = [d.get('heart_rate') for d in data if d.get('heart_rate')]

        if not heart_rates:
            return None

        return {
            'mean': np.mean(heart_rates),
            'std': np.std(heart_rates),
            'min': np.min(heart_rates),
            'max': np.max(heart_rates),
            'count': len(heart_rates)
        }

    def _calculate_activity_summary(self, data):
        """Calculate activity summary."""

        activities = [d.get('activity_level') for d in data if d.get('activity_level')]

        if not activities:
            return None

        activity_counts = {}
        for activity in activities:
            activity_counts[activity] = activity_counts.get(activity, 0) + 1

        return activity_counts

    def _calculate_signal_quality_stats(self, data):
        """Calculate signal quality statistics."""

        qualities = [d.get('signal_quality') for d in data if d.get('signal_quality')]

        if not qualities:
            return None

        return {
            'mean': np.mean(qualities),
            'std': np.std(qualities),
            'min': np.min(qualities),
            'max': np.max(qualities)
        }

Usage Example:

# Initialize wearable integration
wearable = WearableDeviceIntegration(
    device_type="fitness_tracker",
    cloud_endpoint="https://api.example.com/vital-signs"
)

# Simulate wearable data
sample_data = {
    'ppg_data': np.random.randn(1000) + np.sin(2 * np.pi * 1.2 * np.arange(1000) / 100),
    'timestamp': datetime.now().isoformat(),
    'device_id': 'tracker_001'
}

# Process wearable data
results = wearable.process_wearable_data(sample_data, data_type="ppg")

print(f"Heart rate: {results.get('heart_rate', 'N/A')} BPM")
print(f"Signal quality: {results.get('signal_quality', 'N/A')}")

# Get health summary
summary = wearable.get_health_summary(hours=24)
print(f"Health summary: {summary}")

Best Practices for Examples

Code Organization * Use clear, descriptive function and variable names * Include comprehensive docstrings * Implement proper error handling * Follow PEP 8 style guidelines

Performance Considerations * Optimize for your specific use case * Consider memory usage for large datasets * Use appropriate sampling rates * Implement efficient data structures

Clinical Applications * Validate results against clinical standards * Consider patient safety and data privacy * Document methodology for reproducibility * Include appropriate disclaimers

Integration Guidelines * Design for your target platform * Consider real-time constraints * Implement proper data validation * Include comprehensive logging

Testing and Validation * Test with various signal types and qualities * Validate against known datasets * Include edge case handling * Document test procedures

Example 5: Advanced Multi-Scale Entropy Analysis

This example demonstrates advanced nonlinear analysis techniques for assessing physiological signal complexity.

Use Case: Quantifying cardiac health and autonomic nervous system function through multi-scale entropy analysis.

Key Features: * Multi-scale entropy (MSE) computation * Composite multi-scale entropy (CMSE) * Refined composite multi-scale entropy (RCMSE) * Clinical interpretation of entropy metrics * Complexity-loss aging assessment

Implementation:

import numpy as np
from vitalDSP.physiological_features.advanced_entropy import MultiScaleEntropy
from vitalDSP.utils.data_processing.synthesize_data import generate_ecg_signal
from vitalDSP.physiological_features.waveform import WaveformMorphology
from vitalDSP.filtering.signal_filtering import SignalFiltering

def analyze_cardiac_complexity(ecg_signal, fs, patient_id=None):
    """
    Comprehensive cardiac complexity analysis using multi-scale entropy.

    Parameters:
    -----------
    ecg_signal : array-like
        ECG signal data
    fs : float
        Sampling frequency (Hz)
    patient_id : str, optional
        Patient identifier

    Returns:
    --------
    dict : Comprehensive complexity analysis results
    """

    results = {
        'patient_id': patient_id,
        'sampling_frequency': fs,
        'analysis': 'Multi-Scale Entropy'
    }

    # 1. Preprocess ECG signal
    sf = SignalFiltering(ecg_signal)
    filtered_ecg = sf.bandpass(lowcut=0.5, highcut=40.0, fs=fs, order=4)

    # 2. Extract RR intervals
    wm = WaveformMorphology(filtered_ecg, fs=fs, signal_type="ECG")
    r_peaks = wm.r_peaks

    if len(r_peaks) > 50:
        # Calculate RR intervals in milliseconds
        rr_intervals = np.diff(r_peaks) / fs * 1000

        # Remove outliers
        valid_rr = rr_intervals[(rr_intervals > 300) & (rr_intervals < 2000)]

        if len(valid_rr) > 50:
            # 3. Multi-Scale Entropy Analysis
            mse_analyzer = MultiScaleEntropy(
                signal=valid_rr,
                m=2,           # Embedding dimension
                r_multiplier=0.15,  # Tolerance multiplier
                max_scale=20   # Maximum scale
            )

            # Standard MSE
            mse_values, complexity_index = mse_analyzer.compute_mse()
            results['mse_values'] = mse_values.tolist()
            results['complexity_index'] = float(complexity_index)

            # Composite MSE (more robust)
            cmse_values, cmse_ci = mse_analyzer.compute_cmse()
            results['cmse_values'] = cmse_values.tolist()
            results['cmse_complexity_index'] = float(cmse_ci)

            # Refined Composite MSE (best for short signals)
            rcmse_values, rcmse_ci = mse_analyzer.compute_rcmse()
            results['rcmse_values'] = rcmse_values.tolist()
            results['rcmse_complexity_index'] = float(rcmse_ci)

            # 4. Clinical Interpretation
            interpretation = interpret_complexity_metrics(
                complexity_index,
                cmse_ci,
                rcmse_ci,
                len(valid_rr)
            )
            results['clinical_interpretation'] = interpretation

    return results

def interpret_complexity_metrics(mse_ci, cmse_ci, rcmse_ci, signal_length):
    """Provide clinical interpretation of complexity metrics."""

    interpretation = {
        'complexity_level': 'normal',
        'autonomic_status': 'balanced',
        'cardiac_health': 'healthy',
        'recommendations': []
    }

    # Complexity Index thresholds (clinically validated)
    if cmse_ci < 30:
        interpretation['complexity_level'] = 'reduced'
        interpretation['autonomic_status'] = 'impaired'
        interpretation['cardiac_health'] = 'at_risk'
        interpretation['recommendations'].append(
            'Reduced complexity suggests autonomic dysfunction or aging'
        )
        interpretation['recommendations'].append(
            'Consider comprehensive cardiac evaluation'
        )
    elif cmse_ci > 60:
        interpretation['complexity_level'] = 'high'
        interpretation['autonomic_status'] = 'healthy'
        interpretation['cardiac_health'] = 'excellent'
        interpretation['recommendations'].append(
            'High complexity indicates good cardiovascular health'
        )
    else:
        interpretation['complexity_level'] = 'normal'
        interpretation['autonomic_status'] = 'balanced'
        interpretation['recommendations'].append(
            'Complexity within normal range for age'
        )

    # Signal quality check
    if signal_length < 100:
        interpretation['recommendations'].append(
            'Signal length is short - consider longer recording for robust assessment'
        )

    return interpretation

Usage Example:

# Generate or load ECG data
fs = 256
ecg_data = generate_ecg_signal(sfecg=fs, duration=300, hrmean=72, Anoise=0.05)

# Perform complexity analysis
results = analyze_cardiac_complexity(ecg_data, fs, patient_id="P001")

# Display results
print(f"Patient ID: {results['patient_id']}")
print(f"Complexity Index (CMSE): {results['cmse_complexity_index']:.2f}")
print(f"Cardiac Health: {results['clinical_interpretation']['cardiac_health']}")
print(f"Autonomic Status: {results['clinical_interpretation']['autonomic_status']}")

# Plot MSE curves
import matplotlib.pyplot as plt

scales = range(1, len(results['mse_values']) + 1)
plt.figure(figsize=(12, 6))
plt.plot(scales, results['mse_values'], 'b-', label='MSE', linewidth=2)
plt.plot(scales, results['cmse_values'], 'r-', label='CMSE', linewidth=2)
plt.plot(scales, results['rcmse_values'], 'g-', label='RCMSE', linewidth=2)
plt.xlabel('Scale Factor')
plt.ylabel('Sample Entropy')
plt.title('Multi-Scale Entropy Analysis')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

Example 6: Comprehensive Health Monitoring System

This example demonstrates a complete health monitoring pipeline with automated report generation.

Use Case: Continuous health monitoring with automated assessment and alerting for clinical decision support.

Key Features: * Multi-modal signal processing (ECG, PPG, Respiratory) * Comprehensive feature extraction * Automated health report generation * Clinical decision support * Trend analysis and alerts

Implementation:

from vitalDSP.health_analysis.health_report_generator import HealthReportGenerator
from vitalDSP.filtering.signal_filtering import SignalFiltering
from vitalDSP.physiological_features.hrv_analysis import HRVFeatures
from vitalDSP.physiological_features.waveform import WaveformMorphology
from vitalDSP.respiratory_analysis.respiratory_analysis import RespiratoryAnalysis
import pandas as pd
import numpy as np

class ComprehensiveHealthMonitor:
    """Complete health monitoring system with automated reporting."""

    def __init__(self, patient_id, fs=256):
        """
        Initialize health monitoring system.

        Parameters:
        -----------
        patient_id : str
            Patient identifier
        fs : float
            Sampling frequency (Hz)
        """
        self.patient_id = patient_id
        self.fs = fs
        self.monitoring_history = []

    def process_vital_signs(self, ecg_signal, ppg_signal=None, resp_signal=None):
        """
        Process multiple vital signs and generate comprehensive assessment.

        Parameters:
        -----------
        ecg_signal : array-like
            ECG signal data
        ppg_signal : array-like, optional
            PPG signal data
        resp_signal : array-like, optional
            Respiratory signal data

        Returns:
        --------
        dict : Comprehensive health assessment
        """

        assessment = {
            'patient_id': self.patient_id,
            'timestamp': pd.Timestamp.now(),
            'vital_signs': {}
        }

        # Process ECG signal
        if ecg_signal is not None:
            ecg_results = self._process_ecg(ecg_signal)
            assessment['vital_signs']['ecg'] = ecg_results

        # Process PPG signal
        if ppg_signal is not None:
            ppg_results = self._process_ppg(ppg_signal)
            assessment['vital_signs']['ppg'] = ppg_results

        # Process Respiratory signal
        if resp_signal is not None:
            resp_results = self._process_respiratory(resp_signal)
            assessment['vital_signs']['respiratory'] = resp_results

        # Generate health score
        health_score = self._calculate_health_score(assessment['vital_signs'])
        assessment['health_score'] = health_score

        # Clinical recommendations
        assessment['recommendations'] = self._generate_recommendations(
            assessment['vital_signs'],
            health_score
        )

        # Store in history
        self.monitoring_history.append(assessment)

        return assessment

    def _process_ecg(self, ecg_signal):
        """Process ECG signal and extract features."""

        results = {}

        # Filter ECG
        sf = SignalFiltering(ecg_signal)
        filtered_ecg = sf.bandpass(lowcut=0.5, highcut=40.0, fs=self.fs, order=4)

        # Detect R-peaks
        wm = WaveformMorphology(filtered_ecg, fs=self.fs, signal_type="ECG")
        r_peaks = wm.r_peaks

        if len(r_peaks) > 10:
            # RR intervals
            rr_intervals = np.diff(r_peaks) / self.fs * 1000
            valid_rr = rr_intervals[(rr_intervals > 300) & (rr_intervals < 2000)]

            if len(valid_rr) > 5:
                # Heart rate
                heart_rate = 60000 / np.mean(valid_rr)
                results['heart_rate'] = float(heart_rate)
                results['heart_rate_status'] = self._assess_heart_rate(heart_rate)

                # HRV analysis
                hrv = HRVFeatures(valid_rr)
                hrv_features = hrv.compute_all_features()
                results['hrv'] = hrv_features
                results['hrv_status'] = self._assess_hrv(hrv_features)

        return results

    def _process_ppg(self, ppg_signal):
        """Process PPG signal and extract features."""

        results = {}

        # Filter PPG
        sf = SignalFiltering(ppg_signal)
        filtered_ppg = sf.bandpass(lowcut=0.5, highcut=8.0, fs=self.fs, order=4)

        # Detect peaks
        wm = WaveformMorphology(filtered_ppg, fs=self.fs, signal_type="PPG")
        systolic_peaks = wm.systolic_peaks

        if len(systolic_peaks) > 5:
            # Pulse rate
            pulse_intervals = np.diff(systolic_peaks) / self.fs * 1000
            valid_intervals = pulse_intervals[(pulse_intervals > 400) & (pulse_intervals < 2000)]

            if len(valid_intervals) > 3:
                pulse_rate = 60000 / np.mean(valid_intervals)
                results['pulse_rate'] = float(pulse_rate)
                results['pulse_rate_status'] = self._assess_heart_rate(pulse_rate)

                # Pulse rate variability
                prv = np.std(valid_intervals)
                results['pulse_rate_variability'] = float(prv)

        return results

    def _process_respiratory(self, resp_signal):
        """Process respiratory signal."""

        results = {}

        try:
            # Respiratory analysis
            resp_analyzer = RespiratoryAnalysis(resp_signal, self.fs)

            # Preprocess
            filtered_resp = resp_analyzer.preprocess_signal(
                detrend=True,
                normalize=True,
                filter_type='bandpass',
                low_freq=0.1,
                high_freq=0.5
            )

            # Estimate rate
            resp_rate = resp_analyzer.compute_respiratory_rate()
            results['respiratory_rate'] = float(resp_rate)
            results['respiratory_status'] = self._assess_respiratory_rate(resp_rate)

        except Exception as e:
            results['error'] = str(e)

        return results

    def _assess_heart_rate(self, hr):
        """Assess heart rate status."""
        if hr < 60:
            return 'bradycardia'
        elif hr > 100:
            return 'tachycardia'
        else:
            return 'normal'

    def _assess_hrv(self, hrv_features):
        """Assess HRV status."""
        if 'sdnn' in hrv_features:
            sdnn = hrv_features['sdnn']
            if sdnn < 30:
                return 'reduced'
            elif sdnn > 50:
                return 'good'
            else:
                return 'normal'
        return 'unknown'

    def _assess_respiratory_rate(self, rr):
        """Assess respiratory rate status."""
        if rr < 12:
            return 'bradypnea'
        elif rr > 20:
            return 'tachypnea'
        else:
            return 'normal'

    def _calculate_health_score(self, vital_signs):
        """Calculate overall health score (0-100)."""

        score = 100

        # ECG assessment
        if 'ecg' in vital_signs:
            ecg = vital_signs['ecg']
            if ecg.get('heart_rate_status') != 'normal':
                score -= 15
            if ecg.get('hrv_status') == 'reduced':
                score -= 20
            elif ecg.get('hrv_status') == 'good':
                score += 5

        # PPG assessment
        if 'ppg' in vital_signs:
            ppg = vital_signs['ppg']
            if ppg.get('pulse_rate_status') != 'normal':
                score -= 10

        # Respiratory assessment
        if 'respiratory' in vital_signs:
            resp = vital_signs['respiratory']
            if resp.get('respiratory_status') != 'normal':
                score -= 15

        return max(0, min(100, score))

    def _generate_recommendations(self, vital_signs, health_score):
        """Generate clinical recommendations."""

        recommendations = []

        if health_score < 60:
            recommendations.append('⚠️ URGENT: Immediate clinical evaluation recommended')
        elif health_score < 75:
            recommendations.append('⚠️ Clinical review recommended within 24 hours')

        # Specific recommendations
        if 'ecg' in vital_signs:
            ecg = vital_signs['ecg']
            if ecg.get('heart_rate_status') == 'bradycardia':
                recommendations.append('• Evaluate for bradycardia causes')
            elif ecg.get('heart_rate_status') == 'tachycardia':
                recommendations.append('• Assess for tachycardia etiology')

            if ecg.get('hrv_status') == 'reduced':
                recommendations.append('• Reduced HRV - consider autonomic assessment')

        if 'respiratory' in vital_signs:
            resp = vital_signs['respiratory']
            if resp.get('respiratory_status') != 'normal':
                recommendations.append('• Abnormal respiratory rate - assess pulmonary function')

        if not recommendations:
            recommendations.append('✓ All vital signs within normal ranges')
            recommendations.append('✓ Continue regular monitoring')

        return recommendations

    def generate_health_report(self, assessment, output_path='health_report.html'):
        """Generate comprehensive health report."""

        # Prepare feature data for report generator
        feature_data = {}

        if 'ecg' in assessment['vital_signs']:
            ecg = assessment['vital_signs']['ecg']
            if 'hrv' in ecg:
                for key, value in ecg['hrv'].items():
                    if isinstance(value, (int, float, np.number)):
                        feature_data[key] = [float(value)]

        if feature_data:
            # Generate report
            report_gen = HealthReportGenerator(
                feature_data=feature_data,
                sampling_rate=self.fs
            )

            report_path = report_gen.generate_report(
                patient_id=self.patient_id,
                output_path=output_path
            )

            return report_path

        return None

Usage Example:

# Initialize monitoring system
monitor = ComprehensiveHealthMonitor(patient_id="P001", fs=256)

# Generate or load vital signs
ecg_signal = generate_ecg_signal(sfecg=256, duration=60, hrmean=75, Anoise=0.05)

# Process vital signs
assessment = monitor.process_vital_signs(ecg_signal=ecg_signal)

# Display results
print(f"Patient ID: {assessment['patient_id']}")
print(f"Health Score: {assessment['health_score']}/100")
print(f"Timestamp: {assessment['timestamp']}")

print("\nVital Signs:")
if 'ecg' in assessment['vital_signs']:
    ecg = assessment['vital_signs']['ecg']
    print(f"  Heart Rate: {ecg.get('heart_rate', 'N/A'):.1f} BPM ({ecg.get('heart_rate_status', 'unknown')})")
    print(f"  HRV Status: {ecg.get('hrv_status', 'unknown')}")

print("\nRecommendations:")
for rec in assessment['recommendations']:
    print(f"  {rec}")

# Generate HTML report
report_path = monitor.generate_health_report(assessment)
if report_path:
    print(f"\n📄 Health report generated: {report_path}")

Example 7: Cross-Signal Synchronization Analysis

This example demonstrates advanced analysis of coupling between multiple physiological signals.

Use Case: Analyzing cardio-respiratory coupling and autonomic nervous system coordination.

Key Features: * Transfer entropy analysis * Phase synchronization * Cross-correlation analysis * Directional coupling assessment * Clinical interpretation of coupling metrics

Implementation:

from vitalDSP.physiological_features.transfer_entropy import TransferEntropy
from vitalDSP.filtering.signal_filtering import SignalFiltering
from vitalDSP.physiological_features.waveform import WaveformMorphology
import numpy as np
from scipy import signal as sp_signal

def analyze_cardiorespiratory_coupling(ecg_signal, resp_signal, fs):
    """
    Analyze coupling between cardiac and respiratory signals.

    Parameters:
    -----------
    ecg_signal : array-like
        ECG signal data
    resp_signal : array-like
        Respiratory signal data
    fs : float
        Sampling frequency (Hz)

    Returns:
    --------
    dict : Coupling analysis results
    """

    results = {
        'sampling_frequency': fs,
        'analysis_type': 'cardio-respiratory_coupling'
    }

    # 1. Preprocess signals
    sf_ecg = SignalFiltering(ecg_signal)
    filtered_ecg = sf_ecg.bandpass(lowcut=0.5, highcut=40.0, fs=fs, order=4)

    sf_resp = SignalFiltering(resp_signal)
    filtered_resp = sf_resp.bandpass(lowcut=0.1, highcut=0.5, fs=fs, order=4)

    # 2. Extract RR intervals
    wm = WaveformMorphology(filtered_ecg, fs=fs, signal_type="ECG")
    r_peaks = wm.r_peaks

    if len(r_peaks) > 50:
        rr_intervals = np.diff(r_peaks) / fs * 1000
        valid_rr = rr_intervals[(rr_intervals > 300) & (rr_intervals < 2000)]

        if len(valid_rr) > 50 and len(filtered_resp) > 100:
            # Resample respiratory signal to match RR intervals
            resp_resampled = sp_signal.resample(
                filtered_resp,
                len(valid_rr)
            )

            # 3. Transfer Entropy Analysis
            te_analyzer = TransferEntropy(
                source=resp_resampled[:len(valid_rr)],
                target=valid_rr,
                k=1,  # History length
                delay=1  # Time delay
            )

            # Respiratory -> Cardiac influence
            te_resp_to_cardiac = te_analyzer.compute_transfer_entropy()
            results['te_resp_to_cardiac'] = float(te_resp_to_cardiac)

            # Cardiac -> Respiratory influence
            te_analyzer_reverse = TransferEntropy(
                source=valid_rr,
                target=resp_resampled[:len(valid_rr)],
                k=1,
                delay=1
            )
            te_cardiac_to_resp = te_analyzer_reverse.compute_transfer_entropy()
            results['te_cardiac_to_resp'] = float(te_cardiac_to_resp)

            # Net directionality
            net_directionality = te_resp_to_cardiac - te_cardiac_to_resp
            results['net_directionality'] = float(net_directionality)

            # 4. Cross-correlation analysis
            cross_corr = np.correlate(
                valid_rr - np.mean(valid_rr),
                resp_resampled[:len(valid_rr)] - np.mean(resp_resampled[:len(valid_rr)]),
                mode='full'
            )
            cross_corr = cross_corr / (len(valid_rr) * np.std(valid_rr) * np.std(resp_resampled[:len(valid_rr)]))

            max_corr_idx = np.argmax(np.abs(cross_corr))
            max_correlation = cross_corr[max_corr_idx]
            lag = max_corr_idx - len(valid_rr) + 1

            results['max_correlation'] = float(max_correlation)
            results['optimal_lag'] = int(lag)

            # 5. Clinical interpretation
            interpretation = interpret_coupling_metrics(
                te_resp_to_cardiac,
                te_cardiac_to_resp,
                net_directionality,
                max_correlation
            )
            results['clinical_interpretation'] = interpretation

    return results

def interpret_coupling_metrics(te_r_to_c, te_c_to_r, net_dir, max_corr):
    """Provide clinical interpretation of coupling metrics."""

    interpretation = {
        'coupling_strength': 'normal',
        'dominant_direction': 'balanced',
        'autonomic_status': 'healthy',
        'recommendations': []
    }

    # Assess coupling strength
    avg_te = (te_r_to_c + te_c_to_r) / 2
    if avg_te < 0.05:
        interpretation['coupling_strength'] = 'weak'
        interpretation['autonomic_status'] = 'impaired'
        interpretation['recommendations'].append(
            'Weak cardio-respiratory coupling suggests autonomic dysfunction'
        )
    elif avg_te > 0.2:
        interpretation['coupling_strength'] = 'strong'
        interpretation['autonomic_status'] = 'healthy'
        interpretation['recommendations'].append(
            'Strong coupling indicates good autonomic function'
        )

    # Assess directionality
    if abs(net_dir) > 0.1:
        if net_dir > 0:
            interpretation['dominant_direction'] = 'respiratory_to_cardiac'
            interpretation['recommendations'].append(
                'Respiratory drive dominant - typical of healthy state'
            )
        else:
            interpretation['dominant_direction'] = 'cardiac_to_respiratory'
            interpretation['recommendations'].append(
                'Cardiac drive dominant - may indicate stress or pathology'
            )
    else:
        interpretation['dominant_direction'] = 'balanced'
        interpretation['recommendations'].append(
            'Balanced bidirectional coupling'
        )

    # Assess correlation
    if abs(max_corr) > 0.5:
        interpretation['recommendations'].append(
            f'High correlation ({max_corr:.2f}) indicates strong synchronization'
        )
    elif abs(max_corr) < 0.2:
        interpretation['recommendations'].append(
            f'Low correlation ({max_corr:.2f}) suggests reduced synchronization'
        )

    return interpretation

Usage Example:

from vitalDSP.utils.data_processing.synthesize_data import generate_ecg_signal, generate_resp_signal

# Generate signals
fs = 256
duration = 300

ecg_signal = generate_ecg_signal(sfecg=fs, duration=duration, hrmean=72, Anoise=0.05)
resp_signal = generate_resp_signal(sampling_rate=fs, duration=duration, frequency=0.25, amplitude=0.5)

# Analyze coupling
results = analyze_cardiorespiratory_coupling(ecg_signal, resp_signal, fs)

# Display results
print("Cardio-Respiratory Coupling Analysis")
print("=" * 50)
print(f"Transfer Entropy (Resp → Cardiac): {results['te_resp_to_cardiac']:.4f}")
print(f"Transfer Entropy (Cardiac → Resp): {results['te_cardiac_to_resp']:.4f}")
print(f"Net Directionality: {results['net_directionality']:.4f}")
print(f"Maximum Correlation: {results['max_correlation']:.3f}")
print(f"Optimal Lag: {results['optimal_lag']} samples")

print("\nClinical Interpretation:")
interp = results['clinical_interpretation']
print(f"Coupling Strength: {interp['coupling_strength']}")
print(f"Dominant Direction: {interp['dominant_direction']}")
print(f"Autonomic Status: {interp['autonomic_status']}")

print("\nRecommendations:")
for rec in interp['recommendations']:
    print(f"  • {rec}")

Example 8: Comprehensive Physiological Feature Extraction

This example demonstrates comprehensive feature extraction across all domains: time, frequency, nonlinear, and energy analysis.

Use Case: Complete physiological characterization for research and clinical assessment.

Key Features: * Time-domain feature extraction * Frequency-domain analysis (PSD, spectral bands) * Nonlinear dynamics (Poincaré, entropy, fractal dimension) * Beat-to-beat variability analysis * Energy distribution analysis * Automated feature interpretation

Implementation:

import numpy as np
from vitalDSP.physiological_features.time_domain import TimeDomainFeatures
from vitalDSP.physiological_features.frequency_domain import FrequencyDomainFeatures
from vitalDSP.physiological_features.nonlinear import NonlinearFeatures
from vitalDSP.physiological_features.beat_to_beat import BeatToBeatAnalysis
from vitalDSP.physiological_features.energy_analysis import EnergyAnalysis
from vitalDSP.physiological_features.waveform import WaveformMorphology
from vitalDSP.filtering.signal_filtering import SignalFiltering
from vitalDSP.utils.data_processing.synthesize_data import generate_ecg_signal
import pandas as pd

def extract_comprehensive_features(ecg_signal, fs, patient_id=None):
    """
    Extract comprehensive physiological features from ECG signal.

    Parameters:
    -----------
    ecg_signal : array-like
        ECG signal data
    fs : float
        Sampling frequency (Hz)
    patient_id : str, optional
        Patient identifier

    Returns:
    --------
    dict : Complete feature set with clinical interpretation
    """

    results = {
        'patient_id': patient_id,
        'sampling_frequency': fs,
        'features': {},
        'interpretation': {}
    }

    # 1. Preprocess signal
    sf = SignalFiltering(ecg_signal)
    filtered_ecg = sf.bandpass(lowcut=0.5, highcut=40.0, fs=fs, order=4)

    # 2. Extract R-peaks and RR intervals
    wm = WaveformMorphology(filtered_ecg, fs=fs, signal_type="ECG")
    r_peaks = wm.r_peaks

    if len(r_peaks) < 10:
        results['error'] = 'Insufficient R-peaks detected'
        return results

    # Calculate RR intervals in milliseconds
    rr_intervals = np.diff(r_peaks) / fs * 1000
    valid_rr = rr_intervals[(rr_intervals > 300) & (rr_intervals < 2000)]

    if len(valid_rr) < 5:
        results['error'] = 'Insufficient valid RR intervals'
        return results

    # 3. TIME DOMAIN FEATURES
    print("Extracting time-domain features...")
    td_features = TimeDomainFeatures(valid_rr)

    # Basic statistics
    results['features']['mean_rr'] = td_features.mean()
    results['features']['std_rr'] = td_features.std()
    results['features']['min_rr'] = td_features.min()
    results['features']['max_rr'] = td_features.max()
    results['features']['range_rr'] = td_features.max() - td_features.min()

    # HRV time-domain metrics
    results['features']['sdnn'] = td_features.sdnn()
    results['features']['rmssd'] = td_features.rmssd()
    results['features']['sdsd'] = td_features.sdsd()
    results['features']['nn50'] = td_features.nn50()
    results['features']['pnn50'] = td_features.pnn50()
    results['features']['nn20'] = td_features.nn20()
    results['features']['pnn20'] = td_features.pnn20()

    # Heart rate
    results['features']['mean_hr'] = 60000 / results['features']['mean_rr']

    # 4. FREQUENCY DOMAIN FEATURES
    print("Extracting frequency-domain features...")
    fd_features = FrequencyDomainFeatures(valid_rr, fs=4.0)  # Typical RR sampling ~4Hz

    # Compute PSD
    psd_results = fd_features.compute_psd(method='welch')
    results['features']['vlf_power'] = psd_results['vlf_power']
    results['features']['lf_power'] = psd_results['lf_power']
    results['features']['hf_power'] = psd_results['hf_power']
    results['features']['total_power'] = psd_results['total_power']
    results['features']['lf_hf_ratio'] = psd_results['lf_hf_ratio']

    # Normalized powers
    if results['features']['total_power'] > 0:
        results['features']['lf_nu'] = (results['features']['lf_power'] /
                                        (results['features']['lf_power'] + results['features']['hf_power']) * 100)
        results['features']['hf_nu'] = (results['features']['hf_power'] /
                                        (results['features']['lf_power'] + results['features']['hf_power']) * 100)

    # Peak frequencies
    results['features']['lf_peak'] = psd_results.get('lf_peak_frequency', 0)
    results['features']['hf_peak'] = psd_results.get('hf_peak_frequency', 0)

    # 5. NONLINEAR FEATURES
    print("Extracting nonlinear features...")
    nl_features = NonlinearFeatures(valid_rr)

    # Poincaré analysis
    poincare = nl_features.compute_poincare_features()
    results['features']['sd1'] = poincare['sd1']
    results['features']['sd2'] = poincare['sd2']
    results['features']['sd_ratio'] = poincare['sd_ratio']

    # Entropy measures
    results['features']['sample_entropy'] = nl_features.sample_entropy(m=2, r=0.2)
    results['features']['approximate_entropy'] = nl_features.approximate_entropy(m=2, r=0.2)

    # Detrended Fluctuation Analysis
    results['features']['dfa_alpha1'] = nl_features.dfa(scale_min=4, scale_max=16)
    results['features']['dfa_alpha2'] = nl_features.dfa(scale_min=16, scale_max=64)

    # 6. BEAT-TO-BEAT ANALYSIS
    print("Extracting beat-to-beat features...")
    bb_analysis = BeatToBeatAnalysis(filtered_ecg, fs=fs, r_peaks=r_peaks)

    # Beat-to-beat variability
    bb_features = bb_analysis.compute_variability_features()
    results['features']['beat_variability'] = bb_features.get('variability_index', 0)
    results['features']['successive_difference'] = bb_features.get('successive_difference', 0)

    # 7. ENERGY ANALYSIS
    print("Extracting energy features...")
    energy_analysis = EnergyAnalysis(filtered_ecg, fs=fs)

    # Signal energy distribution
    energy_features = energy_analysis.compute_energy_features()
    results['features']['total_energy'] = energy_features.get('total_energy', 0)
    results['features']['energy_mean'] = energy_features.get('mean_energy', 0)
    results['features']['energy_std'] = energy_features.get('std_energy', 0)

    # Spectral energy
    spectral_energy = energy_analysis.compute_spectral_energy()
    results['features']['spectral_energy'] = spectral_energy.get('total_spectral_energy', 0)

    # 8. CLINICAL INTERPRETATION
    results['interpretation'] = interpret_comprehensive_features(results['features'])

    return results

def interpret_comprehensive_features(features):
    """Provide comprehensive clinical interpretation."""

    interpretation = {
        'hrv_status': 'normal',
        'autonomic_balance': 'balanced',
        'cardiac_health': 'healthy',
        'risk_level': 'low',
        'recommendations': []
    }

    # Time-domain assessment
    sdnn = features.get('sdnn', 0)
    rmssd = features.get('rmssd', 0)
    pnn50 = features.get('pnn50', 0)

    if sdnn < 50:
        interpretation['hrv_status'] = 'reduced'
        interpretation['cardiac_health'] = 'at_risk'
        interpretation['risk_level'] = 'moderate'
        interpretation['recommendations'].append(
            'Reduced SDNN (<50ms) suggests decreased HRV and increased cardiac risk'
        )
    elif sdnn < 100:
        interpretation['hrv_status'] = 'fair'
        interpretation['recommendations'].append(
            'SDNN within fair range (50-100ms) - regular monitoring recommended'
        )
    else:
        interpretation['hrv_status'] = 'good'
        interpretation['recommendations'].append(
            'Good SDNN (>100ms) indicates healthy HRV'
        )

    # Frequency-domain assessment
    lf_hf_ratio = features.get('lf_hf_ratio', 1.0)

    if lf_hf_ratio < 0.5:
        interpretation['autonomic_balance'] = 'parasympathetic_dominant'
        interpretation['recommendations'].append(
            'Low LF/HF ratio (<0.5) suggests parasympathetic dominance'
        )
    elif lf_hf_ratio > 2.0:
        interpretation['autonomic_balance'] = 'sympathetic_dominant'
        interpretation['recommendations'].append(
            'High LF/HF ratio (>2.0) suggests sympathetic dominance or stress'
        )
    else:
        interpretation['autonomic_balance'] = 'balanced'
        interpretation['recommendations'].append(
            'LF/HF ratio within normal range indicates balanced autonomic function'
        )

    # Nonlinear assessment
    sd1 = features.get('sd1', 0)
    sd2 = features.get('sd2', 0)
    sample_entropy = features.get('sample_entropy', 0)

    if sd1 < 10:
        interpretation['recommendations'].append(
            'Low SD1 (<10ms) indicates reduced short-term variability'
        )

    if sample_entropy < 1.0:
        interpretation['recommendations'].append(
            'Low sample entropy (<1.0) suggests reduced signal complexity'
        )
    elif sample_entropy > 2.0:
        interpretation['recommendations'].append(
            'High sample entropy (>2.0) indicates good signal complexity'
        )

    # Overall risk assessment
    risk_factors = 0
    if sdnn < 50:
        risk_factors += 2
    if rmssd < 20:
        risk_factors += 1
    if lf_hf_ratio > 2.5:
        risk_factors += 1
    if sample_entropy < 1.0:
        risk_factors += 1

    if risk_factors >= 3:
        interpretation['risk_level'] = 'high'
        interpretation['cardiac_health'] = 'concerning'
        interpretation['recommendations'].append(
            '⚠️ Multiple risk factors detected - comprehensive cardiac evaluation recommended'
        )
    elif risk_factors >= 1:
        interpretation['risk_level'] = 'moderate'
        interpretation['cardiac_health'] = 'fair'
        interpretation['recommendations'].append(
            '⚠️ Some risk factors present - regular monitoring advised'
        )
    else:
        interpretation['risk_level'] = 'low'
        interpretation['cardiac_health'] = 'healthy'
        interpretation['recommendations'].append(
            '✓ All metrics within healthy ranges'
        )

    return interpretation

Usage Example:

# Generate or load ECG data
fs = 256
duration = 300  # 5 minutes
ecg_data = generate_ecg_signal(sfecg=fs, duration=duration, hrmean=75, Anoise=0.05)

# Extract comprehensive features
results = extract_comprehensive_features(ecg_data, fs, patient_id="P001")

# Display results
print(f"Patient ID: {results['patient_id']}")
print(f"\n{'='*60}")
print("TIME DOMAIN FEATURES")
print(f"{'='*60}")
print(f"Mean RR: {results['features']['mean_rr']:.2f} ms")
print(f"SDNN: {results['features']['sdnn']:.2f} ms")
print(f"RMSSD: {results['features']['rmssd']:.2f} ms")
print(f"pNN50: {results['features']['pnn50']:.2f} %")
print(f"Heart Rate: {results['features']['mean_hr']:.1f} BPM")

print(f"\n{'='*60}")
print("FREQUENCY DOMAIN FEATURES")
print(f"{'='*60}")
print(f"VLF Power: {results['features']['vlf_power']:.2f} ms²")
print(f"LF Power: {results['features']['lf_power']:.2f} ms²")
print(f"HF Power: {results['features']['hf_power']:.2f} ms²")
print(f"LF/HF Ratio: {results['features']['lf_hf_ratio']:.2f}")
print(f"Total Power: {results['features']['total_power']:.2f} ms²")

print(f"\n{'='*60}")
print("NONLINEAR FEATURES")
print(f"{'='*60}")
print(f"SD1: {results['features']['sd1']:.2f} ms")
print(f"SD2: {results['features']['sd2']:.2f} ms")
print(f"SD1/SD2 Ratio: {results['features']['sd_ratio']:.3f}")
print(f"Sample Entropy: {results['features']['sample_entropy']:.3f}")
print(f"Approximate Entropy: {results['features']['approximate_entropy']:.3f}")

print(f"\n{'='*60}")
print("CLINICAL INTERPRETATION")
print(f"{'='*60}")
interp = results['interpretation']
print(f"HRV Status: {interp['hrv_status']}")
print(f"Autonomic Balance: {interp['autonomic_balance']}")
print(f"Cardiac Health: {interp['cardiac_health']}")
print(f"Risk Level: {interp['risk_level']}")

print(f"\nRecommendations:")
for rec in interp['recommendations']:
    print(f"  {rec}")

# Export to DataFrame for further analysis
features_df = pd.DataFrame([results['features']])
features_df.to_csv(f"features_{results['patient_id']}.csv", index=False)
print(f"\n✅ Features exported to features_{results['patient_id']}.csv")

Example 9: Machine Learning for Physiological Signal Analysis

This example demonstrates machine learning applications including anomaly detection, signal denoising with autoencoders, and pattern classification.

Use Case: Automated signal quality assessment, artifact detection, and arrhythmia classification using deep learning.

Key Features: * Autoencoder-based signal denoising * Anomaly detection for artifact identification * CNN1D for pattern classification * Feature extraction for ML * Model training and evaluation * Real-time inference pipeline

Implementation:

import numpy as np
from vitalDSP.ml_models.deep_models import (
    StandardAutoencoder, ConvolutionalAutoencoder, CNN1D
)
from vitalDSP.advanced_computation.anomaly_detection import AnomalyDetection
from vitalDSP.ml_models.feature_extractor import FeatureExtractor
from vitalDSP.filtering.signal_filtering import SignalFiltering
from vitalDSP.utils.data_processing.synthesize_data import generate_ecg_signal
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

def ml_signal_analysis_pipeline(clean_signals, noisy_signals, fs, anomaly_threshold=2.5):
    """
    Complete ML pipeline for physiological signal analysis.

    Parameters:
    -----------
    clean_signals : list of arrays
        Clean reference signals for training
    noisy_signals : list of arrays
        Noisy signals to denoise
    fs : float
        Sampling frequency
    anomaly_threshold : float
        Z-score threshold for anomaly detection

    Returns:
    --------
    dict : ML analysis results including denoised signals and detected anomalies
    """

    results = {
        'denoising': {},
        'anomaly_detection': {},
        'classification': {},
        'model_performance': {}
    }

    # 1. SIGNAL DENOISING WITH AUTOENCODER
    print("="*60)
    print("1. AUTOENCODER-BASED SIGNAL DENOISING")
    print("="*60)

    # Prepare data for autoencoder
    signal_length = min([len(s) for s in clean_signals])
    X_clean = np.array([s[:signal_length] for s in clean_signals])
    X_noisy = np.array([s[:signal_length] for s in noisy_signals])

    # Normalize
    scaler = StandardScaler()
    X_clean_norm = scaler.fit_transform(X_clean)
    X_noisy_norm = scaler.transform(X_noisy)

    # Reshape for autoencoder (samples, timesteps, features)
    X_clean_reshaped = X_clean_norm.reshape(X_clean_norm.shape[0], X_clean_norm.shape[1], 1)
    X_noisy_reshaped = X_noisy_norm.reshape(X_noisy_norm.shape[0], X_noisy_norm.shape[1], 1)

    # Train Convolutional Autoencoder
    print("Training Convolutional Autoencoder...")
    autoencoder = ConvolutionalAutoencoder(
        input_shape=(signal_length, 1),
        latent_dim=32
    )

    history = autoencoder.fit(
        X_noisy_reshaped,
        X_clean_reshaped,
        epochs=50,
        batch_size=32,
        validation_split=0.2,
        verbose=0
    )

    # Denoise signals
    X_denoised = autoencoder.predict(X_noisy_reshaped)
    X_denoised = scaler.inverse_transform(X_denoised.reshape(X_denoised.shape[0], -1))

    # Calculate reconstruction error
    reconstruction_error = np.mean((X_clean - X_denoised) ** 2, axis=1)

    results['denoising']['clean_signals'] = X_clean
    results['denoising']['noisy_signals'] = X_noisy
    results['denoising']['denoised_signals'] = X_denoised
    results['denoising']['reconstruction_error'] = reconstruction_error
    results['denoising']['mean_error'] = np.mean(reconstruction_error)
    results['denoising']['std_error'] = np.std(reconstruction_error)

    print(f"✓ Denoising complete")
    print(f"  Mean Reconstruction Error: {results['denoising']['mean_error']:.4f}")
    print(f"  Std Reconstruction Error: {results['denoising']['std_error']:.4f}")

    # 2. ANOMALY DETECTION
    print(f"\n{'='*60}")
    print("2. ANOMALY DETECTION")
    print("="*60)

    anomaly_results = []
    for i, signal in enumerate(noisy_signals):
        detector = AnomalyDetection(signal)

        # Z-score based detection
        anomalies_zscore = detector.detect_anomalies(
            method='z_score',
            threshold=anomaly_threshold
        )

        # Statistical detection
        anomalies_stat = detector.detect_anomalies(
            method='statistical',
            window_size=100
        )

        # Isolation Forest
        try:
            anomalies_isolation = detector.detect_anomalies(
                method='isolation_forest',
                contamination=0.1
            )
        except:
            anomalies_isolation = []

        anomaly_info = {
            'signal_index': i,
            'z_score_anomalies': len(anomalies_zscore),
            'statistical_anomalies': len(anomalies_stat),
            'isolation_forest_anomalies': len(anomalies_isolation),
            'z_score_locations': anomalies_zscore,
            'signal_quality': 'good' if len(anomalies_zscore) < 10 else 'poor'
        }
        anomaly_results.append(anomaly_info)

    results['anomaly_detection']['per_signal'] = anomaly_results
    results['anomaly_detection']['total_signals'] = len(noisy_signals)
    results['anomaly_detection']['good_quality'] = sum(1 for a in anomaly_results if a['signal_quality'] == 'good')
    results['anomaly_detection']['poor_quality'] = sum(1 for a in anomaly_results if a['signal_quality'] == 'poor')

    print(f"✓ Anomaly detection complete")
    print(f"  Total signals analyzed: {results['anomaly_detection']['total_signals']}")
    print(f"  Good quality signals: {results['anomaly_detection']['good_quality']}")
    print(f"  Poor quality signals: {results['anomaly_detection']['poor_quality']}")

    # 3. FEATURE EXTRACTION FOR ML
    print(f"\n{'='*60}")
    print("3. FEATURE EXTRACTION FOR MACHINE LEARNING")
    print("="*60)

    extractor = FeatureExtractor()

    features_list = []
    for signal in X_denoised:
        # Extract comprehensive features
        features = extractor.extract_features(
            signal,
            fs=fs,
            feature_types=['statistical', 'temporal', 'spectral']
        )
        features_list.append(features)

    features_array = np.array(features_list)

    results['classification']['features'] = features_array
    results['classification']['n_features'] = features_array.shape[1]

    print(f"✓ Feature extraction complete")
    print(f"  Number of features extracted: {features_array.shape[1]}")
    print(f"  Feature dimensions: {features_array.shape}")

    # 4. CNN1D CLASSIFICATION (Example with synthetic labels)
    print(f"\n{'='*60}")
    print("4. CNN1D PATTERN CLASSIFICATION")
    print("="*60)

    # Create synthetic labels for demonstration (normal vs. abnormal)
    labels = np.array([0 if e < np.median(reconstruction_error) else 1
                      for e in reconstruction_error])

    # Split data
    X_train, X_test, y_train, y_test = train_test_split(
        X_clean_reshaped, labels, test_size=0.2, random_state=42, stratify=labels
    )

    # Train CNN1D classifier
    print("Training CNN1D classifier...")
    cnn = CNN1D(
        input_shape=(signal_length, 1),
        num_classes=2,
        num_filters=[32, 64, 128],
        kernel_size=3,
        pool_size=2
    )

    cnn_history = cnn.fit(
        X_train, y_train,
        epochs=30,
        batch_size=32,
        validation_split=0.2,
        verbose=0
    )

    # Evaluate
    test_loss, test_accuracy = cnn.evaluate(X_test, y_test)

    results['classification']['test_accuracy'] = test_accuracy
    results['classification']['test_loss'] = test_loss
    results['classification']['n_train'] = len(X_train)
    results['classification']['n_test'] = len(X_test)

    print(f"✓ Classification complete")
    print(f"  Test Accuracy: {test_accuracy:.2%}")
    print(f"  Test Loss: {test_loss:.4f}")
    print(f"  Training samples: {len(X_train)}")
    print(f"  Test samples: {len(X_test)}")

    return results

def evaluate_ml_pipeline(results):
    """Evaluate and display ML pipeline results."""

    print(f"\n{'='*60}")
    print("ML PIPELINE PERFORMANCE SUMMARY")
    print("="*60)

    # Denoising performance
    print(f"\n📊 Denoising Performance:")
    print(f"  Mean Reconstruction Error: {results['denoising']['mean_error']:.4f}")
    print(f"  Std Reconstruction Error: {results['denoising']['std_error']:.4f}")

    # Calculate SNR improvement
    original_noise = np.mean(np.var(results['denoising']['noisy_signals'] -
                                    results['denoising']['clean_signals'], axis=1))
    residual_noise = np.mean(np.var(results['denoising']['denoised_signals'] -
                                    results['denoising']['clean_signals'], axis=1))
    snr_improvement = 10 * np.log10(original_noise / (residual_noise + 1e-10))

    print(f"  SNR Improvement: {snr_improvement:.2f} dB")

    # Anomaly detection
    print(f"\n🔍 Anomaly Detection:")
    print(f"  Total signals: {results['anomaly_detection']['total_signals']}")
    print(f"  Good quality: {results['anomaly_detection']['good_quality']} "
          f"({results['anomaly_detection']['good_quality']/results['anomaly_detection']['total_signals']*100:.1f}%)")
    print(f"  Poor quality: {results['anomaly_detection']['poor_quality']} "
          f"({results['anomaly_detection']['poor_quality']/results['anomaly_detection']['total_signals']*100:.1f}%)")

    # Classification
    print(f"\n🎯 Classification Performance:")
    print(f"  Test Accuracy: {results['classification']['test_accuracy']:.2%}")
    print(f"  Number of features: {results['classification']['n_features']}")

    # Overall assessment
    print(f"\n{'='*60}")
    print("OVERALL ASSESSMENT")
    print("="*60)

    quality_score = (
        (snr_improvement / 10) * 30 +  # Denoising (30 points)
        (results['anomaly_detection']['good_quality'] /
         results['anomaly_detection']['total_signals']) * 30 +  # Quality (30 points)
        results['classification']['test_accuracy'] * 40  # Classification (40 points)
    )

    print(f"Quality Score: {quality_score:.1f}/100")

    if quality_score >= 80:
        print("✅ Excellent - ML pipeline performing optimally")
    elif quality_score >= 60:
        print("⚠️ Good - ML pipeline acceptable, some improvements possible")
    else:
        print("❌ Fair - ML pipeline needs optimization")

Usage Example:

# Generate training data
n_signals = 100
fs = 256
signal_length = 2560  # 10 seconds

print("Generating training data...")
clean_signals = []
noisy_signals = []

for i in range(n_signals):
    # Generate clean ECG
    clean = generate_ecg_signal(sfecg=fs, duration=10, hrmean=70+np.random.randn()*5, Anoise=0.0)
    clean_signals.append(clean[:signal_length])

    # Add noise
    noise = np.random.normal(0, 0.1, signal_length)
    noisy = clean[:signal_length] + noise
    noisy_signals.append(noisy)

# Run ML pipeline
results = ml_signal_analysis_pipeline(clean_signals, noisy_signals, fs=fs)

# Evaluate results
evaluate_ml_pipeline(results)

# Save models for deployment
print("\n💾 Models ready for deployment")
print("  - Autoencoder trained for real-time denoising")
print("  - Anomaly detector calibrated for quality assessment")
print("  - CNN1D classifier ready for pattern recognition")

These examples provide a solid foundation for implementing VitalDSP in various real-world scenarios. Adapt them to your specific needs and requirements.

For more advanced examples and use cases, explore the Jupyter notebooks in the sample_notebooks section.