Skip to content
1 change: 0 additions & 1 deletion src/modacor/dataclasses/process_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ def __attrs_post_init__(self):
def __hash__(self):
return hash((self.documentation.__repr__(), self.configuration.__repr__(), self.step_id))


def prepare_execution(self):
"""
Prepare the execution of the ProcessStep
Expand Down
1 change: 1 addition & 0 deletions src/modacor/modules/base_modules/poisson_uncertainties.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,4 @@ def calculate(self, data: DataBundle, **kwargs: Any):

# Add the variance to the data
data["signal"].variances["Poisson"] = np.clip(signal, 1, None)
return {"signal": data["signal"]}
6 changes: 3 additions & 3 deletions src/modacor/runner/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ def add_outgoing_branch(self, branch: Self, branching_node):
# reinitialize the TopologicalSorter
super().__init__(graph=self.graph)

def run(self, data: DataBundle, **kwargs):
def run(self, **kwargs):
"""
run pipeline. to be extended for different schedulers
run pipeline with simple scheduling.
"""
self.prepare()
while self.is_active():
for node in self.get_ready():
node.execute(data, **kwargs)
node.execute(**kwargs)
self.done(node)
Empty file.
63 changes: 63 additions & 0 deletions src/modacor/tests/integration/test_pipeline_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import pytest
from pathlib import Path
import numpy as np
from pint import UnitRegistry

ureg = UnitRegistry()

from ...runner.pipeline import Pipeline
from ...dataclasses.process_step import ProcessStep
from ...dataclasses.process_step_describer import ProcessStepDescriber
from ...dataclasses.databundle import DataBundle
from ...dataclasses.basedata import BaseData
from ...io.io_sources import IoSources

from ...modules.base_modules.poisson_uncertainties import PoissonUncertainties

TEST_IO_SOURCES = IoSources()
TEST_DATA = DataBundle()

@pytest.fixture
def flat_data():
data = DataBundle()
data["signal"] = BaseData(
ingest_units=ureg.counts,
internal_units=ureg.counts,
display_units=ureg.counts,
signal=100 * np.ones((10, 10)),
)
return data


class DummyProcessStep(ProcessStep):
def calculate(self, data):
return {"test": 0}


def test_processstep_pipeline():
"tests execution of a linear processstep pipeline (not actually doing anything)"
steps = [DummyProcessStep(TEST_IO_SOURCES, step_id=i) for i in range(3)]
graph = {steps[i]: {steps[i + 1]} for i in range(len(steps) - 1)}

pipeline = Pipeline(graph=graph)
pipeline.prepare()
sequence = []
while pipeline.is_active():
for node in pipeline.get_ready():
sequence.append(node)
node.execute(data=TEST_DATA)
pipeline.done(node)
assert pipeline._nfinished == len(steps)


def test_actual_processstep(flat_data):
"test running the PoissonUncertainties Process step"
graph = {PoissonUncertainties(TEST_IO_SOURCES): {}}

pipeline = Pipeline(graph=graph)
pipeline.prepare()
while pipeline.is_active():
for node in pipeline.get_ready():
node.execute(data=flat_data)
pipeline.done(node)
assert node.produced_outputs["signal"].variances["Poisson"].mean().astype(int) == 100
Loading