
Commit 48c54b9

cli: dataflow: run: Commands to run dataflow without sources

Fixes: #812

1 parent e8c04a0

File tree: 4 files changed (+208, −5 lines)

CHANGELOG.md

Lines changed: 1 addition & 0 deletions

@@ -71,6 +71,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Support for immediate response in HTTP service
 - Daal4py example usage.
 - Gitter chatbot tutorial.
+- Option to run dataflow without sources from cli.
 ### Changed
 - Renamed `-seed` to `-inputs` in `dataflow create` command
 - Renamed configloader/png to configloader/image and added support for loading JPEG and TIFF file formats

dffml/cli/dataflow.py

Lines changed: 113 additions & 1 deletion

@@ -1,7 +1,7 @@
 import pathlib
 import hashlib
 import contextlib
-from typing import List
+from typing import List, Dict, Any
 
 from ..base import BaseConfig
 from ..df.base import BaseOrchestrator, OperationImplementation
@@ -27,6 +27,7 @@
 )
 from ..util.cli.parser import ParseInputsAction
 from ..base import config, field
+from ..high_level import run as run_dataflow
 
 
 @config
@@ -329,9 +330,120 @@ class RunRecords(CMD):
     _all = RunAllRecords
 
 
+@config
+class RunSingleConfig:
+    dataflow: str = field(
+        "File containing exported DataFlow", required=True,
+    )
+    no_echo: bool = field(
+        "Do not echo back records", default=False,
+    )
+    configloader: BaseConfigLoader = field(
+        "ConfigLoader to use for importing DataFlow", default=None,
+    )
+    orchestrator: BaseOrchestrator = field(
+        "Orchestrator", default=MemoryOrchestrator,
+    )
+    inputs: List[str] = field(
+        "Other inputs to add under each ctx",
+        action=ParseInputsAction,
+        default_factory=lambda: [],
+    )
+    no_strict: bool = field(
+        "Do not exit on operation exceptions, just log errors", default=False,
+    )
+
+
+class RunSingle(CMD):
+    CONFIG = RunSingleConfig
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.orchestrator = self.orchestrator.withconfig(self.extra_config)
+
+    async def get_dataflow(self, dataflow_path):
+        dataflow_path = pathlib.Path(dataflow_path)
+        config_cls = self.configloader
+        if config_cls is None:
+            config_type = dataflow_path.suffix.replace(".", "")
+            config_cls = BaseConfigLoader.load(config_type)
+        async with config_cls.withconfig(self.extra_config) as configloader:
+            async with configloader() as loader:
+                exported = await loader.loadb(dataflow_path.read_bytes())
+                dataflow = DataFlow._fromdict(**exported)
+        return dataflow
+
+    async def run(self):
+        dataflow = await self.get_dataflow(self.dataflow)
+        dataflow_inputs = []
+        for value, def_name in self.inputs:
+            dataflow_inputs.append(
+                Input(value=value, definition=dataflow.definitions[def_name],)
+            )
+        async for ctx, results in run_dataflow(
+            dataflow,
+            dataflow_inputs,
+            orchestrator=self.orchestrator,
+            strict=not self.no_strict,
+        ):
+            if not self.no_echo:
+                yield results
+        if self.no_echo:
+            yield CMDOutputOverride
+
+
+@config
+class RunContextsConfig(RunSingleConfig):
+    context_def: str = field(
+        "Definition to be used for contexts key. "
+        + "If set, the key will be added to the set of inputs "
+        + "under each context (which is also the contexts name)",
+        default=False,
+    )
+    contexts: List[str] = field(
+        "Contexts to run", default_factory=lambda: ["context1"], required=False
+    )
+
+
+class RunContexts(RunSingle):
+    CONFIG = RunContextsConfig
+
+    async def run(self):
+        dataflow = await self.get_dataflow(self.dataflow)
+        common_inputs = []
+        for value, def_name in self.inputs:
+            common_inputs.append(
+                Input(value=value, definition=dataflow.definitions[def_name],)
+            )
+
+        dataflow_inputs = {
+            ctx_string: [
+                Input(
+                    value=ctx_string,
+                    definition=dataflow.definitions[self.context_def],
+                )
+            ]
+            + common_inputs
+            for ctx_string in self.contexts
+        }
+
+        async for ctx, result in run_dataflow(
+            dataflow,
+            dataflow_inputs,
+            orchestrator=self.orchestrator,
+            strict=not self.no_strict,
+        ):
+            if not self.no_echo:
+                yield {(await ctx.handle()).as_string(): result}
+        if self.no_echo:
+            yield CMDOutputOverride
+
+
 class Run(CMD):
     """Run dataflow"""
 
+    single = RunSingle
+    contexts = RunContexts
     records = RunRecords
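
For orientation, here is a minimal standalone sketch of what the new `single` subcommand does under the hood. This is not part of the commit: the `df.json` path and the `get_single_spec`/`calc_string` definition names are borrowed from the tests further down, and it uses only APIs visible in this diff (`dffml.high_level.run`, `DataFlow._fromdict`, `Input`), assuming `dffml.df.types` as their import location.

    import json
    import asyncio
    import pathlib

    from dffml.df.types import DataFlow, Input
    from dffml.high_level import run as run_dataflow


    async def main():
        # Load a DataFlow exported earlier, e.g. via:
        #   dffml dataflow create <operations...> -configloader json > df.json
        exported = json.loads(pathlib.Path("df.json").read_text())
        dataflow = DataFlow._fromdict(**exported)
        # Mirror the two -inputs from TestDataflowRunSingle below: tell
        # get_single which definition to output, and feed the calc string.
        inputs = [
            Input(value=["result"], definition=dataflow.definitions["get_single_spec"]),
            Input(value="add 40 and 2", definition=dataflow.definitions["calc_string"]),
        ]
        # Same call shape as RunSingle.run: a flat list of inputs
        async for _ctx, results in run_dataflow(dataflow, inputs):
            print(results)  # expected with the test operations: {'result': 42}


    asyncio.run(main())

`RunContexts` differs only in that it hands `run` a dict mapping each context string to that context's input list, so results come back as one `(ctx, result)` pair per context.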

docs/cli.rst

Lines changed: 19 additions & 3 deletions (the three deletions appear to be trailing-whitespace fixes)

@@ -150,12 +150,12 @@ All
 
 Update all the records in any source using the :py:class:`DataFlowSource <dffml.source.df.DataFlowSource>`.
 
-For this example, we are using the `multiply` operation which multiplies every value in a record by a
+For this example, we are using the `multiply` operation which multiplies every value in a record by a
 factor which is 10 in this case. The example dataflow file looks like this:
 
 .. literalinclude:: /../examples/edit_records.yaml
 
-Create a source file:
+Create a source file:
 
 .. code-block:: console
 
@@ -175,7 +175,7 @@ Run the command:
         -sources f=csv -source-filename data.csv -source-readwrite \
         -features Years:int:1 Expertise:int:1 Trust:float:1 Salary:int:1 \
         -dataflow edit_records.yaml
-    $ dffml list records -sources f=csv -source-filename data.csv
+    $ dffml list records -sources f=csv -source-filename data.csv
     [
         {
             "extra": {},
@@ -325,6 +325,22 @@ command during generation.
     {'hello': 'world'}
     {'hello': 'user'}
 
+We can also run the dataflow without using a source:
+
+.. code-block:: console
+
+    $ dffml dataflow run contexts \
+        -no-echo \
+        -dataflow df.yaml \
+        -context-def value \
+        -contexts \
+          world \
+          $USER \
+        -input \
+          hello=key
+    {'hello': 'world'}
+    {'hello': 'user'}
+
 Diagram
 ~~~~~~~

tests/test_cli.py

Lines changed: 75 additions & 1 deletion

@@ -9,7 +9,7 @@
 import contextlib
 from pathlib import Path
 from unittest.mock import patch
-from typing import List, AsyncIterator
+from typing import List, AsyncIterator, Dict
 
 from dffml.record import Record
 from dffml.feature import Feature, Features
@@ -21,17 +21,21 @@
 from dffml.model.accuracy import Accuracy as AccuracyType
 from dffml.util.entrypoint import entrypoint
 from dffml.util.asynctestcase import (
+    AsyncTestCase,
     AsyncExitStackTestCase,
     non_existant_tempfile,
 )
 from dffml.base import config
+from dffml.df.base import op
 from dffml.cli.cli import Merge
 from dffml.cli.ml import Train, Accuracy, Predict
 from dffml.cli.list import List
 from dffml.cli.dataflow import Dataflow
 
 from .test_df import OPERATIONS, OPIMPS
 
+from dffml import op, DataFlow, Definition
+
 
 class RecordsTestCase(AsyncExitStackTestCase):
     async def setUp(self):
@@ -350,6 +354,76 @@ async def test_run(self):
         shutil.rmtree(tmpdir)
 
 
+class TestDataflowRunSingle(AsyncTestCase):
+    async def test_run(self):
+        tmpdir = tempfile.mkdtemp()
+        handle, dataflow_file = tempfile.mkstemp(suffix=".json", dir=tmpdir)
+        os.close(handle)
+        with open(dataflow_file, mode="w+b") as dataflow_file:
+            dataflow = io.StringIO()
+            with contextlib.redirect_stdout(dataflow):
+                await Dataflow.cli(
+                    "create",
+                    "-configloader",
+                    "json",
+                    *map(lambda op: op.name, OPERATIONS),
+                )
+            dataflow_file.write(dataflow.getvalue().encode())
+            dataflow_file.seek(0)
+            results = await Dataflow.cli(
+                "run",
+                "single",
+                "-dataflow",
+                dataflow_file.name,
+                "-inputs",
+                '["result"]=get_single_spec',
+                "add 40 and 2=calc_string",
+            )
+            self.assertEqual(len(results), 1)
+            self.assertEqual(results[0], {"result": 42})
+        shutil.rmtree(tmpdir)
+
+
+class TestDataflowRunContexts(AsyncTestCase):
+    async def test_run(self):
+        tmpdir = tempfile.mkdtemp()
+        handle, dataflow_file = tempfile.mkstemp(suffix=".json", dir=tmpdir)
+        os.close(handle)
+
+        with open(dataflow_file, mode="w+b") as dataflow_file:
+            dataflow = io.StringIO()
+            with contextlib.redirect_stdout(dataflow):
+                await Dataflow.cli(
+                    "create",
+                    "-configloader",
+                    "json",
+                    *map(lambda op: op.name, OPERATIONS),
+                )
+            dataflow_file.write(dataflow.getvalue().encode())
+            dataflow_file.seek(0)
+            test_contexts = {"add 40 and 2": 42, "multiply 42 and 10": 420}
+            results = await Dataflow.cli(
+                "run",
+                "contexts",
+                "-dataflow",
+                dataflow_file.name,
+                "-context-def",
+                "calc_string",
+                "-contexts",
+                *test_contexts.keys(),
+                "-input",
+                '["result"]=get_single_spec',
+            )
+            self.assertCountEqual(
+                results,
+                [
+                    {ctx_string: {"result": result}}
+                    for ctx_string, result in test_contexts.items()
+                ],
+            )
+        shutil.rmtree(tmpdir)
+
+
 class TestTrain(RecordsTestCase):
     async def test_run(self):
         await Train.cli(
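
For reference, `TestDataflowRunSingle` above exercises the equivalent of the following shell invocation. This is a sketch, not part of the commit: it assumes `df.json` was produced by `dffml dataflow create` over the test suite's OPERATIONS, and the echoed output line is inferred from the test's expected result.

    $ dffml dataflow run single \
        -dataflow df.json \
        -inputs \
          '["result"]=get_single_spec' \
          'add 40 and 2=calc_string'
    {'result': 42}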
