import numpy as np
from scipy.signal import hamming, welch
import warnings
from pobm._ErrorHandler import _check_shape_, _check_fragment_PRSA_, WrongParameter
from pobm._ResultsClasses import PRSAResults, PSDResults
[docs]class PRSAMeasures:
"""
Class that calculates PRSA features from SpO2 time series.
"""
def __init__(self, PRSA_Window: int = 10, K_AC: int = 2):
"""
:param PRSA_Window: Fragment duration of PRSA.
:type PRSA_Window: int, optional
:param K_AC: Number of values to shift when computing autocorrelation
:type K_AC: int, optional
"""
if PRSA_Window <= 0:
raise WrongParameter("DI_Window should be strictly positive")
self.PRSA_Window = PRSA_Window
self.K_AC = K_AC
[docs] def compute(self, signal) -> PRSAResults:
"""
Computes all the biomarkers of this category.
:param signal: 1-d array, of shape (N,) where N is the length of the signal
:return: PRSAResults class containing the following features:
* PRSAc: PRSA capacity.
* PRSAad: PRSA amplitude difference.
* PRSAos: PRSA overall slope.
* PRSAsb: PRSA slope before the anchor point.
* PRSAsa: PRSA slope after the anchor point.
* AC: Autocorrelation.
Example:
.. code-block:: python
from pobm.obm.periodicity import PRSAMeasures
# Initialize the class with the desired parameters
prsa_class = PRSAMeasures(PRSA_Window=10, K_AC=2)
# Compute the biomarkers
results_PRSA = prsa_class.compute(spo2_signal)
"""
_check_shape_(signal)
warnings.filterwarnings("ignore", category=RuntimeWarning)
d = self.PRSA_Window
_check_fragment_PRSA_(d)
anchor_points = []
anchor_found = False
for i in range(len(signal)):
if i < d:
continue
if len(signal) - i < d:
continue
if signal[i - 1] > signal[i]:
anchor_found = True
anchor_points.append(signal[i - d:i + d])
if anchor_found is False:
PRSA_features = PRSAResults(0, 0, 0, 0, 0, np.correlate(signal, signal, "same")[self.K_AC])
else:
anchor_points = np.array(anchor_points)
windows = np.zeros(2 * d)
for i in range(2 * d):
windows[i] = np.nansum(anchor_points[:, i]) / len(anchor_points)
PRSA_features = PRSAResults((windows[d] + windows[d + 1] - windows[d - 1] - windows[d - 2]) / 4,
np.nanmax(windows) - np.nanmin(windows),
float(np.polyfit(range(2 * d), windows, 1)[0]),
float(np.polyfit(range(d), windows[0:d], 1)[0]),
float(np.polyfit(range(d), windows[d:], 1)[0]),
np.correlate(windows, windows, "same")[self.K_AC])
return PRSA_features
[docs]class PSDMeasures:
"""
Class that calculates PSD features from SpO2 time series.
"""
def __init__(self, frequency_low_threshold: float = 0.014, frequency_high_threshold: float = 0.033):
"""
:param frequency_low_threshold: Low threshold for the PSD_band biomarker.
:type frequency_low_threshold: float, optional
:param frequency_high_threshold: High threshold for the PSD_band biomarker.
:type frequency_high_threshold: float, optional
"""
if frequency_low_threshold >= frequency_high_threshold:
raise WrongParameter("frequency_low_threshold should be lower than frequency_high_threshold")
self.frequency_low = frequency_low_threshold
self.frequency_high = frequency_high_threshold
[docs] def compute(self, signal) -> PSDResults:
"""
Computes all the biomarkers of this category.
:param signal: The SpO2 signal, of shape (N,)
:return: PSDResults class containing the following features:
* PSD_total: The amplitude of the spectral signal.
* PSD_band: The amplitude of the signal multiplied by a band-pass filter in the desired band.
* PSD_ratio: The ratio between PSD_total and PSD_band.
* PDS_peak: The max value of the FFT into the desired band.
Example:
.. code-block:: python
from pobm.obm.periodicity import PSDMeasures
# Initialize the class with the desired parameters
psd_class = PSDMeasures()
# Compute the biomarkers
results_PSD = psd_class.compute(spo2_signal)
"""
_check_shape_(signal)
signal = np.array(signal)
signal = signal[np.logical_not(np.isnan(signal))]
freq, signal_fft = self.__get_psd(signal)
amplitude_signal = np.sqrt((signal_fft.real ** 2) + (signal_fft.imag ** 2))
# taking only positive frequencies, since the signal is real.
freq = freq[0:int(len(freq) / 2)]
amplitude_signal = amplitude_signal[0:int(len(amplitude_signal) / 2)]
# Taking the spectral signal in the relevant band
amplitude_bp = self.__get_bandpass(amplitude_signal, freq, self.frequency_low, self.frequency_high)
if len(amplitude_bp) == 0:
return PSDResults(np.nansum(amplitude_signal), 0, 0, 0)
return PSDResults(np.nansum(amplitude_signal), np.nansum(amplitude_bp),
np.nansum(amplitude_bp) / np.nansum(amplitude_signal),
float(np.nanmax(amplitude_bp)))
@staticmethod
def __get_bandpass(signal, freq, lower_f, higher_f):
"""
Helper function, to get the amplitude within the desired band.
:param signal: The amplitude signal, of shape (L,)
:param freq: Array of frequencies, of shape (L,)
:param lower_f: The lower frequency of the band
:param higher_f: The higher frequency of the band
:return: The amplitude within the band
"""
amplitude_bp = signal[lower_f < freq]
freq_bp = freq[lower_f < freq]
amplitude_bp = amplitude_bp[freq_bp < higher_f]
return amplitude_bp
@staticmethod
def __get_psd(signal):
"""
Helper function, compute the PSD
:param signal: The SpO2 signal, of shape (N,)
:return:
freq: array of frequencies, of shape (L,)
signal_fft: The PSD of the signal, of shape (L,)
"""
# N = len(signal)
# w = hamming(N)
# signal_fft = np.fft.fft(signal * w) / N
# freq = np.fft.fftfreq(signal.shape[-1])
freq, signal_fft = welch(signal, window="hamming")
signal_fft = signal_fft / len(signal)
return freq, signal_fft