Source code for OBM.OverallGeneralMeasures
import numpy as np
import warnings
from OBM._ErrorHandler import _check_shape_, _check_window_delta_
from OBM._ResultsClasses import OverallGeneralMeasuresResult
[docs]class OverallGeneralMeasures:
"""
Class that calculates Overall General Features from spo2 time series.
Suppose that the data has been preprocessed.
:param ZC_Baseline: Baseline for calculating number of zero-crossing points.
:param percentile: Percentile to perform. For example, for percentile 1, the argument should be 1
:param M_Threshold: Percentage of the signal M_Threshold % below median oxygen saturation. Typically use 1,2 or 5
"""
def __init__(self, ZC_Baseline=None, percentile=1, M_Threshold=2, DI_Window=12):
self.ZC_Baseline = ZC_Baseline
self.percentile = percentile
self.M_Threshold = M_Threshold
self.DI_Window = DI_Window
[docs] def compute(self, signal) -> OverallGeneralMeasuresResult:
"""
:param signal: 1-d array, of shape (N,) where N is the length of the signal
:return:
OveralGeneralMeasuresResult class containing the following features:
- AV: Average of the signal.
- MED: Median of the signal.
- Min: Minimum value of the signal.
- SD: Std of the signal.
- RG: SpO2 range (difference between the max and min value).
- P: percentile.
- M: Percentage of the signal x% below median oxygen saturation.
- ZC: Number of zero-crossing points.
- DI: Delta Index.
"""
_check_shape_(signal)
if self.ZC_Baseline is None:
self.ZC_Baseline = np.nanmean(signal)
return OverallGeneralMeasuresResult(np.nanmean(signal), np.nanmedian(signal), np.nanmin(signal),
np.nanstd(signal),
self.__compute_range(signal),
self.__apply_percentile(signal),
self.__below_median(signal),
self.__num_zc(signal),
self.__delta_index(signal))
def __apply_percentile(self, signal):
"""
Apply percentile to the spo2 signal
:param signal: 1-d array, of shape (N,) where N is the length of the signal
:return: the percentile
"""
return np.nanpercentile(signal, self.percentile)
def __below_median(self, signal):
"""
Compute the below median biomarker from the spo2 signal
:param signal: 1-d array, of shape (N,) where N is the length of the signal
:return: the BM biomarker
"""
baseline = np.nanmedian(signal) - self.M_Threshold
with np.errstate(invalid='ignore'):
return 100 * (np.nansum(signal < baseline) / len(signal))
def __compute_range(self, signal):
"""
Compute the range biomarker from the spo2 signal
:param signal: 1-d array, of shape (N,) where N is the length of the signal
:return: the R biomarker
"""
return np.nanmax(signal) - np.nanmin(signal)
def __num_zc(self, signal):
"""
Compute the numZC biomarker from the spo2 signal
:param signal: 1-d array, of shape (N,) where N is the length of the signal
:return: the ZC biomarker
"""
numZC_count = 0
baseline = self.ZC_Baseline
for idx_signal in range(2, len(signal) - 1):
if signal[idx_signal] == baseline:
if (signal[idx_signal - 1] <= baseline) & (signal[idx_signal + 1] >= baseline):
numZC_count += 1
if (signal[idx_signal - 1] >= baseline) & (signal[idx_signal + 1] <= baseline):
numZC_count += 1
if (signal[idx_signal - 1] < baseline) & (signal[idx_signal] > baseline):
numZC_count += 1
if (signal[idx_signal - 1] > baseline) & (signal[idx_signal] < baseline):
numZC_count += 1
return numZC_count
def __delta_index(self, signal):
"""
Compute the delta index biomarker from the spo2 signal
:param signal: 1-d array, of shape (N,) where N is the length of the signal
:return: the DI biomarker
"""
_check_window_delta_(len(signal), self.DI_Window)
signal_splitted = [signal[i:i + self.DI_Window] for i in range(0, len(signal), self.DI_Window)]
if len(signal_splitted[-1]) != self.DI_Window:
signal_splitted.pop()
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=RuntimeWarning)
mean_window = np.nanmean(signal_splitted, axis=1)
diff = abs(mean_window - np.roll(mean_window, 1))
return np.nanmean(diff[1:])