Closed · Labels: metrics (Related to metrics module)
Description
I ran into this error in the NeuroConv tests of the SortingAnalyzer. See the test code below:
from pathlib import Path
from tempfile import mkdtemp
from unittest import TestCase

# imports added for completeness; generate_ground_truth_recording assumed to come from spikeinterface.core
from spikeinterface.core import generate_ground_truth_recording


class TestWriteSortingAnalyzer(TestCase):
    @classmethod
    def setUpClass(cls):
        # import submodules to unlock extensions
        from spikeinterface import create_sorting_analyzer

        cls.num_units = 4
        cls.num_channels = 4
        duration_1 = 6
        duration_2 = 7
        single_segment_rec, single_segment_sort = generate_ground_truth_recording(
            num_channels=cls.num_channels, durations=[duration_1]
        )
        multi_segment_rec, multi_segment_sort = generate_ground_truth_recording(
            num_channels=cls.num_channels, durations=[duration_1, duration_2]
        )
        single_segment_rec.annotate(is_filtered=True)
        multi_segment_rec.annotate(is_filtered=True)
        single_segment_sort.delete_property("gt_unit_locations")
        multi_segment_sort.delete_property("gt_unit_locations")

        cls.single_segment_analyzer = create_sorting_analyzer(single_segment_sort, single_segment_rec, sparse=False)
        cls.single_segment_analyzer_sparse = create_sorting_analyzer(
            single_segment_sort, single_segment_rec, sparse=True
        )
        cls.multi_segment_analyzer = create_sorting_analyzer(multi_segment_sort, multi_segment_rec, sparse=False)
        cls.multi_segment_analyzer_sparse = create_sorting_analyzer(multi_segment_sort, multi_segment_rec, sparse=True)

        # add quality/template metrics to test property propagation
        extension_list = ["random_spikes", "noise_levels", "templates", "template_metrics", "quality_metrics"]
        cls.single_segment_analyzer.compute(extension_list)
        cls.single_segment_analyzer_sparse.compute(extension_list)
        cls.multi_segment_analyzer.compute(extension_list)
        cls.multi_segment_analyzer_sparse.compute(extension_list)

        # slice sorting
        cls.analyzer_slice = cls.single_segment_analyzer.select_units(
            unit_ids=cls.single_segment_analyzer.unit_ids[::2]
        )

        # recordingless
        cls.tmpdir = Path(mkdtemp())
        # create analyzer without recording
        cls.analyzer_recless = cls.single_segment_analyzer.copy()
        cls.analyzer_recless._recording = None
        cls.analyzer_recless_recording = single_segment_rec

        # slice recording before analyzer (to mimic bad channel removal)
        single_segment_rec_sliced = single_segment_rec.select_channels(["0", "2", "3"])
        cls.analyzer_channel_sliced = create_sorting_analyzer(single_segment_sort, single_segment_rec_sliced)
        cls.analyzer_channel_sliced.compute(extension_list)
        cls.analyzer_rec_sliced = single_segment_rec_sliced

        cls.nwbfile_path = cls.tmpdir / "test.nwb"
        if cls.nwbfile_path.exists():
            cls.nwbfile_path.unlink()

And here is the resulting traceback:
tests/test_modalities/test_ecephys/test_tools_spikeinterface.py:2086:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../spikeinterface/src/spikeinterface/core/sortinganalyzer.py:1680: in compute
self.compute_several_extensions(extensions=extensions, save=save, verbose=verbose, **job_kwargs)
../spikeinterface/src/spikeinterface/core/sortinganalyzer.py:1864: in compute_several_extensions
self.compute_one_extension(extension_name, save=save, verbose=verbose, **extension_params, **job_kwargs)
../spikeinterface/src/spikeinterface/core/sortinganalyzer.py:1744: in compute_one_extension
extension_instance.run(save=save, verbose=verbose, **job_kwargs)
../spikeinterface/src/spikeinterface/core/sortinganalyzer.py:2557: in run
self._run(**kwargs)
../spikeinterface/src/spikeinterface/core/analyzer_extension_core.py:1135: in _run
computed_metrics = self._compute_metrics(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <spikeinterface.metrics.quality.quality_metrics.ComputeQualityMetrics object at 0x1190c23c0>
sorting_analyzer = SortingAnalyzer: 4 channels - 10 units - 1 segments - memory - has recording
Loaded 4 extensions: random_spikes, noise_levels, templates, template_metrics
unit_ids = array(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9'], dtype='<U1')
metric_names = ['num_spikes', 'firing_rate', 'presence_ratio', 'snr', 'isi_violation', 'rp_violation', ...]
job_kwargs = {'chunk_duration': '1s', 'max_threads_per_worker': 1, 'mp_context': None, 'n_jobs': 1, ...}
pd = <module 'pandas' from '/opt/anaconda3/envs/neuroconv_dev_env/lib/python3.13/site-packages/pandas/__init__.py'>, tmp_data = {}
column_names_dtypes = {'firing_range': <class 'float'>, 'firing_rate': <class 'float'>, 'isi_violations_count': <class 'int'>, 'isi_violations_ratio': <class 'float'>, ...}
metric_name = 'rp_violation', metric = <class 'spikeinterface.metrics.quality.misc_metrics.RPViolation'>
metrics = num_spikes firing_rate presence_ratio snr isi_violations_ratio ... sliding_rp_violation sync_spike_2 sync_sp... 0.0 ... NaN NaN NaN NaN NaN
[10 rows x 13 columns]
    def _compute_metrics(
        self,
        sorting_analyzer: SortingAnalyzer,
        unit_ids: list[int | str] | None = None,
        metric_names: list[str] | None = None,
        **job_kwargs,
    ):
        """
        Compute metrics.

        Parameters
        ----------
        sorting_analyzer : SortingAnalyzer
            The SortingAnalyzer object.
        unit_ids : list[int | str] | None, default: None
            List of unit ids to compute metrics for. If None, all units are used.
        metric_names : list[str] | None, default: None
            List of metric names to compute. If None, all metrics in params["metric_names"]
            are used.

        Returns
        -------
        metrics : pd.DataFrame
            DataFrame containing the computed metrics for each unit.
        """
        import pandas as pd

        if unit_ids is None:
            unit_ids = sorting_analyzer.unit_ids

        tmp_data = self._prepare_data(sorting_analyzer=sorting_analyzer, unit_ids=unit_ids)

        if metric_names is None:
            metric_names = self.params["metric_names"]

        column_names_dtypes = {}
        for metric_name in metric_names:
            metric = [m for m in self.metric_list if m.metric_name == metric_name][0]
            column_names_dtypes.update(metric.metric_columns)

        metrics = pd.DataFrame(index=unit_ids, columns=list(column_names_dtypes.keys()))

        for metric_name in metric_names:
            metric = [m for m in self.metric_list if m.metric_name == metric_name][0]
            print(f"{metric = }")
            column_names = list(metric.metric_columns.keys())
            try:
                metric_params = self.params["metric_params"].get(metric_name, {})
                res = metric.compute(
                    sorting_analyzer,
                    unit_ids=unit_ids,
                    metric_params=metric_params,
                    tmp_data=tmp_data,
                    job_kwargs=job_kwargs,
                )
            except Exception as e:
                warnings.warn(f"Error computing metric {metric_name}: {e}")
                if len(column_names) == 1:
                    res = {unit_id: np.nan for unit_id in unit_ids}
                else:
                    res = namedtuple("MetricResult", column_names)(*([np.nan] * len(column_names)))

            # res is a namedtuple with several dictionary entries (one per column)
            if isinstance(res, dict):
                column_name = column_names[0]
                metrics.loc[unit_ids, column_name] = pd.Series(res)
            else:
>               for i, col in enumerate(res._fields):
                                        ^^^^^^^^^^^
E               AttributeError: 'NoneType' object has no attribute '_fields'

../spikeinterface/src/spikeinterface/core/analyzer_extension_core.py:1119: AttributeError
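
Some extra context that might help with triage: the `except` fallback above always produces either a dict or a namedtuple, so the only way `res` can be `None` in the `else` branch seems to be the metric's `compute` (here `rp_violation`) returning `None` without raising. Below is a small, self-contained sketch of that failure mode and of the kind of None-guard that would avoid the `AttributeError`. The names are placeholders for illustration, not the actual SpikeInterface code.

```python
# Hypothetical, self-contained illustration of the failure mode; stand-in names,
# not the SpikeInterface API. _compute_metrics expects each metric to return a
# dict (single column) or a namedtuple (multiple columns); if compute() returns
# None, the `for i, col in enumerate(res._fields)` line raises the AttributeError
# shown above. A None-guard mirroring the except-branch avoids it.
from collections import namedtuple
import warnings

import numpy as np

unit_ids = ["0", "1", "2"]
column_names = ["col_a", "col_b"]  # any multi-column metric (e.g. rp_violation) takes this branch


def broken_compute():
    # stand-in for a metric whose compute() returns None instead of a result
    return None


res = broken_compute()
if res is None:
    # same NaN fallback the except-branch already applies when compute() raises
    warnings.warn("metric returned None; filling columns with NaN")
    if len(column_names) == 1:
        res = {unit_id: np.nan for unit_id in unit_ids}
    else:
        res = namedtuple("MetricResult", column_names)(*([np.nan] * len(column_names)))

for i, col in enumerate(res._fields):  # no longer raises AttributeError
    print(col, res[i])
```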
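
And in case it helps, the NeuroConv `TestCase` machinery shouldn't be needed to reproduce this. Something along these lines (untested, using only the same calls as the test above, with `generate_ground_truth_recording` assumed to be importable from `spikeinterface.core`) should hit the same `quality_metrics` code path:

```python
# Untested minimal reproduction sketch: same calls as setUpClass above, without
# the NeuroConv TestCase wrapper.
from spikeinterface import create_sorting_analyzer
from spikeinterface.core import generate_ground_truth_recording

recording, sorting = generate_ground_truth_recording(num_channels=4, durations=[6])
analyzer = create_sorting_analyzer(sorting, recording, sparse=False)
analyzer.compute(
    ["random_spikes", "noise_levels", "templates", "template_metrics", "quality_metrics"]
)
```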