Skip to content

Commit f63cea3

Browse files
authored
Merge pull request #16 from BAMresearch/basic_pipeline_generator
Runner/pipeline draft based on graphlibs TopologicalSorter Anja says everything is ok.
2 parents a461e79 + d995aff commit f63cea3

File tree

3 files changed

+154
-0
lines changed

3 files changed

+154
-0
lines changed

src/modacor/runner/__init__.py

Whitespace-only changes.

src/modacor/runner/pipeline.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
# src/modacor/runner/pipeline.py
2+
# # -*- coding: utf-8 -*-
3+
from __future__ import annotations
4+
5+
from graphlib import TopologicalSorter
6+
from pathlib import Path
7+
8+
from attrs import define, field
9+
from attrs import validators as v
10+
11+
from ..dataclasses.process_step import ProcessStep
12+
13+
__all__ = ["Pipeline"]
14+
15+
16+
@define
17+
class Pipeline(TopologicalSorter):
18+
"""
19+
Pipeline nodes are assumed to be of type ProcessStep
20+
"""
21+
22+
name: str = field(default="Unnamed Pipeline")
23+
graph: dict[ProcessStep] = field(factory=dict)
24+
25+
def __attrs_post_init__(self):
26+
super().__init__(graph=self.graph)
27+
28+
@classmethod
29+
def from_json(cls, path_to_json: Path):
30+
# functionality postponed
31+
return cls(name="dummy")
32+
33+
@classmethod
34+
def from_dict(cls, graph_dict: dict, name=""):
35+
return cls(name=name, graph=graph_dict)
36+
37+
def add_incoming_branch(self, branch: Self, branching_node):
38+
"""
39+
Add a pipeline as a branch whose outcome shall be combined the existing pipeline.
40+
41+
This assumes that the branch to be added has a single exit point.
42+
43+
"""
44+
pipeline_to_add = Pipeline(graph=branch.graph)
45+
pipeline_to_add_ordered = [*pipeline_to_add.static_order()]
46+
# add the last node of the incoming as a predecessor to the connection point
47+
self.graph[branching_node].update({pipeline_to_add_ordered[-1]})
48+
# add the rest of the graph
49+
self.graph = self.graph | branch.graph
50+
# reinitialize the TopologicalSorter
51+
super().__init__(graph=self.graph)
52+
53+
def add_outgoing_branch(self, branch: Self, branching_node):
54+
"""
55+
Add a pipeline as a branch whose input is based on the existing pipeline.
56+
57+
This assumes that the branch to be added has a single entry point.
58+
59+
"""
60+
pipeline_to_add = Pipeline(graph=branch.graph)
61+
pipeline_to_add_ordered = [*pipeline_to_add.static_order()]
62+
# add the connection node as a predecessor to the first node of the outgoing branch
63+
branch.graph[pipeline_to_add_ordered[0]].update({branching_node})
64+
# add the rest of the graph
65+
self.graph = self.graph | branch.graph
66+
# reinitialize the TopologicalSorter
67+
super().__init__(graph=self.graph)
68+
69+
def run(self, data: DataBundle, **kwargs):
70+
"""
71+
run pipeline. to be extended for different schedulers
72+
"""
73+
self.prepare()
74+
while self.is_active():
75+
for node in self.get_ready():
76+
node.execute(data, **kwargs)
77+
self.done(node)

src/modacor/tests/test_pipeline.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
import pytest
2+
from pathlib import Path
3+
4+
from ..runner.pipeline import Pipeline
5+
from ..dataclasses.process_step import ProcessStep
6+
from ..dataclasses.process_step_describer import ProcessStepDescriber
7+
8+
9+
@pytest.fixture
10+
def linear_pipeline():
11+
return {3: {2, 1}, 2: {1}}
12+
13+
14+
class DummyIoSources:
15+
pass
16+
17+
18+
class DummyProcessStepDescriber:
19+
pass
20+
21+
22+
class DummyProcessStep:
23+
pass
24+
25+
26+
def test_linear_pipeline(linear_pipeline):
27+
"tests the sequence is expected for a linear graph"
28+
pipeline = Pipeline(graph=linear_pipeline)
29+
pipeline.prepare()
30+
sequence = []
31+
while pipeline.is_active():
32+
for node in pipeline.get_ready():
33+
sequence.append(node)
34+
pipeline.done(node)
35+
assert sequence == [1, 2, 3]
36+
37+
38+
def test_node_addition(linear_pipeline):
39+
pipeline = Pipeline.from_dict(linear_pipeline)
40+
ps = DummyProcessStep()
41+
pipeline.add(ps, *[1, 2, 3])
42+
pipeline.prepare()
43+
sequence = []
44+
while pipeline.is_active():
45+
for node in pipeline.get_ready():
46+
sequence.append(node)
47+
pipeline.done(node)
48+
assert sequence == [1, 2, 3, ps]
49+
50+
51+
def test_branch_addition(linear_pipeline, pipeline_to_add={5: {6}}, at_node=2):
52+
"""
53+
add a pipeline as a branch on an existing pipeline, using the inherited add method
54+
55+
"""
56+
pipeline_1 = Pipeline(graph=linear_pipeline)
57+
pipeline_2 = Pipeline(graph=pipeline_to_add)
58+
pipeline_1.add(at_node, *pipeline_2.static_order())
59+
assert [*pipeline_1.static_order()] == [1, 6, 5, 2, 3]
60+
61+
62+
def test_branch_addition_method(linear_pipeline, branch_graph={5: {6}}, branching_node=2):
63+
pipeline = Pipeline(graph=linear_pipeline)
64+
branch = Pipeline(graph=branch_graph)
65+
pipeline.add_incoming_branch(branch, branching_node=2)
66+
assert [*pipeline.static_order()] == [1, 6, 5, 2, 3]
67+
assert pipeline.graph == {3: {2, 1}, 2: {1, 5}, 5: {6}}
68+
69+
70+
def test_diverging_branch_addition(
71+
linear_pipeline, branch_graph={5: {6}, 6: set()}, branching_node=2
72+
):
73+
pipeline = Pipeline(graph=linear_pipeline)
74+
branch = Pipeline(graph=branch_graph)
75+
pipeline.add_outgoing_branch(branch, branching_node)
76+
assert [*pipeline.static_order()] == [1, 2, 3, 6, 5]
77+
assert pipeline.graph == {3: {2, 1}, 2: {1}, 5: {6}, 6: {2}}

0 commit comments

Comments
 (0)