Source code for vitalDSP.advanced_computation.anomaly_detection

"""
Anomaly Detection Module for Physiological Signal Processing

This module provides comprehensive anomaly detection capabilities for physiological
signals including ECG, PPG, EEG, and other vital signs. It implements multiple
detection methods including statistical approaches, machine learning techniques,
and frequency-domain analysis for identifying unusual patterns and artifacts.

Author: vitalDSP Team
Date: 2025-01-27
Version: 1.0.0

Key Features:
- Statistical anomaly detection (Z-score, IQR)
- Moving average and rolling window methods
- Local Outlier Factor (LOF) for density-based detection
- Fourier-based frequency domain analysis
- Real-time streaming anomaly detection
- Performance monitoring and optimization

Examples:
---------
Basic Z-score anomaly detection:
    >>> import numpy as np
    >>> from vitalDSP.advanced_computation.anomaly_detection import AnomalyDetection
    >>> signal = np.sin(np.linspace(0, 10, 100)) + np.random.normal(0, 0.1, 100)
    >>> anomaly_detector = AnomalyDetection(signal)
    >>> anomalies = anomaly_detector.detect_anomalies(method="z_score", threshold=2.0)
    >>> print(f"Detected {len(anomalies)} anomalies")

Moving average detection:
    >>> anomalies_ma = anomaly_detector.detect_anomalies(method="moving_average", window_size=5, threshold=0.5)
    >>> print(f"Moving average anomalies: {len(anomalies_ma)}")

LOF-based detection:
    >>> anomalies_lof = anomaly_detector.detect_anomalies(method="lof", n_neighbors=20)
    >>> print(f"LOF anomalies: {len(anomalies_lof)}")

FFT-based detection:
    >>> anomalies_fft = anomaly_detector.detect_anomalies(method="fft", threshold=0.1)
    >>> print(f"FFT anomalies: {len(anomalies_fft)}")
"""

import numpy as np
import warnings
from vitalDSP.utils.quality_performance.performance_monitoring import (
    monitor_analysis_operation,
)


