diff --git a/doc/changes/dev/13548.bugfix.rst b/doc/changes/dev/13548.bugfix.rst new file mode 100644 index 00000000000..01c0d2eafcc --- /dev/null +++ b/doc/changes/dev/13548.bugfix.rst @@ -0,0 +1 @@ +Fix bug with reading large CNT files by `Teon Brooks`_. diff --git a/mne/io/cnt/_utils.py b/mne/io/cnt/_utils.py index cf2d45cb1ef..28dd1471445 100644 --- a/mne/io/cnt/_utils.py +++ b/mne/io/cnt/_utils.py @@ -12,6 +12,12 @@ from ...utils import warn +_NCHANNELS_OFFSET = 370 +_NSAMPLES_OFFSET = 864 +_EVENTTABLEPOS_OFFSET = 886 +_DATA_OFFSET = 900 # Size of the 'SETUP' header. +_CH_SIZE = 75 # Size of each channel in bytes + def _read_teeg(f, teeg_offset): """ @@ -116,15 +122,11 @@ def _compute_robust_event_table_position(fid, data_format="int32"): Otherwise, the address of the table position is computed from: n_samples, n_channels, and the bytes size. """ - SETUP_NCHANNELS_OFFSET = 370 - SETUP_NSAMPLES_OFFSET = 864 - SETUP_EVENTTABLEPOS_OFFSET = 886 - fid_origin = fid.tell() # save the state if fid.seek(0, SEEK_END) < 2e9: - fid.seek(SETUP_EVENTTABLEPOS_OFFSET) - (event_table_pos,) = np.frombuffer(fid.read(4), dtype=" 0 - fid.seek(SETUP_NCHANNELS_OFFSET) - (n_channels,) = np.frombuffer(fid.read(2), dtype=" 0 event_table_pos = ( - 900 + 75 * int(n_channels) + n_bytes * int(n_channels) * int(n_samples) + _DATA_OFFSET + _CH_SIZE * n_channels + n_bytes * n_channels * n_samples ) fid.seek(fid_origin) # restore the state diff --git a/mne/io/cnt/cnt.py b/mne/io/cnt/cnt.py index 196a87564d1..ba30146e341 100644 --- a/mne/io/cnt/cnt.py +++ b/mne/io/cnt/cnt.py @@ -4,8 +4,6 @@ # License: BSD-3-Clause # Copyright the MNE-Python contributors. -from os import path - import numpy as np from ..._fiff._digitization import _make_dig_points @@ -14,9 +12,20 @@ from ..._fiff.utils import _create_chs, _find_channels, _mult_cal_one, read_str from ...annotations import Annotations from ...channels.layout import _topo_to_sphere -from ...utils import _check_option, _explain_exception, _validate_type, fill_doc, warn +from ...utils import ( + _check_fname, + _check_option, + _explain_exception, + _validate_type, + fill_doc, + warn, +) from ..base import BaseRaw from ._utils import ( + _CH_SIZE, + _DATA_OFFSET, + _NCHANNELS_OFFSET, + _NSAMPLES_OFFSET, CNTEventType3, _compute_robust_event_table_position, _get_event_parser, @@ -64,7 +73,7 @@ def _translating_function(offset, n_channels, event_type, data_format=data_forma n_bytes = 2 if data_format == "int16" else 4 if event_type == CNTEventType3: offset *= n_bytes * n_channels - event_time = offset - 900 - (75 * n_channels) + event_time = offset - _DATA_OFFSET - (_CH_SIZE * n_channels) event_time //= n_channels * n_bytes event_time = event_time - 1 # Prevent negative event times @@ -276,7 +285,6 @@ def read_raw_cnt( def _get_cnt_info(input_fname, eog, ecg, emg, misc, data_format, date_format, header): """Read the cnt header.""" - data_offset = 900 # Size of the 'SETUP' header. cnt_info = dict() # Reading only the fields of interest. Structure of the whole header at # http://paulbourke.net/dataformats/eeg/ @@ -309,43 +317,51 @@ def _get_cnt_info(input_fname, eog, ecg, emg, misc, data_format, date_format, he session_date = f"{read_str(fid, 10)} {read_str(fid, 12)}" meas_date = _session_date_2_meas_date(session_date, date_format) - fid.seek(370) - n_channels = np.fromfile(fid, dtype="= 0] fid.seek(438) - lowpass_toggle = np.fromfile(fid, "i1", count=1).item() - highpass_toggle = np.fromfile(fid, "i1", count=1).item() + lowpass_toggle = bool(np.fromfile(fid, "i1", count=1).item()) + highpass_toggle = bool(np.fromfile(fid, "i1", count=1).item()) + # Reference: https://paulbourke.net/dataformats/eeg/ # Header has a field for number of samples, but it does not seem to be # too reliable. That's why we have option for setting n_bytes manually. - fid.seek(864) - n_samples = np.fromfile(fid, dtype=" 0: info["lowpass"] = highcutoff - if highpass_toggle == 1: + if highpass_toggle and lowcutoff > 0: info["highpass"] = lowcutoff subject_info = { "hand": hand, @@ -540,7 +556,9 @@ def __init__( else: _date_format = "%m/%d/%y %H:%M:%S" - input_fname = path.abspath(input_fname) + input_fname = _check_fname( + input_fname, overwrite="read", must_exist=True, name="input_fname" + ) try: info, cnt_info = _get_cnt_info( input_fname, eog, ecg, emg, misc, data_format, _date_format, header @@ -594,7 +612,9 @@ def _read_segment_file(self, data, idx, fi, start, stop, cals, mult): block_size = min(data_left, block_size) s_offset = start % channel_offset with open(self.filenames[fi], "rb", buffering=0) as fid: - fid.seek(900 + f_channels * (75 + (start - s_offset) * n_bytes)) + fid.seek( + _DATA_OFFSET + f_channels * (_CH_SIZE + (start - s_offset) * n_bytes) + ) for sample_start in np.arange(0, data_left, block_size) // f_channels: # Earlier comment says n_samples is unreliable, but I think it # is because it needed to be changed to unsigned int