Source code for pobm.obm.complex

import numpy as np
from lempel_ziv_complexity import lempel_ziv_complexity
from scipy import integrate, stats
import warnings

from pobm._ErrorHandler import _check_shape_, _check_len_ApEn_, WrongParameter
from pobm._ResultsClasses import ComplexityMeasuresResults


[docs]class ComplexityMeasures: """ Class that calculates complexity features from SpO2 time series. """ def __init__(self, CTM_Threshold: float = 0.25, DFA_Window: int = 20, M_Sampen: int = 3, R_Sampen: float = 0.2, M_ApEn: int = 2, R_ApEn: float = 0.25): """ :param CTM_Threshold: Radius of Central Tendency Measure. :type CTM_Threshold: float, optional :param DFA_Window: Length of window to calculate DFA biomarker. :type DFA_Window: int, optional :param M_Sampen: Embedding dimension to compute SampEn. :type M_Sampen: int, optional :param R_Sampen: Tolerance to compute SampEn. :type R_Sampen: float, optional :param M_ApEn: Embedding dimension to compute ApEn. :type M_ApEn: int, optional :param R_ApEn: Tolerance to compute ApEn. :type R_ApEn: float, optional """ if DFA_Window <= 0: raise WrongParameter("DFA_Window should be strictly positive") if M_Sampen <= 0: raise WrongParameter("M_Sampen should be strictly positive") if R_Sampen <= 0: raise WrongParameter("R_Sampen should be strictly positive") if M_ApEn <= 0: raise WrongParameter("DFA_Window should be strictly positive") if R_ApEn <= 0: raise WrongParameter("R_ApEn should be strictly positive") self.CTM_Threshold = CTM_Threshold self.DFA_Window = DFA_Window self.M_Sampen = M_Sampen self.R_Sampen = R_Sampen self.M_ApEn = M_ApEn self.R_ApEn = R_ApEn
[docs] def compute(self, signal) -> ComplexityMeasuresResults: """ Computes all the biomarkers of this category. :param signal: 1-d array, of shape (N,) where N is the length of the signal :return: ComplexityMeasuresResults class containing the following features: * ApEn: Approximate Entropy. * LZ: Lempel-Ziv complexity. * CTM: Central Tendency Measure. * SampEn: Sample Entropy. * DFA: Detrended Fluctuation Analysis. Example: .. code-block:: python from pobm.obm.complex import ComplexityMeasures # Initialize the class with the desired parameters complexity_class = ComplexityMeasures(CTM_Threshold=0.25, DFA_Window=20, M_Sampen=3, R_Sampen=0.2, M_ApEn=2, R_ApEn=0.25) # Compute the biomarkers results_complexity = complexity_class.compute(spo2_signal) """ _check_shape_(signal) warnings.filterwarnings("ignore", category=RuntimeWarning) return ComplexityMeasuresResults(self.comp_apen(signal), self.comp_lz(signal), self.comp_ctm(signal), self.comp_sampen(signal), self.comp_dfa(signal))
[docs] def comp_apen(self, signal): """ Compute the approximate entropy, according to [1]_ :param signal: 1-d array, of shape (N,) where N is the length of the signal :return: ApEn (float) .. [1] Pincus, S. M. Approximate entropy as a measure of system complexity. Proc. Natl. Acad. Sci. U. S. A. 88, 2297–2301 (1991). """ signal = np.array(signal) signal = signal[np.logical_not(np.isnan(signal))] phi_m = self.__apen(self.M_ApEn, signal) phi_m1 = self.__apen(self.M_ApEn + 1, signal) with np.errstate(invalid='ignore'): res = phi_m - phi_m1 return res
def __apen(self, m, signal): """ Help function of CompApEn """ N = len(signal) r = self.R_ApEn _check_len_ApEn_(N, m) C = np.zeros(shape=(N - m + 1)) res = [signal[i:i + m] for i in range(0, N - m + 1)] for i in range(0, N - m + 1): C[i] = np.nansum(self.__dist(res[i], res, r)) / (N - m + 1) phi_m = np.nansum(np.log(C)) / (N - m + 1) return phi_m def __dist(self, window1, window2, r): """ Help function of CompApEn """ window1 = np.array(window1) window2 = np.array(window2) with np.errstate(invalid='ignore'): return np.nanmax(abs(window1 - window2), axis=1) < r
[docs] def comp_lz(self, signal, dual_quantization=True): """ Compute Lempel-Ziv, according to [2]_ :param signal: 1-d array, of shape (N,) where N is the length of the signal :param dual_quantization: if True, the signal is converted into a binary signal, according to the original algorithm [2]_. If False, the signal is quantized in 4 levels. :return: LZ (float) .. [2] Lempel, A. & Ziv, J. On the Complexity of Finite Sequences. IEEE Trans. Inf. Theory 22, 75–81 (1976). """ signal = np.array(signal) if dual_quantization is True: median = np.nanmedian(signal) bool_median = signal > median byte = [str(int(b == np.bool(True))) for b in bool_median] else: quantile1, median, quantile3 = np.quantile(signal, 0.25), np.quantile(signal, 0.5), np.quantile(signal, 0.75) byte = np.empty(shape=signal.shape, dtype=str) byte[signal <= quantile1] = "0" byte[(signal > quantile1) & (signal <= median)] = "1" byte[(signal > median) & (signal <= quantile3)] = "2" byte[signal > quantile3] = "3" sequence = ''.join(byte) sub_strings = set() ind = 0 inc = 1 while True: if ind + inc > len(sequence): break sub_str = sequence[ind: ind + inc] if sub_str in sub_strings: inc += 1 else: sub_strings.add(sub_str) ind += inc inc = 1 return len(sub_strings)
[docs] def comp_ctm(self, signal): """ Compute CTM, according to [3]_ :param signal: 1-d array, of shape (N,) where N is the length of the signal :return: CTM (float) .. [3] Cohen, M. E., Hudson, D. L. & Deedwania, P. C. Applying continuous chaotic modeling to cardiac signal analysis. IEEE Eng. Med. Biol. Mag. 15, 97–102 (1996). """ res = 0 for i in range(len(signal) - 2): res += self.__d_ctm(i, self.CTM_Threshold, signal) return res / (len(signal) - 2)
def __d_ctm(self, i, p, signal): """ Help function of CompCTM """ if np.sqrt(((signal[i + 2] - signal[i + 1]) ** 2) + ((signal[i + 1] - signal[i]) ** 2)) < p: return 1 return 0
[docs] def comp_sampen(self, signal): """ Compute the sample entropy, according to [4]_ :param signal: 1-d array, of shape (N,) where N is the length of the signal :return: SampEn (float) .. [4] Richman, J. S. & Moorman, J. R. Physiological time-series analysis using approximate entropy and sample entropy. Am J Physiol-Heart C 278, H2039–H2049 (2000). """ N = len(signal) m = self.M_Sampen r = self.R_Sampen # Split time series and save all templates of length m xmi = np.array([signal[i: i + m] for i in range(N - m)]) xmj = np.array([signal[i: i + m] for i in range(N - m + 1)]) # Save all matches minus the self-match, compute B with np.errstate(invalid='ignore'): B = np.sum([np.sum(np.abs(xmii - xmj).max(axis=1) <= r) - 1 for xmii in xmi]) # Similar for computing A m += 1 xm = np.array([signal[i: i + m] for i in range(N - m + 1)]) with np.errstate(invalid='ignore'): A = np.sum([np.sum(np.abs(xmi - xm).max(axis=1) <= r) - 1 for xmi in xm]) # Return SampEn return -np.log(A / B)
[docs] def comp_dfa(self, signal): """ Compute DFA, Detrended Fluctuation Analysis according to [5]_ :param signal: 1-d array, of shape (N,) where N is the length of the signal :return: DFA (float) .. [5] Peng, C. ‐K., Havlin, S., Stanley, H. E. & Goldberger, A. L. Quantification of scaling exponents and crossover phenomena in nonstationary heartbeat time series. Chaos An Interdiscip. J. Nonlinear Sci. 5, 82–87 (1995). """ n = self.DFA_Window y = integrate.cumtrapz(signal - np.nanmean(signal)) least_square = np.zeros(len(y)) i = 0 while i < len(y): if i + n > len(y): n = len(y) - i if n == 1: least_square[-1] = y[-1] break x = np.array(range(0, n)) with warnings.catch_warnings(): warnings.simplefilter("ignore", category=RuntimeWarning) slope, intercept, _, _, _ = stats.linregress(x, y[i:i + n]) least_square[i:i + n] = slope * x + intercept i += n return np.sqrt(np.nansum((y - least_square) ** 2) / len(y))