
Commit 0f30866

operation: dataflow: run: Run dataflow as operation
Fixes: #415
Signed-off-by: John Andersen <[email protected]>
1 parent 75bd5fa commit 0f30866

5 files changed: +209 -6 lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions

@@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Logistic Regression with SAG optimizer
 - Test tensorflow DNNEstimator documentation examples in CI
 - Add python code for tensorflow DNNEstimator
+- Ability to run a subflow as if it were an operation using the
+  `dffml.dataflow.run` operation.
 ### Fixed
 - New model tutorial mentions file paths that should be edited.
 - DataFlow is no longer a dataclass to prevent it from being exported

dffml/operation/dataflow.py

Lines changed: 110 additions & 5 deletions

@@ -5,16 +5,33 @@
 from dffml.df.types import DataFlow, Input, Definition
 
 
+class InvalidCustomRunDataFlowContext(Exception):
+    """
+    Thrown when custom inputs for dffml.dataflow.run do not list an input with
+    string as its primitive as the first input.
+    """
+
+
+class InvalidCustomRunDataFlowOutputs(Exception):
+    """
+    Thrown when outputs for a custom dffml.dataflow.run do not match that of
+    its subflow.
+    """
+
+
 @config
 class RunDataFlowConfig:
     dataflow: DataFlow
 
 
+DEFAULT_INPUTS = {
+    "inputs": Definition(name="flow_inputs", primitive="Dict[str,Any]")
+}
+
+
 @op(
     name="dffml.dataflow.run",
-    inputs={
-        "inputs": Definition(name="flow_inputs", primitive="Dict[str,Any]")
-    },
+    inputs=DEFAULT_INPUTS,
     outputs={
         "results": Definition(name="flow_results", primitive="Dict[str,Any]")
     },
@@ -40,6 +57,8 @@ class run_dataflow(OperationImplementationContext):
     Examples
     ++++++++
 
+    The following shows how to use run dataflow in its default behavior.
+
     >>> URL = Definition(name="URL", primitive="string")
     >>>
     >>> subflow = DataFlow.auto(GetSingle)
@@ -80,10 +99,63 @@ class run_dataflow(OperationImplementationContext):
     >>>
     >>> asyncio.run(main())
     {'flow_results': {'dffml': {'URL': 'https://github.com/intel/dffml'}}}
+
+    The following shows how to use run dataflow with custom inputs and outputs.
+    This allows you to run a subflow as if it were an operation.
+
+    >>> URL = Definition(name="URL", primitive="string")
+    >>>
+    >>> @op(
+    ...     inputs={"url": URL},
+    ...     outputs={"last": Definition("last_element_in_path", primitive="string")},
+    ... )
+    ... def last_path(url):
+    ...     return {"last": url.split("/")[-1]}
+    >>>
+    >>> subflow = DataFlow.auto(last_path, GetSingle)
+    >>> subflow.seed.append(
+    ...     Input(
+    ...         value=[last_path.op.outputs["last"].name],
+    ...         definition=GetSingle.op.inputs["spec"],
+    ...     )
+    ... )
+    >>>
+    >>> dataflow = DataFlow.auto(run_dataflow, GetSingle)
+    >>> dataflow.operations[run_dataflow.op.name] = run_dataflow.op._replace(
+    ...     inputs={"URL": URL},
+    ...     outputs={last_path.op.outputs["last"].name: last_path.op.outputs["last"]},
+    ...     expand=[],
+    ... )
+    >>> dataflow.configs[run_dataflow.op.name] = RunDataFlowConfig(subflow)
+    >>> dataflow.seed.append(
+    ...     Input(
+    ...         value=[last_path.op.outputs["last"].name],
+    ...         definition=GetSingle.op.inputs["spec"],
+    ...     )
+    ... )
+    >>> dataflow.update(auto_flow=True)
+    >>>
+    >>> async def main():
+    ...     async for ctx, results in MemoryOrchestrator.run(
+    ...         dataflow,
+    ...         {
+    ...             "run_subflow": [
+    ...                 Input(value="https://github.com/intel/dffml", definition=URL)
+    ...             ]
+    ...         },
+    ...     ):
+    ...         print(results)
+    >>>
+    >>> asyncio.run(main())
+    {'last_element_in_path': 'dffml'}
     """
 
-    async def run(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
-        inputs = inputs["inputs"]
+    async def run_default(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
+        """
+        The default implementation for the dataflow.run operation is the uctx
+        mode. This mode is when we map unique strings to a list of inputs to be
+        given to the respective string's context.
+        """
         inputs_created = {}
         definitions = self.config.dataflow.definitions
 
@@ -102,3 +174,36 @@ async def run(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
            ]
 
        return {"results": results}
+
+    async def run_custom(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
+        # TODO Move string primitive validation into init of
+        # an OperationImplementation (and then keep this as the context).
+        ctx_input_name, ctx_definition = list(self.parent.op.inputs.items())[0]
+
+        if ctx_definition.primitive != "string":
+            raise InvalidCustomRunDataFlowContext(ctx_definition.export())
+
+        subflow_inputs = {inputs[ctx_input_name]: []}
+
+        for input_name, value in inputs.items():
+            definition = self.parent.op.inputs[input_name]
+            subflow_inputs[inputs[ctx_input_name]].append(
+                Input(value=value, definition=definition)
+            )
+
+        op_outputs = sorted(self.parent.op.outputs.keys())
+
+        async with self.subflow(self.config.dataflow) as octx:
+            async for ctx, result in octx.run(subflow_inputs):
+                if op_outputs != sorted(result.keys()):
+                    raise InvalidCustomRunDataFlowOutputs(
+                        ctx_definition.export()
+                    )
+                return result
+
+    async def run(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
+        # Support redefinition of operation
+        if self.parent.op.inputs == DEFAULT_INPUTS:
+            return await self.run_default(inputs["inputs"])
+        else:
+            return await self.run_custom(inputs)
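
The run() method added above only dispatches; the behavioural difference between the two paths is the shape of the inputs each one expects. The sketch below is not part of the commit and only contrasts those two shapes. The per-item layout in the default ("uctx") mode is inferred from the pre-existing default doctest, which this diff shows only in part, so treat it as an assumption; the custom-mode shape follows directly from run_custom() above.

# Sketch only, not part of the commit.

# Default ("uctx") mode: the operation still carries DEFAULT_INPUTS, so run()
# receives {"inputs": ...} and run_default() maps each unique context string to
# the list of inputs destined for that context. The per-item dict layout below
# is assumed from the pre-existing default example, not shown in full here.
default_mode = {
    "inputs": {
        "dffml": [
            {"value": "https://github.com/intel/dffml", "definition": "URL"},
        ],
    },
}

# Custom mode: the operation was redefined via run_dataflow.op._replace(...),
# so run() passes the redefined inputs straight to run_custom(). The first
# declared input ("URL" here) doubles as the subflow context string, which is
# why its definition's primitive must be "string".
custom_mode = {"URL": "https://github.com/intel/dffml"}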

docs/plugins/dffml_operation.rst

Lines changed: 51 additions & 0 deletions

@@ -58,6 +58,8 @@ dict
 Examples
 ++++++++
 
+The following shows how to use run dataflow in its default behavior.
+
 >>> URL = Definition(name="URL", primitive="string")
 >>>
 >>> subflow = DataFlow.auto(GetSingle)
@@ -99,6 +101,55 @@ Examples
 >>> asyncio.run(main())
 {'flow_results': {'dffml': {'URL': 'https://github.com/intel/dffml'}}}
 
+The following shows how to use run dataflow with custom inputs and outputs.
+This allows you to run a subflow as if it were an operation.
+
+>>> URL = Definition(name="URL", primitive="string")
+>>>
+>>> @op(
+...     inputs={"url": URL},
+...     outputs={"last": Definition("last_element_in_path", primitive="string")},
+... )
+... def last_path(url):
+...     return {"last": url.split("/")[-1]}
+>>>
+>>> subflow = DataFlow.auto(last_path, GetSingle)
+>>> subflow.seed.append(
+...     Input(
+...         value=[last_path.op.outputs["last"].name],
+...         definition=GetSingle.op.inputs["spec"],
+...     )
+... )
+>>>
+>>> dataflow = DataFlow.auto(run_dataflow, GetSingle)
+>>> dataflow.operations[run_dataflow.op.name] = run_dataflow.op._replace(
+...     inputs={"URL": URL},
+...     outputs={last_path.op.outputs["last"].name: last_path.op.outputs["last"]},
+...     expand=[],
+... )
+>>> dataflow.configs[run_dataflow.op.name] = RunDataFlowConfig(subflow)
+>>> dataflow.seed.append(
+...     Input(
+...         value=[last_path.op.outputs["last"].name],
+...         definition=GetSingle.op.inputs["spec"],
+...     )
+... )
+>>> dataflow.update(auto_flow=True)
+>>>
+>>> async def main():
+...     async for ctx, results in MemoryOrchestrator.run(
+...         dataflow,
+...         {
+...             "run_subflow": [
+...                 Input(value="https://github.com/intel/dffml", definition=URL)
+...             ]
+...         },
+...     ):
+...         print(results)
+>>>
+>>> asyncio.run(main())
+{'last_element_in_path': 'dffml'}
+
 **Stage: processing**
 
 

tests/operation/test_dataflow.py

Lines changed: 45 additions & 0 deletions

@@ -1,3 +1,5 @@
+import copy
+
 from dffml.df.types import DataFlow, Input
 from dffml.df.memory import MemoryOrchestrator
 from dffml.operation.dataflow import run_dataflow, RunDataFlowConfig
@@ -77,3 +79,46 @@ async def test_run(self):
                    results = results["result"]
                    expected_results = test_outputs[ctx_str]
                    self.assertEqual(expected_results, results)
+
+    async def test_run_custom(self):
+        output_definition = add.op.outputs["sum"]
+
+        get_single_spec_input = Input(
+            value=[output_definition.name],
+            definition=GetSingle.op.inputs["spec"],
+        )
+
+        subflow = copy.deepcopy(DATAFLOW)
+        subflow.seed.append(get_single_spec_input)
+
+        test_dataflow = DataFlow(
+            operations={
+                "run_dataflow": run_dataflow.op._replace(
+                    inputs=parse_line.op.inputs,
+                    outputs={output_definition.name: output_definition},
+                ),
+                "get_single": GetSingle.imp.op,
+            },
+            configs={"run_dataflow": RunDataFlowConfig(dataflow=subflow)},
+            seed=[get_single_spec_input],
+        )
+
+        test_outputs = {"add 40 and 2": 42, "multiply 42 and 10": 420}
+
+        async with MemoryOrchestrator.withconfig({}) as orchestrator:
+            async with orchestrator(test_dataflow) as octx:
+                async for _ctx, results in octx.run(
+                    {
+                        input_line: [
+                            Input(
+                                value=input_line,
+                                definition=parse_line.op.inputs["line"],
+                            )
+                        ]
+                        for input_line in test_outputs
+                    }
+                ):
+                    ctx_str = (await _ctx.handle()).as_string()
+                    results = results[output_definition.name]
+                    expected_results = test_outputs[ctx_str]
+                    self.assertEqual(expected_results, results)
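
One detail worth calling out in the new test: the module-level DATAFLOW is deep-copied before its seed is extended, which is why `import copy` was added in the first hunk. The reasoning is an inference, not stated in the commit; the standalone sketch below shows the effect the copy avoids.

import copy

# Sketch, not part of the commit: without deepcopy, appending to the shared
# object's seed would leak into every other test that reuses it.
shared = {"seed": []}
local = copy.deepcopy(shared)
local["seed"].append("get_single_spec_input")

assert shared["seed"] == []                          # shared object untouched
assert local["seed"] == ["get_single_spec_input"]    # only the local copy grew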

tests/test_df.py

Lines changed: 1 addition & 1 deletion

@@ -33,7 +33,7 @@
 from dffml.util.asynctestcase import AsyncTestCase
 
 definitions = [
-    Definition(name="calc_string", primitive="str"),
+    Definition(name="calc_string", primitive="string"),
    Definition(name="is_add", primitive="bool"),
    Definition(name="is_mult", primitive="bool"),
    Definition(name="numbers", primitive="List[int]"),
