Source code for vitalDSP_webapp.callbacks.analysis.respiratory_callbacks
"""
Respiratory analysis callbacks for vitalDSP webapp.
Handles comprehensive respiratory rate estimation and breathing pattern analysis using vitalDSP.
"""
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from dash import Input, Output, State, callback_context, no_update, html
from dash.exceptions import PreventUpdate
from scipy import signal
import dash_bootstrap_components as dbc
import logging
# Import plot utilities for performance optimization
try:
from vitalDSP_webapp.utils.plot_utils import limit_plot_data, check_plot_data_size
except ImportError:
# Fallback if plot_utils not available
def limit_plot_data(
time_axis, signal_data, max_duration=300, max_points=10000, start_time=None
):
"""Fallback implementation of limit_plot_data"""
return time_axis, signal_data
def check_plot_data_size(time_axis, signal_data):
"""Fallback implementation"""
return True
# Initialize logger first
logger = logging.getLogger(__name__)
# Initialize vitalDSP modules as None
RespiratoryAnalysis = None
peak_detection_rr = None
fft_based_rr = None
frequency_domain_rr = None
time_domain_rr = None
detect_apnea_amplitude = None
detect_apnea_pauses = None
multimodal_analysis = None
ppg_ecg_fusion = None
respiratory_cardiac_fusion = None
PPGAutonomicFeatures = None
ECGPPGSynchronization = None
PreprocessConfig = None
preprocess_signal = None
# Global variable for app instance (will be set by register_callbacks)
app = None
def _import_vitaldsp_modules():
"""Import vitalDSP modules with error handling."""
global RespiratoryAnalysis, peak_detection_rr, fft_based_rr, frequency_domain_rr
global time_domain_rr, detect_apnea_amplitude, detect_apnea_pauses, multimodal_analysis
global ppg_ecg_fusion, respiratory_cardiac_fusion, PPGAutonomicFeatures
global ECGPPGSynchronization, PreprocessConfig, preprocess_signal
logger.info("=== IMPORTING VITALDSP MODULES ===")
try:
from vitalDSP.respiratory_analysis.respiratory_analysis import (
RespiratoryAnalysis,
)
logger.info("✓ RespiratoryAnalysis imported successfully")
except Exception as e:
logger.error(f"✗ Failed to import RespiratoryAnalysis: {e}")
RespiratoryAnalysis = None
try:
from vitalDSP.respiratory_analysis.estimate_rr.peak_detection_rr import (
peak_detection_rr,
)
logger.info("✓ peak_detection_rr imported successfully")
except Exception as e:
logger.error(f"✗ Failed to import peak_detection_rr: {e}")
peak_detection_rr = None
try:
from vitalDSP.respiratory_analysis.estimate_rr.fft_based_rr import fft_based_rr
logger.info("✓ fft_based_rr imported successfully")
except Exception as e:
logger.error(f"✗ Failed to import fft_based_rr: {e}")
fft_based_rr = None
try:
from vitalDSP.respiratory_analysis.estimate_rr.frequency_domain_rr import (
frequency_domain_rr,
)
logger.info("✓ frequency_domain_rr imported successfully")
except Exception as e:
logger.error(f"✗ Failed to import frequency_domain_rr: {e}")
frequency_domain_rr = None
try:
from vitalDSP.respiratory_analysis.estimate_rr.time_domain_rr import (
time_domain_rr,
)
logger.info("✓ time_domain_rr imported successfully")
except Exception as e:
logger.error(f"✗ Failed to import time_domain_rr: {e}")
time_domain_rr = None
try:
from vitalDSP.respiratory_analysis.sleep_apnea_detection.amplitude_threshold import (
detect_apnea_amplitude,
)
logger.info("✓ detect_apnea_amplitude imported successfully")
except Exception as e:
logger.error(f"✗ Failed to import detect_apnea_amplitude: {e}")
detect_apnea_amplitude = None
try:
from vitalDSP.respiratory_analysis.sleep_apnea_detection.pause_detection import (
detect_apnea_pauses,
)
logger.info("✓ detect_apnea_pauses imported successfully")
except Exception as e:
logger.error(f"✗ Failed to import detect_apnea_pauses: {e}")
detect_apnea_pauses = None
try:
from vitalDSP.respiratory_analysis.fusion.multimodal_analysis import (
multimodal_analysis,
)
logger.info("✓ multimodal_analysis imported successfully")
except Exception as e:
logger.error(f"✗ Failed to import multimodal_analysis: {e}")
multimodal_analysis = None
try:
from vitalDSP.respiratory_analysis.fusion.ppg_ecg_fusion import ppg_ecg_fusion
logger.info("✓ ppg_ecg_fusion imported successfully")
except Exception as e:
logger.error(f"✗ Failed to import ppg_ecg_fusion: {e}")
ppg_ecg_fusion = None
try:
from vitalDSP.respiratory_analysis.fusion.respiratory_cardiac_fusion import (
respiratory_cardiac_fusion,
)
logger.info("✓ respiratory_cardiac_fusion imported successfully")
except Exception as e:
logger.error(f"✗ Failed to import respiratory_cardiac_fusion: {e}")
respiratory_cardiac_fusion = None
try:
from vitalDSP.feature_engineering.ppg_autonomic_features import (
PPGAutonomicFeatures,
)
logger.info("✓ PPGAutonomicFeatures imported successfully")
except Exception as e:
logger.error(f"✗ Failed to import PPGAutonomicFeatures: {e}")
PPGAutonomicFeatures = None
try:
from vitalDSP.feature_engineering.ecg_ppg_synchronization_features import (
ECGPPGSynchronization,
)
logger.info("✓ ECGPPGSynchronization imported successfully")
except Exception as e:
logger.error(f"✗ Failed to import ECGPPGSynchronization: {e}")
ECGPPGSynchronization = None
try:
from vitalDSP.preprocess.preprocess_operations import (
PreprocessConfig,
preprocess_signal,
)
logger.info("✓ PreprocessConfig and preprocess_signal imported successfully")
except Exception as e:
logger.error(f"✗ Failed to import PreprocessConfig/preprocess_signal: {e}")
PreprocessConfig = None
preprocess_signal = None
logger.info("=== VITALDSP MODULE IMPORT COMPLETED ===")
[docs]
def toggle_ensemble_options(estimation_methods):
"""Show/hide ensemble options based on selection."""
if estimation_methods and "ensemble" in estimation_methods:
return {"display": "block"}
return {"display": "none"}
[docs]
def register_respiratory_callbacks(app):
"""Register all respiratory analysis callbacks."""
logger.info("=== REGISTERING RESPIRATORY CALLBACKS ===")
logger.info(f"App type: {type(app)}")
# Import vitalDSP modules when callbacks are registered
_import_vitaldsp_modules()
# Auto-select signal type based on uploaded data
@app.callback(
[Output("resp-signal-type", "value")],
[Input("url", "pathname")],
[State("resp-signal-type", "value")],
prevent_initial_call=True,
)
def auto_select_resp_signal_type(pathname, current_signal_type):
"""Auto-select signal type based on uploaded data."""
logger.info("=== AUTO-SELECT RESP SIGNAL TYPE CALLBACK TRIGGERED ===")
logger.info(f"Pathname: {pathname}, Current selection: {current_signal_type}")
if pathname != "/respiratory":
logger.info("Not on respiratory page, preventing update")
raise PreventUpdate
# If user has already made a selection, preserve it
if current_signal_type is not None:
logger.info(
f"User has existing signal type selection: {current_signal_type}, preserving it"
)
raise PreventUpdate
try:
from vitalDSP_webapp.services.data.enhanced_data_service import (
get_enhanced_data_service,
)
data_service = get_enhanced_data_service()
if not data_service:
logger.warning("Data service not available")
return ["PPG"]
# Get the latest data
all_data = data_service.get_all_data()
if not all_data:
logger.info("No data available, using defaults")
return ["PPG"]
# Get the most recent data
latest_data_id = max(
all_data.keys(), key=lambda x: int(x.split("_")[1]) if "_" in x else 0
)
data_info = data_service.get_data_info(latest_data_id)
if not data_info:
logger.info("No data info available, using defaults")
return ["PPG"]
# Debug: Log the data_info to see what's stored
logger.info(
f"Resp data info keys: {list(data_info.keys()) if data_info else 'None'}"
)
logger.info(f"Resp full data info: {data_info}")
# First, check if signal type is stored in data info
stored_signal_type = data_info.get("signal_type", None)
logger.info(f"Resp stored signal type: {stored_signal_type}")
if stored_signal_type and stored_signal_type.lower() != "auto":
# Convert stored value to match respiratory screen dropdown format (lowercase)
signal_type = stored_signal_type.lower()
logger.info(f"Resp using stored signal type: {signal_type}")
else:
# Auto-detect signal type based on data characteristics
signal_type = "PPG" # Default for respiratory screen
logger.info("Resp auto-detecting signal type from data characteristics")
# Try to detect signal type from column names or data characteristics if not stored
if (
stored_signal_type
and stored_signal_type.lower() == "auto"
or not stored_signal_type
):
df = data_service.get_data(latest_data_id)
if df is not None and not df.empty:
column_mapping = data_service.get_column_mapping(latest_data_id)
signal_column = column_mapping.get("signal", "")
# Check column names for signal type hints
if any(
keyword in signal_column.lower()
for keyword in ["ecg", "electrocardio"]
):
signal_type = "ECG"
logger.info("Auto-detected ECG signal type from column name")
elif any(
keyword in signal_column.lower()
for keyword in ["ppg", "pleth", "photopleth"]
):
signal_type = "PPG"
logger.info("Auto-detected PPG signal type from column name")
else:
# Try to detect from data characteristics
try:
signal_data = (
df[signal_column].values
if signal_column
else df.iloc[:, 1].values
)
sampling_freq = data_info.get("sampling_freq", 1000)
# Simple heuristic: ECG typically has higher frequency content
from scipy import signal
f, psd = signal.welch(
signal_data,
fs=sampling_freq,
nperseg=min(1024, len(signal_data) // 4),
)
dominant_freq = f[np.argmax(psd)]
if (
dominant_freq > 1.0
): # Higher frequency content suggests ECG
signal_type = "ECG"
logger.info(
"Auto-detected ECG signal type from frequency analysis"
)
else:
signal_type = "PPG"
logger.info(
"Auto-detected PPG signal type from frequency analysis"
)
except Exception as e:
logger.warning(
f"Could not analyze signal characteristics: {e}"
)
signal_type = "PPG"
logger.info(f"Auto-selected respiratory signal type: {signal_type}")
return [signal_type]
except Exception as e:
logger.error(f"Error in auto-selection: {e}")
return ["PPG"]
@app.callback(
Output("resp-ensemble-options", "style"),
Input("resp-estimation-methods", "value"),
prevent_initial_call=False,
)
def toggle_ensemble_options_callback(estimation_methods):
"""Show/hide ensemble options based on selection."""
return toggle_ensemble_options(estimation_methods)
@app.callback(
[
Output("resp-main-plot", "figure"),
Output("resp-analysis-results", "children"),
Output("resp-analysis-plots", "figure"),
Output("resp-data-store", "data"),
Output("resp-features-store", "data"),
],
[
Input("url", "pathname"), # Trigger on page navigation
Input("resp-analyze-btn", "n_clicks"),
Input("resp-btn-nudge-m10", "n_clicks"),
Input("resp-btn-nudge-m1", "n_clicks"),
Input("resp-btn-nudge-p1", "n_clicks"),
Input("resp-btn-nudge-p10", "n_clicks"),
],
[
State(
"resp-start-position-slider", "value"
), # NEW: start position instead of time-range-slider
State(
"resp-duration-select", "value"
), # NEW: duration instead of start-time/end-time
State("resp-signal-type", "value"),
State("resp-estimation-methods", "value"),
State("resp-advanced-options", "value"),
# REMOVED: State("resp-preprocessing-options", "value") - preprocessing done on filtering page
# REMOVED: State("resp-low-cut", "value") - preprocessing done on filtering page
# REMOVED: State("resp-high-cut", "value") - preprocessing done on filtering page
State("resp-min-breath-duration", "value"),
State("resp-max-breath-duration", "value"),
State("resp-ensemble-method", "value"),
State(
"store-filtered-signal", "data"
), # NEW: Access to filtered signal from filtering page
],
)
def respiratory_analysis_callback(
pathname, # Input - triggers on page navigation
n_clicks,
nudge_m10,
nudge_m1,
nudge_p1,
nudge_p10,
start_position, # State: start position instead of slider_value
duration, # State: duration instead of start_time/end_time
signal_type,
estimation_methods,
advanced_options,
# REMOVED: preprocessing_options - preprocessing done on filtering page
# REMOVED: low_cut - preprocessing done on filtering page
# REMOVED: high_cut - preprocessing done on filtering page
min_breath_duration,
max_breath_duration,
ensemble_method,
filtered_signal_data, # State: Filtered signal data from filtering page
):
"""Unified callback for respiratory analysis - handles both page load and user interactions."""
ctx = callback_context
# Determine what triggered this callback
if not ctx.triggered:
logger.warning("No context triggered - raising PreventUpdate")
raise PreventUpdate
trigger_id = ctx.triggered[0]["prop_id"].split(".")[0]
logger.info("=== RESPIRATORY ANALYSIS CALLBACK TRIGGERED ===")
logger.info(f"Trigger ID: {trigger_id}")
logger.info(f"Pathname: {pathname}")
logger.info("All callback parameters:")
logger.info(f" - n_clicks: {n_clicks}")
logger.info(f" - start_position: {start_position}")
logger.info(f" - duration: {duration}")
logger.info(f" - signal_type: {signal_type}")
logger.info(f" - estimation_methods: {estimation_methods}")
logger.info(f" - advanced_options: {advanced_options}")
# REMOVED: preprocessing_options, low_cut, high_cut - preprocessing done on filtering page
logger.info(f" - min_breath_duration: {min_breath_duration}")
logger.info(f" - max_breath_duration: {max_breath_duration}")
# Only run this when we're on the respiratory page
if pathname != "/respiratory":
logger.info("Not on respiratory page, returning empty figures")
return (
create_empty_figure(),
"Navigate to Respiratory Analysis page",
create_empty_figure(),
None,
None,
)
# If this is the first time loading the page (no button clicks), show a message
if not ctx.triggered or ctx.triggered[0]["prop_id"].split(".")[0] == "url":
logger.info("First time loading respiratory page, attempting to load data")
try:
# Get data from the data service
logger.info("Attempting to get data service...")
from vitalDSP_webapp.services.data.enhanced_data_service import (
get_enhanced_data_service,
)
data_service = get_enhanced_data_service()
logger.info("Data service retrieved successfully")
# Get the most recent data
logger.info("Retrieving all data from service...")
all_data = data_service.get_all_data()
logger.info(
f"All data keys: {list(all_data.keys()) if all_data else 'None'}"
)
if not all_data:
logger.warning("No data found in service")
return (
create_empty_figure(),
"No data available. Please upload and process data first.",
create_empty_figure(),
None,
None,
)
# Get the most recent data entry
latest_data_id = list(all_data.keys())[-1]
latest_data = all_data[latest_data_id]
logger.info(f"Latest data ID: {latest_data_id}")
logger.info(f"Latest data info: {latest_data.get('info', 'No info')}")
# Get column mapping
logger.info("Retrieving column mapping...")
column_mapping = data_service.get_column_mapping(latest_data_id)
logger.info(f"Column mapping: {column_mapping}")
if not column_mapping:
logger.warning(
"Data has not been processed yet - no column mapping found"
)
return (
create_empty_figure(),
"Please process your data on the Upload page first (configure column mapping)",
create_empty_figure(),
None,
None,
)
# Get the actual data
logger.info("Retrieving data frame...")
df = data_service.get_data(latest_data_id)
logger.info(f"Data frame shape: {df.shape if df is not None else 'None'}")
logger.info(
f"Data frame columns: {list(df.columns) if df is not None else 'None'}"
)
if df is None or df.empty:
logger.warning("Data frame is empty")
return (
create_empty_figure(),
"Data is empty or corrupted.",
create_empty_figure(),
None,
None,
)
# Get sampling frequency from the data info
sampling_freq = latest_data.get("info", {}).get("sampling_freq", 1000)
logger.info(f"Sampling frequency: {sampling_freq}")
# Set default values
if start_position is None:
start_position = 0
if duration is None:
duration = 60
# Handle time window adjustments for nudge buttons
# Nudge buttons adjust the position by percentage
if trigger_id in [
"resp-btn-nudge-m10",
"resp-btn-nudge-m1",
"resp-btn-nudge-p1",
"resp-btn-nudge-p10",
]:
if trigger_id == "resp-btn-nudge-m10":
start_position = max(0, start_position - 10) # -10%
elif trigger_id == "resp-btn-nudge-m1":
start_position = max(0, start_position - 5) # -5%
elif trigger_id == "resp-btn-nudge-p1":
start_position = min(100, start_position + 5) # +5%
elif trigger_id == "resp-btn-nudge-p10":
start_position = min(100, start_position + 10) # +10%
logger.info(f"Position adjusted: {start_position}%")
# Get total signal duration in seconds
time_column = column_mapping.get("time")
if time_column and time_column in df.columns:
time_diff = df[time_column].iloc[-1] - df[time_column].iloc[0]
# Convert Timedelta to seconds if necessary
if hasattr(time_diff, "total_seconds"):
total_duration = time_diff.total_seconds()
else:
total_duration = float(time_diff)
else:
total_duration = len(df) / sampling_freq
# Convert start position (0-100%) to actual start time in seconds
actual_start_time = (start_position / 100.0) * total_duration
actual_end_time = actual_start_time + duration
# Ensure end time doesn't exceed total duration
if actual_end_time > total_duration:
actual_end_time = total_duration
actual_start_time = max(0, actual_end_time - duration)
logger.info(f"Time window calculation:")
logger.info(f" Start position: {start_position}%")
logger.info(f" Duration: {duration}s")
logger.info(f" Total duration: {total_duration:.2f}s")
logger.info(f" Actual start time: {actual_start_time:.2f}s")
logger.info(f" Actual end time: {actual_end_time:.2f}s")
start_time = actual_start_time
end_time = actual_end_time
# Apply time window
start_sample = int(start_time * sampling_freq)
end_sample = int(end_time * sampling_freq)
windowed_data = df.iloc[start_sample:end_sample].copy()
# Create time axis
time_axis = np.arange(len(windowed_data)) / sampling_freq
# Get signal column
signal_column = column_mapping.get("signal")
logger.info(f"Signal column from mapping: {signal_column}")
logger.info(
f"Available columns in windowed data: {list(windowed_data.columns)}"
)
if not signal_column or signal_column not in windowed_data.columns:
logger.warning(f"Signal column {signal_column} not found in data")
return (
create_empty_figure(),
"Signal column not found in data.",
create_empty_figure(),
None,
None,
)
signal_data = windowed_data[signal_column].values
logger.info(f"Signal data shape: {signal_data.shape}")
logger.info(
f"Signal data range: {np.min(signal_data):.3f} to {np.max(signal_data):.3f}"
)
logger.info(f"Signal data mean: {np.mean(signal_data):.3f}")
# Data validation: Check for extreme values that indicate data corruption
signal_range = np.max(signal_data) - np.min(signal_data)
signal_std = np.std(signal_data)
# Check for suspiciously large values (likely data corruption)
if (
signal_range > 1e6
or signal_std > 1e6
or np.any(np.abs(signal_data) > 1e6)
):
logger.error(
f"⚠️ DETECTED EXTREME SIGNAL VALUES - Possible data corruption!"
)
logger.error(f" Signal range: {signal_range:.3e}")
logger.error(f" Signal std: {signal_std:.3e}")
logger.error(
f" Max absolute value: {np.max(np.abs(signal_data)):.3e}"
)
logger.error(
f" This suggests data corruption or incorrect data type conversion"
)
# Try to fix by normalizing the signal
logger.info("Attempting to fix by normalizing signal...")
try:
# Check if it's a scaling issue (values are too large by a factor)
if np.max(np.abs(signal_data)) > 1e6:
# Try to scale down by finding a reasonable factor
scale_factor = 1e6 / np.max(np.abs(signal_data))
signal_data = signal_data * scale_factor
logger.info(f"Scaled signal down by factor {scale_factor:.3e}")
logger.info(
f"New signal range: {np.min(signal_data):.3f} to {np.max(signal_data):.3f}"
)
else:
# If values are still reasonable, just normalize
signal_data = (signal_data - np.mean(signal_data)) / np.std(
signal_data
)
logger.info("Normalized signal to zero mean and unit variance")
logger.info(
f"New signal range: {np.min(signal_data):.3f} to {np.max(signal_data):.3f}"
)
except Exception as e:
logger.error(f"Failed to fix signal data: {e}")
logger.error("Using original data (may cause analysis issues)")
else:
logger.info(
"✓ Signal data validation passed - values are within reasonable range"
)
# Check for filtered signal from filtering page
if filtered_signal_data is not None:
logger.info("Checking for filtered signal from filtering page store...")
try:
if (
isinstance(filtered_signal_data, dict)
and "signal" in filtered_signal_data
):
filtered_signal = np.array(filtered_signal_data["signal"])
logger.info(
f"Retrieved filtered signal from filtering page store: {filtered_signal.shape}"
)
# Use filtered signal if it matches our time window
# The filtered signal should be the full signal, so we need to window it
if len(filtered_signal) >= end_sample:
signal_data = filtered_signal[start_sample:end_sample]
logger.info(
f"Using filtered signal data (windowed): {signal_data.shape}"
)
# Validate filtered signal data as well
filtered_range = np.max(signal_data) - np.min(signal_data)
filtered_std = np.std(signal_data)
if (
filtered_range > 1e6
or filtered_std > 1e6
or np.any(np.abs(signal_data) > 1e6)
):
logger.error(
f"⚠️ FILTERED SIGNAL ALSO HAS EXTREME VALUES!"
)
logger.error(
f" Filtered signal range: {filtered_range:.3e}"
)
logger.error(
f" Filtered signal std: {filtered_std:.3e}"
)
logger.error(
f" This suggests the filtering process created corrupted data"
)
# Try to fix the filtered signal
try:
if np.max(np.abs(signal_data)) > 1e6:
scale_factor = 1e6 / np.max(np.abs(signal_data))
signal_data = signal_data * scale_factor
logger.info(
f"Scaled filtered signal down by factor {scale_factor:.3e}"
)
else:
signal_data = (
signal_data - np.mean(signal_data)
) / np.std(signal_data)
logger.info("Normalized filtered signal")
except Exception as e:
logger.error(f"Failed to fix filtered signal: {e}")
logger.info("Falling back to original signal data")
signal_data = windowed_data[signal_column].values
else:
logger.info("✓ Filtered signal data validation passed")
else:
logger.warning(
f"Filtered signal too short ({len(filtered_signal)} < {end_sample}), using original"
)
except Exception as e:
logger.error(f"Error retrieving filtered signal: {e}")
logger.info("Falling back to original signal data")
# Auto-detect signal type if needed
if signal_type == "auto":
logger.info("Auto-detecting signal type...")
signal_type = detect_respiratory_signal_type(signal_data, sampling_freq)
logger.info(f"Auto-detected signal type: {signal_type}")
logger.info(f"Final signal type: {signal_type}")
# Extract respiratory signal from ECG/PPG if needed
respiratory_signal = signal_data.copy()
if signal_type in ["ecg", "ppg"]:
logger.info(
f"Extracting respiratory signal from {signal_type.upper()}..."
)
try:
# Use vitalDSP's respiratory_filtering function
from vitalDSP.preprocess.preprocess_operations import (
respiratory_filtering,
)
# Convert breath duration constraints to frequency cutoffs
# Frequency (Hz) = 1 / Duration (s)
# Default: min_duration=1.2s → max_freq=0.833Hz, max_duration=6.0s → min_freq=0.167Hz
min_duration = (
min_breath_duration if min_breath_duration else 1.2
) # Default 1.2s (50 BPM)
max_duration = (
max_breath_duration if max_breath_duration else 6.0
) # Default 6.0s (10 BPM)
# Calculate frequency range from breath duration constraints
highcut_freq = (
1.0 / min_duration
) # Shortest breath → highest frequency
lowcut_freq = (
1.0 / max_duration
) # Longest breath → lowest frequency
logger.info(
f"Breath duration constraints: {min_duration:.1f}s to {max_duration:.1f}s"
)
logger.info(
f"Respiratory frequency range: {lowcut_freq:.3f} Hz to {highcut_freq:.3f} Hz"
)
logger.info(
f"Respiratory rate range: {60/max_duration:.1f} to {60/min_duration:.1f} BPM"
)
# Apply respiratory filtering with user-specified breath duration constraints
respiratory_signal = respiratory_filtering(
signal_data,
sampling_freq,
lowcut=lowcut_freq,
highcut=highcut_freq,
order=4,
)
logger.info(
f"Successfully extracted respiratory signal from {signal_type.upper()} using vitalDSP"
)
logger.info(
f" - Signal range: {np.min(respiratory_signal):.3f} to {np.max(respiratory_signal):.3f}"
)
logger.info(
f" - Signal mean: {np.mean(respiratory_signal):.3f}, std: {np.std(respiratory_signal):.3f}"
)
# Validate extracted respiratory signal
resp_range = np.max(respiratory_signal) - np.min(respiratory_signal)
resp_std = np.std(respiratory_signal)
if (
resp_range > 1e6
or resp_std > 1e6
or np.any(np.abs(respiratory_signal) > 1e6)
):
logger.error(
f"⚠️ RESPIRATORY EXTRACTION CREATED EXTREME VALUES!"
)
logger.error(f" Respiratory signal range: {resp_range:.3e}")
logger.error(f" Respiratory signal std: {resp_std:.3e}")
logger.error(
f" This suggests the respiratory_filtering function has issues"
)
# Try to fix the respiratory signal
try:
if np.max(np.abs(respiratory_signal)) > 1e6:
scale_factor = 1e6 / np.max(np.abs(respiratory_signal))
respiratory_signal = respiratory_signal * scale_factor
logger.info(
f"Scaled respiratory signal down by factor {scale_factor:.3e}"
)
else:
respiratory_signal = (
respiratory_signal - np.mean(respiratory_signal)
) / np.std(respiratory_signal)
logger.info("Normalized respiratory signal")
except Exception as e:
logger.error(f"Failed to fix respiratory signal: {e}")
logger.info("Falling back to original signal")
respiratory_signal = signal_data
else:
logger.info("✓ Respiratory signal extraction validation passed")
except Exception as e:
logger.error(f"Error extracting respiratory signal: {e}")
logger.info("Falling back to original signal")
respiratory_signal = signal_data
else:
logger.info(
f"Signal type '{signal_type}' is already respiratory signal, no extraction needed"
)
logger.info(f"Estimation methods: {estimation_methods}")
logger.info(f"Advanced options: {advanced_options}")
# REMOVED: preprocessing_options logging - preprocessing done on filtering page
# Create main respiratory signal plot with annotations
logger.info("Creating main respiratory signal plot with annotations...")
main_plot = create_respiratory_signal_plot(
respiratory_signal, # Use extracted respiratory signal instead of raw ECG/PPG
time_axis,
sampling_freq,
signal_type,
estimation_methods,
# REMOVED: preprocessing_options, low_cut, high_cut - preprocessing done on filtering page
)
logger.info("Main respiratory signal plot created successfully")
logger.info(f"Main plot type: {type(main_plot)}")
# Generate comprehensive respiratory analysis results
logger.info("Generating comprehensive respiratory analysis results...")
analysis_results = generate_comprehensive_respiratory_analysis(
respiratory_signal, # Use extracted respiratory signal
time_axis,
sampling_freq,
signal_type,
estimation_methods,
advanced_options,
# REMOVED: preprocessing_options, low_cut, high_cut - preprocessing done on filtering page
min_breath_duration,
max_breath_duration,
ensemble_method,
)
logger.info("Respiratory analysis results generated successfully")
logger.info(f"Analysis results type: {type(analysis_results)}")
logger.info(
f"Analysis results content length: {len(str(analysis_results)) if analysis_results else 'None'}"
)
# Create respiratory analysis plots
logger.info("Creating respiratory analysis plots...")
analysis_plots = create_comprehensive_respiratory_plots(
respiratory_signal, # Use extracted respiratory signal
time_axis,
sampling_freq,
signal_type,
estimation_methods,
advanced_options,
# REMOVED: preprocessing_options, low_cut, high_cut - preprocessing done on filtering page
)
logger.info("Respiratory analysis plots created successfully")
logger.info(f"Analysis plots type: {type(analysis_plots)}")
# Store processed data
resp_data = {
"signal_data": signal_data.tolist(), # Original ECG/PPG
"respiratory_signal": respiratory_signal.tolist(), # Extracted respiratory signal
"time_axis": time_axis.tolist(),
"sampling_freq": sampling_freq,
"window": [start_position, end_time],
"signal_type": signal_type,
"estimation_methods": estimation_methods,
}
resp_features = {
"advanced_options": advanced_options,
# REMOVED: preprocessing_options, filter_params - preprocessing done on filtering page
"breath_constraints": {
"min_duration": min_breath_duration,
"max_duration": max_breath_duration,
},
}
logger.info("Respiratory analysis completed successfully")
logger.info("Returning results:")
logger.info(f" - Main plot: {type(main_plot)}")
logger.info(f" - Analysis results: {type(analysis_results)}")
logger.info(f" - Analysis plots: {type(analysis_plots)}")
logger.info(f" - Resp data: {type(resp_data)}")
logger.info(f" - Resp features: {type(resp_features)}")
return main_plot, analysis_results, analysis_plots, resp_data, resp_features
except Exception as e:
logger.error(f"Error in respiratory analysis callback: {e}")
import traceback
traceback.print_exc()
return (
create_empty_figure(),
f"Error in analysis: {str(e)}",
create_empty_figure(),
None,
None,
)
# REMOVED: Orphaned callback update_resp_time_inputs - referenced non-existent components
# (resp-start-time, resp-end-time, resp-time-range-slider)
@app.callback(
[
Output("resp-start-position-slider", "min"),
Output("resp-start-position-slider", "max"),
Output("resp-start-position-slider", "value"),
],
[Input("url", "pathname")],
prevent_initial_call=True, # Prevent triggering on page load
)
def update_resp_time_slider_range(pathname):
"""Update time slider range based on data duration."""
logger.info("=== UPDATE RESP TIME SLIDER RANGE ===")
logger.info(f"Pathname: {pathname}")
# Only run this when we're on the respiratory page
if pathname != "/respiratory":
return 0, 100, [0, 10]
try:
# Get data from the data service
from vitalDSP_webapp.services.data.enhanced_data_service import (
get_enhanced_data_service,
)
data_service = get_enhanced_data_service()
# Get the most recent data
all_data = data_service.get_all_data()
if not all_data:
logger.warning("No data found in service")
return 0, 100, [0, 10]
# Get the most recent data entry
latest_data_id = list(all_data.keys())[-1]
latest_data = all_data[latest_data_id]
# Get the actual data
df = data_service.get_data(latest_data_id)
if df is None or df.empty:
logger.warning("Data frame is empty")
return 0, 100, [0, 10]
# Get sampling frequency from the data info
sampling_freq = latest_data.get("info", {}).get("sampling_freq", 1000)
max_time = len(df) / sampling_freq
logger.info(f"Max time: {max_time}, Sampling freq: {sampling_freq}")
return 0, max_time, [0, min(10, max_time)]
except Exception as e:
logger.error(f"Error updating resp time slider range: {e}")
return 0, 100, [0, 10]
logger.info("=== RESPIRATORY CALLBACKS REGISTERED SUCCESSFULLY ===")
@app.callback(
Output("resp-additional-analysis-section", "children"),
[Input("resp-advanced-options", "value")],
)
def update_additional_analysis_section(advanced_options):
"""Update additional analysis section based on selected options.
Note: This is currently a placeholder. All analysis results are shown in the main
resp-analysis-results div. This section can be used for future additional analysis
displays if needed.
"""
# Return empty div - all analysis is shown in the main analysis card
return html.Div()
@app.callback(
Output("resp-btn-export-results", "n_clicks"),
[Input("resp-btn-export-results", "n_clicks")],
[State("resp-data-store", "data"), State("resp-features-store", "data")],
)
def export_respiratory_results(n_clicks, resp_data, resp_features):
"""Export respiratory analysis results."""
if not n_clicks or not resp_data:
return no_update
try:
# Convert to JSON and trigger download
# This would typically trigger a download in a real implementation
logger.info("Respiratory analysis results exported successfully")
except Exception as e:
logger.error(f"Error exporting respiratory results: {e}")
return no_update
[docs]
def create_empty_figure():
"""Create an empty figure for when no data is available."""
fig = go.Figure()
fig.add_annotation(
text="No data available<br>Please upload data first",
xref="paper",
yref="paper",
x=0.5,
y=0.5,
showarrow=False,
font=dict(size=16, color="gray"),
)
fig.update_layout(
template="plotly_white",
height=400,
margin=dict(l=40, r=40, t=40, b=40),
xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
)
return fig
[docs]
def detect_respiratory_signal_type(signal_data, sampling_freq):
"""Auto-detect respiratory signal type based on signal characteristics."""
try:
# Calculate basic statistics
std_val = np.std(signal_data)
range_val = np.max(signal_data) - np.min(signal_data)
# Calculate frequency content
fft_result = np.abs(np.fft.rfft(signal_data))
freqs = np.fft.rfftfreq(len(signal_data), 1 / sampling_freq)
# Find dominant frequency
peak_idx = np.argmax(fft_result)
dominant_freq = freqs[peak_idx]
# Respiratory signals typically have dominant frequency around 0.2-0.5 Hz (12-30 BPM)
# PPG respiratory component is usually lower amplitude
# ECG respiratory component is more complex
if 0.1 < dominant_freq < 0.8 and range_val < 2 * std_val:
return "ppg"
elif 0.1 < dominant_freq < 1.0 and range_val > 3 * std_val:
return "respiratory"
elif dominant_freq > 0.5:
return "ecg"
else:
return "ppg" # Default to PPG for most cases
except Exception as e:
logger.warning(f"Respiratory signal type detection failed: {e}")
return "ppg" # Default fallback
[docs]
def create_respiratory_signal_plot(
signal_data,
time_axis,
sampling_freq,
signal_type,
estimation_methods,
# REMOVED: preprocessing_options, low_cut, high_cut - preprocessing done on filtering page
):
"""Create the main respiratory signal plot with annotations.
Note: Preprocessing (filtering, denoising) should be done on the filtering page.
This function receives already-filtered signal data from store-filtered-signal.
"""
logger.info("=== CREATE RESPIRATORY SIGNAL PLOT STARTED ===")
logger.info("Input parameters:")
logger.info(f" - signal_data shape: {signal_data.shape}")
logger.info(f" - time_axis shape: {time_axis.shape}")
logger.info(f" - sampling_freq: {sampling_freq}")
logger.info(f" - signal_type: {signal_type}")
logger.info(f" - estimation_methods: {estimation_methods}")
# REMOVED: preprocessing_options, low_cut, high_cut logging - preprocessing done on filtering page
try:
fig = go.Figure()
# REMOVED: Preprocessing logic - all preprocessing is done on the filtering page
# Signal data received here is already filtered from store-filtered-signal
# PERFORMANCE OPTIMIZATION: Limit plot data to max 5 minutes and 10K points
time_axis_plot, signal_data_plot = limit_plot_data(
time_axis,
signal_data,
max_duration=300, # 5 minutes max
max_points=10000, # 10K points max
)
logger.info(
f"Plot data limited: {len(signal_data)} → {len(signal_data_plot)} points"
)
# Create the main signal plot (using limited data)
fig.add_trace(
go.Scatter(
x=time_axis_plot,
y=signal_data_plot,
mode="lines",
name=f"{signal_type.upper()} Signal",
line=dict(color="#2E86AB", width=2),
hovertemplate="<b>Time:</b> %{x:.3f}s<br><b>Amplitude:</b> %{y:.3f}<extra></extra>",
)
)
# Add breathing pattern detection if enabled (use limited data for performance)
if estimation_methods and "peak_detection" in estimation_methods:
try:
# Detect breathing peaks on limited data
prominence = 0.3 * np.std(signal_data_plot)
distance = int(0.5 * sampling_freq) # Minimum 0.5s between breaths
peaks, properties = signal.find_peaks(
signal_data_plot, prominence=prominence, distance=distance
)
if len(peaks) > 0:
fig.add_trace(
go.Scatter(
x=time_axis_plot[peaks],
y=signal_data_plot[peaks],
mode="markers",
name="Breathing Peaks",
marker=dict(color="red", size=8, symbol="diamond"),
hovertemplate="<b>Breath:</b> %{y:.3f}<br><b>Time:</b> %{x:.3f}s<extra></extra>",
)
)
# Add breath annotations
for i, peak in enumerate(peaks[:10]): # Limit to first 10 breaths
fig.add_annotation(
x=time_axis_plot[peak],
y=signal_data_plot[peak],
text=f"B{i+1}",
showarrow=True,
arrowhead=2,
arrowsize=1,
arrowwidth=2,
arrowcolor="red",
ax=0,
ay=-40,
)
# Add breathing rate annotation
if len(peaks) > 1:
breath_intervals = np.diff(peaks) / sampling_freq
breathing_rate = (
60 / np.mean(breath_intervals)
if len(breath_intervals) > 0
else 0
)
fig.add_annotation(
x=0.02,
y=0.98,
xref="paper",
yref="paper",
text=f"Breathing Rate: {breathing_rate:.1f} BPM",
showarrow=False,
bgcolor="rgba(255,255,255,0.8)",
bordercolor="red",
borderwidth=2,
)
except Exception as e:
logger.error(f"Breathing peak detection failed: {e}")
# Add baseline
baseline = np.mean(signal_data_plot) if len(signal_data_plot) > 0 else 0
fig.add_hline(
y=baseline,
line_dash="dash",
line_color="gray",
annotation_text=f"Baseline: {baseline:.3f}",
)
logger.info("Updating plot layout...")
fig.update_layout(
title=f"{signal_type.upper()} Respiratory Signal Analysis",
xaxis_title="Time (seconds)",
yaxis_title="Amplitude",
template="plotly_white",
height=400,
margin=dict(l=40, r=40, t=60, b=40),
showlegend=True,
hovermode="closest",
)
logger.info("=== CREATE RESPIRATORY SIGNAL PLOT COMPLETED ===")
return fig
except Exception as e:
logger.error(f"Error creating respiratory signal plot: {e}")
return create_empty_figure()
[docs]
def generate_comprehensive_respiratory_analysis(
signal_data,
time_axis,
sampling_freq,
signal_type,
estimation_methods,
advanced_options,
# REMOVED: preprocessing_options, low_cut, high_cut - preprocessing done on filtering page
min_breath_duration,
max_breath_duration,
ensemble_method=None,
):
"""Generate comprehensive respiratory analysis results using vitalDSP.
Note: Preprocessing (filtering, denoising) should be done on the filtering page.
This function receives already-filtered signal data from store-filtered-signal.
"""
logger.info("=== GENERATE COMPREHENSIVE RESPIRATORY ANALYSIS STARTED ===")
logger.info("Input parameters:")
logger.info(f" - signal_data shape: {signal_data.shape}")
logger.info(f" - time_axis shape: {time_axis.shape}")
logger.info(f" - sampling_freq: {sampling_freq}")
logger.info(f" - signal_type: {signal_type}")
logger.info(f" - estimation_methods: {estimation_methods}")
logger.info(f" - advanced_options: {advanced_options}")
# REMOVED: preprocessing_options, low_cut, high_cut - preprocessing done on filtering page
logger.info(f" - min_breath_duration: {min_breath_duration}")
logger.info(f" - max_breath_duration: {max_breath_duration}")
logger.info(f" - ensemble_method: {ensemble_method}")
try:
results = []
# Initialize vitalDSP RespiratoryAnalysis
if RespiratoryAnalysis is None:
logger.warning(
"RespiratoryAnalysis module not available - using fallback analysis"
)
results.append(
html.Div(
[
html.H5("⚠️ Using Fallback Analysis", className="text-warning"),
html.P(
"vitalDSP modules not available. Using basic signal processing methods.",
className="text-muted",
),
]
)
)
# Fallback: Basic respiratory rate estimation using peak detection
try:
# Simple peak detection for breathing
prominence = 0.3 * np.std(signal_data)
distance = int(0.5 * sampling_freq) # Minimum 0.5s between breaths
peaks, _ = signal.find_peaks(
signal_data, prominence=prominence, distance=distance
)
if len(peaks) > 1:
breath_intervals = np.diff(peaks) / sampling_freq
rr_bpm = (
60 / np.mean(breath_intervals)
if len(breath_intervals) > 0
else 0
)
rr_std = (
60 * np.std(breath_intervals) / (np.mean(breath_intervals) ** 2)
if len(breath_intervals) > 0 and np.mean(breath_intervals) > 0
else 0
)
results.append(
html.Div(
[
html.Strong("Fallback Peak Detection: "),
html.Span(
f"{rr_bpm:.2f} BPM", className="text-success"
),
],
className="mb-2",
)
)
results.append(
html.Div(
[
html.Strong("Breathing Variability: "),
html.Span(f"{rr_std:.2f} BPM", className="text-info"),
],
className="mb-2",
)
)
results.append(
html.Div(
[
html.Strong("Number of Breaths: "),
html.Span(f"{len(peaks)}", className="text-info"),
],
className="mb-2",
)
)
else:
results.append(
html.Div(
[
html.Strong("Fallback Analysis: "),
html.Span(
"Insufficient peaks detected",
className="text-warning",
),
],
className="mb-2",
)
)
except Exception as e:
logger.error(f"Fallback analysis failed: {e}")
results.append(
html.Div(
[
html.Strong("Fallback Analysis: "),
html.Span("Failed", className="text-danger"),
],
className="mb-2",
)
)
# Continue with basic statistics
else:
try:
logger.info("Initializing vitalDSP RespiratoryAnalysis...")
# signal_data already contains the extracted/filtered respiratory signal (passed from caller)
resp_analysis = RespiratoryAnalysis(signal_data, sampling_freq)
logger.info("RespiratoryAnalysis initialized with respiratory signal")
except Exception as e:
logger.error(f"Failed to initialize RespiratoryAnalysis: {e}")
logger.info("Falling back to basic analysis")
resp_analysis = None
results.append(html.H5("⚠️ Fallback Analysis", className="text-warning"))
results.append(
html.P(
"vitalDSP initialization failed. Using basic signal analysis.",
className="text-muted",
)
)
# REMOVED: Preprocessing configuration - preprocessing done on filtering page
# Signal data received here is already filtered from store-filtered-signal
# VitalDSP algorithms will work on the pre-filtered signal
preprocess_config = None
logger.info(
"Using pre-filtered signal from filtering page (no additional preprocessing)"
)
# Respiratory Rate Estimation using multiple methods
logger.info(f"Processing estimation methods: {estimation_methods}")
if estimation_methods and resp_analysis is not None:
logger.info("Adding respiratory rate estimation header...")
results.append(html.H5("🫁 Respiratory Rate Estimation", className="mb-3"))
for method in estimation_methods:
logger.info(f"Processing method: {method}")
try:
if method == "peak_detection":
logger.info(
"Computing respiratory rate using peak detection method..."
)
try:
rr = resp_analysis.compute_respiratory_rate(
method="peaks",
min_breath_duration=min_breath_duration,
max_breath_duration=max_breath_duration,
preprocess_config=preprocess_config,
)
logger.info(f"Peak detection method result: {rr:.2f} BPM")
results.append(
html.Div(
[
html.Strong("Peak Detection Method: "),
html.Span(
f"{rr:.2f} BPM", className="text-success"
),
],
className="mb-2",
)
)
except Exception as e:
logger.error(f"Peak detection method failed: {e}")
results.append(
html.Div(
[
html.Strong("Peak Detection Method: "),
html.Span("Failed", className="text-danger"),
],
className="mb-2",
)
)
elif method == "zero_crossing":
logger.info(
"Computing respiratory rate using zero crossing method..."
)
try:
rr = resp_analysis.compute_respiratory_rate(
method="zero_crossing",
min_breath_duration=min_breath_duration,
max_breath_duration=max_breath_duration,
preprocess_config=preprocess_config,
)
# Zero crossing counts both up/down crossings, so divide by 2 for complete breaths
logger.info(f"Zero crossing method result: {rr:.2f} BPM")
results.append(
html.Div(
[
html.Strong("Zero Crossing Method: "),
html.Span(
f"{rr:.2f} BPM", className="text-success"
),
],
className="mb-2",
)
)
except Exception as e:
logger.error(f"Zero crossing method failed: {e}")
results.append(
html.Div(
[
html.Strong("Zero Crossing Method: "),
html.Span("Failed", className="text-danger"),
],
className="mb-2",
)
)
elif method == "time_domain":
logger.info(
"Computing respiratory rate using time domain method..."
)
try:
rr = resp_analysis.compute_respiratory_rate(
method="time_domain",
preprocess_config=preprocess_config,
)
logger.info(f"Time domain method result: {rr:.2f} BPM")
results.append(
html.Div(
[
html.Strong("Time Domain Method: "),
html.Span(
f"{rr:.2f} BPM", className="text-success"
),
],
className="mb-2",
)
)
except Exception as e:
logger.error(f"Time domain method failed: {e}")
results.append(
html.Div(
[
html.Strong("Time Domain Method: "),
html.Span("Failed", className="text-danger"),
],
className="mb-2",
)
)
elif method == "frequency_domain":
logger.info(
"Computing respiratory rate using frequency domain method..."
)
try:
rr = resp_analysis.compute_respiratory_rate(
method="frequency_domain",
preprocess_config=preprocess_config,
)
logger.info(f"Frequency domain method result: {rr:.2f} BPM")
results.append(
html.Div(
[
html.Strong("Frequency Domain Method: "),
html.Span(
f"{rr:.2f} BPM", className="text-success"
),
],
className="mb-2",
)
)
except Exception as e:
logger.error(f"Frequency domain method failed: {e}")
results.append(
html.Div(
[
html.Strong("Frequency Domain Method: "),
html.Span("Failed", className="text-danger"),
],
className="mb-2",
)
)
elif method == "fft_based":
logger.info(
"Computing respiratory rate using FFT-based method..."
)
try:
rr = resp_analysis.compute_respiratory_rate(
method="fft_based",
preprocess_config=preprocess_config,
)
logger.info(f"FFT-based method result: {rr:.2f} BPM")
results.append(
html.Div(
[
html.Strong("FFT-based Method: "),
html.Span(
f"{rr:.2f} BPM", className="text-success"
),
],
className="mb-2",
)
)
except Exception as e:
logger.error(f"FFT-based method failed: {e}")
results.append(
html.Div(
[
html.Strong("FFT-based Method: "),
html.Span("Failed", className="text-danger"),
],
className="mb-2",
)
)
elif method == "counting":
logger.info(
"Computing respiratory rate using counting method..."
)
try:
rr = resp_analysis.compute_respiratory_rate(
method="counting",
preprocess_config=preprocess_config,
)
logger.info(f"Counting method result: {rr:.2f} BPM")
results.append(
html.Div(
[
html.Strong("Counting Method: "),
html.Span(
f"{rr:.2f} BPM", className="text-success"
),
],
className="mb-2",
)
)
except Exception as e:
logger.error(f"Counting method failed: {e}")
results.append(
html.Div(
[
html.Strong("Counting Method: "),
html.Span("Failed", className="text-danger"),
],
className="mb-2",
)
)
elif method == "ensemble":
logger.info(
"Computing ensemble respiratory rate using vitalDSP ensemble method..."
)
logger.info(f"Ensemble method parameter: {ensemble_method}")
try:
if RespiratoryAnalysis is not None:
# Use vitalDSP's built-in ensemble method
ensemble_result = (
resp_analysis.compute_respiratory_rate_ensemble(
preprocess_config=preprocess_config,
methods=[
"counting",
"fft_based",
"frequency_domain",
"time_domain",
],
)
)
# Extract results from vitalDSP ensemble
rr_estimate = ensemble_result.get("respiratory_rate", 0)
confidence = ensemble_result.get("confidence", 0)
quality = ensemble_result.get("quality", "unknown")
std_dev = ensemble_result.get("std", 0)
n_methods = ensemble_result.get("n_methods", 0)
individual_estimates = ensemble_result.get(
"individual_estimates", {}
)
# Determine confidence color and description
if confidence >= 0.8:
confidence_desc = "High Confidence"
confidence_color = "text-success"
elif confidence >= 0.5:
confidence_desc = "Medium Confidence"
confidence_color = "text-warning"
else:
confidence_desc = "Low Confidence"
confidence_color = "text-danger"
# Determine quality color
if quality == "high":
quality_color = "text-success"
elif quality == "medium":
quality_color = "text-warning"
else:
quality_color = "text-danger"
# Create individual method results display
individual_results = []
for (
method_name,
estimate,
) in individual_estimates.items():
if estimate is not None:
individual_results.append(
f"{method_name}: {estimate:.1f} BPM"
)
else:
individual_results.append(
f"{method_name}: Failed"
)
results.append(
html.Div(
[
html.Strong(
"🎯 Ensemble Respiratory Rate: "
),
html.Span(
f"{rr_estimate:.2f} ± {std_dev:.2f} BPM",
className="text-success",
),
html.Br(),
html.Small(
f"Confidence: {confidence:.2f} ({confidence_desc})",
className=confidence_color,
),
html.Br(),
html.Small(
f"Quality: {quality.title()}",
className=quality_color,
),
html.Br(),
html.Small(
f"Methods used: {n_methods}",
className="text-muted",
),
html.Br(),
html.Small(
f"Individual estimates: {', '.join(individual_results)}",
className="text-muted",
),
],
className="mb-3 p-3 border border-success rounded",
)
)
logger.info(
f"vitalDSP Ensemble result: {rr_estimate:.2f} ± {std_dev:.2f} BPM "
f"(confidence: {confidence:.2f}, quality: {quality})"
)
else:
results.append(
html.Div(
[
html.Strong("Ensemble Method: "),
html.Span(
"RespiratoryAnalysis not available",
className="text-warning",
),
],
className="mb-2",
)
)
except Exception as e:
logger.error(f"Ensemble method failed: {e}")
results.append(
html.Div(
[
html.Strong("Ensemble Method: "),
html.Span("Failed", className="text-danger"),
],
className="mb-2",
)
)
except Exception as e:
logger.warning(f"Method {method} failed: {e}")
results.append(
html.Div(
[
html.Strong(
f"{method.replace('_', ' ').title()} Method: "
),
html.Span("Failed", className="text-danger"),
],
className="mb-2",
)
)
elif estimation_methods and resp_analysis is None:
logger.warning(
"Estimation methods requested but RespiratoryAnalysis not available"
)
results.append(
html.H5("⚠️ Respiratory Rate Estimation", className="text-warning")
)
results.append(
html.P(
"vitalDSP RespiratoryAnalysis not available. Cannot perform advanced estimation methods.",
className="text-muted",
)
)
# Method Agreement Analysis (if multiple methods were used)
if estimation_methods and len(estimation_methods) > 1:
logger.info("Adding method agreement analysis...")
results.append(html.Hr())
results.append(
html.H5("📊 METHOD AGREEMENT ANALYSIS", className="text-primary mb-3")
)
try:
# Collect results from different estimation methods
method_results = []
# Add basic peak detection method
try:
prominence = 0.3 * np.std(signal_data)
distance = int(0.5 * sampling_freq)
peaks, _ = signal.find_peaks(
signal_data, prominence=prominence, distance=distance
)
if len(peaks) > 1:
breath_intervals = np.diff(peaks) / sampling_freq
rr_peak = (
60.0 / np.mean(breath_intervals)
if len(breath_intervals) > 0
else 0
)
method_results.append(("Peak Detection", rr_peak, len(peaks)))
except Exception as e:
logger.warning(f"Peak detection method failed: {e}")
# Add FFT-based method
try:
# Simple FFT-based respiratory rate estimation
fft_signal = np.fft.fft(signal_data)
freqs = np.fft.fftfreq(len(signal_data), 1 / sampling_freq)
# Focus on respiratory frequency range (0.1-0.5 Hz = 6-30 BPM)
resp_mask = (freqs > 0.1) & (freqs < 0.5)
resp_freqs = freqs[resp_mask]
resp_fft = np.abs(fft_signal[resp_mask])
if len(resp_freqs) > 0:
dominant_freq_idx = np.argmax(resp_fft)
dominant_freq = resp_freqs[dominant_freq_idx]
rr_fft = dominant_freq * 60 # Convert to BPM
method_results.append(("FFT Analysis", rr_fft, 1))
except Exception as e:
logger.warning(f"FFT method failed: {e}")
# Add autocorrelation method
try:
# Simple autocorrelation-based method
autocorr = np.correlate(signal_data, signal_data, mode="full")
autocorr = autocorr[len(autocorr) // 2 :]
# Find peaks in autocorrelation (excluding first peak)
peaks_auto, _ = signal.find_peaks(autocorr[100:], distance=50)
if len(peaks_auto) > 0:
# Convert peak distance to frequency
peak_distance = peaks_auto[0] / sampling_freq
rr_auto = 60.0 / peak_distance
method_results.append(
("Autocorrelation", rr_auto, len(peaks_auto))
)
except Exception as e:
logger.warning(f"Autocorrelation method failed: {e}")
if len(method_results) > 1:
# Calculate agreement metrics
rr_values = [result[1] for result in method_results]
mean_rr = np.mean(rr_values) if len(rr_values) > 0 else 0
std_rr = np.std(rr_values) if len(rr_values) > 0 else 0
cv_rr = std_rr / mean_rr if mean_rr > 0 else 0
# Calculate pairwise differences
differences = []
for i in range(len(rr_values)):
for j in range(i + 1, len(rr_values)):
diff = abs(rr_values[i] - rr_values[j])
differences.append(diff)
mean_diff = np.mean(differences) if differences else 0
max_diff = max(differences) if differences else 0
# Agreement assessment
if cv_rr < 0.1:
agreement_level = "Excellent"
agreement_color = "text-success"
elif cv_rr < 0.2:
agreement_level = "Good"
agreement_color = "text-info"
elif cv_rr < 0.3:
agreement_level = "Moderate"
agreement_color = "text-warning"
else:
agreement_level = "Poor"
agreement_color = "text-danger"
# Create comprehensive method agreement analysis
results.append(
html.Div(
[
# Method Results Summary
html.Div(
[
html.H6(
"📊 METHOD RESULTS",
className="text-dark mb-2",
),
html.P(
f"Methods Used: {len(method_results)}",
className="mb-1",
),
html.P(
f"Mean RR: {mean_rr:.1f} BPM",
className="mb-1",
),
html.P(
f"Std Dev: {std_rr:.1f} BPM",
className="mb-1",
),
html.P(
f"Coefficient of Variation: {cv_rr:.3f}",
className="mb-1",
),
],
className="p-2 border border-primary rounded mb-2",
),
# Individual Method Results
html.Div(
[
html.H6(
"🔬 INDIVIDUAL METHODS",
className="text-dark mb-2",
),
*[
html.P(
f"{method[0]}: {method[1]:.1f} BPM ({method[2]} samples)",
className="mb-1",
)
for method in method_results
],
],
className="p-2 border border-info rounded mb-2",
),
# Agreement Assessment
html.Div(
[
html.H6(
"🎯 AGREEMENT ASSESSMENT",
className="text-dark mb-2",
),
html.P(
f"Agreement Level: {agreement_level}",
className=f"mb-1 {agreement_color}",
),
html.P(
f"Mean Difference: {mean_diff:.1f} BPM",
className="mb-1",
),
html.P(
f"Max Difference: {max_diff:.1f} BPM",
className="mb-1",
),
html.P(f"CV: {cv_rr:.3f}", className="mb-1"),
],
className="p-2 border border-success rounded mb-2",
),
# Clinical Interpretation
html.Div(
[
html.H6(
"🏥 CLINICAL INTERPRETATION",
className="text-dark mb-2",
),
html.P(
f"Reliability: {'High' if cv_rr < 0.2 else 'Moderate' if cv_rr < 0.3 else 'Low'}",
className=(
"text-success"
if cv_rr < 0.2
else (
"text-warning"
if cv_rr < 0.3
else "text-danger"
)
),
),
html.P(
f"Consistency: {'Consistent' if mean_diff < 2 else 'Variable' if mean_diff < 5 else 'Inconsistent'}",
className=(
"text-success"
if mean_diff < 2
else (
"text-warning"
if mean_diff < 5
else "text-danger"
)
),
),
html.P(
f"Recommendation: {'Use any method' if cv_rr < 0.15 else 'Use average' if cv_rr < 0.25 else 'Verify manually'}",
className=(
"text-success"
if cv_rr < 0.25
else (
"text-info"
if cv_rr < 0.25
else "text-warning"
)
),
),
],
className="p-2 border border-warning rounded",
),
],
className="mb-3",
)
)
logger.info(
f"Method agreement analysis completed: {len(method_results)} methods, CV: {cv_rr:.3f}"
)
else:
results.append(
html.Div(
[
html.H6(
"⚠️ INSUFFICIENT METHODS",
className="text-warning mb-2",
),
html.P(
f"Only {len(method_results)} method(s) available. Need at least 2 methods for agreement analysis."
),
html.P(
"Try different signal processing approaches or check signal quality."
),
],
className="p-3 border border-warning rounded bg-light",
)
)
except Exception as e:
logger.error(f"Method agreement analysis failed: {e}")
results.append(
html.Div(
[
html.H6("❌ ANALYSIS FAILED", className="text-danger mb-2"),
html.P(f"Error: {str(e)}"),
html.P("Please check signal quality and try again."),
],
className="p-3 border border-danger rounded bg-light",
)
)
# Advanced Analysis Features
logger.info(f"Processing advanced options: {advanced_options}")
if advanced_options:
logger.info("Adding advanced analysis section...")
results.append(html.Hr())
results.append(
html.H5("🔬 ADVANCED ANALYSIS", className="text-primary mb-2")
)
# Debug: Show what options are selected
results.append(
html.Div(
[
html.H6("📋 SELECTED OPTIONS", className="text-dark mb-2"),
html.P(
f"Selected: {', '.join(advanced_options)}", className="mb-1"
),
html.P(f"Signal Type: {signal_type}", className="mb-1"),
html.P(
f"Signal Length: {len(signal_data)} samples",
className="mb-1",
),
html.P(
f"Duration: {len(signal_data)/sampling_freq:.1f}s",
className="mb-1",
),
],
className="p-2 border border-info rounded mb-2 bg-light",
)
)
# Create inline grid layout for analysis blocks
analysis_blocks = []
# Sleep Apnea Detection
if "sleep_apnea" in advanced_options:
logger.info("Processing sleep apnea detection...")
try:
# Detect apnea events using amplitude threshold
apnea_threshold = 0.3 * np.std(signal_data)
logger.info(f"Apnea threshold: {apnea_threshold:.3f}")
if detect_apnea_amplitude is None:
logger.warning("detect_apnea_amplitude function not available")
apnea_events = []
else:
apnea_events = detect_apnea_amplitude(
signal_data,
sampling_freq,
threshold=apnea_threshold,
min_duration=5,
)
# Also detect apnea events using pause detection
if detect_apnea_pauses is None:
logger.warning("detect_apnea_pauses function not available")
pause_apnea_events = []
else:
pause_apnea_events = detect_apnea_pauses(
signal_data, sampling_freq, min_pause_duration=5
)
total_apnea_events = len(apnea_events) + len(pause_apnea_events)
all_events = apnea_events + pause_apnea_events
if total_apnea_events > 0:
# Create compact sleep apnea card
apnea_card = html.Div(
[
html.H6(
"😴 SLEEP APNEA", className="text-warning mb-1"
),
html.P(
f"Events: {total_apnea_events}",
className="mb-1 fw-bold",
),
html.P(
f"Amplitude: {len(apnea_events)}",
className="mb-1 small",
),
html.P(
f"Pause: {len(pause_apnea_events)}",
className="mb-1 small",
),
html.P(
(
f"First: {all_events[0][0]:.1f}s"
if all_events
else "No events"
),
className="mb-0 small text-muted",
),
],
className="p-2 border border-warning rounded bg-light h-100",
)
analysis_blocks.append(apnea_card)
else:
# Create no events card
no_apnea_card = html.Div(
[
html.H6(
"😴 SLEEP APNEA", className="text-success mb-1"
),
html.P(
"No events detected",
className="mb-0 fw-bold text-success",
),
],
className="p-2 border border-success rounded bg-light h-100",
)
analysis_blocks.append(no_apnea_card)
except Exception as e:
logger.warning(f"Sleep apnea detection failed: {e}")
# Create error card
error_card = html.Div(
[
html.H6("😴 SLEEP APNEA", className="text-danger mb-1"),
html.P(
"Analysis failed", className="mb-0 fw-bold text-danger"
),
],
className="p-2 border border-danger rounded bg-light h-100",
)
analysis_blocks.append(error_card)
# Breathing Pattern Analysis
if "breathing_pattern" in advanced_options:
logger.info("Processing breathing pattern analysis...")
try:
# Enhanced breathing pattern analysis with comprehensive insights
# Use user-specified parameters for consistency with respiratory rate estimation
prominence = 0.3 * np.std(signal_data)
# Use min_breath_duration for distance calculation (convert to samples)
min_distance_samples = int(
(min_breath_duration or 0.1) * sampling_freq
)
max_distance_samples = int(
(max_breath_duration or 6.0) * sampling_freq
)
# REMOVED: Preprocessing logic - signal data is already filtered from filtering page
peaks, properties = signal.find_peaks(
signal_data,
prominence=prominence,
distance=min_distance_samples,
)
if len(peaks) > 1:
breath_intervals = np.diff(peaks) / sampling_freq
variability = np.std(breath_intervals)
mean_interval = np.mean(breath_intervals)
# Calculate respiratory rate metrics
rr_bpm = 60.0 / mean_interval
# Log the parameters used for debugging
logger.info("Breathing pattern analysis parameters:")
logger.info(
f" - Min breath duration: {min_breath_duration or 0.1}s"
)
logger.info(
f" - Max breath duration: {max_breath_duration or 6.0}s"
)
# REMOVED: low_cut, high_cut, preprocessing_options logging
logger.info(f" - Detected peaks: {len(peaks)}")
logger.info(f" - Calculated RR: {rr_bpm:.2f} BPM")
rr_std = 60.0 * variability / (mean_interval**2)
rr_cv = variability / mean_interval # Coefficient of variation
# Breathing pattern classification
if rr_cv < 0.1:
pattern_type = "Regular"
pattern_color = "text-success"
elif rr_cv < 0.2:
pattern_type = "Slightly Irregular"
pattern_color = "text-info"
elif rr_cv < 0.3:
pattern_type = "Moderately Irregular"
pattern_color = "text-warning"
else:
pattern_type = "Highly Irregular"
pattern_color = "text-danger"
# Detect breathing irregularities
irregular_breaths = np.sum(
np.abs(breath_intervals - mean_interval) > 2 * variability
)
irregularity_percentage = (
irregular_breaths / len(breath_intervals)
) * 100
# Breathing rate stability
if irregularity_percentage < 10:
stability = "High"
stability_color = "text-success"
elif irregularity_percentage < 25:
stability = "Moderate"
stability_color = "text-info"
elif irregularity_percentage < 50:
stability = "Low"
stability_color = "text-warning"
else:
stability = "Very Low"
stability_color = "text-danger"
# Create compact breathing pattern card
breathing_card = html.Div(
[
html.H6(
"🫁 BREATHING PATTERN",
className="text-primary mb-1",
),
html.P(
f"Breaths: {len(peaks)}", className="mb-1 fw-bold"
),
html.P(f"RR: {rr_bpm:.1f} BPM", className="mb-1"),
html.P(
f"Pattern: {pattern_type}",
className=f"mb-1 {pattern_color}",
),
html.P(
f"Stability: {stability}",
className=f"mb-1 {stability_color}",
),
html.P(
f"CV: {rr_cv:.3f}",
className="mb-0 small text-muted",
),
],
className="p-2 border border-primary rounded bg-light h-100",
)
analysis_blocks.append(breathing_card)
logger.info(
f"Breathing pattern analysis completed: {len(peaks)} breaths, RR: {rr_bpm:.1f} BPM, CV: {rr_cv:.3f}"
)
else:
# Create insufficient data card
insufficient_card = html.Div(
[
html.H6(
"🫁 BREATHING PATTERN",
className="text-warning mb-1",
),
html.P(
f"Only {len(peaks)} breath(s)",
className="mb-1 fw-bold text-warning",
),
html.P(
"Need at least 2 breaths",
className="mb-0 small text-muted",
),
],
className="p-2 border border-warning rounded bg-light h-100",
)
analysis_blocks.append(insufficient_card)
except Exception as e:
logger.error(f"Breathing pattern analysis failed: {e}")
# Create error card
error_card = html.Div(
[
html.H6(
"🫁 BREATHING PATTERN", className="text-danger mb-1"
),
html.P(
"Analysis failed", className="mb-1 fw-bold text-danger"
),
html.P(
f"Error: {str(e)[:30]}...",
className="mb-0 small text-muted",
),
],
className="p-2 border border-danger rounded bg-light h-100",
)
analysis_blocks.append(error_card)
# Respiratory Variability
if "respiratory_variability" in advanced_options:
logger.info("Processing respiratory variability analysis...")
try:
# Enhanced respiratory variability analysis
if signal_type == "PPG":
if PPGAutonomicFeatures is None:
logger.warning("PPGAutonomicFeatures not available")
# Fallback to basic RRV calculation
try:
# Calculate basic RRV from breathing intervals
prominence = 0.3 * np.std(signal_data)
distance = int(0.5 * sampling_freq)
peaks, _ = signal.find_peaks(
signal_data,
prominence=prominence,
distance=distance,
)
if len(peaks) > 1:
breath_intervals = np.diff(peaks) / sampling_freq
rr_bpm = 60.0 / np.mean(breath_intervals)
rrv = np.std(breath_intervals)
rrv_norm = rrv / np.mean(
breath_intervals
) # Normalized RRV
results.append(
html.Div(
[
html.H5(
"📊 RESPIRATORY RATE VARIABILITY",
className="text-primary mb-3",
),
html.Div(
[
html.H6(
"🫁 BASIC RRV METRICS",
className="text-dark mb-2",
),
html.P(
f"RRV (Std Dev): {rrv:.3f}s",
className="mb-1",
),
html.P(
f"Normalized RRV: {rrv_norm:.3f}",
className="mb-1",
),
html.P(
f"Mean RR: {rr_bpm:.1f} BPM",
className="mb-1",
),
html.P(
f"Breaths: {len(peaks)}",
className="mb-1",
),
html.P(
"Method: Fallback (Peak Detection)",
className="text-info",
),
],
className="p-3 border border-info rounded bg-light",
),
],
className="mb-4",
)
)
else:
results.append(
html.Div(
[
html.H5(
"📊 RESPIRATORY RATE VARIABILITY",
className="text-warning mb-3",
),
html.Div(
[
html.H6(
"⚠️ INSUFFICIENT DATA",
className="text-warning",
),
html.P(
"Need at least 2 breaths for RRV analysis.",
className="mb-1",
),
],
className="p-3 border border-warning rounded bg-light",
),
],
className="mb-4",
)
)
except Exception as fallback_error:
logger.error(
f"Fallback RRV calculation failed: {fallback_error}"
)
results.append(
html.Div(
[
html.H5(
"📊 RESPIRATORY RATE VARIABILITY",
className="text-danger mb-3",
),
html.Div(
[
html.H6(
"❌ ANALYSIS FAILED",
className="text-danger",
),
html.P(
f"Error: {str(fallback_error)}",
className="mb-1",
),
],
className="p-3 border border-danger rounded bg-light",
),
],
className="mb-4",
)
)
else:
# Use PPG respiratory autonomic features
try:
ppg_features = PPGAutonomicFeatures(
signal_data, sampling_freq
)
rrv = ppg_features.compute_rrv()
rsa = ppg_features.compute_rsa()
# Create compact PPG RRV card
ppg_rrv_card = html.Div(
[
html.H6(
"📊 PPG RRV", className="text-success mb-1"
),
html.P(
f"RRV: {rrv:.4f}s", className="mb-1 fw-bold"
),
html.P(f"RSA: {rsa:.4f}s", className="mb-1"),
html.P(
"vitalDSP PPG",
className="mb-0 small text-success",
),
],
className="p-2 border border-success rounded bg-light h-100",
)
analysis_blocks.append(ppg_rrv_card)
except Exception as ppg_error:
logger.error(f"PPG RRV analysis failed: {ppg_error}")
results.append(
html.Div(
[
html.H5(
"📊 RESPIRATORY RATE VARIABILITY",
className="text-danger mb-3",
),
html.Div(
[
html.H6(
"❌ PPG ANALYSIS FAILED",
className="text-danger",
),
html.P(
f"Error: {str(ppg_error)}",
className="mb-1",
),
html.P(
"Falling back to basic RRV calculation...",
className="text-info",
),
],
className="p-3 border border-danger rounded bg-light",
),
],
className="mb-4",
)
)
else:
# For non-PPG signals, use basic RRV calculation
try:
prominence = 0.3 * np.std(signal_data)
distance = int(0.5 * sampling_freq)
peaks, _ = signal.find_peaks(
signal_data, prominence=prominence, distance=distance
)
if len(peaks) > 1:
breath_intervals = np.diff(peaks) / sampling_freq
rr_bpm = 60.0 / np.mean(breath_intervals)
rrv = np.std(breath_intervals)
rrv_norm = rrv / np.mean(breath_intervals)
# Create compact basic RRV card
basic_rrv_card = html.Div(
[
html.H6(
"📊 BASIC RRV", className="text-info mb-1"
),
html.P(
f"RRV: {rrv:.3f}s", className="mb-1 fw-bold"
),
html.P(
f"Norm RRV: {rrv_norm:.3f}",
className="mb-1",
),
html.P(
f"RR: {rr_bpm:.1f} BPM", className="mb-1"
),
html.P(
f"Breaths: {len(peaks)}",
className="mb-0 small text-muted",
),
],
className="p-2 border border-info rounded bg-light h-100",
)
analysis_blocks.append(basic_rrv_card)
else:
# Create insufficient data card
insufficient_rrv_card = html.Div(
[
html.H6(
"📊 BASIC RRV",
className="text-warning mb-1",
),
html.P(
"Insufficient data",
className="mb-1 fw-bold text-warning",
),
html.P(
"Need at least 2 breaths",
className="mb-0 small text-muted",
),
],
className="p-2 border border-warning rounded bg-light h-100",
)
analysis_blocks.append(insufficient_rrv_card)
except Exception as basic_error:
logger.error(f"Basic RRV calculation failed: {basic_error}")
# Create error card
error_rrv_card = html.Div(
[
html.H6(
"📊 BASIC RRV", className="text-danger mb-1"
),
html.P(
"Analysis failed",
className="mb-1 fw-bold text-danger",
),
html.P(
f"Error: {str(basic_error)[:30]}...",
className="mb-0 small text-muted",
),
],
className="p-2 border border-danger rounded bg-light h-100",
)
analysis_blocks.append(error_rrv_card)
except Exception as e:
logger.error(f"Respiratory variability analysis failed: {e}")
# Create error card
error_rrv_main_card = html.Div(
[
html.H6("📊 RRV ANALYSIS", className="text-danger mb-1"),
html.P(
"Analysis failed", className="mb-1 fw-bold text-danger"
),
html.P(
f"Error: {str(e)[:30]}...",
className="mb-0 small text-muted",
),
],
className="p-2 border border-danger rounded bg-light h-100",
)
analysis_blocks.append(error_rrv_main_card)
# PPG-ECG Fusion Analysis
if "ppg_ecg_fusion" in advanced_options:
logger.info("Processing PPG-ECG fusion analysis...")
try:
if ppg_ecg_fusion is None:
logger.warning("ppg_ecg_fusion function not available")
# Fallback to basic fusion analysis
try:
# Calculate respiratory rate from PPG signal using multiple methods
prominence = 0.3 * np.std(signal_data)
distance = int(0.5 * sampling_freq)
peaks, _ = signal.find_peaks(
signal_data, prominence=prominence, distance=distance
)
if len(peaks) > 1:
breath_intervals = np.diff(peaks) / sampling_freq
rr_ppg = 60.0 / np.mean(breath_intervals)
# Calculate frequency domain estimate
fft_result = np.abs(np.fft.rfft(signal_data))
freqs = np.fft.rfftfreq(
len(signal_data), 1 / sampling_freq
)
resp_mask = (freqs >= 0.1) & (freqs <= 0.5)
if np.any(resp_mask):
peak_freq_idx = np.argmax(fft_result[resp_mask])
peak_freq = freqs[resp_mask][peak_freq_idx]
rr_freq = peak_freq * 60
else:
rr_freq = rr_ppg
# Fusion result (simple average)
fusion_rr = np.mean([rr_ppg, rr_freq])
fusion_std = np.std([rr_ppg, rr_freq])
results.append(
html.Div(
[
html.H5(
"🔗 PPG-ECG FUSION",
className="text-primary mb-3",
),
html.Div(
[
html.H6(
"🫁 FUSION RESULTS",
className="text-dark mb-2",
),
html.P(
f"Fusion RR: {fusion_rr:.1f} ± {fusion_std:.1f} BPM",
className="mb-1",
),
html.P(
f"PPG RR: {rr_ppg:.1f} BPM",
className="mb-1",
),
html.P(
f"Frequency RR: {rr_freq:.1f} BPM",
className="mb-1",
),
html.P(
"Method: Fallback (Peak + FFT)",
className="text-info",
),
],
className="p-3 border border-info rounded bg-light",
),
],
className="mb-4",
)
)
else:
results.append(
html.Div(
[
html.H6(
"🔗 PPG-ECG Fusion Analysis",
className="mb-3",
),
html.Div(
[
html.Strong(
"⚠️ Insufficient Data:",
className="text-warning",
),
html.Br(),
html.Small(
"Need at least 2 breaths for fusion analysis.",
className="text-muted",
),
],
className="mb-3 p-3 border border-warning rounded bg-light",
),
],
className="mb-4",
)
)
except Exception as fallback_error:
logger.error(
f"Fallback PPG-ECG fusion failed: {fallback_error}"
)
results.append(
html.Div(
[
html.H6(
"🔗 PPG-ECG Fusion Analysis",
className="mb-3",
),
html.Div(
[
html.Strong(
"❌ Analysis Failed:",
className="text-danger",
),
html.Br(),
html.Small(
f"Error: {str(fallback_error)}",
className="text-muted",
),
],
className="mb-3 p-3 border border-danger rounded bg-light",
),
],
className="mb-4",
)
)
else:
# Use vitalDSP PPG-ECG fusion
try:
# For demonstration, use the same signal as both PPG and ECG
# In real applications, you would have separate signals
# REMOVED: preprocess, lowcut, highcut - signal already filtered from filtering page
fusion_rr = ppg_ecg_fusion(
signal_data,
signal_data,
sampling_freq,
)
results.append(
html.Div(
[
html.H5(
"🔗 PPG-ECG FUSION",
className="text-success mb-3",
),
html.Div(
[
html.H6(
"🫁 VITALDSP RESULT",
className="text-dark mb-2",
),
html.P(
f"Fusion RR: {fusion_rr:.1f} BPM",
className="mb-1",
),
html.P(
"Method: vitalDSP PPG-ECG Fusion",
className="text-success",
),
],
className="p-3 border border-success rounded bg-light",
),
],
className="mb-4",
)
)
except Exception as fusion_error:
logger.error(
f"vitalDSP PPG-ECG fusion failed: {fusion_error}"
)
results.append(
html.Div(
[
html.H6(
"🔗 PPG-ECG Fusion Analysis",
className="mb-3",
),
html.Div(
[
html.Strong(
"❌ vitalDSP Fusion Failed:",
className="text-danger",
),
html.Br(),
html.Small(
f"Error: {str(fusion_error)}",
className="text-muted",
),
html.Br(),
html.Small(
"Falling back to basic fusion...",
className="text-info",
),
],
className="mb-3 p-3 border border-danger rounded bg-light",
),
],
className="mb-4",
)
)
except Exception as e:
logger.error(f"PPG-ECG fusion analysis failed: {e}")
results.append(
html.Div(
[
html.H6("🔗 PPG-ECG Fusion Analysis", className="mb-3"),
html.Div(
[
html.Strong(
"❌ Analysis Failed:",
className="text-danger",
),
html.Br(),
html.Small(
f"Error: {str(e)}", className="text-muted"
),
],
className="mb-3 p-3 border border-danger rounded bg-light",
),
],
className="mb-4",
)
)
# Respiratory-Cardiac Fusion Analysis
if "resp_cardiac_fusion" in advanced_options:
logger.info("Processing respiratory-cardiac fusion analysis...")
try:
if respiratory_cardiac_fusion is None:
logger.warning(
"respiratory_cardiac_fusion function not available"
)
# Fallback to basic fusion analysis
try:
# Calculate respiratory rate from signal using multiple methods
prominence = 0.3 * np.std(signal_data)
distance = int(0.5 * sampling_freq)
peaks, _ = signal.find_peaks(
signal_data, prominence=prominence, distance=distance
)
if len(peaks) > 1:
breath_intervals = np.diff(peaks) / sampling_freq
rr_resp = 60.0 / np.mean(breath_intervals)
# Calculate frequency domain estimate
fft_result = np.abs(np.fft.rfft(signal_data))
freqs = np.fft.rfftfreq(
len(signal_data), 1 / sampling_freq
)
resp_mask = (freqs >= 0.1) & (freqs <= 0.5)
if np.any(resp_mask):
peak_freq_idx = np.argmax(fft_result[resp_mask])
peak_freq = freqs[resp_mask][peak_freq_idx]
rr_cardiac = peak_freq * 60
else:
rr_cardiac = rr_resp
# Fusion result (simple average)
fusion_rr = np.mean([rr_resp, rr_cardiac])
fusion_std = np.std([rr_resp, rr_cardiac])
results.append(
html.Div(
[
html.H6(
"🫀 Respiratory-Cardiac Fusion Analysis",
className="mb-3",
),
html.Div(
[
html.Strong(
"🫁 Fusion Respiratory Rate:",
className="text-primary",
),
html.Br(),
html.Small(
f"• Fusion RR: {fusion_rr:.1f} ± {fusion_std:.1f} BPM",
className="text-muted",
),
html.Br(),
html.Small(
f"• Respiratory-based RR: {rr_resp:.1f} BPM",
className="text-muted",
),
html.Br(),
html.Small(
f"• Cardiac-based RR: {rr_cardiac:.1f} BPM",
className="text-muted",
),
html.Br(),
html.Small(
"• Analysis Method: Fallback (Peak + FFT)",
className="text-info",
),
],
className="mb-3 p-3 border border-info rounded bg-light",
),
],
className="mb-4",
)
)
else:
results.append(
html.Div(
[
html.H6(
"🫀 Respiratory-Cardiac Fusion Analysis",
className="mb-3",
),
html.Div(
[
html.Strong(
"⚠️ Insufficient Data:",
className="text-warning",
),
html.Br(),
html.Small(
"Need at least 2 breaths for fusion analysis.",
className="text-muted",
),
],
className="mb-3 p-3 border border-warning rounded bg-light",
),
],
className="mb-4",
)
)
except Exception as fallback_error:
logger.error(
f"Fallback respiratory-cardiac fusion failed: {fallback_error}"
)
results.append(
html.Div(
[
html.H6(
"🫀 Respiratory-Cardiac Fusion Analysis",
className="mb-3",
),
html.Div(
[
html.Strong(
"❌ Analysis Failed:",
className="text-danger",
),
html.Br(),
html.Small(
f"Error: {str(fallback_error)}",
className="text-muted",
),
],
className="mb-3 p-3 border border-danger rounded bg-light",
),
],
className="mb-4",
)
)
else:
# Use vitalDSP respiratory-cardiac fusion
try:
# For demonstration, use the same signal as both respiratory and cardiac
# In real applications, you would have separate signals
# REMOVED: preprocess, lowcut, highcut - signal already filtered from filtering page
fusion_rr = respiratory_cardiac_fusion(
signal_data,
signal_data,
sampling_freq,
)
results.append(
html.Div(
[
html.H6(
"🫀 Respiratory-Cardiac Fusion Analysis",
className="mb-3",
),
html.Div(
[
html.Strong(
"🫁 vitalDSP Fusion Result:",
className="text-primary",
),
html.Br(),
html.Small(
f"• Fusion Respiratory Rate: {fusion_rr:.1f} BPM",
className="text-muted",
),
html.Br(),
html.Small(
"• Analysis Method: vitalDSP Respiratory-Cardiac Fusion",
className="text-success",
),
],
className="mb-3 p-3 border border-success rounded bg-light",
),
],
className="mb-4",
)
)
except Exception as fusion_error:
logger.error(
f"vitalDSP respiratory-cardiac fusion failed: {fusion_error}"
)
results.append(
html.Div(
[
html.H6(
"🫀 Respiratory-Cardiac Fusion Analysis",
className="mb-3",
),
html.Div(
[
html.Strong(
"❌ vitalDSP Fusion Failed:",
className="text-danger",
),
html.Br(),
html.Small(
f"Error: {str(fusion_error)}",
className="text-muted",
),
html.Br(),
html.Small(
"Falling back to basic fusion...",
className="text-info",
),
],
className="mb-3 p-3 border border-danger rounded bg-light",
),
],
className="mb-4",
)
)
except Exception as e:
logger.error(f"Respiratory-cardiac fusion analysis failed: {e}")
results.append(
html.Div(
[
html.H6(
"🫀 Respiratory-Cardiac Fusion Analysis",
className="mb-3",
),
html.Div(
[
html.Strong(
"❌ Analysis Failed:",
className="text-danger",
),
html.Br(),
html.Small(
f"Error: {str(e)}", className="text-muted"
),
],
className="mb-3 p-3 border border-danger rounded bg-light",
),
],
className="mb-4",
)
)
# Multimodal Fusion
if "multimodal" in advanced_options:
logger.info("Processing multimodal fusion analysis...")
try:
if multimodal_analysis is None:
logger.warning("multimodal_analysis function not available")
# Fallback to basic multimodal analysis
try:
# Calculate respiratory rate using multiple methods
prominence = 0.3 * np.std(signal_data)
distance = int(0.5 * sampling_freq)
peaks, _ = signal.find_peaks(
signal_data, prominence=prominence, distance=distance
)
if len(peaks) > 1:
breath_intervals = np.diff(peaks) / sampling_freq
rr_peak = 60.0 / np.mean(breath_intervals)
# Calculate frequency domain estimate
fft_result = np.abs(np.fft.rfft(signal_data))
freqs = np.fft.rfftfreq(
len(signal_data), 1 / sampling_freq
)
resp_mask = (freqs >= 0.1) & (freqs <= 0.5)
if np.any(resp_mask):
peak_freq_idx = np.argmax(fft_result[resp_mask])
peak_freq = freqs[resp_mask][peak_freq_idx]
rr_freq = peak_freq * 60
else:
rr_freq = rr_peak
# Multimodal result (simple average)
multimodal_rr = np.mean([rr_peak, rr_freq])
multimodal_std = np.std([rr_peak, rr_freq])
results.append(
html.Div(
[
html.H5(
"🔗 MULTIMODAL FUSION",
className="text-primary mb-3",
),
html.Div(
[
html.H6(
"🫁 MULTIMODAL RESULTS",
className="text-dark mb-2",
),
html.P(
f"Multimodal RR: {multimodal_rr:.1f} ± {multimodal_std:.1f} BPM",
className="mb-1",
),
html.P(
f"Peak-based RR: {rr_peak:.1f} BPM",
className="mb-1",
),
html.P(
f"Frequency-based RR: {rr_freq:.1f} BPM",
className="mb-1",
),
html.P(
"Method: Fallback (Peak + FFT)",
className="text-info",
),
],
className="p-3 border border-info rounded bg-light",
),
],
className="mb-4",
)
)
else:
results.append(
html.Div(
[
html.H5(
"🔗 MULTIMODAL FUSION",
className="text-warning mb-3",
),
html.Div(
[
html.H6(
"⚠️ INSUFFICIENT DATA",
className="text-warning",
),
html.P(
"Need at least 2 breaths for multimodal analysis.",
className="mb-1",
),
],
className="p-3 border border-warning rounded bg-light",
),
],
className="mb-4",
)
)
except Exception as fallback_error:
logger.error(
f"Fallback multimodal fusion failed: {fallback_error}"
)
results.append(
html.Div(
[
html.H5(
"🔗 MULTIMODAL FUSION",
className="text-danger mb-3",
),
html.Div(
[
html.H6(
"❌ ANALYSIS FAILED",
className="text-danger",
),
html.P(
f"Error: {str(fallback_error)}",
className="mb-1",
),
],
className="p-3 border border-danger rounded bg-light",
),
],
className="mb-4",
)
)
else:
# Use vitalDSP multimodal analysis
try:
# For demonstration, use the same signal as multiple modalities
# In real applications, you would have different signal types
# REMOVED: preprocess, lowcut, highcut - signal already filtered from filtering page
multimodal_rr = multimodal_analysis(
[
signal_data,
signal_data,
], # Using same signal for demo
sampling_freq,
)
results.append(
html.Div(
[
html.H5(
"🔗 MULTIMODAL FUSION",
className="text-success mb-3",
),
html.Div(
[
html.H6(
"🫁 VITALDSP RESULT",
className="text-dark mb-2",
),
html.P(
f"Multimodal RR: {multimodal_rr:.1f} BPM",
className="mb-1",
),
html.P(
"Method: vitalDSP Multimodal Analysis",
className="text-success",
),
],
className="p-3 border border-success rounded bg-light",
),
],
className="mb-4",
)
)
except Exception as fusion_error:
logger.error(
f"vitalDSP multimodal fusion failed: {fusion_error}"
)
results.append(
html.Div(
[
html.H5(
"🔗 MULTIMODAL FUSION",
className="text-danger mb-3",
),
html.Div(
[
html.H6(
"❌ VITALDSP FAILED",
className="text-danger",
),
html.P(
f"Error: {str(fusion_error)}",
className="mb-1",
),
html.P(
"Falling back to basic multimodal...",
className="text-info",
),
],
className="p-3 border border-danger rounded bg-light",
),
],
className="mb-4",
)
)
except Exception as e:
logger.error(f"Multimodal fusion analysis failed: {e}")
results.append(
html.Div(
[
html.H6(
"🔗 Multimodal Fusion Analysis", className="mb-3"
),
html.Div(
[
html.Strong(
"❌ Analysis Failed:",
className="text-danger",
),
html.Br(),
html.Small(
f"Error: {str(e)}", className="text-muted"
),
],
className="mb-3 p-3 border border-danger rounded bg-light",
),
],
className="mb-4",
)
)
# Quality Assessment
if "quality_assessment" in advanced_options:
logger.info("Processing signal quality assessment...")
try:
# Enhanced signal quality metrics
signal_mean = np.mean(signal_data) if len(signal_data) > 0 else 0
signal_std = np.std(signal_data) if len(signal_data) > 0 else 0
signal_min = np.min(signal_data) if len(signal_data) > 0 else 0
signal_max = np.max(signal_data) if len(signal_data) > 0 else 0
dynamic_range = signal_max - signal_min
# Calculate signal-to-noise ratio using different methods
# Method 1: Variance-based SNR
signal_power = np.var(signal_data)
noise_power = np.var(signal_data - signal_mean)
snr_variance = (
10 * np.log10(signal_power / noise_power)
if noise_power > 0
else 0
)
# Method 2: Peak-to-peak SNR
peak_to_peak = signal_max - signal_min
rms_noise = (
np.sqrt(np.mean((signal_data - signal_mean) ** 2))
if len(signal_data) > 0
else 0
)
snr_pp = (
20 * np.log10(peak_to_peak / (2 * rms_noise))
if rms_noise > 0
else 0
)
# Method 3: RMS SNR
rms_signal = (
np.sqrt(np.mean(signal_data**2)) if len(signal_data) > 0 else 0
)
rms_noise_signal = (
np.sqrt(np.mean((signal_data - signal_mean) ** 2))
if len(signal_data) > 0
else 0
)
snr_rms = (
20 * np.log10(rms_signal / rms_noise_signal)
if rms_noise_signal > 0
else 0
)
# Signal quality classification
if snr_variance > 20:
quality_level = "Excellent"
quality_color = "text-success"
elif snr_variance > 15:
quality_level = "Good"
quality_color = "text-info"
elif snr_variance > 10:
quality_level = "Fair"
quality_color = "text-warning"
else:
quality_level = "Poor"
quality_color = "text-danger"
# Calculate additional quality metrics
signal_energy = np.sum(signal_data**2)
zero_crossings = np.sum(np.diff(np.sign(signal_data)) != 0)
results.append(
html.Div(
[
html.H5(
"🎯 SIGNAL QUALITY ASSESSMENT",
className="text-primary mb-2",
),
# Quality Summary
html.Div(
[
html.H6(
"📊 QUALITY SUMMARY",
className="text-dark mb-2",
),
html.P(
f"Overall Quality: {quality_level}",
className=f"mb-1 {quality_color}",
),
html.P(
f"Variance SNR: {snr_variance:.2f} dB",
className="mb-1",
),
html.P(
f"Peak-to-Peak SNR: {snr_pp:.2f} dB",
className="mb-1",
),
html.P(
f"RMS SNR: {snr_rms:.2f} dB",
className="mb-1",
),
],
className="p-2 border border-primary rounded mb-2",
),
# Signal Statistics
html.Div(
[
html.H6(
"📈 SIGNAL STATISTICS",
className="text-dark mb-2",
),
html.P(
f"Dynamic Range: {dynamic_range:.3f}",
className="mb-1",
),
html.P(
f"Signal Power: {signal_power:.3f}",
className="mb-1",
),
html.P(
f"Signal Energy: {signal_energy:.3f}",
className="mb-1",
),
html.P(
f"Zero Crossings: {zero_crossings}",
className="mb-1",
),
],
className="p-2 border border-info rounded mb-2",
),
# Signal Characteristics
html.Div(
[
html.H6(
"🔍 SIGNAL CHARACTERISTICS",
className="text-dark mb-2",
),
html.P(
f"Mean: {signal_mean:.3f}", className="mb-1"
),
html.P(
f"Std Dev: {signal_std:.3f}",
className="mb-1",
),
html.P(
f"Range: {signal_min:.3f} to {signal_max:.3f}",
className="mb-1",
),
html.P(
(
f"CV: {signal_std/abs(signal_mean):.3f}"
if abs(signal_mean) > 0
else "N/A"
),
className="mb-1",
),
],
className="p-2 border border-success rounded",
),
],
className="mb-3",
)
)
logger.info(
f"Signal quality assessment completed: SNR={snr_variance:.2f} dB, Quality={quality_level}"
)
except Exception as e:
logger.error(f"Quality assessment failed: {e}")
results.append(
html.Div(
[
html.H5(
"🎯 SIGNAL QUALITY ASSESSMENT",
className="text-danger mb-3",
),
html.Div(
[
html.H6(
"❌ ANALYSIS FAILED",
className="text-danger",
),
html.P(f"Error: {str(e)}", className="mb-1"),
],
className="p-3 border border-danger rounded bg-light",
),
],
className="mb-4",
)
)
# Display all analysis blocks in inline grid layout
if analysis_blocks:
results.append(
html.Div(
[
html.H6("📊 ANALYSIS RESULTS", className="text-dark mb-2"),
html.Div(
[
dbc.Row(
[
dbc.Col(block, md=3, className="mb-2")
for block in analysis_blocks
],
className="g-2",
)
]
),
],
className="mb-3",
)
)
# Enhanced Signal Analysis
logger.info("Adding enhanced signal analysis section...")
results.append(html.Hr())
results.append(
html.H5("🔍 ENHANCED SIGNAL ANALYSIS", className="text-primary mb-2")
)
# Basic statistics
mean_val = np.mean(signal_data) if len(signal_data) > 0 else 0
std_val = np.std(signal_data) if len(signal_data) > 0 else 0
min_val = np.min(signal_data) if len(signal_data) > 0 else 0
max_val = np.max(signal_data) if len(signal_data) > 0 else 0
rms_val = np.sqrt(np.mean(signal_data**2)) if len(signal_data) > 0 else 0
peak_to_peak = max_val - min_val
# Signal quality metrics
snr_db = 20 * np.log10(rms_val / std_val) if std_val > 0 else 0
crest_factor = peak_to_peak / rms_val if rms_val > 0 else 0
# Frequency domain analysis
fft_result = np.abs(np.fft.rfft(signal_data))
freqs = np.fft.rfftfreq(len(signal_data), 1 / sampling_freq)
# Find dominant frequency
peak_idx = np.argmax(fft_result)
dominant_freq = freqs[peak_idx]
dominant_freq_bpm = dominant_freq * 60
# Respiratory frequency band analysis
resp_mask = (freqs >= 0.1) & (freqs <= 0.5) # 0.1-0.5 Hz (6-30 BPM)
if np.any(resp_mask):
resp_power = np.sum(fft_result[resp_mask])
total_power = np.sum(fft_result)
resp_power_ratio = resp_power / total_power if total_power > 0 else 0
else:
resp_power_ratio = 0
# Breathing pattern classification
try:
peaks, _ = signal.find_peaks(signal_data, distance=int(0.5 * sampling_freq))
if len(peaks) > 1:
breath_intervals = np.diff(peaks) / sampling_freq
mean_interval = np.mean(breath_intervals)
interval_std = np.std(breath_intervals)
cv_intervals = interval_std / mean_interval if mean_interval > 0 else 0
# Classify breathing pattern
if cv_intervals < 0.1:
pattern_type = "Regular"
pattern_color = "text-success"
elif cv_intervals < 0.2:
pattern_type = "Slightly Irregular"
pattern_color = "text-info"
elif cv_intervals < 0.3:
pattern_type = "Irregular"
pattern_color = "text-warning"
else:
pattern_type = "Highly Irregular"
pattern_color = "text-danger"
# Clinical interpretation
if mean_interval < 2.0: # > 30 BPM
clinical_status = "Tachypnea (Rapid Breathing)"
clinical_color = "text-warning"
elif mean_interval > 6.0: # < 10 BPM
clinical_status = "Bradypnea (Slow Breathing)"
clinical_color = "text-warning"
else:
clinical_status = "Normal Breathing Rate"
clinical_color = "text-success"
else:
pattern_type = "Insufficient Data"
pattern_color = "text-muted"
clinical_status = "Cannot Determine"
clinical_color = "text-muted"
cv_intervals = 0
mean_interval = 0
except Exception:
pattern_type = "Analysis Failed"
pattern_color = "text-danger"
clinical_status = "Cannot Determine"
clinical_color = "text-danger"
cv_intervals = 0
mean_interval = 0
# Create comprehensive analysis display
results.append(
html.Div(
[
# Signal Quality Metrics
html.H6("📊 Signal Quality Metrics", className="mb-2"),
html.Div(
[
html.Small(f"SNR: {snr_db:.1f} dB", className="text-muted"),
html.Br(),
html.Small(
f"Crest Factor: {crest_factor:.2f}",
className="text-muted",
),
html.Br(),
html.Small(f"RMS: {rms_val:.3f}", className="text-muted"),
html.Br(),
html.Small(
f"Peak-to-Peak: {peak_to_peak:.3f}",
className="text-muted",
),
],
className="ms-3 mb-2",
),
# Frequency Analysis
html.H6("🌊 Frequency Analysis", className="mb-2"),
html.Div(
[
html.Small(
f"Dominant Frequency: {dominant_freq:.3f} Hz ({dominant_freq_bpm:.1f} BPM)",
className="text-muted",
),
html.Br(),
html.Small(
f"Respiratory Power Ratio: {resp_power_ratio:.1%}",
className="text-muted",
),
],
className="ms-3 mb-2",
),
# Breathing Pattern Analysis
html.H6("🫁 Breathing Pattern Analysis", className="mb-2"),
html.Div(
[
html.Small("Pattern Type: ", className="text-muted"),
html.Span(f"{pattern_type}", className=pattern_color),
html.Br(),
html.Small(
f"Mean Interval: {mean_interval:.2f}s",
className="text-muted",
),
html.Br(),
html.Small(
f"Interval CV: {cv_intervals:.3f}",
className="text-muted",
),
],
className="ms-3 mb-2",
),
# Clinical Interpretation
html.H6("🏥 Clinical Interpretation", className="mb-2"),
html.Div(
[
html.Small("Status: ", className="text-muted"),
html.Span(f"{clinical_status}", className=clinical_color),
],
className="ms-3 mb-2",
),
],
className="p-3 border border-info rounded mb-3",
)
)
# Basic Statistics (simplified)
results.append(
html.Div(
[
html.H6("📈 Basic Statistics", className="mb-2"),
html.Small(f"Mean: {mean_val:.3f}", className="text-muted"),
html.Br(),
html.Small(f"Std: {std_val:.3f}", className="text-muted"),
html.Br(),
html.Small(
f"Range: {min_val:.3f} to {max_val:.3f}", className="text-muted"
),
html.Br(),
html.Small(
f"Duration: {len(signal_data)/sampling_freq:.1f}s",
className="text-muted",
),
],
className="mb-2",
)
)
# Additional Respiratory Metrics
logger.info("Calculating additional respiratory metrics...")
try:
# Calculate respiratory rate variability from peak intervals
peaks, _ = signal.find_peaks(signal_data, distance=int(0.5 * sampling_freq))
logger.info(f"Found {len(peaks)} peaks for additional metrics")
if len(peaks) > 1:
breath_intervals = np.diff(peaks) / sampling_freq
rr_mean = 60 / np.mean(breath_intervals)
rr_std = (
60 * np.std(breath_intervals) / (np.mean(breath_intervals) ** 2)
)
results.append(
html.Div(
[
html.Strong("Respiratory Rate Metrics: "),
html.Br(),
html.Small(
f"Mean RR: {rr_mean:.1f} BPM", className="text-muted"
),
html.Br(),
html.Small(
f"RR Variability: {rr_std:.1f} BPM",
className="text-muted",
),
html.Br(),
html.Small(
f"Number of breaths: {len(peaks)}",
className="text-muted",
),
],
className="mb-2",
)
)
except Exception as e:
logger.warning(f"Additional respiratory metrics failed: {e}")
# Comprehensive Summary
logger.info("Adding comprehensive summary section...")
results.append(html.Hr())
results.append(html.H5("🎯 Comprehensive Summary", className="mb-3"))
# Calculate overall respiratory rate if we have estimates
try:
# Collect all RR estimates from the results
rr_estimates = []
for result in results:
if hasattr(result, "children"):
for child in result.children:
if hasattr(child, "children"):
for subchild in child.children:
if isinstance(subchild, str) and "BPM" in subchild:
try:
rr_val = float(subchild.split()[0])
rr_estimates.append(rr_val)
except Exception:
pass
if rr_estimates:
overall_mean = np.mean(rr_estimates)
overall_std = np.std(rr_estimates)
# Overall assessment
if overall_std < 2.0:
assessment = "Excellent Agreement"
assessment_color = "text-success"
elif overall_std < 4.0:
assessment = "Good Agreement"
assessment_color = "text-info"
elif overall_std < 6.0:
assessment = "Fair Agreement"
assessment_color = "text-warning"
else:
assessment = "Poor Agreement"
assessment_color = "text-danger"
results.append(
html.Div(
[
html.H6("📈 Overall Assessment", className="mb-2"),
html.Div(
[
html.Small(
"Mean Respiratory Rate: ",
className="text-muted",
),
html.Span(
f"{overall_mean:.1f} ± {overall_std:.1f} BPM",
className="text-success",
),
html.Br(),
html.Small(
"Method Agreement: ", className="text-muted"
),
html.Span(
f"{assessment}", className=assessment_color
),
html.Br(),
html.Small(
f"Number of Methods: {len(rr_estimates)}",
className="text-muted",
),
],
className="ms-3 mb-2",
),
],
className="p-3 border border-success rounded mb-3",
)
)
except Exception as e:
logger.warning(f"Summary calculation failed: {e}")
# Recommendations
results.append(
html.Div(
[
html.H6("💡 Recommendations", className="mb-2"),
html.Div(
[
html.Small(
"• Use ensemble method for most reliable estimates",
className="text-muted",
),
html.Br(),
html.Small(
"• Check signal quality if methods disagree significantly",
className="text-muted",
),
html.Br(),
html.Small(
"• Consider preprocessing options for noisy signals",
className="text-muted",
),
html.Br(),
html.Small(
"• Monitor breathing pattern regularity for clinical insights",
className="text-muted",
),
],
className="ms-3 mb-2",
),
],
className="p-3 border border-info rounded mb-3",
)
)
logger.info(f"Final results list length: {len(results)}")
logger.info("=== GENERATE COMPREHENSIVE RESPIRATORY ANALYSIS COMPLETED ===")
return html.Div(results)
except Exception as e:
logger.error(f"Error generating respiratory analysis results: {e}")
return html.Div(
[
html.H5("❌ Analysis Failed", className="text-danger"),
html.P(f"Error: {str(e)}", className="text-muted"),
]
)
[docs]
def create_comprehensive_respiratory_plots(
signal_data,
time_axis,
sampling_freq,
signal_type,
estimation_methods,
advanced_options,
# REMOVED: preprocessing_options, low_cut, high_cut - preprocessing done on filtering page
):
"""Create comprehensive respiratory analysis plots.
Note: Preprocessing (filtering, denoising) should be done on the filtering page.
This function receives already-filtered signal data from store-filtered-signal.
"""
logger.info("=== CREATE COMPREHENSIVE RESPIRATORY PLOTS STARTED ===")
logger.info("Input parameters:")
logger.info(f" - signal_data shape: {signal_data.shape}")
logger.info(f" - time_axis shape: {time_axis.shape}")
logger.info(f" - sampling_freq: {sampling_freq}")
logger.info(f" - signal_type: {signal_type}")
logger.info(f" - estimation_methods: {estimation_methods}")
logger.info(f" - advanced_options: {advanced_options}")
# REMOVED: preprocessing_options, low_cut, high_cut - preprocessing done on filtering page
try:
# PERFORMANCE OPTIMIZATION: Limit plot data to max 5 minutes and 10K points
time_axis_plot, signal_data_plot = limit_plot_data(
time_axis,
signal_data,
max_duration=300, # 5 minutes max
max_points=10000, # 10K points max
)
logger.info(
f"Comprehensive plots data limited: {len(signal_data)} → {len(signal_data_plot)} points"
)
# Create subplots for different analyses
logger.info("Creating subplots...")
fig = make_subplots(
rows=3,
cols=2,
subplot_titles=(
"Respiratory Signal (Time Domain)",
"Frequency Spectrum (FFT)",
"Breathing Patterns & Peaks",
"Respiratory Rate Variability",
"Advanced Analysis",
"Signal Statistics",
),
specs=[
[{"secondary_y": False}, {"secondary_y": False}],
[{"secondary_y": False}, {"secondary_y": False}],
[{"secondary_y": False}, {"secondary_y": False}],
],
vertical_spacing=0.10,
horizontal_spacing=0.10,
)
# 1. Time Domain Signal (using limited data)
# REMOVED: Preprocessing logic - signal data is already filtered from filtering page
fig.add_trace(
go.Scatter(
x=time_axis_plot,
y=signal_data_plot,
mode="lines",
name="Respiratory Signal",
line=dict(color="#2E86AB", width=2.5),
hovertemplate="<b>Time:</b> %{x:.2f}s<br><b>Amplitude:</b> %{y:.3f}<extra></extra>",
),
row=1,
col=1,
)
# 2. Frequency Domain
fft_result = np.abs(np.fft.rfft(signal_data_plot))
freqs = np.fft.rfftfreq(len(signal_data_plot), 1 / sampling_freq)
fig.add_trace(
go.Scatter(
x=freqs,
y=fft_result,
mode="lines",
name="FFT Spectrum",
line=dict(color="#9B59B6", width=2),
),
row=1,
col=2,
)
# Highlight respiratory frequency band
resp_mask = (freqs >= 0.1) & (freqs <= 0.5)
if np.any(resp_mask):
fig.add_trace(
go.Scatter(
x=freqs[resp_mask],
y=fft_result[resp_mask],
mode="lines",
name="Respiratory Band",
line=dict(color="#E67E22", width=3),
),
row=1,
col=2,
)
# Add respiratory frequency annotations
peak_freq_idx = np.argmax(fft_result[resp_mask])
peak_freq = freqs[resp_mask][peak_freq_idx]
peak_magnitude = fft_result[resp_mask][peak_freq_idx]
fig.add_annotation(
x=peak_freq,
y=peak_magnitude,
text=f"Peak: {peak_freq:.2f} Hz<br>({peak_freq*60:.1f} BPM)",
showarrow=True,
arrowhead=2,
arrowsize=1,
arrowwidth=2,
arrowcolor="#E67E22",
ax=0.1,
ay=-40,
bgcolor="rgba(255,255,255,0.8)",
bordercolor="#E67E22",
)
# 3. Breathing Pattern & Respiratory Rate Variability
if estimation_methods and "peak_detection" in estimation_methods:
try:
prominence = 0.3 * np.std(signal_data_plot)
distance = int(0.5 * sampling_freq)
peaks, _ = signal.find_peaks(
signal_data_plot, prominence=prominence, distance=distance
)
if len(peaks) > 0:
fig.add_trace(
go.Scatter(
x=time_axis_plot[peaks],
y=signal_data_plot[peaks],
mode="markers",
name="Breathing Peaks",
marker=dict(color="red", size=8, symbol="diamond"),
),
row=2,
col=1,
)
# Add breath intervals
if len(peaks) > 1:
breath_intervals = np.diff(peaks) / sampling_freq
interval_times = time_axis_plot[peaks[1:]]
# Calculate respiratory rate over time
rr_over_time = 60.0 / breath_intervals # Convert to BPM
fig.add_trace(
go.Scatter(
x=interval_times,
y=breath_intervals,
mode="lines+markers",
name="Breath Intervals",
line=dict(color="#27AE60", width=2),
),
row=2,
col=2,
)
# Add respiratory rate variability analysis
if len(rr_over_time) > 2:
# Calculate moving average of RR
window_size = min(5, len(rr_over_time))
if window_size > 1:
rr_moving_avg = np.convolve(
rr_over_time,
np.ones(window_size) / window_size,
mode="same",
)
fig.add_trace(
go.Scatter(
x=interval_times,
y=rr_moving_avg,
mode="lines",
name="RR Moving Average",
line=dict(color="#8E44AD", width=2, dash="dot"),
),
row=2,
col=2,
)
# Add RRV statistics annotation
rr_mean = np.mean(rr_over_time)
rr_std = np.std(rr_over_time)
rr_cv = rr_std / rr_mean if rr_mean > 0 else 0
fig.add_annotation(
x=interval_times[len(interval_times) // 2],
y=np.max(breath_intervals),
text=f"RRV: {rr_cv:.3f}<br>Mean RR: {rr_mean:.1f} BPM<br>Std: {rr_std:.1f}",
showarrow=False,
bgcolor="rgba(255,255,255,0.9)",
bordercolor="#27AE60",
borderwidth=1,
font=dict(size=10),
)
except Exception as e:
logger.error(f"Breathing pattern analysis failed: {e}")
# 4. Sleep Apnea Detection
if advanced_options and "sleep_apnea" in advanced_options:
try:
# Amplitude-based apnea detection
apnea_threshold = 0.3 * np.std(signal_data_plot)
if detect_apnea_amplitude is None:
logger.warning(
"detect_apnea_amplitude function not available for plots"
)
apnea_events = []
else:
apnea_events = detect_apnea_amplitude(
signal_data_plot,
sampling_freq,
threshold=apnea_threshold,
min_duration=5,
)
# Pause-based apnea detection
if detect_apnea_pauses is None:
logger.warning(
"detect_apnea_pauses function not available for plots"
)
pause_apnea_events = []
else:
pause_apnea_events = detect_apnea_pauses(
signal_data_plot, sampling_freq, min_pause_duration=5
)
# Plot all apnea events
all_apnea_events = apnea_events + pause_apnea_events
if all_apnea_events:
for start, end in all_apnea_events:
start_idx = int(start * sampling_freq)
end_idx = int(end * sampling_freq)
if start_idx < len(time_axis_plot) and end_idx < len(
time_axis_plot
):
fig.add_trace(
go.Scatter(
x=time_axis_plot[start_idx:end_idx],
y=signal_data_plot[start_idx:end_idx],
mode="lines",
name="Apnea Event",
line=dict(color="red", width=3),
),
row=3,
col=1,
)
# Add apnea threshold line
fig.add_hline(
y=apnea_threshold,
line_dash="dash",
line_color="red",
annotation_text="Apnea Threshold",
row=3,
col=1,
)
fig.add_hline(
y=-apnea_threshold,
line_dash="dash",
line_color="red",
row=3,
col=1,
)
except Exception as e:
logger.error(f"Sleep apnea detection failed: {e}")
# 5. Signal Quality & Ensemble Method Comparison
try:
# Calculate moving average for trend
window_size = int(0.5 * sampling_freq)
if window_size > 0:
moving_avg = np.convolve(
signal_data_plot, np.ones(window_size) / window_size, mode="same"
)
fig.add_trace(
go.Scatter(
x=time_axis_plot,
y=moving_avg,
mode="lines",
name="Moving Average",
line=dict(color="#F39C12", width=2),
),
row=3,
col=2,
)
# Add signal envelope
upper_envelope = moving_avg + 2 * np.std(signal_data_plot)
lower_envelope = moving_avg - 2 * np.std(signal_data_plot)
fig.add_trace(
go.Scatter(
x=time_axis_plot,
y=upper_envelope,
mode="lines",
name="Upper Envelope",
line=dict(color="#E74C3C", width=1, dash="dot"),
opacity=0.7,
),
row=3,
col=2,
)
fig.add_trace(
go.Scatter(
x=time_axis_plot,
y=lower_envelope,
mode="lines",
name="Lower Envelope",
line=dict(color="#E74C3C", width=1, dash="dot"),
opacity=0.7,
),
row=3,
col=2,
)
# Add ensemble method comparison if multiple methods are selected
if (
estimation_methods
and len(estimation_methods) > 1
and "ensemble" in estimation_methods
):
try:
# Create a mini-ensemble visualization
ensemble_window = int(2.0 * sampling_freq) # 2-second windows
if (
ensemble_window > 0
and len(signal_data_plot) > ensemble_window
):
# Calculate local statistics in windows
n_windows = len(signal_data_plot) // ensemble_window
window_means = []
window_stds = []
window_times = []
for i in range(n_windows):
start_idx = i * ensemble_window
end_idx = start_idx + ensemble_window
window_data = signal_data_plot[start_idx:end_idx]
window_means.append(np.mean(window_data))
window_stds.append(np.std(window_data))
window_times.append(
time_axis_plot[start_idx + ensemble_window // 2]
)
# Add ensemble stability indicator
fig.add_trace(
go.Scatter(
x=window_times,
y=window_means,
mode="lines+markers",
name="Local Ensemble Mean",
line=dict(color="#2ECC71", width=2),
marker=dict(size=4),
),
row=3,
col=2,
)
# Add ensemble variance indicator
fig.add_trace(
go.Scatter(
x=window_times,
y=window_stds,
mode="lines",
name="Local Ensemble Std",
line=dict(color="#E67E22", width=1, dash="dot"),
opacity=0.7,
),
row=3,
col=2,
)
except Exception as e:
logger.error(f"Ensemble visualization failed: {e}")
except Exception as e:
logger.error(f"Signal quality analysis failed: {e}")
# Update layout
logger.info("Updating plot layout...")
fig.update_layout(
title="Comprehensive Respiratory Analysis",
template="plotly_white",
height=900,
showlegend=True,
margin=dict(l=50, r=50, t=100, b=50),
hovermode="closest",
)
# Update axes labels with gridlines for better visibility
logger.info("Updating axes labels and gridlines...")
fig.update_xaxes(
title_text="Time (s)",
showgrid=True,
gridwidth=1,
gridcolor="LightGray",
row=1,
col=1,
)
fig.update_yaxes(
title_text="Amplitude",
showgrid=True,
gridwidth=1,
gridcolor="LightGray",
row=1,
col=1,
)
fig.update_xaxes(
title_text="Frequency (Hz)",
showgrid=True,
gridwidth=1,
gridcolor="LightGray",
row=1,
col=2,
)
fig.update_yaxes(
title_text="Magnitude",
showgrid=True,
gridwidth=1,
gridcolor="LightGray",
row=1,
col=2,
)
fig.update_xaxes(
title_text="Time (s)",
showgrid=True,
gridwidth=1,
gridcolor="LightGray",
row=2,
col=1,
)
fig.update_yaxes(
title_text="Amplitude",
showgrid=True,
gridwidth=1,
gridcolor="LightGray",
row=2,
col=1,
)
fig.update_xaxes(
title_text="Time (s)",
showgrid=True,
gridwidth=1,
gridcolor="LightGray",
row=2,
col=2,
)
fig.update_yaxes(
title_text="Interval (s) / RR (BPM)",
showgrid=True,
gridwidth=1,
gridcolor="LightGray",
row=2,
col=2,
)
fig.update_xaxes(
title_text="Time (s)",
showgrid=True,
gridwidth=1,
gridcolor="LightGray",
row=3,
col=1,
)
fig.update_yaxes(
title_text="Amplitude",
showgrid=True,
gridwidth=1,
gridcolor="LightGray",
row=3,
col=1,
)
fig.update_xaxes(
title_text="Time (s)",
showgrid=True,
gridwidth=1,
gridcolor="LightGray",
row=3,
col=2,
)
fig.update_yaxes(
title_text="Statistics",
showgrid=True,
gridwidth=1,
gridcolor="LightGray",
row=3,
col=2,
)
logger.info("=== CREATE COMPREHENSIVE RESPIRATORY PLOTS COMPLETED ===")
return fig
except Exception as e:
logger.error(f"Error creating comprehensive respiratory plots: {e}")
return create_empty_figure()