From 3ee2db8ffadf1836a09541df8488d00ecea0e0c4 Mon Sep 17 00:00:00 2001 From: Tamas Fehervari <58502181+zEdS15B3GCwq@users.noreply.github.com> Date: Wed, 27 Aug 2025 11:09:11 +0900 Subject: [PATCH 1/6] ENH: Support arbitrary number of wavelengths (>=2) in NIRS/SNIRF processing --- mne/io/snirf/_snirf.py | 7 +- mne/preprocessing/nirs/_beer_lambert_law.py | 97 ++++++++++++---- .../nirs/_scalp_coupling_index.py | 51 +++++++-- mne/preprocessing/nirs/nirs.py | 107 +++++++++++------- 4 files changed, 185 insertions(+), 77 deletions(-) diff --git a/mne/io/snirf/_snirf.py b/mne/io/snirf/_snirf.py index 55f3c54605c..cfb9c8f71e2 100644 --- a/mne/io/snirf/_snirf.py +++ b/mne/io/snirf/_snirf.py @@ -148,14 +148,13 @@ def __init__( # Extract wavelengths fnirs_wavelengths = np.array(dat.get("nirs/probe/wavelengths")) fnirs_wavelengths = [int(w) for w in fnirs_wavelengths] - if len(fnirs_wavelengths) != 2: + if len(fnirs_wavelengths) < 2: raise RuntimeError( f"The data contains " f"{len(fnirs_wavelengths)}" f" wavelengths: {fnirs_wavelengths}. " - f"MNE only supports reading continuous" - " wave amplitude SNIRF files " - "with two wavelengths." + f"MNE requires at least two wavelengths for " + "continuous wave amplitude SNIRF files." ) # Extract channels diff --git a/mne/preprocessing/nirs/_beer_lambert_law.py b/mne/preprocessing/nirs/_beer_lambert_law.py index c17cf31110c..ec5d20842fd 100644 --- a/mne/preprocessing/nirs/_beer_lambert_law.py +++ b/mne/preprocessing/nirs/_beer_lambert_law.py @@ -36,23 +36,32 @@ def beer_lambert_law(raw, ppf=6.0): _validate_type(raw, BaseRaw, "raw") _validate_type(ppf, ("numeric", "array-like"), "ppf") ppf = np.array(ppf, float) - if ppf.ndim == 0: # upcast single float to shape (2,) - ppf = np.array([ppf, ppf]) - if ppf.shape != (2,): - raise ValueError( - f"ppf must be float or array-like of shape (2,), got shape {ppf.shape}" - ) - ppf = ppf[:, np.newaxis] # shape (2, 1) picks = _validate_nirs_info(raw.info, fnirs="od", which="Beer-lambert") # This is the one place we *really* need the actual/accurate frequencies freqs = np.array([raw.info["chs"][pick]["loc"][9] for pick in picks], float) - abs_coef = _load_absorption(freqs) + + # Get unique wavelengths and determine number of wavelengths + unique_freqs = np.unique(freqs) + n_wavelengths = len(unique_freqs) + + # PPF validation for multiple wavelengths + if ppf.ndim == 0: # single float + ppf = np.full((n_wavelengths, 1), ppf) # same PPF for all wavelengths, shape (n_wavelengths, 1) + elif ppf.ndim == 1 and len(ppf) == n_wavelengths: # separate ppf for each wavelength + ppf = ppf[:, np.newaxis] # shape (n_wavelengths, 1) + else: + raise ValueError( + f"ppf must be a single float or an array-like of length {n_wavelengths} " + f"(number of wavelengths), got shape {ppf.shape}" + ) + + abs_coef = _load_absorption(unique_freqs) # shape (n_wavelengths, 2) distances = source_detector_distances(raw.info, picks="all") bad = ~np.isfinite(distances[picks]) bad |= distances[picks] <= 0 if bad.any(): warn( - "Source-detector distances are zero on NaN, some resulting " + "Source-detector distances are zero or NaN, some resulting " "concentrations will be zero. Consider setting a montage " "with raw.set_montage." ) @@ -64,20 +73,43 @@ def beer_lambert_law(raw, ppf=6.0): "likely due to optode locations being stored in a " " unit other than meters." ) + rename = dict() - for ii, jj in zip(picks[::2], picks[1::2]): - EL = abs_coef * distances[ii] * ppf - iEL = pinv(EL) + channels_to_drop_all = [] # Accumulate all channels to drop + + # Iterate over channel groups ([Si_Di all wavelengths, Sj_Dj all wavelengths, ...]) + pick_groups = zip(*[iter(picks)] * n_wavelengths) + for group_picks in pick_groups: - raw._data[[ii, jj]] = iEL @ raw._data[[ii, jj]] * 1e-3 + # Calculate Δc based on the system: ΔOD = E * L * PPF * Δc + # where E is (n_wavelengths, 2), Δc is (2, n_timepoints) + # using pseudo-inverse + EL = abs_coef * distances[group_picks[0]] * ppf + iEL = pinv(EL) # Pseudo-inverse for numerical stability + conc_data = iEL @ raw._data[group_picks] * 1e-3 + + # Replace the first two channels with HbO and HbR + raw._data[group_picks[:2]] = conc_data[:2] # HbO, HbR # Update channel information coil_dict = dict(hbo=FIFF.FIFFV_COIL_FNIRS_HBO, hbr=FIFF.FIFFV_COIL_FNIRS_HBR) - for ki, kind in zip((ii, jj), ("hbo", "hbr")): + for ki, kind in zip(group_picks[:2], ("hbo", "hbr")): ch = raw.info["chs"][ki] ch.update(coil_type=coil_dict[kind], unit=FIFF.FIFF_UNIT_MOL) new_name = f"{ch['ch_name'].split(' ')[0]} {kind}" rename[ch["ch_name"]] = new_name + + # Accumulate extra wavelength channels to drop (keep only HbO and HbR) + if n_wavelengths > 2: + channels_to_drop = group_picks[2:] + channel_names_to_drop = [raw.ch_names[idx] for idx in channels_to_drop] + channels_to_drop_all.extend(channel_names_to_drop) + + # Drop all accumulated extra wavelength channels after processing all groups + # This preserves channel indexing during the loop iterations + if channels_to_drop_all: + raw.drop_channels(channels_to_drop_all) + raw.rename_channels(rename) # Validate the format of data after transformation is valid @@ -86,16 +118,31 @@ def beer_lambert_law(raw, ppf=6.0): def _load_absorption(freqs): - """Load molar extinction coefficients.""" + """Load molar extinction coefficients + + Parameters + ---------- + freqs : array-like + Array of wavelengths (frequencies) in nm. + + Returns + ------- + abs_coef : array, shape (n_wavelengths, 2) + Absorption coefficients for HbO2 and Hb at each wavelength. + abs_coef[:, 0] contains HbO2 coefficients + abs_coef[:, 1] contains Hb coefficients + + E.g. [[HbO2(freq1)], [Hb(freq1)], + [HbO2(freq2)], [Hb(freq2)], + ..., + [HbO2(freqN)], [Hb(freqN)]] + """ # Data from https://omlc.org/spectra/hemoglobin/summary.html # The text was copied to a text file. The text before and # after the table was deleted. The the following was run in # matlab # extinct_coef=importdata('extinction_coef.txt') # save('extinction_coef.mat', 'extinct_coef') - # - # Returns data as [[HbO2(freq1), Hb(freq1)], - # [HbO2(freq2), Hb(freq2)]] extinction_fname = op.join( op.dirname(__file__), "..", "..", "data", "extinction_coef.mat" ) @@ -104,12 +151,12 @@ def _load_absorption(freqs): interp_hbo = interp1d(a[:, 0], a[:, 1], kind="linear") interp_hb = interp1d(a[:, 0], a[:, 2], kind="linear") - ext_coef = np.array( - [ - [interp_hbo(freqs[0]), interp_hb(freqs[0])], - [interp_hbo(freqs[1]), interp_hb(freqs[1])], - ] - ) - abs_coef = ext_coef * 0.2303 + # Build coefficient matrix for all wavelengths + # Shape: (n_wavelengths, 2) where columns are [HbO2, Hb] + ext_coef = np.zeros((len(freqs), 2)) + for i, freq in enumerate(freqs): + ext_coef[i, 0] = interp_hbo(freq) # HbO2 + ext_coef[i, 1] = interp_hb(freq) # Hb + abs_coef = ext_coef * 0.2303 return abs_coef diff --git a/mne/preprocessing/nirs/_scalp_coupling_index.py b/mne/preprocessing/nirs/_scalp_coupling_index.py index 5a82664dfd1..dec933a1014 100644 --- a/mne/preprocessing/nirs/_scalp_coupling_index.py +++ b/mne/preprocessing/nirs/_scalp_coupling_index.py @@ -56,14 +56,51 @@ def scalp_coupling_index( verbose=verbose, ).get_data() + # Determine number of wavelengths per source-detector pair + ch_wavelengths = [c["loc"][9] for c in raw.info["chs"]] + n_wavelengths = len(set(ch_wavelengths)) + + # freqs = np.array([raw.info["chs"][pick]["loc"][9] for pick in picks], float) + # n_wavelengths = len(set(unique_freqs)) + sci = np.zeros(picks.shape) - for ii in range(0, len(picks), 2): - with np.errstate(invalid="ignore"): - c = np.corrcoef(filtered_data[ii], filtered_data[ii + 1])[0][1] - if not np.isfinite(c): # someone had std=0 - c = 0 - sci[ii] = c - sci[ii + 1] = c + + if n_wavelengths == 2: + # Use pairwise correlation for 2 wavelengths (backward compatibility) + for ii in range(0, len(picks), 2): + with np.errstate(invalid="ignore"): + c = np.corrcoef(filtered_data[ii], filtered_data[ii + 1])[0][1] + if not np.isfinite(c): # someone had std=0 + c = 0 + sci[ii] = c + sci[ii + 1] = c + else: + # For multiple wavelengths: calculate all pairwise correlations within each group + # and use the minimum as the quality metric + + # Group picks by number of wavelengths + # Drops last incomplete group, but we're assuming valid data + pick_iter = iter(picks) + pick_groups = zip(*[pick_iter] * n_wavelengths) + + for group_picks in pick_groups: + group_data = filtered_data[group_picks] + + # Calculate pairwise correlations within the group + pair_indices = np.triu_indices(len(group_picks), k=1) + correlations = np.zeros(pair_indices[0].shape[0]) + + for n, (ii, jj) in enumerate(zip(*pair_indices)): + with np.errstate(invalid="ignore"): + c = np.corrcoef(group_data[ii], group_data[jj])[0][1] + correlations[n] = c if np.isfinite(c) else 0 + + # Use minimum correlation as the quality metric (most conservative) + group_sci = correlations.min() + + # Assign the same SCI value to all channels in the group + sci[group_picks] = group_sci + sci[zero_mask] = 0 sci = sci[np.argsort(picks)] # restore original order return sci diff --git a/mne/preprocessing/nirs/nirs.py b/mne/preprocessing/nirs/nirs.py index 94c7c78468c..b9dd8252c0c 100644 --- a/mne/preprocessing/nirs/nirs.py +++ b/mne/preprocessing/nirs/nirs.py @@ -2,6 +2,7 @@ # License: BSD-3-Clause # Copyright the MNE-Python contributors. +from calendar import c import re import numpy as np @@ -104,7 +105,7 @@ def _check_channels_ordered(info, pair_vals, *, throw_errors=True, check_bads=Tr # All chromophore fNIRS data picks_chroma = _picks_to_idx(info, ["hbo", "hbr"], exclude=[], allow_empty=True) - if (len(picks_wave) > 0) & (len(picks_chroma) > 0): + if (len(picks_wave) > 0) and (len(picks_chroma) > 0): picks = _throw_or_return_empty( "MNE does not support a combination of amplitude, optical " "density, and haemoglobin data in the same raw structure.", @@ -122,14 +123,14 @@ def _check_channels_ordered(info, pair_vals, *, throw_errors=True, check_bads=Tr picks = picks_chroma pair_vals = np.array(pair_vals) - if pair_vals.shape != (2,): + if pair_vals.shape[0] < 2: raise ValueError( - f"Exactly two {error_word} must exist in info, got {list(pair_vals)}" + f"At least two {error_word} must exist in info, got {list(pair_vals)}" ) # In principle we do not need to require that these be sorted -- # all we need to do is change our sorted() below to make use of a # pair_vals.index(...) in a sort key -- but in practice we always want - # (hbo, hbr) or (lower_freq, upper_freq) pairings, both of which will + # (hbo, hbr) or (lowest_freq, higher_freq, ...) pairings, both of which will # work with a naive string sort, so let's just enforce sorted-ness here is_str = pair_vals.dtype.kind == "U" pair_vals = list(pair_vals) @@ -145,16 +146,23 @@ def _check_channels_ordered(info, pair_vals, *, throw_errors=True, check_bads=Tr f"got {pair_vals} instead" ) - if len(picks) % 2 != 0: + # Check that the total number of channels is divisible by the number of pair values + # (e.g., for 2 wavelengths, we need even number of channels) + if len(picks) % len(pair_vals) !=0: picks = _throw_or_return_empty( - "NIRS channels not ordered correctly. An even number of NIRS " - f"channels is required. {len(info.ch_names)} channels were" - f"provided", + f"NIRS channels not ordered correctly. The number of channels " + f"must be a multiple of {len(pair_vals)} values, but " + f"{len(picks)} channels were provided.", throw_errors, ) - # Ensure wavelength info exists for waveform data all_freqs = [info["chs"][ii]["loc"][9] for ii in picks_wave] + if len(pair_vals) != len(set(all_freqs)): + picks = _throw_or_return_empty( + f"The {error_word} in info must match the number of wavelengths, " + f"but the data contains {len(set(all_freqs))} wavelengths instead.", + throw_errors, + ) if np.any(np.isnan(all_freqs)): picks = _throw_or_return_empty( f"NIRS channels is missing wavelength information in the " @@ -189,40 +197,42 @@ def _check_channels_ordered(info, pair_vals, *, throw_errors=True, check_bads=Tr # Reorder to be paired (naive sort okay here given validation above) picks = picks[np.argsort([info["ch_names"][pick] for pick in picks])] - # Validate our paired ordering - for ii, jj in zip(picks[::2], picks[1::2]): - ch1_name = info["chs"][ii]["ch_name"] - ch2_name = info["chs"][jj]["ch_name"] - ch1_re = use_RE.match(ch1_name) - ch2_re = use_RE.match(ch2_name) - ch1_S, ch1_D, ch1_value = ch1_re.groups()[:3] - ch2_S, ch2_D, ch2_value = ch2_re.groups()[:3] - if len(picks_wave): - ch1_value, ch2_value = float(ch1_value), float(ch2_value) - if ( - (ch1_S != ch2_S) - or (ch1_D != ch2_D) - or (ch1_value != pair_vals[0]) - or (ch2_value != pair_vals[1]) - ): + # Validate channel grouping (same source-detector pairs, all pair_vals match) + for i in range(0, len(picks), len(pair_vals)): + # Extract a group of channels (e.g., all wavelengths for one S-D pair) + group_picks = picks[i:i + len(pair_vals)] + + # Parse channel names using regex to extract source, detector, and value info + group_info = [(use_RE.match(info["ch_names"][pick]).groups() or (pick, 0, 0)) for pick in group_picks] + + # Separate the parsed components: source IDs, detector IDs, and values (freq/chromophore) + s_group, d_group, val_group = zip(*group_info) + + # For wavelength data, convert string frequencies to float for comparison + if len(picks_wave) > 0: + val_group = [float(v) for v in val_group] + + # Verify that all channels in this group have the same source-detector pair + # and that the values match the expected pair_vals sequence + if not (len(set(s_group))==1 and len(set(d_group))==1 and val_group == pair_vals): picks = _throw_or_return_empty( "NIRS channels not ordered correctly. Channels must be " - "ordered as source detector pairs with alternating" - f" {error_word} {pair_vals[0]} & {pair_vals[1]}, but got " - f"S{ch1_S}_D{ch1_D} pair " - f"{repr(ch1_name)} and {repr(ch2_name)}", + "grouped by source-detector pairs with alternating {error_word} " + f"values {pair_vals}, but got mismatching names {[info['ch_names'][pick] for pick in group_picks]}.", throw_errors, ) break if check_bads: - for ii, jj in zip(picks[::2], picks[1::2]): - want = [info.ch_names[ii], info.ch_names[jj]] + for i in range (0, len(picks), len(pair_vals)): + group_picks = picks[i:i + len(pair_vals)] + + want = [info.ch_names[pick] for pick in group_picks] got = list(set(info["bads"]).intersection(want)) - if len(got) == 1: + if 0 < len(got) < len(want): raise RuntimeError( - f"NIRS bad labelling is not consistent, found {got} but " - f"needed {want}" + "NIRS bad labelling is not consistent. " + f"Found {got} but needed {want}. " ) return picks @@ -276,14 +286,29 @@ def _fnirs_spread_bads(info): # as bad and spread the bad marking to all components of the optode pair. picks = _validate_nirs_info(info, check_bads=False) new_bads = set(info["bads"]) - for ii, jj in zip(picks[::2], picks[1::2]): - ch1_name, ch2_name = info.ch_names[ii], info.ch_names[jj] - if ch1_name in new_bads: - new_bads.add(ch2_name) - elif ch2_name in new_bads: - new_bads.add(ch1_name) - info["bads"] = sorted(new_bads) + # Extract SD pair groups from channel names + # E.g. all channels belonging to S1D1, S1D2, etc. + # Assumes valid channels (naming convention and number) + ch_names = [info.ch_names[i] for i in picks] + match = re.compile(r'^(S\d+_D\d+) ') + + # Create dict with keys corresponding to SD pairs + # Defaultdict would require another import + sd_groups = {} + for ch_name in ch_names: + sd_pair = match.match(ch_name).group(1) + if sd_pair not in sd_groups: + sd_groups[sd_pair] = [ch_name] + else: + sd_groups[sd_pair].append(ch_name) + + # Spread bad labeling across SD pairs + for channels in sd_groups.values(): + if any(channel in new_bads for channel in channels): + new_bads.update(channels) + + info["bads"] = sorted(new_bads) return info From 3e714265583af7b3f3347d2ee4302e49249733b1 Mon Sep 17 00:00:00 2001 From: Tamas Fehervari <58502181+zEdS15B3GCwq@users.noreply.github.com> Date: Thu, 28 Aug 2025 16:49:46 +0900 Subject: [PATCH 2/6] fixed minor issue in _scalp_couplying_index: correlation results were in np.zeros so there was need to write 0 to it when correlation was infinite. --- mne/preprocessing/nirs/_scalp_coupling_index.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/mne/preprocessing/nirs/_scalp_coupling_index.py b/mne/preprocessing/nirs/_scalp_coupling_index.py index dec933a1014..e829ed94500 100644 --- a/mne/preprocessing/nirs/_scalp_coupling_index.py +++ b/mne/preprocessing/nirs/_scalp_coupling_index.py @@ -93,9 +93,10 @@ def scalp_coupling_index( for n, (ii, jj) in enumerate(zip(*pair_indices)): with np.errstate(invalid="ignore"): c = np.corrcoef(group_data[ii], group_data[jj])[0][1] - correlations[n] = c if np.isfinite(c) else 0 + if np.isfinite(c): + correlations[n] = c - # Use minimum correlation as the quality metric (most conservative) + # Use minimum correlation as the quality metric group_sci = correlations.min() # Assign the same SCI value to all channels in the group From 00fca679f9fa004356caadeb575595aeef553562 Mon Sep 17 00:00:00 2001 From: Tamas Fehervari <58502181+zEdS15B3GCwq@users.noreply.github.com> Date: Thu, 28 Aug 2025 19:38:38 +0900 Subject: [PATCH 3/6] removed rogue import calendar statement sneaked in by AI --- mne/preprocessing/nirs/nirs.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/mne/preprocessing/nirs/nirs.py b/mne/preprocessing/nirs/nirs.py index b9dd8252c0c..d95901df770 100644 --- a/mne/preprocessing/nirs/nirs.py +++ b/mne/preprocessing/nirs/nirs.py @@ -2,7 +2,6 @@ # License: BSD-3-Clause # Copyright the MNE-Python contributors. -from calendar import c import re import numpy as np @@ -148,7 +147,7 @@ def _check_channels_ordered(info, pair_vals, *, throw_errors=True, check_bads=Tr # Check that the total number of channels is divisible by the number of pair values # (e.g., for 2 wavelengths, we need even number of channels) - if len(picks) % len(pair_vals) !=0: + if len(picks) % len(pair_vals) != 0: picks = _throw_or_return_empty( f"NIRS channels not ordered correctly. The number of channels " f"must be a multiple of {len(pair_vals)} values, but " @@ -200,10 +199,13 @@ def _check_channels_ordered(info, pair_vals, *, throw_errors=True, check_bads=Tr # Validate channel grouping (same source-detector pairs, all pair_vals match) for i in range(0, len(picks), len(pair_vals)): # Extract a group of channels (e.g., all wavelengths for one S-D pair) - group_picks = picks[i:i + len(pair_vals)] + group_picks = picks[i : i + len(pair_vals)] # Parse channel names using regex to extract source, detector, and value info - group_info = [(use_RE.match(info["ch_names"][pick]).groups() or (pick, 0, 0)) for pick in group_picks] + group_info = [ + (use_RE.match(info["ch_names"][pick]).groups() or (pick, 0, 0)) + for pick in group_picks + ] # Separate the parsed components: source IDs, detector IDs, and values (freq/chromophore) s_group, d_group, val_group = zip(*group_info) @@ -214,7 +216,9 @@ def _check_channels_ordered(info, pair_vals, *, throw_errors=True, check_bads=Tr # Verify that all channels in this group have the same source-detector pair # and that the values match the expected pair_vals sequence - if not (len(set(s_group))==1 and len(set(d_group))==1 and val_group == pair_vals): + if not ( + len(set(s_group)) == 1 and len(set(d_group)) == 1 and val_group == pair_vals + ): picks = _throw_or_return_empty( "NIRS channels not ordered correctly. Channels must be " "grouped by source-detector pairs with alternating {error_word} " @@ -224,8 +228,8 @@ def _check_channels_ordered(info, pair_vals, *, throw_errors=True, check_bads=Tr break if check_bads: - for i in range (0, len(picks), len(pair_vals)): - group_picks = picks[i:i + len(pair_vals)] + for i in range(0, len(picks), len(pair_vals)): + group_picks = picks[i : i + len(pair_vals)] want = [info.ch_names[pick] for pick in group_picks] got = list(set(info["bads"]).intersection(want)) @@ -291,7 +295,7 @@ def _fnirs_spread_bads(info): # E.g. all channels belonging to S1D1, S1D2, etc. # Assumes valid channels (naming convention and number) ch_names = [info.ch_names[i] for i in picks] - match = re.compile(r'^(S\d+_D\d+) ') + match = re.compile(r"^(S\d+_D\d+) ") # Create dict with keys corresponding to SD pairs # Defaultdict would require another import From 2b39648d4cb29188a81b93f32be638d05e578ef1 Mon Sep 17 00:00:00 2001 From: Tamas Fehervari <58502181+zEdS15B3GCwq@users.noreply.github.com> Date: Thu, 28 Aug 2025 19:45:48 +0900 Subject: [PATCH 4/6] reverted doc string on _load_absorption, applied minor changes to comments to show correct return format --- mne/preprocessing/nirs/_beer_lambert_law.py | 35 ++++++++------------- 1 file changed, 13 insertions(+), 22 deletions(-) diff --git a/mne/preprocessing/nirs/_beer_lambert_law.py b/mne/preprocessing/nirs/_beer_lambert_law.py index ec5d20842fd..7082a2ee8cd 100644 --- a/mne/preprocessing/nirs/_beer_lambert_law.py +++ b/mne/preprocessing/nirs/_beer_lambert_law.py @@ -46,8 +46,12 @@ def beer_lambert_law(raw, ppf=6.0): # PPF validation for multiple wavelengths if ppf.ndim == 0: # single float - ppf = np.full((n_wavelengths, 1), ppf) # same PPF for all wavelengths, shape (n_wavelengths, 1) - elif ppf.ndim == 1 and len(ppf) == n_wavelengths: # separate ppf for each wavelength + ppf = np.full( + (n_wavelengths, 1), ppf + ) # same PPF for all wavelengths, shape (n_wavelengths, 1) + elif ( + ppf.ndim == 1 and len(ppf) == n_wavelengths + ): # separate ppf for each wavelength ppf = ppf[:, np.newaxis] # shape (n_wavelengths, 1) else: raise ValueError( @@ -118,31 +122,18 @@ def beer_lambert_law(raw, ppf=6.0): def _load_absorption(freqs): - """Load molar extinction coefficients - - Parameters - ---------- - freqs : array-like - Array of wavelengths (frequencies) in nm. - - Returns - ------- - abs_coef : array, shape (n_wavelengths, 2) - Absorption coefficients for HbO2 and Hb at each wavelength. - abs_coef[:, 0] contains HbO2 coefficients - abs_coef[:, 1] contains Hb coefficients - - E.g. [[HbO2(freq1)], [Hb(freq1)], - [HbO2(freq2)], [Hb(freq2)], - ..., - [HbO2(freqN)], [Hb(freqN)]] - """ + """Load molar extinction coefficients""" # Data from https://omlc.org/spectra/hemoglobin/summary.html # The text was copied to a text file. The text before and # after the table was deleted. The the following was run in # matlab # extinct_coef=importdata('extinction_coef.txt') # save('extinction_coef.mat', 'extinct_coef') + # + # Returns data as [[HbO2(freq1), Hb(freq1)], + # [HbO2(freq2), Hb(freq2)], + # ..., + # [HbO2(freqN), Hb(freqN)]] extinction_fname = op.join( op.dirname(__file__), "..", "..", "data", "extinction_coef.mat" ) @@ -156,7 +147,7 @@ def _load_absorption(freqs): ext_coef = np.zeros((len(freqs), 2)) for i, freq in enumerate(freqs): ext_coef[i, 0] = interp_hbo(freq) # HbO2 - ext_coef[i, 1] = interp_hb(freq) # Hb + ext_coef[i, 1] = interp_hb(freq) # Hb abs_coef = ext_coef * 0.2303 return abs_coef From aa285f46e53a6ea853311362aef1b6f7823f91c1 Mon Sep 17 00:00:00 2001 From: Tamas Fehervari <58502181+zEdS15B3GCwq@users.noreply.github.com> Date: Thu, 28 Aug 2025 19:56:17 +0900 Subject: [PATCH 5/6] put inline comments above code to prevent autoformat from spreading code over several lines --- mne/preprocessing/nirs/_beer_lambert_law.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/mne/preprocessing/nirs/_beer_lambert_law.py b/mne/preprocessing/nirs/_beer_lambert_law.py index 7082a2ee8cd..c3091585ac1 100644 --- a/mne/preprocessing/nirs/_beer_lambert_law.py +++ b/mne/preprocessing/nirs/_beer_lambert_law.py @@ -46,12 +46,10 @@ def beer_lambert_law(raw, ppf=6.0): # PPF validation for multiple wavelengths if ppf.ndim == 0: # single float - ppf = np.full( - (n_wavelengths, 1), ppf - ) # same PPF for all wavelengths, shape (n_wavelengths, 1) - elif ( - ppf.ndim == 1 and len(ppf) == n_wavelengths - ): # separate ppf for each wavelength + # same PPF for all wavelengths, shape (n_wavelengths, 1) + ppf = np.full((n_wavelengths, 1), ppf) + elif ppf.ndim == 1 and len(ppf) == n_wavelengths: + # separate ppf for each wavelength ppf = ppf[:, np.newaxis] # shape (n_wavelengths, 1) else: raise ValueError( From 4bae23e17501b932d13532e94f9129793a7cd429 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 4 Sep 2025 00:17:52 +0000 Subject: [PATCH 6/6] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- mne/preprocessing/nirs/_beer_lambert_law.py | 1 - 1 file changed, 1 deletion(-) diff --git a/mne/preprocessing/nirs/_beer_lambert_law.py b/mne/preprocessing/nirs/_beer_lambert_law.py index c3091585ac1..f26ea90a0d3 100644 --- a/mne/preprocessing/nirs/_beer_lambert_law.py +++ b/mne/preprocessing/nirs/_beer_lambert_law.py @@ -82,7 +82,6 @@ def beer_lambert_law(raw, ppf=6.0): # Iterate over channel groups ([Si_Di all wavelengths, Sj_Dj all wavelengths, ...]) pick_groups = zip(*[iter(picks)] * n_wavelengths) for group_picks in pick_groups: - # Calculate Δc based on the system: ΔOD = E * L * PPF * Δc # where E is (n_wavelengths, 2), Δc is (2, n_timepoints) # using pseudo-inverse