|
| 1 | +# -*- coding: utf-8 -*- |
| 2 | +""" |
| 3 | +biosppy.quality |
| 4 | +------------- |
| 5 | +
|
| 6 | +This provides functions to assess the quality of several biosignals. |
| 7 | +
|
| 8 | +:copyright: (c) 2015-2023 by Instituto de Telecomunicacoes |
| 9 | +:license: BSD 3-clause, see LICENSE for more details. |
| 10 | +""" |
| 11 | + |
| 12 | +# Imports |
| 13 | +# compat |
| 14 | +from __future__ import absolute_import, division, print_function |
| 15 | + |
| 16 | +# local |
| 17 | +from . import utils |
| 18 | +from .signals import ecg, tools |
| 19 | + |
| 20 | +# 3rd party |
| 21 | +import numpy as np |
| 22 | + |
| 23 | + |
| 24 | +def quality_eda(x=None, methods=['bottcher'], sampling_rate=None): |
| 25 | + """Compute the quality index for one EDA segment. |
| 26 | +
|
| 27 | + Parameters |
| 28 | + ---------- |
| 29 | + x : array |
| 30 | + Input signal to test. |
| 31 | + methods : list |
| 32 | + Method to assess quality. One or more of the following: 'bottcher'. |
| 33 | + sampling_rate : int |
| 34 | + Sampling frequency (Hz). |
| 35 | + Returns |
| 36 | + ------- |
| 37 | + args : tuple |
| 38 | + Tuple containing the quality index for each method. |
| 39 | + names : tuple |
| 40 | + Tuple containing the name of each method. |
| 41 | + """ |
| 42 | + # check inputs |
| 43 | + if x is None: |
| 44 | + raise TypeError("Please specify the input signal.") |
| 45 | + |
| 46 | + if sampling_rate is None: |
| 47 | + raise TypeError("Please specify the sampling rate.") |
| 48 | + |
| 49 | + assert len(x) > sampling_rate * 2, 'Segment must be 5s long' |
| 50 | + |
| 51 | + args, names = (), () |
| 52 | + available_methods = ['bottcher'] |
| 53 | + |
| 54 | + for method in methods: |
| 55 | + |
| 56 | + assert method in available_methods, "Method should be one of the following: " + ", ".join(available_methods) |
| 57 | + |
| 58 | + if method == 'bottcher': |
| 59 | + quality = eda_sqi_bottcher(x, sampling_rate) |
| 60 | + |
| 61 | + args += (quality,) |
| 62 | + names += (method,) |
| 63 | + |
| 64 | + return utils.ReturnTuple(args, names) |
| 65 | + |
| 66 | + |
| 67 | +def quality_ecg(segment, methods=['Level3'], sampling_rate=None, |
| 68 | + fisher=True, f_thr=0.01, threshold=0.9, bit=0, |
| 69 | + nseg=1024, num_spectrum=[5, 20], dem_spectrum=None, |
| 70 | + mode_fsqi='simple'): |
| 71 | + |
| 72 | + """Compute the quality index for one ECG segment. |
| 73 | +
|
| 74 | + Parameters |
| 75 | + ---------- |
| 76 | + segment : array |
| 77 | + Input signal to test. |
| 78 | + method : string |
| 79 | + Method to assess quality. One of the following: 'Level3', 'pSQI', 'kSQI', 'fSQI'. |
| 80 | + sampling_rate : int |
| 81 | + Sampling frequency (Hz). |
| 82 | + threshold : float |
| 83 | + Threshold for the correlation coefficient. |
| 84 | + bit : int |
| 85 | + Number of bits of the ADC. Resolution bits, for the BITalino is 10 bits. |
| 86 | +
|
| 87 | + Returns |
| 88 | + ------- |
| 89 | + args : tuple |
| 90 | + Tuple containing the quality index for each method. |
| 91 | + names : tuple |
| 92 | + Tuple containing the name of each method. |
| 93 | + """ |
| 94 | + args, names = (), () |
| 95 | + available_methods = ['Level3', 'pSQI', 'kSQI', 'fSQI'] |
| 96 | + |
| 97 | + for method in methods: |
| 98 | + |
| 99 | + assert method in available_methods, 'Method should be one of the following: ' + ', '.join(available_methods) |
| 100 | + |
| 101 | + if method == 'Level3': |
| 102 | + # returns a SQI level 0, 0.5 or 1.0 |
| 103 | + quality = ecg_sqi_level3(segment, sampling_rate, threshold, bit) |
| 104 | + |
| 105 | + elif method == 'pSQI': |
| 106 | + quality = ecg.pSQI(segment, f_thr=f_thr) |
| 107 | + |
| 108 | + elif method == 'kSQI': |
| 109 | + quality = ecg.kSQI(segment, fisher=fisher) |
| 110 | + |
| 111 | + elif method == 'fSQI': |
| 112 | + quality = ecg.fSQI(segment, fs=sampling_rate, nseg=nseg, num_spectrum=num_spectrum, dem_spectrum=dem_spectrum, mode=mode_fsqi) |
| 113 | + |
| 114 | + args += (quality,) |
| 115 | + names += (method,) |
| 116 | + |
| 117 | + return utils.ReturnTuple(args, names) |
| 118 | + |
| 119 | + |
| 120 | +def ecg_sqi_level3(segment, sampling_rate, threshold, bit): |
| 121 | + |
| 122 | + """Compute the quality index for one ECG segment. The segment should have 10 seconds. |
| 123 | +
|
| 124 | +
|
| 125 | + Parameters |
| 126 | + ---------- |
| 127 | + segment : array |
| 128 | + Input signal to test. |
| 129 | + sampling_rate : int |
| 130 | + Sampling frequency (Hz). |
| 131 | + threshold : float |
| 132 | + Threshold for the correlation coefficient. |
| 133 | + bit : int |
| 134 | + Number of bits of the ADC.? Resolution bits, for the BITalino is 10 bits. |
| 135 | + |
| 136 | + Returns |
| 137 | + ------- |
| 138 | + quality : string |
| 139 | + Signal Quality Index ranging between 0 (LQ), 0.5 (MQ) and 1.0 (HQ). |
| 140 | +
|
| 141 | + """ |
| 142 | + LQ, MQ, HQ = 0.0, 0.5, 1.0 |
| 143 | + |
| 144 | + if bit != 0: |
| 145 | + if (max(segment) - min(segment)) >= (2**bit - 1): |
| 146 | + return LQ |
| 147 | + if sampling_rate is None: |
| 148 | + raise IOError('Sampling frequency is required') |
| 149 | + if len(segment) < sampling_rate * 5: |
| 150 | + raise IOError('Segment must be 5s long') |
| 151 | + else: |
| 152 | + # TODO: compute ecg quality when in contact with the body |
| 153 | + rpeak1 = ecg.hamilton_segmenter(segment, sampling_rate=sampling_rate)['rpeaks'] |
| 154 | + rpeak1 = ecg.correct_rpeaks(signal=segment, rpeaks=rpeak1, sampling_rate=sampling_rate, tol=0.05)['rpeaks'] |
| 155 | + if len(rpeak1) < 2: |
| 156 | + return LQ |
| 157 | + else: |
| 158 | + hr = sampling_rate * (60/np.diff(rpeak1)) |
| 159 | + quality = MQ if (max(hr) <= 200 and min(hr) >= 40) else LQ |
| 160 | + if quality == MQ: |
| 161 | + templates, _ = ecg.extract_heartbeats(signal=segment, rpeaks=rpeak1, sampling_rate=sampling_rate, before=0.2, after=0.4) |
| 162 | + corr_points = np.corrcoef(templates) |
| 163 | + if np.mean(corr_points) > threshold: |
| 164 | + quality = HQ |
| 165 | + |
| 166 | + return quality |
| 167 | + |
| 168 | + |
| 169 | +def eda_sqi_bottcher(x=None, sampling_rate=None): # -> Timeline |
| 170 | + """ |
| 171 | + Suggested by Böttcher et al. Scientific Reports, 2022, for wearable wrist EDA. |
| 172 | +
|
| 173 | + This is given by a binary score 0/1 defined by the following rules: |
| 174 | + - mean of the segment of 2 seconds should be > 0.05 |
| 175 | + - rate of amplitude change (given by racSQI) should be < 0.2 |
| 176 | + This score is calculated for each 2 seconds window of the segment. The average of the scores is the final SQI. |
| 177 | +
|
| 178 | + This method was designed for a segment of 60s |
| 179 | +
|
| 180 | + """ |
| 181 | + quality_score = 0 |
| 182 | + if x is None: |
| 183 | + raise TypeError("Please specify the input signal.") |
| 184 | + if sampling_rate is None: |
| 185 | + raise TypeError("Please specify the sampling rate.") |
| 186 | + |
| 187 | + if len(x) < sampling_rate * 60: |
| 188 | + print("This method was designed for a signal of 60s but will be applied to a signal of {}s".format(len(x)/sampling_rate)) |
| 189 | + # create segments of 2 seconds |
| 190 | + segments_2s = x.reshape(-1, int(sampling_rate*2)) |
| 191 | + ## compute racSQI for each segment |
| 192 | + # first compute the min and max of each segment |
| 193 | + min_ = np.min(segments_2s, axis=1) |
| 194 | + max_ = np.max(segments_2s, axis=1) |
| 195 | + # then compute the RAC (max-min)/max |
| 196 | + rac = np.abs((max_ - min_) / max_) |
| 197 | + # ratio will be 1 if the rac is < 0.2 and if the mean of the segment is > 0.05 and will be 0 otherwise |
| 198 | + quality_score = ((rac < 0.2) & (np.mean(segments_2s, axis=1) > 0.05)).astype(int) |
| 199 | + # the final SQI is the average of the scores |
| 200 | + return np.mean(quality_score) |
| 201 | + |
0 commit comments