Source code for vitalDSP.ml_models.deep_models

"""
Deep Learning Models Module for Physiological Signal Processing

This module provides state-of-the-art deep learning architectures for physiological
signal analysis including ECG, PPG, EEG, and other vital signs. It implements
comprehensive neural network models with support for both TensorFlow and PyTorch
frameworks, enabling advanced signal classification, anomaly detection, and
sequence modeling.

Author: vitalDSP Team
Date: 2025-01-27
Version: 1.0.0

Key Features:
- 1D CNN for signal classification and feature extraction
- LSTM networks for sequence modeling and temporal dependencies
- Transformer architectures for long-range dependencies
- Autoencoder models for anomaly detection and denoising
- Pre-trained models and transfer learning capabilities
- Model training utilities and callbacks
- Cross-framework compatibility (TensorFlow/PyTorch)

Examples:
---------
Basic CNN model for ECG classification:
    >>> import numpy as np
    >>> from vitalDSP.ml_models.deep_models import CNN1D
    >>> model = CNN1D(input_shape=(1000, 1), n_classes=5)
    >>> model.build_model()
    >>> # Train with your data: model.train(X_train, y_train, epochs=10)

LSTM for sequence modeling:
    >>> from vitalDSP.ml_models.deep_models import LSTMModel
    >>> lstm_model = LSTMModel(input_shape=(100, 1), n_classes=3)
    >>> lstm_model.build_model()
    >>> # lstm_model.train(X_train, y_train, epochs=10)
"""

import numpy as np
import warnings
from typing import Optional, Union, Tuple, Dict, List, Any
from abc import ABC, abstractmethod


# Check for deep learning frameworks
try:
    import tensorflow as tf
    from tensorflow import keras
    from tensorflow.keras import layers, models, callbacks

    TF_AVAILABLE = True
except ImportError:
    TF_AVAILABLE = False
    warnings.warn("TensorFlow not available. Install with: pip install tensorflow")

try:
    import torch
    import torch.nn as nn
    import torch.nn.functional as F

    TORCH_AVAILABLE = True
except ImportError:
    TORCH_AVAILABLE = False


class BaseDeepModel(ABC):
    """
    Base class for deep learning models.

    Provides a common interface for all deep models with support for both
    TensorFlow/Keras and PyTorch backends. Subclasses implement
    ``build_model``, ``train`` and ``predict``; persistence (``save`` /
    ``load``) is implemented here.

    Attributes
    ----------
    input_shape : tuple
        Shape of input data (excluding batch dimension)
    n_classes : int
        Number of output classes
    backend : str
        Lower-cased backend name ('tensorflow' or 'pytorch')
    model : object or None
        The underlying framework model; ``None`` until a subclass builds it
    history : dict or None
        Training history; ``None`` until a subclass trains the model
    """

    # Backends the save/load logic below knows how to handle.
    _SUPPORTED_BACKENDS = ("tensorflow", "pytorch")

    def __init__(
        self,
        input_shape: Tuple[int, ...],
        n_classes: int,
        backend: str = "tensorflow",
        **kwargs,
    ):
        """
        Initialize base model.

        Parameters
        ----------
        input_shape : tuple
            Shape of input data (excluding batch dimension)
        n_classes : int
            Number of output classes
        backend : str, default='tensorflow'
            Deep learning backend ('tensorflow' or 'pytorch'), case-insensitive
        **kwargs : dict
            Accepted for forward compatibility; not used by the base class

        Raises
        ------
        ValueError
            If ``backend`` is not one of the supported backends
        ImportError
            If the requested backend is not installed
        """
        backend_name = str(backend).lower()
        # Fail fast: an unknown backend would otherwise make every
        # backend-dispatching method silently do nothing.
        if backend_name not in self._SUPPORTED_BACKENDS:
            raise ValueError(
                f"Unsupported backend '{backend}'. "
                f"Expected one of: {', '.join(self._SUPPORTED_BACKENDS)}"
            )

        self.input_shape = input_shape
        self.n_classes = n_classes
        self.backend = backend_name
        self.model = None
        self.history = None

        if self.backend == "tensorflow" and not TF_AVAILABLE:
            raise ImportError(
                "TensorFlow not available. Install with: pip install tensorflow"
            )
        elif self.backend == "pytorch" and not TORCH_AVAILABLE:
            raise ImportError("PyTorch not available. Install with: pip install torch")

    @abstractmethod
    def build_model(self):
        """Build the model architecture."""
        pass

    @abstractmethod
    def train(self, X_train, y_train, X_val=None, y_val=None, **kwargs):
        """Train the model."""
        pass

    @abstractmethod
    def predict(self, X):
        """Make predictions."""
        pass

    def save(self, filepath: str):
        """
        Save model to disk.

        For the TensorFlow backend the whole model is saved via
        ``model.save``; for the PyTorch backend only the ``state_dict``
        is written.

        Parameters
        ----------
        filepath : str
            Destination path

        Raises
        ------
        RuntimeError
            If no model has been built yet
        """
        if self.model is None:
            raise RuntimeError("No model to save. Call build_model() first.")

        if self.backend == "tensorflow":
            self.model.save(filepath)
        elif self.backend == "pytorch":
            torch.save(self.model.state_dict(), filepath)

    def load(self, filepath: str):
        """
        Load model from disk.

        For the TensorFlow backend the stored model replaces ``self.model``.
        For the PyTorch backend the file is expected to hold a ``state_dict``
        (as written by ``save``); the architecture is built first if it does
        not exist yet, then the weights are loaded into it.

        Parameters
        ----------
        filepath : str
            Path of the saved model / weights

        Raises
        ------
        RuntimeError
            If the PyTorch architecture could not be built before loading
        """
        if self.backend == "tensorflow":
            self.model = keras.models.load_model(filepath)
        elif self.backend == "pytorch":
            # A state_dict can only be loaded into an existing module.
            if self.model is None:
                self.build_model()
            if self.model is None:
                raise RuntimeError(
                    "build_model() did not create a model to load weights into."
                )
            # Map to CPU so weights saved on a GPU machine also load on a
            # CPU-only host; load_state_dict copies them onto the module's
            # own device.
            state_dict = torch.load(filepath, map_location="cpu")
            self.model.load_state_dict(state_dict)
