Source code for vitalDSP.utils.data_processing.synthesize_data

"""
Utility Functions Module for Physiological Signal Processing

This module provides comprehensive capabilities for physiological
signal processing including ECG, PPG, EEG, and other vital signs.

Author: vitalDSP Team
Date: 2025-01-27
Version: 1.0.0

Key Features:
- Object-oriented design with comprehensive classes
- Multiple processing methods and functions
- NumPy integration for numerical computations
- SciPy integration for advanced signal processing
- Interactive visualization capabilities

Examples:
--------
Basic usage:
    >>> import numpy as np
    >>> from vitalDSP.data_processing.synthesize_data import SynthesizeData
    >>> signal = np.random.randn(1000)
    >>> processor = SynthesizeData(signal)
    >>> result = processor.process()
    >>> print(f'Processing result: {result}')
"""

import numpy as np
from scipy.interpolate import interp1d

# from scipy.fft import ifft
# from scipy.signal import resample
import matplotlib.pyplot as plt

# from scipy.integrate import odeint
from scipy.integrate import solve_ivp

# from vitalDSP.utils.signal_processing.peak_detection import PeakDetection
# from vitalDSP.utils.config_utilities.common import ecg_detect_peaks


[docs] def generate_sinusoidal(frequency, sampling_rate, duration, amplitude=1.0, phase=0.0): """ Generate a sinusoidal wave. Parameters ---------- frequency : float Frequency of the sinusoid in Hz. sampling_rate : float Sampling rate in Hz. duration : float Duration of the signal in seconds. amplitude : float, optional (default=1.0) Amplitude of the sinusoid. phase : float, optional (default=0.0) Phase shift of the sinusoid in radians. Returns ------- signal : numpy.ndarray The generated sinusoidal signal. Examples -------- >>> signal = generate_sinusoidal(frequency=1.0, sampling_rate=100.0, duration=5.0) >>> print(signal) """ t = np.arange(0, duration, 1 / sampling_rate) signal = amplitude * np.sin(2 * np.pi * frequency * t + phase) return signal
[docs] def generate_square_wave( frequency, sampling_rate, duration, amplitude=1.0, duty_cycle=0.5 ): """ Generate a square wave. Parameters ---------- frequency : float Frequency of the square wave in Hz. sampling_rate : float Sampling rate in Hz. duration : float Duration of the signal in seconds. amplitude : float, optional (default=1.0) Amplitude of the square wave. duty_cycle : float, optional (default=0.5) Duty cycle of the square wave (fraction of one period in which the signal is high). Returns ------- signal : numpy.ndarray The generated square wave signal. Examples -------- >>> signal = generate_square_wave(frequency=1.0, sampling_rate=100.0, duration=5.0) >>> print(signal) """ t = np.arange(0, duration, 1 / sampling_rate) signal = amplitude * np.where(np.mod(t * frequency, 1) < duty_cycle, 1.0, -1.0) return signal
[docs] def generate_noisy_signal(base_signal, noise_level=0.1): """ Add Gaussian noise to a base signal. Parameters ---------- base_signal : numpy.ndarray The input signal to which noise will be added. noise_level : float, optional (default=0.1) The standard deviation of the Gaussian noise. Returns ------- noisy_signal : numpy.ndarray The signal with added Gaussian noise. Examples -------- >>> base_signal = generate_sinusoidal(frequency=1.0, sampling_rate=100.0, duration=5.0) >>> noisy_signal = generate_noisy_signal(base_signal, noise_level=0.2) >>> print(noisy_signal) """ noise = np.random.normal(0, noise_level, len(base_signal)) noisy_signal = base_signal + noise return noisy_signal
[docs] def generate_ecg_signal( sfecg=256, N=None, duration=30, Anoise=0.0, hrmean=60, hrstd=1.0, lfhfratio=0.5, sfint=512, ti=None, ai=None, bi=None, ): """ Generate a synthetic ECG signal using the dynamical model from McSharry et al. Parameters ---------- sfecg : int, optional ECG sampling frequency in Hz (default is 256). N : int, optional Approximate number of heartbeats (default is 60). duration : int, optional Approximate number of seconds (default is 30). Anoise : float, optional Amplitude of additive uniformly distributed measurement noise (default is 0). hrmean : float, optional Mean heart rate in beats per minute (default is 60). hrstd : float, optional Standard deviation of heart rate in beats per minute (default is 1). lfhfratio : float, optional Low frequency to high frequency ratio (default is 0.5). sfint : int, optional Internal sampling frequency in Hz (default is 512). ti : list, optional Angles of extrema in degrees for PQRST (default is [-70, -15, 0, 15, 100]). ai : list, optional Z-position of extrema for PQRST (default is [1.2, -5, 30, -7.5, 0.75]). bi : list, optional Gaussian width of peaks for PQRST (default is [0.25, 0.1, 0.1, 0.1, 0.4]). Returns ------- s : ndarray Generated ECG signal. ipeaks : ndarray Labels for PQRST peaks. Examples -------- >>> signal = generate_ecg_signal(sfecg=256, N=256, Anoise=0.01, hrmean=70, sfint=512) >>> print(signal) """ if ti is None: ti = [-70, -15, 0, 15, 100] if ai is None: ai = [1.2, -5, 30, -7.5, 0.75] if bi is None: bi = [0.25, 0.1, 0.1, 0.1, 0.4] ti = np.array(ti) * np.pi / 180.0 hrfact = np.sqrt(hrmean / 60.0) hrfact2 = np.sqrt(hrfact) bi = hrfact * np.array(bi) ti = np.array([hrfact2, hrfact, 1, hrfact, hrfact2]) * ti # q = round(sfint / sfecg) if sfint % sfecg != 0: raise ValueError("sfint must be an integer multiple of sfecg") flo = 0.1 fhi = 0.25 flostd = 0.01 fhistd = 0.01 # calculate time scales for rr and total output sampfreqrr = 1 trr = 1 / sampfreqrr rrmean = 60 / hrmean if duration is not None: Nrr = int(duration * hrmean / 60.0) else: Nrr = 2 ** (np.ceil(np.log2(N * rrmean / trr))) rr0 = rrprocess( flo, fhi, flostd, fhistd, lfhfratio, hrmean, hrstd, sampfreqrr, # 1, Nrr, # 2 ** int(np.ceil(np.log2(N * 60.0 / hrmean))), ) # upsample rr time series from 1 Hz to sfint Hz rr = interp1d(np.linspace(0, len(rr0) - 1, len(rr0)), rr0, kind="cubic")( np.linspace(0, len(rr0) - 1, len(rr0) * sfint) ) dt = 1.0 / sfint rrn = np.zeros_like(rr) tecg = 0 i = 0 while i < len(rr): tecg = tecg + rr[i] ip = int(np.round(tecg / dt)) rrn[i : ip + 1] = rr[i] i = ip + 1 Nt = ip x0 = [1, 0, 0.04] tspan = np.arange(0, (Nt - 1) * dt, dt) args = (rrn, sfint, ti, ai, bi) # solv_ode = solve_ivp(ordinary_differential_equation, [tspan[0], tspan[-1]], # x0, t_eval=np.arange(20.5, 21.5, 0.00001), args=args) solv_ode = solve_ivp( ordinary_differential_equation, [tspan[0], tspan[-1]], x0, t_eval=np.arange(tspan[0], tspan[-1], 1 / sfecg), args=args, ) X = solv_ode.y z = X[2] z = (z - np.min(z)) * 1.6 / (np.max(z) - np.min(z)) - 0.4 s = z + Anoise * (2 * np.random.rand(len(z)) - 1) # detector = PeakDetection(s, method="ecg_r_peak", # **{"distance":50,"window_size":7,"threshold_factor":1.8}) # ipeaks = detector.detect_peaks() return s
[docs] def ordinary_differential_equation( t, x_equations, rr=None, sfint=None, ti=None, ai=None, bi=None ): x = x_equations[0] y = x_equations[1] z = x_equations[2] ta = np.arctan2(y, x) r0 = 1 a0 = 1.0 - np.sqrt(x**2 + y**2) / r0 ip = int(1 + np.floor(t * sfint)) try: w0 = 2 * np.pi / rr[ip] except Exception: w0 = 2 * np.pi / rr[-1] fresp = 0.25 zbase = 0.005 * np.sin(2 * np.pi * fresp * t) dx1dt = a0 * x - w0 * y dx2dt = a0 * y + w0 * x dti = np.fmod(ta - ti, 2 * np.pi) dx3dt = -np.sum(ai * dti * np.exp(-0.5 * np.divide(dti, bi) ** 2)) dx3dt = dx3dt - 1.0 * (z - zbase) return [dx1dt, dx2dt, dx3dt]
[docs] def rrprocess(flo, fhi, flostd, fhistd, lfhfratio, hrmean, hrstd, sfrr, n): w1 = 2 * np.pi * flo w2 = 2 * np.pi * fhi c1 = 2 * np.pi * flostd c2 = 2 * np.pi * fhistd sig2 = 1 sig1 = lfhfratio rrmean = 60 / hrmean rrstd = 60 * hrstd / (hrmean * hrmean) """ Generating RR-intervals which have a bimodal power spectrum consisting of the sum of two Gaussian distributions """ df = sfrr / n w = np.arange(0, n).T * 2 * np.pi * df dw1 = w - w1 dw2 = w - w2 # Resample Hw to ensure it has the correct length Hw1 = sig1 * np.exp(-0.5 * (dw1 / c1) ** 2) / np.sqrt(2 * np.pi * np.power(c1, 2)) Hw2 = sig2 * np.exp(-0.5 * (dw2 / c2) ** 2) / np.sqrt(2 * np.pi * np.power(c2, 2)) Hw = Hw1 + Hw2 """ An RR-interval time series T(t) with power spectrum is S(f) generated by taking the inverse Fourier transform of a sequence of complex numbers with amplitudes sqrt(S(f)) and phases which are randomly distributed between 0 and 2pi """ # Ensure Hw_resampled is non-negative before taking the square root Hw0_half = np.array(Hw[0 : int(n / 2)]) Hw0 = np.append(Hw0_half, np.flip(Hw0_half)) Sw = (sfrr / 2) * (Hw0**0.5) # Generate phase with correct length ph0 = 2 * np.pi * np.random.rand(int(n / 2) - 1, 1) # ph0 = 2 * np.pi * 0.001*np.arange(127).reshape(-1,1) ph = np.vstack((0, ph0, 0, -np.flip(ph0))) # create the complex number SwC = np.multiply(Sw.reshape(-1, 1), np.exp(1j * ph)) inverse_res = np.fft.ifft(SwC.reshape(-1)) x = (1 / n) * np.real(inverse_res) """ By multiplying this time series by an appropriate scaling constant and adding an offset value, the resulting time series can be given any required mean and standard deviation """ xstd = np.std(x) ratio = rrstd / xstd # Handle potential NaN values in rr rr = rrmean + x * ratio return rr
[docs] def generate_resp_signal(sampling_rate, duration, frequency=0.2, amplitude=0.5): """ Generate a synthetic respiratory signal. Parameters ---------- sampling_rate : float Sampling rate in Hz. duration : float Duration of the signal in seconds. frequency : float, optional (default=0.2) Frequency of the respiratory signal in Hz. amplitude : float, optional (default=0.5) Amplitude of the respiratory signal. Returns ------- resp_signal : numpy.ndarray The generated synthetic respiratory signal. Examples -------- >>> resp_signal = generate_resp_signal(sampling_rate=1000.0, duration=10.0) >>> print(resp_signal) """ t = np.arange(0, duration, 1 / sampling_rate) resp_signal = amplitude * np.sin(2 * np.pi * frequency * t) return resp_signal
[docs] def generate_synthetic_ppg_reversed( duration=10, sampling_rate=1000, heart_rate=60, noise_level=0.01, display=False ): """ Generate a synthetic PPG signal. Parameters ---------- duration : float, optional Duration of the signal in seconds (default is 10). sampling_rate : int, optional Sampling rate in Hz (default is 1000). heart_rate : float, optional Heart rate in beats per minute (default is 60). noise_level : float, optional Standard deviation of the Gaussian noise to be added (default is 0.01). display : bool, optional If True, displays the generated PPG signal (default is False). Returns ------- time : numpy.ndarray Array of time values. ppg_signal : numpy.ndarray The generated synthetic PPG signal. Example ------- >>> time, ppg_signal = generate_synthetic_ppg(duration=10, heart_rate=60, display=True) """ # Calculate the number of heartbeats and time array num_beats = int(duration * heart_rate / 60) time = np.linspace(0, duration, int(sampling_rate * duration)) # Define PPG pulse using a combination of Gaussian functions def ppg_pulse(t): # Parameters for a typical PPG pulse waveform amplitude = 1.0 width_systolic = 0.1 # Width of the systolic peak width_diastolic = 0.2 # Width of the diastolic component dicrotic_notch_depth = 0.4 # Depth of the dicrotic notch dicrotic_notch_delay = 0.1 # Time delay for the dicrotic notch # Systolic peak systolic_peak = amplitude * np.exp(-((t - 0.15) ** 2) / (2 * width_systolic**2)) # Dicrotic notch and wave dicrotic_wave = dicrotic_notch_depth * np.exp( -((t - (0.15 + dicrotic_notch_delay)) ** 2) / (2 * width_diastolic**2) ) return systolic_peak - dicrotic_wave # Generate one cycle of the PPG signal cycle_length = int(sampling_rate * 60 / heart_rate) ppg_cycle = ppg_pulse(np.linspace(0, 1, cycle_length)) # Tile the cycle to create the full PPG signal ppg_signal = np.tile(ppg_cycle, num_beats) ppg_signal = ppg_signal[: len(time)] # Trim to the exact length # Add Gaussian noise to simulate artifacts noise = np.random.normal(0, noise_level, len(ppg_signal)) ppg_signal += noise # Display the signal if required if display: plt.figure(figsize=(10, 4)) plt.plot(time, ppg_signal) plt.title("Synthetic PPG Signal") plt.xlabel("Time (s)") plt.ylabel("Amplitude") if display: plt.show() return time, ppg_signal
[docs] def generate_synthetic_ppg( duration=10, sampling_rate=1000, heart_rate=60, noise_level=0.01, diastolic_amplitude=0.8, diastolic_width=0.15, dicrotic_notch_depth=0.6, dicrotic_notch_delay=0.2, randomize=False, display=False, ): """ Generate a synthetic PPG signal with adjustable parameters. Parameters ---------- duration : float, optional Duration of the signal in seconds (default is 10). sampling_rate : int, optional Sampling rate in Hz (default is 1000). heart_rate : float, optional Heart rate in beats per minute (default is 60). noise_level : float, optional Standard deviation of the Gaussian noise to be added (default is 0.01). diastolic_amplitude : float, optional Amplitude of the diastolic peak relative to the systolic peak (default is 0.8). diastolic_width : float, optional Width of the diastolic peak (default is 0.15). dicrotic_notch_depth : float, optional Depth of the dicrotic notch (default is 0.6). dicrotic_notch_delay : float, optional Time delay for the diastolic peak (default is 0.2). randomize : bool, optional If True, introduces randomness in the diastolic amplitude, width, and dicrotic notch depth (default is False). display : bool, optional If True, displays the generated PPG signal (default is False). Returns ------- time : numpy.ndarray Array of time values. ppg_signal : numpy.ndarray The generated synthetic PPG signal. Example ------- >>> time, ppg_signal = generate_synthetic_ppg(duration=10, heart_rate=60, randomize=True, display=True) """ # Apply randomization if requested if randomize: diastolic_amplitude += np.random.uniform(-0.1, 0.1) * diastolic_amplitude diastolic_width += np.random.uniform(-0.05, 0.05) * diastolic_width dicrotic_notch_depth += np.random.uniform(-0.1, 0.1) * dicrotic_notch_depth dicrotic_notch_delay += np.random.uniform(-0.05, 0.05) * dicrotic_notch_delay # Calculate the number of heartbeats and time array num_beats = int(duration * heart_rate / 60) time = np.linspace(0, duration, int(sampling_rate * duration)) # Define PPG pulse using a combination of Gaussian functions def ppg_pulse(t): amplitude = 1.0 width_systolic = 0.08 # Width of the systolic peak # Systolic peak (tallest peak) systolic_peak = amplitude * np.exp(-((t - 0.2) ** 2) / (2 * width_systolic**2)) # Diastolic peak (adjustable) diastolic_peak = ( diastolic_amplitude * amplitude * np.exp( -((t - (0.2 + dicrotic_notch_delay)) ** 2) / (2 * diastolic_width**2) ) ) return systolic_peak + diastolic_peak # Generate one cycle of the PPG signal cycle_length = int(sampling_rate * 60 / heart_rate) ppg_cycle = ppg_pulse(np.linspace(0, 1, cycle_length)) # Tile the cycle to create the full PPG signal ppg_signal = np.tile(ppg_cycle, num_beats) ppg_signal = ppg_signal[: len(time)] # Trim to the exact length # Add Gaussian noise to simulate artifacts noise = np.random.normal(0, noise_level, len(ppg_signal)) ppg_signal += noise # Display the signal if required if display: plt.figure(figsize=(10, 4)) plt.plot(time, ppg_signal) plt.title("Synthetic PPG Signal") plt.xlabel("Time (s)") plt.ylabel("Amplitude") if display: plt.show() return time, ppg_signal
[docs] class SynthesizeData: """ A class that provides access to all signal synthesis functions. This class wraps the individual functions for easier access and organization. """
[docs] @staticmethod def generate_sinusoidal( frequency, sampling_rate, duration, amplitude=1.0, phase=0.0 ): """Generate a sinusoidal wave.""" return generate_sinusoidal(frequency, sampling_rate, duration, amplitude, phase)
[docs] @staticmethod def generate_square_wave( frequency, sampling_rate, duration, amplitude=1.0, duty_cycle=0.5 ): """Generate a square wave.""" return generate_square_wave( frequency, sampling_rate, duration, amplitude, duty_cycle )
[docs] @staticmethod def generate_noisy_signal(base_signal, noise_level=0.1): """Add Gaussian noise to a base signal.""" return generate_noisy_signal(base_signal, noise_level)
[docs] @staticmethod def generate_ecg_signal( duration=10, sampling_rate=1000, heart_rate=60, noise_level=0.01, qrs_width=0.08, t_wave_amplitude=0.3, p_wave_amplitude=0.25, randomize=False, display=False, ): """Generate a synthetic ECG signal.""" return generate_ecg_signal( duration, sampling_rate, heart_rate, noise_level, qrs_width, t_wave_amplitude, p_wave_amplitude, randomize, display, )
[docs] @staticmethod def generate_resp_signal(sampling_rate, duration, frequency=0.2, amplitude=0.5): """Generate a synthetic respiratory signal.""" return generate_resp_signal(sampling_rate, duration, frequency, amplitude)
[docs] @staticmethod def generate_synthetic_ppg_reversed( duration=10, sampling_rate=1000, heart_rate=60, noise_level=0.01, diastolic_amplitude=0.8, diastolic_width=0.15, dicrotic_notch_depth=0.6, dicrotic_notch_delay=0.2, randomize=False, display=False, ): """Generate a synthetic PPG signal with reversed parameters.""" return generate_synthetic_ppg_reversed( duration, sampling_rate, heart_rate, noise_level, diastolic_amplitude, diastolic_width, dicrotic_notch_depth, dicrotic_notch_delay, randomize, display, )
[docs] @staticmethod def generate_synthetic_ppg( duration=10, sampling_rate=1000, heart_rate=60, noise_level=0.01, diastolic_amplitude=0.8, diastolic_width=0.15, dicrotic_notch_depth=0.6, dicrotic_notch_delay=0.2, randomize=False, display=False, ): """Generate a synthetic PPG signal.""" return generate_synthetic_ppg( duration, sampling_rate, heart_rate, noise_level, diastolic_amplitude, diastolic_width, dicrotic_notch_depth, dicrotic_notch_delay, randomize, display, )
[docs] @staticmethod def generate_ppg_data(duration=50, fs=1000): """Generate synthetic PPG data for testing.""" t = np.linspace(0, duration, int(duration * fs)) # Simple PPG-like signal signal = np.sin(2 * np.pi * 1.2 * t) * np.exp(-0.1 * t) + 0.5 * np.sin( 2 * np.pi * 0.8 * t ) signal += 0.1 * np.random.randn(len(signal)) return t, signal