Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ filterwarnings = [
line-length = 88
extend-include = ["*.ipynb"]
extend-exclude = [
".*", "__pycache__", "build", "dist", "install",
".*", "__pycache__", "build", "dist", "install", "test-stream-processor.py",
]

[tool.ruff.lint]
Expand Down
16 changes: 15 additions & 1 deletion src/ess/estia/conversions.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ def wavelength(
):
"Converts event_time_offset to wavelength"
# Use frame unwrapping from scippneutron
w = sc.bins_like(event_time_offset, sc.scalar(7.0, unit='angstrom'))
return w
raise NotImplementedError()


Expand All @@ -76,7 +78,19 @@ def coordinate_transformation_graph() -> CoordTransformationGraph:
"L1": lambda source_position, sample_position: sc.norm(
sample_position - source_position
), # + extra correction for guides?
"L2": lambda position, sample_position: sc.norm(position - sample_position),
"L2": lambda position, sample_position: sc.norm(
position - sample_position.to(unit=position.unit)
),
"blade": lambda detector_number: sc.array(
dims=detector_number.dims, values=(detector_number.values - 1) // (32 * 64)
),
"wire": lambda detector_number: sc.array(
dims=detector_number.dims, values=((detector_number.values - 1) // 64) % 32
),
"strip": lambda detector_number: sc.array(
dims=detector_number.dims, values=(detector_number.values - 1) % 64
),
"z_index": lambda wire, blade: 32 * blade + wire,
}


Expand Down
8 changes: 8 additions & 0 deletions src/ess/estia/corrections.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
BeamDivergenceLimits,
CoordTransformationGraph,
DetectorData,
DetectorRotation,
ProtonCurrent,
ReducibleData,
RunType,
SampleRotation,
WavelengthBins,
YIndexLimits,
ZIndexLimits,
Expand All @@ -29,13 +31,19 @@ def add_coords_masks_and_apply_corrections(
bdlim: BeamDivergenceLimits,
wbins: WavelengthBins,
proton_current: ProtonCurrent[RunType],
sample_rotation: SampleRotation[RunType],
detector_rotation: DetectorRotation[RunType],
graph: CoordTransformationGraph,
) -> ReducibleData[RunType]:
"""
Computes coordinates, masks and corrections that are
the same for the sample measurement and the reference measurement.
"""
da = da.assign_coords(
sample_rotation=sample_rotation, detector_rotation=detector_rotation
)
da = add_coords(da, graph)
da = da.transform_coords(('blade', 'wire', 'strip', 'z_index'), graph)
da = add_masks(da, ylim, zlims, bdlim, wbins)
da = correct_by_footprint(da)

Expand Down
3 changes: 2 additions & 1 deletion src/ess/reflectometry/conversions.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ def add_proton_current_coord(
) -> sc.DataArray:
"""Find the proton current value for each event and
adds it as a coord to the data array."""
time = pc.coords['time'].to(unit='ns', dtype='int64')
pc_lookup = sc.lookup(
pc.assign_coords(time=pc.coords['time'].to(unit='ns')),
pc.assign_coords(time=time + sc.datetime(0, unit=time.unit)),
dim='time',
mode='previous',
fill_value=sc.scalar(float('nan'), unit=pc.unit),
Expand Down
167 changes: 167 additions & 0 deletions test-stream-processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
# ruff: noqa
from copy import deepcopy
from typing import Any, TypeVar

import numpy as np
import scipp as sc
from scippnexus import NXdetector

import ess.estia as estia
import ess.reflectometry as reflectometry
from ess.amor.types import *
from ess.reduce.streaming import Accumulator, EternalAccumulator, StreamProcessor
from ess.reflectometry.types import *


def add_dummmy_value_to_all_transformations():
    """Patch ``estia.hdf`` in place so every NXlog transformation has data.

    Some transformation NXlogs in the file have zero-length ``time`` and
    ``value`` datasets.  For each such log this replaces both datasets with a
    single zero entry (preserving their HDF5 attributes), then re-opens the
    file read-only and prints the resulting log lengths as a sanity check.

    NOTE(review): function name contains a typo ("dummmy"); kept as-is since
    it is referenced (commented out) below.
    """
    import h5py

    def _is_transformation_log(g):
        # An NXlog group that carries a 'transformation_type' attribute.
        return (
            isinstance(g, h5py.Group)
            and g.attrs['NX_class'] == 'NXlog'
            and 'transformation_type' in g.attrs
        )

    def _reset_to_single_zero(f, path):
        # Replace an (empty) dataset with a one-element zero array, carrying
        # over all of its HDF5 attributes.  h5py datasets cannot be resized
        # in-place without chunking, hence delete-and-recreate.
        attrs = dict(f[path].attrs)
        del f[path]
        ds = f.create_dataset(path, data=np.zeros(1))
        for k, v in attrs.items():
            ds.attrs[k] = v

    with h5py.File('estia.hdf', 'r+') as f:

        def visitor(name, g):
            if _is_transformation_log(g):
                print(g)
                if len(g['time']) == 0:
                    print('Fixed')
                    _reset_to_single_zero(f, name + '/time')
                    _reset_to_single_zero(f, name + '/value')

        f.visititems(visitor)

    # Verification pass: read-only, print the length of every transformation log.
    with h5py.File('estia.hdf', 'r') as f:

        def visitor(name, g):
            if _is_transformation_log(g):
                print(name, len(g['time']))

        f.visititems(visitor)


# add_dummmy_value_to_all_transformations()


def estia_stream_workflow() -> StreamProcessor:
    """Build a :class:`StreamProcessor` around a pre-configured Estia workflow.

    Sets all static reduction parameters (binning, index/divergence limits,
    rotations, proton current, supermirror calibration), reuses the computed
    sample-run detector data as the reference measurement, and returns a
    stream processor that treats the sample-run NXdetector as the dynamic
    input, accumulating reflectivity over Q.
    """
    import ess.reflectometry.supermirror as sm

    workflow = estia.EstiaWorkflow()

    # Parameters that are identical for the sample and reference runs.
    for run in (SampleRun, ReferenceRun):
        workflow[SampleSize[run]] = sc.scalar(10.0, unit='mm')
        workflow[ChopperPhase[run]] = sc.scalar(-7.5, unit='deg')
        workflow[SampleRotationOffset[run]] = sc.scalar(0.05, unit='deg')
        workflow[Filename[run]] = './estia.hdf'
        # Flat unit proton current (no normalization effect).
        workflow[ProtonCurrent[run]] = sc.DataArray(
            sc.ones(dims=['time'], shape=(1,)),
            coords={'time': sc.zeros(dims=['time'], shape=(1,), unit='s')},
        )
        workflow[DetectorSpatialResolution[run]] = 0.0025 * sc.units.m
        workflow[SampleRotation[run]] = sc.scalar(1.0, unit='deg')
        workflow[DetectorRotation[run]] = sc.scalar(2.0, unit='deg')

    # Binning.
    workflow[QBins] = sc.geomspace(
        dim='Q', start=0.005, stop=0.3, num=391, unit='1/angstrom'
    )
    workflow[WavelengthBins] = sc.geomspace(
        'wavelength', 2.8, 12.5, 2001, unit='angstrom'
    )

    # Detector index and beam-divergence masking limits.
    workflow[YIndexLimits] = sc.scalar(11), sc.scalar(41)
    workflow[ZIndexLimits] = sc.scalar(80), sc.scalar(370)
    workflow[BeamDivergenceLimits] = (
        sc.scalar(-0.75, unit='deg'),
        sc.scalar(0.75, unit='deg'),
    )

    # Supermirror calibration.
    workflow[sm.CriticalEdge] = 0.022 * sc.Unit("1/angstrom")
    workflow[sm.Alpha] = sc.scalar(0.25 / 0.088, unit=sc.units.angstrom)
    workflow[sm.MValue] = 5

    # Pin the reference detector data to the (already computable) sample data.
    workflow[DetectorData[ReferenceRun]] = workflow.compute(DetectorData[SampleRun])

    frozen = workflow.copy()
    return StreamProcessor(
        frozen,
        dynamic_keys=(NeXusComponent[NXdetector, SampleRun],),
        target_keys=(ReflectivityOverQ,),
        accumulators={
            Sample: EternalAccumulator,
            Reference: LatestAccumulator,
        },
    )


T = TypeVar('T')


class LatestAccumulator(Accumulator[T]):
    """Accumulator that keeps only the most recently pushed value.

    Unlike :class:`EternalAccumulator`, nothing is aggregated: each push
    replaces the stored value.  Values are deep-copied on both push and
    retrieval so neither the producer nor the consumer can mutate the
    internal state.

    Note: requires ``Any`` from :mod:`typing` (previously unimported,
    which raised ``NameError`` when this class body was executed).
    """

    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        # Holds the latest pushed value; None means "nothing pushed yet".
        self._value: T | None = None

    @property
    def is_empty(self) -> bool:
        """True until the first push (or after :meth:`clear`)."""
        return self._value is None

    def _get_value(self) -> T:
        # Deep copy so downstream mutation cannot corrupt the stored value.
        return deepcopy(self._value)

    def _do_push(self, value: T) -> None:
        # Deep copy so later mutation of the caller's object is not reflected.
        self._value = deepcopy(value)

    def clear(self) -> None:
        """Clear the accumulated value."""
        self._value = None


# Synthetic detector payload: 10k unit-weight events with ascending ids
# and event_time_offset values (1..10000 us), mimicking an NXdetector chunk.
_n_events = 10000
fake = sc.DataGroup(
    {
        'data': sc.DataArray(
            sc.ones(dims=['events'], shape=[_n_events]),
            coords={
                'event_id': sc.arange('events', 1, _n_events + 1),
                'event_time_offset': sc.arange(
                    'events', 1, _n_events + 1, unit='us'
                ),
            },
        )
    }
)

# Drive the stream processor once with the fake data and finalize.
s = estia_stream_workflow()
s.accumulate({NeXusComponent[NXdetector, SampleRun]: fake})
s.finalize()
Loading