Skip to content

Commit f18f843

Browse files
committed
Fix consistency of streams and add experiment_names argument for openephysbinary
1 parent 070eb47 commit f18f843

File tree

1 file changed

+128
-87
lines changed

1 file changed

+128
-87
lines changed

neo/rawio/openephysbinaryrawio.py

Lines changed: 128 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
See
77
https://open-ephys.github.io/gui-docs/User-Manual/Recording-data/Binary-format.html
88
9-
Author: Julia Sprenger and Samuel Garcia
9+
Author: Julia Sprenger, Samuel Garcia, and Alessio Buccino
1010
"""
1111

1212

@@ -26,6 +26,19 @@ class OpenEphysBinaryRawIO(BaseRawIO):
2626
"""
2727
Handle several Blocks and several Segments.
2828
29+
Parameters
30+
----------
31+
dirname : str
32+
Path to Open Ephys directory
33+
experiment_names : str or list or None
34+
If multiple experiments are available, this argument allows users to select one or more experiments.
35+
If None, all experiments are loaded as blocks.
36+
E.g. `experiment_names="experiment2"`, `experiment_names=["experiment1", "experiment2"]`
37+
38+
Note
39+
----
40+
For multi-experiment datasets, the streams need to be consistent across experiments. If this is not the case,
41+
you can select a subset of experiments with the `experiment_names` argument.
2942
3043
# Correspondencies
3144
Neo OpenEphys
@@ -40,31 +53,25 @@ class OpenEphysBinaryRawIO(BaseRawIO):
4053
extensions = []
4154
rawmode = 'one-dir'
4255

43-
def __init__(self, dirname=''):
56+
def __init__(self, dirname='', experiment_names=None):
4457
BaseRawIO.__init__(self)
4558
self.dirname = dirname
59+
if experiment_names is not None:
60+
if isinstance(experiment_names, str):
61+
experiment_names = [experiment_names]
62+
self.experiment_names = experiment_names
4663

4764
def _source_name(self):
4865
return self.dirname
4966

5067
def _parse_header(self):
51-
all_streams, nb_block, nb_segment_per_block = explore_folder(self.dirname)
68+
all_streams, nb_block, nb_segment_per_block, possible_experiments = explore_folder(self.dirname,
69+
self.experiment_names)
70+
check_stream_consistency(all_streams, nb_block, nb_segment_per_block, possible_experiments)
5271

53-
# streams can be different across blocks. Gather all and assign a stream index
54-
sig_stream_names = {}
55-
event_stream_names = {}
56-
sig_stream_index = 0
57-
evt_stream_index = 0
58-
59-
for block_index in range(nb_block):
60-
sig_stream_names[block_index] = {}
61-
for stream_name in sorted(list(all_streams[block_index][0]['continuous'].keys())):
62-
sig_stream_names[block_index][sig_stream_index] = stream_name
63-
sig_stream_index += 1
64-
event_stream_names[block_index] = {}
65-
for stream_name in sorted(list(all_streams[block_index][0]['events'].keys())):
66-
event_stream_names[block_index][evt_stream_index] = stream_name
67-
evt_stream_index += 1
72+
# all streams are consistent across blocks and segments
73+
sig_stream_names = sorted(list(all_streams[0][0]['continuous'].keys()))
74+
event_stream_names = sorted(list(all_streams[0][0]['events'].keys()))
6875

6976
# first loop to reassign stream by "stream_index" instead of "stream_name"
7077
self._sig_streams = {}
@@ -75,42 +82,40 @@ def _parse_header(self):
7582
for seg_index in range(nb_segment_per_block[block_index]):
7683
self._sig_streams[block_index][seg_index] = {}
7784
self._evt_streams[block_index][seg_index] = {}
78-
for stream_index, stream_name in sig_stream_names[block_index].items():
85+
for stream_index, stream_name in enumerate(sig_stream_names):
7986
d = all_streams[block_index][seg_index]['continuous'][stream_name]
8087
d['stream_name'] = stream_name
8188
self._sig_streams[block_index][seg_index][stream_index] = d
82-
for i, stream_name in event_stream_names[block_index].items():
89+
for i, stream_name in enumerate(event_stream_names):
8390
d = all_streams[block_index][seg_index]['events'][stream_name]
8491
d['stream_name'] = stream_name
8592
self._evt_streams[block_index][seg_index][i] = d
8693

