"""
Data service for vitalDSP webapp.
This module provides data management and processing services.
"""
import pandas as pd
import numpy as np
import logging
from typing import Optional, Dict, Any
from pathlib import Path
logger = logging.getLogger(__name__)
class DataService:
    """In-memory store for uploaded signals and their per-dataset metadata.

    The service keeps three pieces of state:

    * ``current_data`` / ``data_config`` -- the most recently loaded frame and
      a free-form configuration dict.
    * ``_data_store`` -- ``data_id -> {"data", "info", "timestamp", ...}``;
      filtered-signal fields are added to the same entry on demand.
    * ``_column_mappings`` -- ``data_id -> {role: column label}`` where roles
      are ``"time"``, ``"signal"``, ``"red"`` and ``"ir"``.

    Most methods log failures and return ``None`` / ``False`` / an
    ``{"error": ...}`` dict instead of raising.
    """

    def __init__(self):
        self.current_data: Optional[pd.DataFrame] = None
        self.data_config: Dict[str, Any] = {}
        self._data_store: Dict[str, Any] = {}
        self._column_mappings: Dict[str, Dict[str, str]] = {}
        # Monotonic counter used to build IDs of the form "data_<n>".
        self._next_id = 1

    # ------------------------------------------------------------------
    # Loading and basic processing
    # ------------------------------------------------------------------
    def load_data(self, file_path: str) -> Optional[pd.DataFrame]:
        """Load a data file and remember it as the current data.

        ``.csv`` files are read with the default separator and ``.txt`` files
        as tab-separated. ``.mat`` and every other extension are rejected.

        Args:
            file_path: Path of the file to read.

        Returns:
            The loaded frame, or ``None`` when the format is unsupported or
            reading fails (the failure is logged, not raised).
        """
        try:
            path = Path(file_path)
            suffix = path.suffix.lower()

            if suffix == ".csv":
                df = pd.read_csv(path)
            elif suffix == ".txt":
                df = pd.read_csv(path, sep="\t")
            elif suffix == ".mat":
                # For .mat files, we'd need scipy.io
                logger.warning(".mat files not yet supported")
                return None
            else:
                logger.error(f"Unsupported file format: {path.suffix}")
                return None

            self.current_data = df
            return df
        except Exception as e:
            logger.error(f"Error loading data: {e}")
            return None

    def process_data(
        self, df: pd.DataFrame, sampling_freq: float, time_unit: str = "seconds"
    ) -> Dict[str, Any]:
        """Compute summary metadata for an uploaded frame.

        The signal is taken from the second column when there is one,
        otherwise from the first.

        Args:
            df: Uploaded data.
            sampling_freq: Sampling frequency expressed in ``time_unit``.
            time_unit: ``"seconds"`` (used as-is), ``"milliseconds"``
                (frequency divided by 1000) or ``"minutes"`` (frequency
                multiplied by 60). Any other value is used as-is.

        Returns:
            A dict with shape, columns, converted sampling frequency,
            duration and basic statistics, or ``{"error": message}`` when the
            frame is empty or any step raises.
        """
        try:
            # Basic data validation
            if df.empty:
                return {"error": "Data is empty"}

            column_index = 1 if len(df.columns) > 1 else 0
            signal_data = df.iloc[:, column_index].values
            n_samples = len(signal_data)

            # NOTE(review): the direction of these conversions is kept exactly
            # as in the original code -- confirm it matches the UI's meaning
            # of "sampling frequency per <time_unit>".
            if time_unit == "milliseconds":
                sampling_freq = sampling_freq / 1000
            elif time_unit == "minutes":
                sampling_freq = sampling_freq * 60

            # A zero frequency raises here and is reported through the
            # generic error path below.
            duration = n_samples / sampling_freq
            has_samples = n_samples > 0

            return {
                "shape": df.shape,
                "columns": df.columns.tolist(),
                "sampling_freq": sampling_freq,
                "time_unit": time_unit,
                "duration": duration,
                "signal_length": n_samples,
                "mean": float(np.mean(signal_data)) if has_samples else 0.0,
                "std": float(np.std(signal_data)) if has_samples else 0.0,
                "min": float(np.min(signal_data)) if has_samples else 0.0,
                "max": float(np.max(signal_data)) if has_samples else 0.0,
            }
        except Exception as e:
            logger.error(f"Error processing data: {e}")
            return {"error": str(e)}

    # ------------------------------------------------------------------
    # Data store
    # ------------------------------------------------------------------
    def store_data(self, df: pd.DataFrame, info: Dict[str, Any]) -> Optional[str]:
        """Store a frame under a fresh ID together with its column mapping.

        A truthy ``info["column_mapping"]`` is used verbatim; otherwise the
        mapping is auto-detected from the frame.

        Args:
            df: Data to store (kept by reference, not copied).
            info: Metadata describing the data.

        Returns:
            The new ID (``"data_<n>"``), or ``None`` when storing fails. On
            failure nothing is stored and no ID is consumed.
        """
        try:
            data_id = f"data_{self._next_id}"

            logger.info("=== STORING DATA ===")
            logger.info(f"Data ID: {data_id}")
            logger.info(f"Data shape: {df.shape}")
            logger.info(f"Data columns: {list(df.columns)}")
            logger.info(f"Data info: {info}")
            logger.info(f"Signal type in info: {info.get('signal_type', 'NOT FOUND')}")

            # Resolve the mapping BEFORE touching any state so that a failure
            # cannot leave an entry without a mapping behind.
            custom_mapping = info.get("column_mapping")
            if custom_mapping:
                logger.info("Using custom column mapping from info")
                column_mapping = custom_mapping
                logger.info(f"Custom column mapping: {column_mapping}")
            else:
                # Auto-generate column mapping only if no custom mapping provided
                logger.info("No custom column mapping found, auto-detecting columns...")
                column_mapping = self._auto_detect_columns(df)
                logger.info(f"Auto-detected column mapping: {column_mapping}")

            # Commit everything in one go.
            self._data_store[data_id] = {
                "data": df,
                "info": info,
                "timestamp": pd.Timestamp.now().isoformat(),
            }
            self._column_mappings[data_id] = column_mapping
            self._next_id += 1

            logger.info(f"Data stored with ID: {data_id}")
            logger.info(f"Final column mapping: {column_mapping}")
            return data_id
        except Exception as e:
            logger.error(f"Error storing data: {e}")
            return None

    def get_data(self, data_id: str) -> Optional[pd.DataFrame]:
        """Return the frame stored under ``data_id``, or ``None`` if unknown."""
        entry = self._data_store.get(data_id)
        if entry is None:
            return None
        return entry["data"]

    def get_all_data(self) -> Dict[str, Any]:
        """Return the internal store itself (not a copy)."""
        return self._data_store

    def get_data_info(self, data_id: str) -> Optional[Dict[str, Any]]:
        """Return the ``info`` object stored with ``data_id``.

        Returns ``None`` (and logs a warning) when the ID is unknown.
        """
        entry = self._data_store.get(data_id)
        if entry is None:
            logger.warning(f"Data ID {data_id} not found in data store")
            return None

        info = entry["info"]
        # ``info`` may be empty or None; only query it when it is usable.
        signal_type = info.get("signal_type", "NOT FOUND") if info else "NOT FOUND"
        logger.info("=== RETRIEVING DATA INFO ===")
        logger.info(f"Data ID: {data_id}")
        logger.info(f"Info keys: {list(info.keys()) if info else 'None'}")
        logger.info(f"Signal type in retrieved info: {signal_type}")
        return info

    def clear_data(self, data_id: str):
        """Remove the entry and column mapping for ``data_id`` if present."""
        self._data_store.pop(data_id, None)
        self._column_mappings.pop(data_id, None)
        logger.info(f"Data cleared for ID: {data_id}")

    def clear_all_data(self):
        """Drop all stored state and restart ID numbering at 1."""
        self.current_data = None
        self.data_config.clear()
        self._data_store.clear()
        self._column_mappings.clear()
        self._next_id = 1
        logger.info("All data cleared")

    # ------------------------------------------------------------------
    # Column mappings
    # ------------------------------------------------------------------
    def get_column_mapping(self, data_id: str) -> Dict[str, str]:
        """Return the column mapping for ``data_id`` (``{}`` if none)."""
        mapping = self._column_mappings.get(data_id, {})
        logger.info(f"Getting column mapping for {data_id}: {mapping}")
        return mapping

    def update_column_mapping(self, data_id: str, mapping: Dict[str, str]):
        """Replace the column mapping for ``data_id``."""
        self._column_mappings[data_id] = mapping
        logger.info(f"Column mapping updated for {data_id}: {mapping}")

    def set_column_mapping(self, data_id: str, mapping: Dict[str, str]):
        """Set the column mapping for ``data_id``."""
        self._column_mappings[data_id] = mapping
        logger.info(f"Column mapping set for {data_id}: {mapping}")

    def _auto_detect_columns(self, df: pd.DataFrame) -> Dict[str, str]:
        """Guess which columns hold time, signal, red and IR data.

        Column names are matched by case-insensitive substring first; roles
        still missing afterwards fall back to position (time = first column,
        signal = second column, or the only column).

        Args:
            df: Frame whose columns are inspected. Labels need not be
                strings; they are stringified for matching only.

        Returns:
            ``{role: original column label}``; empty for a frame without
            columns.
        """
        mapping = {}
        columns = list(df.columns)
        if not columns:
            logger.info(f"Auto-detected column mapping: {mapping}")
            return mapping

        # Look for specific column types based on names first
        for col in columns:
            # Labels may be ints (e.g. header-less data); stringify so that
            # name matching never raises.
            col_lower = str(col).lower()

            # Time-related columns - only set if not already set
            if "time" not in mapping and (
                "time" in col_lower or "timestamp" in col_lower or col_lower == "t"
            ):
                mapping["time"] = col
            # Signal columns - waveform/pleth always win, even over an
            # earlier generic signal match
            elif any(keyword in col_lower for keyword in ("waveform", "pleth", "pl")):
                mapping["signal"] = col
                logger.info(f"Found waveform/pleth column: {col}")
            # Other signal columns
            elif any(keyword in col_lower for keyword in ("signal", "ppg", "ecg")):
                if "signal" not in mapping:  # Only set if no waveform/pleth found
                    mapping["signal"] = col
                    logger.info(f"Found signal column: {col}")
            # RED channel (for pulse oximetry) - only set if not already set
            elif "red" not in mapping and "red" in col_lower:
                mapping["red"] = col
            # IR channel (for pulse oximetry) - only set if not already set
            elif "ir" not in mapping and (
                "ir" in col_lower or "infrared" in col_lower
            ):
                mapping["ir"] = col

        first = columns[0]

        # Positional defaults for anything the names did not reveal.
        if "time" not in mapping:
            mapping["time"] = first
            logger.info(f"Using default time column: {first}")

        if "signal" not in mapping:
            if len(columns) > 1:
                mapping["signal"] = columns[1]
                logger.info(f"Using default signal column: {columns[1]}")
            else:
                # If only one column, use it for signal (time was already assigned)
                mapping["signal"] = first
                logger.info(f"Using single column for signal: {first}")

        # Special case: single column dataframe
        if len(columns) == 1:
            mapping["time"] = first
            mapping["signal"] = first
            logger.info(
                f"Single column detected, mapping both time and signal to: {first}"
            )

        logger.info(f"Auto-detected column mapping: {mapping}")
        return mapping

    # ------------------------------------------------------------------
    # Current data and configuration
    # ------------------------------------------------------------------
    def get_current_data(self) -> Optional[pd.DataFrame]:
        """Return the most recently loaded frame, if any."""
        return self.current_data

    def get_config(self) -> Dict[str, Any]:
        """Return the live configuration dict (mutations are visible here)."""
        return self.data_config

    def get_current_config(self) -> Dict[str, Any]:
        """Return a shallow copy of the configuration dict."""
        return self.data_config.copy()  # Return a copy, not the original

    def update_config(self, config: Dict[str, Any]):
        """Merge ``config`` into the current configuration."""
        self.data_config.update(config)

    def get_data_summary(self) -> Optional[Dict[str, Any]]:
        """Summarise the current data, or return ``None`` if there is none."""
        if self.current_data is None or self.current_data.empty:
            return None
        return {
            "shape": self.current_data.shape,
            "columns": self.current_data.columns.tolist(),
            "data_config": self.data_config,
        }

    # ------------------------------------------------------------------
    # Filtered signals
    # ------------------------------------------------------------------
    def store_filtered_data(
        self, data_id: str, filtered_signal: np.ndarray, filter_info: Dict[str, Any]
    ) -> bool:
        """Attach a filtered signal and its filter description to ``data_id``.

        Returns:
            ``True`` on success; ``False`` when the ID is unknown or an error
            occurs (both are logged).
        """
        try:
            if data_id not in self._data_store:
                logger.error(f"Data ID {data_id} not found")
                return False

            logger.info(f"Storing filtered data for ID: {data_id}")
            logger.info(f"Filtered signal shape: {filtered_signal.shape}")
            logger.info(f"Filter info: {filter_info}")

            entry = self._data_store[data_id]
            entry["filtered_signal"] = filtered_signal
            entry["filter_info"] = filter_info
            entry["has_filtered_data"] = True
            entry["filtered_timestamp"] = pd.Timestamp.now().isoformat()

            logger.info(f"Filtered data stored successfully for ID: {data_id}")
            return True
        except Exception as e:
            logger.error(f"Error storing filtered data: {e}")
            return False

    def get_filtered_data(self, data_id: str) -> Optional[np.ndarray]:
        """Return the filtered signal for ``data_id``, or ``None`` if absent."""
        try:
            if data_id not in self._data_store:
                logger.warning(f"Data ID {data_id} not found")
                return None

            entry = self._data_store[data_id]
            if entry.get("has_filtered_data", False):
                logger.info(f"Retrieved filtered data for ID: {data_id}")
                return entry.get("filtered_signal")

            logger.info(f"No filtered data available for ID: {data_id}")
            return None
        except Exception as e:
            logger.error(f"Error retrieving filtered data: {e}")
            return None

    def has_filtered_data(self, data_id: str) -> bool:
        """Tell whether a filtered signal is stored for ``data_id``."""
        try:
            if data_id not in self._data_store:
                return False
            has_filtered = self._data_store[data_id].get("has_filtered_data", False)
            logger.info(f"Filtered data available for ID {data_id}: {has_filtered}")
            return has_filtered
        except Exception as e:
            logger.error(f"Error checking filtered data availability: {e}")
            return False

    def get_filter_info(self, data_id: str) -> Optional[Dict[str, Any]]:
        """Return the filter description stored with the filtered signal."""
        try:
            if data_id not in self._data_store:
                return None
            entry = self._data_store[data_id]
            if entry.get("has_filtered_data", False):
                return entry.get("filter_info")
            return None
        except Exception as e:
            logger.error(f"Error retrieving filter info: {e}")
            return None

    def clear_filtered_data(self, data_id: str) -> bool:
        """Remove every filtered-signal field from the entry for ``data_id``.

        Returns:
            ``True`` when the ID exists (whether or not it had filtered
            data); ``False`` when it is unknown or an error occurs.
        """
        try:
            if data_id not in self._data_store:
                logger.warning(f"Data ID {data_id} not found")
                return False

            entry = self._data_store[data_id]
            for field in (
                "filtered_signal",
                "filter_info",
                "has_filtered_data",
                "filtered_timestamp",
            ):
                entry.pop(field, None)

            logger.info(f"Filtered data cleared for ID: {data_id}")
            return True
        except Exception as e:
            logger.error(f"Error clearing filtered data: {e}")
            return False
# Global instance: created once when this module is imported and handed out by
# get_data_service(), so every caller shares the same in-memory store.
_data_service = DataService()
def get_data_service() -> DataService:
    """Return the module-level ``DataService`` instance.

    The same object is returned on every call, so state stored through it is
    visible to all callers in the process.
    """
    return _data_service