"""
Advanced Computation Module for Physiological Signal Processing
This module provides comprehensive capabilities for physiological
signal processing including ECG, PPG, EEG, and other vital signs.
Author: vitalDSP Team
Date: 2025-01-27
Version: 1.0.0
Key Features:
- Object-oriented design with comprehensive classes
- Multiple processing methods and functions
- NumPy integration for numerical computations
- Pattern and anomaly detection
Examples:
---------
Basic usage:
>>> import numpy as np
>>> from vitalDSP.advanced_computation.real_time_anomaly_detection import RealTimeAnomalyDetection
>>> signal = np.random.randn(1000)
>>> rtad = RealTimeAnomalyDetection(signal)
>>> anomalies = rtad.detect_statistical()
"""
import numpy as np
from collections import deque
from vitalDSP.transforms.wavelet_transform import (
WaveletTransform,
) # Assuming WaveletTransform is in utils
[docs]
class RealTimeAnomalyDetection:
"""
Comprehensive Real-Time Anomaly Detection for detecting anomalies in streaming data.
This class supports multiple anomaly detection techniques including statistical methods, machine learning models, and deep learning models.
It is designed for use in real-time environments with online learning capabilities.
Methods
-------
detect_statistical : method
Detects anomalies using statistical methods like Z-score and moving average.
detect_knn : method
Detects anomalies using k-Nearest Neighbors (k-NN).
detect_svm : method
Detects anomalies using Support Vector Machine (SVM).
detect_autoencoder : method
Detects anomalies using Autoencoders.
detect_lstm : method
Detects anomalies using LSTM-based models.
detect_wavelet : method
Detects anomalies using wavelet transforms.
update_model : method
Updates the model with new data for online learning.
evaluate : method
Evaluates the performance of the anomaly detection method on a test dataset.
Example Usage
-------------
data_stream = np.sin(np.linspace(0, 10, 100)) + np.random.normal(0, 0.1, 100)
anomaly_detector = RealTimeAnomalyDetection(window_size=10)
# Detect anomalies using Z-score
for data_point in data_stream:
anomaly = anomaly_detector.detect_statistical(data_point, method='z_score', threshold=2.0)
print("Anomaly (Z-Score):", anomaly)
# Train and detect anomalies using k-NN
anomaly_detector.train_knn(data_stream[:50])
for data_point in data_stream[50:]:
anomaly = anomaly_detector.detect_knn(data_point)
print("Anomaly (k-NN):", anomaly)
"""
def __init__(self, window_size=10):
"""
Initialize the RealTimeAnomalyDetection class with a specified window size.
Parameters
----------
window_size : int
The number of data points to consider for detecting anomalies.
"""
self.window_size = window_size
self.data_window = deque(maxlen=window_size)
self.models = {}
[docs]
def detect_statistical(self, data_point, method="z_score", threshold=2.0, **kwargs):
"""
Detect anomalies using statistical methods like Z-score, moving average, etc.
Parameters
----------
data_point : float
The new data point to be analyzed.
method : str, optional
The statistical method to use ('z_score' or 'moving_average'). Default is 'z_score'.
threshold : float, optional
The threshold value for detecting anomalies. Default is 2.0.
Returns
-------
bool
True if the data point is an anomaly, False otherwise.
"""
self.data_window.append(data_point)
if len(self.data_window) < self.window_size:
return False # Not enough data to detect anomalies
if method == "z_score":
return self._z_score_detection(data_point, threshold)
elif method == "moving_average":
return self._moving_average_detection(data_point, threshold)
else:
raise ValueError(f"Unknown statistical method: {method}")
def _z_score_detection(self, data_point, threshold):
"""Detect anomalies using Z-score method."""
mean = np.mean(self.data_window)
std_dev = np.std(self.data_window)
if std_dev == 0:
return False # constant window — no meaningful deviation
z_score = (data_point - mean) / std_dev
return abs(z_score) > threshold
def _moving_average_detection(self, data_point, threshold):
"""Detect anomalies using moving average method."""
moving_avg = np.mean(self.data_window)
return abs(data_point - moving_avg) > threshold
[docs]
def train_knn(self, training_data, k=5):
"""
Train a k-Nearest Neighbors (k-NN) model on the training data.
Parameters
----------
training_data : numpy.ndarray
The training dataset.
k : int, optional
The number of nearest neighbors to consider. Default is 5.
Returns
-------
None
"""
self.models["knn"] = {"training_data": np.array(training_data), "k": k}
[docs]
def detect_knn(self, data_point):
"""
Detect anomalies using the k-Nearest Neighbors (k-NN) method.
Parameters
----------
data_point : float
The new data point to be analyzed.
Returns
-------
bool
True if the data point is an anomaly, False otherwise.
"""
if "knn" not in self.models:
raise ValueError("k-NN model has not been trained. Call train_knn() first.")
training_data = self.models["knn"]["training_data"]
k = self.models["knn"]["k"]
distances = np.abs(training_data - data_point)
nearest_neighbors = np.sort(distances)[:k]
mean_distance = np.mean(nearest_neighbors)
return mean_distance > np.std(
training_data
) # Anomaly if distance is larger than standard deviation
[docs]
def train_svm(self, training_data, kernel="rbf"):
"""
Train a Support Vector Machine (SVM) model on the training data.
Parameters
----------
training_data : numpy.ndarray
The training dataset.
kernel : str, optional
The kernel type for SVM ('linear', 'poly', 'rbf'). Default is 'rbf'.
Returns
-------
None
"""
self.models["svm"] = SimpleSVM(training_data, kernel)
[docs]
def detect_svm(self, data_point):
"""
Detect anomalies using the Support Vector Machine (SVM) method.
Parameters
----------
data_point : float
The new data point to be analyzed.
Returns
-------
bool
True if the data point is an anomaly, False otherwise.
"""
if "svm" not in self.models:
raise ValueError("SVM model has not been trained. Call train_svm() first.")
return self.models["svm"].predict(data_point)
[docs]
def train_autoencoder(self, training_data, encoding_dim=3):
"""
Train an Autoencoder model on the training data.
Parameters
----------
training_data : numpy.ndarray
The training dataset.
encoding_dim : int, optional
The dimension of the encoding layer. Default is 3.
Returns
-------
None
"""
self.models["autoencoder"] = SimpleAutoencoder(training_data, encoding_dim)
[docs]
def detect_autoencoder(self, data_point, threshold=0.1):
"""
Detect anomalies using the Autoencoder method.
Parameters
----------
data_point : float
The new data point to be analyzed.
threshold : float, optional
The reconstruction error threshold for detecting anomalies. Default is 0.1.
Returns
-------
bool
True if the data point is an anomaly, False otherwise.
"""
if "autoencoder" not in self.models:
raise ValueError(
"Autoencoder model has not been trained. Call train_autoencoder() first."
)
reconstruction_error = self.models["autoencoder"].reconstruction_error(
data_point
)
return reconstruction_error > threshold
[docs]
def train_lstm(self, training_data, hidden_units=50):
"""
Train an LSTM-based model on the training data.
Parameters
----------
training_data : numpy.ndarray
The training dataset.
hidden_units : int, optional
The number of hidden units in the LSTM. Default is 50.
Returns
-------
None
"""
self.models["lstm"] = SimpleLSTM(training_data, hidden_units)
[docs]
def detect_lstm(self, data_point, threshold=0.1):
"""
Detect anomalies using the LSTM-based model.
Parameters
----------
data_point : float
The new data point to be analyzed.
threshold : float, optional
The prediction error threshold for detecting anomalies. Default is 0.1.
Returns
-------
bool
True if the data point is an anomaly, False otherwise.
"""
if "lstm" not in self.models:
raise ValueError(
"LSTM model has not been trained. Call train_lstm() first."
)
prediction_error = self.models["lstm"].prediction_error(data_point)
return prediction_error > threshold
[docs]
def detect_wavelet(self, data_point, wavelet_name="haar", level=1, threshold=0.1):
"""
Detect anomalies using Wavelet Transform.
Parameters
----------
data_point : float
The new data point to be analyzed.
wavelet_name : str, optional
The name of the wavelet to use for the transform (default is 'haar').
level : int, optional
The number of decomposition levels in the wavelet transform (default is 1).
threshold : float, optional
The threshold for detecting anomalies in the wavelet coefficients (default is 0.1).
Returns
-------
bool
True if the data point is an anomaly, False otherwise.
"""
self.data_window.append(data_point)
if len(self.data_window) < self.window_size:
return False # Not enough data to detect anomalies
wavelet_transform = WaveletTransform(np.array(self.data_window), wavelet_name)
coeffs = wavelet_transform.perform_wavelet_transform(level)
detail_coeffs = np.concatenate(coeffs[:-1])
return np.any(np.abs(detail_coeffs) > threshold)
[docs]
def update_model(self, data_point, model_type="knn"):
"""
Update the model with new data for online learning.
Parameters
----------
data_point : float
The new data point to update the model.
model_type : str, optional
The type of model to update ('knn', 'svm', 'autoencoder', 'lstm'). Default is 'knn'.
Returns
-------
None
"""
if model_type == "knn":
self.models["knn"]["training_data"] = np.append(
self.models["knn"]["training_data"], data_point
)
elif model_type == "svm":
self.models["svm"].update(data_point)
elif model_type == "autoencoder":
self.models["autoencoder"].update(data_point)
elif model_type == "lstm":
self.models["lstm"].update(data_point)
else:
raise ValueError(f"Unknown model type: {model_type}")
[docs]
def evaluate(self, test_data, model_type="knn"):
"""
Evaluate the performance of the anomaly detection method on a test dataset.
Parameters
----------
test_data : numpy.ndarray
The test dataset.
model_type : str, optional
The type of model to evaluate ('knn', 'svm', 'autoencoder', 'lstm'). Default is 'knn'.
Returns
-------
float
The accuracy of the anomaly detection method on the test dataset.
"""
correct = 0
for data_point in test_data:
if model_type == "knn":
prediction = self.detect_knn(data_point)
elif model_type == "svm":
prediction = self.detect_svm(data_point)
elif model_type == "autoencoder":
prediction = self.detect_autoencoder(data_point)
elif model_type == "lstm":
prediction = self.detect_lstm(data_point)
else:
raise ValueError(f"Unknown model type: {model_type}")
correct += int(prediction == self._is_anomaly(data_point))
return correct / len(test_data)
def _is_anomaly(self, data_point):
"""Placeholder method to determine if a data point is an anomaly (for evaluation)."""
return (
False # This should be implemented based on the ground truth of the dataset
)
# Simple SVM Implementation (Placeholder)
[docs]
class SimpleSVM:
def __init__(self, training_data, kernel="rbf"):
self.training_data = training_data
self.kernel = kernel
self.support_vectors = self._train(training_data)
def _train(self, data):
# Placeholder SVM training logic
# Ensure data is 1D for np.random.choice
if data.ndim > 1:
data = data.flatten()
return np.random.choice(data, size=int(len(data) * 0.1), replace=False)
[docs]
def predict(self, data_point):
# Placeholder SVM prediction logic
distance = np.min(np.abs(self.support_vectors - data_point))
return distance > np.std(self.training_data)
[docs]
def update(self, data_point):
# Placeholder SVM online update logic
self.training_data = np.append(self.training_data, data_point)
self.support_vectors = self._train(self.training_data)
# Simple Autoencoder Implementation (Placeholder)
[docs]
class SimpleAutoencoder:
def __init__(self, training_data, encoding_dim=3):
"""
Initialize the SimpleAutoencoder with training data and an encoding dimension.
If the training data is 1D, it is reshaped to 2D.
Parameters
----------
training_data : np.array
The training data used to initialize the autoencoder.
encoding_dim : int, optional
The dimension of the encoded space. Default is 3.
Example
-------
>>> model = SimpleAutoencoder(training_data=np.array([1, 2, 3]), encoding_dim=3)
"""
self.encoding_dim = encoding_dim
# Ensure training_data is at least 2D
if training_data.ndim == 1:
training_data = training_data.reshape(-1, 1)
self.training_data = training_data
# Initialize weights based on input dimension
self.weights = np.random.randn(self.training_data.shape[1], encoding_dim)
[docs]
def reconstruction_error(self, data_point):
"""
Compute the reconstruction error for a given data point.
Parameters
----------
data_point : float or np.array
The data point to compute the reconstruction error for.
Returns
-------
float
The mean squared reconstruction error.
Example
-------
>>> error = model.reconstruction_error(np.array([1, 2, 3]))
"""
dp = np.atleast_2d(np.asarray(data_point, dtype=float))
input_dim = self.weights.shape[0]
if dp.shape[1] != input_dim:
# Scalar or mismatched input: broadcast to expected input dimension
dp = np.full((1, input_dim), float(data_point))
encoded = np.dot(dp, self.weights)
reconstructed = np.dot(encoded, self.weights.T)
return float(np.mean((dp - reconstructed) ** 2))
[docs]
def update(self, data_point):
"""
Update the autoencoder with a new data point.
Parameters
----------
data_point : float or np.array
The new data point to be added for online learning.
Example
-------
>>> model.update(np.array([4, 5, 6]))
"""
dp = np.atleast_2d(np.asarray(data_point, dtype=float))
self.training_data = np.append(self.training_data, dp, axis=0)
self.weights += np.random.randn(*self.weights.shape) * 0.01
# Simple LSTM Implementation (Placeholder)
[docs]
class SimpleLSTM:
def __init__(self, training_data, hidden_units=50):
self.hidden_units = hidden_units
data = np.atleast_2d(np.asarray(training_data, dtype=float))
input_dim = data.shape[1]
self.input_dim = input_dim
self.W = np.random.randn(input_dim, hidden_units) * 0.1
self.U = np.random.randn(hidden_units, hidden_units) * 0.1
self.V = np.random.randn(hidden_units, input_dim) * 0.1
[docs]
def prediction_error(self, data_point):
# Accept scalar, 1-D, or 2-D input
dp = np.atleast_2d(np.asarray(data_point, dtype=float))
if dp.shape[1] != self.input_dim:
# Scalar passed as single point: reshape to (1, input_dim)
dp = np.full((1, self.input_dim), float(data_point))
h = np.zeros(self.hidden_units)
for t in range(dp.shape[0]):
h = np.tanh(np.dot(dp[t], self.W) + np.dot(h, self.U))
predicted = np.dot(h, self.V)
return float(np.mean((predicted - dp) ** 2))
[docs]
def update(self, data_point):
# Placeholder LSTM online update logic
self.W += np.random.randn(*self.W.shape) * 0.01
self.U += np.random.randn(*self.U.shape) * 0.01
self.V += np.random.randn(*self.V.shape) * 0.01