15
15
from ..base import BaseRaw
16
16
17
17
18
+ def _parse_date (dt ):
19
+ return datetime .fromisoformat (dt ).date ()
20
+
21
+
18
22
def _parse_patient_xml (tree ):
19
23
"""Convert an ElementTree to a dict."""
20
24
@@ -23,9 +27,6 @@ def _parse_sex(sex):
23
27
# F options and choosing one is mandatory. For `.otb4` the field is optional.
24
28
return dict (m = 1 , f = 2 )[sex .lower ()[0 ]] if sex else 0 # 0 means "unknown"
25
29
26
- def _parse_date (dt ):
27
- return datetime .fromisoformat (dt ).date ()
28
-
29
30
subj_info_mapping = (
30
31
("family_name" , "last_name" , str ),
31
32
("first_name" , "first_name" , str ),
@@ -42,6 +43,26 @@ def _parse_date(dt):
42
43
return subject_info
43
44
44
45
46
+ def _parse_otb_plus_metadata (metadata , extras_metadata ):
47
+ assert metadata .tag == "Device"
48
+ sfreq = float (metadata .attrib ["SampleFrequency" ])
49
+ n_chan = int (metadata .attrib ["DeviceTotalChannels" ])
50
+ bit_depth = int (metadata .attrib ["ad_bits" ])
51
+ model = metadata .attrib ["Name" ]
52
+ adc_range = 3.3
53
+ return dict (
54
+ sfreq = sfreq ,
55
+ n_chan = n_chan ,
56
+ bit_depth = bit_depth ,
57
+ model = model ,
58
+ adc_range = adc_range ,
59
+ )
60
+
61
+
62
+ def _parse_otb_four_metadata (metadata , extras_metadata ):
63
+ pass
64
+
65
+
45
66
@fill_doc
46
67
class RawOTB (BaseRaw ):
47
68
"""Raw object from an OTB file.
@@ -64,8 +85,7 @@ def __init__(self, fname, *, verbose=None):
64
85
# with permission to relicense as BSD-3 granted here:
65
86
# https://github.com/OTBioelettronica/OTB-Python/issues/2#issuecomment-2979135882
66
87
fname = str (_check_fname (fname , "read" , True , "fname" ))
67
- if fname .endswith (".otb4" ):
68
- raise NotImplementedError (".otb4 format is not yet supported" )
88
+ v4_format = fname .endswith (".otb4" )
69
89
logger .info (f"Loading { fname } " )
70
90
71
91
self .preload = True # lazy loading not supported
@@ -75,52 +95,46 @@ def __init__(self, fname, *, verbose=None):
75
95
ch_names = list ()
76
96
ch_types = list ()
77
97
78
- # TODO verify these are the only non-data channel IDs (other than "AUX*" which
79
- # are handled separately via glob)
98
+ # these are the only non-data channel IDs (besides "AUX*", handled via glob)
80
99
NON_DATA_CHS = ("Quaternion" , "BufferChannel" , "RampChannel" , "LoadCellChannel" )
81
- POWER_SUPPLY = 3.3 # volts
82
100
83
101
with tarfile .open (fname , "r" ) as fid :
84
102
fnames = fid .getnames ()
85
- # the .sig file is the binary channel data
86
- sig_fname = [_fname for _fname in fnames if _fname .endswith (".sig" )]
87
- if len (sig_fname ) != 1 :
88
- raise NotImplementedError (
89
- "multiple .sig files found in the OTB+ archive. Probably this "
90
- "means that an acquisition was imported into another session. "
91
- "This is not yet supported; please open an issue at "
92
- "https://github.com/mne-tools/mne-emg/issues if you want us to add "
93
- "such support."
94
- )
95
- sig_fname = sig_fname [0 ]
96
- data_size_bytes = fid .getmember (sig_fname ).size
97
- # the .xml file with the matching basename contains signal metadata
98
- metadata_fname = str (Path (sig_fname ).with_suffix (".xml" ))
99
- metadata = ET .fromstring (fid .extractfile (metadata_fname ).read ())
100
- # patient info
101
- patient_info_xml = ET .fromstring (fid .extractfile ("patient.xml" ).read ())
102
- # structure of `metadata` is:
103
- # Device
104
- # └ Channels
105
- # ├ Adapter
106
- # │ ├ Channel
107
- # │ ├ ...
108
- # │ └ Channel
109
- # ├ ...
110
- # └ Adapter
111
- # ├ Channel
112
- # ├ ...
113
- # └ Channel
114
- assert metadata .tag == "Device"
115
- sfreq = float (metadata .attrib ["SampleFrequency" ])
116
- n_chan = int (metadata .attrib ["DeviceTotalChannels" ])
117
- bit_depth = int (metadata .attrib ["ad_bits" ])
118
- model = metadata .attrib ["Name" ]
119
-
120
- # TODO we may not need this? only relevant for Quattrocento device, and `n_chan`
121
- # defined above should already be correct/sufficient
122
- # if model := metadata.attrib.get("Model"):
123
- # max_n_chan = int(model[-3:])
103
+ # the .sig file(s) are the binary channel data.
104
+ sig_fnames = [_fname for _fname in fnames if _fname .endswith (".sig" )]
105
+ # TODO ↓↓↓↓↓↓↓↓ make compatible with multiple sig_fnames
106
+ data_size_bytes = fid .getmember (sig_fnames [0 ]).size
107
+ # triage the file format versions
108
+ if v4_format :
109
+ metadata_fname = "DeviceParameters.xml"
110
+ extras_fname = "Tracks_000.xml"
111
+ parse_func = _parse_otb_four_metadata
112
+ else :
113
+ # .otb4 format may legitimately have multiple .sig files, but
114
+ # .otb+ should not (if it's truly raw data)
115
+ if len (sig_fnames ) > 1 :
116
+ raise NotImplementedError (
117
+ "multiple .sig files found in the OTB+ archive. Probably this "
118
+ "means that an acquisition was imported into another session. "
119
+ "This is not yet supported; please open an issue at "
120
+ "https://github.com/mne-tools/mne-emg/issues if you want us to "
121
+ "add such support."
122
+ )
123
+ # the .xml file with the matching basename contains signal metadata
124
+ metadata_fname = str (Path (sig_fnames [0 ]).with_suffix (".xml" ))
125
+ extras_fname = "patient.xml"
126
+ parse_func = _parse_otb_plus_metadata
127
+ # parse the XML into a tree
128
+ metadata_tree = ET .fromstring (fid .extractfile (metadata_fname ).read ())
129
+ extras_tree = ET .fromstring (fid .extractfile (extras_fname ).read ())
130
+ # extract what we need from the tree
131
+ metadata = parse_func (metadata_tree , extras_tree )
132
+ sfreq = metadata ["sfreq" ]
133
+ n_chan = metadata ["n_chan" ]
134
+ bit_depth = metadata ["bit_depth" ]
135
+ model = metadata ["model" ]
136
+ adc_range = metadata ["adc_range" ]
137
+
124
138
if bit_depth == 16 :
125
139
_dtype = np .int16
126
140
elif bit_depth == 24 : # EEG data recorded on OTB devices do this
@@ -140,10 +154,10 @@ def __init__(self, fname, *, verbose=None):
140
154
)
141
155
gains = np .full (n_chan , np .nan )
142
156
# check in advance where we'll need to append indices to uniquify ch_names
143
- n_ch_by_type = Counter ([ch .get ("ID" ) for ch in metadata .iter ("Channel" )])
157
+ n_ch_by_type = Counter ([ch .get ("ID" ) for ch in metadata_tree .iter ("Channel" )])
144
158
dupl_ids = [k for k , v in n_ch_by_type .items () if v > 1 ]
145
159
# iterate over adapters & channels to extract gain, filters, names, etc
146
- for adapter_ix , adapter in enumerate (metadata .iter ("Adapter" )):
160
+ for adapter_ix , adapter in enumerate (metadata_tree .iter ("Adapter" )):
147
161
adapter_ch_offset = int (adapter .get ("ChannelStartIndex" ))
148
162
adapter_gain = float (adapter .get ("Gain" ))
149
163
# we only care about lowpass/highpass on the data channels
@@ -188,28 +202,28 @@ def __init__(self, fname, *, verbose=None):
188
202
)
189
203
n_samples = int (n_samples )
190
204
191
- # check filter freqs.
192
- # TODO filter freqs can vary by adapter, so in theory we might get different
205
+ # check filter freqs. Can vary by adapter, so in theory we might get different
193
206
# filters for different *data* channels (not just different between data and
194
207
# misc/aux/whatever).
195
208
if len (highpass ) > 1 :
196
209
warn (
197
210
"More than one highpass frequency found in file; choosing lowest "
198
- f"({ min (highpass )} )"
211
+ f"({ min (highpass )} Hz )"
199
212
)
200
213
if len (lowpass ) > 1 :
201
214
warn (
202
215
"More than one lowpass frequency found in file; choosing highest "
203
- f"({ max (lowpass )} )"
216
+ f"({ max (lowpass )} Hz )"
204
217
)
205
218
highpass = min (highpass )
206
219
lowpass = max (lowpass )
207
220
208
221
# create info
209
222
info = create_info (ch_names = ch_names , ch_types = ch_types , sfreq = sfreq )
210
- subject_info = _parse_patient_xml (patient_info_xml )
211
- device_info = dict (type = "OTB" , model = model ) # TODO type, model, serial, site
212
- site = patient_info_xml .find ("place" )
223
+ subject_info = _parse_patient_xml (extras_tree )
224
+ device_info = dict (type = "OTB" , model = model ) # other allowed keys: serial
225
+ meas_date = extras_tree .find ("time" )
226
+ site = extras_tree .find ("place" )
213
227
if site is not None :
214
228
device_info .update (site = site .text )
215
229
info .update (subject_info = subject_info , device_info = device_info )
@@ -218,27 +232,26 @@ def __init__(self, fname, *, verbose=None):
218
232
info ["lowpass" ] = lowpass
219
233
for _ch in info ["chs" ]:
220
234
cal = 1 / 2 ** bit_depth / gains [ix + adapter_ch_offset ]
221
- _ch .update (cal = cal , range = POWER_SUPPLY )
222
- meas_date = patient_info_xml .find ("time" )
235
+ _ch .update (cal = cal , range = adc_range )
223
236
if meas_date is not None :
224
237
info ["meas_date" ] = datetime .fromisoformat (meas_date .text ).astimezone (
225
238
timezone .utc
226
239
)
227
240
228
241
# sanity check
229
- dur = patient_info_xml .find ("duration" )
242
+ dur = extras_tree .find ("duration" )
230
243
if dur is not None :
231
244
np .testing .assert_almost_equal (
232
245
float (dur .text ), n_samples / sfreq , decimal = 3
233
246
)
234
247
235
- # TODO other fields in patient_info_xml :
248
+ # TODO other fields in extras_tree :
236
249
# protocol_code, pathology, commentsPatient, comments
237
250
238
251
# TODO parse files markers_0.xml, markers_1.xml as annotations?
239
252
240
253
# populate raw_extras
241
- raw_extras = dict (dtype = _dtype , sig_fname = sig_fname )
254
+ raw_extras = dict (dtype = _dtype , sig_fnames = sig_fnames )
242
255
FORMAT_MAPPING = dict (
243
256
d = "double" ,
244
257
f = "single" ,
@@ -261,17 +274,24 @@ def __init__(self, fname, *, verbose=None):
261
274
def _preload_data (self , preload ):
262
275
"""Load raw data from an OTB+ file."""
263
276
_extras = self ._raw_extras [0 ]
264
- sig_fname = _extras ["sig_fname " ]
277
+ sig_fnames = _extras ["sig_fnames " ]
265
278
266
279
with tarfile .open (self .filenames [0 ], "r" ) as fid :
267
- _data = (
268
- np .frombuffer (
269
- fid .extractfile (sig_fname ).read (),
270
- dtype = _extras ["dtype" ],
280
+ _data = list ()
281
+ for sig_fname in sig_fnames :
282
+ _data .append (
283
+ np .frombuffer (
284
+ fid .extractfile (sig_fname ).read (),
285
+ dtype = _extras ["dtype" ],
286
+ )
287
+ .reshape (- 1 , self .info ["nchan" ])
288
+ .T
271
289
)
272
- .reshape (- 1 , self .info ["nchan" ])
273
- .T
274
- )
290
+ if len (_data ) == 1 :
291
+ _data = _data [0 ]
292
+ else :
293
+ _data = np .concatenate (_data , axis = 0 )
294
+
275
295
cals = np .array (
276
296
[
277
297
_ch ["cal" ] * _ch ["range" ] * _ch .get ("scale" , 1.0 )
@@ -283,7 +303,7 @@ def _preload_data(self, preload):
283
303
284
304
@fill_doc
285
305
def read_raw_otb (fname , verbose = None ) -> RawOTB :
286
- """Reader for an OTB (.otb4 /.otb+) recording.
306
+ """Reader for an OTB (.otb /.otb+/.otb4 ) recording.
287
307
288
308
Parameters
289
309
----------
0 commit comments