Source code for OBM.ComplexityMeasures

import numpy as np
from lempel_ziv_complexity import lempel_ziv_complexity
from scipy import integrate, stats
import warnings

from OBM._ErrorHandler import _check_shape_, _check_len_ApEn_
from OBM._ResultsClasses import ComplexityMeasuresResults


[docs]class ComplexityMeasures: """ Class that calculates Complexity Features from spo2 time series. Suppose that the data has been preprocessed. :param signal: 1-d array, of shape (N,) where N is the length of the signal CTM_Threshold: Radius of Central Tendency Measure. DFA_Window: Length of window to calculate DFA biomarker. M_Sampen: Embedding dimension to compute SampEn. R_Sampen: Tolerance to compute SampEn. """ def __init__(self, CTM_Threshold=0.25, DFA_Window=20, M_Sampen=3, R_Sampen=0.2): self.CTM_Threshold = CTM_Threshold self.DFA_Window = DFA_Window self.M_Sampen = M_Sampen self.R_Sampen = R_Sampen
[docs] def compute(self, signal) -> ComplexityMeasuresResults: """ :param signal: 1-d array, of shape (N,) where N is the length of the signal :return: ComplexityMeasuresResults class containing the following features: - ApEn: Approximate Entropy. - LZ: Lempel-Ziv complexity. - CTM: Central Tendency Measure. - SampEn: Sample Entropy. - DFA: Detrended Fluctuation Analysis. """ _check_shape_(signal) return ComplexityMeasuresResults(self.__comp_apen(signal), self.__comp_lz(signal), self.__comp_ctm(signal), self.__comp_sampen(signal), self.__comp_dfa(signal))
def __comp_apen(self, signal): """ Compute the approximate entropy, according to the paper Utility of Approximate Entropy From Overnight Pulse Oximetry Data in the Diagnosis of the Obstructive Sleep Apnea Syndrome :param signal: 1-d array, of shape (N,) where N is the length of the signal :return: ApEn """ signal = np.array(signal) signal = signal[np.logical_not(np.isnan(signal))] phi_m = self.__apen(2, signal) phi_m1 = self.__apen(3, signal) with np.errstate(invalid='ignore'): res = phi_m - phi_m1 return res def __apen(self, m, signal): """ Help function of CompApEn """ N = len(signal) r = 0.25 * np.nanstd(signal) _check_len_ApEn_(N, m) C = np.zeros(shape=(N - m + 1)) res = [signal[i:i + m] for i in range(0, N - m + 1)] for i in range(0, N - m + 1): C[i] = np.nansum(self.__dist(res[i], res, r)) / (N - m + 1) phi_m = np.nansum(np.log(C)) / (N - m + 1) return phi_m def __dist(self, window1, window2, r): """ Help function of CompApEn """ window1 = np.array(window1) window2 = np.array(window2) with np.errstate(invalid='ignore'): return np.nanmax(abs(window1 - window2), axis=1) < r def __comp_lz(self, signal): """ Compute lempel-ziv, according to the paper Non-linear characteristics of blood oxygen saturation from nocturnal oximetry for obstructive sleep apnoea detection :param signal: 1-d array, of shape (N,) where N is the length of the signal :return: LZ """ median = np.median(signal) res = [signal[i] > median for i in range(0, len(signal))] byte = [str(int(b is True)) for b in res] return lempel_ziv_complexity(''.join(byte)) def __comp_ctm(self, signal): """ Compute CTM, according to the paper Non-linear characteristics of blood oxygen saturation from nocturnal oximetry for obstructive sleep apnoea detection :param signal: 1-d array, of shape (N,) where N is the length of the signal :return: CTM """ res = 0 for i in range(len(signal) - 2): res += self.__d_ctm(i, self.CTM_Threshold, signal) return res / (len(signal) - 2) def __d_ctm(self, i, p, signal): """ Help function of CompCTM """ if np.sqrt(((signal[i + 2] - signal[i + 1]) ** 2) + ((signal[i + 1] - signal[i]) ** 2)) < p: return 1 return 0 def __comp_sampen(self, signal): """ Compute the Sample Entropy :param signal: 1-d array, of shape (N,) where N is the length of the signal :return: SampEn """ N = len(signal) m = self.M_Sampen r = self.R_Sampen # Split time series and save all templates of length m xmi = np.array([signal[i: i + m] for i in range(N - m)]) xmj = np.array([signal[i: i + m] for i in range(N - m + 1)]) # Save all matches minus the self-match, compute B with np.errstate(invalid='ignore'): B = np.sum([np.sum(np.abs(xmii - xmj).max(axis=1) <= r) - 1 for xmii in xmi]) # Similar for computing A m += 1 xm = np.array([signal[i: i + m] for i in range(N - m + 1)]) with np.errstate(invalid='ignore'): A = np.sum([np.sum(np.abs(xmi - xm).max(axis=1) <= r) - 1 for xmi in xm]) # Return SampEn return -np.log(A / B) def __comp_dfa(self, signal): """ Compute DFA :param signal: 1-d array, of shape (N,) where N is the length of the signal :return: DFA """ n = self.DFA_Window y = integrate.cumtrapz(signal - np.nanmean(signal)) least_square = np.zeros(len(y)) i = 0 while i < len(y): if i + n > len(y): n = len(y) - i if n == 1: least_square[-1] = y[-1] break x = np.array(range(0, n)) with warnings.catch_warnings(): warnings.simplefilter("ignore", category=RuntimeWarning) slope, intercept, _, _, _ = stats.linregress(x, y[i:i + n]) least_square[i:i + n] = slope * x + intercept i += n return np.sqrt(np.nansum((y - least_square) ** 2) / len(y))