8794
# signals zone
8895
# create signals channel map: several channel per stream
8996
signal_channels = []
90-
for block_index in range(nb_block):
91-
for stream_index, stream_name in sig_stream_names[block_index].items():
92-
# stream_index is the index in vector stream names
93-
stream_id = str(stream_index)
94-
d = self._sig_streams[block_index][0][stream_index]
95-
new_channels = []
96-
for chan_info in d['channels']:
97-
chan_id = chan_info['channel_name']
98-
if chan_info["units"] == "":
99-
# in some cases for some OE version the unit is "", but the gain is to "uV"
100-
units = "uV"
101-
else:
102-
units = chan_info["units"]
103-
new_channels.append((chan_info['channel_name'],
104-
chan_id, float(d['sample_rate']), d['dtype'], units,
105-
chan_info['bit_volts'], 0., stream_id))
106-
signal_channels.extend(new_channels)
97+
for stream_index, stream_name in enumerate(sig_stream_names):
98+
# stream_index is the index in vector stream names
99+
stream_id = str(stream_index)
100+
d = self._sig_streams[0][0][stream_index]
101+
new_channels = []
102+
for chan_info in d['channels']:
103+
chan_id = chan_info['channel_name']
104+
if chan_info["units"] == "":
105+
# in some cases for some OE versions the unit is "", but the gain is set to "uV"
106+
units = "uV"
107+
else:
108+
units = chan_info["units"]
109+
new_channels.append((chan_info['channel_name'],
110+
chan_id, float(d['sample_rate']), d['dtype'], units,
111+
chan_info['bit_volts'], 0., stream_id))
112+
signal_channels.extend(new_channels)
107113
signal_channels = np.array(signal_channels, dtype=_signal_channel_dtype)
108114

109115
signal_streams = []
110-
for block_index in range(nb_block):
111-
for stream_index, stream_name in sig_stream_names[block_index].items():
112-
stream_id = str(stream_index)
113-
signal_streams.append((stream_name, stream_id))
116+
for stream_index, stream_name in enumerate(sig_stream_names):
117+
stream_id = str(stream_index)
118+
signal_streams.append((stream_name, stream_id))
114119
signal_streams = np.array(signal_streams, dtype=_signal_stream_dtype)
115120

116121
# create memmap for signals
@@ -125,44 +130,42 @@ def _parse_header(self):
125130
# events zone
126131
# channel map: one channel one stream
127132
event_channels = []
128-
for block_index in range(nb_block):
129-
for stream_ind, stream_name in event_stream_names[block_index].items():
130-
d = self._evt_streams[block_index][0][stream_ind]
131-
event_channels.append((d['channel_name'], stream_ind, 'event'))
133+
for stream_ind, stream_name in enumerate(event_stream_names):
134+
d = self._evt_streams[0][0][stream_ind]
135+
event_channels.append((d['channel_name'], stream_ind, 'event'))
132136
event_channels = np.array(event_channels, dtype=_event_channel_dtype)
133137

