|
| 1 | +""" |
| 2 | +Class for reading data from a 3-brain Biocam system. |
| 3 | +
|
| 4 | +See: |
| 5 | +https://www.3brain.com/products/single-well/biocam-x |
| 6 | +
|
| 7 | +Author : Alessio Buccino |
| 8 | +""" |
| 9 | + |
| 10 | +from .baserawio import (BaseRawIO, _signal_channel_dtype, _signal_stream_dtype, |
| 11 | + _spike_channel_dtype, _event_channel_dtype) |
| 12 | + |
| 13 | +import numpy as np |
| 14 | + |
| 15 | +try: |
| 16 | + import h5py |
| 17 | + HAVE_H5PY = True |
| 18 | +except ImportError: |
| 19 | + HAVE_H5PY = False |
| 20 | + |
| 21 | + |
| 22 | +class BiocamRawIO(BaseRawIO): |
| 23 | + """ |
| 24 | + Class for reading data from a Biocam h5 file. |
| 25 | +
|
| 26 | + Usage: |
| 27 | + >>> import neo.rawio |
| 28 | + >>> r = neo.rawio.BiocamRawIO(filename='biocam.h5') |
| 29 | + >>> r.parse_header() |
| 30 | + >>> print(r) |
| 31 | + >>> raw_chunk = r.get_analogsignal_chunk(block_index=0, seg_index=0, |
| 32 | + i_start=0, i_stop=1024, |
| 33 | + channel_names=channel_names) |
| 34 | + >>> float_chunk = r.rescale_signal_raw_to_float(raw_chunk, dtype='float64', |
| 35 | + channel_indexes=[0, 3, 6]) |
| 36 | + """ |
| 37 | + extensions = ['h5'] |
| 38 | + rawmode = 'one-file' |
| 39 | + |
| 40 | + def __init__(self, filename=''): |
| 41 | + BaseRawIO.__init__(self) |
| 42 | + self.filename = filename |
| 43 | + |
| 44 | + def _source_name(self): |
| 45 | + return self.filename |
| 46 | + |
| 47 | + def _parse_header(self): |
| 48 | + assert HAVE_H5PY, 'h5py is not installed' |
| 49 | + self._header_dict = open_biocam_file_header(self.filename) |
| 50 | + self._num_channels = self._header_dict["num_channels"] |
| 51 | + self._num_frames = self._header_dict["num_frames"] |
| 52 | + self._sampling_rate = self._header_dict["sampling_rate"] |
| 53 | + self._filehandle = self._header_dict["file_handle"] |
| 54 | + self._read_function = self._header_dict["read_function"] |
| 55 | + self._channels = self._header_dict["channels"] |
| 56 | + gain = self._header_dict["gain"] |
| 57 | + offset = self._header_dict["offset"] |
| 58 | + |
| 59 | + signal_streams = np.array([('Signals', '0')], dtype=_signal_stream_dtype) |
| 60 | + |
| 61 | + sig_channels = [] |
| 62 | + for c, chan in enumerate(self._channels): |
| 63 | + ch_name = f'ch{chan[0]}-{chan[1]}' |
| 64 | + chan_id = str(c + 1) |
| 65 | + sr = self._sampling_rate # Hz |
| 66 | + dtype = "uint16" |
| 67 | + units = 'uV' |
| 68 | + gain = gain |
| 69 | + offset = offset |
| 70 | + stream_id = '0' |
| 71 | + sig_channels.append((ch_name, chan_id, sr, dtype, units, gain, offset, stream_id)) |
| 72 | + sig_channels = np.array(sig_channels, dtype=_signal_channel_dtype) |
| 73 | + |
| 74 | + # No events |
| 75 | + event_channels = [] |
| 76 | + event_channels = np.array(event_channels, dtype=_event_channel_dtype) |
| 77 | + |
| 78 | + # No spikes |
| 79 | + spike_channels = [] |
| 80 | + spike_channels = np.array(spike_channels, dtype=_spike_channel_dtype) |
| 81 | + |
| 82 | + self.header = {} |
| 83 | + self.header['nb_block'] = 1 |
| 84 | + self.header['nb_segment'] = [1] |
| 85 | + self.header['signal_streams'] = signal_streams |
| 86 | + self.header['signal_channels'] = sig_channels |
| 87 | + self.header['spike_channels'] = spike_channels |
| 88 | + self.header['event_channels'] = event_channels |
| 89 | + |
| 90 | + self._generate_minimal_annotations() |
| 91 | + |
| 92 | + def _segment_t_start(self, block_index, seg_index): |
| 93 | + all_starts = [[0.]] |
| 94 | + return all_starts[block_index][seg_index] |
| 95 | + |
| 96 | + def _segment_t_stop(self, block_index, seg_index): |
| 97 | + t_stop = self._num_frames / self._sampling_rate |
| 98 | + all_stops = [[t_stop]] |
| 99 | + return all_stops[block_index][seg_index] |
| 100 | + |
| 101 | + def _get_signal_size(self, block_index, seg_index, stream_index): |
| 102 | + assert stream_index == 0 |
| 103 | + return self._num_frames |
| 104 | + |
| 105 | + def _get_signal_t_start(self, block_index, seg_index, stream_index): |
| 106 | + assert stream_index == 0 |
| 107 | + return self._segment_t_start(block_index, seg_index) |
| 108 | + |
| 109 | + def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop, |
| 110 | + stream_index, channel_indexes): |
| 111 | + if i_start is None: |
| 112 | + i_start = 0 |
| 113 | + if i_stop is None: |
| 114 | + i_stop = self._num_frames |
| 115 | + |
| 116 | + data = self._read_function(self._filehandle, i_start, i_stop, self._num_channels) |
| 117 | + return np.squeeze(data[:, channel_indexes]) |
| 118 | + |
| 119 | + |
| 120 | +def open_biocam_file_header(filename): |
| 121 | + """Open a Biocam hdf5 file, read and return the recording info, pick te correct method to access raw data, |
| 122 | + and return this to the caller.""" |
| 123 | + assert HAVE_H5PY, 'h5py is not installed' |
| 124 | + |
| 125 | + rf = h5py.File(filename, 'r') |
| 126 | + # Read recording variables |
| 127 | + rec_vars = rf.require_group('3BRecInfo/3BRecVars/') |
| 128 | + bit_depth = rec_vars['BitDepth'][0] |
| 129 | + max_uv = rec_vars['MaxVolt'][0] |
| 130 | + min_uv = rec_vars['MinVolt'][0] |
| 131 | + n_frames = rec_vars['NRecFrames'][0] |
| 132 | + sampling_rate = rec_vars['SamplingRate'][0] |
| 133 | + signal_inv = rec_vars['SignalInversion'][0] |
| 134 | + |
| 135 | + # Get the actual number of channels used in the recording |
| 136 | + file_format = rf['3BData'].attrs.get('Version', None) |
| 137 | + format_100 = False |
| 138 | + if file_format == 100: |
| 139 | + n_channels = len(rf['3BData/Raw'][0]) |
| 140 | + format_100 = True |
| 141 | + elif file_format in (101, 102) or file_format is None: |
| 142 | + n_channels = int(rf['3BData/Raw'].shape[0] / n_frames) |
| 143 | + else: |
| 144 | + raise Exception('Unknown data file format.') |
| 145 | + |
| 146 | + # # get channels |
| 147 | + channels = rf['3BRecInfo/3BMeaStreams/Raw/Chs'][:] |
| 148 | + |
| 149 | + # determine correct function to read data |
| 150 | + if format_100: |
| 151 | + if signal_inv == 1: |
| 152 | + read_function = readHDF5t_100 |
| 153 | + elif signal_inv == 1: |
| 154 | + read_function = readHDF5t_100_i |
| 155 | + else: |
| 156 | + raise Exception("Unknown signal inversion") |
| 157 | + else: |
| 158 | + if signal_inv == 1: |
| 159 | + read_function = readHDF5t_101 |
| 160 | + elif signal_inv == 1: |
| 161 | + read_function = readHDF5t_101_i |
| 162 | + else: |
| 163 | + raise Exception("Unknown signal inversion") |
| 164 | + |
| 165 | + gain = (max_uv - min_uv) / (2 ** bit_depth) |
| 166 | + offset = min_uv |
| 167 | + |
| 168 | + return dict(file_handle=rf, num_frames=n_frames, sampling_rate=sampling_rate, num_channels=n_channels, |
| 169 | + channels=channels, file_format=file_format, signal_inv=signal_inv, |
| 170 | + read_function=read_function, gain=gain, offset=offset) |
| 171 | + |
| 172 | + |
| 173 | +def readHDF5t_100(rf, t0, t1, nch): |
| 174 | + return rf['3BData/Raw'][t0:t1] |
| 175 | + |
| 176 | + |
| 177 | +def readHDF5t_100_i(rf, t0, t1, nch): |
| 178 | + return 4096 - rf['3BData/Raw'][t0:t1] |
| 179 | + |
| 180 | + |
| 181 | +def readHDF5t_101(rf, t0, t1, nch): |
| 182 | + return rf['3BData/Raw'][nch * t0:nch * t1].reshape((t1 - t0, nch), order='C') |
| 183 | + |
| 184 | + |
| 185 | +def readHDF5t_101_i(rf, t0, t1, nch): |
| 186 | + return 4096 - rf['3BData/Raw'][nch * t0:nch * t1].reshape((t1 - t0, nch), order='C') |
0 commit comments