diff --git a/patterns/timeout/handle_timeout_workflow.py b/patterns/timeout/handle_timeout_workflow.py new file mode 100644 index 0000000..2a5747e --- /dev/null +++ b/patterns/timeout/handle_timeout_workflow.py @@ -0,0 +1,65 @@ +from typing import List, Dict, Any, Optional +from datetime import timedelta + +from iwf.workflow_state import WorkflowState +from iwf.state_decision import StateDecision +from iwf.object_workflow import ObjectWorkflow +from iwf.workflow_context import WorkflowContext +from iwf.state_def import StateDef +from iwf.command.command_request import CommandRequest +from iwf.command.command_results import CommandResults +from iwf.command.timer_command import TimerCommand + + +class InitState(WorkflowState): + def get_state_options(self): + return None + + async def execute(self, context: WorkflowContext, input_data: Optional[Any]) -> StateDecision: + # Create a timer command that will trigger after 10 seconds + timer_command = TimerCommand(fire_after=timedelta(seconds=10)) + command_request = CommandRequest(timer_commands=[timer_command]) + + # Execute the command + command_results = await context.command_client.execute_command(command_request) + + # Check if timer has fired + if command_results.timer_results and command_results.timer_results[0].fired: + # Timer fired, go to timeout state + return StateDecision.single_next_state("TimeoutState") + else: + # Timer not fired, go to task state + return StateDecision.single_next_state("TaskState") + + +class TimeoutState(WorkflowState): + def get_state_options(self): + return None + + async def execute(self, context: WorkflowContext, input_data: Optional[Any]) -> StateDecision: + # Handle timeout logic here + print("Handling timeout...") + + # Complete the workflow + return StateDecision.graceful_complete_workflow(None) + + +class TaskState(WorkflowState): + def get_state_options(self): + return None + + async def execute(self, context: WorkflowContext, input_data: Optional[Any]) -> StateDecision: + # Perform the task + print("Performing the task...") + + # Complete the workflow + return StateDecision.graceful_complete_workflow(None) + + +class HandlingTimeoutWorkflow(ObjectWorkflow): + def get_workflow_states(self) -> List[StateDef]: + return [ + StateDef.starting_state(InitState()), + StateDef.non_starting_state(TimeoutState()), + StateDef.non_starting_state(TaskState()) + ] \ No newline at end of file diff --git a/patterns/timeout/iwf-config.py b/patterns/timeout/iwf-config.py new file mode 100644 index 0000000..a3a7fa4 --- /dev/null +++ b/patterns/timeout/iwf-config.py @@ -0,0 +1,5 @@ +# IWF server configuration +IWF_SERVER_URL = "http://localhost:8801" + +# Registry configuration +REGISTRY_NAMESPACE = "default" \ No newline at end of file diff --git a/patterns/timeout/main.py b/patterns/timeout/main.py new file mode 100644 index 0000000..6f48720 --- /dev/null +++ b/patterns/timeout/main.py @@ -0,0 +1,48 @@ +import asyncio +import sys +from iwf.client import Client +from iwf.registry import Registry +from iwf.iwf_api.models import WorkflowOptions + +from handle_timeout_workflow import HandlingTimeoutWorkflow +from iwf_config import IWF_SERVER_URL, REGISTRY_NAMESPACE + +# Create registry +registry = Registry(REGISTRY_NAMESPACE) + +# Register workflow +registry.add_workflow(HandlingTimeoutWorkflow()) + +# Create client +client = Client(IWF_SERVER_URL) + + +async def start_workflow(): + # Start a workflow + workflow_id = "handling-timeout-workflow-" + str(hash(str(sys.argv))) + workflow_options = WorkflowOptions(workflow_id=workflow_id) + + # Start the workflow with no input + await client.start_workflow( + workflow_type=HandlingTimeoutWorkflow.__name__, + workflow_options=workflow_options, + workflow_input=None + ) + print(f"Started workflow with ID: {workflow_id}") + + +async def run_worker(): + # Start the worker + await client.start_worker(registry) + + +async def main(): + # Start a workflow + await start_workflow() + + # Run the worker + await run_worker() + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file