15
15
from ...utils import _check_fname , fill_doc , logger , verbose , warn
16
16
from ..base import BaseRaw
17
17
18
- # these are the only non-data channel IDs (besides "AUX*", handled via glob)
19
- _NON_DATA_CHS = ("Quaternion " , "BufferChannel " , "RampChannel " , "LoadCellChannel " )
18
+ # these will all get mapped to `misc`. Quaternion channels are handled separately.
19
+ _NON_DATA_CHS = ("buffer " , "ramp " , "loadcell " , "aux " )
20
20
21
21
22
22
def _parse_date (dt ):
@@ -50,13 +50,14 @@ def _parse_sex(sex):
50
50
def _parse_otb_plus_metadata (metadata , extras_metadata ):
51
51
assert metadata .tag == "Device"
52
52
# device-level metadata
53
- sfreq = float (metadata .attrib ["SampleFrequency" ])
54
- n_chan = int (metadata .attrib ["DeviceTotalChannels" ])
53
+ adc_range = 0.0033 # 3.3 mV (TODO VERIFY)
55
54
bit_depth = int (metadata .attrib ["ad_bits" ])
56
55
device_name = metadata .attrib ["Name" ]
57
- adc_range = 0.0033 # 3.3 mV (TODO VERIFY)
56
+ n_chan = int (metadata .attrib ["DeviceTotalChannels" ])
57
+ sfreq = float (metadata .attrib ["SampleFrequency" ])
58
58
# containers
59
59
gains = np .full (n_chan , np .nan )
60
+ scalings = np .full (n_chan , np .nan )
60
61
ch_names = list ()
61
62
ch_types = list ()
62
63
highpass = list ()
@@ -93,19 +94,22 @@ def _parse_otb_plus_metadata(metadata, extras_metadata):
93
94
# store gains
94
95
gain_ix = ix + ch_offset
95
96
gains [gain_ix ] = float (ch .get ("Gain" )) * adapter_gain
96
- # TODO verify ch_type for quats, buffer channel, and ramp channel
97
- # ramp and controls channels definitely "MISC"
97
+ # TODO verify ch_type for quats & buffer channel
98
+ # ramp and control channels definitely "MISC"
98
99
# quats should maybe be FIFF.FIFFV_QUAT_{N} (N from 0-6), but need to verify
99
100
# what quats should be, as there are only 4 quat channels. The FIFF quats:
100
101
# 0: obsolete
101
102
# 1-3: rotations
102
103
# 4-6: translations
103
- if ch_id == "Quaternions" :
104
- ch_type = FIFF .FIFFV_QUAT_0 # TODO verify
105
- elif ch_id in _NON_DATA_CHS or ch_id .lower ().startswith ("aux" ):
104
+ if ch_id .startswith ("Quaternion" ):
105
+ ch_type = "chpi" # TODO verify
106
+ scalings [gain_ix ] = 1e-4
107
+ elif any (ch_id .lower ().startswith (_ch .lower ()) for _ch in _NON_DATA_CHS ):
106
108
ch_type = "misc"
109
+ scalings [gain_ix ] = 1.0
107
110
else :
108
111
ch_type = "emg"
112
+ scalings [gain_ix ] = 1.0
109
113
ch_types .append (ch_type )
110
114
# parse subject info
111
115
subject_info = _parse_patient_xml (extras_metadata )
@@ -122,11 +126,151 @@ def _parse_otb_plus_metadata(metadata, extras_metadata):
122
126
ch_types = ch_types ,
123
127
highpass = highpass ,
124
128
lowpass = lowpass ,
129
+ units = None ,
130
+ scalings = scalings ,
131
+ signal_paths = None ,
125
132
)
126
133
127
134
128
135
def _parse_otb_four_metadata (metadata , extras_metadata ):
129
- pass
136
+ def get_str (node , tag ):
137
+ return node .find (tag ).text
138
+
139
+ def get_int (node , tag ):
140
+ return int (get_str (node , tag ))
141
+
142
+ def get_float (node , tag , ** replacements ):
143
+ # filter freqs may be "Unknown", can't blindly parse as floats
144
+ val = get_str (node , tag )
145
+ return replacements .get (val , float (val ))
146
+
147
+ assert metadata .tag == "DeviceParameters"
148
+ # device-level metadata
149
+ adc_range = float (metadata .find ("ADC_Range" ).text )
150
+ bit_depth = int (metadata .find ("AdBits" ).text )
151
+ n_chan = int (metadata .find ("TotalChannelsInFile" ).text )
152
+ sfreq = float (metadata .find ("SamplingFrequency" ).text )
153
+ # containers
154
+ gains = np .full (n_chan , np .nan )
155
+ ch_names = list ()
156
+ ch_types = list ()
157
+ highpass = list ()
158
+ lowpass = list ()
159
+ n_chans = list ()
160
+ paths = list ()
161
+ scalings = list ()
162
+ units = list ()
163
+ # stored per-adapter, but should be uniform
164
+ adc_ranges = set ()
165
+ bit_depths = set ()
166
+ device_names = set ()
167
+ meas_dates = set ()
168
+ sfreqs = set ()
169
+ # adapter-level metadata
170
+ for adapter in extras_metadata .iter ("TrackInfo" ):
171
+ # expected to be same for all adapters
172
+ adc_ranges .add (get_float (adapter , "ADC_Range" ))
173
+ bit_depths .add (get_int (adapter , "ADC_Nbits" ))
174
+ device_names .add (get_str (adapter , "Device" ))
175
+ sfreqs .add (get_int (adapter , "SamplingFrequency" ))
176
+ # may be different for each adapter
177
+ adapter_id = get_str (adapter , "SubTitle" )
178
+ adapter_gain = get_float (adapter , "Gain" )
179
+ ch_offset = get_int (adapter , "AcquisitionChannel" )
180
+ n_chans .append (get_int (adapter , "NumberOfChannels" ))
181
+ paths .append (get_str (adapter , "SignalStreamPath" ))
182
+ scalings .append (get_str (adapter , "UnitOfMeasurementFactor" ))
183
+ units .append (get_str (adapter , "UnitOfMeasurement" ))
184
+ # we only really care about lowpass/highpass on the data channels
185
+ if adapter_id not in ("Quaternion" , "Buffer" , "Ramp" ):
186
+ highpass = get_float (adapter , "HighPassFilter" , Unknown = None )
187
+ lowpass = get_float (adapter , "LowPassFilter" , Unknown = None )
188
+ # meas_date
189
+ meas_date = adapter .find ("StringsDescriptions" ).find ("StartDate" )
190
+ if meas_date is not None :
191
+ meas_dates .add (_parse_date (meas_date .text ).astimezone (timezone .utc ))
192
+ # # range (TODO maybe not needed; might be just for mfg's GUI?)
193
+ # # FWIW in the example file: range for Buffer is 1-100,
194
+ # # Ramp and Control are -32767-32768, and
195
+ # # EMG chs are ±2.1237507098703645E-05
196
+ # rmin = get_float(adapter, "RangeMin")
197
+ # rmax = get_float(adapter, "RangeMax")
198
+ # if rmin.is_integer() and rmax.is_integer():
199
+ # rmin = int(rmin)
200
+ # rmax = int(rmax)
201
+
202
+ # extract channel-specific info ↓ not a typo
203
+ for ch in adapter .get ("Channels" ).iter ("ChannelRapresentation" ):
204
+ # channel names
205
+ ix = int (ch .find ("Index" ).text )
206
+ ch_name = ch .find ("Label" ).text
207
+ try :
208
+ _ = int (ch_name )
209
+ except ValueError :
210
+ pass
211
+ else :
212
+ ch_name = f"{ adapter_id } _{ ix } "
213
+ ch_names .append (ch_name )
214
+ # gains
215
+ gains [ix + ch_offset ] = adapter_gain
216
+ # channel types
217
+ # TODO verify for quats & buffer channel
218
+ # ramp and control channels definitely "MISC"
219
+ # quats should maybe be FIFF.FIFFV_QUAT_{N} (N from 0-6), but need to verify
220
+ # what quats should be, as there are only 4 quat channels. The FIFF quats:
221
+ # 0: obsolete
222
+ # 1-3: rotations
223
+ # 4-6: translations
224
+ if adapter_id .startswith ("Quaternion" ):
225
+ ch_type = "chpi" # TODO verify
226
+ elif any (adapter_id .lower ().startswith (_ch ) for _ch in _NON_DATA_CHS ):
227
+ ch_type = "misc"
228
+ else :
229
+ ch_type = "emg"
230
+ ch_types .append (ch_type )
231
+
232
+ # validate the fields stored at adapter level, but that ought to be uniform:
233
+ def check_uniform (name , adapter_values , device_value = None ):
234
+ if len (adapter_values ) > 1 :
235
+ raise RuntimeError (
236
+ f"multiple { name } s found ({ ', ' .join (sorted (adapter_values ))} ), "
237
+ "this is not yet supported"
238
+ )
239
+ adapter_value = adapter_values .pop ()
240
+ if device_value is not None and device_value != adapter_value :
241
+ raise RuntimeError (
242
+ f"mismatch between device-level { name } ({ device_value } ) and "
243
+ f"adapter-level { name } ({ adapter_value } )"
244
+ )
245
+ return adapter_value
246
+
247
+ device_name = check_uniform ("device name" , device_names )
248
+ adc_range = check_uniform ("analog-to-digital range" , adc_ranges , adc_range )
249
+ bit_depth = check_uniform ("bit depth" , bit_depths , bit_depth )
250
+ sfreq = check_uniform ("sampling frequency" , sfreqs , sfreq )
251
+
252
+ # verify number of channels in device metadata matches sum of adapters
253
+ assert sum (n_chans ) == n_chan , (
254
+ f"total channels ({ n_chan } ) doesn't match sum of channels for each adapter "
255
+ f"({ sum (n_chans )} )"
256
+ )
257
+
258
+ return dict (
259
+ adc_range = adc_range ,
260
+ bit_depth = bit_depth ,
261
+ ch_names = ch_names ,
262
+ ch_types = ch_types ,
263
+ device_name = device_name ,
264
+ gains = gains ,
265
+ highpass = highpass ,
266
+ lowpass = lowpass ,
267
+ n_chan = n_chan ,
268
+ scalings = scalings ,
269
+ sfreq = sfreq ,
270
+ signal_paths = paths ,
271
+ subject_info = dict (),
272
+ units = units ,
273
+ )
130
274
131
275
132
276
@fill_doc
@@ -160,8 +304,9 @@ def __init__(self, fname, *, verbose=None):
160
304
fnames = fid .getnames ()
161
305
# the .sig file(s) are the binary channel data.
162
306
sig_fnames = [_fname for _fname in fnames if _fname .endswith (".sig" )]
163
- # TODO ↓↓↓↓↓↓↓↓ make compatible with multiple sig_fnames
164
- data_size_bytes = fid .getmember (sig_fnames [0 ]).size
307
+ # TODO ↓↓↓↓↓↓↓↓ this may be wrong for Novecento+ devices
308
+ # (MATLAB code appears to skip the first sig_fname)
309
+ data_size_bytes = sum (fid .getmember (_fname ).size for _fname in sig_fnames )
165
310
# triage the file format versions
166
311
if v4_format :
167
312
metadata_fname = "DeviceParameters.xml"
@@ -187,17 +332,20 @@ def __init__(self, fname, *, verbose=None):
187
332
extras_tree = ET .fromstring (fid .extractfile (extras_fname ).read ())
188
333
# extract what we need from the tree
189
334
metadata = parse_func (metadata_tree , extras_tree )
190
- sfreq = metadata ["sfreq" ]
191
- n_chan = metadata ["n_chan" ]
192
- bit_depth = metadata ["bit_depth" ]
193
- device_name = metadata ["device_name" ]
194
335
adc_range = metadata ["adc_range" ]
195
- subject_info = metadata ["subject_info " ]
336
+ bit_depth = metadata ["bit_depth " ]
196
337
ch_names = metadata ["ch_names" ]
197
338
ch_types = metadata ["ch_types" ]
339
+ device_name = metadata ["device_name" ]
198
340
gains = metadata ["gains" ]
199
341
highpass = metadata ["highpass" ]
200
342
lowpass = metadata ["lowpass" ]
343
+ n_chan = metadata ["n_chan" ]
344
+ scalings = metadata ["scalings" ]
345
+ sfreq = metadata ["sfreq" ]
346
+ signal_paths = metadata ["signal_paths" ]
347
+ subject_info = metadata ["subject_info" ]
348
+ # units = metadata["units"] # TODO needed for orig_units maybe
201
349
202
350
if bit_depth == 16 :
203
351
_dtype = np .int16
@@ -206,7 +354,7 @@ def __init__(self, fname, *, verbose=None):
206
354
# https://stackoverflow.com/a/34128171
207
355
# https://stackoverflow.com/a/11967503
208
356
raise NotImplementedError (
209
- "OTB+ files with 24-bit data are not yet supported."
357
+ "OTB files with 24-bit data are not yet supported."
210
358
)
211
359
else :
212
360
raise NotImplementedError (
@@ -256,8 +404,8 @@ def __init__(self, fname, *, verbose=None):
256
404
info ["highpass" ] = highpass
257
405
info ["lowpass" ] = lowpass
258
406
for ix , _ch in enumerate (info ["chs" ]):
259
- cal = 1 / 2 ** bit_depth / gains [ ix ]
260
- # TODO need different range for Quaternions?
407
+ # divisor = 1.0 if _ch["kind"] == FIFF.FIFFV_MISC_CH else 2**bit_depth
408
+ cal = 1 / 2 ** bit_depth / gains [ ix ] * scalings [ ix ]
261
409
_range = (
262
410
adc_range
263
411
if _ch ["kind" ] in (FIFF .FIFFV_EMG_CH , FIFF .FIFFV_EEG_CH )
@@ -269,7 +417,7 @@ def __init__(self, fname, *, verbose=None):
269
417
timezone .utc
270
418
)
271
419
272
- # sanity check
420
+ # verify duration from metadata matches n_samples/sfreq
273
421
dur = extras_tree .find ("duration" )
274
422
if dur is not None :
275
423
np .testing .assert_almost_equal (
@@ -282,7 +430,12 @@ def __init__(self, fname, *, verbose=None):
282
430
# TODO parse files markers_0.xml, markers_1.xml as annotations?
283
431
284
432
# populate raw_extras
285
- raw_extras = dict (dtype = _dtype , sig_fnames = sig_fnames )
433
+ raw_extras = dict (
434
+ device_name = device_name ,
435
+ dtype = _dtype ,
436
+ sig_fnames = sig_fnames ,
437
+ signal_paths = signal_paths ,
438
+ )
286
439
FORMAT_MAPPING = dict (
287
440
d = "double" ,
288
441
f = "single" ,
@@ -306,6 +459,9 @@ def _preload_data(self, preload):
306
459
"""Load raw data from an OTB+ file."""
307
460
_extras = self ._raw_extras [0 ]
308
461
sig_fnames = _extras ["sig_fnames" ]
462
+ # if device_name=="Novecento+" then we may need these:
463
+ # sig_paths = _extras["signal_paths"]
464
+ # device_name = _extras["device_name"]
309
465
310
466
with tarfile .open (self .filenames [0 ], "r" ) as fid :
311
467
_data = list ()
@@ -318,10 +474,7 @@ def _preload_data(self, preload):
318
474
.reshape (- 1 , self .info ["nchan" ])
319
475
.T
320
476
)
321
- if len (_data ) == 1 :
322
- _data = _data [0 ]
323
- else :
324
- _data = np .concatenate (_data , axis = 0 )
477
+ _data = np .concatenate (_data , axis = 0 ) # no-op if len(_data) == 1
325
478
326
479
cals = np .array ([_ch ["cal" ] * _ch ["range" ] for _ch in self .info ["chs" ]])
327
480
self ._data = _data * cals [:, np .newaxis ]
0 commit comments