[docs] class AnomalyDetection: """ Comprehensive Anomaly Detection for detecting anomalies in real-time from streaming data. This class offers multiple methods to detect anomalies in a given signal, including statistical methods, moving averages, Local Outlier Factor (LOF), and Fourier-based methods. Methods ------- detect_anomalies : function Detects anomalies using various methods including z-score, moving average, custom LOF, and more. Examples -------- >>> import numpy as np >>> from vitalDSP.advanced_computation.anomaly_detection import AnomalyDetection >>> >>> # Example 1: Z-score anomaly detection >>> signal = np.sin(np.linspace(0, 10, 100)) + np.random.normal(0, 0.1, 100) >>> anomaly_detector = AnomalyDetection(signal) >>> anomalies_z_score = anomaly_detector.detect_anomalies(method="z_score", threshold=2.0) >>> print(f"Z-score anomalies: {len(anomalies_z_score)}") >>> >>> # Example 2: Moving average anomaly detection >>> anomalies_moving_avg = anomaly_detector.detect_anomalies(method="moving_average", window_size=5, threshold=0.5) >>> print(f"Moving average anomalies: {len(anomalies_moving_avg)}") >>> >>> # Example 3: LOF anomaly detection >>> anomalies_lof = anomaly_detector.detect_anomalies(method="lof", n_neighbors=20) >>> print(f"LOF anomalies: {len(anomalies_lof)}") >>> >>> # Example 4: FFT-based anomaly detection >>> anomalies_fft = anomaly_detector.detect_anomalies(method="fft", threshold=0.1) >>> print(f"FFT anomalies: {len(anomalies_fft)}") """ def __init__(self, signal): """ Initialize the AnomalyDetection class with the given signal. Parameters ---------- signal : numpy.ndarray The input signal on which anomaly detection is to be performed. """ self.signal = signal
[docs] @monitor_analysis_operation def detect_anomalies(self, method="z_score", **kwargs): """ Detect anomalies in the signal using the specified method. Parameters ---------- method : str, optional The method to use for detecting anomalies. Options: 'z_score', 'moving_average', 'lof', 'fft', 'threshold'. Default is 'z_score'. **kwargs : additional arguments Additional parameters depending on the chosen method. Returns ------- numpy.ndarray Indices of the detected anomalies. Raises ------ ValueError If the specified method is unknown. Examples -------- >>> anomalies_z_score = anomaly_detector.detect_anomalies(method="z_score", threshold=2.0) >>> anomalies_moving_avg = anomaly_detector.detect_anomalies(method="moving_average", window_size=5, threshold=0.5) """ if method == "z_score": return self._z_score_anomaly_detection(kwargs.get("threshold", 3.0)) elif method == "moving_average": return self._moving_average_anomaly_detection( kwargs.get("window_size", 5), kwargs.get("threshold", "auto"), # Changed to 'auto' ) elif method == "lof": return self._lof_anomaly_detection(kwargs.get("n_neighbors", 20)) elif method == "fft": return self._fft_anomaly_detection(kwargs.get("threshold", 1.5)) elif method == "threshold": return self._threshold_anomaly_detection(kwargs.get("threshold", 1.0)) else: raise ValueError(f"Unknown method: {method}")
def _z_score_anomaly_detection(self, threshold): """ Z-Score based anomaly detection. Anomalies are detected by calculating the z-score for each data point and identifying those that exceed the specified threshold. Parameters ---------- threshold : float The z-score threshold for anomaly detection. Returns ------- numpy.ndarray Indices of the detected anomalies. Examples -------- >>> anomalies = anomaly_detector._z_score_anomaly_detection(threshold=2.0) >>> print(anomalies) """ mean = np.mean(self.signal) std = np.std(self.signal) # Avoid divide by zero warning if std > 0: z_scores = (self.signal - mean) / std else: z_scores = np.zeros_like(self.signal) # If std is 0, all z-scores are 0 anomalies = np.where(np.abs(z_scores) > threshold)[0] return anomalies def _moving_average_anomaly_detection(self, window_size, threshold): """ Moving average based anomaly detection with adaptive thresholding. Anomalies are detected by calculating the moving average of the signal and identifying points where the deviation from the moving average exceeds the specified threshold. **IMPORTANT UPDATE:** The threshold parameter now supports: 1. 'auto' (recommended): Automatically computes threshold as 3 * std(residuals) 2. Numeric value: Interpreted as multiple of standard deviation (e.g., 3.0 = 3 sigma) 3. None: Defaults to 'auto' This change prevents the previous issue where default threshold=0.5 was too low for physiological signals, causing 95-99% false positive rates. Parameters ---------- window_size : int The window size for the moving average. threshold : float, str, or None The threshold for detecting anomalies. Options: - 'auto' or None: Automatically computes as 3 * std(residuals) - Numeric value: Number of standard deviations (e.g., 2.0, 3.0, 4.0) For backward compatibility, absolute values > 10 are treated as absolute thresholds. Returns ------- numpy.ndarray Indices of the detected anomalies. Examples -------- >>> # Recommended: Use adaptive threshold >>> anomalies = anomaly_detector._moving_average_anomaly_detection(window_size=5, threshold='auto') >>> # Use 3 standard deviations >>> anomalies = anomaly_detector._moving_average_anomaly_detection(window_size=5, threshold=3.0) >>> # Legacy: Use absolute threshold (not recommended) >>> anomalies = anomaly_detector._moving_average_anomaly_detection(window_size=5, threshold=50.0) Notes ----- For physiological signals (ECG, PPG), the deviation from moving average is typically 20-100 units. The old default of 0.5 was inappropriate and caused false positives. The new adaptive threshold automatically adjusts to signal characteristics. """ moving_avg = np.convolve( self.signal, np.ones(window_size) / window_size, mode="valid" ) residuals = np.abs(self.signal[window_size - 1 :] - moving_avg) # Adaptive threshold calculation if threshold is None or threshold == "auto": # Default: 3 standard deviations (catches ~0.3% of points in normal distribution) threshold_value = 3.0 * np.std(residuals) elif isinstance(threshold, (int, float)): # Interpret small values as multiples of std, large values as absolute if threshold < 10: # Likely a sigma multiplier threshold_value = threshold * np.std(residuals) else: # Likely an absolute threshold threshold_value = threshold else: # Fallback to adaptive warnings.warn( f"Invalid threshold type: {type(threshold)}. Using adaptive threshold." ) threshold_value = 3.0 * np.std(residuals) anomalies = np.where(residuals > threshold_value)[0] + window_size - 1 return anomalies def _lof_anomaly_detection(self, n_neighbors): """ Local Outlier Factor (LOF) based anomaly detection - OPTIMIZED VERSION. LOF identifies anomalies by comparing the local density of a data point to that of its neighbors. Points with significantly lower density are considered anomalies. OPTIMIZATION: Uses spatial data structures for O(n log n) complexity instead of O(n²). Parameters ---------- n_neighbors : int Number of neighbors to use for LOF. Returns ------- numpy.ndarray Indices of the detected anomalies. Examples -------- >>> anomalies = anomaly_detector._lof_anomaly_detection(n_neighbors=20) >>> print(anomalies) """ try: from sklearn.neighbors import NearestNeighbors except ImportError: # Fallback to original implementation if sklearn not available return self._lof_anomaly_detection_fallback(n_neighbors) n_points = len(self.signal) # Input validation if n_points < n_neighbors + 1: return np.array([]) # Not enough points for LOF # Create phase space (embedding dimension = 2) phase_space = np.column_stack((self.signal[:-1], self.signal[1:])) # Use spatial data structure for efficient neighbor search nbrs = NearestNeighbors(n_neighbors=n_neighbors + 1, algorithm="ball_tree") nbrs.fit(phase_space) # Find neighbors for each point distances, indices = nbrs.kneighbors(phase_space) # Compute reachability distances reachability_distances = np.zeros((len(phase_space), n_neighbors)) for i in range(len(phase_space)): for j in range(n_neighbors): # Reachability distance is max of distance and k-distance reachability_distances[i, j] = max( distances[i, j + 1], # j+1 because we exclude the point itself distances[indices[i, j + 1], n_neighbors], # k-distance of neighbor ) # Compute local reachability density (LRD) lrd = np.zeros(len(phase_space)) for i in range(len(phase_space)): avg_reach_dist = np.mean(reachability_distances[i]) lrd[i] = 1 / (avg_reach_dist + 1e-10) # Compute LOF lof = np.zeros(len(phase_space)) for i in range(len(phase_space)): neighbor_lrd = lrd[indices[i, 1 : n_neighbors + 1]] # Exclude self lof[i] = np.mean(neighbor_lrd) / lrd[i] # LOF > 1.5 threshold selected to reduce false positives from normal physiological variability. anomalies = np.where(lof > 1.5)[0] return anomalies def _lof_anomaly_detection_fallback(self, n_neighbors): """ Fallback LOF implementation for when sklearn is not available. Uses sampling to reduce computational complexity. """ n_points = len(self.signal) if n_points < n_neighbors + 1: return np.array([]) # Sample-based approach to reduce complexity sample_size = min(1000, n_points) # Limit sample size if n_points > sample_size: # Sort indices to preserve temporal ordering for valid delay embedding. sample_indices = np.sort( np.random.choice(n_points, sample_size, replace=False) ) sample_signal = self.signal[sample_indices] else: sample_signal = self.signal sample_indices = np.arange(n_points) # Create phase space phase_space = np.column_stack((sample_signal[:-1], sample_signal[1:])) # Compute distances only for sampled points n_sample = len(phase_space) distances = np.zeros((n_sample, n_sample)) for i in range(n_sample): for j in range(i + 1, n_sample): dist = np.linalg.norm(phase_space[i] - phase_space[j]) distances[i, j] = dist distances[j, i] = dist # Rest of LOF computation on sampled data sorted_distances = np.sort(distances, axis=1) reachability_distances = np.zeros_like(distances) for i in range(n_sample): reachability_distances[i, :] = np.maximum( distances[i, :], sorted_distances[i, n_neighbors] ) # Compute LRD lrd = np.zeros(n_sample) for i in range(n_sample): neighbors = np.argsort(distances[i, :])[1 : n_neighbors + 1] lrd[i] = 1 / (np.mean(reachability_distances[i, neighbors]) + 1e-10) # Compute LOF lof = np.zeros(n_sample) for i in range(n_sample): neighbors = np.argsort(distances[i, :])[1 : n_neighbors + 1] lof[i] = np.mean(lrd[neighbors]) / lrd[i] # LOF > 1.5 threshold selected to reduce false positives from normal physiological variability. sample_anomalies = np.where(lof > 1.5)[0] # Map back to original indices if n_points > sample_size: anomalies = sample_indices[sample_anomalies] else: anomalies = sample_anomalies return anomalies def _fft_anomaly_detection(self, threshold): """ FFT based anomaly detection by analyzing the frequency domain. Anomalies are detected by transforming the signal to the frequency domain using FFT and identifying components whose magnitude exceeds the specified threshold. Parameters ---------- threshold : float The threshold for detecting anomalies based on frequency domain components. Returns ------- numpy.ndarray Indices of the detected anomalies. Examples -------- >>> anomalies = anomaly_detector._fft_anomaly_detection(threshold=1.5) >>> print(anomalies) """ fft_values = np.fft.fft(self.signal) magnitude = np.abs(fft_values) mean_magnitude = np.mean(magnitude) anomalous_fft = np.where(magnitude > threshold * mean_magnitude, fft_values, 0) anomaly_signal = np.abs(np.fft.ifft(anomalous_fft)) anomaly_std = np.std(anomaly_signal) if anomaly_std == 0: return np.array([], dtype=int) anomaly_threshold = np.mean(anomaly_signal) + anomaly_std return np.where(anomaly_signal > anomaly_threshold)[0] def _threshold_anomaly_detection(self, threshold): """ Simple threshold based anomaly detection. Anomalies are detected by identifying points in the signal that exceed the specified threshold. Parameters ---------- threshold : float The threshold value for detecting anomalies. Returns ------- numpy.ndarray Indices of the detected anomalies. Examples -------- >>> anomalies = anomaly_detector._threshold_anomaly_detection(threshold=1.0) >>> print(anomalies) """ anomalies = np.where(np.abs(self.signal) > threshold)[0] return anomalies