From 8d8ef7e775bab5f14c935b3cb35dc6097cbab707 Mon Sep 17 00:00:00 2001 From: Clemens Brunner Date: Tue, 7 Oct 2025 10:37:33 +0200 Subject: [PATCH 1/6] Add BDF export --- mne/export/_bdf.py | 240 ++++++++++++++++++++++++++++++++ mne/export/_export.py | 28 ++-- mne/export/tests/test_export.py | 48 +++++++ pyproject.toml | 4 +- 4 files changed, 307 insertions(+), 13 deletions(-) create mode 100644 mne/export/_bdf.py diff --git a/mne/export/_bdf.py b/mne/export/_bdf.py new file mode 100644 index 00000000000..85e05e1685f --- /dev/null +++ b/mne/export/_bdf.py @@ -0,0 +1,240 @@ +# Authors: The MNE-Python contributors. +# License: BSD-3-Clause +# Copyright the MNE-Python contributors. + +import datetime as dt +from collections.abc import Callable + +import numpy as np + +from ..annotations import _sync_onset +from ..utils import _check_edfio_installed, warn + +_check_edfio_installed() +from edfio import Bdf, EdfAnnotation, BdfSignal, Patient, Recording # noqa: E402 + + +# copied from edfio (Apache license) +def _round_float_to_8_characters( + value: float, + round_func: Callable[[float], int], +) -> float: + if isinstance(value, int) or value.is_integer(): + return value + length = 8 + integer_part_length = str(value).find(".") + if integer_part_length == length: + return round_func(value) + factor = 10 ** (length - 1 - integer_part_length) + return round_func(value * factor) / factor + + +def _export_raw(fname, raw, physical_range, add_ch_type): + """Export Raw objects to BDF files. + + TODO: if in future the Info object supports transducer or technician information, + allow writing those here. + """ + # get voltage-based data in uV + units = dict( + eeg="uV", ecog="uV", seeg="uV", eog="uV", ecg="uV", emg="uV", bio="uV", dbs="uV" + ) + + digital_min, digital_max = -8388607, 8388607 + annotations = [] + + # load data first + raw.load_data() + + ch_types = np.array(raw.get_channel_types()) + n_times = raw.n_times + + # get the entire dataset in uV + data = raw.get_data(units=units) + + # Sampling frequency in BDF only supports integers, so to allow for float sampling + # rates from Raw, we adjust the output sampling rate for all channels and the data + # record duration. + sfreq = raw.info["sfreq"] + if float(sfreq).is_integer(): + out_sfreq = int(sfreq) + data_record_duration = None + # make non-integer second durations work + if (pad_width := int(np.ceil(n_times / sfreq) * sfreq - n_times)) > 0: + warn( + "BDF format requires equal-length data blocks, so " + f"{pad_width / sfreq:.3g} seconds of edge values were appended to all " + "channels when writing the final block." + ) + orig_shape = data.shape + data = np.pad( + data, + ( + (0, 0), + (0, int(pad_width)), + ), + "edge", + ) + assert data.shape[0] == orig_shape[0] + assert data.shape[1] > orig_shape[1] + + annotations.append( + EdfAnnotation( + raw.times[-1] + 1 / sfreq, pad_width / sfreq, "BAD_ACQ_SKIP" + ) + ) + else: + data_record_duration = _round_float_to_8_characters( + np.floor(sfreq) / sfreq, round + ) + out_sfreq = np.floor(sfreq) / data_record_duration + warn( + f"Data has a non-integer sampling rate of {sfreq}; writing to BDF format " + "may cause a small change to sample times." + ) + + # get any filter information applied to the data + lowpass = raw.info["lowpass"] + highpass = raw.info["highpass"] + linefreq = raw.info["line_freq"] + filter_str_info = f"HP:{highpass}Hz LP:{lowpass}Hz" + if linefreq is not None: + filter_str_info += " N:{linefreq}Hz" + + if physical_range == "auto": + # get max and min for each channel type data + ch_types_phys_max = dict() + ch_types_phys_min = dict() + + for _type in np.unique(ch_types): + _picks = [n for n, t in zip(raw.ch_names, ch_types) if t == _type] + _data = raw.get_data(units=units, picks=_picks) + ch_types_phys_max[_type] = _data.max() + ch_types_phys_min[_type] = _data.min() + elif physical_range == "channelwise": + prange = None + else: + # get the physical min and max of the data in uV + # Physical ranges of the data in uV are usually set by the manufacturer and + # electrode properties. In general, physical min and max should be the clipping + # levels of the ADC input, and they should be the same for all channels. For + # example, Nihon Kohden uses ±3200 uV for all EEG channels (corresponding to the + # actual clipping levels of their input amplifiers & ADC). For a discussion, + # see https://github.com/sccn/eeglab/issues/246 + pmin, pmax = physical_range[0], physical_range[1] + + # check that physical min and max is not exceeded + if data.max() > pmax: + warn( + f"The maximum μV of the data {data.max()} is more than the physical max" + f" passed in {pmax}." + ) + if data.min() < pmin: + warn( + f"The minimum μV of the data {data.min()} is less than the physical min" + f" passed in {pmin}." + ) + data = np.clip(data, pmin, pmax) + prange = pmin, pmax + signals = [] + for idx, ch in enumerate(raw.ch_names): + ch_type = ch_types[idx] + signal_label = f"{ch_type.upper()} {ch}" if add_ch_type else ch + if len(signal_label) > 16: + raise RuntimeError( + f"Signal label for {ch} ({ch_type}) is longer than 16 characters, which" + " is not supported by the BDF standard. Please shorten the channel name" + "before exporting to BDF." + ) + + if physical_range == "auto": # per channel type + pmin = ch_types_phys_min[ch_type] + pmax = ch_types_phys_max[ch_type] + if pmax == pmin: + pmax = pmin + 1 + prange = pmin, pmax + + signals.append( + BdfSignal( + data[idx], + out_sfreq, + label=signal_label, + transducer_type="", + physical_dimension="" if ch_type == "stim" else "uV", + physical_range=prange, + digital_range=(digital_min, digital_max), + prefiltering=filter_str_info, + ) + ) + + # set patient info + subj_info = raw.info.get("subject_info") + if subj_info is not None: + # get the full name of subject if available + first_name = subj_info.get("first_name", "") + middle_name = subj_info.get("middle_name", "") + last_name = subj_info.get("last_name", "") + name = "_".join(filter(None, [first_name, middle_name, last_name])) + + birthday = subj_info.get("birthday") + hand = subj_info.get("hand") + weight = subj_info.get("weight") + height = subj_info.get("height") + sex = subj_info.get("sex") + + additional_patient_info = [] + for key, value in [("height", height), ("weight", weight), ("hand", hand)]: + if value: + additional_patient_info.append(f"{key}={value}") + + patient = Patient( + code=subj_info.get("his_id") or "X", + sex={0: "X", 1: "M", 2: "F", None: "X"}[sex], + birthdate=birthday, + name=name or "X", + additional=additional_patient_info, + ) + else: + patient = None + + # set measurement date + if (meas_date := raw.info["meas_date"]) is not None: + startdate = dt.date(meas_date.year, meas_date.month, meas_date.day) + starttime = dt.time( + meas_date.hour, meas_date.minute, meas_date.second, meas_date.microsecond + ) + else: + startdate = None + starttime = None + + device_info = raw.info.get("device_info") + if device_info is not None: + device_type = device_info.get("type") or "X" + recording = Recording(startdate=startdate, equipment_code=device_type) + else: + recording = Recording(startdate=startdate) + + for desc, onset, duration, ch_names in zip( + raw.annotations.description, + # subtract raw.first_time because BDF marks events starting from the first + # available data point and ignores raw.first_time + _sync_onset(raw, raw.annotations.onset, inverse=False), + raw.annotations.duration, + raw.annotations.ch_names, + ): + if ch_names: + for ch_name in ch_names: + annotations.append( + EdfAnnotation(onset, duration, desc + f"@@{ch_name}") + ) + else: + annotations.append(EdfAnnotation(onset, duration, desc)) + + Bdf( + signals=signals, + patient=patient, + recording=recording, + starttime=starttime, + data_record_duration=data_record_duration, + annotations=annotations, + ).write(fname) diff --git a/mne/export/_export.py b/mne/export/_export.py index 4b93fda917e..3634a94e240 100644 --- a/mne/export/_export.py +++ b/mne/export/_export.py @@ -56,13 +56,14 @@ def export_raw( """ fname = str(_check_fname(fname, overwrite=overwrite)) supported_export_formats = { # format : (extensions,) - "eeglab": ("set",), - "edf": ("edf",), + "bdf": ("bdf",), "brainvision": ( "eeg", "vmrk", "vhdr", ), + "edf": ("edf",), + "eeglab": ("set",), } fmt = _infer_check_export_fmt(fmt, fname, supported_export_formats) @@ -73,18 +74,23 @@ def export_raw( "them before exporting with raw.apply_proj()." ) - if fmt == "eeglab": - from ._eeglab import _export_raw + match fmt: + case "eeglab": + from ._eeglab import _export_raw + + _export_raw(fname, raw) + case "edf": + from ._edf import _export_raw - _export_raw(fname, raw) - elif fmt == "edf": - from ._edf import _export_raw + _export_raw(fname, raw, physical_range, add_ch_type) + case "brainvision": + from ._brainvision import _export_raw - _export_raw(fname, raw, physical_range, add_ch_type) - elif fmt == "brainvision": - from ._brainvision import _export_raw + _export_raw(fname, raw, overwrite) + case "bdf": + from ._bdf import _export_raw - _export_raw(fname, raw, overwrite) + _export_raw(fname, raw, physical_range, add_ch_type) @verbose diff --git a/mne/export/tests/test_export.py b/mne/export/tests/test_export.py index 743491f26c9..f9146227d50 100644 --- a/mne/export/tests/test_export.py +++ b/mne/export/tests/test_export.py @@ -25,6 +25,7 @@ from mne.fixes import _compare_version from mne.io import ( RawArray, + read_raw_bdf, read_raw_brainvision, read_raw_edf, read_raw_eeglab, @@ -670,3 +671,50 @@ def test_export_evokeds_unsupported_format(fmt, ext): errstr = fmt.lower() if fmt != "auto" else "vhdr" with pytest.raises(ValueError, match=f"Format '{errstr}' is not .*"): export_evokeds(f"output.{ext}", evoked, fmt=fmt) + + +@edfio_mark() +@pytest.mark.parametrize( + ("input_path", "warning_msg"), + [ + (fname_raw, "Data has a non-integer"), + pytest.param( + misc_path / "ecog" / "sample_ecog_ieeg.fif", + "BDF format requires", + marks=[pytest.mark.slowtest, misc._pytest_mark()], + ), + ], +) +def test_export_raw_bdf(tmp_path, input_path, warning_msg): + """Test saving a Raw instance to BDF format.""" + raw = read_raw_fif(input_path) + + # only test with EEG channels + raw.pick(picks=["eeg", "ecog", "seeg"]).load_data() + temp_fname = tmp_path / "test.bdf" + + with pytest.warns(RuntimeWarning, match=warning_msg): + raw.export(temp_fname) + + if "epoc" in raw.ch_names: + raw.drop_channels(["epoc"]) + + raw_read = read_raw_bdf(temp_fname, preload=True) + assert raw.ch_names == raw_read.ch_names + # only compare the original length, since extra zeros are appended + orig_raw_len = len(raw) + + # assert data and times are not different + # Due to the physical range of the data, reading and writing is not lossless. For + # example, a physical min/max of -/+ 3200 uV will result in a resolution of 0.38 nV. + # This resolution is more than sufficient for EEG. + assert_array_almost_equal( + raw.get_data(), raw_read.get_data()[:, :orig_raw_len], decimal=11 + ) + + # Due to the data record duration limitations of BDF files, one cannot store + # arbitrary float sampling rate exactly. Usually this results in two sampling rates + # that are off by very low number of decimal points. This for practical purposes + # does not matter but will result in an error when say the number of time points is + # very very large. + assert_allclose(raw.times, raw_read.times[:orig_raw_len], rtol=0, atol=1e-5) diff --git a/pyproject.toml b/pyproject.toml index 71a28352184..453b8634148 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -92,7 +92,7 @@ full-no-qt = [ "darkdetect", "defusedxml", "dipy", - "edfio >= 0.2.1", + "edfio >= 0.4.10", "eeglabio", "filelock >= 3.18.0", "h5py", @@ -160,7 +160,7 @@ test = [ # Dependencies for being able to run additional tests (rare/CIs/advanced devs) # Changes here should be reflected in the mne/utils/config.py dev dependencies section test_extra = [ - "edfio >= 0.2.1", + "edfio >= 0.4.10", "eeglabio", "imageio >= 2.6.1", "imageio-ffmpeg >= 0.4.1", From 8cca0c03b13b74ff91e43d9c18f142b6ba22c691 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 7 Oct 2025 08:40:52 +0000 Subject: [PATCH 2/6] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- environment.yml | 2 +- mne/export/_bdf.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/environment.yml b/environment.yml index 4586093473c..8e7e083563b 100644 --- a/environment.yml +++ b/environment.yml @@ -9,7 +9,7 @@ dependencies: - decorator - defusedxml - dipy - - edfio >=0.2.1 + - edfio >=0.4.10 - eeglabio - filelock >=3.18.0 - h5io >=0.2.4 diff --git a/mne/export/_bdf.py b/mne/export/_bdf.py index 85e05e1685f..826a153988e 100644 --- a/mne/export/_bdf.py +++ b/mne/export/_bdf.py @@ -11,7 +11,7 @@ from ..utils import _check_edfio_installed, warn _check_edfio_installed() -from edfio import Bdf, EdfAnnotation, BdfSignal, Patient, Recording # noqa: E402 +from edfio import Bdf, BdfSignal, EdfAnnotation, Patient, Recording # noqa: E402 # copied from edfio (Apache license) From 1425dcbc0e6d11b9374d9994e6ea8941aeef4f3a Mon Sep 17 00:00:00 2001 From: Clemens Brunner Date: Tue, 7 Oct 2025 10:42:13 +0200 Subject: [PATCH 3/6] Add changelog entry --- doc/changes/dev/13435.newfeature.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 doc/changes/dev/13435.newfeature.rst diff --git a/doc/changes/dev/13435.newfeature.rst b/doc/changes/dev/13435.newfeature.rst new file mode 100644 index 00000000000..1aed60dc123 --- /dev/null +++ b/doc/changes/dev/13435.newfeature.rst @@ -0,0 +1 @@ +Add support for BDF export in :func:`mne.export.export_raw`, by `Clemens Brunner`_ \ No newline at end of file From 8d9013de68fb9d45e53708868cea729c0ff1d19a Mon Sep 17 00:00:00 2001 From: Clemens Brunner Date: Tue, 7 Oct 2025 16:19:57 +0200 Subject: [PATCH 4/6] Deduplicate --- mne/export/_edf.py | 240 ---------------------------- mne/export/{_bdf.py => _edf_bdf.py} | 146 +++++++++++------ mne/export/_export.py | 32 ++-- 3 files changed, 109 insertions(+), 309 deletions(-) delete mode 100644 mne/export/_edf.py rename mne/export/{_bdf.py => _edf_bdf.py} (65%) diff --git a/mne/export/_edf.py b/mne/export/_edf.py deleted file mode 100644 index d537d55868f..00000000000 --- a/mne/export/_edf.py +++ /dev/null @@ -1,240 +0,0 @@ -# Authors: The MNE-Python contributors. -# License: BSD-3-Clause -# Copyright the MNE-Python contributors. - -import datetime as dt -from collections.abc import Callable - -import numpy as np - -from ..annotations import _sync_onset -from ..utils import _check_edfio_installed, warn - -_check_edfio_installed() -from edfio import Edf, EdfAnnotation, EdfSignal, Patient, Recording # noqa: E402 - - -# copied from edfio (Apache license) -def _round_float_to_8_characters( - value: float, - round_func: Callable[[float], int], -) -> float: - if isinstance(value, int) or value.is_integer(): - return value - length = 8 - integer_part_length = str(value).find(".") - if integer_part_length == length: - return round_func(value) - factor = 10 ** (length - 1 - integer_part_length) - return round_func(value * factor) / factor - - -def _export_raw(fname, raw, physical_range, add_ch_type): - """Export Raw objects to EDF files. - - TODO: if in future the Info object supports transducer or technician information, - allow writing those here. - """ - # get voltage-based data in uV - units = dict( - eeg="uV", ecog="uV", seeg="uV", eog="uV", ecg="uV", emg="uV", bio="uV", dbs="uV" - ) - - digital_min, digital_max = -32767, 32767 - annotations = [] - - # load data first - raw.load_data() - - ch_types = np.array(raw.get_channel_types()) - n_times = raw.n_times - - # get the entire dataset in uV - data = raw.get_data(units=units) - - # Sampling frequency in EDF only supports integers, so to allow for float sampling - # rates from Raw, we adjust the output sampling rate for all channels and the data - # record duration. - sfreq = raw.info["sfreq"] - if float(sfreq).is_integer(): - out_sfreq = int(sfreq) - data_record_duration = None - # make non-integer second durations work - if (pad_width := int(np.ceil(n_times / sfreq) * sfreq - n_times)) > 0: - warn( - "EDF format requires equal-length data blocks, so " - f"{pad_width / sfreq:.3g} seconds of edge values were appended to all " - "channels when writing the final block." - ) - orig_shape = data.shape - data = np.pad( - data, - ( - (0, 0), - (0, int(pad_width)), - ), - "edge", - ) - assert data.shape[0] == orig_shape[0] - assert data.shape[1] > orig_shape[1] - - annotations.append( - EdfAnnotation( - raw.times[-1] + 1 / sfreq, pad_width / sfreq, "BAD_ACQ_SKIP" - ) - ) - else: - data_record_duration = _round_float_to_8_characters( - np.floor(sfreq) / sfreq, round - ) - out_sfreq = np.floor(sfreq) / data_record_duration - warn( - f"Data has a non-integer sampling rate of {sfreq}; writing to EDF format " - "may cause a small change to sample times." - ) - - # get any filter information applied to the data - lowpass = raw.info["lowpass"] - highpass = raw.info["highpass"] - linefreq = raw.info["line_freq"] - filter_str_info = f"HP:{highpass}Hz LP:{lowpass}Hz" - if linefreq is not None: - filter_str_info += " N:{linefreq}Hz" - - if physical_range == "auto": - # get max and min for each channel type data - ch_types_phys_max = dict() - ch_types_phys_min = dict() - - for _type in np.unique(ch_types): - _picks = [n for n, t in zip(raw.ch_names, ch_types) if t == _type] - _data = raw.get_data(units=units, picks=_picks) - ch_types_phys_max[_type] = _data.max() - ch_types_phys_min[_type] = _data.min() - elif physical_range == "channelwise": - prange = None - else: - # get the physical min and max of the data in uV - # Physical ranges of the data in uV are usually set by the manufacturer and - # electrode properties. In general, physical min and max should be the clipping - # levels of the ADC input, and they should be the same for all channels. For - # example, Nihon Kohden uses ±3200 uV for all EEG channels (corresponding to the - # actual clipping levels of their input amplifiers & ADC). For a discussion, - # see https://github.com/sccn/eeglab/issues/246 - pmin, pmax = physical_range[0], physical_range[1] - - # check that physical min and max is not exceeded - if data.max() > pmax: - warn( - f"The maximum μV of the data {data.max()} is more than the physical max" - f" passed in {pmax}." - ) - if data.min() < pmin: - warn( - f"The minimum μV of the data {data.min()} is less than the physical min" - f" passed in {pmin}." - ) - data = np.clip(data, pmin, pmax) - prange = pmin, pmax - signals = [] - for idx, ch in enumerate(raw.ch_names): - ch_type = ch_types[idx] - signal_label = f"{ch_type.upper()} {ch}" if add_ch_type else ch - if len(signal_label) > 16: - raise RuntimeError( - f"Signal label for {ch} ({ch_type}) is longer than 16 characters, which" - " is not supported by the EDF standard. Please shorten the channel name" - "before exporting to EDF." - ) - - if physical_range == "auto": # per channel type - pmin = ch_types_phys_min[ch_type] - pmax = ch_types_phys_max[ch_type] - if pmax == pmin: - pmax = pmin + 1 - prange = pmin, pmax - - signals.append( - EdfSignal( - data[idx], - out_sfreq, - label=signal_label, - transducer_type="", - physical_dimension="" if ch_type == "stim" else "uV", - physical_range=prange, - digital_range=(digital_min, digital_max), - prefiltering=filter_str_info, - ) - ) - - # set patient info - subj_info = raw.info.get("subject_info") - if subj_info is not None: - # get the full name of subject if available - first_name = subj_info.get("first_name", "") - middle_name = subj_info.get("middle_name", "") - last_name = subj_info.get("last_name", "") - name = "_".join(filter(None, [first_name, middle_name, last_name])) - - birthday = subj_info.get("birthday") - hand = subj_info.get("hand") - weight = subj_info.get("weight") - height = subj_info.get("height") - sex = subj_info.get("sex") - - additional_patient_info = [] - for key, value in [("height", height), ("weight", weight), ("hand", hand)]: - if value: - additional_patient_info.append(f"{key}={value}") - - patient = Patient( - code=subj_info.get("his_id") or "X", - sex={0: "X", 1: "M", 2: "F", None: "X"}[sex], - birthdate=birthday, - name=name or "X", - additional=additional_patient_info, - ) - else: - patient = None - - # set measurement date - if (meas_date := raw.info["meas_date"]) is not None: - startdate = dt.date(meas_date.year, meas_date.month, meas_date.day) - starttime = dt.time( - meas_date.hour, meas_date.minute, meas_date.second, meas_date.microsecond - ) - else: - startdate = None - starttime = None - - device_info = raw.info.get("device_info") - if device_info is not None: - device_type = device_info.get("type") or "X" - recording = Recording(startdate=startdate, equipment_code=device_type) - else: - recording = Recording(startdate=startdate) - - for desc, onset, duration, ch_names in zip( - raw.annotations.description, - # subtract raw.first_time because EDF marks events starting from the first - # available data point and ignores raw.first_time - _sync_onset(raw, raw.annotations.onset, inverse=False), - raw.annotations.duration, - raw.annotations.ch_names, - ): - if ch_names: - for ch_name in ch_names: - annotations.append( - EdfAnnotation(onset, duration, desc + f"@@{ch_name}") - ) - else: - annotations.append(EdfAnnotation(onset, duration, desc)) - - Edf( - signals=signals, - patient=patient, - recording=recording, - starttime=starttime, - data_record_duration=data_record_duration, - annotations=annotations, - ).write(fname) diff --git a/mne/export/_bdf.py b/mne/export/_edf_bdf.py similarity index 65% rename from mne/export/_bdf.py rename to mne/export/_edf_bdf.py index 826a153988e..af66e0bf551 100644 --- a/mne/export/_bdf.py +++ b/mne/export/_edf_bdf.py @@ -7,11 +7,19 @@ import numpy as np -from ..annotations import _sync_onset -from ..utils import _check_edfio_installed, warn +from mne.annotations import _sync_onset +from mne.utils import _check_edfio_installed, warn _check_edfio_installed() -from edfio import Bdf, BdfSignal, EdfAnnotation, Patient, Recording # noqa: E402 +from edfio import ( # noqa: E402 + Bdf, + BdfSignal, + Edf, + EdfAnnotation, + EdfSignal, + Patient, + Recording, +) # copied from edfio (Apache license) @@ -29,44 +37,61 @@ def _round_float_to_8_characters( return round_func(value * factor) / factor -def _export_raw(fname, raw, physical_range, add_ch_type): - """Export Raw objects to BDF files. +def _export_raw_edf_bdf(fname, raw, physical_range, add_ch_type, file_format): + """Export Raw objects to EDF/BDF files. + Parameters + ---------- + fname : str + Output file name. + raw : instance of Raw + The raw instance to export. + physical_range : str or tuple + Physical range setting. + add_ch_type : bool + Whether to add channel type to signal label. + file_format : str + File format ("EDF" or "BDF"). + + Notes + ----- TODO: if in future the Info object supports transducer or technician information, allow writing those here. """ - # get voltage-based data in uV units = dict( eeg="uV", ecog="uV", seeg="uV", eog="uV", ecg="uV", emg="uV", bio="uV", dbs="uV" ) - digital_min, digital_max = -8388607, 8388607 - annotations = [] - - # load data first - raw.load_data() + if file_format == "EDF": + digital_min, digital_max = -32767, 32767 # 16-bit + signal_class = EdfSignal + writer_class = Edf + else: # BDF + digital_min, digital_max = -8388607, 8388607 # 24-bit + signal_class = BdfSignal + writer_class = Bdf ch_types = np.array(raw.get_channel_types()) - n_times = raw.n_times - # get the entire dataset in uV + # load and prepare data + raw.load_data() data = raw.get_data(units=units) - - # Sampling frequency in BDF only supports integers, so to allow for float sampling - # rates from Raw, we adjust the output sampling rate for all channels and the data - # record duration. sfreq = raw.info["sfreq"] + pad_annotations = [] + + # Sampling frequency in EDF/BDF only supports integers, so to allow for float + # sampling rates from Raw, we adjust the output sampling rate for all channels and + # the data record duration. if float(sfreq).is_integer(): out_sfreq = int(sfreq) data_record_duration = None # make non-integer second durations work - if (pad_width := int(np.ceil(n_times / sfreq) * sfreq - n_times)) > 0: + if (pad_width := int(np.ceil(raw.n_times / sfreq) * sfreq - raw.n_times)) > 0: warn( - "BDF format requires equal-length data blocks, so " + f"{file_format} format requires equal-length data blocks, so " f"{pad_width / sfreq:.3g} seconds of edge values were appended to all " "channels when writing the final block." ) - orig_shape = data.shape data = np.pad( data, ( @@ -75,10 +100,8 @@ def _export_raw(fname, raw, physical_range, add_ch_type): ), "edge", ) - assert data.shape[0] == orig_shape[0] - assert data.shape[1] > orig_shape[1] - annotations.append( + pad_annotations.append( EdfAnnotation( raw.times[-1] + 1 / sfreq, pad_width / sfreq, "BAD_ACQ_SKIP" ) @@ -89,41 +112,38 @@ def _export_raw(fname, raw, physical_range, add_ch_type): ) out_sfreq = np.floor(sfreq) / data_record_duration warn( - f"Data has a non-integer sampling rate of {sfreq}; writing to BDF format " - "may cause a small change to sample times." + f"Data has a non-integer sampling rate of {sfreq}; writing to " + f"{file_format} format may cause a small change to sample times." ) - # get any filter information applied to the data + # extract filter information lowpass = raw.info["lowpass"] highpass = raw.info["highpass"] linefreq = raw.info["line_freq"] filter_str_info = f"HP:{highpass}Hz LP:{lowpass}Hz" if linefreq is not None: - filter_str_info += " N:{linefreq}Hz" - - if physical_range == "auto": - # get max and min for each channel type data - ch_types_phys_max = dict() - ch_types_phys_min = dict() + filter_str_info += f" N:{linefreq}Hz" + # compute physical range for channels and clip data if needed + if physical_range == "auto": # compute max and min for each channel type + range_per_channel = {} for _type in np.unique(ch_types): _picks = [n for n, t in zip(raw.ch_names, ch_types) if t == _type] _data = raw.get_data(units=units, picks=_picks) - ch_types_phys_max[_type] = _data.max() - ch_types_phys_min[_type] = _data.min() + range_per_channel[_type] = (_data.min(), _data.max()) + elif physical_range == "channelwise": - prange = None - else: - # get the physical min and max of the data in uV - # Physical ranges of the data in uV are usually set by the manufacturer and + range_per_channel = None + + else: # fixed physical range for all channels + # Physical ranges of the data in µV are usually set by the manufacturer and # electrode properties. In general, physical min and max should be the clipping # levels of the ADC input, and they should be the same for all channels. For - # example, Nihon Kohden uses ±3200 uV for all EEG channels (corresponding to the + # example, Nihon Kohden uses ±3200 µV for all EEG channels (corresponding to the # actual clipping levels of their input amplifiers & ADC). For a discussion, # see https://github.com/sccn/eeglab/issues/246 pmin, pmax = physical_range[0], physical_range[1] - # check that physical min and max is not exceeded if data.max() > pmax: warn( f"The maximum μV of the data {data.max()} is more than the physical max" @@ -135,7 +155,9 @@ def _export_raw(fname, raw, physical_range, add_ch_type): f" passed in {pmin}." ) data = np.clip(data, pmin, pmax) - prange = pmin, pmax + range_per_channel = (pmin, pmax) + + # create signals signals = [] for idx, ch in enumerate(raw.ch_names): ch_type = ch_types[idx] @@ -143,31 +165,34 @@ def _export_raw(fname, raw, physical_range, add_ch_type): if len(signal_label) > 16: raise RuntimeError( f"Signal label for {ch} ({ch_type}) is longer than 16 characters, which" - " is not supported by the BDF standard. Please shorten the channel name" - "before exporting to BDF." + f" is not supported by the {file_format} standard. Please shorten the " + f"channel name before exporting to {file_format}." ) - if physical_range == "auto": # per channel type - pmin = ch_types_phys_min[ch_type] - pmax = ch_types_phys_max[ch_type] + if isinstance(range_per_channel, dict): # per-channel-type ranges + pmin, pmax = range_per_channel[ch_type] if pmax == pmin: pmax = pmin + 1 - prange = pmin, pmax + ch_prange = pmin, pmax + elif isinstance(range_per_channel, tuple): # same range for all channels + ch_prange = range_per_channel + else: # None (computed per channel by edfio) + ch_prange = None signals.append( - BdfSignal( + signal_class( data[idx], out_sfreq, label=signal_label, transducer_type="", physical_dimension="" if ch_type == "stim" else "uV", - physical_range=prange, + physical_range=ch_prange, digital_range=(digital_min, digital_max), prefiltering=filter_str_info, ) ) - # set patient info + # create patient info subj_info = raw.info.get("subject_info") if subj_info is not None: # get the full name of subject if available @@ -197,7 +222,7 @@ def _export_raw(fname, raw, physical_range, add_ch_type): else: patient = None - # set measurement date + # create recording info if (meas_date := raw.info["meas_date"]) is not None: startdate = dt.date(meas_date.year, meas_date.month, meas_date.day) starttime = dt.time( @@ -214,9 +239,11 @@ def _export_raw(fname, raw, physical_range, add_ch_type): else: recording = Recording(startdate=startdate) + # create annotations + annotations = [] for desc, onset, duration, ch_names in zip( raw.annotations.description, - # subtract raw.first_time because BDF marks events starting from the first + # subtract raw.first_time because EDF/BDF marks events starting from the first # available data point and ignores raw.first_time _sync_onset(raw, raw.annotations.onset, inverse=False), raw.annotations.duration, @@ -230,7 +257,10 @@ def _export_raw(fname, raw, physical_range, add_ch_type): else: annotations.append(EdfAnnotation(onset, duration, desc)) - Bdf( + annotations.extend(pad_annotations) + + # write to file + writer_class( signals=signals, patient=patient, recording=recording, @@ -238,3 +268,13 @@ def _export_raw(fname, raw, physical_range, add_ch_type): data_record_duration=data_record_duration, annotations=annotations, ).write(fname) + + +def _export_raw_edf(fname, raw, physical_range, add_ch_type): + """Export Raw object to EDF.""" + _export_raw_edf_bdf(fname, raw, physical_range, add_ch_type, file_format="EDF") + + +def _export_raw_bdf(fname, raw, physical_range, add_ch_type): + """Export Raw object to BDF.""" + _export_raw_edf_bdf(fname, raw, physical_range, add_ch_type, file_format="BDF") diff --git a/mne/export/_export.py b/mne/export/_export.py index 3634a94e240..2842b747f21 100644 --- a/mne/export/_export.py +++ b/mne/export/_export.py @@ -2,10 +2,10 @@ # License: BSD-3-Clause # Copyright the MNE-Python contributors. -import os.path as op +import os -from ..utils import _check_fname, _validate_type, logger, verbose, warn -from ._egimff import export_evokeds_mff +from mne.export._egimff import export_evokeds_mff +from mne.utils import _check_fname, _validate_type, logger, verbose, warn @verbose @@ -75,22 +75,22 @@ def export_raw( ) match fmt: - case "eeglab": - from ._eeglab import _export_raw - - _export_raw(fname, raw) - case "edf": - from ._edf import _export_raw + case "bdf": + from mne.export._edf_bdf import _export_raw_bdf - _export_raw(fname, raw, physical_range, add_ch_type) + _export_raw_bdf(fname, raw, physical_range, add_ch_type) case "brainvision": - from ._brainvision import _export_raw + from mne.export._brainvision import _export_raw _export_raw(fname, raw, overwrite) - case "bdf": - from ._bdf import _export_raw + case "edf": + from mne.export._edf_bdf import _export_raw_edf - _export_raw(fname, raw, physical_range, add_ch_type) + _export_raw_edf(fname, raw, physical_range, add_ch_type) + case "eeglab": + from mne.export._eeglab import _export_raw + + _export_raw(fname, raw) @verbose @@ -133,7 +133,7 @@ def export_epochs(fname, epochs, fmt="auto", *, overwrite=False, verbose=None): ) if fmt == "eeglab": - from ._eeglab import _export_epochs + from mne.export._eeglab import _export_epochs _export_epochs(fname, epochs) @@ -210,7 +210,7 @@ def _infer_check_export_fmt(fmt, fname, supported_formats): _validate_type(fmt, str, "fmt") fmt = fmt.lower() if fmt == "auto": - fmt = op.splitext(fname)[1] + fmt = os.path.splitext(fname)[1] if fmt: fmt = fmt[1:].lower() # find fmt in supported formats dict's tuples From adb0ce20888ada37c4912a0e6aa3da81462d1ebf Mon Sep 17 00:00:00 2001 From: Clemens Brunner Date: Tue, 7 Oct 2025 16:26:56 +0200 Subject: [PATCH 5/6] Fix test collection --- mne/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mne/conftest.py b/mne/conftest.py index cc5eb89bba5..5bd3222527f 100644 --- a/mne/conftest.py +++ b/mne/conftest.py @@ -78,7 +78,7 @@ event_id, tmin, tmax = 1, -0.1, 1.0 vv_layout = read_layout("Vectorview-all") -collect_ignore = ["export/_brainvision.py", "export/_eeglab.py", "export/_edf.py"] +collect_ignore = ["export/_brainvision.py", "export/_eeglab.py", "export/_edf_bdf.py"] def pytest_configure(config: pytest.Config): From fc8d8be23bb2ee57c236af2f03c4f3fa7140b2e6 Mon Sep 17 00:00:00 2001 From: Clemens Brunner Date: Tue, 7 Oct 2025 16:36:28 +0200 Subject: [PATCH 6/6] Simplify physical range --- mne/export/_edf_bdf.py | 39 ++++++++++++++++++++------------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/mne/export/_edf_bdf.py b/mne/export/_edf_bdf.py index af66e0bf551..fa4f9ebcf64 100644 --- a/mne/export/_edf_bdf.py +++ b/mne/export/_edf_bdf.py @@ -124,26 +124,30 @@ def _export_raw_edf_bdf(fname, raw, physical_range, add_ch_type, file_format): if linefreq is not None: filter_str_info += f" N:{linefreq}Hz" - # compute physical range for channels and clip data if needed - if physical_range == "auto": # compute max and min for each channel type - range_per_channel = {} + # compute physical range + if physical_range == "auto": + # get max and min for each channel type data + ch_types_phys_max = dict() + ch_types_phys_min = dict() + for _type in np.unique(ch_types): _picks = [n for n, t in zip(raw.ch_names, ch_types) if t == _type] _data = raw.get_data(units=units, picks=_picks) - range_per_channel[_type] = (_data.min(), _data.max()) - + ch_types_phys_max[_type] = _data.max() + ch_types_phys_min[_type] = _data.min() elif physical_range == "channelwise": - range_per_channel = None - - else: # fixed physical range for all channels - # Physical ranges of the data in µV are usually set by the manufacturer and + prange = None + else: + # get the physical min and max of the data in uV + # Physical ranges of the data in uV are usually set by the manufacturer and # electrode properties. In general, physical min and max should be the clipping # levels of the ADC input, and they should be the same for all channels. For - # example, Nihon Kohden uses ±3200 µV for all EEG channels (corresponding to the + # example, Nihon Kohden uses ±3200 uV for all EEG channels (corresponding to the # actual clipping levels of their input amplifiers & ADC). For a discussion, # see https://github.com/sccn/eeglab/issues/246 pmin, pmax = physical_range[0], physical_range[1] + # check that physical min and max is not exceeded if data.max() > pmax: warn( f"The maximum μV of the data {data.max()} is more than the physical max" @@ -155,7 +159,7 @@ def _export_raw_edf_bdf(fname, raw, physical_range, add_ch_type, file_format): f" passed in {pmin}." ) data = np.clip(data, pmin, pmax) - range_per_channel = (pmin, pmax) + prange = pmin, pmax # create signals signals = [] @@ -169,15 +173,12 @@ def _export_raw_edf_bdf(fname, raw, physical_range, add_ch_type, file_format): f"channel name before exporting to {file_format}." ) - if isinstance(range_per_channel, dict): # per-channel-type ranges - pmin, pmax = range_per_channel[ch_type] + if physical_range == "auto": # per channel type + pmin = ch_types_phys_min[ch_type] + pmax = ch_types_phys_max[ch_type] if pmax == pmin: pmax = pmin + 1 - ch_prange = pmin, pmax - elif isinstance(range_per_channel, tuple): # same range for all channels - ch_prange = range_per_channel - else: # None (computed per channel by edfio) - ch_prange = None + prange = pmin, pmax signals.append( signal_class( @@ -186,7 +187,7 @@ def _export_raw_edf_bdf(fname, raw, physical_range, add_ch_type, file_format): label=signal_label, transducer_type="", physical_dimension="" if ch_type == "stim" else "uV", - physical_range=ch_prange, + physical_range=prange, digital_range=(digital_min, digital_max), prefiltering=filter_str_info, )