"""
Physiological Features Module for Physiological Signal Processing
This module provides comprehensive capabilities for physiological
signal processing including ECG, PPG, EEG, and other vital signs.
Author: vitalDSP Team
Date: 2025-01-27
Version: 1.0.0
Key Features:
- Object-oriented design with comprehensive classes
- Multiple processing methods and functions
- NumPy integration for numerical computations
Examples:
---------
Basic usage:
>>> import numpy as np
>>> from vitalDSP.physiological_features.signal_segmentation import SignalSegmentation
>>> signal = np.random.randn(1000)
>>> ss = SignalSegmentation(signal)
>>> segments = ss.fixed_size_segmentation(segment_size=256)
"""
from sklearn.cluster import KMeans, SpectralClustering
from sklearn.mixture import GaussianMixture
from sklearn.tree import DecisionTreeClassifier
import numpy as np
[docs]
class SignalSegmentation:
"""
A comprehensive class for segmenting physiological signals.
Methods
-------
fixed_size_segmentation : function
Segments the signal into fixed-size segments.
adaptive_segmentation : function
Segments the signal based on adaptive criteria.
threshold_based_segmentation : function
Segments the signal based on a threshold.
variance_based_segmentation : function
Segments the signal based on local variance.
peak_based_segmentation : function
Segments the signal based on detected peaks.
ml_based_segmentation : function
Segments the signal using a machine learning-based approach with multiple default models.
custom_segmentation : function
Allows custom segmentation based on a user-defined function.
"""
def __init__(self, signal):
"""
Initialize the SignalSegmentation class with the signal.
Parameters
----------
signal : numpy.ndarray
The input physiological signal to be segmented.
"""
self.signal = signal
[docs]
def fixed_size_segmentation(self, segment_size):
"""
Segment the signal into fixed-size segments.
Parameters
----------
segment_size : int
The size of each segment.
Returns
-------
segments : list of numpy.ndarray
A list of fixed-size segments.
Examples
--------
>>> signal = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9])
>>> seg = SignalSegmentation(signal)
>>> fixed_segments = seg.fixed_size_segmentation(3)
>>> print(fixed_segments)
[array([1, 2, 3]), array([4, 5, 6]), array([7, 8, 9])]
"""
segments = [
self.signal[i : i + segment_size]
for i in range(0, len(self.signal), segment_size)
]
return segments
[docs]
def adaptive_segmentation(self, adaptive_fn):
"""
Segment the signal based on adaptive criteria.
Parameters
----------
adaptive_fn : callable
A function that defines the adaptive criteria for segmentation. This function should take a segment of the signal as input and return the length of the segment.
Returns
-------
segments : list of numpy.ndarray
A list of adaptively segmented parts of the signal.
Examples
--------
>>> signal = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9])
>>> def adaptive_fn(segment):
... return len(segment) // 2 # Example criterion
>>> seg = SignalSegmentation(signal)
>>> adaptive_segments = seg.adaptive_segmentation(adaptive_fn)
>>> print(adaptive_segments)
[array([1, 2, 3, 4]), array([5, 6, 7, 8, 9])]
"""
segments = []
start_idx = 0
while start_idx < len(self.signal):
# Limit minimum segment size to avoid unnecessary iterations
end_idx = adaptive_fn(self.signal[start_idx:])
end_idx = min(
len(self.signal) - start_idx, end_idx
) # Ensure no index out of range
if end_idx == 0:
break # Avoid infinite loops if adaptive_fn returns 0
segments.append(self.signal[start_idx : start_idx + end_idx])
start_idx += end_idx
return segments
[docs]
def threshold_based_segmentation(self, threshold):
"""
Segment the signal based on a threshold.
Parameters
----------
threshold : float
The threshold value for segmentation. The signal is segmented whenever it exceeds this threshold.
Returns
-------
segments : list of numpy.ndarray
A list of segments where the signal exceeds the threshold.
Examples
--------
>>> signal = np.array([1, 2, 5, 2, 8, 1])
>>> seg = SignalSegmentation(signal)
>>> threshold_segments = seg.threshold_based_segmentation(4)
>>> print(threshold_segments)
[array([5]), array([8])]
"""
segments = []
start_idx = None
for i in range(len(self.signal)):
if self.signal[i] > threshold:
if start_idx is None:
start_idx = i
else:
if start_idx is not None:
segments.append(self.signal[start_idx:i])
start_idx = None
if start_idx is not None:
segments.append(self.signal[start_idx:])
return segments
[docs]
def variance_based_segmentation(self, window_size, variance_threshold):
"""
Segment the signal based on local variance.
Parameters
----------
window_size : int
The size of the window for variance calculation.
variance_threshold : float
The threshold of variance to determine segment boundaries.
Returns
-------
segments : list of numpy.ndarray
A list of segments based on local variance.
Examples
--------
>>> signal = np.array([1, 2, 2, 2, 5, 6, 1, 1, 1, 8])
>>> seg = SignalSegmentation(signal)
>>> variance_segments = seg.variance_based_segmentation(3, 2.0)
>>> print(variance_segments)
[array([1, 2, 2, 2, 5, 6]), array([1, 1, 1, 8])]
"""
from numpy.lib.stride_tricks import as_strided
sig = np.ascontiguousarray(self.signal)
shape = (len(sig) - window_size, window_size)
strides = (sig.strides[0], sig.strides[0])
windows = as_strided(sig, shape=shape, strides=strides, writeable=False)
variances = np.var(windows, axis=1)
segments = []
start_idx = 0
for i in range(1, len(variances)):
if (
variances[i] > variance_threshold
and variances[i - 1] <= variance_threshold
):
segments.append(self.signal[start_idx : i + window_size])
start_idx = i + window_size
if start_idx < len(self.signal):
segments.append(self.signal[start_idx:])
return segments
[docs]
def peak_based_segmentation(self, min_distance=50, height=None):
"""
Segment the signal based on detected peaks.
Parameters
----------
min_distance : int, optional
Minimum distance between consecutive peaks.
height : float or None, optional
Minimum height of peaks to be considered.
Returns
-------
segments : list of numpy.ndarray
A list of segments around detected peaks.
Examples
--------
>>> signal = np.array([1, 2, 1, 2, 1, 2, 1, 8, 1])
>>> seg = SignalSegmentation(signal)
>>> peak_segments = seg.peak_based_segmentation(min_distance=2, height=5)
>>> print(peak_segments)
[array([1, 8, 1])]
"""
peaks = (
np.where(
(self.signal[1:-1] > self.signal[:-2])
& (self.signal[1:-1] > self.signal[2:])
)[0]
+ 1
)
if height is not None:
peaks = peaks[self.signal[peaks] > height]
if min_distance > 1:
peaks = peaks[np.diff(peaks, prepend=0) > min_distance]
all_points = np.concatenate(([0], peaks, [len(self.signal)]))
segments = []
for i in range(len(all_points) - 1):
segments.append(self.signal[all_points[i] : all_points[i + 1]])
return segments
[docs]
def ml_based_segmentation(self, model="change_detection"):
"""
Segment the signal using a machine learning-based approach.
Parameters
----------
model : str, optional
The name of the default model to use. Options include "change_detection", "kmeans", "gmm",
"decision_tree", "dtw", "spectral", "autoencoder". Default is "change_detection".
Returns
-------
segments : list of numpy.ndarray
A list of segments predicted by the model.
Examples
--------
>>> signal = np.array([1, 2, 2, 2, 5, 6, 1, 1, 8, 1])
>>> seg = SignalSegmentation(signal)
>>> ml_segments = seg.ml_based_segmentation(model="kmeans")
>>> print(ml_segments)
"""
if model == "change_detection":
change_points = (
np.where(np.abs(np.diff(self.signal)) > np.std(self.signal))[0] + 1
)
elif model == "kmeans":
n_clusters = 5
kmeans = KMeans(n_clusters=n_clusters)
labels = kmeans.fit_predict(self.signal.reshape(-1, 1))
change_points = np.where(np.diff(labels))[0] + 1
elif model == "gmm":
n_components = 3
gmm = GaussianMixture(n_components=n_components)
gmm.fit(self.signal.reshape(-1, 1))
hidden_states = gmm.predict(self.signal.reshape(-1, 1))
change_points = np.where(np.diff(hidden_states))[0] + 1
elif model == "decision_tree":
from sklearn.tree import DecisionTreeRegressor
dt = DecisionTreeRegressor(max_leaf_nodes=5)
X = np.arange(len(self.signal)).reshape(-1, 1)
dt.fit(X, self.signal)
predictions = dt.predict(X)
change_points = np.where(np.diff(predictions) != 0)[0] + 1
elif model == "dtw":
# Placeholder for a more complex DTW implementation
change_points = np.array([len(self.signal) // 2])
elif model == "spectral":
spectral = SpectralClustering(n_clusters=5, affinity="nearest_neighbors")
labels = spectral.fit_predict(self.signal.reshape(-1, 1))
change_points = np.where(np.diff(labels))[0] + 1
elif model == "autoencoder":
# Placeholder for autoencoder-based segmentation
change_points = np.array([len(self.signal) // 2])
else:
raise ValueError("Unknown model type specified.")
change_points = np.concatenate(([0], change_points, [len(self.signal)]))
segments = [
self.signal[change_points[i] : change_points[i + 1]]
for i in range(len(change_points) - 1)
]
if len(segments) == 0:
return [self.signal]
return segments
[docs]
def custom_segmentation(self, custom_fn):
"""
Segment the signal based on a custom function.
Parameters
----------
custom_fn : callable
A user-defined function that returns segment boundaries. The function should take the entire signal as input and return an array of indices where segments start and end.
Returns
-------
segments : list of numpy.ndarray
A list of segments based on custom criteria.
Examples
--------
>>> signal = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9])
>>> def custom_fn(signal):
... return np.array([0, 3, 6, 9])
>>> seg = SignalSegmentation(signal)
>>> custom_segments = seg.custom_segmentation(custom_fn)
>>> print(custom_segments)
[array([1, 2, 3]), array([4, 5, 6]), array([7, 8, 9])]
"""
segment_boundaries = custom_fn(self.signal)
segments = [
self.signal[segment_boundaries[i] : segment_boundaries[i + 1]]
for i in range(len(segment_boundaries) - 1)
]
return segments