Skip to content

Commit cfdcf73

Browse files
authored
Merge pull request #33 from BAMresearch/pipeline_with_processsteps
Pipeline with processsteps, integration test
2 parents 59d325c + 993efe6 commit cfdcf73

File tree

5 files changed

+67
-4
lines changed

5 files changed

+67
-4
lines changed

src/modacor/dataclasses/process_step.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,6 @@ def __attrs_post_init__(self):
109109
def __hash__(self):
110110
return hash((self.documentation.__repr__(), self.configuration.__repr__(), self.step_id))
111111

112-
113112
def prepare_execution(self):
114113
"""
115114
Prepare the execution of the ProcessStep

src/modacor/modules/base_modules/poisson_uncertainties.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,4 @@ def calculate(self, data: DataBundle, **kwargs: Any):
4646

4747
# Add the variance to the data
4848
data["signal"].variances["Poisson"] = np.clip(signal, 1, None)
49+
return {"signal": data["signal"]}

src/modacor/runner/pipeline.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,12 @@ def add_outgoing_branch(self, branch: Self, branching_node):
8383
# reinitialize the TopologicalSorter
8484
super().__init__(graph=self.graph)
8585

86-
def run(self, data: DataBundle, **kwargs):
86+
def run(self, **kwargs):
8787
"""
88-
run pipeline. to be extended for different schedulers
88+
run pipeline with simple scheduling.
8989
"""
9090
self.prepare()
9191
while self.is_active():
9292
for node in self.get_ready():
93-
node.execute(data, **kwargs)
93+
node.execute(**kwargs)
9494
self.done(node)

src/modacor/tests/integration/__init__.py

Whitespace-only changes.
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import pytest
2+
from pathlib import Path
3+
import numpy as np
4+
from pint import UnitRegistry
5+
6+
ureg = UnitRegistry()
7+
8+
from ...runner.pipeline import Pipeline
9+
from ...dataclasses.process_step import ProcessStep
10+
from ...dataclasses.process_step_describer import ProcessStepDescriber
11+
from ...dataclasses.databundle import DataBundle
12+
from ...dataclasses.basedata import BaseData
13+
from ...io.io_sources import IoSources
14+
15+
from ...modules.base_modules.poisson_uncertainties import PoissonUncertainties
16+
17+
# Shared module-level fixtures for these integration tests: an empty I/O
# source registry and an empty data bundle, reused by tests that need no
# real input data.
TEST_IO_SOURCES = IoSources()
TEST_DATA = DataBundle()
19+
20+
@pytest.fixture
def flat_data():
    """Provide a DataBundle whose "signal" entry is a flat 10x10 array of 100 counts."""
    flat_signal = np.full((10, 10), 100.0)
    bundle = DataBundle()
    bundle["signal"] = BaseData(
        ingest_units=ureg.counts,
        internal_units=ureg.counts,
        display_units=ureg.counts,
        signal=flat_signal,
    )
    return bundle
30+
31+
32+
class DummyProcessStep(ProcessStep):
    """A minimal ProcessStep whose calculation performs no real work."""

    def calculate(self, data):
        # Return a fixed placeholder mapping so the pipeline has an output.
        return dict(test=0)
35+
36+
37+
def test_processstep_pipeline():
    "tests execution of a linear processstep pipeline (not actually doing anything)"
    steps = [DummyProcessStep(TEST_IO_SOURCES, step_id=index) for index in range(3)]
    # Chain each step to its successor: steps[0] -> steps[1] -> steps[2].
    graph = {earlier: {later} for earlier, later in zip(steps, steps[1:])}

    pipeline = Pipeline(graph=graph)
    pipeline.prepare()
    executed = []
    # Drive the scheduler manually, recording the order nodes become ready.
    while pipeline.is_active():
        for ready_node in pipeline.get_ready():
            executed.append(ready_node)
            ready_node.execute(data=TEST_DATA)
            pipeline.done(ready_node)
    # Every step must have been marked finished exactly once.
    assert pipeline._nfinished == len(steps)
51+
52+
53+
def test_actual_processstep(flat_data):
    "test running the PoissonUncertainties Process step"
    poisson_step = PoissonUncertainties(TEST_IO_SOURCES)
    # Single node with no dependencies.
    graph = {poisson_step: set()}

    pipeline = Pipeline(graph=graph)
    pipeline.prepare()
    current = None
    while pipeline.is_active():
        for current in pipeline.get_ready():
            current.execute(data=flat_data)
            pipeline.done(current)
    # A flat 100-count signal has Poisson variance equal to the signal itself.
    assert current.produced_outputs["signal"].variances["Poisson"].mean().astype(int) == 100

0 commit comments

Comments
 (0)