Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/changes/dev/13548.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix bug with reading large CNT files, by `Teon Brooks`_.
26 changes: 15 additions & 11 deletions mne/io/cnt/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@

from ...utils import warn

# Byte offsets, from the start of the file, of fields in the CNT 'SETUP' header.
# Number of channels; read as a 2-byte little-endian unsigned int ("<u2").
_NCHANNELS_OFFSET = 370
# Number of samples; read as a 4-byte little-endian integer.
_NSAMPLES_OFFSET = 864
# Position of the event table; read as a 4-byte little-endian int ("<i4").
_EVENTTABLEPOS_OFFSET = 886
# The per-channel ELECTLOC entries start right after the 'SETUP' header.
_DATA_OFFSET = 900 # Size of the 'SETUP' header.
# Channel ``ch_idx`` has its ELECTLOC entry at _DATA_OFFSET + _CH_SIZE * ch_idx.
_CH_SIZE = 75 # Size of each channel in bytes


def _read_teeg(f, teeg_offset):
"""
Expand Down Expand Up @@ -116,15 +122,11 @@ def _compute_robust_event_table_position(fid, data_format="int32"):
Otherwise, the address of the table position is computed from:
n_samples, n_channels, and the bytes size.
"""
SETUP_NCHANNELS_OFFSET = 370
SETUP_NSAMPLES_OFFSET = 864
SETUP_EVENTTABLEPOS_OFFSET = 886

fid_origin = fid.tell() # save the state

if fid.seek(0, SEEK_END) < 2e9:
fid.seek(SETUP_EVENTTABLEPOS_OFFSET)
(event_table_pos,) = np.frombuffer(fid.read(4), dtype="<i4")
fid.seek(_EVENTTABLEPOS_OFFSET)
event_table_pos = int(np.frombuffer(fid.read(4), dtype="<i4").item())

else:
if data_format == "auto":
Expand All @@ -136,14 +138,16 @@ def _compute_robust_event_table_position(fid, data_format="int32"):

n_bytes = 2 if data_format == "int16" else 4

fid.seek(SETUP_NSAMPLES_OFFSET)
(n_samples,) = np.frombuffer(fid.read(4), dtype="<i4")
fid.seek(_NSAMPLES_OFFSET)
n_samples = int(np.frombuffer(fid.read(4), dtype="<u4").item())
assert n_samples > 0

fid.seek(SETUP_NCHANNELS_OFFSET)
(n_channels,) = np.frombuffer(fid.read(2), dtype="<u2")
fid.seek(_NCHANNELS_OFFSET)
n_channels = int(np.frombuffer(fid.read(2), dtype="<u2").item())
assert n_channels > 0

event_table_pos = (
900 + 75 * int(n_channels) + n_bytes * int(n_channels) * int(n_samples)
_DATA_OFFSET + _CH_SIZE * n_channels + n_bytes * n_channels * n_samples
)

fid.seek(fid_origin) # restore the state
Expand Down
80 changes: 50 additions & 30 deletions mne/io/cnt/cnt.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
# License: BSD-3-Clause
# Copyright the MNE-Python contributors.

from os import path

import numpy as np

