|
| 1 | +import asyncio |
| 2 | +import concurrent.futures |
| 3 | +from typing import Dict, Any |
| 4 | + |
| 5 | +from dffml.df.types import Operation, Definition |
| 6 | +from dffml.df.base import ( |
| 7 | + op, |
| 8 | + OperationImplementationContext, |
| 9 | + OperationImplementation, |
| 10 | +) |
| 11 | + |
| 12 | + |
| 13 | +# Definitions |
| 14 | +UserInput = Definition(name="UserInput", primitive="str") |
| 15 | +DataToPrint = Definition(name="DataToPrint", primitive="str") |
| 16 | + |
| 17 | +AcceptUserInput = Operation( |
| 18 | + name="AcceptUserInput", |
| 19 | + inputs={}, |
| 20 | + outputs={"InputData": UserInput}, |
| 21 | + conditions=[], |
| 22 | +) |
| 23 | + |
| 24 | + |
| 25 | +class AcceptUserInputContext(OperationImplementationContext): |
| 26 | + @staticmethod |
| 27 | + def receive_input(): |
| 28 | + return input() |
| 29 | + |
| 30 | + async def run(self, inputs: Dict[str, Any]) -> Dict[str, Any]: |
| 31 | + user_input = await self.parent.loop.run_in_executor( |
| 32 | + self.parent.pool, self.receive_input |
| 33 | + ) |
| 34 | + return {"InputData": {"data": user_input}} |
| 35 | + |
| 36 | + |
| 37 | +class AcceptUserInput(OperationImplementation): |
| 38 | + """ |
| 39 | + Accept input from stdin using python input() |
| 40 | +
|
| 41 | + Parameters |
| 42 | + ++++++++++ |
| 43 | + inputs : dict |
| 44 | + A dictionary with a key and empty list as value. |
| 45 | +
|
| 46 | + Returns |
| 47 | + +++++++ |
| 48 | + dict |
| 49 | + A dictionary containing user input. |
| 50 | +
|
| 51 | + Examples |
| 52 | + ++++++++ |
| 53 | +
|
| 54 | + The following example shows how to use AcceptUserInput. |
| 55 | + >>> dataflow = DataFlow.auto(AcceptUserInput, GetSingle) |
| 56 | + >>> dataflow.seed.append( |
| 57 | + ... Input( |
| 58 | + ... value=[AcceptUserInput.op.outputs["InputData"].name], |
| 59 | + ... definition=GetSingle.op.inputs["spec"] |
| 60 | + ... ) |
| 61 | + ... ) |
| 62 | + >>> |
| 63 | + >>> async def main(): |
| 64 | + ... async for ctx, results in MemoryOrchestrator.run(dataflow, {"input":[]}): |
| 65 | + ... print(results) |
| 66 | + >>> |
| 67 | + >>> asyncio.run(main()) |
| 68 | + {'UserInput': {'data': 'Data flow is awesome'}} |
| 69 | + """ |
| 70 | + |
| 71 | + op = AcceptUserInput |
| 72 | + CONTEXT = AcceptUserInputContext |
| 73 | + |
| 74 | + def __init__(self, *args, **kwargs): |
| 75 | + super().__init__(*args, **kwargs) |
| 76 | + self.loop = None |
| 77 | + self.pool = None |
| 78 | + self.__pool = None |
| 79 | + |
| 80 | + async def __aenter__(self) -> "OperationImplementationContext": |
| 81 | + self.loop = asyncio.get_event_loop() |
| 82 | + self.pool = concurrent.futures.ThreadPoolExecutor() |
| 83 | + self.__pool = self.pool.__enter__() |
| 84 | + return self |
| 85 | + |
| 86 | + async def __aexit__(self, exc_type, exc_value, traceback): |
| 87 | + self.__pool.__exit__(exc_type, exc_value, traceback) |
| 88 | + self.__pool = None |
| 89 | + self.pool = None |
| 90 | + self.loop = None |
| 91 | + |
| 92 | + |
| 93 | +@op( |
| 94 | + inputs={"data": DataToPrint}, outputs={}, conditions=[], |
| 95 | +) |
| 96 | +async def print_output(data: str): |
| 97 | + """ |
| 98 | + Print the output on stdout using python print() |
| 99 | +
|
| 100 | + Parameters |
| 101 | + ++++++++++ |
| 102 | + inputs : list |
| 103 | + A list of Inputs whose value is to be printed. |
| 104 | +
|
| 105 | + Examples |
| 106 | + ++++++++ |
| 107 | +
|
| 108 | + The following example shows how to use print_output. |
| 109 | + >>> dataflow = DataFlow.auto(print_output, GetSingle) |
| 110 | + >>> inputs = [ |
| 111 | + ... Input( |
| 112 | + ... value="print_output example", |
| 113 | + ... definition=dataflow.definitions["DataToPrint"], |
| 114 | + ... parents=None,)] |
| 115 | + >>> |
| 116 | + >>> async def main(): |
| 117 | + ... async for ctx, results in MemoryOrchestrator.run(dataflow, inputs): |
| 118 | + ... print("String to be printed is 'print_output example'") |
| 119 | + >>> |
| 120 | + >>> asyncio.run(main()) |
| 121 | + print_output example |
| 122 | + String to be printed is 'print_output example' |
| 123 | + """ |
| 124 | + print("\n" + data) |
0 commit comments