class CNN1D(BaseDeepModel):
    """
    1D Convolutional Neural Network for signal classification.

    Designed for physiological signal classification tasks such as:
    - ECG arrhythmia detection
    - PPG quality assessment
    - EEG sleep stage classification
    - Respiratory event detection

    Architecture:
    - Multiple 1D convolutional layers with batch normalization
    - MaxPooling for downsampling
    - Dropout for regularization
    - Dense layers for classification
    - Residual connections (optional, TensorFlow backend only)

    Parameters
    ----------
    input_shape : tuple
        Shape of input signals (sequence_length, n_channels)
    n_classes : int
        Number of output classes
    n_filters : list of int, default=[32, 64, 128]
        Number of filters in each conv layer
    kernel_sizes : list of int, default=[7, 5, 3]
        Kernel sizes for each conv layer
    pool_sizes : list of int, default=[2, 2, 2]
        Pool sizes for each pooling layer
    dropout_rate : float, default=0.5
        Dropout rate for regularization
    use_residual : bool, default=False
        Whether to use residual connections
    backend : str, default='tensorflow'
        Deep learning backend

    Attributes
    ----------
    model : keras.Model or torch.nn.Module
        The underlying model
    history : dict
        Training history

    Examples
    --------
    >>> from vitalDSP.ml_models import CNN1D
    >>> model = CNN1D(input_shape=(1000, 1), n_classes=5)
    >>> model.build_model()
    >>> model.train(X_train, y_train, epochs=50, batch_size=32)
    >>> predictions = model.predict(X_test)
    """

    def __init__(
        self,
        input_shape: Tuple[int, ...],
        n_classes: int,
        n_filters: Optional[List[int]] = None,
        kernel_sizes: Optional[List[int]] = None,
        pool_sizes: Optional[List[int]] = None,
        dropout_rate: float = 0.5,
        use_residual: bool = False,
        backend: str = "tensorflow",
        **kwargs,
    ):
        super().__init__(input_shape, n_classes, backend, **kwargs)
        self.n_filters = n_filters or [32, 64, 128]
        self.kernel_sizes = kernel_sizes or [7, 5, 3]
        self.pool_sizes = pool_sizes or [2, 2, 2]
        self.dropout_rate = dropout_rate
        self.use_residual = use_residual

    def build_model(self):
        """
        Build 1D CNN architecture.

        Returns
        -------
        model : keras.Model or torch.nn.Module
            The freshly built model (also stored in ``self.model``)
        """
        if self.backend == "tensorflow":
            self._build_tensorflow_model()
        elif self.backend == "pytorch":
            self._build_pytorch_model()
        return self.model

    def _build_tensorflow_model(self):
        """Build TensorFlow/Keras model."""
        inputs = keras.Input(shape=self.input_shape)
        x = inputs

        # Convolutional blocks (zip stops at the shortest of the three lists)
        for i, (n_filters, kernel_size, pool_size) in enumerate(
            zip(self.n_filters, self.kernel_sizes, self.pool_sizes)
        ):
            # The first block never gets a skip connection.
            use_skip = self.use_residual and i > 0
            if use_skip:
                residual = x

            # Conv block
            x = layers.Conv1D(
                n_filters,
                kernel_size,
                padding="same",
                activation=None,
                name=f"conv1d_{i+1}",
            )(x)
            x = layers.BatchNormalization(name=f"bn_{i+1}")(x)
            x = layers.Activation("relu", name=f"relu_{i+1}")(x)

            # Residual addition
            if use_skip:
                # Project the skip path with a 1x1 conv when channel counts differ
                if residual.shape[-1] != x.shape[-1]:
                    residual = layers.Conv1D(
                        n_filters, 1, padding="same", name=f"residual_conv1d_{i+1}"
                    )(residual)
                x = layers.Add(name=f"add_{i+1}")([x, residual])

            x = layers.MaxPooling1D(pool_size, name=f"maxpool_{i+1}")(x)
            x = layers.Dropout(self.dropout_rate, name=f"dropout_{i+1}")(x)

        # Global pooling
        x = layers.GlobalAveragePooling1D(name="global_pool")(x)

        # Dense layers
        x = layers.Dense(128, activation="relu", name="dense_1")(x)
        x = layers.Dropout(self.dropout_rate, name="dropout_dense")(x)
        x = layers.Dense(64, activation="relu", name="dense_2")(x)

        # Output layer: single sigmoid unit for binary, softmax otherwise
        if self.n_classes == 2:
            outputs = layers.Dense(1, activation="sigmoid", name="output")(x)
        else:
            outputs = layers.Dense(self.n_classes, activation="softmax", name="output")(
                x
            )

        self.model = keras.Model(inputs=inputs, outputs=outputs, name="CNN1D")

    def _build_pytorch_model(self):
        """
        Build PyTorch model.

        The module outputs probabilities (sigmoid for a single output unit,
        softmax otherwise). Residual connections are not implemented for this
        backend; a warning is emitted when ``use_residual`` is set.
        """
        if self.use_residual:
            warnings.warn(
                "use_residual is only implemented for the TensorFlow backend "
                "and is ignored by the PyTorch CNN1D model."
            )

        # Defined lazily inside the method so that the module can be imported
        # when PyTorch is not installed.
        class CNN1DModule(nn.Module):
            def __init__(
                self,
                input_shape,
                n_classes,
                n_filters,
                kernel_sizes,
                pool_sizes,
                dropout_rate,
            ):
                super().__init__()
                self.conv_blocks = nn.ModuleList()
                in_channels = input_shape[1] if len(input_shape) > 1 else 1

                for n_filter, kernel_size, pool_size in zip(
                    n_filters, kernel_sizes, pool_sizes
                ):
                    block = nn.Sequential(
                        nn.Conv1d(
                            in_channels, n_filter, kernel_size, padding=kernel_size // 2
                        ),
                        nn.BatchNorm1d(n_filter),
                        nn.ReLU(),
                        nn.MaxPool1d(pool_size),
                        nn.Dropout(dropout_rate),
                    )
                    self.conv_blocks.append(block)
                    in_channels = n_filter

                self.global_pool = nn.AdaptiveAvgPool1d(1)
                # Size the head from the channels actually produced by the
                # last conv block: zip() may have used fewer entries than
                # len(n_filters) when the config lists differ in length.
                self.fc1 = nn.Linear(in_channels, 128)
                self.dropout = nn.Dropout(dropout_rate)
                self.fc2 = nn.Linear(128, 64)
                self.output = nn.Linear(64, n_classes if n_classes > 2 else 1)

            def forward(self, x):
                for block in self.conv_blocks:
                    x = block(x)

                x = self.global_pool(x)
                x = x.squeeze(-1)
                x = F.relu(self.fc1(x))
                x = self.dropout(x)
                x = F.relu(self.fc2(x))
                x = self.output(x)

                if x.shape[-1] == 1:
                    x = torch.sigmoid(x)
                else:
                    x = F.softmax(x, dim=-1)

                return x

        self.model = CNN1DModule(
            self.input_shape,
            self.n_classes,
            self.n_filters,
            self.kernel_sizes,
            self.pool_sizes,
            self.dropout_rate,
        )

    def train(
        self,
        X_train: np.ndarray,
        y_train: np.ndarray,
        X_val: Optional[np.ndarray] = None,
        y_val: Optional[np.ndarray] = None,
        epochs: int = 50,
        batch_size: int = 32,
        learning_rate: float = 0.001,
        **kwargs,
    ):
        """
        Train the CNN model.

        The model is built automatically if ``build_model`` has not been
        called yet.

        Parameters
        ----------
        X_train : ndarray of shape (n_samples, sequence_length, n_channels)
            Training data
        y_train : ndarray of shape (n_samples,) or (n_samples, n_classes)
            Training labels
        X_val : ndarray, optional
            Validation data
        y_val : ndarray, optional
            Validation labels
        epochs : int, default=50
            Number of training epochs
        batch_size : int, default=32
            Batch size for training
        learning_rate : float, default=0.001
            Learning rate
        **kwargs : dict
            Additional training parameters (``verbose`` is honoured)

        Returns
        -------
        history : dict
            Training history
        """
        if self.model is None:
            self.build_model()

        if self.backend == "tensorflow":
            return self._train_tensorflow(
                X_train,
                y_train,
                X_val,
                y_val,
                epochs,
                batch_size,
                learning_rate,
                **kwargs,
            )
        elif self.backend == "pytorch":
            return self._train_pytorch(
                X_train,
                y_train,
                X_val,
                y_val,
                epochs,
                batch_size,
                learning_rate,
                **kwargs,
            )

    def _train_tensorflow(
        self,
        X_train,
        y_train,
        X_val,
        y_val,
        epochs,
        batch_size,
        learning_rate,
        **kwargs,
    ):
        """Train TensorFlow model with Adam, early stopping and LR reduction."""
        # Compile model
        optimizer = keras.optimizers.Adam(learning_rate=learning_rate)

        if self.n_classes == 2:
            loss = "binary_crossentropy"
            metrics = ["accuracy", keras.metrics.AUC(name="auc")]
        else:
            # Integer labels -> sparse loss; one-hot labels -> categorical loss
            loss = (
                "sparse_categorical_crossentropy"
                if y_train.ndim == 1
                else "categorical_crossentropy"
            )
            metrics = ["accuracy"]

        self.model.compile(optimizer=optimizer, loss=loss, metrics=metrics)

        # Callbacks monitor the validation loss when validation data is given
        monitored = "val_loss" if X_val is not None else "loss"
        callback_list = [
            callbacks.EarlyStopping(
                monitor=monitored,
                patience=10,
                restore_best_weights=True,
            ),
            callbacks.ReduceLROnPlateau(
                monitor=monitored,
                factor=0.5,
                patience=5,
                min_lr=1e-7,
            ),
        ]

        # Train
        validation_data = (X_val, y_val) if X_val is not None else None

        history = self.model.fit(
            X_train,
            y_train,
            validation_data=validation_data,
            epochs=epochs,
            batch_size=batch_size,
            callbacks=callback_list,
            verbose=kwargs.get("verbose", 1),
        )

        self.history = history.history
        return self.history

    def _is_binary_head(self):
        """Return True when the PyTorch head has a single sigmoid output."""
        # Mirrors the head built in _build_pytorch_model:
        # ``n_classes if n_classes > 2 else 1`` output units.
        return self.n_classes <= 2

    def _make_torch_loader(self, X, y, batch_size, shuffle):
        """Wrap numpy data into a DataLoader yielding (N, C, L) batches."""
        binary = self._is_binary_head()

        # Conv1d expects channels first: (N, L, C) -> (N, C, L)
        X_tensor = torch.as_tensor(np.asarray(X), dtype=torch.float32).permute(0, 2, 1)
        if X_tensor.shape[0] == 0:
            raise ValueError("Cannot train on an empty dataset.")

        labels = np.asarray(y)
        # One-hot encoded labels are reduced to class indices
        if labels.ndim > 1 and labels.shape[1] > 1:
            labels = np.argmax(labels, axis=1)

        if binary:
            # BCE needs float targets shaped like the (N, 1) model output
            y_tensor = torch.as_tensor(
                labels.astype(np.float32).reshape(-1, 1), dtype=torch.float32
            )
        else:
            y_tensor = torch.as_tensor(
                labels.astype(np.int64).reshape(-1), dtype=torch.long
            )

        dataset = torch.utils.data.TensorDataset(X_tensor, y_tensor)
        return torch.utils.data.DataLoader(
            dataset, batch_size=batch_size, shuffle=shuffle
        )

    def _run_torch_epoch(self, loader, device, optimizer=None):
        """
        Run one pass over ``loader``.

        With an optimizer the model is trained; without one the pass is a
        gradient-free evaluation. Returns ``(mean batch loss, accuracy)``.
        """
        training = optimizer is not None
        binary = self._is_binary_head()
        self.model.train(training)

        loss_sum, correct, total = 0.0, 0, 0
        with torch.set_grad_enabled(training):
            for batch_X, batch_y in loader:
                batch_X, batch_y = batch_X.to(device), batch_y.to(device)
                outputs = self.model(batch_X)

                if binary:
                    loss = F.binary_cross_entropy(outputs, batch_y)
                    predicted = (outputs > 0.5).float()
                else:
                    # The model already emits softmax probabilities, so use
                    # NLL on their log. CrossEntropyLoss expects raw logits
                    # and would apply softmax a second time.
                    log_probs = torch.log(torch.clamp(outputs, min=1e-12))
                    loss = F.nll_loss(log_probs, batch_y)
                    predicted = outputs.argmax(dim=1)

                if training:
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()

                loss_sum += loss.item()
                total += batch_y.size(0)
                correct += (predicted == batch_y).sum().item()

        return loss_sum / len(loader), correct / total

    def _train_pytorch(
        self,
        X_train,
        y_train,
        X_val,
        y_val,
        epochs,
        batch_size,
        learning_rate,
        **kwargs,
    ):
        """
        Train PyTorch model.

        Records per-epoch ``loss`` and ``accuracy`` and, when both ``X_val``
        and ``y_val`` are given, ``val_loss`` and ``val_accuracy``.
        """
        train_loader = self._make_torch_loader(
            X_train, y_train, batch_size, shuffle=True
        )
        val_loader = None
        if X_val is not None and y_val is not None:
            val_loader = self._make_torch_loader(
                X_val, y_val, batch_size, shuffle=False
            )

        optimizer = torch.optim.Adam(self.model.parameters(), lr=learning_rate)

        self.history = {"loss": [], "accuracy": []}
        if val_loader is not None:
            self.history["val_loss"] = []
            self.history["val_accuracy"] = []

        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model.to(device)
        verbose = kwargs.get("verbose", 1)

        for epoch in range(epochs):
            train_loss, train_acc = self._run_torch_epoch(
                train_loader, device, optimizer
            )
            self.history["loss"].append(train_loss)
            self.history["accuracy"].append(train_acc)
            message = (
                f"Epoch {epoch+1}/{epochs} - loss: {train_loss:.4f} "
                f"- accuracy: {train_acc:.4f}"
            )

            if val_loader is not None:
                val_loss, val_acc = self._run_torch_epoch(val_loader, device)
                self.history["val_loss"].append(val_loss)
                self.history["val_accuracy"].append(val_acc)
                message += f" - val_loss: {val_loss:.4f} - val_accuracy: {val_acc:.4f}"

            if verbose > 0:
                print(message)

        return self.history

    def predict(self, X: np.ndarray) -> np.ndarray:
        """
        Make predictions.

        Parameters
        ----------
        X : ndarray of shape (n_samples, sequence_length, n_channels)
            Input data

        Returns
        -------
        predictions : ndarray
            Model outputs (sigmoid / softmax probabilities)
        """
        if self.backend == "tensorflow":
            return self.model.predict(X)
        elif self.backend == "pytorch":
            self.model.eval()
            with torch.no_grad():
                # (N, L, C) -> (N, C, L), on the same device as the weights
                X_tensor = torch.as_tensor(np.asarray(X), dtype=torch.float32)
                X_tensor = X_tensor.permute(0, 2, 1)
                device = next(self.model.parameters()).device
                outputs = self.model(X_tensor.to(device))
                return outputs.cpu().numpy()