134138
# create memmap
135-
for block_index in range(nb_block):
136-
for stream_ind, stream_name in event_stream_names[block_index].items():
137-
# inject memmap loaded into main dict structure
138-
d = self._evt_streams[block_index][0][stream_ind]
139-
140-
for name in _possible_event_stream_names:
141-
if name + '_npy' in d:
142-
data = np.load(d[name + '_npy'], mmap_mode='r')
143-
d[name] = data
144-
145-
# check that events have timestamps
146-
assert 'timestamps' in d
147-
148-
# for event the neo "label" will change depending the nature
149-
# of event (ttl, text, binary)
150-
# and this is transform into unicode
151-
# all theses data are put in event array annotations
152-
if 'text' in d:
153-
# text case
154-
d['labels'] = d['text'].astype('U')
155-
elif 'metadata' in d:
156-
# binary case
157-
d['labels'] = d['channels'].astype('U')
158-
elif 'channels' in d:
159-
# ttl case use channels
160-
d['labels'] = d['channels'].astype('U')
161-
elif 'states' in d:
162-
# ttl case use states
163-
d['labels'] = d['states'].astype('U')
164-
else:
165-
raise ValueError(f'There is no possible labels for this event: {stream_name}')
139+
for stream_ind, stream_name in enumerate(event_stream_names):
140+
# inject memmap loaded into main dict structure
141+
d = self._evt_streams[0][0][stream_ind]
142+
143+
for name in _possible_event_stream_names:
144+
if name + '_npy' in d:
145+
data = np.load(d[name + '_npy'], mmap_mode='r')
146+
d[name] = data
147+
148+
# check that events have timestamps
149+
assert 'timestamps' in d
150+
151+
# for event the neo "label" will change depending the nature
152+
# of event (ttl, text, binary)
153+
# and this is transformed into unicode
154+
# all these data are put in event array annotations
155+
if 'text' in d:
156+
# text case
157+
d['labels'] = d['text'].astype('U')
158+
elif 'metadata' in d:
159+
# binary case
160+
d['labels'] = d['channels'].astype('U')
161+
elif 'channels' in d:
162+
# ttl case use channels
163+
d['labels'] = d['channels'].astype('U')
164+
elif 'states' in d:
165+
# ttl case use states
166+
d['labels'] = d['states'].astype('U')
167+
else:
168+
raise ValueError(f'There is no possible labels for this event: {stream_name}')
166169

167170
# no spike read yet
168171
# can be implemented on user demand
@@ -189,8 +192,8 @@ def _parse_header(self):
189192
global_t_stop = t_stop
190193

191194
# loop over events
192-
for stream_ind, stream_name in event_stream_names[block_index].items():
193-
d = self._evt_streams[block_index][0][stream_ind]
195+
for stream_index, stream_name in enumerate(event_stream_names):
196+
d = self._evt_streams[0][0][stream_index]
194197
if d['timestamps'].size == 0:
195198
continue
196199
t_start = d['timestamps'][0] / d['sample_rate']
@@ -220,9 +223,9 @@ def _parse_header(self):
220223
seg_ann = bl_ann['segments'][seg_index]
221224

222225
# array annotations for signal channels
223-
for stream_index, stream_name in sig_stream_names[block_index].items():
226+
for stream_index, stream_name in enumerate(sig_stream_names):
224227
sig_ann = seg_ann['signals'][stream_index]
225-
d = self._sig_streams[block_index][0][stream_index]
228+
d = self._sig_streams[0][0][stream_index]
226229
for k in ('identifier', 'history', 'source_processor_index',
227230
'recorded_processor_index'):
228231
if k in d['channels'][0]:
@@ -231,9 +234,9 @@ def _parse_header(self):
231234

232235
# array annotations for event channels
233236
# use other possible data in _possible_event_stream_names
234-
for stream_index, stream_name in event_stream_names[block_index].items():
237+
for stream_index, stream_name in enumerate(event_stream_names):
235238
ev_ann = seg_ann['events'][stream_index]
236-
d = self._evt_streams[block_index][0][stream_index]
239+
d = self._evt_streams[0][0][stream_index]
237240
for k in _possible_event_stream_names:
238241
if k in ('timestamps', ):
239242
continue
@@ -324,7 +327,7 @@ def _rescale_epoch_duration(self, raw_duration, dtype):
324327
'full_word', 'channel_states', 'data_array', 'metadata')
325328

326329

