Source code for vitalDSP.signal_quality_assessment.blind_source_separation

"""
Signal Quality Assessment Module for Physiological Signal Processing

This module provides comprehensive capabilities for physiological
signal processing including ECG, PPG, EEG, and other vital signs.

Author: vitalDSP Team
Date: 2025-01-27
Version: 1.0.0

Key Features:
- Multiple processing methods and functions
- NumPy integration for numerical computations

Examples:
---------
Basic usage:
    >>> import numpy as np
    >>> from vitalDSP.signal_quality_assessment.blind_source_separation import BlindSourceSeparation
    >>> signal = np.random.randn(1000)
    >>> processor = BlindSourceSeparation(signal)
    >>> result = processor.process()
    >>> print(f'Processing result: {result}')
"""

import numpy as np


[docs] def center_signal(signal): """ Center the signal by subtracting the mean. Parameters ---------- signal : numpy.ndarray The input signal matrix, where each row is a different signal. Returns ------- centered_signal : numpy.ndarray The centered signal matrix. mean_signal : numpy.ndarray The mean of the original signal. Examples -------- >>> signal = np.array([[1, 2, 3], [4, 5, 6]]) >>> centered_signal, mean_signal = center_signal(signal) >>> print(centered_signal) """ signal = np.atleast_2d(signal) # Check for NaN values if np.isnan(signal).any(): raise ValueError( "Input signal contains NaN values. Please clean the data before processing." ) mean_signal = np.mean(signal, axis=1, keepdims=True) centered_signal = signal - mean_signal return centered_signal, mean_signal
[docs] def whiten_signal(signal): """ Whiten the signal (decorrelate and scale to unit variance). Parameters ---------- signal : numpy.ndarray The input centered signal matrix. Returns ------- whitened_signal : numpy.ndarray The whitened signal matrix. whitening_matrix : numpy.ndarray The matrix used to whiten the signal. Examples -------- >>> signal = np.array([[1, 2, 3], [4, 5, 6]]) >>> whitened_signal, whitening_matrix = whiten_signal(signal) >>> print(whitened_signal) """ signal = np.atleast_2d(signal) # Check for NaN values if np.isnan(signal).any(): raise ValueError( "Input signal contains NaN values. Please clean the data before processing." ) # signal shape is (n_channels, n_samples); rowvar=True computes # the (n_channels × n_channels) covariance matrix, not (n_samples × n_samples). cov = np.cov(signal, rowvar=True) # Add a small positive constant to avoid numerical instability eigenvalues, eigenvectors = np.linalg.eigh(cov) eigenvalues = np.where(eigenvalues <= 0, 1e-10, eigenvalues) whitening_matrix = np.dot(eigenvectors, np.diag(1.0 / np.sqrt(eigenvalues))) # signal: (n_channels, n_samples); whitening_matrix: (n_channels, n_channels) # whitened = W^T @ signal → (n_channels, n_samples) whitened_signal = np.dot(whitening_matrix.T, signal) return whitened_signal, whitening_matrix
[docs] def create_synthetic_components(signal, n_components=3, method="derivatives"): """ Create synthetic signal components from a 1D signal for ICA processing. When only a single signal channel is available, this function generates additional synthetic components to enable blind source separation techniques like ICA and PCA. This is particularly useful for physiological signals where we want to separate artifacts from the underlying signal. Parameters ---------- signal : numpy.ndarray The 1D input signal n_components : int, optional (default=3) Number of synthetic components to generate (including original) Minimum is 2, recommended 3-5 method : str, optional (default='derivatives') Method for creating synthetic components: - 'derivatives': Uses signal derivatives and delayed versions - 'wavelet': Uses wavelet decomposition at different scales - 'frequency': Uses bandpass filtered versions Returns ------- synthetic_signals : numpy.ndarray Matrix of shape (n_components, len(signal)) with synthetic components Notes ----- The synthetic components are designed to capture different aspects of the signal: - Original signal - First derivative (captures rapid changes, useful for detecting spikes) - Delayed version (captures temporal patterns) - Second derivative (captures acceleration, useful for motion artifacts) - Smoothed version (captures baseline trends) Examples -------- >>> import numpy as np >>> signal = np.sin(2*np.pi*1*np.linspace(0,1,1000)) + 0.1*np.random.randn(1000) >>> synthetic = create_synthetic_components(signal, n_components=3) >>> print(synthetic.shape) # (3, 1000) """ if signal.ndim > 1: raise ValueError( "Input must be a 1D signal. Use signals directly if multi-channel." ) if n_components < 2: raise ValueError("n_components must be at least 2 for ICA to work.") signal = np.asarray(signal) n_samples = len(signal) components = [] if method == "derivatives": # Component 1: Original signal components.append(signal) # Component 2: First derivative (captures rapid changes) if n_components >= 2: derivative1 = np.gradient(signal) components.append(derivative1) # Component 3: Delayed version (captures temporal patterns) if n_components >= 3: delay = max(1, n_samples // 20) # 5% delay delayed = np.roll(signal, delay) delayed[:delay] = delayed[delay] # Avoid wrap-around artifacts components.append(delayed) # Component 4: Second derivative (captures acceleration) if n_components >= 4: derivative2 = np.gradient(derivative1) components.append(derivative2) # Component 5: Smoothed version (captures baseline) if n_components >= 5: from scipy.ndimage import gaussian_filter1d smoothed = gaussian_filter1d(signal, sigma=max(1, n_samples // 100)) components.append(smoothed) # Additional components: More delays with different lags for i in range(5, n_components): delay = max(1, n_samples // (10 + i)) delayed = np.roll(signal, delay) delayed[:delay] = delayed[delay] components.append(delayed) elif method == "frequency": # Component 1: Original signal components.append(signal) # Create bandpass filtered versions at different frequency bands from scipy.signal import butter, filtfilt # Determine signal sampling frequency (assume 250 Hz if not specified) fs = 250 # This could be passed as parameter # Define frequency bands bands = [ (0.5, 5), # Very low frequency (baseline) (5, 15), # Low frequency (15, 40), # Mid frequency (40, 100), # High frequency ] for low, high in bands[: n_components - 1]: try: nyq = fs / 2 low_norm = low / nyq high_norm = high / nyq b, a = butter(2, [low_norm, high_norm], btype="band") filtered = filtfilt(b, a, signal) components.append(filtered) except Exception as err: print(f"Error creating synthetic components: {err}") # Fallback to derivative if filtering fails components.append(np.gradient(signal)) if len(components) >= n_components: break # Fill remaining with derivatives if needed while len(components) < n_components: components.append(np.gradient(components[-1])) else: raise ValueError(f"Unknown method: {method}. Use 'derivatives' or 'frequency'.") # Convert to array and ensure correct shape synthetic_signals = np.array(components[:n_components]) return synthetic_signals
[docs] def ica_artifact_removal( signals, max_iter=1000, tol=1e-5, auto_synthetic=True, n_components=3 ): """ Perform ICA to separate sources and remove artifacts. This enhanced version automatically handles both multi-channel and single-channel (1D) signals. For 1D signals, it creates synthetic components to enable ICA. Parameters ---------- signals : numpy.ndarray The mixed signals. Can be: - 1D array: Single signal (will auto-generate synthetic components) - 2D array: Multiple signals, where each row is a different signal max_iter : int, optional (default=1000) The maximum number of iterations for the ICA algorithm. tol : float, optional (default=1e-5) Tolerance for the convergence of the algorithm. auto_synthetic : bool, optional (default=True) If True and input is 1D, automatically create synthetic components n_components : int, optional (default=3) Number of synthetic components to create for 1D signals Only used when auto_synthetic=True Returns ------- separated_signals : numpy.ndarray The matrix of separated signals. If input was 1D, returns the first (cleaned) component. Raises ------ ValueError If signals contain NaN values or have insufficient dimensionality Examples -------- >>> # Example 1: Multi-channel signals >>> signals = np.array([[1, 2, 3], [4, 5, 6]]) >>> separated = ica_artifact_removal(signals) >>> print(separated.shape) # (2, 3) >>> >>> # Example 2: Single channel (1D) signal - auto synthetic >>> signal_1d = np.sin(2*np.pi*np.linspace(0,10,1000)) + 0.1*np.random.randn(1000) >>> cleaned = ica_artifact_removal(signal_1d) >>> print(cleaned.shape) # (1000,) - returns cleaned 1D signal >>> >>> # Example 3: Single channel with custom n_components >>> cleaned = ica_artifact_removal(signal_1d, n_components=5) >>> print(cleaned.shape) # (1000,) """ # Check for NaN values first if np.isnan(signals).any(): raise ValueError( "Input signals contain NaN values. Please clean the data before processing." ) # Handle 1D input is_1d_input = signals.ndim == 1 if is_1d_input: if not auto_synthetic: raise ValueError( "ICA requires at least 2 signal components. " "For 1D signals, set auto_synthetic=True to generate synthetic components, " "or provide multi-channel signals." ) # Create synthetic components signals = create_synthetic_components( signals, n_components=n_components, method="derivatives" ) signals = np.atleast_2d(signals) # Validate dimensionality if signals.shape[0] < 2: raise ValueError( f"ICA requires at least 2 signal components, got {signals.shape[0]}. " f"Use auto_synthetic=True for 1D signals." ) # Center the signals centered_signals, _ = center_signal(signals) # Whiten the signals whitened_signals, _ = whiten_signal(centered_signals) num_components = whitened_signals.shape[0] weights = np.random.rand(num_components, num_components) for i in range(max_iter): # Update weights using FastICA weights_new = np.dot( np.tanh(np.dot(weights, whitened_signals)).dot(whitened_signals.T) / whitened_signals.shape[1], weights.T, ) weights_new /= np.linalg.norm(weights_new, axis=1, keepdims=True) # Check convergence if np.max(np.abs(np.abs(np.diag(np.dot(weights_new, weights.T))) - 1)) < tol: break weights = weights_new separated_signals = np.dot(weights, whitened_signals) # If input was 1D, return the first (cleaned) component as 1D array if is_1d_input: # The first separated component typically contains the cleaned signal # while artifacts are separated into other components return separated_signals[0] return separated_signals
[docs] def pca_artifact_removal(signals, n_components=None): """ Perform PCA to reduce dimensionality and remove artifacts. Parameters ---------- signals : numpy.ndarray The mixed signals matrix, where each row is a different signal. n_components : int, optional (default=None) Number of components to keep. If None, all components are kept. Returns ------- reduced_signals : numpy.ndarray The matrix of reduced signals. Examples -------- >>> signals = np.array([[1, 2, 3], [4, 5, 6]]) >>> reduced_signals = pca_artifact_removal(signals, n_components=1) >>> print(reduced_signals) """ signals = np.atleast_2d(signals) # Check for NaN values if np.isnan(signals).any(): raise ValueError( "Input signals contain NaN values. Please clean the data before processing." ) # Center the signals centered_signals, mean_signal = center_signal(signals) # Compute covariance matrix cov_matrix = np.cov(centered_signals) # Compute eigenvalues and eigenvectors eigenvalues, eigenvectors = np.linalg.eigh(cov_matrix) # Sort eigenvectors by eigenvalues idx = np.argsort(eigenvalues)[::-1] eigenvectors = eigenvectors[:, idx] if n_components is not None: eigenvectors = eigenvectors[:, :n_components] reduced_signals = np.dot(eigenvectors.T, centered_signals) return reduced_signals
[docs] def jade_ica(signals, max_iter=1000, tol=1e-5): """ Perform Joint Approximate Diagonalization of Eigenmatrices (JADE) for ICA. Parameters ---------- signals : numpy.ndarray The mixed signals matrix, where each row is a different signal. max_iter : int, optional (default=1000) The maximum number of iterations for the ICA algorithm. tol : float, optional (default=1e-5) Tolerance for the convergence of the algorithm. Returns ------- separated_signals : numpy.ndarray The matrix of separated signals. Examples -------- >>> # Input must be square matrix >>> signals = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]) >>> separated_signals = jade_ica(signals) >>> print(separated_signals) """ signals = np.atleast_2d(signals) # Check for NaN values if np.isnan(signals).any(): raise ValueError( "Input signals contain NaN values. Please clean the data before processing." ) # Ensure there are at least two signals if signals.shape[0] < 2: raise ValueError("Input must contain at least two signals for JADE.") def jade(signals): # Whiten the signals whitened_signals, _ = whiten_signal(signals) # Compute the covariance matrix (ensure it's square) cov_matrix = np.cov(whitened_signals, rowvar=False) if cov_matrix.shape[0] != cov_matrix.shape[1]: raise ValueError("Covariance matrix must be square for JADE.") # Perform eigenvalue decomposition eigenvalues, eigenvectors = np.linalg.eigh(cov_matrix) # Avoid numerical issues by ensuring all eigenvalues are positive eigenvalues = np.where(eigenvalues <= 0, 1e-10, eigenvalues) # Fix dimensionality of B matrix to match whitened signals B = np.dot(eigenvectors, np.diag(np.sqrt(eigenvalues))) U = np.dot(B.T, whitened_signals.T).T # Ensure U is square before inverting if U.shape[0] != U.shape[1]: raise ValueError("Matrix U must be square to perform inversion.") return np.dot(np.linalg.inv(U), whitened_signals) # Call the internal JADE algorithm separated_signals = jade(signals) return separated_signals