"""
Analysis callbacks for vitalDSP core functionality.
This module handles core vitalDSP analysis callbacks including
frequency domain, and signal processing operations.
"""
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from dash import Input, Output, State, callback_context, no_update, html, dcc
from dash.exceptions import PreventUpdate
import dash_bootstrap_components as dbc
from scipy import signal
import logging
logger = logging.getLogger(__name__)
# Import plot utilities for performance optimization
try:
from vitalDSP_webapp.utils.plot_utils import limit_plot_data, check_plot_data_size
except ImportError:
# Fallback if plot_utils not available
def limit_plot_data(
time_axis, signal_data, max_duration=300, max_points=10000, start_time=None
):
"""Fallback implementation of limit_plot_data"""
return time_axis, signal_data
def check_plot_data_size(time_axis, signal_data):
"""Fallback implementation"""
return True
# Import create_empty_figure for error handling
try:
from vitalDSP_webapp.callbacks.analysis.time_domain_callbacks import (
create_empty_figure,
)
except ImportError:
# Fallback if import fails
def create_empty_figure():
"""Fallback implementation of create_empty_figure"""
fig = go.Figure()
fig.add_annotation(
text="No data available",
xref="paper",
yref="paper",
x=0.5,
y=0.5,
showarrow=False,
)
return fig
# Import filtering functions from signal filtering callbacks
from vitalDSP_webapp.callbacks.analysis.signal_filtering_callbacks import (
apply_traditional_filter,
)
from vitalDSP_webapp.callbacks.core.theme_callbacks import apply_plot_theme
# Helper function for formatting large numbers
[docs]
def apply_filter(
signal_data,
sampling_freq,
filter_family,
filter_response,
low_freq,
high_freq,
filter_order,
):
"""
Apply filter to the signal using vitalDSP SignalFiltering class.
This function now uses the validated vitalDSP SignalFiltering implementation
for improved error handling, parameter validation, and consistency.
Args:
signal_data: Signal array
sampling_freq: Sampling frequency in Hz
filter_family: Filter type (butter, cheby1, cheby2, ellip, bessel)
filter_response: Filter response (bandpass, lowpass, highpass, bandstop)
low_freq: Low cutoff frequency in Hz
high_freq: High cutoff frequency in Hz
filter_order: Filter order
Returns:
np.ndarray: Filtered signal
"""
try:
from vitalDSP.filtering.signal_filtering import SignalFiltering
# Create SignalFiltering instance
sf = SignalFiltering(signal_data)
# Determine cutoff and filter type
if filter_response == "bandpass":
cutoff = [low_freq, high_freq]
filter_type = "band"
elif filter_response == "lowpass":
cutoff = high_freq
filter_type = "low"
elif filter_response == "highpass":
cutoff = low_freq
filter_type = "high"
elif filter_response == "bandstop":
cutoff = [low_freq, high_freq]
filter_type = "bandstop"
else:
# Default to bandpass
cutoff = [low_freq, high_freq]
filter_type = "band"
# Apply appropriate filter using vitalDSP
if filter_family == "butter" or filter_family == "butterworth":
return sf.butterworth(
cutoff, fs=sampling_freq, order=filter_order, btype=filter_type
)
elif filter_family == "cheby1" or filter_family == "chebyshev1":
return sf.chebyshev(
cutoff,
fs=sampling_freq,
order=filter_order,
btype=filter_type,
ripple=0.5,
)
elif filter_family == "cheby2" or filter_family == "chebyshev2":
# Use vitalDSP's Chebyshev Type II filter implementation
return sf.chebyshev2(
cutoff,
fs=sampling_freq,
order=filter_order,
btype=filter_type,
stopband_attenuation=40,
)
elif filter_family == "ellip" or filter_family == "elliptic":
return sf.elliptic(
cutoff,
fs=sampling_freq,
order=filter_order,
btype=filter_type,
ripple=0.5,
stopband_attenuation=40,
)
elif filter_family == "bessel":
# Use vitalDSP's Bessel filter implementation
return sf.bessel(
cutoff,
fs=sampling_freq,
order=filter_order,
btype=filter_type,
)
else:
# Default to Butterworth
logger.warning(
f"Unknown filter family '{filter_family}', defaulting to Butterworth"
)
return sf.butterworth(
cutoff, fs=sampling_freq, order=filter_order, btype=filter_type
)
except Exception as e:
logger.error(
f"Error applying vitalDSP filter: {e}. Falling back to scipy implementation."
)
# Fallback to scipy if vitalDSP fails
try:
from scipy import signal as sp_signal
nyquist = sampling_freq / 2
low_freq_norm = low_freq / nyquist
high_freq_norm = high_freq / nyquist
if filter_response == "bandpass":
btype = "band"
cutoff = [low_freq_norm, high_freq_norm]
elif filter_response == "lowpass":
btype = "low"
cutoff = high_freq_norm
elif filter_response == "highpass":
btype = "high"
cutoff = low_freq_norm
elif filter_response == "bandstop":
btype = "bandstop"
cutoff = [low_freq_norm, high_freq_norm]
else:
btype = "band"
cutoff = [low_freq_norm, high_freq_norm]
b, a = sp_signal.butter(filter_order, cutoff, btype=btype)
return sp_signal.filtfilt(b, a, signal_data)
except Exception as fallback_error:
logger.error(f"Scipy fallback also failed: {fallback_error}")
return signal_data
# def detect_peaks(signal_data, sampling_freq):
# """Detect peaks in the signal."""
# try:
# # Calculate adaptive threshold
# mean_val = np.mean(signal_data) if len(signal_data) > 0 else 0
# std_val = np.std(signal_data) if len(signal_data) > 0 else 0
# threshold = mean_val + 2 * std_val
# # Find peaks with minimum distance constraint
# min_distance = int(sampling_freq * 0.1) # Minimum 0.1 seconds between peaks
# peaks, _ = signal.find_peaks(
# signal_data, height=threshold, distance=min_distance
# )
# return peaks
# except Exception as e:
# logger.error(f"Error detecting peaks: {e}")
# return np.array([])
# NOTE: This callback has been commented out as it's orphaned
# If needed, it should be moved to the appropriate callbacks file
# and registered in a register_*_callbacks(app) function
#
# @app.callback(
# [
# Output("fft-params", "style"),
# Output("psd-params", "style"),
# Output("stft-params", "style"),
# Output("wavelet-params", "style"),
# ],
# [Input("freq-analysis-type", "value")],
# )
# def toggle_frequency_params(analysis_type):
# """Show/hide parameter sections based on selected analysis type."""
# # Default style (hidden)
# hidden_style = {"display": "none"}
# visible_style = {"display": "block"}
#
# # Initialize all as hidden
# fft_style = hidden_style
# psd_style = hidden_style
# stft_style = hidden_style
# wavelet_style = hidden_style
#
# # Show relevant section based on analysis type
# if analysis_type == "fft":
# fft_style = visible_style
# elif analysis_type == "psd":
# psd_style = visible_style
# elif analysis_type == "stft":
# stft_style = visible_style
# elif analysis_type == "wavelet":
# wavelet_style = visible_style
#
# return fft_style, psd_style, stft_style, wavelet_style
[docs]
def create_fft_plot(
signal_data, sampling_freq, window_type, n_points, freq_min, freq_max
):
"""Create FFT plot using vitalDSP FourierTransform."""
try:
from vitalDSP.transforms.fourier_transform import FourierTransform
# Apply window if specified
if window_type and window_type != "none":
if window_type == "hann":
windowed_signal = signal_data * np.hanning(len(signal_data))
elif window_type == "hamming":
windowed_signal = signal_data * np.hamming(len(signal_data))
elif window_type == "blackman":
windowed_signal = signal_data * np.blackman(len(signal_data))
else:
windowed_signal = signal_data
else:
windowed_signal = signal_data
# Use vitalDSP FourierTransform
ft = FourierTransform(windowed_signal)
fft_result = ft.compute_dft()
# Compute frequency axis
fft_freq = np.fft.fftfreq(len(fft_result), 1 / sampling_freq)
# Get positive frequencies only
positive_freq_mask = fft_freq > 0
fft_freq = fft_freq[positive_freq_mask]
fft_magnitude = np.abs(fft_result[positive_freq_mask])
# Apply frequency range filter if specified
if freq_min is not None and freq_max is not None:
freq_mask = (fft_freq >= freq_min) & (fft_freq <= freq_max)
fft_freq = fft_freq[freq_mask]
fft_magnitude = fft_magnitude[freq_mask]
# Create plot
fig = go.Figure()
fig.add_trace(
go.Scatter(
x=fft_freq,
y=fft_magnitude,
mode="lines",
name="FFT Magnitude",
line=dict(color="blue", width=2),
)
)
fig.update_layout(
title="Fast Fourier Transform (FFT)",
xaxis_title="Frequency (Hz)",
yaxis_title="Magnitude",
showlegend=True,
plot_bgcolor="white",
template="plotly_white",
# Add pan/zoom tools
dragmode="pan",
modebar=dict(
add=[
"pan2d",
"zoom2d",
"select2d",
"lasso2d",
"zoomIn2d",
"zoomOut2d",
"autoScale2d",
"resetScale2d",
]
),
)
return fig
except Exception as e:
logger.error(f"Error creating FFT plot: {e}")
return create_empty_figure()
[docs]
def create_stft_plot(
signal_data, sampling_freq, window_size, hop_size, freq_min, freq_max
):
"""Create STFT plot using vitalDSP STFT."""
try:
from vitalDSP.transforms.stft import STFT
# Use vitalDSP STFT
stft_obj = STFT(signal_data, window_size=window_size, hop_size=hop_size)
stft_result = stft_obj.compute_stft()
# Get time and frequency axes
time_axis = np.arange(stft_result.shape[1]) * hop_size / sampling_freq
freq_axis = np.fft.rfftfreq(window_size, 1 / sampling_freq)
# Apply frequency range filter if specified
if freq_min is not None and freq_max is not None:
freq_mask = (freq_axis >= freq_min) & (freq_axis <= freq_max)
freq_axis = freq_axis[freq_mask]
stft_result = stft_result[freq_mask, :]
# Create plot
fig = go.Figure()
fig.add_trace(
go.Heatmap(
z=np.abs(stft_result),
x=time_axis,
y=freq_axis,
colorscale="viridis",
name="STFT Magnitude",
)
)
fig.update_layout(
title="Short-Time Fourier Transform (STFT)",
xaxis_title="Time (s)",
yaxis_title="Frequency (Hz)",
showlegend=True,
plot_bgcolor="white",
template="plotly_white",
# Add pan/zoom tools
dragmode="pan",
modebar=dict(
add=[
"pan2d",
"zoom2d",
"select2d",
"lasso2d",
"zoomIn2d",
"zoomOut2d",
"autoScale2d",
"resetScale2d",
]
),
)
return fig
except Exception as e:
logger.error(f"Error creating STFT plot: {e}")
return create_empty_figure()
[docs]
def create_wavelet_plot(
signal_data, sampling_freq, wavelet_type, levels, freq_min, freq_max
):
"""Create wavelet plot using vitalDSP WaveletTransform."""
try:
from vitalDSP.transforms.wavelet_transform import WaveletTransform
# Use vitalDSP WaveletTransform
wt = WaveletTransform(signal_data, wavelet_name=wavelet_type)
# Perform wavelet decomposition
approximations = []
details = []
current_signal = signal_data.copy()
for level in range(levels):
approx, detail = wt._wavelet_decompose(current_signal)
approximations.append(approx)
details.append(detail)
current_signal = approx
# Create subplots for each level
fig = make_subplots(
rows=levels + 1,
cols=1,
subplot_titles=[f"Level {i+1} Detail" for i in range(levels)]
+ ["Final Approximation"],
vertical_spacing=0.05,
)
# Add detail coefficients
for i, detail in enumerate(details):
time_axis = np.arange(len(detail)) / sampling_freq
fig.add_trace(
go.Scatter(
x=time_axis,
y=detail,
mode="lines",
name=f"Level {i+1} Detail",
line=dict(width=1),
),
row=i + 1,
col=1,
)
# Add final approximation
time_axis = np.arange(len(approximations[-1])) / sampling_freq
fig.add_trace(
go.Scatter(
x=time_axis,
y=approximations[-1],
mode="lines",
name="Final Approximation",
line=dict(color="red", width=2),
),
row=levels + 1,
col=1,
)
fig.update_layout(
title=f"Wavelet Transform ({wavelet_type}) - {levels} Levels",
height=300 * (levels + 1),
showlegend=True,
plot_bgcolor="white",
template="plotly_white",
# Add pan/zoom tools
dragmode="pan",
modebar=dict(
add=[
"pan2d",
"zoom2d",
"select2d",
"lasso2d",
"zoomIn2d",
"zoomOut2d",
"autoScale2d",
"resetScale2d",
]
),
)
return fig
except Exception as e:
logger.error(f"Error creating wavelet plot: {e}")
return create_empty_figure()
[docs]
def create_enhanced_psd_plot(
signal_data,
sampling_freq,
window_sec,
overlap,
freq_max,
log_scale,
normalize,
channel,
column_mapping,
):
"""Create enhanced PSD plot using vitalDSP FrequencyDomainFeatures."""
try:
from vitalDSP.physiological_features.frequency_domain import (
FrequencyDomainFeatures,
)
# Convert window from seconds to samples
window_samples = int(window_sec * sampling_freq)
# Use vitalDSP's SignalPowerAnalysis for PSD computation
from vitalDSP.physiological_features.signal_power_analysis import (
SignalPowerAnalysis,
)
# Create SignalPowerAnalysis instance
spa = SignalPowerAnalysis(signal_data)
# Compute PSD using vitalDSP
freqs, psd = spa.compute_psd(
fs=sampling_freq,
nperseg=min(window_samples, len(signal_data)),
noverlap=int(min(window_samples, len(signal_data)) * overlap),
)
# Apply frequency range filter
if freq_max:
freq_mask = freqs <= freq_max
freqs = freqs[freq_mask]
psd = psd[freq_mask]
# Apply log scale if requested
if log_scale and "on" in log_scale:
psd = 10 * np.log10(psd + 1e-10) # Add small value to avoid log(0)
y_label = "Power Spectral Density (dB/Hz)"
else:
y_label = "Power Spectral Density"
# Normalize if requested
if normalize and "on" in normalize:
psd = psd / np.max(psd)
y_label += " (Normalized)"
# Create plot
fig = go.Figure()
fig.add_trace(
go.Scatter(
x=freqs,
y=psd,
mode="lines",
name="PSD",
line=dict(color="green", width=2),
)
)
fig.update_layout(
title="Power Spectral Density (PSD)",
xaxis_title="Frequency (Hz)",
yaxis_title=y_label,
showlegend=True,
plot_bgcolor="white",
template="plotly_white",
# Add pan/zoom tools
dragmode="pan",
modebar=dict(
add=[
"pan2d",
"zoom2d",
"select2d",
"lasso2d",
"zoomIn2d",
"zoomOut2d",
"autoScale2d",
"resetScale2d",
]
),
)
return fig
except Exception as e:
logger.error(f"Error creating PSD plot: {e}")
return create_empty_figure()
[docs]
def create_enhanced_spectrogram_plot(
signal_data,
sampling_freq,
window_size,
overlap,
freq_max,
colormap,
scaling,
window_type,
):
"""Create enhanced spectrogram plot."""
try:
# Apply window function
if window_type == "hann":
window_func = np.hanning(window_size)
elif window_type == "hamming":
window_func = np.hamming(window_size)
elif window_type == "blackman":
window_func = np.blackman(window_size)
elif window_type == "kaiser":
window_func = np.kaiser(window_size, beta=14)
elif window_type == "gaussian":
window_func = np.exp(
-0.5
* ((np.arange(window_size) - window_size / 2) / (window_size / 6)) ** 2
)
else:
window_func = np.ones(window_size)
# Compute spectrogram using vitalDSP's STFT
from vitalDSP.transforms.stft import STFT
hop_size = window_size - int(window_size * overlap)
stft_obj = STFT(
signal_data, window_size=window_size, hop_size=hop_size, n_fft=window_size
)
stft_matrix = stft_obj.compute_stft()
# Convert STFT to magnitude spectrogram
Sxx = np.abs(stft_matrix) ** 2 # Power spectrogram
# Create time and frequency axes
times = np.arange(Sxx.shape[1]) * hop_size / sampling_freq
freqs = np.fft.rfftfreq(window_size, 1 / sampling_freq)
# Apply frequency range filter
if freq_max:
freq_mask = freqs <= freq_max
freqs = freqs[freq_mask]
Sxx = Sxx[freq_mask, :]
# Apply scaling
if scaling == "density":
Sxx = Sxx / sampling_freq
title_suffix = " (Density)"
else:
title_suffix = " (Spectrum)"
# Create plot
fig = go.Figure()
fig.add_trace(
go.Heatmap(
z=10 * np.log10(Sxx + 1e-10), # Convert to dB
x=times,
y=freqs,
colorscale=colormap,
name="Spectrogram",
)
)
fig.update_layout(
title=f"Time-Frequency Spectrogram{title_suffix}",
xaxis_title="Time (s)",
yaxis_title="Frequency (Hz)",
showlegend=True,
plot_bgcolor="white",
template="plotly_white",
# Add pan/zoom tools
dragmode="pan",
modebar=dict(
add=[
"pan2d",
"zoom2d",
"select2d",
"lasso2d",
"zoomIn2d",
"zoomOut2d",
"autoScale2d",
"resetScale2d",
]
),
)
return fig
except Exception as e:
logger.error(f"Error creating spectrogram plot: {e}")
return create_empty_figure()
[docs]
def generate_frequency_analysis_results(
signal_data, sampling_freq, analysis_type, analysis_options
):
"""Generate frequency analysis results summary."""
try:
results = []
# Basic signal information
results.append(html.H5("Signal Information"))
results.append(html.P(f"Signal Length: {len(signal_data)} samples"))
results.append(
html.P(f"Duration: {len(signal_data) / sampling_freq:.2f} seconds")
)
results.append(html.P(f"Sampling Frequency: {sampling_freq} Hz"))
results.append(html.P(f"Nyquist Frequency: {sampling_freq / 2:.2f} Hz"))
# Analysis type specific results
results.append(html.H5(f"Analysis Type: {analysis_type.upper()}"))
if analysis_type == "fft":
# FFT specific analysis
fft_result = np.fft.fft(signal_data)
fft_freq = np.fft.fftfreq(len(signal_data), 1 / sampling_freq)
# Find dominant frequency
positive_freq_mask = fft_freq > 0
fft_freq_pos = fft_freq[positive_freq_mask]
fft_magnitude_pos = np.abs(fft_result[positive_freq_mask])
if len(fft_magnitude_pos) > 0:
dominant_freq_idx = np.argmax(fft_magnitude_pos)
dominant_freq = fft_freq_pos[dominant_freq_idx]
results.append(html.P(f"Dominant Frequency: {dominant_freq:.2f} Hz"))
results.append(
html.P(
f"Frequency Resolution: {fft_freq_pos[1] - fft_freq_pos[0]:.4f} Hz"
)
)
elif analysis_type == "psd":
# PSD specific analysis
from vitalDSP.physiological_features.signal_power_analysis import (
SignalPowerAnalysis,
)
spa = SignalPowerAnalysis(signal_data)
freqs, psd = spa.compute_psd(
fs=sampling_freq, nperseg=min(256, len(signal_data) // 2)
)
# Find peak frequency
peak_freq_idx = np.argmax(psd)
peak_freq = freqs[peak_freq_idx]
results.append(html.P(f"Peak Frequency: {peak_freq:.2f} Hz"))
results.append(html.P(f"Total Power: {np.sum(psd):.2e}"))
elif analysis_type == "stft":
# STFT specific analysis
results.append(html.P("STFT provides time-frequency representation"))
results.append(html.P("Check the spectrogram plot for detailed analysis"))
elif analysis_type == "wavelet":
# Wavelet specific analysis
results.append(
html.P("Wavelet transform provides multi-resolution analysis")
)
results.append(html.P("Check the wavelet plot for decomposition levels"))
return html.Div(results)
except Exception as e:
logger.error(f"Error generating frequency analysis results: {e}")
return html.Div(
[html.H5("Error"), html.P(f"Failed to generate results: {str(e)}")]
)
[docs]
def create_frequency_peak_analysis_table(
signal_data, sampling_freq, analysis_type, analysis_options, signal_type="unknown"
):
"""Create frequency peak analysis table."""
try:
if "peak_detection" not in analysis_options:
return html.Div([html.P("Peak detection not selected")])
# Perform peak detection on frequency domain
if analysis_type == "fft":
fft_result = np.fft.fft(signal_data)
fft_freq = np.fft.fftfreq(len(signal_data), 1 / sampling_freq)
# Get positive frequencies
positive_freq_mask = fft_freq > 0
fft_freq_pos = fft_freq[positive_freq_mask]
fft_magnitude_pos = np.abs(fft_result[positive_freq_mask])
# Find peaks
# Use vitalDSP for ECG/PPG peak detection, scipy for others
if signal_type and signal_type.lower() in ["ecg", "ppg"]:
from vitalDSP.physiological_features.waveform import WaveformMorphology
wm = WaveformMorphology(
fft_magnitude_pos, fs=sampling_freq, signal_type=signal_type.upper()
)
if signal_type.lower() == "ecg":
peaks = wm.r_peaks
elif signal_type.lower() == "ppg":
peaks = wm.systolic_peaks
properties = {}
else:
# Use scipy for other signal types
from scipy.signal import find_peaks
peaks, properties = find_peaks(
fft_magnitude_pos, height=np.max(fft_magnitude_pos) * 0.1
)
if len(peaks) > 0:
# Create table
table_data = []
for i, peak_idx in enumerate(peaks[:10]): # Limit to top 10 peaks
freq = fft_freq_pos[peak_idx]
magnitude = fft_magnitude_pos[peak_idx]
table_data.append(
[f"Peak {i+1}", f"{freq:.2f} Hz", f"{magnitude:.2e}"]
)
return dbc.Table.from_dataframe(
pd.DataFrame(
table_data, columns=["Peak", "Frequency (Hz)", "Magnitude"]
),
striped=True,
bordered=True,
hover=True,
)
else:
return html.P("No significant peaks found")
else:
return html.P(f"Peak detection for {analysis_type} not implemented yet")
except Exception as e:
logger.error(f"Error creating peak analysis table: {e}")
return html.Div(
[html.H5("Error"), html.P(f"Failed to create peak table: {str(e)}")]
)
[docs]
def create_frequency_band_power_table(
signal_data, sampling_freq, analysis_type, analysis_options
):
"""Create frequency band power analysis table."""
try:
if "band_power" not in analysis_options:
return html.Div([html.P("Band power analysis not selected")])
# Define frequency bands
bands = {
"Delta (0.5-4 Hz)": (0.5, 4),
"Theta (4-8 Hz)": (4, 8),
"Alpha (8-13 Hz)": (8, 13),
"Beta (13-30 Hz)": (13, 30),
"Gamma (30-100 Hz)": (30, 100),
}
# Compute PSD using vitalDSP's SignalPowerAnalysis
from vitalDSP.physiological_features.signal_power_analysis import (
SignalPowerAnalysis,
)
spa = SignalPowerAnalysis(signal_data)
freqs, psd = spa.compute_psd(
fs=sampling_freq, nperseg=min(256, len(signal_data) // 2)
)
# Calculate band powers
band_powers = {}
for band_name, (low_freq, high_freq) in bands.items():
freq_mask = (freqs >= low_freq) & (freqs <= high_freq)
if np.any(freq_mask):
power = np.trapz(psd[freq_mask], freqs[freq_mask])
band_powers[band_name] = power
# Create table
table_data = []
for band_name, power in band_powers.items():
table_data.append([band_name, f"{power:.2e}"])
return dbc.Table.from_dataframe(
pd.DataFrame(table_data, columns=["Frequency Band", "Power"]),
striped=True,
bordered=True,
hover=True,
)
except Exception as e:
logger.error(f"Error creating band power table: {e}")
return html.Div(
[html.H5("Error"), html.P(f"Failed to create band power table: {str(e)}")]
)
[docs]
def create_frequency_stability_table(
signal_data, sampling_freq, analysis_type, analysis_options
):
"""Create frequency stability analysis table."""
try:
if "stability" not in analysis_options:
return html.Div([html.P("Stability analysis not selected")])
# Basic stability metrics
mean_value = np.mean(signal_data)
std_value = np.std(signal_data)
cv = std_value / abs(mean_value) if mean_value != 0 else float("inf")
# Frequency stability (using FFT)
fft_result = np.fft.fft(signal_data)
fft_freq = np.fft.fftfreq(len(signal_data), 1 / sampling_freq)
# Get positive frequencies
positive_freq_mask = fft_freq > 0
fft_freq_pos = fft_freq[positive_freq_mask]
fft_magnitude_pos = np.abs(fft_result[positive_freq_mask])
# Find dominant frequency
if len(fft_magnitude_pos) > 0:
dominant_freq_idx = np.argmax(fft_magnitude_pos)
dominant_freq = fft_freq_pos[dominant_freq_idx]
else:
dominant_freq = 0
# Create table
table_data = [
["Mean Value", f"{mean_value:.4f}"],
["Standard Deviation", f"{std_value:.4f}"],
["Coefficient of Variation", f"{cv:.4f}"],
["Dominant Frequency", f"{dominant_freq:.2f} Hz"],
["Signal Length", f"{len(signal_data)} samples"],
]
return dbc.Table.from_dataframe(
pd.DataFrame(table_data, columns=["Metric", "Value"]),
striped=True,
bordered=True,
hover=True,
)
except Exception as e:
logger.error(f"Error creating stability table: {e}")
return html.Div(
[html.H5("Error"), html.P(f"Failed to create stability table: {str(e)}")]
)
[docs]
def create_frequency_harmonics_table(
signal_data, sampling_freq, analysis_type, analysis_options
):
"""Create frequency harmonics analysis table."""
try:
if "harmonic_analysis" not in analysis_options:
return html.Div([html.P("Harmonic analysis not selected")])
# Perform FFT for harmonic analysis
fft_result = np.fft.fft(signal_data)
fft_freq = np.fft.fftfreq(len(signal_data), 1 / sampling_freq)
# Get positive frequencies
positive_freq_mask = fft_freq > 0
fft_freq_pos = fft_freq[positive_freq_mask]
fft_magnitude_pos = np.abs(fft_result[positive_freq_mask])
if len(fft_magnitude_pos) > 0:
# Find fundamental frequency (highest peak)
# Use vitalDSP for ECG/PPG peak detection, scipy for others
if signal_type and signal_type.lower() in ["ecg", "ppg"]: # noqa: F821
from vitalDSP.physiological_features.waveform import WaveformMorphology
wm = WaveformMorphology(
fft_magnitude_pos,
fs=sampling_freq,
signal_type=signal_type.upper(), # noqa: F821
)
if signal_type.lower() == "ecg": # noqa: F821
peaks = wm.r_peaks
elif signal_type.lower() == "ppg": # noqa: F821
peaks = wm.systolic_peaks
properties = {}
else:
# Use scipy for other signal types
from scipy.signal import find_peaks
peaks, properties = find_peaks(
fft_magnitude_pos, height=np.max(fft_magnitude_pos) * 0.1
)
if len(peaks) > 0:
fundamental_freq = fft_freq_pos[peaks[0]]
# Find harmonics (multiples of fundamental)
harmonics = []
for i in range(1, 6): # Look for up to 5th harmonic
harmonic_freq = fundamental_freq * i
# Find closest frequency in our data
freq_idx = np.argmin(np.abs(fft_freq_pos - harmonic_freq))
if freq_idx < len(fft_magnitude_pos):
harmonic_magnitude = fft_magnitude_pos[freq_idx]
harmonics.append(
[
f"{i}st",
f"{harmonic_freq:.2f} Hz",
f"{harmonic_magnitude:.2e}",
]
)
# Create table
table_data = [
[
"Fundamental",
f"{fundamental_freq:.2f} Hz",
f"{fft_magnitude_pos[peaks[0]]:.2e}",
]
]
table_data.extend(harmonics)
return dbc.Table.from_dataframe(
pd.DataFrame(
table_data, columns=["Harmonic", "Frequency (Hz)", "Magnitude"]
),
striped=True,
bordered=True,
hover=True,
)
else:
return html.P("No significant peaks found for harmonic analysis")
else:
return html.P("Insufficient data for harmonic analysis")
except Exception as e:
logger.error(f"Error creating harmonics table: {e}")
return html.Div(
[html.H5("Error"), html.P(f"Failed to create harmonics table: {str(e)}")]
)
[docs]
def register_vitaldsp_callbacks(app):
"""
Register vitalDSP core callbacks.
Note: Time domain callbacks have been migrated to time_domain_callbacks.py.
This file now only contains helper functions for frequency domain analysis.
No callbacks are registered here anymore.
"""
# Time domain callbacks moved to register_time_domain_callbacks()
# This function is kept for compatibility but registers nothing
logger = logging.getLogger(__name__)
logger.info(
"register_vitaldsp_callbacks called - no callbacks to register (migrated to time_domain_callbacks)"
)
pass