327-
def explore_folder(dirname):
330+
def explore_folder(dirname, experiment_names=None):
328331
"""
329332
Exploring the OpenEphys folder structure and structure.oebin
330333
@@ -349,6 +352,7 @@ def explore_folder(dirname):
349352
nb_segment_per_block = {}
350353
# nested dictionary: block_index > seg_index > data_type > stream_name
351354
all_streams = {}
355+
possible_experiment_names = []
352356
for root, dirs, files in os.walk(dirname):
353357
for file in files:
354358
if not file == 'structure.oebin':
@@ -361,6 +365,11 @@ def explore_folder(dirname):
361365
# so no node_name
362366
node_name = ''
363367

368+
# here we skip the experiment if experiment_names is given and does not include it
369+
experiment_name = root.parents[0].stem
370+
possible_experiment_names.append(experiment_name)
371+
if experiment_names is not None and experiment_name not in experiment_names:
372+
continue
364373
block_index = int(root.parents[0].stem.lower().replace('experiment', '')) - 1
365374
if block_index not in all_streams:
366375
all_streams[block_index] = {}
@@ -389,7 +398,13 @@ def explore_folder(dirname):
389398

390399
raw_filename = root / 'continuous' / d['folder_name'] / 'continuous.dat'
391400

392-
timestamp_file = root / 'continuous' / d['folder_name'] / 'timestamps.npy'
401+
# Updates for OpenEphys v0.6:
402+
# In new versions (>=0.6) timestamps.npy is now called sample_numbers.npy
403+
# see https://open-ephys.github.io/gui-docs/User-Manual/Recording-data/Binary-format.html#continuous
404+
if (root / 'continuous' / d['folder_name'] / 'sample_numbers.npy').is_file():
405+
timestamp_file = root / 'continuous' / d['folder_name'] / 'sample_numbers.npy'
406+
else:
407+
timestamp_file = root / 'continuous' / d['folder_name'] / 'timestamps.npy'
393408
timestamps = np.load(str(timestamp_file), mmap_mode='r')
394409
timestamp0 = timestamps[0]
395410
t_start = timestamp0 / d['sample_rate']
@@ -415,6 +430,32 @@ def explore_folder(dirname):
415430

416431
all_streams[block_index][seg_index]['events'][stream_name] = event_stream
417432

418-
# TODO for later: check stream / channel consistency across segment
419-
420-
return all_streams, nb_block, nb_segment_per_block
433+
possible_experiment_names = list(np.unique(possible_experiment_names))
434+
435+
return all_streams, nb_block, nb_segment_per_block, possible_experiment_names
436+
437+
438+
def check_stream_consistency(all_streams, nb_block, nb_segment_per_block, possible_experiment_names=None):
439+
# "continuous" streams across segments
440+
for block_index in range(nb_block):
441+
segment_stream_names = None
442+
if nb_segment_per_block[block_index] > 1:
443+
for segment_index in range(nb_segment_per_block[block_index]):
444+
stream_names = sorted(list(all_streams[block_index][segment_index]["continuous"].keys()))
445+
if segment_stream_names is None:
446+
segment_stream_names = stream_names
447+
assert segment_stream_names == stream_names, \
448+
("Inconsistent continuous streams across segments! Streams for different segments in the "
449+
"same experiment must be the same. Check your open ephys folder.")
450+
451+
# "continuous" streams across blocks
452+
block_stream_names = None
453+
for block_index in range(nb_block):
454+
# use 1st segment
455+
stream_names = sorted(list(all_streams[block_index][0]["continuous"].keys()))
456+
if block_stream_names is None:
457+
block_stream_names = stream_names
458+
assert block_stream_names == stream_names, \
459+
(f"Inconsistent continuous streams across blocks (experiments)! Streams for different experiments in the "
460+
f"same folder must be the same. You can load a subset of experiments with the 'experiment_names' "
461+
f"argument: {possible_experiment_names}")

0 commit comments

Comments
 (0)