11from typing import Dict , Any
22
33from dffml .base import config
4- from dffml .df .base import op
4+ from dffml .df .base import op , OperationImplementationContext
55from dffml .df .types import DataFlow , Input , Definition
66
77
@@ -21,24 +21,24 @@ class RunDataFlowConfig:
2121 config_cls = RunDataFlowConfig ,
2222 expand = ["results" ],
2323)
24- async def run_dataflow (self , inputs : Dict [ str , Any ]) -> Dict [ str , Any ] :
24+ class run_dataflow (OperationImplementationContext ) :
2525 """
2626 Starts a subflow ``self.config.dataflow`` and adds ``inputs`` in it.
2727
2828 Parameters
29- ----------
29+ ++++++++++
3030 inputs : dict
3131 The inputs to add to the subflow. These should be a key value mapping of
3232 the context string to the inputs which should be seeded for that context
3333 string.
3434
3535 Returns
36- -------
36+ +++++++
3737 dict
3838 Maps context strings in inputs to output after running through dataflow.
3939
4040 Examples
41- --------
41+ ++++++++
4242
4343 >>> URL = Definition(name="URL", primitive="string")
4444 >>>
@@ -81,21 +81,24 @@ async def run_dataflow(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
8181 >>> asyncio.run(main())
8282 {'flow_results': {'dffml': {'URL': 'https://github.com/intel/dffml'}}}
8383 """
84- inputs_created = {}
85- definitions = self .config .dataflow .definitions
8684
87- for ctx_str , val_defs in inputs .items ():
88- inputs_created [ctx_str ] = [
89- Input (
90- value = val_def ["value" ],
91- definition = definitions [val_def ["definition" ]],
92- )
93- for val_def in val_defs
94- ]
95- async with self .subflow (self .config .dataflow ) as octx :
96- results = [
97- {(await ctx .handle ()).as_string (): result }
98- async for ctx , result in octx .run (inputs_created )
99- ]
85+ async def run (self , inputs : Dict [str , Any ]) -> Dict [str , Any ]:
86+ inputs = inputs ["inputs" ]
87+ inputs_created = {}
88+ definitions = self .config .dataflow .definitions
10089
101- return {"results" : results }
90+ for ctx_str , val_defs in inputs .items ():
91+ inputs_created [ctx_str ] = [
92+ Input (
93+ value = val_def ["value" ],
94+ definition = definitions [val_def ["definition" ]],
95+ )
96+ for val_def in val_defs
97+ ]
98+ async with self .subflow (self .config .dataflow ) as octx :
99+ results = [
100+ {(await ctx .handle ()).as_string (): result }
101+ async for ctx , result in octx .run (inputs_created )
102+ ]
103+
104+ return {"results" : results }
0 commit comments