Skip to content
19 changes: 12 additions & 7 deletions src/ess/reduce/nexus/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -683,23 +683,21 @@ def LoadMonitorWorkflow(
wf = sciline.Pipeline(
(*_common_providers, *_monitor_providers),
constraints=_gather_constraints(
run_types=run_types, monitor_types=monitor_types
run_types=run_types, monitor_types=monitor_types, component_types=[]
),
)
wf[PreopenNeXusFile] = PreopenNeXusFile(False)
return wf


def LoadDetectorWorkflow(
*,
run_types: Iterable[sciline.typing.Key],
monitor_types: Iterable[sciline.typing.Key],
Copy link
Member Author

@nvaytet nvaytet Nov 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why we needed to have the monitor_types here, when we are loading detectors?
So I removed it.

*, run_types: Iterable[sciline.typing.Key]
) -> sciline.Pipeline:
"""Generic workflow for loading detector data from a NeXus file."""
wf = sciline.Pipeline(
(*_common_providers, *_detector_providers),
constraints=_gather_constraints(
run_types=run_types, monitor_types=monitor_types
run_types=run_types, monitor_types=[], component_types=[]
),
)
wf[DetectorBankSizes] = DetectorBankSizes({})
Expand All @@ -711,6 +709,7 @@ def GenericNeXusWorkflow(
*,
run_types: Iterable[sciline.typing.Key],
monitor_types: Iterable[sciline.typing.Key],
component_types: Iterable[sciline.typing.Key] | None = None,
) -> sciline.Pipeline:
"""
Generic workflow for loading detector and monitor data from a NeXus file.
Expand All @@ -735,6 +734,9 @@ def GenericNeXusWorkflow(
List of monitor types to include in the workflow.
Constrains the possible values of :class:`ess.reduce.nexus.types.MonitorType`
and :class:`ess.reduce.nexus.types.Component`.
component_types:
Additional component types to include in the workflow.
Constrains the possible values of :class:`ess.reduce.nexus.types.Component`.

Returns
-------
Expand All @@ -750,7 +752,9 @@ def GenericNeXusWorkflow(
*_metadata_providers,
),
constraints=_gather_constraints(
run_types=run_types, monitor_types=monitor_types
run_types=run_types,
monitor_types=monitor_types,
component_types=[] if component_types is None else component_types,
),
)
wf[DetectorBankSizes] = DetectorBankSizes({})
Expand All @@ -763,11 +767,12 @@ def _gather_constraints(
*,
run_types: Iterable[sciline.typing.Key],
monitor_types: Iterable[sciline.typing.Key],
component_types: Iterable[sciline.typing.Key],
) -> dict[TypeVar, Iterable[type]]:
mon = tuple(iter(monitor_types))
constraints = {
RunType: run_types,
MonitorType: mon,
Component: (*COMPONENT_CONSTRAINTS, *mon),
Component: (*COMPONENT_CONSTRAINTS, *mon, *component_types),
}
return constraints
53 changes: 52 additions & 1 deletion tests/nexus/workflow_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
from pathlib import Path

import pytest
import sciline as sl
import scipp as sc
import scippnexus as snx
from scipp.testing import assert_identical

from ess.reduce.nexus import compute_component_position, workflow
from ess.reduce.nexus import compute_component_position, load_component, workflow
from ess.reduce.nexus.types import (
BackgroundRun,
Beamline,
Expand Down Expand Up @@ -776,3 +777,53 @@ def assert_not_contains_type_arg(node: object, excluded: set[type]) -> None:
assert not any(
arg in excluded for arg in getattr(node, "__args__", ())
), f"Node {node} contains one of {excluded!r}"


def test_generic_nexus_workflow_load_custom_component_user_affiliation(
loki_tutorial_sample_run_60250: Path,
) -> None:
# Load a component in one of the top-level entries

class UserAffiliation(sl.Scope[RunType, str], str):
"""User affiliation."""

def load_user_affiliation(
location: NeXusComponentLocationSpec[UserAffiliation, RunType],
) -> UserAffiliation[RunType]:
return UserAffiliation[RunType](
load_component(location, nx_class=snx.NXuser)['affiliation']
Copy link
Member Author

@nvaytet nvaytet Nov 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could not figure out how to just have load_component(location, nx_class=None) and (below) wf[NeXusName[UserAffiliation]] = '/entry/user_0/affiliation'...

I'm running into the error ValueError: Expected a NeXus group as item '/entry/title' but got a field.
Help welcome!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure I understand the approach at all: If we anyway add a custom "load" provider, why was the generic NeXus workflow modified? What role does it still play? Is it the location-spec mechanism?

Regarding the error, could it be related that load_component looks only within NXinstrument (except for the sample)? Is there some search logic that needs to be generalized?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it the location-spec mechanism

Yes, I thought we needed to do things this way to re-use the load_component properly.
If there is a less invasive way to just add a provider and not modify the generic workflow, then it would be great if you can show me?
thanks

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No that's fine... provided we can actually re-use load_component?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding the error, could it be related that load_component looks only within NXinstrument (except for the sample)?

Sorry, the error message was confusing. I copied the wrong message (from a previous iteration).
The error is actually
ValueError: Expected a NeXus group as item '/entry/user_0/affiliation' but got a field.

So it is because load_component expects to load a group and not a field.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

load_component was made for any "physical component" (it handles positions, etc.). Say maybe it is reasonable and by design that it fails? Which brings us back to the question of what you gain by trying to uses the existing machinery instead of having a simple load provider?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really don't mind either way, I could not figure out how to properly just add a simple load provider which would hook into the part of the workflow that opens the file for reading.

I thought that using the Filename[SampleRun] as input to the provider would mean we open the file multiple times, and I wanted to avoid that.
So I would welcome some pointers as to what the provider would take in as input so it can load a custom field. Thanks in advance 🙂

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use the same inputs that load_component does, but in a reusable provider (load_field or load_group?)?

)

wf = GenericNeXusWorkflow(
run_types=[SampleRun], monitor_types=[], component_types=[UserAffiliation]
)
wf.insert(load_user_affiliation)
wf[Filename[SampleRun]] = loki_tutorial_sample_run_60250
wf[NeXusName[UserAffiliation]] = '/entry/user_0'
affiliation = wf.compute(UserAffiliation[SampleRun])
assert affiliation == 'ESS'


def test_generic_nexus_workflow_load_custom_component_source_name(
loki_tutorial_sample_run_60250: Path,
) -> None:
# Load a component inside the instrument entry

class SourceName(sl.Scope[RunType, str], str):
"""Source name."""

def load_source_name(
location: NeXusComponentLocationSpec[SourceName, RunType],
) -> SourceName[RunType]:
return SourceName[RunType](
load_component(location, nx_class=snx.NXsource)['name']
)

wf = GenericNeXusWorkflow(
run_types=[SampleRun], monitor_types=[], component_types=[SourceName]
)
wf.insert(load_source_name)
wf[Filename[SampleRun]] = loki_tutorial_sample_run_60250
wf[NeXusName[SourceName]] = '/entry/instrument/source'
source_name = wf.compute(SourceName[SampleRun])
assert source_name == 'moderator'
Loading