"""
Data processor utility for vitalDSP webapp.
This module provides utility functions for data processing and validation.
"""
import pandas as pd
import numpy as np
import base64
import io
import logging
from typing import Optional, Dict, Any
from pathlib import Path
logger = logging.getLogger(__name__)
class DataProcessor:
    """Stateless helpers for loading, synthesising and summarising signal data.

    Every public method is a ``@staticmethod``. Failures are logged through
    the module-level ``logger`` and reported to the caller as ``None``
    instead of being raised.
    """

    @staticmethod
    def validate_file_extension(filename: str) -> bool:
        """Return ``True`` when ``filename`` has a supported extension.

        The check is case-insensitive and accepts ``.csv``, ``.txt`` and
        ``.mat``. An empty (or ``None``) filename is rejected.

        NOTE(review): ``.mat`` is accepted here, but neither
        :meth:`read_uploaded_content` nor :meth:`read_file` parses it; both
        return ``None`` for such files.
        """
        if not filename:
            return False
        supported_extensions = [".csv", ".txt", ".mat"]
        file_ext = Path(filename).suffix.lower()
        return file_ext in supported_extensions

    @staticmethod
    def _delimiter_for(filename: str) -> Optional[str]:
        """Return the column delimiter implied by ``filename``, or ``None``.

        ``.csv`` maps to a comma and ``.txt`` to a tab. The match is
        case-insensitive so it agrees with :meth:`validate_file_extension`.
        """
        lowered = filename.lower()
        if lowered.endswith(".csv"):
            return ","
        if lowered.endswith(".txt"):
            return "\t"
        return None

    @staticmethod
    def read_uploaded_content(contents: str, filename: str) -> Optional[pd.DataFrame]:
        """Parse an uploaded ``"<header>,<base64 payload>"`` string.

        Args:
            contents: Upload string made of a header and a base64 payload
                separated by exactly one comma.
            filename: Original file name; its extension selects the parser
                (``.csv`` comma-separated, ``.txt`` tab-separated).

        Returns:
            The parsed DataFrame, or ``None`` when the extension is
            unsupported or decoding/parsing fails (the reason is logged).
        """
        try:
            _, content_string = contents.split(",")
            decoded = base64.b64decode(content_string)
            delimiter = DataProcessor._delimiter_for(filename)
            if delimiter is None:
                logger.error(f"Unsupported file format: {filename}")
                return None
            # "utf-8-sig" drops a leading byte-order mark when present and
            # otherwise behaves exactly like "utf-8".
            text = decoded.decode("utf-8-sig")
            return pd.read_csv(io.StringIO(text), sep=delimiter)
        except Exception as e:
            logger.error(f"Error reading uploaded content: {e}")
            return None

    @staticmethod
    def read_file(file_path: str, filename: str) -> Optional[pd.DataFrame]:
        """Read a delimited text file from disk.

        Args:
            file_path: Location of the file to read.
            filename: Name whose extension selects the parser (``.csv``
                comma-separated, ``.txt`` tab-separated); matched
                case-insensitively.

        Returns:
            The parsed DataFrame, or ``None`` when the extension is
            unsupported or reading fails (the reason is logged).
        """
        try:
            delimiter = DataProcessor._delimiter_for(filename)
            if delimiter is None:
                logger.error(f"Unsupported file format: {filename}")
                return None
            return pd.read_csv(file_path, sep=delimiter)
        except Exception as e:
            logger.error(f"Error reading file: {e}")
            return None

    @staticmethod
    def generate_sample_ppg_data(
        sampling_freq: float,
        duration: float = 10.0,
        heart_rate: float = 70,
        noise_level: float = 0.05,
    ) -> Optional[pd.DataFrame]:
        """Generate a synthetic PPG-like signal for testing.

        The signal is a sine at the heart frequency plus its second and third
        harmonics and Gaussian noise, amplitude-modulated by a 0.3 Hz sine.

        Args:
            sampling_freq: Samples per second; must be positive.
            duration: Length of the signal in seconds; must be positive.
            heart_rate: Heart rate in beats per minute.
            noise_level: Scale applied to the standard-normal noise term.

        Returns:
            A DataFrame with ``time`` and ``signal`` columns, or ``None`` when
            an argument is invalid or generation fails.
        """
        try:
            # Validate before building the time axis so that a zero sampling
            # frequency is reported as such instead of as a division error.
            if sampling_freq <= 0:
                logger.error("Sampling frequency must be positive")
                return None
            if duration <= 0:
                logger.error("Duration must be positive")
                return None

            t = np.arange(0, duration, 1 / sampling_freq)
            heart_freq = heart_rate / 60  # Hz

            signal = (
                1.0 * np.sin(2 * np.pi * heart_freq * t)  # Fundamental
                + 0.3 * np.sin(2 * np.pi * 2 * heart_freq * t)  # Second harmonic
                + 0.1 * np.sin(2 * np.pi * 3 * heart_freq * t)  # Third harmonic
                + noise_level * np.random.randn(len(t))  # Configurable noise
            )

            # Respiratory amplitude modulation.
            resp_freq = 0.3  # Hz
            resp_modulation = 0.1 * np.sin(2 * np.pi * resp_freq * t)
            signal = signal * (1 + resp_modulation)

            return pd.DataFrame({"time": t, "signal": signal})
        except Exception as e:
            logger.error(f"Error generating sample data: {e}")
            return None

    @staticmethod
    def generate_sample_ecg_data(
        sampling_freq: float,
        duration: float = 10.0,
        heart_rate: float = 70,
        noise_level: float = 0.04,
    ) -> Optional[pd.DataFrame]:
        """Generate a sample ECG-like signal for testing.

        Builds a beat by summing five narrow Gaussian bumps (the P, Q, R, S
        and T deflections), tiles it at ``heart_rate`` bpm, and adds Gaussian
        noise. Same shape contract as :meth:`generate_sample_ppg_data`: a
        two-column ``time``/``signal`` DataFrame.

        Args:
            sampling_freq: Samples per second; must be positive.
            duration: Length of the signal in seconds; must be positive.
            heart_rate: Heart rate in beats per minute; values below 1 are
                treated as 1.
            noise_level: Scale applied to the standard-normal noise term.

        Returns:
            A DataFrame with ``time`` and ``signal`` columns, or ``None`` when
            an argument is invalid or generation fails.
        """
        try:
            if sampling_freq <= 0 or duration <= 0:
                logger.error("Sampling frequency and duration must be positive")
                return None

            t = np.arange(0, duration, 1.0 / sampling_freq)
            rr = 60.0 / max(heart_rate, 1.0)  # one R-R interval, seconds
            sig = np.zeros_like(t)

            # Deflection model: (relative position within an R-R interval,
            # amplitude, width in seconds). Order: P, Q, R, S, T.
            beats = [
                (0.20, 0.10, 0.030),  # P
                (0.46, -0.10, 0.010),  # Q
                (0.50, 1.00, 0.010),  # R
                (0.54, -0.25, 0.012),  # S
                (0.72, 0.30, 0.060),  # T
            ]

            # One extra beat so the tail of the window is always covered.
            n_beats = int(np.ceil(duration / rr)) + 1
            for k in range(n_beats):
                beat_start = k * rr
                for rel_pos, amp, width in beats:
                    centre = beat_start + rel_pos * rr
                    sig += amp * np.exp(-((t - centre) ** 2) / (2.0 * width**2))

            sig += noise_level * np.random.randn(len(t))
            return pd.DataFrame({"time": t, "signal": sig})
        except Exception as e:  # pragma: no cover - defensive
            logger.error(f"Error generating sample ECG data: {e}")
            return None

    @staticmethod
    def process_uploaded_data(
        df: pd.DataFrame,
        filename: str,
        sampling_freq: float,
        time_unit: str = "seconds",
    ) -> Optional[Dict[str, Any]]:
        """Summarise an uploaded DataFrame as a metadata dictionary.

        The signal is taken from the second column, or from the first when
        the frame has only one column (a warning is logged in that case).

        Args:
            df: Uploaded data.
            filename: Name reported back in the metadata.
            sampling_freq: Sampling frequency; must be positive. It is
                divided by 1000 when ``time_unit`` is ``"milliseconds"`` and
                multiplied by 60 when it is ``"minutes"``.
            time_unit: ``"seconds"``, ``"milliseconds"`` or ``"minutes"``;
                any other value leaves ``sampling_freq`` unchanged.

        Returns:
            A dict with the keys ``filename``, ``shape``, ``columns``,
            ``sampling_freq`` (rescaled), ``time_unit``, ``duration``,
            ``signal_length``, ``mean``, ``std``, ``min``, ``max``,
            ``num_rows`` and ``num_columns``; or ``None`` when ``df`` is
            ``None``/empty, ``sampling_freq`` is not positive, or processing
            fails.

        NOTE(review): ``duration`` is the sample count divided by the
        rescaled frequency, so it is presumably expressed in ``time_unit``
        (assuming ``sampling_freq`` is given in Hz) - confirm with callers.
        """
        try:
            if df is None or df.empty:
                return None

            if sampling_freq <= 0:
                logger.error("Sampling frequency must be positive")
                return None

            if len(df.columns) < 2:
                logger.warning("Data should have at least 2 columns (time and signal)")

            signal_data = (
                df.iloc[:, 1].values if len(df.columns) > 1 else df.iloc[:, 0].values
            )

            if time_unit == "milliseconds":
                sampling_freq = sampling_freq / 1000
            elif time_unit == "minutes":
                sampling_freq = sampling_freq * 60

            has_samples = len(signal_data) > 0
            duration = len(signal_data) / sampling_freq

            return {
                "filename": filename,
                "shape": df.shape,
                "columns": df.columns.tolist(),
                "sampling_freq": sampling_freq,
                "time_unit": time_unit,
                "duration": duration,
                "signal_length": len(signal_data),
                "mean": float(np.mean(signal_data)) if has_samples else 0.0,
                "std": float(np.std(signal_data)) if has_samples else 0.0,
                "min": float(np.min(signal_data)) if has_samples else 0.0,
                "max": float(np.max(signal_data)) if has_samples else 0.0,
                "num_rows": df.shape[0],
                "num_columns": df.shape[1],
            }
        except Exception as e:
            logger.error(f"Error processing uploaded data: {e}")
            return None