Skip to content
This repository was archived by the owner on Aug 25, 2024. It is now read-only.

Commit 81b7aa1

Browse files
committed
df: operation: Add example for run_dataflow
Signed-off-by: John Andersen <[email protected]>
1 parent 1f24d6f commit 81b7aa1

File tree

6 files changed: +136 −22 lines changed

dffml/base.py

Lines changed: 9 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -279,6 +279,12 @@ def __init__(self):
279279
self.dest = None
280280

281281

282+
class ConfigAndKWArgsMutuallyExclusive(Exception):
283+
"""
284+
Raised when both kwargs and config are specified.
285+
"""
286+
287+
282288
class BaseConfigurableMetaClass(type, abc.ABC):
283289
def __new__(cls, name, bases, props, module=None):
284290
# Create the class
@@ -298,7 +304,9 @@ def wrap(cls, func):
298304

299305
@functools.wraps(func)
300306
def wrapper(self, config: Optional[BaseConfig] = None, **kwargs):
301-
if config is None and hasattr(self, "CONFIG") and kwargs:
307+
if config is not None and len(kwargs):
308+
raise ConfigAndKWArgsMutuallyExclusive
309+
elif config is None and hasattr(self, "CONFIG") and kwargs:
302310
try:
303311
config = self.CONFIG(**kwargs)
304312
except TypeError as error:

dffml/df/base.py

Lines changed: 10 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -788,4 +788,13 @@ async def run_operations(
788788

789789
@base_entry_point("dffml.orchestrator", "orchestrator")
790790
class BaseOrchestrator(BaseDataFlowObject):
791-
pass # pragma: no cov
791+
@classmethod
792+
async def run(cls, dataflow, inputs, *, config=None, **kwargs):
793+
if config is None:
794+
self = cls.withconfig({})
795+
else:
796+
self = cls(config=config, **kwargs)
797+
async with self as orchestrator:
798+
async with orchestrator(dataflow) as octx:
799+
async for ctx, results in octx.run(inputs):
800+
yield ctx, results

dffml/operation/dataflow.py

Lines changed: 57 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -23,25 +23,63 @@ class RunDataFlowConfig:
2323
)
2424
async def run_dataflow(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
2525
"""
26-
Starts a subflow `self.config.dataflow` and runs `inputs` in it.
27-
28-
Parameters:
29-
inputs: Dict[str,Any] ->
30-
eg: {
31-
"ctx_str" : [
32-
{
33-
"value":val1,
34-
"defintion":defintion1
35-
},
36-
{
37-
"value":val2,
38-
"defintion":defintion2
39-
}
40-
]
41-
}
42-
Returns:
43-
Dict[str,Any] -> maps context strings in inputs to output after running
44-
through dataflow
26+
Starts a subflow ``self.config.dataflow`` and adds ``inputs`` in it.
27+
28+
Parameters
29+
----------
30+
inputs : dict
31+
The inputs to add to the subflow. These should be a key value mapping of
32+
the context string to the inputs which should be seeded for that context
33+
string.
34+
35+
Returns
36+
-------
37+
dict
38+
Maps context strings in inputs to output after running through dataflow.
39+
40+
Examples
41+
--------
42+
43+
>>> URL = Definition(name="URL", primitive="string")
44+
>>>
45+
>>> subflow = DataFlow.auto(GetSingle)
46+
>>> subflow.definitions[URL.name] = URL
47+
>>> subflow.seed.append(
48+
... Input(
49+
... value=[URL.name],
50+
... definition=GetSingle.op.inputs["spec"]
51+
... )
52+
... )
53+
>>>
54+
>>> dataflow = DataFlow.auto(run_dataflow, GetSingle)
55+
>>> dataflow.configs[run_dataflow.imp.op.name] = RunDataFlowConfig(subflow)
56+
>>> dataflow.seed.append(
57+
... Input(
58+
... value=[run_dataflow.imp.op.outputs["results"].name],
59+
... definition=GetSingle.op.inputs["spec"]
60+
... )
61+
... )
62+
>>>
63+
>>> async def main():
64+
... async for ctx, results in MemoryOrchestrator.run(dataflow, {
65+
... "run_subflow": [
66+
... Input(
67+
... value={
68+
... "dffml": [
69+
... {
70+
... "value": "https://github.com/intel/dffml",
71+
... "definition": URL.name
72+
... }
73+
... ]
74+
... },
75+
... definition=run_dataflow.imp.op.inputs["inputs"]
76+
... )
77+
... ]
78+
... }):
79+
... print(results)
80+
>>>
81+
>>> asyncio.run(main())
82+
{'flow_results': {'dffml': {'URL': 'https://github.com/intel/dffml'}}}
4583
"""
4684
inputs_created = {}
4785
definitions = self.config.dataflow.definitions

docs/doctest_header.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -24,3 +24,5 @@
2424
from dffml.df.types import *
2525
from dffml.df.memory import *
2626
from dffml.util.net import *
27+
from dffml.operation.output import *
28+
from dffml.operation.dataflow import *

docs/plugins/dffml_operation.rst

Lines changed: 57 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -41,7 +41,63 @@ dffml.dataflow.run
4141

4242
*Official*
4343

44-
No description
44+
Starts a subflow ``self.config.dataflow`` and adds ``inputs`` in it.
45+
46+
Parameters
47+
----------
48+
inputs : dict
49+
The inputs to add to the subflow. These should be a key value mapping of
50+
the context string to the inputs which should be seeded for that context
51+
string.
52+
53+
Returns
54+
-------
55+
dict
56+
Maps context strings in inputs to output after running through dataflow.
57+
58+
Examples
59+
--------
60+
61+
>>> URL = Definition(name="URL", primitive="string")
62+
>>>
63+
>>> subflow = DataFlow.auto(GetSingle)
64+
>>> subflow.definitions[URL.name] = URL
65+
>>> subflow.seed.append(
66+
... Input(
67+
... value=[URL.name],
68+
... definition=GetSingle.op.inputs["spec"]
69+
... )
70+
... )
71+
>>>
72+
>>> dataflow = DataFlow.auto(run_dataflow, GetSingle)
73+
>>> dataflow.configs[run_dataflow.imp.op.name] = RunDataFlowConfig(subflow)
74+
>>> dataflow.seed.append(
75+
... Input(
76+
... value=[run_dataflow.imp.op.outputs["results"].name],
77+
... definition=GetSingle.op.inputs["spec"]
78+
... )
79+
... )
80+
>>>
81+
>>> async def main():
82+
... async for ctx, results in MemoryOrchestrator.run(dataflow, {
83+
... "run_subflow": [
84+
... Input(
85+
... value={
86+
... "dffml": [
87+
... {
88+
... "value": "https://github.com/intel/dffml",
89+
... "definition": URL.name
90+
... }
91+
... ]
92+
... },
93+
... definition=run_dataflow.imp.op.inputs["inputs"]
94+
... )
95+
... ]
96+
... }):
97+
... print(results)
98+
>>>
99+
>>> asyncio.run(main())
100+
{'flow_results': {'dffml': {'URL': 'https://github.com/intel/dffml'}}}
45101

46102
**Stage: processing**
47103

scripts/doctest.sh

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2,4 +2,5 @@
22
set -e
33

44
rm -rf doctest
5+
python3.7 scripts/docs.py
56
python3.7 -c 'import os, pkg_resources; [e.load() for e in pkg_resources.iter_entry_points("console_scripts") if e.name.startswith("sphinx-build")][0]()' -b doctest docs doctest

0 commit comments

Comments (0)