"""
Transfer Learning for Physiological Signal Analysis
This module provides utilities for transfer learning, enabling efficient model
adaptation to new tasks and domains with limited labeled data.
Features:
- Feature extraction (frozen base model)
- Fine-tuning strategies
- Domain adaptation techniques
- Few-shot learning support
- Progressive unfreezing
- Learning rate scheduling
Use Cases:
- Adapt ECG classifier to different populations
- Transfer from large to small datasets
- Cross-domain transfer (e.g., ECG to PPG)
- Personalized model adaptation
Author: vitalDSP
License: MIT
"""
"""
Machine Learning Models Module for Physiological Signal Processing
This module provides comprehensive capabilities for physiological
signal processing including ECG, PPG, EEG, and other vital signs.
Author: vitalDSP Team
Date: 2025-01-27
Version: 1.0.0
Key Features:
- Object-oriented design with comprehensive classes
- Multiple processing methods and functions
- NumPy integration for numerical computations
- Deep learning framework integration
- Signal validation and error handling
Examples:
--------
Basic usage:
>>> import numpy as np
>>> from vitalDSP.ml_models.transfer_learning import TransferLearning
>>> signal = np.random.randn(1000)
>>> processor = TransferLearning(signal)
>>> result = processor.process()
>>> print(f'Processing result: {result}')
"""
import numpy as np
from typing import Optional, Union, Tuple, List, Dict, Any, Callable
from pathlib import Path
import warnings
try:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
TENSORFLOW_AVAILABLE = True
except ImportError:
TENSORFLOW_AVAILABLE = False
try:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
PYTORCH_AVAILABLE = True
except ImportError:
PYTORCH_AVAILABLE = False
[docs]
class TransferLearningStrategy:
"""
Base class for transfer learning strategies.
Implements common functionality for adapting pre-trained models to new tasks.
Examples
--------
>>> from vitalDSP.ml_models.transfer_learning import TransferFeatureExtractor
>>> from vitalDSP.ml_models.pretrained_models import load_pretrained_model
>>> import numpy as np
>>>
>>> # Load pre-trained model
>>> base_model = load_pretrained_model('ecg_classifier_mitbih')
>>>
>>> # Create feature extractor
>>> extractor = TransferFeatureExtractor(base_model.model, freeze_base=True)
>>>
>>> # Train on new task
>>> X_train = np.random.randn(100, 187, 1)
>>> y_train = np.random.randint(0, 3, 100) # New 3-class task
>>> extractor.fit(X_train, y_train, n_classes=3, epochs=20)
"""
def __init__(self, base_model: Any, backend: str = "tensorflow"):
"""
Initialize transfer learning strategy.
Parameters
----------
base_model : object
Pre-trained base model
backend : str, default='tensorflow'
Deep learning backend
"""
self.base_model = base_model
self.backend = backend.lower()
self.model = None
self.history = None
[docs]
def freeze_layers(self, n_layers: Optional[int] = None):
"""
Freeze layers in base model.
Parameters
----------
n_layers : int, optional
Number of layers to freeze. If None, freeze all.
"""
raise NotImplementedError("Subclasses must implement freeze_layers()")
[docs]
def unfreeze_layers(self, n_layers: Optional[int] = None):
"""
Unfreeze layers for fine-tuning.
Parameters
----------
n_layers : int, optional
Number of layers to unfreeze from the end. If None, unfreeze all.
"""
raise NotImplementedError("Subclasses must implement unfreeze_layers()")
[docs]
class FineTuner(TransferLearningStrategy):
"""
Fine-tuning transfer learning.
Unfreezes part or all of the base model and trains with a small learning rate.
Best when:
- Target dataset is moderately sized
- Target task differs somewhat from source task
- More accuracy is needed than feature extraction provides
Strategies:
1. All-at-once: Unfreeze all layers, train with small LR
2. Progressive: Gradually unfreeze layers from top to bottom
3. Discriminative: Use different learning rates for different layers
Examples
--------
>>> from vitalDSP.ml_models.transfer_learning import FineTuner
>>> import numpy as np
>>>
>>> # Assuming we have a base model
>>> finetuner = FineTuner(base_model, strategy='progressive')
>>>
>>> # Fine-tune with progressive unfreezing
>>> X_train = np.random.randn(500, 187, 1)
>>> y_train = np.random.randint(0, 4, 500)
>>> finetuner.fit(X_train, y_train, n_classes=4, epochs=30)
"""
def __init__(
self,
base_model: Any,
strategy: str = "all_at_once",
backend: str = "tensorflow",
):
"""
Initialize fine-tuner.
Parameters
----------
base_model : object
Pre-trained base model
strategy : str, default='all_at_once'
Fine-tuning strategy ('all_at_once', 'progressive', 'discriminative')
backend : str, default='tensorflow'
Deep learning backend
"""
super().__init__(base_model, backend)
self.strategy = strategy.lower()
# Initially freeze all layers
self.freeze_layers()
[docs]
def freeze_layers(self, n_layers: Optional[int] = None):
"""Freeze layers."""
if self.backend == "tensorflow":
if hasattr(self.base_model, "trainable"):
if n_layers is None:
self.base_model.trainable = False
else:
for i, layer in enumerate(self.base_model.layers):
layer.trainable = i >= n_layers
else:
raise NotImplementedError("PyTorch backend not yet implemented")
[docs]
def unfreeze_layers(self, n_layers: Optional[int] = None):
"""Unfreeze layers."""
if self.backend == "tensorflow":
if hasattr(self.base_model, "trainable"):
if n_layers is None:
self.base_model.trainable = True
else:
total = len(self.base_model.layers)
for i, layer in enumerate(self.base_model.layers):
layer.trainable = i >= total - n_layers
else:
raise NotImplementedError("PyTorch backend not yet implemented")
[docs]
def fit(
self,
X_train: np.ndarray,
y_train: np.ndarray,
n_classes: Optional[int] = None,
task: str = "classification",
X_val: Optional[np.ndarray] = None,
y_val: Optional[np.ndarray] = None,
epochs: int = 50,
batch_size: int = 32,
base_learning_rate: float = 1e-5,
head_learning_rate: float = 1e-3,
progressive_epochs: Optional[List[int]] = None,
verbose: int = 1,
) -> Dict[str, List[float]]:
"""
Fine-tune model on new task.
Parameters
----------
X_train : np.ndarray
Training signals
y_train : np.ndarray
Training labels
n_classes : int, optional
Number of classes
task : str, default='classification'
Task type
X_val : np.ndarray, optional
Validation signals
y_val : np.ndarray, optional
Validation labels
epochs : int, default=50
Total training epochs
batch_size : int, default=32
Batch size
base_learning_rate : float, default=1e-5
Learning rate for base model (smaller)
head_learning_rate : float, default=1e-3
Learning rate for new head (larger)
progressive_epochs : list of int, optional
Epochs at which to unfreeze layers (for progressive strategy)
verbose : int, default=1
Verbosity level
Returns
-------
dict
Training history
"""
if self.backend != "tensorflow":
raise NotImplementedError("Only TensorFlow backend currently supported")
# Build model with new head
inputs = keras.Input(shape=X_train.shape[1:])
x = self.base_model(inputs, training=False) # Start with frozen base
# Ensure flattened features
if len(x.shape) > 2:
x = (
layers.GlobalAveragePooling1D()(x)
if len(x.shape) == 3
else layers.Flatten()(x)
)
# New head
x = layers.Dense(256, activation="relu", name="ft_dense1")(x)
x = layers.BatchNormalization(name="ft_bn1")(x)
x = layers.Dropout(0.4, name="ft_dropout1")(x)
x = layers.Dense(128, activation="relu", name="ft_dense2")(x)
x = layers.Dropout(0.3, name="ft_dropout2")(x)
if task == "classification":
if n_classes is None:
n_classes = len(np.unique(y_train))
outputs = layers.Dense(n_classes, activation="softmax", name="ft_output")(x)
loss = "sparse_categorical_crossentropy"
metrics = ["accuracy"]
else:
outputs = layers.Dense(1, activation="linear", name="ft_output")(x)
loss = "mse"
metrics = ["mae"]
self.model = keras.Model(inputs, outputs)
# Strategy-specific training
if self.strategy == "all_at_once":
return self._fit_all_at_once(
X_train,
y_train,
X_val,
y_val,
loss,
metrics,
epochs,
batch_size,
base_learning_rate,
head_learning_rate,
verbose,
)
elif self.strategy == "progressive":
return self._fit_progressive(
X_train,
y_train,
X_val,
y_val,
loss,
metrics,
epochs,
batch_size,
base_learning_rate,
head_learning_rate,
progressive_epochs,
verbose,
)
elif self.strategy == "discriminative":
return self._fit_discriminative(
X_train,
y_train,
X_val,
y_val,
loss,
metrics,
epochs,
batch_size,
base_learning_rate,
head_learning_rate,
verbose,
)
else:
raise ValueError(f"Unknown strategy: {self.strategy}")
def _fit_all_at_once(
self,
X_train,
y_train,
X_val,
y_val,
loss,
metrics,
epochs,
batch_size,
base_lr,
head_lr,
verbose,
):
"""Fit with all layers unfrozen at once."""
# First train only the head
self.freeze_layers()
self.model.compile(
optimizer=keras.optimizers.Adam(head_lr), loss=loss, metrics=metrics
)
history_head = self.model.fit(
X_train,
y_train,
validation_data=(X_val, y_val) if X_val is not None else None,
epochs=min(10, epochs // 3),
batch_size=batch_size,
verbose=verbose,
)
# Then unfreeze all and fine-tune
self.unfreeze_layers()
self.model.compile(
optimizer=keras.optimizers.Adam(base_lr), loss=loss, metrics=metrics
)
callbacks = [
keras.callbacks.EarlyStopping(
monitor="val_loss" if X_val is not None else "loss",
patience=15,
restore_best_weights=True,
),
keras.callbacks.ReduceLROnPlateau(
monitor="val_loss" if X_val is not None else "loss",
factor=0.5,
patience=7,
min_lr=1e-8,
),
]
history_finetune = self.model.fit(
X_train,
y_train,
validation_data=(X_val, y_val) if X_val is not None else None,
epochs=epochs,
batch_size=batch_size,
callbacks=callbacks,
verbose=verbose,
)
# Combine histories
combined_history = {}
for key in history_head.history:
combined_history[key] = (
history_head.history[key] + history_finetune.history[key]
)
self.history = combined_history
return self.history
def _fit_progressive(
self,
X_train,
y_train,
X_val,
y_val,
loss,
metrics,
epochs,
batch_size,
base_lr,
head_lr,
progressive_epochs,
verbose,
):
"""Fit with progressive layer unfreezing."""
if progressive_epochs is None:
# Default: unfreeze in 3 stages
progressive_epochs = [epochs // 4, epochs // 2, 3 * epochs // 4]
# Stage 1: Train head only
self.freeze_layers()
self.model.compile(
optimizer=keras.optimizers.Adam(head_lr), loss=loss, metrics=metrics
)
history = {"loss": [], "val_loss": []}
if "accuracy" in metrics:
history["accuracy"] = []
history["val_accuracy"] = []
current_epoch = 0
n_layers = len(self.base_model.layers)
for stage, target_epoch in enumerate(progressive_epochs):
stage_epochs = target_epoch - current_epoch
if stage > 0:
# Progressively unfreeze layers
n_to_unfreeze = min(
n_layers // len(progressive_epochs) * stage, n_layers
)
self.unfreeze_layers(n_to_unfreeze)
# Reduce learning rate for base
lr = base_lr * (0.5 ** (stage - 1))
self.model.compile(
optimizer=keras.optimizers.Adam(lr), loss=loss, metrics=metrics
)
if verbose:
print(f"\nStage {stage + 1}: Training for {stage_epochs} epochs")
h = self.model.fit(
X_train,
y_train,
validation_data=(X_val, y_val) if X_val is not None else None,
epochs=stage_epochs,
batch_size=batch_size,
verbose=verbose,
)
# Accumulate history
for key in h.history:
history[key].extend(h.history[key])
current_epoch = target_epoch
self.history = history
return self.history
def _fit_discriminative(
self,
X_train,
y_train,
X_val,
y_val,
loss,
metrics,
epochs,
batch_size,
base_lr,
head_lr,
verbose,
):
"""Fit with discriminative learning rates."""
# Unfreeze all layers
self.unfreeze_layers()
# Use different LRs for different parts
# This is simplified - in practice, you'd create custom optimizer
# with per-layer learning rates
self.model.compile(
optimizer=keras.optimizers.Adam(base_lr), loss=loss, metrics=metrics
)
callbacks = [
keras.callbacks.EarlyStopping(
monitor="val_loss" if X_val is not None else "loss",
patience=15,
restore_best_weights=True,
),
keras.callbacks.ReduceLROnPlateau(
monitor="val_loss" if X_val is not None else "loss",
factor=0.5,
patience=7,
min_lr=1e-8,
),
]
history = self.model.fit(
X_train,
y_train,
validation_data=(X_val, y_val) if X_val is not None else None,
epochs=epochs,
batch_size=batch_size,
callbacks=callbacks,
verbose=verbose,
)
self.history = history.history
return self.history
[docs]
def predict(self, X: np.ndarray, batch_size: int = 32) -> np.ndarray:
"""Make predictions."""
if self.model is None:
raise ValueError("Model not fitted. Call fit() first.")
return self.model.predict(X, batch_size=batch_size, verbose=0)
[docs]
class DomainAdapter:
"""
Domain adaptation for transfer learning.
Adapts model from source domain to target domain when distributions differ.
Techniques:
- Feature alignment
- Adversarial training
- Maximum Mean Discrepancy (MMD)
Use cases:
- Transfer from one patient population to another
- Adapt across different sensors/devices
- Cross-institution transfer
Examples
--------
>>> from vitalDSP.ml_models.transfer_learning import DomainAdapter
>>> import numpy as np
>>>
>>> # Source and target data from different domains
>>> X_source = np.random.randn(500, 187, 1)
>>> y_source = np.random.randint(0, 3, 500)
>>> X_target = np.random.randn(100, 187, 1) * 1.5 # Different distribution
>>>
>>> # Create domain adapter
>>> adapter = DomainAdapter(base_model, method='mmd')
>>> adapter.fit(X_source, y_source, X_target, epochs=50)
"""
def __init__(
self, base_model: Any, method: str = "mmd", backend: str = "tensorflow"
):
"""
Initialize domain adapter.
Parameters
----------
base_model : object
Pre-trained base model
method : str, default='mmd'
Domain adaptation method ('mmd', 'coral', 'dann')
backend : str, default='tensorflow'
Deep learning backend
"""
self.base_model = base_model
self.method = method.lower()
self.backend = backend.lower()
self.model = None
[docs]
def fit(
self,
X_source: np.ndarray,
y_source: np.ndarray,
X_target: np.ndarray,
y_target: Optional[np.ndarray] = None,
n_classes: Optional[int] = None,
epochs: int = 50,
batch_size: int = 32,
learning_rate: float = 1e-4,
lambda_da: float = 0.1,
verbose: int = 1,
) -> Dict[str, List[float]]:
"""
Fit domain adaptation model.
Parameters
----------
X_source : np.ndarray
Source domain signals (labeled)
y_source : np.ndarray
Source domain labels
X_target : np.ndarray
Target domain signals (can be unlabeled)
y_target : np.ndarray, optional
Target domain labels (if available)
n_classes : int, optional
Number of classes
epochs : int, default=50
Training epochs
batch_size : int, default=32
Batch size
learning_rate : float, default=1e-4
Learning rate
lambda_da : float, default=0.1
Weight for domain adaptation loss
verbose : int, default=1
Verbosity level
Returns
-------
dict
Training history
"""
if self.backend != "tensorflow":
raise NotImplementedError("Only TensorFlow backend currently supported")
if n_classes is None:
n_classes = len(np.unique(y_source))
if self.method == "mmd":
return self._fit_mmd(
X_source,
y_source,
X_target,
y_target,
n_classes,
epochs,
batch_size,
learning_rate,
lambda_da,
verbose,
)
else:
raise NotImplementedError(f"Method '{self.method}' not yet implemented")
def _fit_mmd(
self,
X_source,
y_source,
X_target,
y_target,
n_classes,
epochs,
batch_size,
learning_rate,
lambda_da,
verbose,
):
"""Fit using Maximum Mean Discrepancy."""
# Build model
inputs = keras.Input(shape=X_source.shape[1:])
features = self.base_model(inputs)
if len(features.shape) > 2:
features = layers.GlobalAveragePooling1D()(features)
classifier = layers.Dense(n_classes, activation="softmax")(features)
self.model = keras.Model(inputs, [classifier, features])
# Custom training loop for MMD
optimizer = keras.optimizers.Adam(learning_rate)
@tf.function
def train_step(x_src, y_src, x_tgt):
with tf.GradientTape() as tape:
# Forward pass
pred_src, feat_src = self.model(x_src, training=True)
_, feat_tgt = self.model(x_tgt, training=True)
# Classification loss
class_loss = tf.reduce_mean(
keras.losses.sparse_categorical_crossentropy(y_src, pred_src)
)
# MMD loss (simplified)
mmd_loss = self._compute_mmd(feat_src, feat_tgt)
# Total loss
total_loss = class_loss + lambda_da * mmd_loss
# Backward pass
gradients = tape.gradient(total_loss, self.model.trainable_variables)
optimizer.apply_gradients(zip(gradients, self.model.trainable_variables))
return class_loss, mmd_loss, total_loss
# Training loop
history = {"class_loss": [], "mmd_loss": [], "total_loss": []}
n_batches = len(X_source) // batch_size
for epoch in range(epochs):
epoch_class_loss = 0.0
epoch_mmd_loss = 0.0
epoch_total_loss = 0.0
# Shuffle data
indices_src = np.random.permutation(len(X_source))
indices_tgt = np.random.permutation(len(X_target))
for batch in range(n_batches):
# Get batch data
batch_idx_src = indices_src[
batch * batch_size : (batch + 1) * batch_size
]
batch_idx_tgt = indices_tgt[
batch * batch_size : (batch + 1) * batch_size
]
x_src_batch = X_source[batch_idx_src]
y_src_batch = y_source[batch_idx_src]
x_tgt_batch = X_target[batch_idx_tgt]
# Train step
c_loss, m_loss, t_loss = train_step(
x_src_batch, y_src_batch, x_tgt_batch
)
epoch_class_loss += c_loss.numpy()
epoch_mmd_loss += m_loss.numpy()
epoch_total_loss += t_loss.numpy()
# Average losses
epoch_class_loss /= n_batches
epoch_mmd_loss /= n_batches
epoch_total_loss /= n_batches
history["class_loss"].append(epoch_class_loss)
history["mmd_loss"].append(epoch_mmd_loss)
history["total_loss"].append(epoch_total_loss)
if verbose and (epoch + 1) % 10 == 0:
print(
f"Epoch {epoch + 1}/{epochs} - "
f"class_loss: {epoch_class_loss:.4f} - "
f"mmd_loss: {epoch_mmd_loss:.4f} - "
f"total_loss: {epoch_total_loss:.4f}"
)
return history
@staticmethod
def _compute_mmd(features_source, features_target):
"""Compute Maximum Mean Discrepancy between source and target features."""
if not TENSORFLOW_AVAILABLE:
raise ImportError("TensorFlow is required for MMD computation")
# Gaussian kernel
def gaussian_kernel(x, y, sigma=1.0):
x = tf.expand_dims(x, 1) # (n, 1, d)
y = tf.expand_dims(y, 0) # (1, m, d)
diff = x - y # (n, m, d)
dist_sq = tf.reduce_sum(diff**2, axis=2) # (n, m)
return tf.exp(-dist_sq / (2 * sigma**2))
# MMD computation
K_ss = gaussian_kernel(features_source, features_source)
K_tt = gaussian_kernel(features_target, features_target)
K_st = gaussian_kernel(features_source, features_target)
m = tf.cast(tf.shape(features_source)[0], tf.float32)
n = tf.cast(tf.shape(features_target)[0], tf.float32)
mmd = (
tf.reduce_sum(K_ss) / (m * m)
+ tf.reduce_sum(K_tt) / (n * n)
- 2 * tf.reduce_sum(K_st) / (m * n)
)
return mmd
[docs]
def predict(self, X: np.ndarray, batch_size: int = 32) -> np.ndarray:
"""Make predictions."""
if self.model is None:
raise ValueError("Model not fitted. Call fit() first.")
predictions, _ = self.model.predict(X, batch_size=batch_size, verbose=0)
return np.argmax(predictions, axis=1)
# Convenience functions
[docs]
def quick_transfer(
base_model: Any,
X_train: np.ndarray,
y_train: np.ndarray,
strategy: str = "feature_extraction",
n_classes: Optional[int] = None,
epochs: int = 30,
**kwargs,
) -> Union[TransferFeatureExtractor, FineTuner]:
"""
Quick transfer learning with sensible defaults.
Parameters
----------
base_model : object
Pre-trained base model
X_train : np.ndarray
Training signals
y_train : np.ndarray
Training labels
strategy : str, default='feature_extraction'
Transfer strategy ('feature_extraction', 'fine_tuning')
n_classes : int, optional
Number of classes
epochs : int, default=30
Training epochs
**kwargs
Additional arguments
Returns
-------
TransferFeatureExtractor or FineTuner
Trained transfer learning model
Examples
--------
>>> from vitalDSP.ml_models.transfer_learning import quick_transfer
>>> from vitalDSP.ml_models.pretrained_models import load_pretrained_model
>>> import numpy as np
>>>
>>> # Load pre-trained model
>>> base = load_pretrained_model('ecg_classifier_mitbih')
>>>
>>> # Quick transfer learning
>>> X = np.random.randn(200, 187, 1)
>>> y = np.random.randint(0, 3, 200)
>>> model = quick_transfer(base.model, X, y, strategy='feature_extraction', epochs=20)
"""
if strategy == "feature_extraction":
model = TransferFeatureExtractor(base_model, freeze_base=True)
model.fit(X_train, y_train, n_classes=n_classes, epochs=epochs, **kwargs)
elif strategy == "fine_tuning":
model = FineTuner(base_model, strategy="all_at_once")
model.fit(X_train, y_train, n_classes=n_classes, epochs=epochs, **kwargs)
else:
raise ValueError(f"Unknown strategy: {strategy}")
return model