15
15
from ..base import BaseRaw
16
16
17
17
# these will all get mapped to `misc`. Quaternion channels are handled separately.
18
- _NON_DATA_CHS = ("buffer" , "ramp" , "loadcell" , "aux" )
18
+ _CONTROL_CHS = ("buffer" , "ramp" )
19
+ _AUX_CHS = ("loadcell" , "aux" )
20
+
21
+
22
+ def _get_str (node , tag ):
23
+ val = node .find (tag )
24
+ if val is not None :
25
+ return val .text
26
+
27
+
28
+ def _get_int (node , tag ):
29
+ return int (_get_str (node , tag ))
30
+
31
+
32
+ def _get_float (node , tag , ** replacements ):
33
+ # filter freqs may be "Unknown", can't blindly parse as floats
34
+ val = _get_str (node , tag )
35
+ return replacements [val ] if val in replacements else float (val )
19
36
20
37
21
38
def _parse_otb_plus_metadata (metadata , extras_metadata ):
@@ -75,9 +92,13 @@ def _parse_otb_plus_metadata(metadata, extras_metadata):
75
92
# 4-6: translations
76
93
if ch_id .startswith ("Quaternion" ):
77
94
ch_type = "chpi" # TODO verify
78
- scalings [gain_ix ] = 1e-3 # TODO CHPI is usually 1e-4
95
+ scalings [gain_ix ] = 1e-3 # CHPI is usually 1e-4; limbs move more
96
+ adc_ranges [gain_ix ] = 1.0
97
+ elif any (ch_id .lower ().startswith (_ch .lower ()) for _ch in _CONTROL_CHS ):
98
+ ch_type = "stim"
99
+ scalings [gain_ix ] = 1.0
79
100
adc_ranges [gain_ix ] = 1.0
80
- elif any (ch_id .lower ().startswith (_ch .lower ()) for _ch in _NON_DATA_CHS ):
101
+ elif any (ch_id .lower ().startswith (_ch .lower ()) for _ch in _AUX_CHS ):
81
102
ch_type = "misc"
82
103
scalings [gain_ix ] = 1.0
83
104
adc_ranges [gain_ix ] = 1.0
@@ -88,11 +109,6 @@ def _parse_otb_plus_metadata(metadata, extras_metadata):
88
109
ch_types .append (ch_type )
89
110
90
111
# parse subject info
91
- def get_str (node , tag ):
92
- val = node .find (tag )
93
- if val is not None :
94
- return val .text
95
-
96
112
def parse_date (dt ):
97
113
return datetime .fromisoformat (dt ).date ()
98
114
@@ -111,13 +127,13 @@ def parse_sex(sex):
111
127
)
112
128
subject_info = dict ()
113
129
for source , target , func in subj_info_mapping :
114
- value = get_str (extras_metadata , source )
130
+ value = _get_str (extras_metadata , source )
115
131
if value is not None :
116
132
subject_info [target ] = func (value )
117
133
118
- meas_date = get_str (extras_metadata , "time" )
119
- duration = get_str (extras_metadata , "duration" )
120
- site = get_str (extras_metadata , "place" )
134
+ meas_date = _get_str (extras_metadata , "time" )
135
+ duration = _get_float (extras_metadata , "duration" )
136
+ site = _get_str (extras_metadata , "place" )
121
137
122
138
return dict (
123
139
adc_range = adc_ranges ,
@@ -141,22 +157,11 @@ def parse_sex(sex):
141
157
142
158
143
159
def _parse_otb_four_metadata (metadata , extras_metadata ):
144
- def get_str (node , tag ):
145
- return node .find (tag ).text
146
-
147
- def get_int (node , tag ):
148
- return int (get_str (node , tag ))
149
-
150
- def get_float (node , tag , ** replacements ):
151
- # filter freqs may be "Unknown", can't blindly parse as floats
152
- val = get_str (node , tag )
153
- return replacements [val ] if val in replacements else float (val )
154
-
155
160
assert metadata .tag == "DeviceParameters"
156
161
# device-level metadata
157
- bit_depth = get_int (metadata , "AdBits" ) # TODO use `SampleSize * 8` instead?
158
- sfreq = get_float (metadata , "SamplingFrequency" )
159
- device_gain = get_float (metadata , "Gain" )
162
+ bit_depth = _get_int (metadata , "AdBits" ) # TODO use `SampleSize * 8` instead?
163
+ sfreq = _get_float (metadata , "SamplingFrequency" )
164
+ device_gain = _get_float (metadata , "Gain" )
160
165
# containers
161
166
gains = list ()
162
167
ch_names = list ()
@@ -178,23 +183,26 @@ def get_float(node, tag, **replacements):
178
183
for adapter in extras_metadata .iter ("TrackInfo" ):
179
184
strings = adapter .find ("StringsDescriptions" )
180
185
# expected to be same for all adapters
181
- bit_depths .add (get_int (adapter , "ADC_Nbits" ))
182
- device_names .add (get_str (adapter , "Device" ))
183
- sfreqs .add (get_int (adapter , "SamplingFrequency" ))
184
- durations .add (get_float (adapter , "TimeDuration" ))
186
+ bit_depths .add (_get_int (adapter , "ADC_Nbits" ))
187
+ device_names .add (_get_str (adapter , "Device" ))
188
+ sfreqs .add (_get_int (adapter , "SamplingFrequency" ))
189
+ durations .add (_get_float (adapter , "TimeDuration" ))
185
190
# may be different for each adapter
186
- adapter_adc_range = get_float (adapter , "ADC_Range" )
187
- adapter_id = get_str (adapter , "SubTitle" )
188
- adapter_gain = get_float (adapter , "Gain" )
189
- adapter_scaling = 1.0 / get_float (adapter , "UnitOfMeasurementFactor" )
190
- # ch_offset = get_int(adapter, "AcquisitionChannel")
191
- n_chans .append (get_int (adapter , "NumberOfChannels" ))
192
- paths .append (get_str (adapter , "SignalStreamPath" ))
193
- units .append (get_str (adapter , "UnitOfMeasurement" ))
191
+ adapter_adc_range = _get_float (adapter , "ADC_Range" )
192
+ adapter_id = _get_str (adapter , "SubTitle" )
193
+ adapter_gain = _get_float (adapter , "Gain" )
194
+ if adapter_id .startswith ("Quaternion" ):
195
+ adapter_scaling = 1e-3
196
+ else :
197
+ adapter_scaling = 1.0 / _get_float (adapter , "UnitOfMeasurementFactor" )
198
+ # ch_offset = _get_int(adapter, "AcquisitionChannel")
199
+ n_chans .append (_get_int (adapter , "NumberOfChannels" ))
200
+ paths .append (_get_str (adapter , "SignalStreamPath" ))
201
+ units .append (_get_str (adapter , "UnitOfMeasurement" ))
194
202
# we only really care about lowpass/highpass on the data channels
195
203
if adapter_id not in ("Quaternion" , "Buffer" , "Ramp" ):
196
- hp = get_float (strings , "HighPassFilter" , Unknown = None )
197
- lp = get_float (strings , "LowPassFilter" , Unknown = None )
204
+ hp = _get_float (strings , "HighPassFilter" , Unknown = None )
205
+ lp = _get_float (strings , "LowPassFilter" , Unknown = None )
198
206
if hp is not None :
199
207
highpass .append (hp )
200
208
if lp is not None :
@@ -207,8 +215,8 @@ def get_float(node, tag, **replacements):
207
215
# # FWIW in the example file: range for Buffer is 1-100,
208
216
# # Ramp and Control are -32767-32768, and
209
217
# # EMG chs are ±2.1237507098703645E-05
210
- # rmin = get_float (adapter, "RangeMin")
211
- # rmax = get_float (adapter, "RangeMax")
218
+ # rmin = _get_float (adapter, "RangeMin")
219
+ # rmax = _get_float (adapter, "RangeMax")
212
220
# if rmin.is_integer() and rmax.is_integer():
213
221
# rmin = int(rmin)
214
222
# rmax = int(rmax)
@@ -231,15 +239,18 @@ def get_float(node, tag, **replacements):
231
239
scalings .append (adapter_scaling )
232
240
# channel types
233
241
# TODO verify for quats & buffer channel
234
- # ramp and control channels definitely "MISC"
242
+ # ramp and control channels maybe "MISC", arguably "STIM"?
235
243
# quats should maybe be FIFF.FIFFV_QUAT_{N} (N from 0-6), but need to verify
236
244
# what quats should be, as there are only 4 quat channels. The FIFF quats:
237
- # 0: obsolete
245
+ # 0: obsolete (?)
238
246
# 1-3: rotations
239
247
# 4-6: translations
240
248
if adapter_id .startswith ("Quaternion" ):
241
249
ch_type = "chpi" # TODO verify
242
- elif any (adapter_id .lower ().startswith (_ch ) for _ch in _NON_DATA_CHS ):
250
+ # adc_ranges[gain_ix] = 1.0
251
+ elif any (adapter_id .lower ().startswith (_ch ) for _ch in _CONTROL_CHS ):
252
+ ch_type = "stim"
253
+ elif any (adapter_id .lower ().startswith (_ch ) for _ch in _AUX_CHS ):
243
254
ch_type = "misc"
244
255
else :
245
256
ch_type = "emg"
@@ -369,7 +380,7 @@ def __init__(self, fname, *, verbose=None):
369
380
# bit_depth seems to be unreliable for some OTB4 files, so let's check:
370
381
if duration is not None : # None for OTB+ files
371
382
expected_n_samp = int (duration * sfreq * n_chan )
372
- expected_bit_depth = int (8 * data_size_bytes / expected_n_samp )
383
+ expected_bit_depth = int (np . rint ( 8 * data_size_bytes / expected_n_samp ) )
373
384
if bit_depth != expected_bit_depth :
374
385
warn (
375
386
f"mismatch between file metadata `AdBits` ({ bit_depth } bit) and "
0 commit comments