Source code for vitalDSP.advanced_computation.real_time_anomaly_detection

"""
Advanced Computation Module for Physiological Signal Processing

This module provides comprehensive capabilities for physiological
signal processing including ECG, PPG, EEG, and other vital signs.

Author: vitalDSP Team
Date: 2025-01-27
Version: 1.0.0

Key Features:
- Object-oriented design with comprehensive classes
- Multiple processing methods and functions
- NumPy integration for numerical computations
- Pattern and anomaly detection

Examples:
--------
Basic usage:
    >>> import numpy as np
    >>> from vitalDSP.advanced_computation.real_time_anomaly_detection import RealTimeAnomalyDetection
    >>> signal = np.random.randn(1000)
    >>> processor = RealTimeAnomalyDetection(signal)
    >>> result = processor.process()
    >>> print(f'Processing result: {result}')
"""

import numpy as np
from collections import deque
from vitalDSP.transforms.wavelet_transform import (
    WaveletTransform,
)  # Assuming WaveletTransform is in utils


[docs] class RealTimeAnomalyDetection: """ Comprehensive Real-Time Anomaly Detection for detecting anomalies in streaming data. This class supports multiple anomaly detection techniques including statistical methods, machine learning models, and deep learning models. It is designed for use in real-time environments with online learning capabilities. Methods ------- detect_statistical : method Detects anomalies using statistical methods like Z-score and moving average. detect_knn : method Detects anomalies using k-Nearest Neighbors (k-NN). detect_svm : method Detects anomalies using Support Vector Machine (SVM). detect_autoencoder : method Detects anomalies using Autoencoders. detect_lstm : method Detects anomalies using LSTM-based models. detect_wavelet : method Detects anomalies using wavelet transforms. update_model : method Updates the model with new data for online learning. evaluate : method Evaluates the performance of the anomaly detection method on a test dataset. Example Usage ------------- data_stream = np.sin(np.linspace(0, 10, 100)) + np.random.normal(0, 0.1, 100) anomaly_detector = RealTimeAnomalyDetection(window_size=10) # Detect anomalies using Z-score for data_point in data_stream: anomaly = anomaly_detector.detect_statistical(data_point, method='z_score', threshold=2.0) print("Anomaly (Z-Score):", anomaly) # Train and detect anomalies using k-NN anomaly_detector.train_knn(data_stream[:50]) for data_point in data_stream[50:]: anomaly = anomaly_detector.detect_knn(data_point) print("Anomaly (k-NN):", anomaly) """ def __init__(self, window_size=10): """ Initialize the RealTimeAnomalyDetection class with a specified window size. Parameters ---------- window_size : int The number of data points to consider for detecting anomalies. """ self.window_size = window_size self.data_window = deque(maxlen=window_size) self.models = {}
[docs] def detect_statistical(self, data_point, method="z_score", threshold=2.0, **kwargs): """ Detect anomalies using statistical methods like Z-score, moving average, etc. Parameters ---------- data_point : float The new data point to be analyzed. method : str, optional The statistical method to use ('z_score' or 'moving_average'). Default is 'z_score'. threshold : float, optional The threshold value for detecting anomalies. Default is 2.0. Returns ------- bool True if the data point is an anomaly, False otherwise. """ self.data_window.append(data_point) if len(self.data_window) < self.window_size: return False # Not enough data to detect anomalies if method == "z_score": return self._z_score_detection(data_point, threshold) elif method == "moving_average": return self._moving_average_detection(data_point, threshold) else: raise ValueError(f"Unknown statistical method: {method}")
def _z_score_detection(self, data_point, threshold): """Detect anomalies using Z-score method.""" mean = np.mean(self.data_window) std_dev = np.std(self.data_window) z_score = (data_point - mean) / std_dev return abs(z_score) > threshold def _moving_average_detection(self, data_point, threshold): """Detect anomalies using moving average method.""" moving_avg = np.mean(self.data_window) return abs(data_point - moving_avg) > threshold
[docs] def train_knn(self, training_data, k=5): """ Train a k-Nearest Neighbors (k-NN) model on the training data. Parameters ---------- training_data : numpy.ndarray The training dataset. k : int, optional The number of nearest neighbors to consider. Default is 5. Returns ------- None """ self.models["knn"] = {"training_data": np.array(training_data), "k": k}
[docs] def detect_knn(self, data_point): """ Detect anomalies using the k-Nearest Neighbors (k-NN) method. Parameters ---------- data_point : float The new data point to be analyzed. Returns ------- bool True if the data point is an anomaly, False otherwise. """ if "knn" not in self.models: raise ValueError("k-NN model has not been trained. Call train_knn() first.") training_data = self.models["knn"]["training_data"] k = self.models["knn"]["k"] distances = np.abs(training_data - data_point) nearest_neighbors = np.sort(distances)[:k] mean_distance = np.mean(nearest_neighbors) return mean_distance > np.std( training_data ) # Anomaly if distance is larger than standard deviation
[docs] def train_svm(self, training_data, kernel="rbf"): """ Train a Support Vector Machine (SVM) model on the training data. Parameters ---------- training_data : numpy.ndarray The training dataset. kernel : str, optional The kernel type for SVM ('linear', 'poly', 'rbf'). Default is 'rbf'. Returns ------- None """ self.models["svm"] = SimpleSVM(training_data, kernel)
[docs] def detect_svm(self, data_point): """ Detect anomalies using the Support Vector Machine (SVM) method. Parameters ---------- data_point : float The new data point to be analyzed. Returns ------- bool True if the data point is an anomaly, False otherwise. """ if "svm" not in self.models: raise ValueError("SVM model has not been trained. Call train_svm() first.") return self.models["svm"].predict(data_point)
[docs] def train_autoencoder(self, training_data, encoding_dim=3): """ Train an Autoencoder model on the training data. Parameters ---------- training_data : numpy.ndarray The training dataset. encoding_dim : int, optional The dimension of the encoding layer. Default is 3. Returns ------- None """ self.models["autoencoder"] = SimpleAutoencoder(training_data, encoding_dim)
[docs] def detect_autoencoder(self, data_point, threshold=0.1): """ Detect anomalies using the Autoencoder method. Parameters ---------- data_point : float The new data point to be analyzed. threshold : float, optional The reconstruction error threshold for detecting anomalies. Default is 0.1. Returns ------- bool True if the data point is an anomaly, False otherwise. """ if "autoencoder" not in self.models: raise ValueError( "Autoencoder model has not been trained. Call train_autoencoder() first." ) reconstruction_error = self.models["autoencoder"].reconstruction_error( data_point ) return reconstruction_error > threshold
[docs] def train_lstm(self, training_data, hidden_units=50): """ Train an LSTM-based model on the training data. Parameters ---------- training_data : numpy.ndarray The training dataset. hidden_units : int, optional The number of hidden units in the LSTM. Default is 50. Returns ------- None """ self.models["lstm"] = SimpleLSTM(training_data, hidden_units)
[docs] def detect_lstm(self, data_point, threshold=0.1): """ Detect anomalies using the LSTM-based model. Parameters ---------- data_point : float The new data point to be analyzed. threshold : float, optional The prediction error threshold for detecting anomalies. Default is 0.1. Returns ------- bool True if the data point is an anomaly, False otherwise. """ if "lstm" not in self.models: raise ValueError( "LSTM model has not been trained. Call train_lstm() first." ) prediction_error = self.models["lstm"].prediction_error(data_point) return prediction_error > threshold
[docs] def detect_wavelet(self, data_point, wavelet_name="haar", level=1, threshold=0.1): """ Detect anomalies using Wavelet Transform. Parameters ---------- data_point : float The new data point to be analyzed. wavelet_name : str, optional The name of the wavelet to use for the transform (default is 'haar'). level : int, optional The number of decomposition levels in the wavelet transform (default is 1). threshold : float, optional The threshold for detecting anomalies in the wavelet coefficients (default is 0.1). Returns ------- bool True if the data point is an anomaly, False otherwise. """ self.data_window.append(data_point) if len(self.data_window) < self.window_size: return False # Not enough data to detect anomalies wavelet_transform = WaveletTransform(np.array(self.data_window), wavelet_name) coeffs = wavelet_transform.perform_wavelet_transform(level) detail_coeffs = np.concatenate(coeffs[:-1]) return np.any(np.abs(detail_coeffs) > threshold)
[docs] def update_model(self, data_point, model_type="knn"): """ Update the model with new data for online learning. Parameters ---------- data_point : float The new data point to update the model. model_type : str, optional The type of model to update ('knn', 'svm', 'autoencoder', 'lstm'). Default is 'knn'. Returns ------- None """ if model_type == "knn": self.models["knn"]["training_data"] = np.append( self.models["knn"]["training_data"], data_point ) elif model_type == "svm": self.models["svm"].update(data_point) elif model_type == "autoencoder": self.models["autoencoder"].update(data_point) elif model_type == "lstm": self.models["lstm"].update(data_point) else: raise ValueError(f"Unknown model type: {model_type}")
[docs] def evaluate(self, test_data, model_type="knn"): """ Evaluate the performance of the anomaly detection method on a test dataset. Parameters ---------- test_data : numpy.ndarray The test dataset. model_type : str, optional The type of model to evaluate ('knn', 'svm', 'autoencoder', 'lstm'). Default is 'knn'. Returns ------- float The accuracy of the anomaly detection method on the test dataset. """ correct = 0 for data_point in test_data: if model_type == "knn": prediction = self.detect_knn(data_point) elif model_type == "svm": prediction = self.detect_svm(data_point) elif model_type == "autoencoder": prediction = self.detect_autoencoder(data_point) elif model_type == "lstm": prediction = self.detect_lstm(data_point) else: raise ValueError(f"Unknown model type: {model_type}") correct += int(prediction == self._is_anomaly(data_point)) return correct / len(test_data)
def _is_anomaly(self, data_point): """Placeholder method to determine if a data point is an anomaly (for evaluation).""" return ( False # This should be implemented based on the ground truth of the dataset )
# Simple SVM Implementation (Placeholder)
[docs] class SimpleSVM: def __init__(self, training_data, kernel="rbf"): self.training_data = training_data self.kernel = kernel self.support_vectors = self._train(training_data) def _train(self, data): # Placeholder SVM training logic # Ensure data is 1D for np.random.choice if data.ndim > 1: data = data.flatten() return np.random.choice(data, size=int(len(data) * 0.1), replace=False)
[docs] def predict(self, data_point): # Placeholder SVM prediction logic distance = np.min(np.abs(self.support_vectors - data_point)) return distance > np.std(self.training_data)
[docs] def update(self, data_point): # Placeholder SVM online update logic self.training_data = np.append(self.training_data, data_point) self.support_vectors = self._train(self.training_data)
# Simple Autoencoder Implementation (Placeholder)
[docs] class SimpleAutoencoder: def __init__(self, training_data, encoding_dim=3): """ Initialize the SimpleAutoencoder with training data and an encoding dimension. If the training data is 1D, it is reshaped to 2D. Parameters ---------- training_data : np.array The training data used to initialize the autoencoder. encoding_dim : int, optional The dimension of the encoded space. Default is 3. Example ------- >>> model = SimpleAutoencoder(training_data=np.array([1, 2, 3]), encoding_dim=3) """ self.encoding_dim = encoding_dim # Ensure training_data is at least 2D if training_data.ndim == 1: training_data = training_data.reshape(-1, 1) self.training_data = training_data # Initialize weights based on input dimension self.weights = np.random.randn(self.training_data.shape[1], encoding_dim)
[docs] def reconstruction_error(self, data_point): """ Compute the reconstruction error for a given data point. Parameters ---------- data_point : np.array The data point to compute the reconstruction error for. Returns ------- float The mean squared reconstruction error. Example ------- >>> error = model.reconstruction_error(np.array([1, 2, 3])) """ encoded = np.dot(data_point, self.weights) reconstructed = np.dot(encoded, self.weights.T) return np.mean((data_point - reconstructed) ** 2)
[docs] def update(self, data_point): """ Update the autoencoder with a new data point. Parameters ---------- data_point : np.array The new data point to be added for online learning. Notes ----- If the `data_point` is 1D, it is reshaped to 2D before appending. Example ------- >>> model.update(np.array([4, 5, 6])) """ # Ensure the new data point is 2D if data_point.ndim == 1: data_point = data_point.reshape(1, -1) # Append the new data point to the training data self.training_data = np.append(self.training_data, data_point, axis=0) # Update weights (placeholder update logic) self.weights += np.random.randn(*self.weights.shape) * 0.01
# Simple LSTM Implementation (Placeholder)
[docs] class SimpleLSTM: def __init__(self, training_data, hidden_units=50): self.hidden_units = hidden_units self.W = np.random.randn(training_data.shape[1], hidden_units) self.U = np.random.randn(hidden_units, hidden_units) self.V = np.random.randn(hidden_units, training_data.shape[1])
[docs] def prediction_error(self, data_point): h = np.zeros(self.hidden_units) for t in range(len(data_point)): h = np.tanh(np.dot(data_point[t], self.W) + np.dot(h, self.U)) predicted = np.dot(h, self.V) return np.mean((predicted - data_point) ** 2)
[docs] def update(self, data_point): # Placeholder LSTM online update logic self.W += np.random.randn(*self.W.shape) * 0.01 self.U += np.random.randn(*self.U.shape) * 0.01 self.V += np.random.randn(*self.V.shape) * 0.01