class LSTMModel(BaseDeepModel):
    """
    LSTM (Long Short-Term Memory) for sequence modeling.

    Ideal for:
    - Time series forecasting
    - Sequential pattern recognition
    - Long-term dependency modeling
    - Real-time signal prediction

    Parameters
    ----------
    input_shape : tuple
        Shape of input sequences
    n_classes : int
        Number of output classes (use 1 for regression)
    lstm_units : list of int, default=[128, 64]
        Number of units in each LSTM layer
    dropout_rate : float, default=0.3
        Dropout rate
    bidirectional : bool, default=True
        Whether to use bidirectional LSTM
    task : str, default='classification'
        Task type ('classification' or 'regression'); any value other than
        'classification' is treated as regression
    backend : str, default='tensorflow'
        Deep learning backend

    Examples
    --------
    >>> model = LSTMModel(input_shape=(100, 12), n_classes=4)
    >>> model.build_model()
    >>> model.train(X_train, y_train, epochs=100)
    """

    def __init__(
        self,
        input_shape: Tuple[int, ...],
        n_classes: int,
        lstm_units: Optional[List[int]] = None,
        dropout_rate: float = 0.3,
        bidirectional: bool = True,
        task: str = "classification",
        backend: str = "tensorflow",
        **kwargs,
    ):
        super().__init__(input_shape, n_classes, backend, **kwargs)
        self.lstm_units = lstm_units or [128, 64]
        self.dropout_rate = dropout_rate
        self.bidirectional = bidirectional
        self.task = task

    def build_model(self):
        """
        Build LSTM architecture.

        Returns
        -------
        model : keras.Model or torch.nn.Module
            The freshly built model (also stored in ``self.model``)
        """
        if self.backend == "tensorflow":
            self._build_tensorflow_model()
        elif self.backend == "pytorch":
            self._build_pytorch_model()
        return self.model

    def _build_tensorflow_model(self):
        """Build TensorFlow/Keras LSTM model."""
        inputs = keras.Input(shape=self.input_shape)
        x = inputs

        # LSTM layers: every layer but the last returns the full sequence
        last_index = len(self.lstm_units) - 1
        for i, units in enumerate(self.lstm_units):
            return_sequences = i < last_index

            if self.bidirectional:
                x = layers.Bidirectional(
                    layers.LSTM(units, return_sequences=return_sequences),
                    name=f"bilstm_{i+1}",
                )(x)
            else:
                x = layers.LSTM(
                    units, return_sequences=return_sequences, name=f"lstm_{i+1}"
                )(x)

            x = layers.Dropout(self.dropout_rate, name=f"dropout_{i+1}")(x)

        # Dense layers
        x = layers.Dense(64, activation="relu", name="dense")(x)
        x = layers.Dropout(self.dropout_rate, name="dropout_dense")(x)

        # Output layer
        if self.task == "classification":
            if self.n_classes == 2:
                outputs = layers.Dense(1, activation="sigmoid", name="output")(x)
            else:
                outputs = layers.Dense(
                    self.n_classes, activation="softmax", name="output"
                )(x)
        else:  # regression
            outputs = layers.Dense(1, activation="linear", name="output")(x)

        self.model = keras.Model(inputs=inputs, outputs=outputs, name="LSTM")

    def _build_pytorch_model(self):
        """
        Build PyTorch LSTM model.

        For classification the module outputs probabilities (sigmoid for a
        single output unit, softmax otherwise); for regression it outputs a
        single linear value.
        """

        # Defined lazily inside the method so that the module can be imported
        # when PyTorch is not installed.
        class LSTMModule(nn.Module):
            def __init__(
                self,
                input_size,
                lstm_units,
                n_classes,
                dropout_rate,
                bidirectional,
                task,
            ):
                super().__init__()
                self.lstm_layers = nn.ModuleList()
                self.task = task
                # A bidirectional LSTM concatenates both directions
                width = 2 if bidirectional else 1

                for i, units in enumerate(lstm_units):
                    input_dim = input_size if i == 0 else lstm_units[i - 1] * width
                    lstm = nn.LSTM(
                        input_dim, units, batch_first=True, bidirectional=bidirectional
                    )
                    self.lstm_layers.append(lstm)

                final_dim = lstm_units[-1] * width
                self.dropout = nn.Dropout(dropout_rate)
                self.fc1 = nn.Linear(final_dim, 64)
                self.fc2 = nn.Linear(
                    64, n_classes if task == "classification" and n_classes > 2 else 1
                )

            def forward(self, x):
                for lstm in self.lstm_layers:
                    x, _ = lstm(x)

                # Take last output
                x = x[:, -1, :]
                x = self.dropout(x)
                x = F.relu(self.fc1(x))
                x = self.dropout(x)
                x = self.fc2(x)

                if self.task == "classification":
                    if x.shape[-1] == 1:
                        x = torch.sigmoid(x)
                    else:
                        x = F.softmax(x, dim=-1)

                return x

        input_size = self.input_shape[-1] if len(self.input_shape) > 1 else 1
        self.model = LSTMModule(
            input_size,
            self.lstm_units,
            self.n_classes,
            self.dropout_rate,
            self.bidirectional,
            self.task,
        )

    def train(
        self,
        X_train: np.ndarray,
        y_train: np.ndarray,
        X_val: Optional[np.ndarray] = None,
        y_val: Optional[np.ndarray] = None,
        epochs: int = 100,
        batch_size: int = 32,
        learning_rate: float = 0.001,
        **kwargs,
    ):
        """
        Train the LSTM model.

        The model is built automatically if ``build_model`` has not been
        called yet.

        Parameters
        ----------
        X_train : ndarray
            Training sequences, batch first
        y_train : ndarray
            Training labels (classification) or targets (regression)
        X_val : ndarray, optional
            Validation data
        y_val : ndarray, optional
            Validation labels / targets
        epochs : int, default=100
            Number of training epochs
        batch_size : int, default=32
            Batch size for training
        learning_rate : float, default=0.001
            Learning rate
        **kwargs : dict
            Additional training parameters (``verbose`` is honoured)

        Returns
        -------
        history : dict
            Training history
        """
        if self.model is None:
            self.build_model()

        if self.backend == "tensorflow":
            return self._train_tensorflow(
                X_train,
                y_train,
                X_val,
                y_val,
                epochs,
                batch_size,
                learning_rate,
                **kwargs,
            )
        elif self.backend == "pytorch":
            return self._train_pytorch(
                X_train,
                y_train,
                X_val,
                y_val,
                epochs,
                batch_size,
                learning_rate,
                **kwargs,
            )

    def _train_tensorflow(
        self,
        X_train,
        y_train,
        X_val,
        y_val,
        epochs,
        batch_size,
        learning_rate,
        **kwargs,
    ):
        """Train TensorFlow LSTM model with Adam, early stopping and LR reduction."""
        optimizer = keras.optimizers.Adam(learning_rate=learning_rate)

        if self.task == "classification":
            if self.n_classes == 2:
                loss = "binary_crossentropy"
                metrics = ["accuracy"]
            else:
                # Integer labels -> sparse loss; one-hot labels -> categorical loss
                loss = (
                    "sparse_categorical_crossentropy"
                    if y_train.ndim == 1
                    else "categorical_crossentropy"
                )
                metrics = ["accuracy"]
        else:  # regression
            loss = "mse"
            metrics = ["mae"]

        self.model.compile(optimizer=optimizer, loss=loss, metrics=metrics)

        # Callbacks monitor the validation loss when validation data is given
        monitored = "val_loss" if X_val is not None else "loss"
        callback_list = [
            callbacks.EarlyStopping(
                monitor=monitored,
                patience=15,
                restore_best_weights=True,
            ),
            callbacks.ReduceLROnPlateau(
                monitor=monitored,
                factor=0.5,
                patience=7,
                min_lr=1e-7,
            ),
        ]

        validation_data = (X_val, y_val) if X_val is not None else None

        history = self.model.fit(
            X_train,
            y_train,
            validation_data=validation_data,
            epochs=epochs,
            batch_size=batch_size,
            callbacks=callback_list,
            verbose=kwargs.get("verbose", 1),
        )

        self.history = history.history
        return self.history

    def _pytorch_mode(self):
        """
        Return the training mode matching the PyTorch head.

        Mirrors ``_build_pytorch_model``: a non-classification task yields a
        single linear output ('regression'); classification yields a single
        sigmoid output for ``n_classes <= 2`` ('binary') and a softmax over
        ``n_classes`` otherwise ('multiclass').
        """
        if self.task != "classification":
            return "regression"
        return "multiclass" if self.n_classes > 2 else "binary"

    @staticmethod
    def _make_torch_loader(X, y, mode, batch_size, shuffle):
        """Wrap numpy data into a DataLoader with targets shaped for ``mode``."""
        # LSTMs are built with batch_first=True, so (N, L, C) is used as-is
        X_tensor = torch.as_tensor(np.asarray(X), dtype=torch.float32)
        if X_tensor.shape[0] == 0:
            raise ValueError("Cannot train on an empty dataset.")

        targets = np.asarray(y)
        if mode != "regression" and targets.ndim > 1 and targets.shape[1] > 1:
            # One-hot encoded labels are reduced to class indices
            targets = np.argmax(targets, axis=1)

        if mode == "multiclass":
            y_tensor = torch.as_tensor(
                targets.astype(np.int64).reshape(-1), dtype=torch.long
            )
        else:
            # Binary and regression heads emit (N, 1); match that shape
            y_tensor = torch.as_tensor(
                targets.astype(np.float32).reshape(-1, 1), dtype=torch.float32
            )

        dataset = torch.utils.data.TensorDataset(X_tensor, y_tensor)
        return torch.utils.data.DataLoader(
            dataset, batch_size=batch_size, shuffle=shuffle
        )

    def _run_torch_epoch(self, loader, mode, device, optimizer=None):
        """
        Run one pass over ``loader``.

        With an optimizer the model is trained; without one the pass is a
        gradient-free evaluation. Returns ``(mean batch loss, metric)`` where
        the metric is the mean absolute error for regression and the accuracy
        for classification.
        """
        training = optimizer is not None
        self.model.train(training)

        loss_sum, metric_sum, seen = 0.0, 0.0, 0
        with torch.set_grad_enabled(training):
            for batch_X, batch_y in loader:
                batch_X, batch_y = batch_X.to(device), batch_y.to(device)
                outputs = self.model(batch_X)

                if mode == "regression":
                    loss = F.mse_loss(outputs, batch_y)
                    batch_metric = (outputs - batch_y).abs().sum().item()
                elif mode == "binary":
                    loss = F.binary_cross_entropy(outputs, batch_y)
                    predicted = (outputs > 0.5).float()
                    batch_metric = (predicted == batch_y).sum().item()
                else:
                    # The model already emits softmax probabilities, so use
                    # NLL on their log. CrossEntropyLoss expects raw logits
                    # and would apply softmax a second time.
                    log_probs = torch.log(torch.clamp(outputs, min=1e-12))
                    loss = F.nll_loss(log_probs, batch_y)
                    predicted = outputs.argmax(dim=1)
                    batch_metric = (predicted == batch_y).sum().item()

                if training:
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()

                loss_sum += loss.item()
                metric_sum += batch_metric
                seen += batch_y.size(0)

        return loss_sum / len(loader), metric_sum / seen

    def _train_pytorch(
        self,
        X_train,
        y_train,
        X_val,
        y_val,
        epochs,
        batch_size,
        learning_rate,
        **kwargs,
    ):
        """
        Train PyTorch LSTM model.

        Records per-epoch ``loss`` plus ``accuracy`` (classification) or
        ``mae`` (regression) and, when both ``X_val`` and ``y_val`` are given,
        the matching ``val_*`` entries.
        """
        mode = self._pytorch_mode()
        metric_name = "mae" if mode == "regression" else "accuracy"

        train_loader = self._make_torch_loader(
            X_train, y_train, mode, batch_size, shuffle=True
        )
        val_loader = None
        if X_val is not None and y_val is not None:
            val_loader = self._make_torch_loader(
                X_val, y_val, mode, batch_size, shuffle=False
            )

        optimizer = torch.optim.Adam(self.model.parameters(), lr=learning_rate)
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model.to(device)

        self.history = {"loss": [], metric_name: []}
        if val_loader is not None:
            self.history["val_loss"] = []
            self.history[f"val_{metric_name}"] = []

        verbose = kwargs.get("verbose", 1)

        for epoch in range(epochs):
            train_loss, train_metric = self._run_torch_epoch(
                train_loader, mode, device, optimizer
            )
            self.history["loss"].append(train_loss)
            self.history[metric_name].append(train_metric)
            message = (
                f"Epoch {epoch+1}/{epochs} - loss: {train_loss:.4f} "
                f"- {metric_name}: {train_metric:.4f}"
            )

            if val_loader is not None:
                val_loss, val_metric = self._run_torch_epoch(val_loader, mode, device)
                self.history["val_loss"].append(val_loss)
                self.history[f"val_{metric_name}"].append(val_metric)
                message += (
                    f" - val_loss: {val_loss:.4f} "
                    f"- val_{metric_name}: {val_metric:.4f}"
                )

            if verbose > 0:
                print(message)

        return self.history

    def predict(self, X: np.ndarray) -> np.ndarray:
        """
        Make predictions.

        Parameters
        ----------
        X : ndarray
            Input sequences, batch first

        Returns
        -------
        predictions : ndarray
            Model outputs (probabilities for classification, raw values for
            regression)
        """
        if self.backend == "tensorflow":
            return self.model.predict(X)
        elif self.backend == "pytorch":
            self.model.eval()
            with torch.no_grad():
                X_tensor = torch.as_tensor(np.asarray(X), dtype=torch.float32)
                # Run on whichever device the weights currently live on
                device = next(self.model.parameters()).device
                outputs = self.model(X_tensor.to(device))
                return outputs.cpu().numpy()
# Additional models (Transformer and Autoencoder) live in separate files due to length.
# See: transformer_model.py and autoencoder_model.py