
Commit cd0d75b

Merge pull request #1387 from samuelgarcia/fix_openephys_legacy
openephys legacy format : handle gaps more correctly
2 parents ca7e142 + 4864c3c

File tree

3 files changed: +121 −78 lines


neo/rawio/openephysrawio.py

Lines changed: 117 additions & 63 deletions
@@ -53,25 +53,31 @@ class OpenEphysRawIO(BaseRawIO):
     Limitation :
         * Works only if all continuous channels have the same sampling rate, which is a reasonable
           hypothesis.
-        * When the recording is stopped and restarted all continuous files will contain gaps.
-          Ideally this would lead to a new Segment but this use case is not implemented due to its
-          complexity.
-          Instead it will raise an error.
-
-    Special cases:
-        * Normally all continuous files have the same first timestamp and length. In situations
-          where it is not the case all files are clipped to the smallest one so that they are all
-          aligned,
-          and a warning is emitted.
+        * A recording can contain gaps due to USB stream loss under high CPU load while recording.
+          These gaps are checked channel by channel, which makes parse_header() slow.
+          If gaps are detected they are filled with zeros, but reading the signals will be much slower.
+
+    Parameters
+    ----------
+    dirname: str
+        The directory where the files are stored.
+    ignore_timestamps_errors: bool
+        (deprecated) This parameter is not used anymore.
+    fill_gap_value: int
+        When gaps are detected in continuous files, the gap is filled with this value.
+        Default is 0.
+
     """
     # file formats used by openephys
     extensions = ['continuous', 'openephys', 'spikes', 'events', 'xml']
     rawmode = 'one-dir'

-    def __init__(self, dirname='', ignore_timestamps_errors=False):
+    def __init__(self, dirname='', ignore_timestamps_errors=None, fill_gap_value=0):
         BaseRawIO.__init__(self)
         self.dirname = dirname
-        self._ignore_timestamps_errors = ignore_timestamps_errors
+        self.fill_gap_value = int(fill_gap_value)
+        if ignore_timestamps_errors is not None:
+            self.logger.warning("OpenEphysRawIO ignore_timestamps_errors=True/False is not used anymore")

     def _source_name(self):
         return self.dirname
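
A minimal usage sketch of the changed constructor (the directory path is a placeholder; parse_header() and get_analogsignal_chunk() are the standard neo RawIO calls, shown here assuming a single signal stream):

# Minimal usage sketch; the dataset path is hypothetical.
from neo.rawio import OpenEphysRawIO

reader = OpenEphysRawIO(dirname='path/to/OpenEphys_recording', fill_gap_value=0)
reader.parse_header()  # slower when gaps are present (checked channel by channel)
raw = reader.get_analogsignal_chunk(block_index=0, seg_index=0,
                                    i_start=0, i_stop=30000,
                                    stream_index=0, channel_indexes=None)

With this commit, a recording that was stopped/restarted no longer raises; gap regions come back filled with fill_gap_value.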
@@ -84,12 +90,14 @@ def _parse_header(self):
         self._sigs_memmap = {}
         self._sig_length = {}
         self._sig_timestamp0 = {}
+        self._sig_has_gap = {}
+        self._gap_mode = False
         signal_channels = []
         oe_indices = sorted(list(info['continuous'].keys()))
         for seg_index, oe_index in enumerate(oe_indices):
             self._sigs_memmap[seg_index] = {}
+            self._sig_has_gap[seg_index] = {}

-            all_sigs_length = []
             all_first_timestamps = []
             all_last_timestamps = []
             all_samplerate = []
@@ -109,18 +117,18 @@ def _parse_header(self):
                                       dtype=continuous_dtype, shape=(size, ))
                 self._sigs_memmap[seg_index][chan_index] = data_chan

-                all_sigs_length.append(data_chan.size * RECORD_SIZE)
                 all_first_timestamps.append(data_chan[0]['timestamp'])
-                all_last_timestamps.append(data_chan[-1]['timestamp'])
+                all_last_timestamps.append(data_chan[-1]['timestamp'] + RECORD_SIZE)
                 all_samplerate.append(chan_info['sampleRate'])

                 # check for continuity (no gaps)
                 diff = np.diff(data_chan['timestamp'])
-                if not np.all(diff == RECORD_SIZE) and not self._ignore_timestamps_errors:
-                    raise ValueError(
-                        'Not continuous timestamps for {}. ' \
-                        'Maybe because recording was paused/stopped.'.format(continuous_filename)
-                    )
+                channel_has_gaps = not np.all(diff == RECORD_SIZE)
+                self._sig_has_gap[seg_index][chan_index] = channel_has_gaps
+
+                if channel_has_gaps:
+                    # protect against strange timestamp block like in file 'OpenEphys_SampleData_3' CH32
+                    assert np.median(diff) == RECORD_SIZE, f"This file has a non valid data block size for channel {chan_id}, this case cannot be handled"

                 if seg_index == 0:
                     # add in channel list
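
The continuity check added above reduces to comparing successive record timestamps against the fixed record size; a small self-contained sketch of that test on made-up timestamps (RECORD_SIZE is the module-level constant of openephysrawio.py, 1024 samples per data record in the legacy format):

# Sketch of the gap test on a toy timestamp array.
import numpy as np

RECORD_SIZE = 1024
timestamps = np.array([0, 1024, 2048, 5120, 6144])   # two records missing after 2048
diff = np.diff(timestamps)
channel_has_gaps = not np.all(diff == RECORD_SIZE)    # True for this channel
assert np.median(diff) == RECORD_SIZE                 # the record size itself is still valid
print(channel_has_gaps)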
@@ -130,46 +138,39 @@ def _parse_header(self):
                         units = 'V'
                     signal_channels.append((ch_name, chan_id, chan_info['sampleRate'],
                                             'int16', units, chan_info['bitVolts'], 0., processor_id))
-
-            # In some cases, continuous do not have the same length because
-            # one record block is missing when the "OE GUI is freezing"
-            # So we need to clip to the smallest files
-            if not all(all_sigs_length[0] == e for e in all_sigs_length) or\
-                    not all(all_first_timestamps[0] == e for e in all_first_timestamps):
-
+
+            if any(self._sig_has_gap[seg_index].values()):
+                channel_with_gaps = list(self._sig_has_gap[seg_index].keys())
+                self.logger.warning(f"This OpenEphys dataset contains gaps for some channels {channel_with_gaps} in segment {seg_index} the read will be slow")
+                self._gap_mode = True
+
+
+            if not all(all_first_timestamps[0] == e for e in all_first_timestamps) or \
+                    not all(all_last_timestamps[0] == e for e in all_last_timestamps):
+                # In some cases, continuous do not have the same length because
+                # we need to clip
                 self.logger.warning('Continuous files do not have aligned timestamps; '
                                     'clipping to make them aligned.')

-                first, last = -np.inf, np.inf
+                first = max(all_first_timestamps)
+                last = min(all_last_timestamps)
                 for chan_index in self._sigs_memmap[seg_index]:
                     data_chan = self._sigs_memmap[seg_index][chan_index]
-                    if data_chan[0]['timestamp'] > first:
-                        first = data_chan[0]['timestamp']
-                    if data_chan[-1]['timestamp'] < last:
-                        last = data_chan[-1]['timestamp']
-
-                all_sigs_length = []
-                all_first_timestamps = []
-                all_last_timestamps = []
-                for chan_index in self._sigs_memmap[seg_index]:
-                    data_chan = self._sigs_memmap[seg_index][chan_index]
-                    keep = (data_chan['timestamp'] >= first) & (data_chan['timestamp'] <= last)
+                    keep = (data_chan['timestamp'] >= first) & (data_chan['timestamp'] < last)
                     data_chan = data_chan[keep]
                     self._sigs_memmap[seg_index][chan_index] = data_chan
-                    all_sigs_length.append(data_chan.size * RECORD_SIZE)
-                    all_first_timestamps.append(data_chan[0]['timestamp'])
-                    all_last_timestamps.append(data_chan[-1]['timestamp'])
-
-            # check that all signals have the same length and timestamp0 for this segment
-            assert all(all_sigs_length[0] == e for e in all_sigs_length),\
-                'Not all signals have the same length'
-            assert all(all_first_timestamps[0] == e for e in all_first_timestamps),\
-                'Not all signals have the same first timestamp'
+            else:
+                # no clip
+                first = all_first_timestamps[0]
+                last = all_last_timestamps[0]
+
+
+            # check unique sampling rate
             assert all(all_samplerate[0] == e for e in all_samplerate),\
                 'Not all signals have the same sample rate'

-            self._sig_length[seg_index] = all_sigs_length[0]
-            self._sig_timestamp0[seg_index] = all_first_timestamps[0]
+            self._sig_length[seg_index] = last - first
+            self._sig_timestamp0[seg_index] = first

         if len(signal_channels) > 0:
             signal_channels = np.array(signal_channels, dtype=_signal_channel_dtype)
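
A tiny worked example of the clipping above, with made-up timestamps: because a record's timestamp marks its first sample, all_last_timestamps now stores the exclusive end (last timestamp + RECORD_SIZE), and the segment length simply becomes last - first.

# Worked example: one channel starts a record late, another ends a record early;
# every channel is clipped to the common span.
RECORD_SIZE = 1024
all_first_timestamps = [0, 1024, 0]
all_last_timestamps = [6144, 6144, 5120]   # exclusive ends: last record timestamp + RECORD_SIZE
first = max(all_first_timestamps)          # 1024
last = min(all_last_timestamps)            # 5120
sig_length = last - first                  # 4096 samples shared by every channel
print(first, last, sig_length)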
@@ -316,23 +317,73 @@ def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop,
         if i_stop is None:
             i_stop = self._sig_length[seg_index]

-        block_start = i_start // RECORD_SIZE
-        block_stop = i_stop // RECORD_SIZE + 1
-        sl0 = i_start % RECORD_SIZE
-        sl1 = sl0 + (i_stop - i_start)
-
         stream_id = self.header['signal_streams'][stream_index]['id']
         mask = self.header['signal_channels']['stream_id']
         global_channel_indexes, = np.nonzero(mask == stream_id)
         if channel_indexes is None:
             channel_indexes = slice(None)
         global_channel_indexes = global_channel_indexes[channel_indexes]

-        sigs_chunk = np.zeros((i_stop - i_start, len(global_channel_indexes)), dtype='int16')
-        for i, global_chan_index in enumerate(global_channel_indexes):
-            data = self._sigs_memmap[seg_index][global_chan_index]
-            sub = data[block_start:block_stop]
-            sigs_chunk[:, i] = sub['samples'].flatten()[sl0:sl1]
+        if not self._gap_mode:
+            sigs_chunk = np.zeros((i_stop - i_start, len(global_channel_indexes)), dtype='int16')
+            # previous behavior: block indices are linear
+            block_start = i_start // RECORD_SIZE
+            block_stop = i_stop // RECORD_SIZE + 1
+            sl0 = i_start % RECORD_SIZE
+            sl1 = sl0 + (i_stop - i_start)
+
+            for i, global_chan_index in enumerate(global_channel_indexes):
+                data = self._sigs_memmap[seg_index][global_chan_index]
+                sub = data[block_start:block_stop]
+                sigs_chunk[:, i] = sub['samples'].flatten()[sl0:sl1]
+        else:
+            sigs_chunk = np.full(shape=(i_stop - i_start, len(global_channel_indexes)),
+                                 fill_value=self.fill_gap_value,
+                                 dtype='int16')
+            # slow mode
+            for i, global_chan_index in enumerate(global_channel_indexes):
+                data = self._sigs_memmap[seg_index][global_chan_index]
+                timestamp0 = data[0]['timestamp']
+
+                # find first block
+                block0 = np.searchsorted(data['timestamp'], timestamp0 + i_start, side='right') - 1
+                block0_pos = data[block0]['timestamp'] - timestamp0
+
+                if i_start - block0_pos > RECORD_SIZE:
+                    # the first block has a gap
+                    pos = - ((i_start - block0_pos) % RECORD_SIZE)
+                    block_index = block0 + 1
+                else:
+                    # the first block does not have gaps
+                    shift0 = i_start - block0_pos
+
+                    if shift0 + (i_stop - i_start) < RECORD_SIZE:
+                        # protect when only one small block
+                        pos = (i_stop - i_start)
+                        sigs_chunk[:, i][:pos] = data[block0]['samples'][shift0:shift0 + pos]
+                    else:
+
+                        pos = RECORD_SIZE - shift0
+                        sigs_chunk[:, i][:pos] = data[block0]['samples'][shift0:]
+                        block_index = block0 + 1
+
+                # full blocks
+                while block_index < data.size and data[block_index]['timestamp'] - timestamp0 < i_stop - RECORD_SIZE:
+                    diff = data[block_index]['timestamp'] - data[block_index - 1]['timestamp']
+                    if diff > RECORD_SIZE:
+                        # gap detected, need to jump
+                        pos += diff - RECORD_SIZE
+
+                    sigs_chunk[:, i][pos:pos + RECORD_SIZE] = data[block_index]['samples'][:]
+                    pos += RECORD_SIZE
+                    block_index += 1
+
+                # last block
+                if pos < i_stop - i_start:
+                    diff = data[block_index]['timestamp'] - data[block_index - 1]['timestamp']
+                    if diff == RECORD_SIZE:
+                        # ensure no gaps for last block
+                        sigs_chunk[:, i][pos:] = data[block_index]['samples'][:i_stop - i_start - pos]

         return sigs_chunk
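
The slow path above boils down to pre-filling the output with fill_gap_value and copying each record at the offset implied by its timestamp, so missing records simply stay at the fill value. A toy sketch of that idea (it ignores the partial first/last record handling of the real code, and uses a tiny record size for readability):

# Toy sketch of the gap-filling copy.
import numpy as np

RECORD_SIZE = 4
fill_gap_value = 0
timestamps = np.array([0, 4, 12])                              # the record at t=8 is missing
samples = np.arange(1, 13, dtype='int16').reshape(3, RECORD_SIZE)

sigs = np.full(timestamps[-1] + RECORD_SIZE, fill_gap_value, dtype='int16')
for t, record in zip(timestamps, samples):
    sigs[t:t + RECORD_SIZE] = record                           # samples 8..11 stay at the fill value
print(sigs)   # [ 1  2  3  4  5  6  7  8  0  0  0  0  9 10 11 12]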

@@ -524,9 +575,12 @@ def explore_folder(dirname):
                 chan_ids_by_type[chan_type] = [chan_id]
                 filenames_by_type[chan_type] = [continuous_filename]
     chan_types = list(chan_ids_by_type.keys())
-    if chan_types[0] == 'ADC':
-        # put ADC at last position
-        chan_types = chan_types[1:] + chan_types[0:1]
+
+    if 'CH' in chan_types:
+        # force CH at beginning
+        chan_types.remove('CH')
+        chan_types = ['CH'] + chan_types
+
     ordered_continuous_filenames = []
     for chan_type in chan_types:
         local_order = np.argsort(chan_ids_by_type[chan_type])
neo/test/rawiotest/rawio_compliance.py

Lines changed: 1 addition & 0 deletions
@@ -243,6 +243,7 @@ def read_analogsignals(reader):
             chunks.append(raw_chunk)
             i_start += chunksize
         chunk_raw_sigs = np.concatenate(chunks, axis=0)
+
         np.testing.assert_array_equal(ref_raw_sigs, chunk_raw_sigs)

neo/test/rawiotest/test_openephysrawio.py

Lines changed: 3 additions & 15 deletions
@@ -11,23 +11,11 @@ class TestOpenEphysRawIO(BaseTestRawIO, unittest.TestCase, ):
     ]
     entities_to_test = [
        'openephys/OpenEphys_SampleData_1',
-        # 'OpenEphys_SampleData_2_(multiple_starts)',  # This not implemented this raise error
-        # 'OpenEphys_SampleData_3',
+        # this file has gaps and is now handled correctly
+        'openephys/OpenEphys_SampleData_2_(multiple_starts)',
+        # 'openephys/OpenEphys_SampleData_3',
     ]

-    def test_raise_error_if_discontinuous_files(self):
-        # the case of discontinuous signals is NOT cover by the IO for the moment
-        # It must raise an error
-        reader = OpenEphysRawIO(dirname=self.get_local_path(
-            'openephys/OpenEphys_SampleData_2_(multiple_starts)'))
-        with self.assertRaises(ValueError):
-            reader.parse_header()
-        # if ignore_timestamps_errors=True, no exception is raised
-        reader = OpenEphysRawIO(dirname=self.get_local_path(
-            'openephys/OpenEphys_SampleData_2_(multiple_starts)'),
-            ignore_timestamps_errors=True)
-        reader.parse_header()
-

     def test_raise_error_if_strange_timestamps(self):
         # In this dataset CH32 have strange timestamps
