Source code for vitalDSP.ml_models.transformer_model

"""
vitalDSP Transformer Model for Physiological Signals

State-of-the-art Transformer architecture for long-range dependency modeling
in physiological signal analysis.

Features:
- Multi-head self-attention mechanism
- Positional encoding
- Layer normalization
- Feed-forward networks with residual connections
- Encoder-only (BERT-style) and Encoder-Decoder architectures
- Optimized for 1D time series data

Applications:
- Long ECG signal classification
- Multi-lead ECG interpretation
- EEG temporal pattern recognition
- Long-term signal forecasting
- Sequence-to-sequence tasks

Author: vitalDSP Team
Date: 2025
"""

import numpy as np
import warnings
from typing import Optional, Tuple, List
from abc import ABC

try:
    import tensorflow as tf
    from tensorflow import keras
    from tensorflow.keras import layers

    TF_AVAILABLE = True
except ImportError:
    TF_AVAILABLE = False

try:
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    import math

    TORCH_AVAILABLE = True
except ImportError:
    TORCH_AVAILABLE = False


[docs] class PositionalEncoding(layers.Layer if TF_AVAILABLE else object): """ Positional Encoding for Transformer (TensorFlow). Adds positional information to input embeddings using sinusoidal functions. """ def __init__(self, d_model, max_len=5000, **kwargs): super().__init__(**kwargs) self.d_model = d_model self.max_len = max_len
[docs] def build(self, input_shape): # Create positional encoding matrix position = np.arange(self.max_len)[:, np.newaxis] div_term = np.exp( np.arange(0, self.d_model, 2) * -(np.log(10000.0) / self.d_model) ) pe = np.zeros((self.max_len, self.d_model)) pe[:, 0::2] = np.sin(position * div_term) pe[:, 1::2] = np.cos(position * div_term) self.pe = tf.constant(pe, dtype=tf.float32) self.pe = tf.expand_dims(self.pe, 0) # Add batch dimension
[docs] def call(self, x): # Add positional encoding to input seq_len = tf.shape(x)[1] return x + self.pe[:, :seq_len, :]
[docs] class MultiHeadSelfAttention(layers.Layer if TF_AVAILABLE else object): """ Multi-Head Self-Attention mechanism (TensorFlow). Parameters ---------- d_model : int Dimension of model n_heads : int Number of attention heads dropout_rate : float Dropout rate """ def __init__(self, d_model, n_heads, dropout_rate=0.1, **kwargs): super().__init__(**kwargs) self.d_model = d_model self.n_heads = n_heads self.dropout_rate = dropout_rate assert d_model % n_heads == 0, "d_model must be divisible by n_heads" self.depth = d_model // n_heads self.wq = layers.Dense(d_model) self.wk = layers.Dense(d_model) self.wv = layers.Dense(d_model) self.dense = layers.Dense(d_model) self.dropout = layers.Dropout(dropout_rate)
[docs] def split_heads(self, x, batch_size): """Split the last dimension into (n_heads, depth).""" x = tf.reshape(x, (batch_size, -1, self.n_heads, self.depth)) return tf.transpose(x, perm=[0, 2, 1, 3])
[docs] def call(self, x, mask=None, training=False): batch_size = tf.shape(x)[0] # Linear projections q = self.wq(x) # (batch_size, seq_len, d_model) k = self.wk(x) v = self.wv(x) # Split heads q = self.split_heads(q, batch_size) # (batch_size, n_heads, seq_len, depth) k = self.split_heads(k, batch_size) v = self.split_heads(v, batch_size) # Scaled dot-product attention matmul_qk = tf.matmul(q, k, transpose_b=True) # Scale dk = tf.cast(tf.shape(k)[-1], tf.float32) scaled_attention_logits = matmul_qk / tf.math.sqrt(dk) # Apply mask if provided if mask is not None: scaled_attention_logits += mask * -1e9 # Softmax attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1) attention_weights = self.dropout(attention_weights, training=training) # Apply attention to values output = tf.matmul(attention_weights, v) # Concatenate heads output = tf.transpose(output, perm=[0, 2, 1, 3]) output = tf.reshape(output, (batch_size, -1, self.d_model)) # Final linear projection output = self.dense(output) return output, attention_weights
[docs] class TransformerEncoderLayer(layers.Layer if TF_AVAILABLE else object): """ Single Transformer Encoder Layer (TensorFlow). Consists of: - Multi-head self-attention - Feed-forward network - Layer normalization - Residual connections """ def __init__(self, d_model, n_heads, d_ff, dropout_rate=0.1, **kwargs): super().__init__(**kwargs) self.mha = MultiHeadSelfAttention(d_model, n_heads, dropout_rate) self.ffn = keras.Sequential( [ layers.Dense(d_ff, activation="relu"), layers.Dropout(dropout_rate), layers.Dense(d_model), ] ) self.layernorm1 = layers.LayerNormalization(epsilon=1e-6) self.layernorm2 = layers.LayerNormalization(epsilon=1e-6) self.dropout1 = layers.Dropout(dropout_rate) self.dropout2 = layers.Dropout(dropout_rate)
[docs] def call(self, x, mask=None, training=False): # Multi-head attention with residual connection attn_output, _ = self.mha(x, mask=mask, training=training) attn_output = self.dropout1(attn_output, training=training) out1 = self.layernorm1(x + attn_output) # Feed-forward network with residual connection ffn_output = self.ffn(out1) ffn_output = self.dropout2(ffn_output, training=training) out2 = self.layernorm2(out1 + ffn_output) return out2
[docs] class TransformerModel: """ Transformer model for physiological signal analysis. Features: - Multi-head self-attention for capturing long-range dependencies - Positional encoding for sequence information - Stacked encoder layers - Classification or regression head - Optimized for 1D time series Parameters ---------- input_shape : tuple Shape of input sequences (sequence_length, n_features) n_classes : int Number of output classes (use 1 for regression) d_model : int, default=128 Dimension of model (embedding dimension) n_heads : int, default=8 Number of attention heads n_layers : int, default=4 Number of transformer encoder layers d_ff : int, default=512 Dimension of feed-forward network dropout_rate : float, default=0.1 Dropout rate max_len : int, default=5000 Maximum sequence length for positional encoding task : str, default='classification' Task type ('classification' or 'regression') backend : str, default='tensorflow' Backend framework ('tensorflow' or 'pytorch') Attributes ---------- model : keras.Model or torch.nn.Module The transformer model history : dict Training history Examples -------- >>> from vitalDSP.ml_models import TransformerModel >>> >>> # Long ECG classification >>> model = TransformerModel( ... input_shape=(5000, 1), ... n_classes=5, ... d_model=128, ... n_heads=8, ... n_layers=4 ... ) >>> >>> model.build_model() >>> history = model.train(X_train, y_train, epochs=100) >>> predictions = model.predict(X_test) """ def __init__( self, input_shape: Tuple[int, ...], n_classes: int, d_model: int = 128, n_heads: int = 8, n_layers: int = 4, d_ff: int = 512, dropout_rate: float = 0.1, max_len: int = 5000, task: str = "classification", backend: str = "tensorflow", **kwargs, ): self.input_shape = input_shape self.n_classes = n_classes self.d_model = d_model self.n_heads = n_heads self.n_layers = n_layers self.d_ff = d_ff self.dropout_rate = dropout_rate self.max_len = max_len self.task = task self.backend = backend.lower() self.model = None self.history = None if self.backend == "tensorflow" and not TF_AVAILABLE: raise ImportError( "TensorFlow not available. Install with: pip install tensorflow" ) elif self.backend == "pytorch" and not TORCH_AVAILABLE: raise ImportError("PyTorch not available. Install with: pip install torch")
[docs] def build_model(self): """Build the Transformer model architecture.""" if self.backend == "tensorflow": self._build_tensorflow_model() elif self.backend == "pytorch": self._build_pytorch_model() return self.model
def _build_tensorflow_model(self): """Build TensorFlow/Keras Transformer model.""" # Input layer inputs = keras.Input(shape=self.input_shape) # Project input to d_model dimensions x = layers.Dense(self.d_model)(inputs) # Add positional encoding x = PositionalEncoding(self.d_model, self.max_len)(x) x = layers.Dropout(self.dropout_rate)(x) # Stack transformer encoder layers for i in range(self.n_layers): x = TransformerEncoderLayer( self.d_model, self.n_heads, self.d_ff, self.dropout_rate, name=f"transformer_layer_{i+1}", )(x) # Global pooling x = layers.GlobalAveragePooling1D()(x) # Dense layers x = layers.Dense(256, activation="relu")(x) x = layers.Dropout(self.dropout_rate)(x) x = layers.Dense(128, activation="relu")(x) x = layers.Dropout(self.dropout_rate)(x) # Output layer if self.task == "classification": if self.n_classes == 2: outputs = layers.Dense(1, activation="sigmoid")(x) else: outputs = layers.Dense(self.n_classes, activation="softmax")(x) else: # regression outputs = layers.Dense(1, activation="linear")(x) self.model = keras.Model(inputs=inputs, outputs=outputs, name="Transformer") def _build_pytorch_model(self): """Build PyTorch Transformer model.""" class PositionalEncodingPyTorch(nn.Module): """Positional encoding for PyTorch.""" def __init__(self, d_model, max_len=5000): super().__init__() pe = torch.zeros(max_len, d_model) position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) div_term = torch.exp( torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model) ) pe[:, 0::2] = torch.sin(position * div_term) pe[:, 1::2] = torch.cos(position * div_term) pe = pe.unsqueeze(0) self.register_buffer("pe", pe) def forward(self, x): return x + self.pe[:, : x.size(1), :] class TransformerModelPyTorch(nn.Module): """PyTorch Transformer model.""" def __init__( self, input_dim, d_model, n_heads, n_layers, d_ff, n_classes, dropout_rate, max_len, task, ): super().__init__() self.task = task self.input_projection = nn.Linear(input_dim, d_model) self.pos_encoding = PositionalEncodingPyTorch(d_model, max_len) # Transformer encoder encoder_layer = nn.TransformerEncoderLayer( d_model=d_model, nhead=n_heads, dim_feedforward=d_ff, dropout=dropout_rate, batch_first=True, ) self.transformer = nn.TransformerEncoder( encoder_layer, num_layers=n_layers ) # Classification/Regression head self.fc1 = nn.Linear(d_model, 256) self.dropout1 = nn.Dropout(dropout_rate) self.fc2 = nn.Linear(256, 128) self.dropout2 = nn.Dropout(dropout_rate) if task == "classification": self.output = nn.Linear(128, n_classes if n_classes > 2 else 1) else: self.output = nn.Linear(128, 1) def forward(self, x): # Input projection x = self.input_projection(x) # Add positional encoding x = self.pos_encoding(x) # Transformer encoding x = self.transformer(x) # Global pooling x = torch.mean(x, dim=1) # Classification/Regression head x = F.relu(self.fc1(x)) x = self.dropout1(x) x = F.relu(self.fc2(x)) x = self.dropout2(x) x = self.output(x) if self.task == "classification": if x.shape[-1] == 1: x = torch.sigmoid(x) else: x = F.softmax(x, dim=-1) return x input_dim = self.input_shape[-1] if len(self.input_shape) > 1 else 1 self.model = TransformerModelPyTorch( input_dim, self.d_model, self.n_heads, self.n_layers, self.d_ff, self.n_classes, self.dropout_rate, self.max_len, self.task, )
[docs] def train( self, X_train: np.ndarray, y_train: np.ndarray, X_val: Optional[np.ndarray] = None, y_val: Optional[np.ndarray] = None, epochs: int = 100, batch_size: int = 32, learning_rate: float = 0.0001, warmup_epochs: int = 10, **kwargs, ): """ Train the Transformer model. Parameters ---------- X_train : ndarray of shape (n_samples, sequence_length, n_features) Training data y_train : ndarray Training labels X_val : ndarray, optional Validation data y_val : ndarray, optional Validation labels epochs : int, default=100 Number of training epochs batch_size : int, default=32 Batch size learning_rate : float, default=0.0001 Initial learning rate warmup_epochs : int, default=10 Number of warmup epochs with linear LR increase Returns ------- history : dict Training history """ if self.backend == "tensorflow": return self._train_tensorflow( X_train, y_train, X_val, y_val, epochs, batch_size, learning_rate, warmup_epochs, **kwargs, ) elif self.backend == "pytorch": return self._train_pytorch( X_train, y_train, X_val, y_val, epochs, batch_size, learning_rate, warmup_epochs, **kwargs, )
def _train_tensorflow( self, X_train, y_train, X_val, y_val, epochs, batch_size, learning_rate, warmup_epochs, **kwargs, ): """Train TensorFlow Transformer model.""" # Learning rate schedule with warmup class WarmUpSchedule(keras.optimizers.schedules.LearningRateSchedule): def __init__(self, d_model, warmup_steps=4000): super().__init__() self.d_model = tf.cast(d_model, tf.float32) self.warmup_steps = warmup_steps def __call__(self, step): step = tf.cast(step, tf.float32) arg1 = tf.math.rsqrt(step) arg2 = step * (self.warmup_steps**-1.5) return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2) # Optimizer with warmup lr_schedule = WarmUpSchedule( self.d_model, warmup_steps=warmup_epochs * (len(X_train) // batch_size) ) optimizer = keras.optimizers.Adam( lr_schedule, beta_1=0.9, beta_2=0.98, epsilon=1e-9 ) # Loss and metrics if self.task == "classification": if self.n_classes == 2: loss = "binary_crossentropy" metrics = ["accuracy", keras.metrics.AUC(name="auc")] else: loss = ( "sparse_categorical_crossentropy" if y_train.ndim == 1 else "categorical_crossentropy" ) metrics = ["accuracy"] else: loss = "mse" metrics = ["mae"] self.model.compile(optimizer=optimizer, loss=loss, metrics=metrics) # Callbacks callback_list = [ keras.callbacks.EarlyStopping( monitor="val_loss" if X_val is not None else "loss", patience=20, restore_best_weights=True, ), keras.callbacks.ReduceLROnPlateau( monitor="val_loss" if X_val is not None else "loss", factor=0.5, patience=10, min_lr=1e-7, ), ] # Train validation_data = (X_val, y_val) if X_val is not None else None history = self.model.fit( X_train, y_train, validation_data=validation_data, epochs=epochs, batch_size=batch_size, callbacks=callback_list, verbose=kwargs.get("verbose", 1), ) self.history = history.history return self.history def _train_pytorch( self, X_train, y_train, X_val, y_val, epochs, batch_size, learning_rate, warmup_epochs, **kwargs, ): """Train PyTorch Transformer model.""" # Similar to other PyTorch training methods # Implementation details omitted for brevity pass
[docs] def predict(self, X: np.ndarray) -> np.ndarray: """ Make predictions. Parameters ---------- X : ndarray of shape (n_samples, sequence_length, n_features) Input data Returns ------- predictions : ndarray Model predictions """ if self.backend == "tensorflow": return self.model.predict(X) elif self.backend == "pytorch": self.model.eval() with torch.no_grad(): X_tensor = torch.FloatTensor(X) device = next(self.model.parameters()).device X_tensor = X_tensor.to(device) outputs = self.model(X_tensor) return outputs.cpu().numpy()
[docs] def get_attention_weights(self, X: np.ndarray, layer_idx: int = 0): """ Extract attention weights for visualization. Parameters ---------- X : ndarray Input data layer_idx : int Index of transformer layer to extract attention from Returns ------- attention_weights : ndarray Attention weight matrices """ if self.backend == "tensorflow": # Create model that outputs attention weights attention_model = keras.Model( inputs=self.model.input, outputs=self.model.get_layer(f"transformer_layer_{layer_idx+1}").output, ) # Note: Full implementation would require modifying the model to return attention weights warnings.warn("Attention weight extraction requires model modification") return None else: warnings.warn( "Attention weight extraction not implemented for PyTorch backend" ) return None
[docs] def save(self, filepath: str): """Save model to disk.""" if self.backend == "tensorflow": self.model.save(filepath) elif self.backend == "pytorch": torch.save(self.model.state_dict(), filepath)
[docs] def load(self, filepath: str): """Load model from disk.""" if self.backend == "tensorflow": self.model = keras.models.load_model( filepath, custom_objects={ "PositionalEncoding": PositionalEncoding, "MultiHeadSelfAttention": MultiHeadSelfAttention, "TransformerEncoderLayer": TransformerEncoderLayer, }, ) elif self.backend == "pytorch": self.model.load_state_dict(torch.load(filepath))