Skip to content
This repository was archived by the owner on Aug 25, 2024. It is now read-only.

Commit 154f34f

Browse files
authored
df: memory: Auto start operations without inputs
Fixes: #392
1 parent 34f6ddf commit 154f34f

File tree

3 files changed

+61
-0
lines changed

3 files changed

+61
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1616
- Add python code for tensorflow DNNEstimator
1717
- Ability to run a subflow as if it were an operation using the
1818
`dffml.dataflow.run` operation.
19+
- Support for operations without inputs.
1920
### Fixed
2021
- New model tutorial mentions file paths that should be edited.
2122
- DataFlow is no longer a dataclass to prevent it from being exported

dffml/df/memory.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1026,6 +1026,23 @@ async def dispatch(
10261026
task.add_done_callback(ignore_args(self.completed_event.set))
10271027
return task
10281028

1029+
async def dispatch_auto_starts(self, octx: BaseOrchestratorContext, ctx):
1030+
"""
1031+
Schedule the running of all operations without inputs
1032+
"""
1033+
empty_parameter_set = MemoryParameterSet(
1034+
MemoryParameterSetConfig(ctx=ctx, parameters=[])
1035+
)
1036+
1037+
for operation in octx.config.dataflow.operations.values():
1038+
if operation.inputs:
1039+
continue
1040+
task = asyncio.create_task(
1041+
self.run_dispatch(octx, operation, empty_parameter_set)
1042+
)
1043+
task.add_done_callback(ignore_args(self.completed_event.set))
1044+
yield task
1045+
10291046

10301047
@entrypoint("memory")
10311048
class MemoryOperationImplementationNetwork(
@@ -1416,6 +1433,10 @@ async def run_operations_for_ctx(
14161433
# TODO(dfass) Make ictx.added(ctx) specific to dataflow
14171434
input_set_enters_network = asyncio.create_task(self.ictx.added(ctx))
14181435
tasks.add(input_set_enters_network)
1436+
# schedule running of operations with no inputs
1437+
async for task in self.nctx.dispatch_auto_starts(self, ctx):
1438+
tasks.add(task)
1439+
14191440
try:
14201441
# Return when outstanding operations reaches zero
14211442
while tasks:

tests/df/test_auto_start.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
from dffml.df.base import op
2+
from dffml.df.types import DataFlow, Input, Definition
3+
from dffml.operation.output import GetSingle
4+
from dffml.util.asynctestcase import AsyncTestCase
5+
from dffml.df.memory import MemoryOrchestrator
6+
7+
STRING = Definition(name="string", primitive="str")
8+
9+
10+
@op(inputs={}, outputs={"string_out": STRING})
11+
async def announce():
12+
return {"string_out": "EXISTS"}
13+
14+
15+
class TestAutoStart(AsyncTestCase):
16+
async def setUp(self):
17+
dataflow = DataFlow(
18+
operations={
19+
"announce": announce.op,
20+
"get_single": GetSingle.imp.op,
21+
},
22+
seed=[
23+
Input(
24+
value=[announce.op.outputs["string_out"].name],
25+
definition=GetSingle.op.inputs["spec"],
26+
)
27+
],
28+
implementations={announce.op.name: announce.imp,},
29+
)
30+
31+
self.dataflow = dataflow
32+
33+
async def test_auto_start(self):
34+
test_inputs = {"testStart": []}
35+
async with MemoryOrchestrator.withconfig({}) as orchestrator:
36+
async with orchestrator(self.dataflow) as octx:
37+
async for ctx_str, results in octx.run(test_inputs):
38+
self.assertIn("string", results)
39+
self.assertEqual("EXISTS", results["string"])

0 commit comments

Comments
 (0)