@@ -53,25 +53,20 @@ class OpenEphysRawIO(BaseRawIO):
5353 Limitation :
5454 * Works only if all continuous channels have the same sampling rate, which is a reasonable
5555 hypothesis.
56- * When the recording is stopped and restarted all continuous files will contain gaps.
57- Ideally this would lead to a new Segment but this use case is not implemented due to its
58- complexity.
59- Instead it will raise an error.
60-
61- Special cases:
62- * Normally all continuous files have the same first timestamp and length. In situations
63- where it is not the case all files are clipped to the smallest one so that they are all
64- aligned,
65- and a warning is emitted.
56+ * A recording can contain gaps, e.g. due to USB stream loss when the CPU load is high while recording.
57+ These gaps are checked channel per channel, which makes parse_header() slow.
58+ If gaps are detected, they are filled with zeros, but reading the signals will then be much slower.
59+
6660 """
6761 # file formats used by openephys
6862 extensions = ['continuous' , 'openephys' , 'spikes' , 'events' , 'xml' ]
6963 rawmode = 'one-dir'
7064
71- def __init__(self, dirname='', ignore_timestamps_errors=False):
65+ def __init__(self, dirname='', ignore_timestamps_errors=None):
7266 BaseRawIO.__init__(self)
7367 self.dirname = dirname
74- self._ignore_timestamps_errors = ignore_timestamps_errors
68+ if ignore_timestamps_errors is not None:
69+ self.logger.warning("OpenEphysRawIO: the ignore_timestamps_errors argument is not used anymore")
7570
7671 def _source_name (self ):
7772 return self .dirname
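A note on the gap check the new docstring describes: each block of an OpenEphys `.continuous` file stores the sample index (timestamp) of its first sample, so in a continuous recording consecutive block timestamps differ by exactly RECORD_SIZE. A minimal, self-contained sketch of that test (the `has_gaps` helper is hypothetical; `_parse_header` below does the same thing with `np.diff` on the memmapped `timestamp` field):

```python
import numpy as np

RECORD_SIZE = 1024  # samples per block in the ".continuous" format

def has_gaps(block_timestamps):
    """True if consecutive blocks are not exactly RECORD_SIZE samples apart."""
    diff = np.diff(np.asarray(block_timestamps, dtype='int64'))
    return not np.all(diff == RECORD_SIZE)

print(has_gaps([0, 1024, 2048, 3072]))  # False: continuous
print(has_gaps([0, 1024, 3072, 4096]))  # True: one block (1024 samples) was lost
```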
@@ -84,10 +79,13 @@ def _parse_header(self):
8479 self ._sigs_memmap = {}
8580 self ._sig_length = {}
8681 self ._sig_timestamp0 = {}
82+ self._sig_has_gap = {}
83+ self._gap_mode = False
8784 signal_channels = []
8885 oe_indices = sorted (list (info ['continuous' ].keys ()))
8986 for seg_index , oe_index in enumerate (oe_indices ):
9087 self ._sigs_memmap [seg_index ] = {}
88+ self._sig_has_gap[seg_index] = {}
9189
9290 all_sigs_length = []
9391 all_first_timestamps = []
@@ -109,18 +107,26 @@ def _parse_header(self):
109107 dtype = continuous_dtype , shape = (size , ))
110108 self ._sigs_memmap [seg_index ][chan_index ] = data_chan
111109
112- all_sigs_length .append (data_chan .size * RECORD_SIZE )
113118 all_first_timestamps .append (data_chan [0 ]['timestamp' ])
114- all_last_timestamps .append (data_chan [- 1 ]['timestamp' ])
119+ all_last_timestamps.append(data_chan[-1]['timestamp'] + RECORD_SIZE)
115120 all_samplerate .append (chan_info ['sampleRate' ])
116121
117122 # check for continuity (no gaps)
118123 diff = np .diff (data_chan ['timestamp' ])
119- if not np .all (diff == RECORD_SIZE ) and not self ._ignore_timestamps_errors :
120- raise ValueError (
121- 'Not continuous timestamps for {}. ' \
122- 'Maybe because recording was paused/stopped.' .format (continuous_filename )
123- )
124+ self._sig_has_gap[seg_index][chan_index] = not np.all(diff == RECORD_SIZE)
124130
125131 if seg_index == 0 :
126132 # add in channel list
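Why `+ RECORD_SIZE` on the last timestamp: a block's timestamp marks its first sample, so a channel's data actually ends one full block after the last stored timestamp. A quick worked example with the format's 1024-sample blocks (made-up numbers):

```python
RECORD_SIZE = 1024

first_timestamp = 2048         # timestamp of the first block
last_block_timestamp = 101376  # timestamp of the last block (its first sample)

last = last_block_timestamp + RECORD_SIZE  # 102400: end of the data, exclusive
n_samples = last - first_timestamp         # 100352 samples, i.e. 98 full blocks
print(last, n_samples)
```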
@@ -130,46 +136,39 @@ def _parse_header(self):
130136 units = 'V'
131137 signal_channels .append ((ch_name , chan_id , chan_info ['sampleRate' ],
132138 'int16' , units , chan_info ['bitVolts' ], 0. , processor_id ))
133-
134- # In some cases, continuous do not have the same length because
135- # one record block is missing when the "OE GUI is freezing"
136- # So we need to clip to the smallest files
137- if not all (all_sigs_length [0 ] == e for e in all_sigs_length ) or \
138- not all (all_first_timestamps [0 ] == e for e in all_first_timestamps ):
139-
139+
140+ if any(self._sig_has_gap[seg_index].values()):
141+ channels_with_gaps = [chan_index for chan_index, has_gap in self._sig_has_gap[seg_index].items() if has_gap]
142+ self.logger.warning(f"This OpenEphys dataset contains gaps for channels {channels_with_gaps} in segment {seg_index}; reading will be slow")
143+ self._gap_mode = True
144+
145+
146+ if not all(all_first_timestamps[0] == e for e in all_first_timestamps) or \
147+ not all(all_last_timestamps[0] == e for e in all_last_timestamps):
148+ # In some cases, continuous files do not have the same first/last timestamps,
149+ # so we need to clip them to a common range
140150 self .logger .warning ('Continuous files do not have aligned timestamps; '
141151 'clipping to make them aligned.' )
142152
143- first , last = - np .inf , np .inf
153+ first = max(all_first_timestamps)
154+ last = max(all_last_timestamps)
144155 for chan_index in self ._sigs_memmap [seg_index ]:
145156 data_chan = self ._sigs_memmap [seg_index ][chan_index ]
146- if data_chan [0 ]['timestamp' ] > first :
147- first = data_chan [0 ]['timestamp' ]
148- if data_chan [- 1 ]['timestamp' ] < last :
149- last = data_chan [- 1 ]['timestamp' ]
150-
151- all_sigs_length = []
152- all_first_timestamps = []
153- all_last_timestamps = []
154- for chan_index in self ._sigs_memmap [seg_index ]:
155- data_chan = self ._sigs_memmap [seg_index ][chan_index ]
156- keep = (data_chan ['timestamp' ] >= first ) & (data_chan ['timestamp' ] <= last )
157+ keep = (data_chan ['timestamp' ] >= first ) & (data_chan ['timestamp' ] < last )
157158 data_chan = data_chan [keep ]
158159 self ._sigs_memmap [seg_index ][chan_index ] = data_chan
159- all_sigs_length .append (data_chan .size * RECORD_SIZE )
160- all_first_timestamps .append (data_chan [0 ]['timestamp' ])
161- all_last_timestamps .append (data_chan [- 1 ]['timestamp' ])
162-
163- # check that all signals have the same length and timestamp0 for this segment
164- assert all (all_sigs_length [0 ] == e for e in all_sigs_length ),\
165- 'Not all signals have the same length'
166- assert all (all_first_timestamps [0 ] == e for e in all_first_timestamps ),\
167- 'Not all signals have the same first timestamp'
160+ else:
161+ # already aligned, no clipping needed
162+ first = all_first_timestamps[0]
163+ last = all_last_timestamps[0]
164+
165+
166+ # check unique sampling rate
168167 assert all (all_samplerate [0 ] == e for e in all_samplerate ),\
169168 'Not all signals have the same sample rate'
170169
171- self ._sig_length [seg_index ] = all_sigs_length [ 0 ]
172- self ._sig_timestamp0 [seg_index ] = all_first_timestamps [ 0 ]
170+ self._sig_length[seg_index] = last - first
171+ self._sig_timestamp0[seg_index] = first
173172
174173 if len (signal_channels ) > 0 :
175174 signal_channels = np .array (signal_channels , dtype = _signal_channel_dtype )
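With these changes the segment start is the latest first timestamp across channels and the segment length is expressed directly in timestamp (sample) units, so zero-filled gaps count toward `_sig_length`. A small sketch of the alignment arithmetic with made-up per-channel values (the `last` values already include the `+ RECORD_SIZE` from above):

```python
all_first_timestamps = [0, 1024, 0]
all_last_timestamps = [102400, 102400, 101376]

first = max(all_first_timestamps)  # 1024: latest common start across channels
last = max(all_last_timestamps)    # 102400
sig_length = last - first          # 101376 samples for the segment, gaps included

# per channel, only blocks with first <= timestamp < last are kept
print(first, last, sig_length)
```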
@@ -316,11 +315,6 @@ def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop,
316315 if i_stop is None :
317316 i_stop = self ._sig_length [seg_index ]
318317
319- block_start = i_start // RECORD_SIZE
320- block_stop = i_stop // RECORD_SIZE + 1
321- sl0 = i_start % RECORD_SIZE
322- sl1 = sl0 + (i_stop - i_start )
323-
324318 stream_id = self .header ['signal_streams' ][stream_index ]['id' ]
325319 mask = self .header ['signal_channels' ]['stream_id' ]
326320 global_channel_indexes , = np .nonzero (mask == stream_id )
@@ -329,10 +323,45 @@ def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop,
329323 global_channel_indexes = global_channel_indexes [channel_indexes ]
330324
331325 sigs_chunk = np .zeros ((i_stop - i_start , len (global_channel_indexes )), dtype = 'int16' )
332- for i , global_chan_index in enumerate (global_channel_indexes ):
333- data = self ._sigs_memmap [seg_index ][global_chan_index ]
334- sub = data [block_start :block_stop ]
335- sigs_chunk [:, i ] = sub ['samples' ].flatten ()[sl0 :sl1 ]
326+
327+ if not self._gap_mode:
328+ # fast path (previous behavior): no gaps, so block indices map linearly to sample indices
329+ block_start = i_start // RECORD_SIZE
330+ block_stop = i_stop // RECORD_SIZE + 1
331+ sl0 = i_start % RECORD_SIZE
332+ sl1 = sl0 + (i_stop - i_start)
333+
334+ for i, global_chan_index in enumerate(global_channel_indexes):
335+ data = self._sigs_memmap[seg_index][global_chan_index]
336+ sub = data[block_start:block_stop]
337+ sigs_chunk[:, i] = sub['samples'].flatten()[sl0:sl1]
338+ else:
339+ # slow path: walk the blocks channel per channel and leave zeros where samples are missing
340+ for i, global_chan_index in enumerate(global_channel_indexes):
341+ data = self._sigs_memmap[seg_index][global_chan_index]
342+ t0 = data[0]['timestamp']
343+
344+ # find the first block containing i_start and copy its tail
345+ block0 = np.searchsorted(data['timestamp'], t0 + i_start, side='right') - 1
346+ shift0 = i_start + t0 - data[block0]['timestamp']
347+ pos = RECORD_SIZE - shift0
348+ sigs_chunk[:, i][:pos] = data[block0]['samples'][shift0:]
349+
350+ # full blocks
351+ block_index = block0 + 1
352+ while data[block_index]['timestamp'] - t0 < i_stop - RECORD_SIZE:
353+ diff = data[block_index]['timestamp'] - data[block_index - 1]['timestamp']
354+ if diff > RECORD_SIZE:
355+ # gap detected: jump the write position forward, the skipped samples stay zero
356+ pos += diff - RECORD_SIZE
357+
358+ sigs_chunk[:, i][pos:pos + RECORD_SIZE] = data[block_index]['samples'][:]
359+ pos += RECORD_SIZE
360+ block_index += 1
361+
362+ # last, possibly partial, block
363+ if pos < i_stop - i_start:
364+ sigs_chunk[:, i][pos:] = data[block_index]['samples'][:i_stop - i_start - pos]
336365
337366 return sigs_chunk
338367
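The slow path above walks the blocks of each channel, advancing a write position `pos` and skipping ahead whenever consecutive block timestamps differ by more than RECORD_SIZE, so the skipped samples keep their preallocated zero value. A deliberately simplified, self-contained sketch of the same zero-filling idea (it places each block by its absolute timestamp instead of tracking an incremental position as the reader does):

```python
import numpy as np

RECORD_SIZE = 4  # tiny block size, just for the illustration

# two synthetic blocks with one missing block between them (samples 4..7 lost)
blocks = np.zeros(2, dtype=[('timestamp', 'int64'), ('samples', 'int16', (RECORD_SIZE,))])
blocks[0] = (0, [1, 2, 3, 4])
blocks[1] = (8, [5, 6, 7, 8])

t0 = blocks[0]['timestamp']
length = blocks[-1]['timestamp'] + RECORD_SIZE - t0  # 12 samples, gap included

out = np.zeros(length, dtype='int16')  # missing samples stay zero
for block in blocks:
    pos = block['timestamp'] - t0      # jumps over the gap automatically
    out[pos:pos + RECORD_SIZE] = block['samples']

print(out)  # [1 2 3 4 0 0 0 0 5 6 7 8]
```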
@@ -524,9 +553,12 @@ def explore_folder(dirname):
524553 chan_ids_by_type [chan_type ] = [chan_id ]
525554 filenames_by_type [chan_type ] = [continuous_filename ]
526555 chan_types = list (chan_ids_by_type .keys ())
527- if chan_types [0 ] == 'ADC' :
528- # put ADC at last position
529- chan_types = chan_types [1 :] + chan_types [0 :1 ]
556+
557+ if 'CH' in chan_types:
558+ # force 'CH' channels to come first
559+ chan_types.remove('CH')
560+ chan_types = ['CH'] + chan_types
561+
530562 ordered_continuous_filenames = []
531563 for chan_type in chan_types :
532564 local_order = np .argsort (chan_ids_by_type [chan_type ])