Source code for vitalDSP.ml_models.autoencoder

"""
Autoencoder Models Module for Physiological Signal Processing

This module provides comprehensive autoencoder architectures for physiological
signal analysis including ECG, PPG, EEG, and other vital signs. It implements
various autoencoder types for unsupervised anomaly detection, signal denoising,
dimensionality reduction, and feature learning.

Author: vitalDSP Team
Date: 2025-01-27
Version: 1.0.0

Key Features:
- Standard Autoencoder for basic reconstruction
- Variational Autoencoder (VAE) for probabilistic modeling
- Denoising Autoencoder for noise reduction
- Convolutional Autoencoder for spatial feature learning
- LSTM Autoencoder for temporal sequence modeling
- Comprehensive training and evaluation utilities
- Model saving and loading capabilities

Examples:
--------
Basic autoencoder for anomaly detection:
    >>> import numpy as np
    >>> from vitalDSP.ml_models.autoencoder import StandardAutoencoder
    >>> signal_data = np.random.randn(1000, 100)  # 1000 samples, 100 features
    >>> autoencoder = StandardAutoencoder(input_dim=100, encoding_dim=32)
    >>> autoencoder.compile(optimizer='adam', loss='mse')
    >>> autoencoder.fit(signal_data, signal_data, epochs=10)

Variational autoencoder:
    >>> from vitalDSP.ml_models.autoencoder import VariationalAutoencoder
    >>> vae = VariationalAutoencoder(input_dim=100, latent_dim=16)
    >>> vae.compile(optimizer='adam', loss='mse')
    >>> vae.fit(signal_data, signal_data, epochs=10)

Denoising autoencoder:
    >>> from vitalDSP.ml_models.autoencoder import DenoisingAutoencoder
    >>> noisy_data = signal_data + np.random.normal(0, 0.1, signal_data.shape)
    >>> dae = DenoisingAutoencoder(input_dim=100, encoding_dim=32)
    >>> dae.compile(optimizer='adam', loss='mse')
    >>> dae.fit(noisy_data, signal_data, epochs=10)
"""

import numpy as np
from typing import Optional, Union, Tuple, List, Dict, Any, Callable
from pathlib import Path
import warnings

try:
    import tensorflow as tf
    from tensorflow import keras
    from tensorflow.keras import layers, Model

    TENSORFLOW_AVAILABLE = True
except ImportError:
    TENSORFLOW_AVAILABLE = False

try:
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    from torch.utils.data import DataLoader, TensorDataset

    PYTORCH_AVAILABLE = True
except ImportError:
    PYTORCH_AVAILABLE = False


