Source code for vitalDSP.ml_models.transformer_model
"""
vitalDSP Transformer Model for Physiological Signals
State-of-the-art Transformer architecture for long-range dependency modeling
in physiological signal analysis.
Features:
- Multi-head self-attention mechanism
- Positional encoding
- Layer normalization
- Feed-forward networks with residual connections
- Encoder-only (BERT-style) and Encoder-Decoder architectures
- Optimized for 1D time series data
Applications:
- Long ECG signal classification
- Multi-lead ECG interpretation
- EEG temporal pattern recognition
- Long-term signal forecasting
- Sequence-to-sequence tasks
Author: vitalDSP Team
Date: 2025
"""
import numpy as np
import warnings
from typing import Optional, Tuple, List
from abc import ABC
try:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
TF_AVAILABLE = True
except ImportError:
TF_AVAILABLE = False
try:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
TORCH_AVAILABLE = True
except ImportError:
TORCH_AVAILABLE = False
[docs]
class PositionalEncoding(layers.Layer if TF_AVAILABLE else object):
"""
Positional Encoding for Transformer (TensorFlow).
Adds positional information to input embeddings using sinusoidal functions.
"""
def __init__(self, d_model, max_len=5000, **kwargs):
super().__init__(**kwargs)
self.d_model = d_model
self.max_len = max_len
[docs]
def build(self, input_shape):
# Create positional encoding matrix
position = np.arange(self.max_len)[:, np.newaxis]
div_term = np.exp(
np.arange(0, self.d_model, 2) * -(np.log(10000.0) / self.d_model)
)
pe = np.zeros((self.max_len, self.d_model))
pe[:, 0::2] = np.sin(position * div_term)
pe[:, 1::2] = np.cos(position * div_term)
self.pe = tf.constant(pe, dtype=tf.float32)
self.pe = tf.expand_dims(self.pe, 0) # Add batch dimension
[docs]
def call(self, x):
# Add positional encoding to input
seq_len = tf.shape(x)[1]
return x + self.pe[:, :seq_len, :]
[docs]
class MultiHeadSelfAttention(layers.Layer if TF_AVAILABLE else object):
"""
Multi-Head Self-Attention mechanism (TensorFlow).
Parameters
----------
d_model : int
Dimension of model
n_heads : int
Number of attention heads
dropout_rate : float
Dropout rate
"""
def __init__(self, d_model, n_heads, dropout_rate=0.1, **kwargs):
super().__init__(**kwargs)
self.d_model = d_model
self.n_heads = n_heads
self.dropout_rate = dropout_rate
assert d_model % n_heads == 0, "d_model must be divisible by n_heads"
self.depth = d_model // n_heads
self.wq = layers.Dense(d_model)
self.wk = layers.Dense(d_model)
self.wv = layers.Dense(d_model)
self.dense = layers.Dense(d_model)
self.dropout = layers.Dropout(dropout_rate)
[docs]
def split_heads(self, x, batch_size):
"""Split the last dimension into (n_heads, depth)."""
x = tf.reshape(x, (batch_size, -1, self.n_heads, self.depth))
return tf.transpose(x, perm=[0, 2, 1, 3])
[docs]
def call(self, x, mask=None, training=False):
batch_size = tf.shape(x)[0]
# Linear projections
q = self.wq(x) # (batch_size, seq_len, d_model)
k = self.wk(x)
v = self.wv(x)
# Split heads
q = self.split_heads(q, batch_size) # (batch_size, n_heads, seq_len, depth)
k = self.split_heads(k, batch_size)
v = self.split_heads(v, batch_size)
# Scaled dot-product attention
matmul_qk = tf.matmul(q, k, transpose_b=True)
# Scale
dk = tf.cast(tf.shape(k)[-1], tf.float32)
scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)
# Apply mask if provided
if mask is not None:
scaled_attention_logits += mask * -1e9
# Softmax
attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
attention_weights = self.dropout(attention_weights, training=training)
# Apply attention to values
output = tf.matmul(attention_weights, v)
# Concatenate heads
output = tf.transpose(output, perm=[0, 2, 1, 3])
output = tf.reshape(output, (batch_size, -1, self.d_model))
# Final linear projection
output = self.dense(output)
return output, attention_weights
[docs]
class TransformerEncoderLayer(layers.Layer if TF_AVAILABLE else object):
"""
Single Transformer Encoder Layer (TensorFlow).
Consists of:
- Multi-head self-attention
- Feed-forward network
- Layer normalization
- Residual connections
"""
def __init__(self, d_model, n_heads, d_ff, dropout_rate=0.1, **kwargs):
super().__init__(**kwargs)
self.mha = MultiHeadSelfAttention(d_model, n_heads, dropout_rate)
self.ffn = keras.Sequential(
[
layers.Dense(d_ff, activation="relu"),
layers.Dropout(dropout_rate),
layers.Dense(d_model),
]
)
self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
self.dropout1 = layers.Dropout(dropout_rate)
self.dropout2 = layers.Dropout(dropout_rate)
[docs]
def call(self, x, mask=None, training=False):
# Multi-head attention with residual connection
attn_output, _ = self.mha(x, mask=mask, training=training)
attn_output = self.dropout1(attn_output, training=training)
out1 = self.layernorm1(x + attn_output)
# Feed-forward network with residual connection
ffn_output = self.ffn(out1)
ffn_output = self.dropout2(ffn_output, training=training)
out2 = self.layernorm2(out1 + ffn_output)
return out2
[docs]
class TransformerModel:
"""
Transformer model for physiological signal analysis.
Features:
- Multi-head self-attention for capturing long-range dependencies
- Positional encoding for sequence information
- Stacked encoder layers
- Classification or regression head
- Optimized for 1D time series
Parameters
----------
input_shape : tuple
Shape of input sequences (sequence_length, n_features)
n_classes : int
Number of output classes (use 1 for regression)
d_model : int, default=128
Dimension of model (embedding dimension)
n_heads : int, default=8
Number of attention heads
n_layers : int, default=4
Number of transformer encoder layers
d_ff : int, default=512
Dimension of feed-forward network
dropout_rate : float, default=0.1
Dropout rate
max_len : int, default=5000
Maximum sequence length for positional encoding
task : str, default='classification'
Task type ('classification' or 'regression')
backend : str, default='tensorflow'
Backend framework ('tensorflow' or 'pytorch')
Attributes
----------
model : keras.Model or torch.nn.Module
The transformer model
history : dict
Training history
Examples
--------
>>> from vitalDSP.ml_models import TransformerModel
>>>
>>> # Long ECG classification
>>> model = TransformerModel(
... input_shape=(5000, 1),
... n_classes=5,
... d_model=128,
... n_heads=8,
... n_layers=4
... )
>>>
>>> model.build_model()
>>> history = model.train(X_train, y_train, epochs=100)
>>> predictions = model.predict(X_test)
"""
def __init__(
self,
input_shape: Tuple[int, ...],
n_classes: int,
d_model: int = 128,
n_heads: int = 8,
n_layers: int = 4,
d_ff: int = 512,
dropout_rate: float = 0.1,
max_len: int = 5000,
task: str = "classification",
backend: str = "tensorflow",
**kwargs,
):
self.input_shape = input_shape
self.n_classes = n_classes
self.d_model = d_model
self.n_heads = n_heads
self.n_layers = n_layers
self.d_ff = d_ff
self.dropout_rate = dropout_rate
self.max_len = max_len
self.task = task
self.backend = backend.lower()
self.model = None
self.history = None
if self.backend == "tensorflow" and not TF_AVAILABLE:
raise ImportError(
"TensorFlow not available. Install with: pip install tensorflow"
)
elif self.backend == "pytorch" and not TORCH_AVAILABLE:
raise ImportError("PyTorch not available. Install with: pip install torch")
[docs]
def build_model(self):
"""Build the Transformer model architecture."""
if self.backend == "tensorflow":
self._build_tensorflow_model()
elif self.backend == "pytorch":
self._build_pytorch_model()
return self.model
def _build_tensorflow_model(self):
"""Build TensorFlow/Keras Transformer model."""
# Input layer
inputs = keras.Input(shape=self.input_shape)
# Project input to d_model dimensions
x = layers.Dense(self.d_model)(inputs)
# Add positional encoding
x = PositionalEncoding(self.d_model, self.max_len)(x)
x = layers.Dropout(self.dropout_rate)(x)
# Stack transformer encoder layers
for i in range(self.n_layers):
x = TransformerEncoderLayer(
self.d_model,
self.n_heads,
self.d_ff,
self.dropout_rate,
name=f"transformer_layer_{i+1}",
)(x)
# Global pooling
x = layers.GlobalAveragePooling1D()(x)
# Dense layers
x = layers.Dense(256, activation="relu")(x)
x = layers.Dropout(self.dropout_rate)(x)
x = layers.Dense(128, activation="relu")(x)
x = layers.Dropout(self.dropout_rate)(x)
# Output layer
if self.task == "classification":
if self.n_classes == 2:
outputs = layers.Dense(1, activation="sigmoid")(x)
else:
outputs = layers.Dense(self.n_classes, activation="softmax")(x)
else: # regression
outputs = layers.Dense(1, activation="linear")(x)
self.model = keras.Model(inputs=inputs, outputs=outputs, name="Transformer")
def _build_pytorch_model(self):
"""Build PyTorch Transformer model."""
class PositionalEncodingPyTorch(nn.Module):
"""Positional encoding for PyTorch."""
def __init__(self, d_model, max_len=5000):
super().__init__()
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(
torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
)
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0)
self.register_buffer("pe", pe)
def forward(self, x):
return x + self.pe[:, : x.size(1), :]
class TransformerModelPyTorch(nn.Module):
"""PyTorch Transformer model."""
def __init__(
self,
input_dim,
d_model,
n_heads,
n_layers,
d_ff,
n_classes,
dropout_rate,
max_len,
task,
):
super().__init__()
self.task = task
self.input_projection = nn.Linear(input_dim, d_model)
self.pos_encoding = PositionalEncodingPyTorch(d_model, max_len)
# Transformer encoder
encoder_layer = nn.TransformerEncoderLayer(
d_model=d_model,
nhead=n_heads,
dim_feedforward=d_ff,
dropout=dropout_rate,
batch_first=True,
)
self.transformer = nn.TransformerEncoder(
encoder_layer, num_layers=n_layers
)
# Classification/Regression head
self.fc1 = nn.Linear(d_model, 256)
self.dropout1 = nn.Dropout(dropout_rate)
self.fc2 = nn.Linear(256, 128)
self.dropout2 = nn.Dropout(dropout_rate)
if task == "classification":
self.output = nn.Linear(128, n_classes if n_classes > 2 else 1)
else:
self.output = nn.Linear(128, 1)
def forward(self, x):
# Input projection
x = self.input_projection(x)
# Add positional encoding
x = self.pos_encoding(x)
# Transformer encoding
x = self.transformer(x)
# Global pooling
x = torch.mean(x, dim=1)
# Classification/Regression head
x = F.relu(self.fc1(x))
x = self.dropout1(x)
x = F.relu(self.fc2(x))
x = self.dropout2(x)
x = self.output(x)
if self.task == "classification":
if x.shape[-1] == 1:
x = torch.sigmoid(x)
else:
x = F.softmax(x, dim=-1)
return x
input_dim = self.input_shape[-1] if len(self.input_shape) > 1 else 1
self.model = TransformerModelPyTorch(
input_dim,
self.d_model,
self.n_heads,
self.n_layers,
self.d_ff,
self.n_classes,
self.dropout_rate,
self.max_len,
self.task,
)
[docs]
def train(
self,
X_train: np.ndarray,
y_train: np.ndarray,
X_val: Optional[np.ndarray] = None,
y_val: Optional[np.ndarray] = None,
epochs: int = 100,
batch_size: int = 32,
learning_rate: float = 0.0001,
warmup_epochs: int = 10,
**kwargs,
):
"""
Train the Transformer model.
Parameters
----------
X_train : ndarray of shape (n_samples, sequence_length, n_features)
Training data
y_train : ndarray
Training labels
X_val : ndarray, optional
Validation data
y_val : ndarray, optional
Validation labels
epochs : int, default=100
Number of training epochs
batch_size : int, default=32
Batch size
learning_rate : float, default=0.0001
Initial learning rate
warmup_epochs : int, default=10
Number of warmup epochs with linear LR increase
Returns
-------
history : dict
Training history
"""
if self.backend == "tensorflow":
return self._train_tensorflow(
X_train,
y_train,
X_val,
y_val,
epochs,
batch_size,
learning_rate,
warmup_epochs,
**kwargs,
)
elif self.backend == "pytorch":
return self._train_pytorch(
X_train,
y_train,
X_val,
y_val,
epochs,
batch_size,
learning_rate,
warmup_epochs,
**kwargs,
)
def _train_tensorflow(
self,
X_train,
y_train,
X_val,
y_val,
epochs,
batch_size,
learning_rate,
warmup_epochs,
**kwargs,
):
"""Train TensorFlow Transformer model."""
# Learning rate schedule with warmup
class WarmUpSchedule(keras.optimizers.schedules.LearningRateSchedule):
def __init__(self, d_model, warmup_steps=4000):
super().__init__()
self.d_model = tf.cast(d_model, tf.float32)
self.warmup_steps = warmup_steps
def __call__(self, step):
step = tf.cast(step, tf.float32)
step = tf.maximum(step, 1.0) # guard against rsqrt(0) = inf → NaN
arg1 = tf.math.rsqrt(step)
arg2 = step * (self.warmup_steps**-1.5)
return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)
# Optimizer with warmup
lr_schedule = WarmUpSchedule(
self.d_model, warmup_steps=warmup_epochs * (len(X_train) // batch_size)
)
optimizer = keras.optimizers.Adam(
lr_schedule, beta_1=0.9, beta_2=0.98, epsilon=1e-9
)
# Loss and metrics
if self.task == "classification":
if self.n_classes == 2:
loss = "binary_crossentropy"
metrics = ["accuracy", keras.metrics.AUC(name="auc")]
else:
loss = (
"sparse_categorical_crossentropy"
if y_train.ndim == 1
else "categorical_crossentropy"
)
metrics = ["accuracy"]
else:
loss = "mse"
metrics = ["mae"]
self.model.compile(optimizer=optimizer, loss=loss, metrics=metrics)
# Callbacks
callback_list = [
keras.callbacks.EarlyStopping(
monitor="val_loss" if X_val is not None else "loss",
patience=20,
restore_best_weights=True,
),
keras.callbacks.ReduceLROnPlateau(
monitor="val_loss" if X_val is not None else "loss",
factor=0.5,
patience=10,
min_lr=1e-7,
),
]
# Train
validation_data = (X_val, y_val) if X_val is not None else None
history = self.model.fit(
X_train,
y_train,
validation_data=validation_data,
epochs=epochs,
batch_size=batch_size,
callbacks=callback_list,
verbose=kwargs.get("verbose", 1),
)
self.history = history.history
return self.history
def _train_pytorch(
self,
X_train,
y_train,
X_val,
y_val,
epochs,
batch_size,
learning_rate,
warmup_epochs,
**kwargs,
):
"""Train PyTorch Transformer model."""
raise NotImplementedError(
"PyTorch Transformer training is not yet implemented. Use backend='tensorflow'."
)
[docs]
def predict(self, X: np.ndarray) -> np.ndarray:
"""
Make predictions.
Parameters
----------
X : ndarray of shape (n_samples, sequence_length, n_features)
Input data
Returns
-------
predictions : ndarray
Model predictions
"""
if self.backend == "tensorflow":
return self.model.predict(X)
elif self.backend == "pytorch":
self.model.eval()
with torch.no_grad():
X_tensor = torch.FloatTensor(X)
device = next(self.model.parameters()).device
X_tensor = X_tensor.to(device)
outputs = self.model(X_tensor)
return outputs.cpu().numpy()
[docs]
def get_attention_weights(self, X: np.ndarray, layer_idx: int = 0):
"""
Extract attention weights for visualization.
Parameters
----------
X : ndarray
Input data
layer_idx : int
Index of transformer layer to extract attention from
Returns
-------
attention_weights : ndarray
Attention weight matrices
"""
if self.backend == "tensorflow":
# Create model that outputs attention weights
attention_model = keras.Model(
inputs=self.model.input,
outputs=self.model.get_layer(f"transformer_layer_{layer_idx+1}").output,
)
# Note: Full implementation would require modifying the model to return attention weights
warnings.warn("Attention weight extraction requires model modification")
return None
else:
warnings.warn(
"Attention weight extraction not implemented for PyTorch backend"
)
return None
[docs]
def save(self, filepath: str):
"""Save model to disk."""
if self.backend == "tensorflow":
self.model.save(filepath)
elif self.backend == "pytorch":
torch.save(self.model.state_dict(), filepath)
[docs]
def load(self, filepath: str):
"""Load model from disk."""
if self.backend == "tensorflow":
self.model = keras.models.load_model(
filepath,
custom_objects={
"PositionalEncoding": PositionalEncoding,
"MultiHeadSelfAttention": MultiHeadSelfAttention,
"TransformerEncoderLayer": TransformerEncoderLayer,
},
)
elif self.backend == "pytorch":
self.model.load_state_dict(torch.load(filepath))