Source code for vitalDSP_webapp.callbacks.features.features_callbacks
"""
Feature engineering callbacks for vitalDSP webapp.
This module handles advanced feature engineering and signal processing features.
"""
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from dash import Input, Output, State, callback_context, no_update, html, dcc
from dash.exceptions import PreventUpdate
import dash_bootstrap_components as dbc
from scipy import signal
import logging
import traceback
logger = logging.getLogger(__name__)
[docs]
def register_features_callbacks(app):
"""Register all feature engineering callbacks."""
logger.info("=== REGISTERING FEATURE ENGINEERING CALLBACKS ===")
# Import vitalDSP modules
_import_vitaldsp_modules()
@app.callback(
[
Output("features-analysis-results", "children"),
Output("features-analysis-plots", "figure"),
Output("store-features-data", "data"),
Output("store-features-features", "data"),
],
[Input("features-analyze-btn", "n_clicks"), Input("url", "pathname")],
[
State("features-signal-type", "value"),
State("features-signal-source-select", "value"),
State("features-preprocessing", "value"),
State("features-categories", "value"),
State("features-advanced-options", "value"),
],
)
def features_analysis_callback(
n_clicks,
pathname,
signal_type,
signal_source,
preprocessing,
categories,
advanced_options,
):
"""Main callback for feature engineering analysis."""
logger.info("=== FEATURES ANALYSIS CALLBACK TRIGGERED ===")
# Only run this when we're on the features page
if pathname != "/features":
logger.info("Not on features page, returning empty figures")
return "Navigate to Features page", create_empty_figure(), None, None
# Check if data is available first
try:
from vitalDSP_webapp.services.data.enhanced_data_service import (
get_enhanced_data_service,
)
data_service = get_enhanced_data_service()
if data_service is None:
logger.error("Data service is None")
return (
"Data service not available. Please restart the application.",
create_empty_figure(),
None,
None,
)
all_data = data_service.get_all_data()
if not all_data:
logger.info("No data available")
return (
"No data available. Please upload and process data first.",
create_empty_figure(),
None,
None,
)
# Data is available, show instructions if no button click
if n_clicks is None:
logger.info("No button click - showing instructions")
return (
"Data is available! Click '🚀 Analyze Features' to start analysis.",
create_empty_figure(),
None,
None,
)
logger.info(f"Button clicked {n_clicks} times, starting analysis...")
# Get the most recent data entry
latest_data_id = list(all_data.keys())[-1]
latest_data = all_data[latest_data_id]
logger.info(f"Found data: {latest_data_id}")
# Get column mapping
column_mapping = data_service.get_column_mapping(latest_data_id)
if not column_mapping:
logger.warning(
"Data has not been processed yet - no column mapping found"
)
return (
"Please process your data on the Upload page first (configure column mapping)",
create_empty_figure(),
None,
None,
)
logger.info(f"Column mapping found: {column_mapping}")
df = data_service.get_data(latest_data_id)
if df is None or df.empty:
logger.warning("Data frame is empty")
return "Data is empty or corrupted.", create_empty_figure(), None, None
# Get sampling frequency from the data info
sampling_freq = latest_data.get("info", {}).get("sampling_freq", 1000)
logger.info(f"Sampling frequency: {sampling_freq}")
# Get the main signal column
signal_column = None
for col_type, col_name in column_mapping.items():
if col_type in ["ppg", "ecg", "signal"]:
signal_column = col_name
break
if signal_column is None:
# Fallback to first column
signal_column = df.columns[0]
logger.info(f"Using signal column: {signal_column}")
# Extract signal data
signal_data = np.array(df[signal_column])
# Always keep the original signal_data for dynamic filtering
original_signal_data = signal_data.copy()
selected_signal = signal_data
signal_source_info = "Original Signal"
filter_info = None
# Signal source loading logic
logger.info("=== SIGNAL SOURCE LOADING ===")
logger.info(f"Signal source selection: {signal_source}")
if signal_source == "filtered":
# Try to load filtered data from filtering screen
filtered_data = data_service.get_filtered_data(latest_data_id)
filter_info = data_service.get_filter_info(latest_data_id)
if filtered_data is not None:
logger.info(
f"Found filtered data with shape: {filtered_data.shape}"
)
selected_signal = filtered_data
signal_source_info = "Filtered Signal"
else:
logger.info("No filtered data available, using original signal")
else:
logger.info("Using original signal as requested")
# Log selected signal characteristics
logger.info(f"Selected signal shape: {selected_signal.shape}")
logger.info(
f"Selected signal range: {np.min(selected_signal):.3f} to {np.max(selected_signal):.3f}"
)
logger.info(f"Selected signal mean: {np.mean(selected_signal):.3f}")
logger.info(f"Signal source: {signal_source_info}")
# Use selected_signal for analysis
signal_data = selected_signal
# Set default values
signal_type = signal_type or "auto"
preprocessing = preprocessing or ["detrend", "normalize"]
categories = categories or ["statistical", "spectral"]
advanced_options = advanced_options or []
# Auto-detect signal type if needed
if signal_type == "auto":
signal_type = detect_signal_type(signal_data, sampling_freq)
logger.info(f"Auto-detected signal type: {signal_type}")
# Apply preprocessing
processed_signal = apply_preprocessing(
signal_data, preprocessing, sampling_freq, filter_info
)
# Extract features based on selected categories
features = extract_comprehensive_features(
processed_signal, sampling_freq, categories, advanced_options
)
# Extract additional signal quality and physiological features
signal_quality = _extract_signal_quality_metrics(
processed_signal, sampling_freq
)
physiological_features = _extract_physiological_features(
processed_signal, sampling_freq, signal_type
)
# Add to features dictionary
features["signal_quality"] = signal_quality
features["physiological"] = physiological_features
# Create comprehensive results display
results_display = create_comprehensive_features_display(
features, signal_type, categories
)
# Create analysis plots
logger.info(
f"Creating plots with signal data length: {len(processed_signal)}, categories: {categories}"
)
logger.info(f"Features keys: {list(features.keys())}")
analysis_plots = create_features_analysis_plots(
processed_signal, features, categories, sampling_freq
)
# Store data for other callbacks
features_data = {
"signal_data": processed_signal.tolist(),
"signal_type": signal_type,
"sampling_freq": sampling_freq,
"categories": categories,
"signal_quality": signal_quality,
"physiological_features": physiological_features,
}
features_features = {
"statistical": features.get("statistical", {}),
"spectral": features.get("spectral", {}),
"temporal": features.get("temporal", {}),
"morphological": features.get("morphological", {}),
"entropy": features.get("entropy", {}),
"fractal": features.get("fractal", {}),
"signal_quality": signal_quality,
"physiological": physiological_features,
}
logger.info("Feature analysis completed successfully")
return results_display, analysis_plots, features_data, features_features
except Exception as e:
logger.error(f"Error in feature analysis: {e}")
import traceback
logger.error(traceback.format_exc())
error_fig = create_empty_figure()
error_fig.add_annotation(
text=f"Error: {str(e)}",
xref="paper",
yref="paper",
x=0.5,
y=0.5,
showarrow=False,
)
error_results = html.Div(
[
html.H5("Error in Feature Analysis"),
html.P(f"Analysis failed: {str(e)}"),
html.P("Please check your data and parameters."),
]
)
return error_results, error_fig, None, None
# Helper functions for feature engineering
def _import_vitaldsp_modules():
"""Import vitalDSP modules when needed."""
try:
# Import vitalDSP modules directly from their modules since __init__.py files are empty
from vitalDSP.physiological_features.time_domain import TimeDomainFeatures
from vitalDSP.physiological_features.frequency_domain import (
FrequencyDomainFeatures,
)
from vitalDSP.physiological_features.hrv_analysis import HRVFeatures
from vitalDSP.physiological_features.beat_to_beat import BeatToBeatAnalysis
from vitalDSP.physiological_features.energy_analysis import EnergyAnalysis
from vitalDSP.physiological_features.envelope_detection import EnvelopeDetection
from vitalDSP.physiological_features.signal_segmentation import (
SignalSegmentation,
)
from vitalDSP.physiological_features.trend_analysis import TrendAnalysis
from vitalDSP.physiological_features.waveform import WaveformMorphology
from vitalDSP.physiological_features.nonlinear import NonlinearFeatures
from vitalDSP.physiological_features.cross_correlation import (
CrossCorrelationFeatures,
)
from vitalDSP.physiological_features.signal_power_analysis import (
SignalPowerAnalysis,
)
# Import transforms
from vitalDSP.transforms.wavelet_transform import WaveletTransform
from vitalDSP.transforms.fourier_transform import FourierTransform
from vitalDSP.transforms.hilbert_transform import HilbertTransform
from vitalDSP.transforms.stft import STFT
from vitalDSP.transforms.mfcc import MFCC
from vitalDSP.transforms.pca_ica_signal_decomposition import (
PCAICASignalDecomposition,
)
# Import advanced computation
from vitalDSP.advanced_computation.non_linear_analysis import NonlinearAnalysis
from vitalDSP.advanced_computation.anomaly_detection import AnomalyDetection
from vitalDSP.advanced_computation.bayesian_analysis import BayesianAnalysis
from vitalDSP.advanced_computation.kalman_filter import KalmanFilter
from vitalDSP.advanced_computation.harmonic_percussive_separation import (
HarmonicPercussiveSeparation,
)
from vitalDSP.advanced_computation.emd import EMD
# Import feature engineering
from vitalDSP.feature_engineering.morphology_features import MorphologyFeatures
from vitalDSP.feature_engineering.ppg_light_features import PPGLightFeatures
from vitalDSP.feature_engineering.ppg_autonomic_features import (
PPGAutonomicFeatures,
)
logger.info("Successfully imported vitalDSP modules")
return True
except ImportError as e:
logger.warning(
f"vitalDSP modules not available: {e}, using scipy/numpy fallback"
)
return False
[docs]
def create_empty_figure():
"""Create an empty figure for error handling."""
fig = go.Figure()
fig.add_annotation(
text="No data available",
xref="paper",
yref="paper",
x=0.5,
y=0.5,
showarrow=False,
)
fig.update_layout(
xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
plot_bgcolor="white",
)
return fig
[docs]
def detect_signal_type(signal_data, sampling_freq):
"""Auto-detect the type of signal."""
try:
# Simple heuristics for signal type detection
mean_val = np.mean(signal_data)
std_val = np.std(signal_data)
# Find peaks for frequency analysis
peaks, _ = signal.find_peaks(
signal_data, height=mean_val + std_val, distance=int(sampling_freq * 0.3)
)
if len(peaks) > 1:
intervals = np.diff(peaks) / sampling_freq
if np.mean(intervals) < 1.0: # Less than 1 second between peaks
return "ecg" # Likely ECG (faster heart rate)
else:
return "ppg" # Likely PPG (slower, more variable)
else:
return "general" # Default to general if unclear
except Exception as e:
logger.warning(f"Error in signal type detection: {e}")
return "general" # Default fallback
[docs]
def apply_preprocessing(
signal_data, preprocessing_options, sampling_freq, filter_info=None
):
"""
Apply preprocessing to the signal.
PHASE B: Now supports both list format (legacy) and dict format with parameters.
Args:
signal_data: Signal data array
preprocessing_options: Either a list of option names (legacy) or dict of {operation: params}
sampling_freq: Sampling frequency in Hz
filter_info: Optional filter info from filtering screen
"""
processed_signal = signal_data.copy()
try:
# PHASE B: Handle both list and dict formats
if isinstance(preprocessing_options, dict):
# New dict format: {operation: {param: value, ...}}
options_iter = preprocessing_options.items()
else:
# Legacy list format: ["detrend", "normalize", ...]
options_iter = [(opt, {}) for opt in preprocessing_options]
for option, params in options_iter:
if option == "detrend":
# Use vitalDSP detrending when available
try:
from vitalDSP.preprocess.preprocess_operations import (
PreprocessConfig,
)
from vitalDSP.preprocess.preprocess_operations import (
PreprocessOperations,
)
preprocess_config = PreprocessConfig()
# PHASE B: Use user-specified detrend type
detrend_type = params.get("type", "linear")
preprocess_config.detrend_type = detrend_type
preprocess_ops = PreprocessOperations(
processed_signal, preprocess_config
)
processed_signal = preprocess_ops.apply_detrending()
logger.info(f"Applied vitalDSP detrending (type={detrend_type})")
except Exception as e:
logger.warning(f"vitalDSP detrending failed, using scipy: {e}")
# Fallback to scipy detrending
processed_signal = signal.detrend(processed_signal)
elif option == "normalize":
# Use vitalDSP normalization when available
try:
from vitalDSP.preprocess.preprocess_operations import (
PreprocessConfig,
)
from vitalDSP.preprocess.preprocess_operations import (
PreprocessOperations,
)
preprocess_config = PreprocessConfig()
# PHASE B: Use user-specified normalization type
normalization_type = params.get("type", "z_score")
preprocess_config.normalization_type = normalization_type
preprocess_ops = PreprocessOperations(
processed_signal, preprocess_config
)
processed_signal = preprocess_ops.apply_normalization()
logger.info(
f"Applied vitalDSP normalization (type={normalization_type})"
)
except Exception as e:
logger.warning(f"vitalDSP normalization failed, using basic: {e}")
# Fallback to basic normalization with safety checks
# Handle infinite values
processed_signal = np.where(
np.isfinite(processed_signal), processed_signal, 0
)
signal_mean = np.mean(processed_signal)
signal_std = np.std(processed_signal)
if signal_std > 0:
processed_signal = (processed_signal - signal_mean) / signal_std
else:
processed_signal = (
processed_signal - signal_mean
) # Just center if std is 0
elif option == "filter":
# Use filter parameters from filtering screen if available
if filter_info is not None:
logger.info("Using filter parameters from filtering screen")
parameters = filter_info.get("parameters", {})
filter_type = filter_info.get("filter_type", "traditional")
if filter_type == "traditional":
# Extract traditional filter parameters
filter_family = parameters.get("filter_family", "butter")
filter_response = parameters.get("filter_response", "bandpass")
low_freq = parameters.get("low_freq", 0.1)
high_freq = parameters.get("high_freq", 0.8)
filter_order = parameters.get("filter_order", 4)
# Apply the same filter as used in filtering screen
from vitalDSP_webapp.callbacks.analysis.signal_filtering_callbacks import (
apply_traditional_filter,
)
processed_signal = apply_traditional_filter(
processed_signal,
sampling_freq,
filter_family,
filter_response,
low_freq,
high_freq,
filter_order,
)
logger.info(
f"Applied traditional filter: {filter_family} {filter_response} {low_freq}-{high_freq} Hz"
)
else:
# For other filter types, use original signal
logger.info(
f"Using original signal for filter type: {filter_type}"
)
else:
# PHASE D: Enhanced user-specified filter parameters with advanced options
filter_family = params.get("family", "butterworth")
filter_type = params.get("type", "lowpass")
filter_order = params.get("order", 4)
filter_ripple = params.get("ripple", 0.5)
low_freq = params.get("low_freq", 0.5)
high_freq = params.get("high_freq", 50)
logger.info(
f"Applying {filter_family} {filter_type} filter: low={low_freq}, high={high_freq}, order={filter_order}"
)
nyquist = sampling_freq / 2
# Map filter family names
family_map = {
"butterworth": "butter",
"chebyshev1": "cheby1",
"chebyshev2": "cheby2",
"elliptic": "ellip",
"bessel": "bessel",
}
scipy_family = family_map.get(filter_family, "butter")
# Build filter based on type and family
try:
if filter_type in ["lowpass"]:
cutoff_normalized = min(high_freq / nyquist, 0.99)
if scipy_family in ["cheby1", "ellip"]:
b, a = getattr(signal, scipy_family)(
filter_order,
filter_ripple,
cutoff_normalized,
btype=filter_type,
)
elif scipy_family == "cheby2":
b, a = signal.cheby2(
filter_order,
filter_ripple,
cutoff_normalized,
btype=filter_type,
)
else:
b, a = getattr(signal, scipy_family)(
filter_order, cutoff_normalized, btype=filter_type
)
elif filter_type in ["highpass"]:
cutoff_normalized = max(low_freq / nyquist, 0.01)
if scipy_family in ["cheby1", "ellip"]:
b, a = getattr(signal, scipy_family)(
filter_order,
filter_ripple,
cutoff_normalized,
btype=filter_type,
)
elif scipy_family == "cheby2":
b, a = signal.cheby2(
filter_order,
filter_ripple,
cutoff_normalized,
btype=filter_type,
)
else:
b, a = getattr(signal, scipy_family)(
filter_order, cutoff_normalized, btype=filter_type
)
elif filter_type in ["bandpass", "bandstop"]:
low_normalized = max(low_freq / nyquist, 0.01)
high_normalized = min(high_freq / nyquist, 0.99)
if scipy_family in ["cheby1", "ellip"]:
b, a = getattr(signal, scipy_family)(
filter_order,
filter_ripple,
[low_normalized, high_normalized],
btype=filter_type,
)
elif scipy_family == "cheby2":
b, a = signal.cheby2(
filter_order,
filter_ripple,
[low_normalized, high_normalized],
btype=filter_type,
)
else:
b, a = getattr(signal, scipy_family)(
filter_order,
[low_normalized, high_normalized],
btype=filter_type,
)
else:
# Default to butterworth lowpass
cutoff_normalized = min(high_freq / nyquist, 0.99)
b, a = signal.butter(
filter_order, cutoff_normalized, btype="low"
)
processed_signal = signal.filtfilt(b, a, processed_signal)
logger.info(f"Filter applied successfully")
except Exception as e:
logger.warning(
f"Advanced filter failed: {e}, falling back to butterworth"
)
# Fallback to simple butterworth
if filter_type in ["lowpass"]:
cutoff_normalized = min(high_freq / nyquist, 0.99)
b, a = signal.butter(
filter_order, cutoff_normalized, btype=filter_type
)
elif filter_type in ["highpass"]:
cutoff_normalized = max(low_freq / nyquist, 0.01)
b, a = signal.butter(
filter_order, cutoff_normalized, btype=filter_type
)
else:
b, a = signal.butter(
filter_order,
[
max(low_freq / nyquist, 0.01),
min(high_freq / nyquist, 0.99),
],
btype=filter_type,
)
processed_signal = signal.filtfilt(b, a, processed_signal)
elif option == "outlier_removal":
# Use vitalDSP outlier removal when available
try:
from vitalDSP.preprocess.preprocess_operations import (
PreprocessConfig,
)
from vitalDSP.preprocess.preprocess_operations import (
PreprocessOperations,
)
preprocess_config = PreprocessConfig()
# PHASE B: Use user-specified outlier removal parameters
outlier_method = params.get("method", "iqr")
outlier_threshold = params.get("threshold", 1.5)
preprocess_config.outlier_method = outlier_method
preprocess_config.outlier_threshold = outlier_threshold
preprocess_ops = PreprocessOperations(
processed_signal, preprocess_config
)
processed_signal = preprocess_ops.apply_outlier_removal()
logger.info(
f"Applied vitalDSP outlier removal (method={outlier_method}, threshold={outlier_threshold})"
)
except Exception as e:
logger.warning(
f"vitalDSP outlier removal failed, using basic IQR: {e}"
)
# Fallback to basic IQR method
q1 = np.percentile(processed_signal, 25)
q3 = np.percentile(processed_signal, 75)
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
outlier_mask = (processed_signal >= lower_bound) & (
processed_signal <= upper_bound
)
if (
np.sum(outlier_mask) > len(processed_signal) * 0.5
): # Only if we don't lose too much data
processed_signal = processed_signal[outlier_mask]
elif option == "smoothing":
# Apply moving average smoothing using vitalDSP when available
try:
# Try to use vitalDSP smoothing if available
from vitalDSP.preprocess.preprocess_operations import (
PreprocessConfig,
)
from vitalDSP.preprocess.preprocess_operations import (
PreprocessOperations,
)
preprocess_config = PreprocessConfig()
# PHASE B: Use user-specified smoothing parameters
smoothing_method = params.get("method", "moving_average")
smoothing_window = params.get("window", 5)
# Map method names to vitalDSP types
method_map = {
"moving_average": "moving_average",
"savgol": "savgol",
"gaussian": "gaussian",
}
preprocess_config.smoothing_type = method_map.get(
smoothing_method, "savgol"
)
# Ensure window is odd and within valid range
smoothing_window = (
min(smoothing_window, len(processed_signal) // 10)
if len(processed_signal) >= 10
else 3
)
if smoothing_window % 2 == 0:
smoothing_window += 1
preprocess_config.smoothing_window = smoothing_window
preprocess_ops = PreprocessOperations(
processed_signal, preprocess_config
)
processed_signal = preprocess_ops.apply_smoothing()
logger.info(
f"Applied vitalDSP smoothing (method={smoothing_method}, window={smoothing_window})"
)
except Exception as e:
logger.warning(f"vitalDSP smoothing failed, using scipy: {e}")
# Fallback to scipy smoothing
window_size = min(21, len(processed_signal) // 10)
if window_size % 2 == 0:
window_size += 1
processed_signal = signal.savgol_filter(
processed_signal, window_size, 3
)
elif option == "baseline_correction":
# Apply baseline correction using vitalDSP when available
try:
from vitalDSP.preprocess.preprocess_operations import (
PreprocessConfig,
)
from vitalDSP.preprocess.preprocess_operations import (
PreprocessOperations,
)
preprocess_config = PreprocessConfig()
preprocess_config.baseline_method = "polynomial"
preprocess_config.baseline_order = 3
preprocess_ops = PreprocessOperations(
processed_signal, preprocess_config
)
processed_signal = preprocess_ops.apply_baseline_correction()
logger.info("Applied vitalDSP baseline correction")
except Exception as e:
logger.warning(
f"vitalDSP baseline correction failed, using scipy: {e}"
)
# Fallback to scipy detrending
processed_signal = signal.detrend(processed_signal)
elif option == "noise_reduction":
# Apply noise reduction using vitalDSP when available
try:
from vitalDSP.preprocess.preprocess_operations import (
PreprocessConfig,
)
from vitalDSP.preprocess.preprocess_operations import (
PreprocessOperations,
)
preprocess_config = PreprocessConfig()
preprocess_config.noise_reduction_method = "wavelet"
preprocess_config.noise_threshold = 0.1
preprocess_ops = PreprocessOperations(
processed_signal, preprocess_config
)
processed_signal = preprocess_ops.apply_noise_reduction()
logger.info("Applied vitalDSP noise reduction")
except Exception as e:
logger.warning(
f"vitalDSP noise reduction failed, using basic filtering: {e}"
)
# Fallback to basic low-pass filtering
nyquist = sampling_freq / 2
cutoff = min(nyquist * 0.8, 50)
b, a = signal.butter(4, cutoff / nyquist, btype="low")
processed_signal = signal.filtfilt(b, a, processed_signal)
elif option == "artifact_removal":
# Apply artifact removal using vitalDSP when available
try:
from vitalDSP.preprocess.preprocess_operations import (
PreprocessConfig,
)
from vitalDSP.preprocess.preprocess_operations import (
PreprocessOperations,
)
preprocess_config = PreprocessConfig()
preprocess_config.artifact_method = "statistical"
preprocess_config.artifact_threshold = 3.0 # 3 standard deviations
preprocess_ops = PreprocessOperations(
processed_signal, preprocess_config
)
processed_signal = preprocess_ops.apply_artifact_removal()
logger.info("Applied vitalDSP artifact removal")
except Exception as e:
logger.warning(
f"vitalDSP artifact removal failed, using basic outlier removal: {e}"
)
# Fallback to basic outlier removal
q1 = np.percentile(processed_signal, 25)
q3 = np.percentile(processed_signal, 75)
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
outlier_mask = (processed_signal >= lower_bound) & (
processed_signal <= upper_bound
)
if np.sum(outlier_mask) > len(processed_signal) * 0.5:
processed_signal = processed_signal[outlier_mask]
return processed_signal
except Exception as e:
logger.warning(f"Error in preprocessing: {e}")
return signal_data # Return original if preprocessing fails
[docs]
def extract_comprehensive_features(
signal_data, sampling_freq, categories, advanced_options
):
"""Extract comprehensive features from the signal using vitalDSP modules."""
features = {}
try:
logger.info(f"Extracting features for categories: {categories}")
logger.info(
f"Signal data length: {len(signal_data)}, sampling_freq: {sampling_freq}"
)
# Check if vitalDSP modules are available
vitaldsp_available = _import_vitaldsp_modules()
# Statistical Features
if "statistical" in categories:
features["statistical"] = extract_statistical_features(
signal_data, vitaldsp_available
)
# Spectral Features
if "spectral" in categories:
features["spectral"] = extract_spectral_features(
signal_data, sampling_freq, vitaldsp_available
)
# Temporal Features
if "temporal" in categories:
features["temporal"] = extract_temporal_features(
signal_data, sampling_freq, vitaldsp_available
)
# Morphological Features
if "morphological" in categories:
features["morphological"] = extract_morphological_features(
signal_data, sampling_freq, vitaldsp_available
)
# Entropy Features
if "entropy" in categories:
features["entropy"] = extract_entropy_features(
signal_data, vitaldsp_available
)
# Fractal Features
if "fractal" in categories:
features["fractal"] = extract_fractal_features(
signal_data, vitaldsp_available
)
# Advanced Features
if advanced_options:
features["advanced"] = extract_advanced_features(
signal_data, sampling_freq, advanced_options, vitaldsp_available
)
logger.info(f"Extracted features: {list(features.keys())}")
for key, value in features.items():
if isinstance(value, dict) and "error" in value:
logger.warning(f"Error in {key}: {value['error']}")
return features
except Exception as e:
logger.error(f"Error in feature extraction: {e}")
return {"error": f"Feature extraction failed: {str(e)}"}
[docs]
def extract_statistical_features(signal_data, vitaldsp_available=False):
"""Extract statistical features using vitalDSP when available."""
try:
if vitaldsp_available:
try:
# Use vitalDSP TimeDomainFeatures for enhanced statistical analysis
from vitalDSP.physiological_features.time_domain import (
TimeDomainFeatures,
)
# Create TimeDomainFeatures object
td_features = TimeDomainFeatures(signal_data)
# Extract comprehensive statistical features
basic_stats = {
"mean": np.mean(signal_data),
"std": np.std(signal_data),
"variance": np.var(signal_data),
"rms": np.sqrt(np.mean(signal_data**2)),
"peak_to_peak": np.max(signal_data) - np.min(signal_data),
}
# Add vitalDSP enhanced features
enhanced_stats = {
"skewness": _calculate_skewness(signal_data),
"kurtosis": _calculate_kurtosis(signal_data),
"crest_factor": (
np.max(np.abs(signal_data)) / np.sqrt(np.mean(signal_data**2))
if np.mean(signal_data**2) > 0
else 0
),
"shape_factor": (
np.sqrt(np.mean(signal_data**2)) / np.mean(np.abs(signal_data))
if np.mean(np.abs(signal_data)) > 0
else 0
),
"impulse_factor": (
np.max(np.abs(signal_data)) / np.mean(np.abs(signal_data))
if np.mean(np.abs(signal_data)) > 0
else 0
),
}
# Try to get additional vitalDSP statistical features
try:
# Get signal quality metrics
if hasattr(td_features, "get_signal_quality"):
quality_metrics = td_features.get_signal_quality()
enhanced_stats.update(quality_metrics)
# Get additional statistical measures
if hasattr(td_features, "get_statistical_measures"):
vitaldsp_stats = td_features.get_statistical_measures()
enhanced_stats.update(vitaldsp_stats)
except Exception as e:
logger.warning(
f"Could not get additional vitalDSP statistical features: {e}"
)
# Combine basic and enhanced features
return {**basic_stats, **enhanced_stats}
except Exception as e:
logger.warning(
f"vitalDSP statistical features failed, using basic: {e}"
)
# Fall back to basic implementation
pass
# Basic statistical features (fallback)
return {
"mean": np.mean(signal_data),
"std": np.std(signal_data),
"variance": np.var(signal_data),
"skewness": _calculate_skewness(signal_data),
"kurtosis": _calculate_kurtosis(signal_data),
"rms": np.sqrt(np.mean(signal_data**2)),
"peak_to_peak": np.max(signal_data) - np.min(signal_data),
"crest_factor": (
np.max(np.abs(signal_data)) / np.sqrt(np.mean(signal_data**2))
if np.mean(signal_data**2) > 0
else 0
),
"shape_factor": (
np.sqrt(np.mean(signal_data**2)) / np.mean(np.abs(signal_data))
if np.mean(np.abs(signal_data)) > 0
else 0
),
"impulse_factor": (
np.max(np.abs(signal_data)) / np.mean(np.abs(signal_data))
if np.mean(np.abs(signal_data)) > 0
else 0
),
}
except Exception as e:
logger.error(f"Error in statistical features: {e}")
return {"error": f"Statistical features failed: {str(e)}"}
[docs]
def extract_spectral_features(signal_data, sampling_freq, vitaldsp_available=False):
"""Extract spectral features using vitalDSP when available."""
try:
if vitaldsp_available:
try:
# Use vitalDSP FourierTransform
from vitalDSP.transforms.fourier_transform import FourierTransform
ft = FourierTransform(signal_data)
fft_result = ft.compute_dft()
# Calculate frequencies manually
fft_freq = np.fft.fftfreq(len(signal_data), 1 / sampling_freq)
fft_magnitude = np.abs(fft_result)
# Get positive frequencies
positive_mask = fft_freq > 0
fft_freq = fft_freq[positive_mask]
fft_magnitude = fft_magnitude[positive_mask]
# Try to use vitalDSP STFT for additional features
try:
from vitalDSP.transforms.stft import STFT
stft_obj = STFT(signal_data, sampling_freq)
stft_result = stft_obj.compute_stft()
stft_available = True
stft_shape = (
stft_result.shape if hasattr(stft_result, "shape") else "N/A"
)
except Exception as e:
logger.warning(f"STFT failed: {e}")
stft_available = False
stft_shape = "N/A"
# Calculate advanced spectral features
spectral_centroid = (
np.sum(fft_freq * fft_magnitude) / np.sum(fft_magnitude)
if np.sum(fft_magnitude) > 0
else 0
)
spectral_bandwidth = (
np.sqrt(
np.sum(((fft_freq - spectral_centroid) ** 2) * fft_magnitude)
/ np.sum(fft_magnitude)
)
if np.sum(fft_magnitude) > 0
else 0
)
spectral_rolloff = _calculate_spectral_rolloff(fft_freq, fft_magnitude)
return {
"spectral_centroid": spectral_centroid,
"spectral_bandwidth": spectral_bandwidth,
"spectral_rolloff": spectral_rolloff,
"dominant_frequency": (
fft_freq[np.argmax(fft_magnitude)]
if len(fft_magnitude) > 0
else 0
),
"total_energy": np.sum(fft_magnitude**2),
"frequency_range": (
f"{fft_freq[0]:.2f} - {fft_freq[-1]:.2f} Hz"
if len(fft_freq) > 0
else "N/A"
),
"stft_available": stft_available,
"stft_shape": stft_shape,
}
except Exception as e:
logger.warning(
f"vitalDSP spectral features failed, falling back to scipy: {e}"
)
vitaldsp_available = False
# Fallback to scipy implementation
if not vitaldsp_available:
# Calculate FFT
fft_result = np.fft.fft(signal_data)
fft_freq = np.fft.fftfreq(len(signal_data), 1 / sampling_freq)
# Get positive frequencies
positive_mask = fft_freq > 0
fft_freq = fft_freq[positive_mask]
fft_magnitude = np.abs(fft_result[positive_mask])
# Calculate spectral features
spectral_centroid = (
np.sum(fft_freq * fft_magnitude) / np.sum(fft_magnitude)
if np.sum(fft_magnitude) > 0
else 0
)
spectral_bandwidth = (
np.sqrt(
np.sum(((fft_freq - spectral_centroid) ** 2) * fft_magnitude)
/ np.sum(fft_magnitude)
)
if np.sum(fft_magnitude) > 0
else 0
)
spectral_rolloff = _calculate_spectral_rolloff(fft_freq, fft_magnitude)
return {
"spectral_centroid": spectral_centroid,
"spectral_bandwidth": spectral_bandwidth,
"spectral_rolloff": spectral_rolloff,
"dominant_frequency": (
fft_freq[np.argmax(fft_magnitude)] if len(fft_magnitude) > 0 else 0
),
"total_energy": np.sum(fft_magnitude**2),
"frequency_range": (
f"{fft_freq[0]:.2f} - {fft_freq[-1]:.2f} Hz"
if len(fft_freq) > 0
else "N/A"
),
"stft_available": False,
}
except Exception as e:
logger.error(f"Error in spectral features: {e}")
return {"error": f"Spectral features failed: {str(e)}"}
[docs]
def extract_temporal_features(signal_data, sampling_freq, vitaldsp_available=False):
"""Extract temporal features using vitalDSP when available."""
try:
if vitaldsp_available:
try:
# Use vitalDSP PeakDetection for robust peak detection
from vitalDSP.utils.signal_processing.peak_detection import (
PeakDetection,
)
logger.info("[vitalDSP] Using PeakDetection for temporal features")
# Use threshold-based peak detection with appropriate parameters
peak_detector = PeakDetection(
signal_data,
method="threshold",
height=np.mean(signal_data) + np.std(signal_data),
distance=int(sampling_freq * 0.3),
)
peaks = peak_detector.detect_peaks()
temporal_features = {
"signal_duration": len(signal_data) / sampling_freq,
"sampling_frequency": sampling_freq,
"num_samples": len(signal_data),
"vitaldsp_used": True,
}
if len(peaks) > 1:
# Convert to NN intervals for HRV analysis
intervals = (
np.diff(peaks) / sampling_freq * 1000
) # Convert to milliseconds
temporal_features.update(
{
"num_peaks": len(peaks),
"mean_interval": np.mean(intervals)
/ 1000, # Convert back to seconds
"std_interval": np.std(intervals) / 1000,
"min_interval": np.min(intervals) / 1000,
"max_interval": np.max(intervals) / 1000,
}
)
return temporal_features
except Exception as e:
logger.warning(
f"vitalDSP temporal features failed, falling back to scipy: {e}"
)
vitaldsp_available = False
# Fallback to scipy implementation
if not vitaldsp_available:
logger.info("[scipy fallback] Using scipy.signal for temporal features")
# Find peaks
peaks, _ = signal.find_peaks(
signal_data,
height=np.mean(signal_data) + np.std(signal_data),
distance=int(sampling_freq * 0.3),
)
temporal_features = {
"signal_duration": len(signal_data) / sampling_freq,
"sampling_frequency": sampling_freq,
"num_samples": len(signal_data),
"vitaldsp_used": False,
}
if len(peaks) > 1:
intervals = np.diff(peaks) / sampling_freq
temporal_features.update(
{
"num_peaks": len(peaks),
"mean_interval": np.mean(intervals),
"std_interval": np.std(intervals),
"min_interval": np.min(intervals),
"max_interval": np.max(intervals),
}
)
return temporal_features
except Exception as e:
logger.error(f"Error in temporal features: {e}")
return {"error": f"Temporal features failed: {str(e)}"}
[docs]
def extract_morphological_features(
signal_data, sampling_freq, vitaldsp_available=False
):
"""Extract morphological features using vitalDSP when available."""
try:
if vitaldsp_available:
try:
# Use vitalDSP PeakDetection for robust peak and valley detection
from vitalDSP.utils.signal_processing.peak_detection import (
PeakDetection,
)
logger.info("[vitalDSP] Using PeakDetection for morphological features")
# Find peaks using vitalDSP
peak_detector = PeakDetection(
signal_data,
method="threshold",
height=np.mean(signal_data) + np.std(signal_data),
distance=int(sampling_freq * 0.3),
)
peaks = peak_detector.detect_peaks()
# Find valleys by inverting the signal
valley_detector = PeakDetection(
-signal_data,
method="threshold",
height=np.mean(-signal_data) + np.std(-signal_data),
distance=int(sampling_freq * 0.3),
)
valleys = valley_detector.detect_peaks()
morphological_features = {
"num_peaks": len(peaks),
"num_valleys": len(valleys),
"peak_heights": (
signal_data[peaks].tolist() if len(peaks) > 0 else []
),
"valley_heights": (
signal_data[valleys].tolist() if len(valleys) > 0 else []
),
"vitaldsp_used": True,
}
if len(peaks) > 0:
morphological_features.update(
{
"mean_peak_height": np.mean(signal_data[peaks]),
"std_peak_height": np.std(signal_data[peaks]),
"max_peak_height": np.max(signal_data[peaks]),
"min_peak_height": np.min(signal_data[peaks]),
}
)
if len(valleys) > 0:
morphological_features.update(
{
"mean_valley_height": np.mean(signal_data[valleys]),
"std_valley_height": np.std(signal_data[valleys]),
"max_valley_height": np.max(signal_data[valleys]),
"min_valley_height": np.min(signal_data[valleys]),
}
)
return morphological_features
except Exception as e:
logger.warning(
f"vitalDSP morphological features failed, falling back to scipy: {e}"
)
vitaldsp_available = False
# Fallback to scipy implementation
if not vitaldsp_available:
logger.info(
"[scipy fallback] Using scipy.signal for morphological features"
)
# Find peaks and valleys
peaks, peak_properties = signal.find_peaks(
signal_data,
height=np.mean(signal_data) + np.std(signal_data),
distance=int(sampling_freq * 0.3),
)
valleys, valley_properties = signal.find_peaks(
-signal_data,
height=np.mean(-signal_data) + np.std(-signal_data),
distance=int(sampling_freq * 0.3),
)
morphological_features = {
"num_peaks": len(peaks),
"num_valleys": len(valleys),
"peak_heights": signal_data[peaks].tolist() if len(peaks) > 0 else [],
"valley_heights": (
signal_data[valleys].tolist() if len(valleys) > 0 else []
),
"vitaldsp_used": False,
}
if len(peaks) > 0:
morphological_features.update(
{
"mean_peak_height": np.mean(signal_data[peaks]),
"std_peak_height": np.std(signal_data[peaks]),
"max_peak_height": np.max(signal_data[peaks]),
"min_peak_height": np.min(signal_data[peaks]),
}
)
return morphological_features
except Exception as e:
logger.error(f"Error in morphological features: {e}")
return {"error": f"Morphological features failed: {str(e)}"}
[docs]
def extract_entropy_features(signal_data, vitaldsp_available=False):
"""Extract entropy-based features using vitalDSP when available."""
try:
if vitaldsp_available:
try:
# Use vitalDSP NonlinearFeatures
from vitalDSP.physiological_features.nonlinear import NonlinearFeatures
nf = NonlinearFeatures(signal_data)
# Extract comprehensive entropy features
sample_entropy = nf.compute_sample_entropy()
approximate_entropy = nf.compute_approximate_entropy()
# Try to get additional features
try:
fractal_dim = nf.compute_fractal_dimension()
except Exception:
fractal_dim = None
try:
lyapunov = nf.compute_lyapunov_exponent()
except Exception:
lyapunov = None
try:
dfa_result = nf.compute_dfa()
except Exception:
dfa_result = None
try:
poincare_features = nf.compute_poincare_features()
except Exception:
poincare_features = None
return {
"sample_entropy": sample_entropy,
"approximate_entropy": approximate_entropy,
"fractal_dimension": fractal_dim,
"lyapunov_exponent": lyapunov,
"dfa_result": dfa_result,
"poincare_features": poincare_features,
"vitaldsp_used": True,
}
except Exception as e:
logger.warning(
f"vitalDSP entropy features failed, falling back to scipy: {e}"
)
vitaldsp_available = False
# Fallback to simplified entropy calculation
if not vitaldsp_available:
return {
"sample_entropy": np.log(np.std(signal_data) + 1e-10),
"approximate_entropy": np.log(np.std(signal_data) + 1e-10),
"permutation_entropy": np.log(np.std(signal_data) + 1e-10),
"vitaldsp_used": False,
}
except Exception as e:
logger.error(f"Error in entropy features: {e}")
return {"error": f"Entropy features failed: {str(e)}"}
[docs]
def extract_fractal_features(signal_data, vitaldsp_available=False):
"""Extract fractal features using vitalDSP when available."""
try:
if vitaldsp_available:
try:
# Use vitalDSP NonlinearFeatures for fractal analysis
from vitalDSP.physiological_features.nonlinear import NonlinearFeatures
nf = NonlinearFeatures(signal_data)
# Get fractal dimension
try:
fractal_dim = nf.compute_fractal_dimension()
except Exception:
fractal_dim = None
# Get DFA result
try:
dfa_result = nf.compute_dfa()
except Exception:
dfa_result = None
return {
"higuchi_fractal_dimension": fractal_dim,
"box_counting_dimension": fractal_dim, # Use same value for now
"dfa_result": dfa_result,
"vitaldsp_used": True,
}
except Exception as e:
logger.warning(
f"vitalDSP fractal features failed, falling back to scipy: {e}"
)
vitaldsp_available = False
# Fallback to simplified fractal dimension calculation
if not vitaldsp_available:
return {
"higuchi_fractal_dimension": 1.0,
"box_counting_dimension": 1.0,
"vitaldsp_used": False,
}
except Exception as e:
logger.error(f"Error in fractal features: {e}")
return {"error": f"Fractal features failed: {str(e)}"}
[docs]
def extract_advanced_features(
signal_data, sampling_freq, advanced_options, vitaldsp_available=False
):
"""Extract advanced features based on selected options using vitalDSP when available."""
advanced_features = {}
try:
for option in advanced_options:
if option == "cross_correlation":
if vitaldsp_available:
try:
# Use vitalDSP CrossCorrelationFeatures
from vitalDSP.physiological_features.cross_correlation import (
CrossCorrelationFeatures,
)
cc = CrossCorrelationFeatures(
signal_data, signal_data, sampling_freq
)
cross_corr, lag = cc.compute_cross_correlation()
advanced_features["cross_correlation"] = {
"cross_correlation": (
cross_corr.tolist()
if hasattr(cross_corr, "tolist")
else cross_corr
),
"max_lag": lag,
"correlation_strength": (
np.max(np.abs(cross_corr))
if hasattr(cross_corr, "__len__")
else 0
),
"vitaldsp_used": True,
}
except Exception as e:
logger.warning(
f"vitalDSP cross-correlation failed, falling back to scipy: {e}"
)
# Fallback to scipy
autocorr = np.correlate(signal_data, signal_data, mode="full")
autocorr = autocorr[len(autocorr) // 2 :]
advanced_features["cross_correlation"] = {
"max_lag": (
np.argmax(autocorr[1:]) + 1 if len(autocorr) > 1 else 0
),
"correlation_strength": (
np.max(autocorr[1:]) / autocorr[0]
if autocorr[0] > 0
else 0
),
"vitaldsp_used": False,
}
else:
# Use scipy
autocorr = np.correlate(signal_data, signal_data, mode="full")
autocorr = autocorr[len(autocorr) // 2 :]
advanced_features["cross_correlation"] = {
"max_lag": (
np.argmax(autocorr[1:]) + 1 if len(autocorr) > 1 else 0
),
"correlation_strength": (
np.max(autocorr[1:]) / autocorr[0]
if autocorr[0] > 0
else 0
),
"vitaldsp_used": False,
}
elif option == "phase_analysis":
if vitaldsp_available:
try:
# Use vitalDSP HilbertTransform
from vitalDSP.transforms.hilbert_transform import (
HilbertTransform,
)
ht = HilbertTransform(signal_data)
analytic_signal = ht.compute_hilbert()
phase = ht.instantaneous_phase()
advanced_features["phase_analysis"] = {
"phase_range": np.max(phase) - np.min(phase),
"phase_std": np.std(phase),
"analytic_signal": (
analytic_signal.tolist()
if hasattr(analytic_signal, "tolist")
else analytic_signal
),
"vitaldsp_used": True,
}
except Exception as e:
logger.warning(
f"vitalDSP phase analysis failed, falling back to scipy: {e}"
)
# Fallback to scipy
analytic_signal = signal.hilbert(signal_data)
phase = np.unwrap(np.angle(analytic_signal))
advanced_features["phase_analysis"] = {
"phase_range": np.max(phase) - np.min(phase),
"phase_std": np.std(phase),
"vitaldsp_used": False,
}
else:
# Use scipy
analytic_signal = signal.hilbert(signal_data)
phase = np.unwrap(np.angle(analytic_signal))
advanced_features["phase_analysis"] = {
"phase_range": np.max(phase) - np.min(phase),
"phase_std": np.std(phase),
"vitaldsp_used": False,
}
elif option == "wavelet":
if vitaldsp_available:
try:
# Use vitalDSP WaveletTransform
from vitalDSP.transforms.wavelet_transform import (
WaveletTransform,
)
wt = WaveletTransform(signal_data, wavelet_name="haar")
wavelet_coeffs = wt.perform_wavelet_transform()
advanced_features["wavelet"] = {
"wavelet_coefficients": wavelet_coeffs,
"wavelet_name": "haar",
"vitaldsp_used": True,
}
except Exception as e:
logger.warning(f"vitalDSP wavelet transform failed: {e}")
advanced_features["wavelet"] = {
"error": f"Wavelet transform failed: {str(e)}"
}
elif option == "ml_features":
if vitaldsp_available:
try:
# Use vitalDSP advanced computation features
from vitalDSP.advanced_computation.anomaly_detection import (
AnomalyDetection,
)
ad = AnomalyDetection(signal_data)
anomalies = ad.detect_anomalies()
advanced_features["ml_features"] = {
"anomalies_detected": (
len(anomalies) if hasattr(anomalies, "__len__") else 0
),
"anomaly_indices": (
anomalies.tolist()
if hasattr(anomalies, "tolist")
else anomalies
),
"vitaldsp_used": True,
}
except Exception as e:
logger.warning(f"vitalDSP ML features failed: {e}")
advanced_features["ml_features"] = {
"error": f"ML features failed: {str(e)}"
}
return advanced_features
except Exception as e:
logger.error(f"Error in advanced features: {e}")
return {"error": f"Advanced features failed: {str(e)}"}
[docs]
def create_comprehensive_features_display(features, signal_type, categories):
"""Create comprehensive features display with enhanced insights."""
try:
sections = []
# Header with signal type and vitalDSP usage info
vitaldsp_used = any(
features.get(cat, {}).get("vitaldsp_used", False)
for cat in features.keys()
if isinstance(features.get(cat), dict)
)
header_section = html.Div(
[
html.H4(
"🚀 Advanced Feature Analysis Results",
className="text-primary mb-3",
),
html.Div(
[
html.Span(
f"Signal Type: {signal_type.upper()}",
className="badge bg-primary me-2",
),
html.Span(
f"vitalDSP: {'✅ Used' if vitaldsp_used else '⚠️ Fallback'}",
className=f"badge {'bg-success' if vitaldsp_used else 'bg-warning'} me-2",
),
html.Span(
f"Categories: {len(categories)}", className="badge bg-info"
),
],
className="mb-3",
),
]
)
sections.append(header_section)
# Statistical Features
if "statistical" in categories and "error" not in features.get(
"statistical", {}
):
stats = features["statistical"]
stats_section = html.Div(
[
html.H5("📊 Statistical Features", className="text-success"),
html.Div(
[
html.Div(
[
html.H6(
"Basic Statistics", className="text-primary"
),
html.P(
f"Mean: {stats.get('mean', 0):.4f}",
className="mb-1",
),
html.P(
f"Standard Deviation: {stats.get('std', 0):.4f}",
className="mb-1",
),
html.P(
f"Variance: {stats.get('variance', 0):.4f}",
className="mb-1",
),
html.P(
f"RMS: {stats.get('rms', 0):.4f}",
className="mb-1",
),
],
className="col-md-6",
),
html.Div(
[
html.H6(
"Shape Statistics", className="text-primary"
),
html.P(
f"Skewness: {stats.get('skewness', 0):.4f}",
className="mb-1",
),
html.P(
f"Kurtosis: {stats.get('kurtosis', 0):.4f}",
className="mb-1",
),
html.P(
f"Peak-to-Peak: {stats.get('peak_to_peak', 0):.4f}",
className="mb-1",
),
html.P(
f"Crest Factor: {stats.get('crest_factor', 0):.4f}",
className="mb-1",
),
],
className="col-md-6",
),
],
className="row",
),
],
className="mb-4",
)
sections.append(stats_section)
# Spectral Features
if "spectral" in categories and "error" not in features.get("spectral", {}):
spec = features["spectral"]
spec_section = html.Div(
[
html.H5("🌊 Spectral Features", className="text-info"),
html.Div(
[
html.Div(
[
html.H6(
"Frequency Analysis", className="text-primary"
),
html.P(
f"Spectral Centroid: {spec.get('spectral_centroid', 0):.2f} Hz",
className="mb-1",
),
html.P(
f"Spectral Bandwidth: {spec.get('spectral_bandwidth', 0):.2f} Hz",
className="mb-1",
),
html.P(
f"Dominant Frequency: {spec.get('dominant_frequency', 0):.2f} Hz",
className="mb-1",
),
],
className="col-md-6",
),
html.Div(
[
html.H6(
"Energy Analysis", className="text-primary"
),
html.P(
f"Total Energy: {spec.get('total_energy', 0):.2f}",
className="mb-1",
),
html.P(
f"Frequency Range: {spec.get('frequency_range', 'N/A')}",
className="mb-1",
),
html.P(
f"STFT Available: {'✅' if spec.get('stft_available', False) else '❌'}",
className="mb-1",
),
],
className="col-md-6",
),
],
className="row",
),
],
className="mb-4",
)
sections.append(spec_section)
# Temporal Features
if "temporal" in categories and "error" not in features.get("temporal", {}):
temp = features["temporal"]
temp_section = html.Div(
[
html.H5("⏱️ Temporal Features", className="text-warning"),
html.Div(
[
html.Div(
[
html.H6(
"Signal Properties", className="text-primary"
),
html.P(
f"Signal Duration: {temp.get('signal_duration', 0):.2f} seconds",
className="mb-1",
),
html.P(
f"Sampling Frequency: {temp.get('sampling_frequency', 0)} Hz",
className="mb-1",
),
html.P(
f"Number of Samples: {temp.get('num_samples', 0)}",
className="mb-1",
),
],
className="col-md-6",
),
html.Div(
[
(
html.H6(
"Peak Analysis", className="text-primary"
)
if "num_peaks" in temp
else html.Div()
),
(
html.P(
f"Number of Peaks: {temp.get('num_peaks', 0)}",
className="mb-1",
)
if "num_peaks" in temp
else html.Div()
),
(
html.P(
f"Mean Interval: {temp.get('mean_interval', 0):.3f} seconds",
className="mb-1",
)
if "mean_interval" in temp
else html.Div()
),
(
html.P(
f"Interval Std: {temp.get('std_interval', 0):.3f} seconds",
className="mb-1",
)
if "std_interval" in temp
else html.Div()
),
],
className="col-md-6",
),
],
className="row",
),
],
className="mb-4",
)
sections.append(temp_section)
# Morphological Features
if "morphological" in categories and "error" not in features.get(
"morphological", {}
):
morph = features["morphological"]
morph_section = html.Div(
[
html.H5("🔍 Morphological Features", className="text-danger"),
html.Div(
[
html.Div(
[
html.H6("Peak Analysis", className="text-primary"),
html.P(
f"Number of Peaks: {morph.get('num_peaks', 0)}",
className="mb-1",
),
html.P(
f"Number of Valleys: {morph.get('num_valleys', 0)}",
className="mb-1",
),
],
className="col-md-6",
),
html.Div(
[
(
html.H6(
"Peak Statistics", className="text-primary"
)
if "mean_peak_height" in morph
else html.Div()
),
(
html.P(
f"Mean Peak Height: {morph.get('mean_peak_height', 0):.4f}",
className="mb-1",
)
if "mean_peak_height" in morph
else html.Div()
),
(
html.P(
f"Peak Height Std: {morph.get('std_peak_height', 0):.4f}",
className="mb-1",
)
if "std_peak_height" in morph
else html.Div()
),
(
html.P(
f"Max Peak Height: {morph.get('max_peak_height', 0):.4f}",
className="mb-1",
)
if "max_peak_height" in morph
else html.Div()
),
],
className="col-md-6",
),
],
className="row",
),
],
className="mb-4",
)
sections.append(morph_section)
# Entropy Features
if "entropy" in categories and "error" not in features.get("entropy", {}):
ent = features["entropy"]
ent_section = html.Div(
[
html.H5(
"🧠 Entropy & Complexity Features", className="text-secondary"
),
html.Div(
[
html.Div(
[
html.H6(
"Entropy Measures", className="text-primary"
),
html.P(
f"Sample Entropy: {ent.get('sample_entropy', 0):.4f}",
className="mb-1",
),
html.P(
f"Approximate Entropy: {ent.get('approximate_entropy', 0):.4f}",
className="mb-1",
),
html.P(
f"Fractal Dimension: {ent.get('fractal_dimension', 'N/A')}",
className="mb-1",
),
],
className="col-md-6",
),
html.Div(
[
html.H6(
"Advanced Measures", className="text-primary"
),
html.P(
f"Lyapunov Exponent: {ent.get('lyapunov_exponent', 'N/A')}",
className="mb-1",
),
html.P(
f"DFA Result: {ent.get('dfa_result', 'N/A')}",
className="mb-1",
),
html.P(
f"Poincaré Features: {'Available' if ent.get('poincare_features') else 'N/A'}",
className="mb-1",
),
],
className="col-md-6",
),
],
className="row",
),
html.Div(
[
html.Small(
f"vitalDSP: {'✅ Used' if ent.get('vitaldsp_used', False) else '⚠️ Fallback'}",
className="text-muted",
)
],
className="mt-2",
),
],
className="mb-4",
)
sections.append(ent_section)
# Advanced Features
if "advanced" in features and "error" not in features.get("advanced", {}):
adv = features["advanced"]
adv_section = html.Div(
[
html.H5("🚀 Advanced Features", className="text-dark"),
html.Div(
[
(
html.Div(
[
(
html.H6(
"Cross-Correlation",
className="text-primary",
)
if "cross_correlation" in adv
else html.Div()
),
(
html.P(
f"Max Lag: {adv.get('cross_correlation', {}).get('max_lag', 'N/A')}",
className="mb-1",
)
if "cross_correlation" in adv
else html.Div()
),
(
html.P(
f"Correlation Strength: {adv.get('cross_correlation', {}).get('correlation_strength', 'N/A'):.4f}",
className="mb-1",
)
if "cross_correlation" in adv
else html.Div()
),
(
html.P(
f"vitalDSP: {'✅' if adv.get('cross_correlation', {}).get('vitaldsp_used', False) else '⚠️'}",
className="mb-1",
)
if "cross_correlation" in adv
else html.Div()
),
],
className="col-md-6",
)
if "cross_correlation" in adv
else html.Div()
),
(
html.Div(
[
(
html.H6(
"Phase Analysis",
className="text-primary",
)
if "phase_analysis" in adv
else html.Div()
),
(
html.P(
f"Phase Range: {adv.get('phase_analysis', {}).get('phase_range', 'N/A'):.4f}",
className="mb-1",
)
if "phase_analysis" in adv
else html.Div()
),
(
html.P(
f"Phase Std: {adv.get('phase_analysis', {}).get('phase_std', 'N/A'):.4f}",
className="mb-1",
)
if "phase_analysis" in adv
else html.Div()
),
(
html.P(
f"vitalDSP: {'✅' if adv.get('phase_analysis', {}).get('vitaldsp_used', False) else '⚠️'}",
className="mb-1",
)
if "phase_analysis" in adv
else html.Div()
),
],
className="col-md-6",
)
if "phase_analysis" in adv
else html.Div()
),
],
className="row",
),
],
className="mb-4",
)
sections.append(adv_section)
# Signal Quality Features
if "signal_quality" in features and "error" not in features.get(
"signal_quality", {}
):
sq = features["signal_quality"]
sq_section = html.Div(
[
html.H5("🔍 Signal Quality Assessment", className="text-success"),
html.Div(
[
html.Div(
[
html.H6(
"Quality Metrics", className="text-primary"
),
html.P(
f"Signal-to-Noise Ratio: {sq.get('snr', 0):.2f} dB",
className="mb-1",
),
html.P(
f"Baseline Wander: {sq.get('baseline_wander', 0):.4f}",
className="mb-1",
),
html.P(
f"Motion Artifacts: {sq.get('motion_artifacts', 0)}",
className="mb-1",
),
],
className="col-md-6",
),
html.Div(
[
html.H6(
"Quality Assessment", className="text-primary"
),
html.P(
f"vitalDSP: {'✅ Used' if sq.get('vitaldsp_used', False) else '⚠️ Fallback'}",
className="mb-1",
),
html.P(
f"Quality Score: {'Good' if sq.get('snr', 0) > 10 else 'Fair' if sq.get('snr', 0) > 5 else 'Poor'}",
className="mb-1",
),
],
className="col-md-6",
),
],
className="row",
),
],
className="mb-4",
)
sections.append(sq_section)
# Physiological Features
if "physiological" in features and "error" not in features.get(
"physiological", {}
):
phys = features["physiological"]
if phys.get("vitaldsp_used", False) and "waveform_features" in phys:
phys_section = html.Div(
[
html.H5("💓 Physiological Features", className="text-danger"),
html.Div(
[
html.H6(
"Waveform Morphology", className="text-primary"
),
html.P("vitalDSP: ✅ Used", className="mb-1"),
html.P(
f"Features Available: {len(phys.get('waveform_features', {}))}",
className="mb-1",
),
html.Small(
"Advanced physiological waveform analysis completed",
className="text-muted",
),
]
),
],
className="mb-4",
)
sections.append(phys_section)
if not sections:
return html.Div(
[
html.H5("No Features Extracted"),
html.P("Please select feature categories and run the analysis."),
]
)
# Add summary section
summary_section = html.Div(
[
html.Hr(),
html.H6("📋 Analysis Summary", className="text-primary"),
html.P(
f"Successfully extracted features from {len(categories)} categories"
),
html.P(f"Signal type: {signal_type}"),
html.P(
f"vitalDSP utilization: {'High' if vitaldsp_used else 'Limited'}"
),
html.Small(
"Features extracted using advanced signal processing algorithms",
className="text-muted",
),
],
className="mt-4",
)
sections.append(summary_section)
return html.Div(sections)
except Exception as e:
logger.error(f"Error creating features display: {e}")
return html.Div(
[html.H5("Error"), html.P(f"Failed to create features display: {str(e)}")]
)
[docs]
def create_features_analysis_plots(signal_data, features, categories, sampling_freq):
"""Create comprehensive features analysis plots with enhanced visualizations."""
try:
logger.info(
f"Plotting function called with signal_data length: {len(signal_data)}, categories: {categories}"
)
logger.info(f"Features available: {list(features.keys())}")
if len(categories) == 0:
logger.warning("No categories selected for plotting")
return create_empty_figure()
if signal_data is None or len(signal_data) == 0:
logger.error("Signal data is None or empty")
return create_empty_figure()
# Create subplots based on selected categories
num_plots = len(categories)
if num_plots == 0:
num_plots = 1
# Add extra plots for advanced features if available
advanced_plots = 0
if "advanced" in features and "error" not in features.get("advanced", {}):
advanced_options = features["advanced"].keys()
if "cross_correlation" in advanced_options:
advanced_plots += 1
if "phase_analysis" in advanced_options:
advanced_plots += 1
if "wavelet" in advanced_options:
advanced_plots += 1
# Add signal quality plot
signal_quality_plot = (
1
if "signal_quality" in features
and "error" not in features.get("signal_quality", {})
else 0
)
total_plots = num_plots + advanced_plots + signal_quality_plot
logger.info(f"Creating subplots: rows={total_plots}, cols=2")
# Create subplot titles
subplot_titles = (
[f"{cat.title()} Features" for cat in categories]
+ ["Cross-Correlation", "Phase Analysis", "Wavelet Analysis"][
:advanced_plots
]
+ ["Signal Quality"] * signal_quality_plot
+ [f"{cat.title()} Stats" for cat in categories]
+ ["Advanced Stats"] * advanced_plots
+ ["Quality Metrics"] * signal_quality_plot
)
logger.info(f"Subplot titles: {subplot_titles}")
# For now, let's use a simple plot to debug the issue
logger.info("Creating simple plot instead of complex subplots for debugging")
fig = go.Figure()
# Add the main signal
fig.add_trace(
go.Scatter(
x=list(range(len(signal_data))),
y=signal_data,
mode="lines",
name="Signal Data",
line=dict(color="blue", width=1),
)
)
# Add some basic statistics as horizontal lines
mean_val = np.mean(signal_data)
std_val = np.std(signal_data)
fig.add_hline(
y=mean_val,
line_dash="dash",
line_color="red",
annotation_text=f"Mean: {mean_val:.4f}",
)
fig.add_hline(
y=mean_val + std_val,
line_dash="dot",
line_color="orange",
annotation_text=f"+1σ: {mean_val + std_val:.4f}",
)
fig.add_hline(
y=mean_val - std_val,
line_dash="dot",
line_color="orange",
annotation_text=f"-1σ: {mean_val - std_val:.4f}",
)
# Add a second subplot for features if available
if "statistical" in features and "error" not in features.get("statistical", {}):
stats = features["statistical"]
stat_names = ["Mean", "Std", "Variance", "Skewness", "Kurtosis"]
stat_values = [
stats.get("mean", 0),
stats.get("std", 0),
stats.get("variance", 0),
stats.get("skewness", 0),
stats.get("kurtosis", 0),
]
fig.add_trace(
go.Bar(
x=stat_names,
y=stat_values,
name="Statistics",
marker_color=[
"#1f77b4",
"#ff7f0e",
"#2ca02c",
"#d62728",
"#9467bd",
],
yaxis="y2",
)
)
# Update layout to include secondary y-axis
fig.update_layout(
yaxis2=dict(title="Feature Values", overlaying="y", side="right")
)
fig.update_layout(
title="Signal Analysis - Basic Plot",
xaxis_title="Sample Index",
yaxis_title="Amplitude",
height=600,
showlegend=True,
template="plotly_white",
)
logger.info("Simple plot created successfully")
return fig
except Exception as e:
logger.error(f"Error creating analysis plots: {e}")
logger.error(f"Traceback: {traceback.format_exc()}")
# Create a simple fallback plot for debugging
try:
fallback_fig = go.Figure()
fallback_fig.add_trace(
go.Scatter(
x=list(range(len(signal_data))),
y=signal_data,
mode="lines",
name="Signal Data",
)
)
fallback_fig.update_layout(
title="Fallback Plot - Signal Data",
xaxis_title="Sample Index",
yaxis_title="Amplitude",
)
logger.info("Created fallback plot")
return fallback_fig
except Exception as fallback_error:
logger.error(f"Even fallback plot failed: {fallback_error}")
return create_empty_figure()
# Utility functions for feature calculations
def _calculate_skewness(data):
"""Calculate skewness of the data."""
mean = np.mean(data)
std = np.std(data)
if std == 0:
return 0
return np.mean(((data - mean) / std) ** 3)
def _calculate_kurtosis(data):
"""Calculate kurtosis of the data."""
mean = np.mean(data)
std = np.std(data)
if std == 0:
return 0
return np.mean(((data - mean) / std) ** 4) - 3
def _calculate_spectral_rolloff(freq, magnitude, threshold=0.85):
"""Calculate spectral rolloff frequency."""
total_energy = np.sum(magnitude)
if total_energy == 0:
return freq[-1] if len(freq) > 0 else 0
cumulative_energy = np.cumsum(magnitude)
rolloff_idx = np.where(cumulative_energy >= threshold * total_energy)[0]
if len(rolloff_idx) > 0:
return freq[rolloff_idx[0]]
return freq[-1] if len(freq) > 0 else 0
def _extract_signal_quality_metrics(signal_data, sampling_freq):
"""Extract signal quality metrics using vitalDSP if available."""
try:
# Try to use vitalDSP signal quality assessment
from vitalDSP.signal_quality_assessment.signal_quality import SignalQuality
# For SNR calculation, we need both original and processed signals
# Since we only have one signal, we'll use a simple approach
# Get various quality metrics using available methods
# For SNR, we'll estimate it using the signal itself
signal_power = np.mean(signal_data**2)
noise_estimate = np.mean(np.diff(signal_data) ** 2)
snr = 10 * np.log10(signal_power / noise_estimate) if noise_estimate > 0 else 0
# Estimate baseline wander and motion artifacts
baseline_wander = np.std(signal_data - np.mean(signal_data))
motion_artifacts = np.sum(
np.abs(np.diff(signal_data)) > 2 * np.std(signal_data)
)
return {
"snr": snr,
"baseline_wander": baseline_wander,
"motion_artifacts": motion_artifacts,
"vitaldsp_used": True,
}
except Exception as e:
logger.warning(f"vitalDSP signal quality assessment failed: {e}")
# Fallback to basic quality metrics
return {
"snr": (
(np.std(signal_data) / np.mean(np.abs(np.diff(signal_data))))
if np.mean(np.abs(np.diff(signal_data))) > 0
else 0
),
"baseline_wander": np.std(signal_data - np.mean(signal_data)),
"motion_artifacts": np.sum(
np.abs(np.diff(signal_data)) > 2 * np.std(signal_data)
),
"vitaldsp_used": False,
}
def _extract_physiological_features(signal_data, sampling_freq, signal_type):
"""Extract physiological-specific features using vitalDSP."""
try:
if signal_type.lower() in ["ppg", "ecg"]:
# Use vitalDSP physiological features
from vitalDSP.physiological_features.waveform import WaveformMorphology
wm = WaveformMorphology(
signal_data, sampling_freq, signal_type=signal_type.upper()
)
# Extract waveform features using available methods
try:
# First detect peaks using scipy since vitalDSP expects peaks to be passed in
peaks, _ = signal.find_peaks(
signal_data,
height=np.mean(signal_data) + np.std(signal_data),
distance=int(sampling_freq * 0.3),
)
# Get basic morphological features
if signal_type.lower() == "ecg":
# For ECG, get QRS features using detected peaks
if len(peaks) > 0:
q_valleys = (
wm.detect_q_valley(peaks)
if hasattr(wm, "detect_q_valley")
else []
)
s_valleys = (
wm.detect_s_valley(peaks)
if hasattr(wm, "detect_s_valley")
else []
)
features = {
"r_peaks": len(peaks),
"q_valleys": len(q_valleys),
"s_valleys": len(s_valleys),
"heart_rate": len(peaks)
* 60
/ (len(signal_data) / sampling_freq),
}
else:
features = {
"r_peaks": 0,
"q_valleys": 0,
"s_valleys": 0,
"heart_rate": 0,
}
else:
# For PPG, get pulse features
if len(peaks) > 0:
troughs = (
wm.detect_troughs(peaks)
if hasattr(wm, "detect_troughs")
else []
)
features = {
"systolic_peaks": len(peaks),
"troughs": len(troughs),
"pulse_rate": len(peaks)
* 60
/ (len(signal_data) / sampling_freq),
}
else:
features = {"systolic_peaks": 0, "troughs": 0, "pulse_rate": 0}
return {"waveform_features": features, "vitaldsp_used": True}
except Exception as e:
logger.warning(f"Error extracting waveform features: {e}")
return {"waveform_features": {"error": str(e)}, "vitaldsp_used": True}
else:
return {"vitaldsp_used": False}
except Exception as e:
logger.warning(f"vitalDSP physiological features failed: {e}")
return {"vitaldsp_used": False}