from ..._fiff._digitization import _make_dig_points
Expand All @@ -14,9 +12,20 @@
from ..._fiff.utils import _create_chs, _find_channels, _mult_cal_one, read_str
from ...annotations import Annotations
from ...channels.layout import _topo_to_sphere
from ...utils import _check_option, _explain_exception, _validate_type, fill_doc, warn
from ...utils import (
_check_fname,
_check_option,
_explain_exception,
_validate_type,
fill_doc,
warn,
)
from ..base import BaseRaw
from ._utils import (
_CH_SIZE,
_DATA_OFFSET,
_NCHANNELS_OFFSET,
_NSAMPLES_OFFSET,
CNTEventType3,
_compute_robust_event_table_position,
_get_event_parser,
Expand Down Expand Up @@ -64,7 +73,7 @@ def _translating_function(offset, n_channels, event_type, data_format=data_forma
n_bytes = 2 if data_format == "int16" else 4
if event_type == CNTEventType3:
offset *= n_bytes * n_channels
event_time = offset - 900 - (75 * n_channels)
event_time = offset - _DATA_OFFSET - (_CH_SIZE * n_channels)
event_time //= n_channels * n_bytes
event_time = event_time - 1
# Prevent negative event times
Expand Down Expand Up @@ -276,7 +285,6 @@ def read_raw_cnt(

def _get_cnt_info(input_fname, eog, ecg, emg, misc, data_format, date_format, header):
"""Read the cnt header."""
data_offset = 900 # Size of the 'SETUP' header.
cnt_info = dict()
# Reading only the fields of interest. Structure of the whole header at
# http://paulbourke.net/dataformats/eeg/
Expand Down Expand Up @@ -309,43 +317,51 @@ def _get_cnt_info(input_fname, eog, ecg, emg, misc, data_format, date_format, he
session_date = f"{read_str(fid, 10)} {read_str(fid, 12)}"
meas_date = _session_date_2_meas_date(session_date, date_format)

fid.seek(370)
n_channels = np.fromfile(fid, dtype="<u2", count=1).item()
fid.seek(_NCHANNELS_OFFSET)
n_channels = np.fromfile(fid, dtype="<u2", count=1).astype(int).item()
fid.seek(376)
sfreq = np.fromfile(fid, dtype="<u2", count=1).item()
sfreq = np.fromfile(fid, dtype="<u2", count=1).astype(float).item()
if eog == "header":
fid.seek(402)
eog = [idx for idx in np.fromfile(fid, dtype="i2", count=2) if idx >= 0]
fid.seek(438)
lowpass_toggle = np.fromfile(fid, "i1", count=1).item()
highpass_toggle = np.fromfile(fid, "i1", count=1).item()
lowpass_toggle = bool(np.fromfile(fid, "i1", count=1).item())
highpass_toggle = bool(np.fromfile(fid, "i1", count=1).item())

# Reference: https://paulbourke.net/dataformats/eeg/
# The header has a field for the number of samples, but it does not seem
# to be too reliable. That's why we have an option for setting n_bytes manually.
fid.seek(864)
n_samples = np.fromfile(fid, dtype="<u4", count=1).item()
# According to the link above, the number of samples should be
# calculated as follows:
# nsamples = (SETUP.EventTablePos - (900 + 75 * nchannels)) / (2 * nchannels)
# where 2 likely refers to the data format with default 2 bytes.
fid.seek(_NSAMPLES_OFFSET)
n_samples = np.fromfile(fid, dtype="<i4", count=1).astype(int).item()
n_samples_header = n_samples
fid.seek(869)
lowcutoff = np.fromfile(fid, dtype="f4", count=1).item()
lowcutoff = float(np.fromfile(fid, dtype="f4", count=1).item())
fid.seek(2, 1)
highcutoff = np.fromfile(fid, dtype="f4", count=1).item()

highcutoff = float(np.fromfile(fid, dtype="f4", count=1).item())
# _EVENTTABLEPOS_OFFSET = 886
event_offset = _compute_robust_event_table_position(
fid=fid, data_format=data_format
)
fid.seek(890)
cnt_info["continuous_seconds"] = np.fromfile(fid, dtype="<f4", count=1).item()
cnt_info["continuous_seconds"] = float(
np.fromfile(fid, dtype="<f4", count=1).item()
)

if event_offset < data_offset: # no events
if event_offset < _DATA_OFFSET: # no events
data_size = n_samples * n_channels
else:
data_size = event_offset - (data_offset + 75 * n_channels)
data_size = event_offset - (_DATA_OFFSET + _CH_SIZE * n_channels)

_check_option("data_format", data_format, ["auto", "int16", "int32"])
if data_format == "auto":
if n_samples == 0 or data_size // (n_samples * n_channels) not in [2, 4]:
warn(
"Could not define the number of bytes automatically. "
f"Could not define the number of bytes automatically ({data_size=} "
f"but {n_samples=} and {n_channels=}). "
"Defaulting to 2."
)
n_bytes = 2
Expand Down Expand Up @@ -382,37 +398,37 @@ def _get_cnt_info(input_fname, eog, ecg, emg, misc, data_format, date_format, he
_validate_type(header, str, "header")
_check_option("header", header, ("auto", "new", "old"))
for ch_idx in range(n_channels): # ELECTLOC fields
fid.seek(data_offset + 75 * ch_idx)
fid.seek(_DATA_OFFSET + _CH_SIZE * ch_idx)
ch_name = read_str(fid, 10)
ch_names.append(ch_name)

# Some files have bad channels marked differently in the header.
if header in ("new", "auto"):
fid.seek(data_offset + 75 * ch_idx + 14)
fid.seek(_DATA_OFFSET + _CH_SIZE * ch_idx + 14)
if np.fromfile(fid, dtype="u1", count=1).item():
bads.append(ch_name)
if header in ("old", "auto"):
fid.seek(data_offset + 75 * ch_idx + 4)
fid.seek(_DATA_OFFSET + _CH_SIZE * ch_idx + 4)
if np.fromfile(fid, dtype="u1", count=1).item():
bads.append(ch_name)

fid.seek(data_offset + 75 * ch_idx + 19)
fid.seek(_DATA_OFFSET + _CH_SIZE * ch_idx + 19)
xy = np.fromfile(fid, dtype="f4", count=2)
xy[1] *= -1 # invert y-axis
pos.append(xy)
fid.seek(data_offset + 75 * ch_idx + 47)
fid.seek(_DATA_OFFSET + _CH_SIZE * ch_idx + 47)
# Baselines are subtracted before scaling the data.
baselines.append(np.fromfile(fid, dtype="i2", count=1).item())
fid.seek(data_offset + 75 * ch_idx + 59)
fid.seek(_DATA_OFFSET + _CH_SIZE * ch_idx + 59)
sensitivity = np.fromfile(fid, dtype="f4", count=1).item()
fid.seek(data_offset + 75 * ch_idx + 71)
fid.seek(_DATA_OFFSET + _CH_SIZE * ch_idx + 71)
cal = np.fromfile(fid, dtype="f4", count=1).item()
cals.append(cal * sensitivity * 1e-6 / 204.8)

info = _empty_info(sfreq)
if lowpass_toggle == 1:
if lowpass_toggle and highcutoff > 0:
info["lowpass"] = highcutoff
if highpass_toggle == 1:
if highpass_toggle and lowcutoff > 0:
info["highpass"] = lowcutoff
subject_info = {
"hand": hand,
Expand Down Expand Up @@ -540,7 +556,9 @@ def __init__(
else:
_date_format = "%m/%d/%y %H:%M:%S"

input_fname = path.abspath(input_fname)
input_fname = _check_fname(
input_fname, overwrite="read", must_exist=True, name="input_fname"
)
try:
info, cnt_info = _get_cnt_info(
input_fname, eog, ecg, emg, misc, data_format, _date_format, header
Expand Down Expand Up @@ -594,7 +612,9 @@ def _read_segment_file(self, data, idx, fi, start, stop, cals, mult):
block_size = min(data_left, block_size)
s_offset = start % channel_offset
with open(self.filenames[fi], "rb", buffering=0) as fid:
fid.seek(900 + f_channels * (75 + (start - s_offset) * n_bytes))
fid.seek(
_DATA_OFFSET + f_channels * (_CH_SIZE + (start - s_offset) * n_bytes)
)
for sample_start in np.arange(0, data_left, block_size) // f_channels:
# Earlier comment says n_samples is unreliable, but I think it
# is because it needed to be changed to unsigned int
Expand Down
Loading