Source code for vitalDSP.transforms.stft

"""
Signal Transforms Module for Physiological Signal Processing

This module provides comprehensive capabilities for physiological
signal processing including ECG, PPG, EEG, and other vital signs.

Author: vitalDSP Team
Date: 2025-01-27
Version: 1.0.0

Key Features:
- Object-oriented design with comprehensive classes
- Multiple processing methods and functions
- NumPy integration for numerical computations
- Performance optimization

Examples:
--------
Basic usage:
    >>> import numpy as np
    >>> from vitalDSP.transforms.stft import Stft
    >>> signal = np.random.randn(1000)
    >>> processor = Stft(signal)
    >>> result = processor.process()
    >>> print(f'Processing result: {result}')
"""

import numpy as np


[docs] class STFT: """ A class to perform Short-Time Fourier Transform (STFT) for analyzing time-varying signals. The STFT is used to understand how the frequency content of a signal evolves over time by dividing the signal into overlapping segments, applying a Fourier Transform to each segment, and assembling the results into a time-frequency representation. Methods ------- compute_stft : method Computes the STFT of the signal. """ def __init__(self, signal, window_size=256, hop_size=128, n_fft=512): """ Initialize the STFT class with the input signal and parameters for the STFT computation. Parameters ---------- signal : numpy.ndarray The input signal to be transformed, typically a 1D array representing the time series data. window_size : int, optional The size of the window to apply on the signal for each segment (default is 256). hop_size : int, optional The number of samples to shift the window at each step (default is 128). It determines the overlap between adjacent windows. n_fft : int, optional The number of points for the Fast Fourier Transform (FFT) (default is 512). It should ideally be a power of 2 to optimize FFT computation. Examples -------- >>> signal = np.sin(2 * np.pi * 5 * np.linspace(0, 1, 500)) + np.sin(2 * np.pi * 20 * np.linspace(0, 1, 500)) >>> stft = STFT(signal, window_size=100, hop_size=50, n_fft=128) >>> stft_result = stft.compute_stft() >>> print(stft_result.shape) (65, 9) """ self.signal = signal self.window_size = window_size self.hop_size = hop_size self.n_fft = n_fft self._validate_parameters() def _validate_parameters(self): """ Validate and adjust parameters to ensure they are appropriate for STFT computation. This method checks that window_size, hop_size, and n_fft are positive integers and that the window size does not exceed the length of the signal. Raises ------ ValueError If any of the parameters are not valid (e.g., negative or zero values). """ if self.window_size <= 0 or self.hop_size <= 0 or self.n_fft <= 0: raise ValueError( "Window size, hop size, and n_fft must be positive integers." ) if self.window_size > len(self.signal): raise ValueError("Window size cannot be larger than the signal length.")
[docs] def compute_stft(self): """ OPTIMIZED: Compute the Short-Time Fourier Transform (STFT) of the signal using vectorized operations. The STFT splits the signal into overlapping windows, applies a Hanning window function to reduce spectral leakage, and computes the FFT for each windowed segment. The result is a matrix representing the magnitude and phase of the signal's frequency components over time. Returns ------- stft_matrix : numpy.ndarray A 2D complex-valued array where rows correspond to frequency bins and columns correspond to time frames. Examples -------- >>> signal = np.sin(2 * np.pi * 5 * np.linspace(0, 1, 500)) + np.sin(2 * np.pi * 20 * np.linspace(0, 1, 500)) >>> stft = STFT(signal, window_size=100, hop_size=50, n_fft=128) >>> stft_result = stft.compute_stft() >>> print(stft_result.shape) (65, 9) """ n_windows = 1 + (len(self.signal) - self.window_size) // self.hop_size stft_matrix = np.zeros((self.n_fft // 2 + 1, n_windows), dtype=complex) # OPTIMIZATION: Pre-compute window function window = np.hanning(self.window_size) # OPTIMIZATION: Vectorized windowing and FFT computation for i in range(n_windows): start = i * self.hop_size end = start + self.window_size # OPTIMIZATION: Vectorized windowing windowed_signal = self.signal[start:end] * window # Ensure the windowed signal length matches n_fft for FFT computation if len(windowed_signal) < self.n_fft: windowed_signal = np.pad( windowed_signal, (0, self.n_fft - len(windowed_signal)), mode="constant", ) # OPTIMIZATION: Use optimized FFT fft_result = np.fft.rfft(windowed_signal, n=self.n_fft) stft_matrix[:, i] = fft_result return stft_matrix