"""
Comprehensive respiratory rate analysis callbacks for vitalDSP webapp.
Handles all respiratory features including estimation methods, sleep apnea detection, and fusion analysis.
"""
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from dash import Input, Output, State, callback_context, no_update, html
from dash.exceptions import PreventUpdate
from scipy import signal
import logging
logger = logging.getLogger(__name__)
[docs]
def detect_respiratory_signal_type(signal_data, sampling_freq):
"""Auto-detect if signal is respiratory or cardiac based on frequency content."""
try:
# Validate sampling frequency
if sampling_freq <= 0:
return "unknown"
# Compute FFT
fft_result = np.fft.fft(signal_data)
fft_freq = np.fft.fftfreq(len(signal_data), 1 / sampling_freq)
# Focus on positive frequencies
positive_mask = fft_freq > 0
fft_freq = fft_freq[positive_mask]
fft_magnitude = np.abs(fft_result[positive_mask])
# Check for constant or near-constant signals
if (
np.std(signal_data) < 1e-10
): # Very small standard deviation indicates constant signal
return "unknown"
# Find dominant frequency
dominant_idx = np.argmax(fft_magnitude)
dominant_freq = fft_freq[dominant_idx]
# Respiratory signals typically have lower frequencies (0.1-0.5 Hz)
# Cardiac signals typically have higher frequencies (0.8-2.0 Hz)
if dominant_freq < 0.5:
return "respiratory"
else:
return "cardiac"
except Exception:
return "unknown"
[docs]
def create_respiratory_signal_plot(
signal_data,
time_axis,
sampling_freq,
signal_type,
estimation_methods,
preprocessing_options,
low_cut,
high_cut,
):
"""Create the main respiratory signal plot with annotations."""
try:
# Validate inputs
if signal_data is None or time_axis is None or sampling_freq is None:
return create_empty_figure()
fig = go.Figure()
# Add main signal
fig.add_trace(
go.Scatter(
x=time_axis,
y=signal_data,
mode="lines",
name=f"{signal_type.title()} Signal",
line=dict(color="blue", width=1),
)
)
# Add preprocessing info if applied
if preprocessing_options and any(preprocessing_options):
fig.add_trace(
go.Scatter(
x=time_axis,
y=signal_data, # This would be the preprocessed signal in real implementation
mode="lines",
name="Preprocessed Signal",
line=dict(color="red", width=1, dash="dash"),
)
)
# Add filter frequency lines if specified
if low_cut:
fig.add_hline(
y=np.min(signal_data),
line_dash="dot",
line_color="green",
annotation_text=f"Low Cut: {low_cut} Hz",
)
if high_cut:
fig.add_hline(
y=np.max(signal_data),
line_dash="dot",
line_color="orange",
annotation_text=f"High Cut: {high_cut} Hz",
)
fig.update_layout(
title=f"Respiratory Signal Analysis - {signal_type.title()}",
xaxis_title="Time (seconds)",
yaxis_title="Amplitude",
showlegend=True,
height=400,
plot_bgcolor="white",
margin=dict(l=60, r=60, t=80, b=60),
)
return fig
except Exception as e:
logger.error(f"Error creating respiratory signal plot: {e}")
return create_empty_figure()
[docs]
def generate_comprehensive_respiratory_analysis(
signal_data,
time_axis,
sampling_freq,
signal_type,
estimation_methods,
advanced_options,
preprocessing_options,
low_cut,
high_cut,
min_breath_duration,
max_breath_duration,
):
"""Generate comprehensive respiratory analysis results using vitalDSP."""
try:
results = []
# Basic statistics
results.append(html.H5("Signal Statistics"))
results.append(html.P(f"Signal Type: {signal_type.title()}"))
results.append(html.P(f"Duration: {time_axis[-1]:.2f} seconds"))
results.append(html.P(f"Sampling Frequency: {sampling_freq} Hz"))
results.append(html.P(f"Signal Length: {len(signal_data)} samples"))
results.append(html.P(f"Mean Amplitude: {np.mean(signal_data):.4f}"))
results.append(html.P(f"Std Amplitude: {np.std(signal_data):.4f}"))
# Estimation methods used
if estimation_methods:
results.append(html.Hr())
results.append(html.H6("Estimation Methods"))
for method in estimation_methods:
results.append(html.P(f"• {method.replace('_', ' ').title()}"))
# Preprocessing options
if preprocessing_options and any(preprocessing_options):
results.append(html.Hr())
results.append(html.H6("Preprocessing Applied"))
for option in preprocessing_options:
if option:
results.append(html.P(f"• {option.replace('_', ' ').title()}"))
# Filter parameters
if low_cut or high_cut:
results.append(html.Hr())
results.append(html.H6("Filter Parameters"))
if low_cut:
results.append(html.P(f"Low Cutoff: {low_cut} Hz"))
if high_cut:
results.append(html.P(f"High Cutoff: {high_cut} Hz"))
# Breath duration constraints
if min_breath_duration or max_breath_duration:
results.append(html.Hr())
results.append(html.H6("Breath Duration Constraints"))
if min_breath_duration:
results.append(
html.P(f"Minimum Duration: {min_breath_duration} seconds")
)
if max_breath_duration:
results.append(
html.P(f"Maximum Duration: {max_breath_duration} seconds")
)
# Try to compute respiratory rate using vitalDSP if available
try:
from vitalDSP.respiratory_analysis.respiratory_analysis import (
RespiratoryAnalysis,
)
from vitalDSP.preprocess.preprocess_operations import PreprocessConfig
# Create preprocessing configuration with proper respiratory parameters
preprocess_config = PreprocessConfig()
if preprocessing_options and "filter" in preprocessing_options:
preprocess_config.filter_type = "bandpass"
preprocess_config.lowcut = low_cut or 0.1
preprocess_config.highcut = (
high_cut or 0.5
) # Fixed: was 0.8, should be 0.5 for respiratory
preprocess_config.respiratory_mode = (
True # Enable respiratory-specific preprocessing
)
else:
# Even without explicit filtering, use respiratory-appropriate defaults
preprocess_config.filter_type = "bandpass"
preprocess_config.lowcut = 0.1
preprocess_config.highcut = 0.5
preprocess_config.respiratory_mode = True
# Initialize respiratory analysis
ra = RespiratoryAnalysis(signal_data, sampling_freq)
# Compute respiratory rate using different methods
if estimation_methods:
results.append(html.Hr())
results.append(html.H6("Respiratory Rate Estimates"))
for method in estimation_methods:
try:
if method == "peak_detection":
rr = ra.compute_respiratory_rate(
method="peaks",
min_breath_duration=min_breath_duration or 0.1,
max_breath_duration=max_breath_duration or 6,
preprocess_config=preprocess_config,
)
results.append(
html.P(f"• Peak Detection: {rr:.2f} breaths/min")
)
elif method == "fft_based":
rr = ra.compute_respiratory_rate(
method="fft_based", preprocess_config=preprocess_config
)
results.append(html.P(f"• FFT Based: {rr:.2f} breaths/min"))
elif method == "frequency_domain":
rr = ra.compute_respiratory_rate(
method="frequency_domain",
preprocess_config=preprocess_config,
)
results.append(
html.P(f"• Frequency Domain: {rr:.2f} breaths/min")
)
elif method == "time_domain":
rr = ra.compute_respiratory_rate(
method="time_domain",
preprocess_config=preprocess_config,
)
results.append(
html.P(f"• Time Domain: {rr:.2f} breaths/min")
)
elif method == "counting":
rr = ra.compute_respiratory_rate(
method="counting",
min_breath_duration=min_breath_duration or 0.1,
max_breath_duration=max_breath_duration or 6,
preprocess_config=preprocess_config,
)
results.append(html.P(f"• Counting: {rr:.2f} breaths/min"))
elif method == "zero_crossing":
rr = ra.compute_respiratory_rate(
method="zero_crossing",
min_breath_duration=min_breath_duration or 0.1,
max_breath_duration=max_breath_duration or 6,
preprocess_config=preprocess_config,
)
results.append(
html.P(f"• Zero Crossing: {rr:.2f} breaths/min")
)
except Exception as e:
logger.warning(f"Failed to compute RR using {method}: {e}")
results.append(
html.P(
f"• {method.replace('_', ' ').title()}: Failed - {str(e)}"
)
)
# Add comparison with fallback implementation for debugging
results.append(html.Hr())
results.append(html.H6("🔍 Debug: Fallback vs vitalDSP Comparison"))
# Fallback FFT implementation (the one giving good results)
try:
fft_signal = np.fft.fft(signal_data)
freqs = np.fft.fftfreq(len(signal_data), 1 / sampling_freq)
# Focus on respiratory frequency range (0.1-0.5 Hz = 6-30 BPM)
resp_mask = (freqs > 0.1) & (freqs < 0.5)
resp_freqs = freqs[resp_mask]
resp_fft = np.abs(fft_signal[resp_mask])
if len(resp_freqs) > 0:
dominant_freq_idx = np.argmax(resp_fft)
dominant_freq = resp_freqs[dominant_freq_idx]
rr_fallback = dominant_freq * 60 # Convert to BPM
results.append(
html.P(
f"• Fallback FFT: {rr_fallback:.2f} breaths/min",
className="text-info",
)
)
else:
results.append(
html.P(
"• Fallback FFT: No frequencies in range",
className="text-warning",
)
)
except Exception as e:
results.append(
html.P(
f"• Fallback FFT: Failed - {str(e)}", className="text-danger"
)
)
# Fallback peak detection
try:
peaks, _ = signal.find_peaks(
signal_data,
height=np.mean(signal_data) + np.std(signal_data),
distance=int(0.5 * sampling_freq),
)
if len(peaks) > 1:
intervals = np.diff(peaks) / sampling_freq
rr_fallback_peaks = 60 / np.mean(intervals)
results.append(
html.P(
f"• Fallback Peak Detection: {rr_fallback_peaks:.2f} breaths/min",
className="text-info",
)
)
else:
results.append(
html.P(
"• Fallback Peak Detection: Insufficient peaks",
className="text-warning",
)
)
except Exception as e:
results.append(
html.P(
f"• Fallback Peak Detection: Failed - {str(e)}",
className="text-danger",
)
)
# Signal quality assessment
results.append(html.Hr())
results.append(html.H6("📊 Signal Quality Assessment"))
results.append(html.P(f"Signal Length: {len(signal_data)} samples"))
results.append(
html.P(f"Duration: {len(signal_data)/sampling_freq:.2f} seconds")
)
results.append(html.P(f"Mean: {np.mean(signal_data):.4f}"))
results.append(html.P(f"Std: {np.std(signal_data):.4f}"))
results.append(
html.P(
f"SNR Estimate: {np.std(signal_data)/np.mean(np.abs(signal_data)):.2f}"
)
)
if advanced_options:
results.append(html.Hr())
results.append(html.H6("Advanced Analysis"))
for option in advanced_options:
try:
if option == "sleep_apnea":
from vitalDSP.respiratory_analysis.sleep_apnea_detection.amplitude_threshold import (
detect_apnea_amplitude,
)
apnea_events = detect_apnea_amplitude(
signal_data,
sampling_freq,
threshold=np.mean(signal_data) * 0.5,
min_duration=10,
)
results.append(
html.P(
f"• Sleep Apnea Events: {len(apnea_events)} detected"
)
)
elif option == "multimodal":
from vitalDSP.respiratory_analysis.fusion.multimodal_analysis import (
multimodal_analysis,
)
# For multimodal, we'd need multiple signals, so just show capability
results.append(
html.P(
"• Multimodal Analysis: Available for multiple signals"
)
)
elif option == "ppg_ecg_fusion":
from vitalDSP.respiratory_analysis.fusion.ppg_ecg_fusion import (
ppg_ecg_fusion,
)
# For fusion, we'd need both PPG and ECG signals
results.append(
html.P("• PPG-ECG Fusion: Available for dual signals")
)
elif option == "resp_cardiac_fusion":
from vitalDSP.respiratory_analysis.fusion.respiratory_cardiac_fusion import (
respiratory_cardiac_fusion,
)
results.append(
html.P("• Respiratory-Cardiac Fusion: Available")
)
except Exception as e:
logger.warning(f"Failed to perform {option} analysis: {e}")
results.append(
html.P(f"• {option.replace('_', ' ').title()}: Failed")
)
except ImportError as e:
logger.warning(f"vitalDSP respiratory modules not available: {e}")
results.append(html.Hr())
results.append(html.H6("Analysis Status"))
results.append(
html.P(
"vitalDSP respiratory modules not available for detailed analysis"
)
)
return results
except Exception as e:
logger.error(f"Error generating respiratory analysis results: {e}")
return [html.H5("Error"), html.P(f"Failed to generate results: {str(e)}")]
[docs]
def create_comprehensive_respiratory_plots(
signal_data,
time_axis,
sampling_freq,
signal_type,
estimation_methods,
advanced_options,
preprocessing_options,
low_cut,
high_cut,
min_breath_duration,
max_breath_duration,
):
"""Create comprehensive respiratory analysis plots."""
try:
# Create subplots
fig = make_subplots(
rows=2,
cols=2,
subplot_titles=(
"Time Domain",
"Frequency Domain",
"Power Spectral Density",
"Signal Quality",
),
specs=[[{"type": "xy"}, {"type": "xy"}], [{"type": "xy"}, {"type": "xy"}]],
)
# Time domain plot
fig.add_trace(
go.Scatter(x=time_axis, y=signal_data, mode="lines", name="Signal"),
row=1,
col=1,
)
# Frequency domain plot
fft_result = np.fft.fft(signal_data)
fft_freq = np.fft.fftfreq(len(signal_data), 1 / sampling_freq)
positive_mask = fft_freq > 0
fft_freq_positive = fft_freq[positive_mask]
fft_magnitude = np.abs(fft_result[positive_mask])
fig.add_trace(
go.Scatter(
x=fft_freq_positive, y=fft_magnitude, mode="lines", name="FFT Magnitude"
),
row=1,
col=2,
)
# Power Spectral Density
freqs, psd = signal.welch(
signal_data, sampling_freq, nperseg=min(256, len(signal_data) // 4)
)
fig.add_trace(
go.Scatter(x=freqs, y=psd, mode="lines", name="PSD"), row=2, col=1
)
# Signal quality metrics
# Calculate signal quality index
signal_quality = (
np.std(signal_data) / np.mean(np.abs(signal_data))
if np.mean(np.abs(signal_data)) > 0
else 0
)
fig.add_trace(
go.Bar(x=["Signal Quality"], y=[signal_quality], name="Quality Index"),
row=2,
col=2,
)
# Update layout
fig.update_layout(
title="Comprehensive Respiratory Analysis",
height=600,
showlegend=False,
plot_bgcolor="white",
margin=dict(l=60, r=60, t=80, b=60),
)
# Update all subplot axes
for i in range(1, 3):
for j in range(1, 3):
fig.update_xaxes(
showgrid=True,
gridwidth=1,
gridcolor="rgba(128, 128, 128, 0.2)",
row=i,
col=j,
)
fig.update_yaxes(
showgrid=True,
gridwidth=1,
gridcolor="rgba(128, 128, 128, 0.2)",
row=i,
col=j,
)
return fig
except Exception as e:
logger.error(f"Error creating comprehensive respiratory plots: {e}")
return create_empty_figure()
[docs]
def register_respiratory_callbacks(app):
"""Register all respiratory analysis callbacks."""
logger.info("=== REGISTERING RESPIRATORY CALLBACKS ===")
@app.callback(
[
Output("resp-main-plot", "figure"),
Output("resp-analysis-results", "children"),
Output("resp-analysis-plots", "figure"),
Output("resp-data-store", "data"),
Output("resp-features-store", "data"),
],
[
Input("url", "pathname"),
Input("resp-analyze-btn", "n_clicks"),
Input("resp-btn-nudge-m10", "n_clicks"),
Input("resp-btn-nudge-m1", "n_clicks"),
Input("resp-btn-nudge-p1", "n_clicks"),
Input("resp-btn-nudge-p10", "n_clicks"),
],
[
State(
"resp-start-position-slider", "value"
), # NEW: start position instead of time-range-slider
State(
"resp-duration-select", "value"
), # NEW: duration instead of start-time/end-time
State("resp-signal-type", "value"),
State("resp-signal-source-select", "value"),
State("resp-estimation-methods", "value"),
State("resp-advanced-options", "value"),
State("resp-preprocessing-options", "value"),
State("resp-low-cut", "value"),
State("resp-high-cut", "value"),
State("resp-min-breath-duration", "value"),
State("resp-max-breath-duration", "value"),
],
)
def respiratory_analysis_callback(
pathname,
n_clicks,
nudge_m10,
nudge_m1,
nudge_p1,
nudge_p10,
start_position, # NEW: start position instead of slider_value
duration, # NEW: duration instead of start_time/end_time
signal_type,
signal_source,
estimation_methods,
advanced_options,
preprocessing_options,
low_cut,
high_cut,
min_breath_duration,
max_breath_duration,
):
"""Comprehensive respiratory rate analysis using all vitalDSP respiratory features."""
ctx = callback_context
# Determine what triggered this callback
if not ctx.triggered:
logger.warning("No context triggered - raising PreventUpdate")
raise PreventUpdate
trigger_id = ctx.triggered[0]["prop_id"].split(".")[0]
logger.info("=== RESPIRATORY ANALYSIS CALLBACK TRIGGERED ===")
logger.info(f"Trigger ID: {trigger_id}")
logger.info(f"Pathname: {pathname}")
logger.info(f"Signal type: {signal_type}")
logger.info(f"Estimation methods: {estimation_methods}")
logger.info(f"Advanced options: {advanced_options}")
logger.info(f"Preprocessing options: {preprocessing_options}")
# Only run this when we're on the respiratory page
if pathname != "/respiratory":
logger.info("Not on respiratory page, returning empty figures")
return (
create_empty_figure(),
"Navigate to respiratory page to analyze respiratory signals.",
create_empty_figure(),
None,
None,
)
# Handle first-time loading
if not ctx.triggered or ctx.triggered[0]["prop_id"].split(".")[0] == "url":
logger.info("First time loading respiratory page, attempting to load data")
try:
# Get data from the data service
logger.info("Attempting to get data service...")
from vitalDSP_webapp.services.data.enhanced_data_service import (
get_enhanced_data_service,
)
data_service = get_enhanced_data_service()
logger.info("Data service retrieved successfully")
# Get the most recent data
logger.info("Retrieving all data from service...")
all_data = data_service.get_all_data()
logger.info(
f"All data keys: {list(all_data.keys()) if all_data else 'None'}"
)
if not all_data:
logger.warning("No data found in service")
return (
create_empty_figure(),
"No data available. Please upload and process data first.",
create_empty_figure(),
None,
None,
)
# Get the most recent data entry
latest_data_id = list(all_data.keys())[-1]
latest_data = all_data[latest_data_id]
logger.info(f"Latest data ID: {latest_data_id}")
logger.info(f"Latest data info: {latest_data.get('info', 'No info')}")
# Get column mapping
logger.info("Retrieving column mapping...")
column_mapping = data_service.get_column_mapping(latest_data_id)
logger.info(f"Column mapping: {column_mapping}")
if not column_mapping:
logger.warning(
"Data has not been processed yet - no column mapping found"
)
return (
create_empty_figure(),
"Please process your data on the Upload page first (configure column mapping)",
create_empty_figure(),
None,
None,
)
# Get the actual data
logger.info("Retrieving data frame...")
df = data_service.get_data(latest_data_id)
logger.info(f"Data frame shape: {df.shape if df is not None else 'None'}")
logger.info(
f"Data frame columns: {list(df.columns) if df is not None else 'None'}"
)
if df is None or df.empty:
logger.warning("Data frame is empty")
return (
create_empty_figure(),
"Data is empty or corrupted.",
create_empty_figure(),
None,
None,
)
# Get sampling frequency from the data info
sampling_freq = latest_data.get("info", {}).get("sampling_freq", 1000)
logger.info(f"Sampling frequency: {sampling_freq}")
# Handle time window adjustments for nudge buttons
# Note: Using start_position and duration from function parameters
if trigger_id in [
"resp-btn-nudge-m10",
"resp-btn-nudge-m1",
"resp-btn-nudge-p1",
"resp-btn-nudge-p10",
]:
# Initialize from parameters if not set
if start_position is None:
start_position = 0
if duration is None:
duration = 10
if trigger_id == "resp-btn-nudge-m10":
start_position = max(0, start_position - 10)
elif trigger_id == "resp-btn-nudge-m1":
start_position = max(0, start_position - 1)
elif trigger_id == "resp-btn-nudge-p1":
start_position = start_position + 1
elif trigger_id == "resp-btn-nudge-p10":
start_position = start_position + 10
logger.info(
f"Time window adjusted: start={start_position}, duration={duration}"
)
# Set default time window if not specified
if start_position is None:
start_position = 0
if duration is None:
duration = 10
# Calculate end time from start_position and duration
start_time = start_position
end_time = start_position + duration
logger.info(f"Using time window: {start_time} to {end_time}")
# Apply time window
start_sample = int(start_time * sampling_freq)
end_sample = int(end_time * sampling_freq)
windowed_data = df.iloc[start_sample:end_sample].copy()
# Create time axis
time_axis = np.arange(len(windowed_data)) / sampling_freq
# Get signal column
signal_column = column_mapping.get("signal")
logger.info(f"Signal column from mapping: {signal_column}")
logger.info(
f"Available columns in windowed data: {list(windowed_data.columns)}"
)
if not signal_column or signal_column not in windowed_data.columns:
logger.warning(f"Signal column {signal_column} not found in data")
return (
create_empty_figure(),
"Signal column not found in data.",
create_empty_figure(),
None,
None,
)
signal_data = windowed_data[signal_column].values
# Initialize filter_info for signal source loading
filter_info = None
# Check if we need to apply dynamic filtering for filtered signal
if signal_source == "filtered" and filter_info is not None:
# Check if the time range is within the original signal range
original_signal_length = len(df[signal_column].values)
expected_length = end_sample - start_sample
# If time range is outside original signal or we need dynamic filtering
if (
start_sample >= original_signal_length
or end_sample > original_signal_length
or expected_length != len(signal_data)
):
logger.info("=== DYNAMIC FILTERING ===")
logger.info(f"Original signal length: {original_signal_length}")
logger.info(f"Time range: {start_sample} to {end_sample}")
logger.info(f"Expected window length: {expected_length}")
logger.info("Applying dynamic filtering to current window...")
# If time range is completely outside original signal, return error
if (
start_sample >= original_signal_length
or end_sample > original_signal_length
):
logger.warning(
f"Time range {start_sample}-{end_sample} is outside original signal range (0-{original_signal_length})"
)
return (
create_empty_figure(),
f"Time range is outside the available signal data. Please select a time range within 0 to {original_signal_length/sampling_freq:.1f} seconds.",
create_empty_figure(),
None,
None,
)
try:
# Import the same filtering function used in time domain
from vitalDSP_webapp.callbacks.analysis.signal_filtering_callbacks import (
apply_traditional_filter,
)
# Get the full original signal for the current time window
full_original_signal = df[signal_column].values
windowed_original_signal = full_original_signal[
start_sample:end_sample
]
# Get filter parameters
parameters = filter_info.get("parameters", {})
detrending_applied = filter_info.get(
"detrending_applied", False
)
# Apply detrending if it was applied in the original filtering
if detrending_applied:
from scipy import signal as scipy_signal
signal_data_detrended = scipy_signal.detrend(
windowed_original_signal
)
logger.info("Applied detrending to signal")
else:
signal_data_detrended = windowed_original_signal
# Apply the same filter type as used in filtering screen
filter_type = filter_info.get("filter_type", "traditional")
if filter_type == "traditional":
# Extract traditional filter parameters
filter_family = parameters.get("filter_family", "butter")
filter_response = parameters.get(
"filter_response", "bandpass"
)
low_freq = parameters.get("low_freq", 0.5)
high_freq = parameters.get("high_freq", 5)
filter_order = parameters.get("filter_order", 4)
# Apply traditional filter
signal_data = apply_traditional_filter(
signal_data_detrended,
sampling_freq,
filter_family,
filter_response,
low_freq,
high_freq,
filter_order,
)
logger.info("Applied dynamic traditional filter")
else:
# For other filter types, use the original signal
signal_data = signal_data_detrended
logger.info(
f"Using original signal for filter type: {filter_type}"
)
signal_source_info = "Filtered Signal (Dynamic)"
logger.info("Dynamic filtering completed successfully")
except Exception as e:
logger.error(f"Error in dynamic filtering: {e}")
logger.info("Falling back to original signal")
signal_data = windowed_original_signal
signal_source_info = "Original Signal (Fallback)"
logger.info(f"Signal data shape: {signal_data.shape}")
logger.info(
f"Signal data range: {np.min(signal_data):.3f} to {np.max(signal_data):.3f}"
)
logger.info(f"Signal data mean: {np.mean(signal_data):.3f}")
# Signal source loading logic
logger.info("=== SIGNAL SOURCE LOADING ===")
logger.info(f"Signal source selection: {signal_source}")
if signal_source == "filtered":
# Try to load filtered data from filtering screen
filtered_data = data_service.get_filtered_data(latest_data_id)
filter_info = data_service.get_filter_info(latest_data_id)
if filtered_data is not None:
logger.info(
f"Found filtered data with shape: {filtered_data.shape}"
)
# Apply time window to filtered data
filtered_windowed = filtered_data[start_sample:end_sample]
signal_data = filtered_windowed
signal_source_info = "Filtered Signal"
else:
logger.info("No filtered data available, using original signal")
signal_source_info = "Original Signal"
else:
logger.info("Using original signal as requested")
signal_source_info = "Original Signal"
# Log selected signal characteristics
logger.info(f"Signal data shape: {signal_data.shape}")
logger.info(
f"Signal data range: {np.min(signal_data):.3f} to {np.max(signal_data):.3f}"
)
logger.info(f"Signal data mean: {np.mean(signal_data):.3f}")
logger.info(f"Signal source: {signal_source_info}")
# Auto-detect signal type if needed
if signal_type == "auto":
logger.info("Auto-detecting signal type...")
signal_type = detect_respiratory_signal_type(signal_data, sampling_freq)
logger.info(f"Auto-detected signal type: {signal_type}")
logger.info(f"Final signal type: {signal_type}")
logger.info(f"Estimation methods: {estimation_methods}")
logger.info(f"Advanced options: {advanced_options}")
logger.info(f"Preprocessing options: {preprocessing_options}")
# Create main respiratory signal plot with annotations
logger.info("Creating main respiratory signal plot with annotations...")
main_plot = create_respiratory_signal_plot(
signal_data,
time_axis,
sampling_freq,
signal_type,
estimation_methods,
preprocessing_options,
low_cut,
high_cut,
)
logger.info("Main respiratory signal plot created successfully")
# Generate comprehensive respiratory analysis results
logger.info("Generating comprehensive respiratory analysis results...")
analysis_results = generate_comprehensive_respiratory_analysis(
signal_data,
time_axis,
sampling_freq,
signal_type,
estimation_methods,
advanced_options,
preprocessing_options,
low_cut,
high_cut,
min_breath_duration,
max_breath_duration,
)
logger.info("Respiratory analysis results generated successfully")
# Create respiratory analysis plots
logger.info("Creating respiratory analysis plots...")
analysis_plots = create_comprehensive_respiratory_plots(
signal_data,
time_axis,
sampling_freq,
signal_type,
estimation_methods,
advanced_options,
preprocessing_options,
low_cut,
high_cut,
)
logger.info("Respiratory analysis plots created successfully")
# Store processed data
resp_data = {
"signal_data": signal_data.tolist(),
"time_axis": time_axis.tolist(),
"sampling_freq": sampling_freq,
"window": [start_time, end_time],
"signal_type": signal_type,
"estimation_methods": estimation_methods,
}
resp_features = {
"advanced_options": advanced_options,
"preprocessing_options": preprocessing_options,
"filter_params": {"low_cut": low_cut, "high_cut": high_cut},
"breath_constraints": {
"min_duration": min_breath_duration,
"max_duration": max_breath_duration,
},
}
logger.info("Respiratory analysis completed successfully")
return main_plot, analysis_results, analysis_plots, resp_data, resp_features
except Exception as e:
logger.error(f"Error in respiratory analysis callback: {e}")
import traceback
traceback.print_exc()
return (
create_empty_figure(),
f"Error in analysis: {str(e)}",
create_empty_figure(),
None,
None,
)
# Removed update_resp_time_inputs callback - no longer needed with start/duration approach
@app.callback(
[
Output("resp-start-position-slider", "min"),
Output("resp-start-position-slider", "max"),
Output("resp-start-position-slider", "value"),
],
[Input("url", "pathname")],
)
def update_resp_time_slider_range(pathname):
"""Update time slider range based on data duration."""
logger.info("=== UPDATE RESP TIME SLIDER RANGE ===")
logger.info(f"Pathname: {pathname}")
# Only run this when we're on the respiratory page
if pathname != "/respiratory":
return 0, 100, [0, 10]
try:
# Get data from the data service
from vitalDSP_webapp.services.data.enhanced_data_service import (
get_enhanced_data_service,
)
data_service = get_enhanced_data_service()
# Get the most recent data
all_data = data_service.get_all_data()
if not all_data:
return 0, 100, [0, 10]
latest_data_id = list(all_data.keys())[-1]
latest_data = all_data[latest_data_id]
# Get sampling frequency and calculate duration
sampling_freq = latest_data.get("info", {}).get("sampling_freq", 1000)
df = data_service.get_data(latest_data_id)
if df is None or df.empty:
return 0, 100, [0, 10]
duration = len(df) / sampling_freq
max_time = int(duration)
return 0, max_time, [0, min(10, max_time)]
except Exception as e:
logger.error(f"Error updating respiratory time slider range: {e}")
return 0, 100, [0, 10]