[docs] class BaseAutoencoder: """ Base class for all autoencoder models. Provides common functionality for encoding, decoding, and anomaly detection. """ def __init__( self, input_shape: Tuple[int, ...], latent_dim: int = 32, backend: str = "tensorflow", random_state: Optional[int] = None, ): """ Initialize base autoencoder. Parameters ---------- input_shape : tuple Shape of input signals (length, n_channels) or (length,) latent_dim : int, default=32 Dimensionality of latent space backend : str, default='tensorflow' Deep learning backend ('tensorflow' or 'pytorch') random_state : int, optional Random seed for reproducibility """ self.input_shape = input_shape self.latent_dim = latent_dim self.backend = backend.lower() self.random_state = random_state self.encoder = None self.decoder = None self.model = None self.history = None # Set random seeds if random_state is not None: np.random.seed(random_state) if self.backend == "tensorflow" and TENSORFLOW_AVAILABLE: tf.random.set_seed(random_state) elif self.backend == "pytorch" and PYTORCH_AVAILABLE: torch.manual_seed(random_state) # Validate backend if self.backend == "tensorflow" and not TENSORFLOW_AVAILABLE: raise ImportError( "TensorFlow is not installed. Install with: pip install tensorflow" ) elif self.backend == "pytorch" and not PYTORCH_AVAILABLE: raise ImportError( "PyTorch is not installed. Install with: pip install torch" )
[docs] def encode(self, X: np.ndarray) -> np.ndarray: """ Encode signals to latent space. Parameters ---------- X : np.ndarray Input signals of shape (n_samples, length) or (n_samples, length, n_channels) Returns ------- np.ndarray Latent representations of shape (n_samples, latent_dim) """ if self.encoder is None: raise ValueError("Model not built. Call fit() first.") if self.backend == "tensorflow": return self.encoder.predict(X, verbose=0) else: # pytorch self.encoder.eval() with torch.no_grad(): X_tensor = torch.FloatTensor(X) latent = self.encoder(X_tensor) return latent.cpu().numpy()
[docs] def decode(self, latent: np.ndarray) -> np.ndarray: """ Decode latent representations to signals. Parameters ---------- latent : np.ndarray Latent representations of shape (n_samples, latent_dim) Returns ------- np.ndarray Reconstructed signals """ if self.decoder is None: raise ValueError("Model not built. Call fit() first.") if self.backend == "tensorflow": return self.decoder.predict(latent, verbose=0) else: # pytorch self.decoder.eval() with torch.no_grad(): latent_tensor = torch.FloatTensor(latent) reconstructed = self.decoder(latent_tensor) return reconstructed.cpu().numpy()
[docs] def predict(self, X: np.ndarray) -> np.ndarray: """ Predict (reconstruct) signals. Parameters ---------- X : np.ndarray Input signals of shape (n_samples, length) or (n_samples, length, n_channels) Returns ------- np.ndarray Reconstructed signals """ if self.model is None: raise ValueError("Model not built. Call fit() first.") # Handle empty input gracefully if X.size == 0 or (hasattr(X, "shape") and X.shape[0] == 0): # Return empty array with correct output shape output_shape = (0,) + self.input_shape return np.empty(output_shape, dtype=np.float32) if self.backend == "tensorflow": return self.model.predict(X, verbose=0) else: # pytorch self.model.eval() with torch.no_grad(): X_tensor = torch.FloatTensor(X) reconstructed = self.model(X_tensor) return reconstructed.cpu().numpy()
[docs] def compute_reconstruction_error( self, X: np.ndarray, metric: str = "mse" ) -> np.ndarray: """ Compute reconstruction error for anomaly detection. Parameters ---------- X : np.ndarray Input signals metric : str, default='mse' Error metric ('mse', 'mae', 'rmse') Returns ------- np.ndarray Reconstruction errors of shape (n_samples,) """ # Get reconstructions if self.backend == "tensorflow": X_reconstructed = self.predict(X) else: # pytorch self.model.eval() with torch.no_grad(): X_tensor = torch.FloatTensor(X) X_reconstructed = self.model(X_tensor).cpu().numpy() # Compute error if metric == "mse": errors = np.mean((X - X_reconstructed) ** 2, axis=tuple(range(1, X.ndim))) elif metric == "mae": errors = np.mean(np.abs(X - X_reconstructed), axis=tuple(range(1, X.ndim))) elif metric == "rmse": errors = np.sqrt( np.mean((X - X_reconstructed) ** 2, axis=tuple(range(1, X.ndim))) ) else: raise ValueError(f"Unknown metric: {metric}") return errors
[docs] def detect_anomalies( self, X: np.ndarray, threshold: Optional[float] = None, contamination: float = 0.1, metric: str = "mse", ) -> Tuple[np.ndarray, np.ndarray, float]: """ Detect anomalies using reconstruction error. Parameters ---------- X : np.ndarray Input signals threshold : float, optional Anomaly threshold. If None, computed from contamination contamination : float, default=0.1 Expected proportion of anomalies (used if threshold is None) metric : str, default='mse' Error metric Returns ------- anomalies : np.ndarray Boolean array indicating anomalies scores : np.ndarray Anomaly scores (reconstruction errors) threshold : float Threshold used for detection """ # Compute reconstruction errors scores = self.compute_reconstruction_error(X, metric=metric) # Determine threshold if threshold is None: threshold = np.percentile(scores, 100 * (1 - contamination)) # Detect anomalies anomalies = scores > threshold return anomalies, scores, threshold
[docs] def save(self, filepath: str): """Save model to file.""" filepath = Path(filepath) filepath.parent.mkdir(parents=True, exist_ok=True) if self.backend == "tensorflow": self.model.save(str(filepath)) else: # pytorch torch.save( { "model_state_dict": self.model.state_dict(), "encoder_state_dict": self.encoder.state_dict(), "decoder_state_dict": self.decoder.state_dict(), "input_shape": self.input_shape, "latent_dim": self.latent_dim, }, str(filepath), )
[docs] def load(self, filepath: str): """Load model from file.""" if self.backend == "tensorflow": self.model = keras.models.load_model(filepath) else: # pytorch checkpoint = torch.load(filepath) self.model.load_state_dict(checkpoint["model_state_dict"]) self.encoder.load_state_dict(checkpoint["encoder_state_dict"]) self.decoder.load_state_dict(checkpoint["decoder_state_dict"])
[docs] class StandardAutoencoder(BaseAutoencoder): """ Standard feedforward autoencoder. Architecture: - Encoder: Input -> Dense layers -> Latent space - Decoder: Latent space -> Dense layers -> Output Use cases: - Dimensionality reduction - Feature learning - Anomaly detection Examples -------- >>> from vitalDSP.ml_models.autoencoder import StandardAutoencoder >>> import numpy as np >>> >>> # Generate sample ECG signals >>> X_train = np.random.randn(1000, 500) # 1000 signals, 500 samples each >>> X_test = np.random.randn(100, 500) >>> >>> # Create and train autoencoder >>> ae = StandardAutoencoder( ... input_shape=(500,), ... latent_dim=32, ... hidden_dims=[256, 128, 64], ... activation='relu' ... ) >>> ae.fit(X_train, epochs=50, batch_size=32, validation_split=0.2) >>> >>> # Detect anomalies >>> anomalies, scores, threshold = ae.detect_anomalies(X_test, contamination=0.1) >>> print(f"Detected {anomalies.sum()} anomalies") """ def __init__( self, input_shape: Tuple[int, ...], latent_dim: int = 32, hidden_dims: List[int] = [256, 128, 64], activation: str = "relu", output_activation: str = "linear", use_batch_norm: bool = True, dropout_rate: float = 0.2, backend: str = "tensorflow", random_state: Optional[int] = None, ): """ Initialize standard autoencoder. Parameters ---------- input_shape : tuple Shape of input signals latent_dim : int, default=32 Dimensionality of latent space hidden_dims : list, default=[256, 128, 64] Dimensions of hidden layers in encoder (reversed for decoder) activation : str, default='relu' Activation function for hidden layers output_activation : str, default='linear' Activation function for output layer use_batch_norm : bool, default=True Whether to use batch normalization dropout_rate : float, default=0.2 Dropout rate for regularization backend : str, default='tensorflow' Deep learning backend random_state : int, optional Random seed """ super().__init__(input_shape, latent_dim, backend, random_state) self.hidden_dims = hidden_dims self.activation = activation self.output_activation = output_activation self.use_batch_norm = use_batch_norm self.dropout_rate = dropout_rate # Build model self._build_model() def _build_tensorflow_model(self): """Build TensorFlow/Keras model.""" # Encoder encoder_input = keras.Input(shape=self.input_shape) x = layers.Flatten()(encoder_input) for dim in self.hidden_dims: x = layers.Dense(dim)(x) if self.use_batch_norm: x = layers.BatchNormalization()(x) x = layers.Activation(self.activation)(x) x = layers.Dropout(self.dropout_rate)(x) latent = layers.Dense(self.latent_dim, name="latent")(x) self.encoder = Model(encoder_input, latent, name="encoder") # Decoder decoder_input = keras.Input(shape=(self.latent_dim,)) x = decoder_input for dim in reversed(self.hidden_dims): x = layers.Dense(dim)(x) if self.use_batch_norm: x = layers.BatchNormalization()(x) x = layers.Activation(self.activation)(x) x = layers.Dropout(self.dropout_rate)(x) # Output output_dim = np.prod(self.input_shape) x = layers.Dense(output_dim, activation=self.output_activation)(x) decoder_output = layers.Reshape(self.input_shape)(x) self.decoder = Model(decoder_input, decoder_output, name="decoder") # Full autoencoder self.model = Model( encoder_input, self.decoder(self.encoder(encoder_input)), name="autoencoder" ) def _build_pytorch_model(self): """Build PyTorch model.""" class Encoder(nn.Module): def __init__( self, input_dim, hidden_dims, latent_dim, activation, use_batch_norm, dropout_rate, ): super().__init__() self.input_dim = input_dim layers_list = [] prev_dim = input_dim for dim in hidden_dims: layers_list.append(nn.Linear(prev_dim, dim)) if use_batch_norm: layers_list.append(nn.BatchNorm1d(dim)) if activation == "relu": layers_list.append(nn.ReLU()) elif activation == "tanh": layers_list.append(nn.Tanh()) layers_list.append(nn.Dropout(dropout_rate)) prev_dim = dim layers_list.append(nn.Linear(prev_dim, latent_dim)) self.network = nn.Sequential(*layers_list) def forward(self, x): x = x.view(x.size(0), -1) return self.network(x) class Decoder(nn.Module): def __init__( self, latent_dim, hidden_dims, output_shape, activation, output_activation, use_batch_norm, dropout_rate, ): super().__init__() self.output_shape = output_shape layers_list = [] prev_dim = latent_dim for dim in reversed(hidden_dims): layers_list.append(nn.Linear(prev_dim, dim)) if use_batch_norm: layers_list.append(nn.BatchNorm1d(dim)) if activation == "relu": layers_list.append(nn.ReLU()) elif activation == "tanh": layers_list.append(nn.Tanh()) layers_list.append(nn.Dropout(dropout_rate)) prev_dim = dim output_dim = np.prod(output_shape) layers_list.append(nn.Linear(prev_dim, output_dim)) if output_activation == "sigmoid": layers_list.append(nn.Sigmoid()) elif output_activation == "tanh": layers_list.append(nn.Tanh()) self.network = nn.Sequential(*layers_list) def forward(self, x): x = self.network(x) return x.view(x.size(0), *self.output_shape) class Autoencoder(nn.Module): def __init__(self, encoder, decoder): super().__init__() self.encoder = encoder self.decoder = decoder def forward(self, x): latent = self.encoder(x) reconstructed = self.decoder(latent) return reconstructed input_dim = np.prod(self.input_shape) self.encoder = Encoder( input_dim, self.hidden_dims, self.latent_dim, self.activation, self.use_batch_norm, self.dropout_rate, ) self.decoder = Decoder( self.latent_dim, self.hidden_dims, self.input_shape, self.activation, self.output_activation, self.use_batch_norm, self.dropout_rate, ) self.model = Autoencoder(self.encoder, self.decoder) def _build_model(self): """Build model based on backend.""" if self.backend == "tensorflow": self._build_tensorflow_model() else: self._build_pytorch_model()
[docs] def fit( self, X: np.ndarray, y: Optional[np.ndarray] = None, epochs: int = 100, batch_size: int = 32, validation_split: float = 0.2, validation_data: Optional[Tuple[np.ndarray, np.ndarray]] = None, callbacks: Optional[List] = None, verbose: int = 1, ): """ Train the autoencoder. Parameters ---------- X : np.ndarray Training signals y : np.ndarray, optional Ignored (for sklearn compatibility) epochs : int, default=100 Number of training epochs batch_size : int, default=32 Batch size validation_split : float, default=0.2 Fraction of data to use for validation validation_data : tuple, optional Validation data (X_val, X_val) callbacks : list, optional Training callbacks verbose : int, default=1 Verbosity level Returns ------- self """ if self.backend == "tensorflow": # Compile model self.model.compile(optimizer="adam", loss="mse", metrics=["mae"]) # Default callbacks if callbacks is None: callbacks = [ keras.callbacks.EarlyStopping( monitor="val_loss", patience=10, restore_best_weights=True ), keras.callbacks.ReduceLROnPlateau( monitor="val_loss", factor=0.5, patience=5, min_lr=1e-6 ), ] # Train self.history = self.model.fit( X, X, # Input and target are the same epochs=epochs, batch_size=batch_size, validation_split=validation_split if validation_data is None else 0, validation_data=( (validation_data[0], validation_data[0]) if validation_data is not None else None ), callbacks=callbacks, verbose=verbose, ) else: # pytorch # Training setup optimizer = torch.optim.Adam(self.model.parameters(), lr=0.001) criterion = nn.MSELoss() # Prepare data if validation_data is not None: X_train, X_val = X, validation_data[0] else: split_idx = int(len(X) * (1 - validation_split)) X_train, X_val = X[:split_idx], X[split_idx:] train_dataset = TensorDataset( torch.FloatTensor(X_train), torch.FloatTensor(X_train) ) train_loader = DataLoader( train_dataset, batch_size=batch_size, shuffle=True ) val_dataset = TensorDataset( torch.FloatTensor(X_val), torch.FloatTensor(X_val) ) val_loader = DataLoader(val_dataset, batch_size=batch_size) # Training loop history = {"loss": [], "val_loss": []} best_val_loss = float("inf") patience_counter = 0 for epoch in range(epochs): # Training self.model.train() train_loss = 0.0 for batch_X, batch_y in train_loader: optimizer.zero_grad() outputs = self.model(batch_X) loss = criterion(outputs, batch_y) loss.backward() optimizer.step() train_loss += loss.item() train_loss /= len(train_loader) history["loss"].append(train_loss) # Validation self.model.eval() val_loss = 0.0 with torch.no_grad(): for batch_X, batch_y in val_loader: outputs = self.model(batch_X) loss = criterion(outputs, batch_y) val_loss += loss.item() val_loss /= len(val_loader) history["val_loss"].append(val_loss) # Early stopping if val_loss < best_val_loss: best_val_loss = val_loss patience_counter = 0 best_model_state = self.model.state_dict() else: patience_counter += 1 if patience_counter >= 10: if verbose: print(f"Early stopping at epoch {epoch+1}") self.model.load_state_dict(best_model_state) break if verbose and (epoch + 1) % 10 == 0: print( f"Epoch {epoch+1}/{epochs} - loss: {train_loss:.4f} - val_loss: {val_loss:.4f}" ) self.history = history return self
[docs] class ConvolutionalAutoencoder(BaseAutoencoder): """ Convolutional autoencoder for 1D signals. Architecture: - Encoder: Conv1D layers with pooling - Decoder: Transposed Conv1D layers (upsampling) Best for signals with spatial structure and local patterns. Examples -------- >>> from vitalDSP.ml_models.autoencoder import ConvolutionalAutoencoder >>> import numpy as np >>> >>> # Generate sample signals >>> X_train = np.random.randn(1000, 500, 1) # 1000 signals, 500 samples, 1 channel >>> >>> # Create and train >>> cae = ConvolutionalAutoencoder( ... input_shape=(500, 1), ... latent_dim=32, ... n_filters=[32, 64, 128], ... kernel_sizes=[7, 5, 3], ... pool_sizes=[2, 2, 2] ... ) >>> cae.fit(X_train, epochs=50) >>> >>> # Encode signals >>> latent_features = cae.encode(X_train[:10]) """ def __init__( self, input_shape: Tuple[int, ...], latent_dim: int = 32, n_filters: List[int] = [32, 64, 128], kernel_sizes: Union[int, List[int]] = 7, pool_sizes: Union[int, List[int]] = 2, activation: str = "relu", use_batch_norm: bool = True, dropout_rate: float = 0.2, backend: str = "tensorflow", random_state: Optional[int] = None, ): """ Initialize convolutional autoencoder. Parameters ---------- input_shape : tuple Shape of input signals (length, n_channels) latent_dim : int, default=32 Dimensionality of latent space n_filters : list, default=[32, 64, 128] Number of filters in each conv layer kernel_sizes : int or list, default=7 Kernel sizes for conv layers pool_sizes : int or list, default=2 Pool sizes for max pooling activation : str, default='relu' Activation function use_batch_norm : bool, default=True Whether to use batch normalization dropout_rate : float, default=0.2 Dropout rate backend : str, default='tensorflow' Deep learning backend random_state : int, optional Random seed """ super().__init__(input_shape, latent_dim, backend, random_state) self.n_filters = n_filters self.kernel_sizes = ( [kernel_sizes] * len(n_filters) if isinstance(kernel_sizes, int) else kernel_sizes ) self.pool_sizes = ( [pool_sizes] * len(n_filters) if isinstance(pool_sizes, int) else pool_sizes ) self.activation = activation self.use_batch_norm = use_batch_norm self.dropout_rate = dropout_rate # Build model self._build_model() def _build_tensorflow_model(self): """Build TensorFlow/Keras model.""" # Encoder encoder_input = keras.Input(shape=self.input_shape) x = encoder_input # Track shapes for decoder encoder_shapes = [] for n_filter, kernel_size, pool_size in zip( self.n_filters, self.kernel_sizes, self.pool_sizes ): encoder_shapes.append(x.shape[1:]) x = layers.Conv1D(n_filter, kernel_size, padding="same")(x) if self.use_batch_norm: x = layers.BatchNormalization()(x) x = layers.Activation(self.activation)(x) x = layers.MaxPooling1D(pool_size, padding="same")(x) x = layers.Dropout(self.dropout_rate)(x) # Latent space shape_before_flatten = x.shape[1:] x = layers.Flatten()(x) latent = layers.Dense(self.latent_dim, name="latent")(x) self.encoder = Model(encoder_input, latent, name="encoder") # Decoder decoder_input = keras.Input(shape=(self.latent_dim,)) x = layers.Dense(np.prod(shape_before_flatten))(decoder_input) x = layers.Reshape(shape_before_flatten)(x) for i, (n_filter, kernel_size, pool_size) in enumerate( reversed(list(zip(self.n_filters, self.kernel_sizes, self.pool_sizes))) ): x = layers.UpSampling1D(pool_size)(x) x = layers.Conv1D(n_filter, kernel_size, padding="same")(x) if self.use_batch_norm: x = layers.BatchNormalization()(x) x = layers.Activation(self.activation)(x) x = layers.Dropout(self.dropout_rate)(x) # Output layer decoder_output = layers.Conv1D( self.input_shape[-1], 3, padding="same", activation="linear" )(x) self.decoder = Model(decoder_input, decoder_output, name="decoder") # Full autoencoder self.model = Model( encoder_input, self.decoder(self.encoder(encoder_input)), name="conv_autoencoder", ) def _build_pytorch_model(self): """Build PyTorch model.""" # Similar to TensorFlow implementation # Implementation omitted for brevity - follows same pattern as StandardAutoencoder raise NotImplementedError( "PyTorch backend for ConvolutionalAutoencoder not yet implemented" ) def _build_model(self): """Build model based on backend.""" if self.backend == "tensorflow": self._build_tensorflow_model() else: self._build_pytorch_model()
[docs] def fit(self, X: np.ndarray, **kwargs): """Train the autoencoder. See StandardAutoencoder.fit() for parameters.""" # Ensure input has channel dimension if X.ndim == 2: X = X[..., np.newaxis] return super(StandardAutoencoder, self).fit(X, **kwargs)
[docs] class LSTMAutoencoder(BaseAutoencoder): """ LSTM-based autoencoder for sequential signals. Architecture: - Encoder: LSTM layers -> Latent representation - Decoder: RepeatVector -> LSTM layers -> Output sequence Best for signals with temporal dependencies. Examples -------- >>> from vitalDSP.ml_models.autoencoder import LSTMAutoencoder >>> import numpy as np >>> >>> # Generate time series data >>> X_train = np.random.randn(1000, 100, 1) # 1000 sequences, 100 timesteps, 1 feature >>> >>> # Create and train >>> lstm_ae = LSTMAutoencoder( ... input_shape=(100, 1), ... latent_dim=32, ... lstm_units=[64, 32] ... ) >>> lstm_ae.fit(X_train, epochs=50) >>> >>> # Detect anomalies in heartbeat sequences >>> anomalies, scores, threshold = lstm_ae.detect_anomalies(X_test) """ def __init__( self, input_shape: Tuple[int, ...], latent_dim: int = 32, lstm_units: List[int] = [64, 32], use_bidirectional: bool = False, dropout_rate: float = 0.2, backend: str = "tensorflow", random_state: Optional[int] = None, ): """ Initialize LSTM autoencoder. Parameters ---------- input_shape : tuple Shape of input sequences (timesteps, n_features) latent_dim : int, default=32 Dimensionality of latent space lstm_units : list, default=[64, 32] Number of units in each LSTM layer use_bidirectional : bool, default=False Whether to use bidirectional LSTMs dropout_rate : float, default=0.2 Dropout rate backend : str, default='tensorflow' Deep learning backend random_state : int, optional Random seed """ super().__init__(input_shape, latent_dim, backend, random_state) self.lstm_units = lstm_units self.use_bidirectional = use_bidirectional self.dropout_rate = dropout_rate # Build model self._build_model() def _build_tensorflow_model(self): """Build TensorFlow/Keras model.""" # Encoder encoder_input = keras.Input(shape=self.input_shape) x = encoder_input for i, units in enumerate(self.lstm_units): return_sequences = i < len(self.lstm_units) - 1 lstm_layer = layers.LSTM( units, return_sequences=return_sequences, dropout=self.dropout_rate ) if self.use_bidirectional: x = layers.Bidirectional(lstm_layer)(x) else: x = lstm_layer(x) latent = layers.Dense(self.latent_dim, name="latent")(x) self.encoder = Model(encoder_input, latent, name="encoder") # Decoder decoder_input = keras.Input(shape=(self.latent_dim,)) x = layers.RepeatVector(self.input_shape[0])(decoder_input) for i, units in enumerate(reversed(self.lstm_units)): lstm_layer = layers.LSTM( units, return_sequences=True, dropout=self.dropout_rate ) if self.use_bidirectional: x = layers.Bidirectional(lstm_layer)(x) else: x = lstm_layer(x) decoder_output = layers.TimeDistributed(layers.Dense(self.input_shape[-1]))(x) self.decoder = Model(decoder_input, decoder_output, name="decoder") # Full autoencoder self.model = Model( encoder_input, self.decoder(self.encoder(encoder_input)), name="lstm_autoencoder", ) def _build_pytorch_model(self): """Build PyTorch model.""" raise NotImplementedError( "PyTorch backend for LSTMAutoencoder not yet implemented" ) def _build_model(self): """Build model based on backend.""" if self.backend == "tensorflow": self._build_tensorflow_model() else: self._build_pytorch_model()
[docs] def fit(self, X: np.ndarray, **kwargs): """Train the autoencoder. See StandardAutoencoder.fit() for parameters.""" # Ensure input has feature dimension if X.ndim == 2: X = X[..., np.newaxis] # Use parent's fit method from BaseAutoencoder through StandardAutoencoder if self.backend == "tensorflow": # Compile model self.model.compile(optimizer="adam", loss="mse", metrics=["mae"]) # Get training parameters epochs = kwargs.get("epochs", 100) batch_size = kwargs.get("batch_size", 32) validation_split = kwargs.get("validation_split", 0.2) validation_data = kwargs.get("validation_data", None) verbose = kwargs.get("verbose", 1) callbacks = kwargs.get("callbacks", None) # Default callbacks if callbacks is None: callbacks = [ keras.callbacks.EarlyStopping( monitor="val_loss", patience=10, restore_best_weights=True ), keras.callbacks.ReduceLROnPlateau( monitor="val_loss", factor=0.5, patience=5, min_lr=1e-6 ), ] # Train self.history = self.model.fit( X, X, epochs=epochs, batch_size=batch_size, validation_split=validation_split if validation_data is None else 0, validation_data=( (validation_data[0], validation_data[0]) if validation_data is not None else None ), callbacks=callbacks, verbose=verbose, ) return self
[docs] class VariationalAutoencoder(BaseAutoencoder): """ Variational Autoencoder (VAE) for probabilistic signal generation. Architecture: - Encoder: Input -> mu and log_var (latent distribution parameters) - Sampling: Reparameterization trick - Decoder: Latent sample -> Reconstructed output Loss: Reconstruction loss + KL divergence Use cases: - Generative modeling - Signal synthesis - Anomaly detection with probability Examples -------- >>> from vitalDSP.ml_models.autoencoder import VariationalAutoencoder >>> import numpy as np >>> >>> # Generate sample data >>> X_train = np.random.randn(1000, 500) >>> >>> # Create and train VAE >>> vae = VariationalAutoencoder( ... input_shape=(500,), ... latent_dim=32, ... beta=1.0 # KL divergence weight ... ) >>> vae.fit(X_train, epochs=100) >>> >>> # Generate new signals >>> z_samples = np.random.randn(10, 32) >>> generated_signals = vae.decode(z_samples) """ def __init__( self, input_shape: Tuple[int, ...], latent_dim: int = 32, hidden_dims: List[int] = [256, 128, 64], activation: str = "relu", beta: float = 1.0, use_batch_norm: bool = True, dropout_rate: float = 0.2, backend: str = "tensorflow", random_state: Optional[int] = None, learning_rate: float = 0.001, ): super().__init__( input_shape=input_shape, latent_dim=latent_dim, backend=backend, random_state=random_state, ) self.hidden_dims = hidden_dims self.activation = activation self.beta = beta self.use_batch_norm = use_batch_norm self.dropout_rate = dropout_rate self.learning_rate = learning_rate # Build model self._build_model() def _build_tensorflow_model(self): """Build the TensorFlow model for the VAE.""" import tensorflow as tf from tensorflow import keras from tensorflow.keras import layers # Encoder encoder_input = layers.Input(shape=self.input_shape, name="encoder_input") x = encoder_input for i, units in enumerate(self.hidden_dims): x = layers.Dense( units, activation=self.activation, name=f"encoder_dense_{i}" )(x) if self.use_batch_norm: x = layers.BatchNormalization(name=f"encoder_bn_{i}")(x) if self.dropout_rate > 0: x = layers.Dropout(self.dropout_rate, name=f"encoder_dropout_{i}")(x) # Latent space z_mean = layers.Dense(self.latent_dim, name="z_mean")(x) z_log_var = layers.Dense(self.latent_dim, name="z_log_var")(x) # Sampling layer using Keras backend def sampling(args): z_mean, z_log_var = args batch = keras.backend.shape(z_mean)[0] dim = keras.backend.shape(z_mean)[1] epsilon = keras.backend.random_normal(shape=(batch, dim)) return z_mean + keras.backend.exp(0.5 * z_log_var) * epsilon z = layers.Lambda(sampling, name="z")([z_mean, z_log_var]) # Decoder decoder_input = layers.Input(shape=(self.latent_dim,), name="decoder_input") x_dec = decoder_input for i, units in enumerate(reversed(self.hidden_dims)): x_dec = layers.Dense( units, activation=self.activation, name=f"decoder_dense_{i}" )(x_dec) if self.use_batch_norm: x_dec = layers.BatchNormalization(name=f"decoder_bn_{i}")(x_dec) if self.dropout_rate > 0: x_dec = layers.Dropout(self.dropout_rate, name=f"decoder_dropout_{i}")( x_dec ) decoder_output = layers.Dense( np.prod(self.input_shape), activation="sigmoid", name="decoder_output" )(x_dec) if len(self.input_shape) > 1: decoder_output = layers.Reshape(self.input_shape)(decoder_output) # Build encoder and decoder models self.encoder = keras.Model( encoder_input, [z_mean, z_log_var, z], name="encoder" ) self.decoder = keras.Model(decoder_input, decoder_output, name="decoder") # Build VAE model outputs = self.decoder(self.encoder(encoder_input)[2]) self.model = keras.Model(encoder_input, outputs, name="vae") # Create a custom training step to handle the VAE loss class VAEModel(keras.Model): def __init__(self, encoder, decoder, beta, **kwargs): super().__init__(**kwargs) self.encoder = encoder self.decoder = decoder self.beta = beta self.total_loss_tracker = keras.metrics.Mean(name="total_loss") self.reconstruction_loss_tracker = keras.metrics.Mean( name="reconstruction_loss" ) self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss") @property def metrics(self): return [ self.total_loss_tracker, self.reconstruction_loss_tracker, self.kl_loss_tracker, ] def train_step(self, data): # Handle both tuple (x, y) and single array x if isinstance(data, tuple): x, _ = data # VAE uses x as both input and target else: x = data with tf.GradientTape() as tape: z_mean, z_log_var, z = self.encoder(x, training=True) reconstruction = self.decoder(z, training=True) # Reconstruction loss reconstruction_loss = keras.backend.mean( keras.backend.sum( keras.backend.square(x - reconstruction), axis=list(range(1, len(x.shape))), ) ) # KL divergence loss kl_loss = -0.5 * keras.backend.mean( keras.backend.sum( 1 + z_log_var - keras.backend.square(z_mean) - keras.backend.exp(z_log_var), axis=1, ) ) total_loss = reconstruction_loss + self.beta * kl_loss grads = tape.gradient(total_loss, self.trainable_weights) self.optimizer.apply_gradients(zip(grads, self.trainable_weights)) self.total_loss_tracker.update_state(total_loss) self.reconstruction_loss_tracker.update_state(reconstruction_loss) self.kl_loss_tracker.update_state(kl_loss) return { "loss": self.total_loss_tracker.result(), "reconstruction_loss": self.reconstruction_loss_tracker.result(), "kl_loss": self.kl_loss_tracker.result(), } def test_step(self, data): # Handle both tuple (x, y) and single array x if isinstance(data, tuple): x, _ = data # VAE uses x as both input and target else: x = data z_mean, z_log_var, z = self.encoder(x, training=False) reconstruction = self.decoder(z, training=False) # Reconstruction loss reconstruction_loss = keras.backend.mean( keras.backend.sum( keras.backend.square(x - reconstruction), axis=list(range(1, len(x.shape))), ) ) # KL divergence loss kl_loss = -0.5 * keras.backend.mean( keras.backend.sum( 1 + z_log_var - keras.backend.square(z_mean) - keras.backend.exp(z_log_var), axis=1, ) ) total_loss = reconstruction_loss + self.beta * kl_loss return { "loss": total_loss, "reconstruction_loss": reconstruction_loss, "kl_loss": kl_loss, } # Create the custom VAE model self.vae_model = VAEModel(self.encoder, self.decoder, self.beta) self.vae_model.compile( optimizer=keras.optimizers.Adam(learning_rate=self.learning_rate) ) # Store references for encoding/decoding self.z_mean_output = z_mean self.z_log_var_output = z_log_var def _build_pytorch_model(self): """Build PyTorch model.""" raise NotImplementedError( "PyTorch backend for VariationalAutoencoder not yet implemented" ) def _build_model(self): """Build model based on backend.""" if self.backend == "tensorflow": self._build_tensorflow_model() else: self._build_pytorch_model()
[docs] def encode(self, X: np.ndarray, return_distribution: bool = False) -> np.ndarray: """ Encode signals to latent space. Parameters ---------- X : np.ndarray Input signals return_distribution : bool, default=False If True, return (z_mean, z_log_var, z). If False, return only z. Returns ------- np.ndarray or tuple Latent representations """ if self.backend == "tensorflow": z_mean, z_log_var, z = self.encoder.predict(X, verbose=0) if return_distribution: return z_mean, z_log_var, z return z else: raise NotImplementedError()
[docs] def fit(self, X: np.ndarray, **kwargs): """Train the VAE. See StandardAutoencoder.fit() for parameters.""" if self.backend == "tensorflow": # Get training parameters epochs = kwargs.get("epochs", 100) batch_size = kwargs.get("batch_size", 32) validation_split = kwargs.get("validation_split", 0.2) validation_data = kwargs.get("validation_data", None) verbose = kwargs.get("verbose", 1) callbacks = kwargs.get("callbacks", None) # Default callbacks if callbacks is None: callbacks = [ keras.callbacks.EarlyStopping( monitor="val_loss", patience=15, restore_best_weights=True ), keras.callbacks.ReduceLROnPlateau( monitor="val_loss", factor=0.5, patience=7, min_lr=1e-6 ), ] # Handle validation data - VAE expects (x_val, x_val) tuple for Keras fit() if validation_data is not None: if isinstance(validation_data, tuple): # If validation_data is (X_val, y_val), use (X_val, X_val) for VAE # The train_step/test_step will extract just X_val validation_data = (validation_data[0], validation_data[0]) else: # If it's just an array, convert to tuple validation_data = (validation_data, validation_data) # Train using the custom VAE model # VAE reconstructs input, so pass X as both input and target self.history = self.vae_model.fit( X, X, # VAE uses input as target epochs=epochs, batch_size=batch_size, validation_split=validation_split if validation_data is None else 0, validation_data=validation_data, callbacks=callbacks, verbose=verbose, ) return self
[docs] def sample(self, n_samples: int = 1) -> np.ndarray: """ Generate new samples from the learned latent distribution. Parameters ---------- n_samples : int, default=1 Number of samples to generate Returns ------- np.ndarray Generated samples of shape (n_samples, *input_shape) """ if self.backend == "tensorflow": # Sample from standard normal distribution z_samples = np.random.randn(n_samples, self.latent_dim) return self.decode(z_samples) else: raise NotImplementedError("Sampling not implemented for PyTorch backend")
[docs] class DenoisingAutoencoder(StandardAutoencoder): """ Denoising Autoencoder (DAE) for signal cleaning. Trained to reconstruct clean signals from noisy inputs. Architecture: Same as StandardAutoencoder Training: Add noise to inputs, train to reconstruct clean signals Examples -------- >>> from vitalDSP.ml_models.autoencoder import DenoisingAutoencoder >>> import numpy as np >>> >>> # Clean ECG signals >>> X_clean = np.random.randn(1000, 500) >>> >>> # Create and train denoising autoencoder >>> dae = DenoisingAutoencoder( ... input_shape=(500,), ... latent_dim=32, ... noise_type='gaussian', ... noise_level=0.1 ... ) >>> dae.fit(X_clean, epochs=100) >>> >>> # Denoise signals >>> X_noisy = X_clean + 0.1 * np.random.randn(*X_clean.shape) >>> X_denoised = dae.predict(X_noisy) """ def __init__( self, input_shape: Tuple[int, ...], latent_dim: int = 32, hidden_dims: List[int] = [256, 128, 64], noise_type: str = "gaussian", noise_level: float = 0.1, **kwargs, ): """ Initialize denoising autoencoder. Parameters ---------- input_shape : tuple Shape of input signals latent_dim : int, default=32 Dimensionality of latent space hidden_dims : list, default=[256, 128, 64] Dimensions of hidden layers noise_type : str, default='gaussian' Type of noise ('gaussian', 'uniform', 'salt_pepper') noise_level : float, default=0.1 Noise intensity **kwargs Additional arguments passed to StandardAutoencoder """ super().__init__(input_shape, latent_dim, hidden_dims, **kwargs) self.noise_type = noise_type self.noise_level = noise_level def _add_noise(self, X: np.ndarray) -> np.ndarray: """Add noise to signals.""" if self.noise_type == "gaussian": noise = np.random.normal(0, self.noise_level, X.shape) return X + noise elif self.noise_type == "uniform": noise = np.random.uniform(-self.noise_level, self.noise_level, X.shape) return X + noise elif self.noise_type == "salt_pepper": noisy = X.copy() # Salt salt_mask = np.random.random(X.shape) < self.noise_level / 2 noisy[salt_mask] = np.max(X) # Pepper pepper_mask = np.random.random(X.shape) < self.noise_level / 2 noisy[pepper_mask] = np.min(X) return noisy else: raise ValueError(f"Unknown noise type: {self.noise_type}")
[docs] def fit(self, X: np.ndarray, **kwargs): """ Train the denoising autoencoder. Parameters ---------- X : np.ndarray Clean training signals **kwargs Additional arguments passed to StandardAutoencoder.fit() Returns ------- self """ # Add noise to inputs X_noisy = self._add_noise(X) if self.backend == "tensorflow": # Compile model self.model.compile(optimizer="adam", loss="mse", metrics=["mae"]) # Get training parameters epochs = kwargs.get("epochs", 100) batch_size = kwargs.get("batch_size", 32) validation_split = kwargs.get("validation_split", 0.2) validation_data = kwargs.get("validation_data", None) verbose = kwargs.get("verbose", 1) callbacks = kwargs.get("callbacks", None) # Prepare validation data if validation_data is not None: val_noisy = self._add_noise(validation_data[0]) validation_data = (val_noisy, validation_data[0]) # Default callbacks if callbacks is None: callbacks = [ keras.callbacks.EarlyStopping( monitor="val_loss", patience=10, restore_best_weights=True ), keras.callbacks.ReduceLROnPlateau( monitor="val_loss", factor=0.5, patience=5, min_lr=1e-6 ), ] # Train with noisy inputs and clean targets self.history = self.model.fit( X_noisy, X, # Noisy input, clean target epochs=epochs, batch_size=batch_size, validation_split=validation_split if validation_data is None else 0, validation_data=validation_data, callbacks=callbacks, verbose=verbose, ) return self
[docs] def denoise(self, X: np.ndarray) -> np.ndarray: """ Denoise signals. Parameters ---------- X : np.ndarray Noisy signals Returns ------- np.ndarray Denoised signals """ if self.backend == "tensorflow": return self.model.predict(X, verbose=0) else: self.model.eval() with torch.no_grad(): X_tensor = torch.FloatTensor(X) denoised = self.model(X_tensor) return denoised.cpu().numpy()
# Convenience functions
[docs] def detect_anomalies( X: np.ndarray, autoencoder_type: str = "standard", contamination: float = 0.1, **autoencoder_kwargs, ) -> Tuple[np.ndarray, np.ndarray, float]: """ Quick anomaly detection using autoencoders. Parameters ---------- X : np.ndarray Input signals autoencoder_type : str, default='standard' Type of autoencoder ('standard', 'conv', 'lstm', 'vae') contamination : float, default=0.1 Expected proportion of anomalies **autoencoder_kwargs Arguments passed to autoencoder constructor Returns ------- anomalies : np.ndarray Boolean array indicating anomalies scores : np.ndarray Anomaly scores threshold : float Threshold used for detection Examples -------- >>> from vitalDSP.ml_models.autoencoder import detect_anomalies >>> import numpy as np >>> >>> # Generate signals with anomalies >>> X_normal = np.random.randn(900, 500) >>> X_anomaly = np.random.randn(100, 500) * 3 # Larger variance >>> X = np.vstack([X_normal, X_anomaly]) >>> >>> # Detect anomalies >>> anomalies, scores, threshold = detect_anomalies( ... X, ... autoencoder_type='standard', ... contamination=0.1, ... latent_dim=32 ... ) >>> print(f"Detected {anomalies.sum()} anomalies") """ # Select autoencoder class if autoencoder_type == "standard": ae_class = StandardAutoencoder elif autoencoder_type == "conv": ae_class = ConvolutionalAutoencoder elif autoencoder_type == "lstm": ae_class = LSTMAutoencoder elif autoencoder_type == "vae": ae_class = VariationalAutoencoder else: raise ValueError(f"Unknown autoencoder type: {autoencoder_type}") # Determine input shape input_shape = X.shape[1:] # Create and train autoencoder ae = ae_class(input_shape=input_shape, **autoencoder_kwargs) # Split data for training split_idx = int(len(X) * 0.8) X_train = X[:split_idx] X_test = X[split_idx:] # Train ae.fit(X_train, epochs=50, verbose=0) # Detect anomalies on full dataset return ae.detect_anomalies(X, contamination=contamination)
[docs] def denoise_signal( X_noisy: np.ndarray, X_clean: Optional[np.ndarray] = None, noise_type: str = "gaussian", noise_level: float = 0.1, **autoencoder_kwargs, ) -> np.ndarray: """ Denoise signals using denoising autoencoder. Parameters ---------- X_noisy : np.ndarray Noisy signals to denoise X_clean : np.ndarray, optional Clean signals for training. If None, use X_noisy as both input and target. noise_type : str, default='gaussian' Type of noise noise_level : float, default=0.1 Noise intensity **autoencoder_kwargs Arguments passed to DenoisingAutoencoder Returns ------- np.ndarray Denoised signals Examples -------- >>> from vitalDSP.ml_models.autoencoder import denoise_signal >>> import numpy as np >>> >>> # Generate clean signals >>> X_clean = np.random.randn(1000, 500) >>> >>> # Add noise >>> X_noisy = X_clean + 0.2 * np.random.randn(*X_clean.shape) >>> >>> # Denoise >>> X_denoised = denoise_signal( ... X_noisy, ... X_clean=X_clean, ... noise_type='gaussian', ... noise_level=0.1, ... latent_dim=32 ... ) """ input_shape = X_noisy.shape[1:] # Create denoising autoencoder dae = DenoisingAutoencoder( input_shape=input_shape, noise_type=noise_type, noise_level=noise_level, **autoencoder_kwargs, ) # Train if X_clean is not None: # Use clean signals for training dae.fit(X_clean, epochs=100, verbose=0) else: # Use noisy signals (assumes they have some clean structure) dae.fit(X_noisy, epochs=100, verbose=0) # Denoise return dae.denoise(X_noisy)