diff --git a/examples/demo_workflow/app.py b/examples/demo_workflow/app.py
index 8f67fdbd..cc82a81e 100644
--- a/examples/demo_workflow/app.py
+++ b/examples/demo_workflow/app.py
@@ -139,6 +139,8 @@ def main():
 
         # Pause Test
         d.pause_workflow(instance_id=instance_id, workflow_component=workflow_component)
+        sleep(3)
+
         get_response = d.get_workflow(
             instance_id=instance_id, workflow_component=workflow_component
         )
@@ -146,6 +148,8 @@ def main():
 
         # Resume Test
         d.resume_workflow(instance_id=instance_id, workflow_component=workflow_component)
+        sleep(3)
+
         get_response = d.get_workflow(
             instance_id=instance_id, workflow_component=workflow_component
         )
diff --git a/examples/workflow/monitor.py b/examples/workflow/monitor.py
index 6bdb6cc3..6cf575cf 100644
--- a/examples/workflow/monitor.py
+++ b/examples/workflow/monitor.py
@@ -68,6 +68,7 @@ def send_alert(ctx, message: str):
         status = wf_client.get_workflow_state(job_id)
     except Exception:
         pass
+
     if not status or status.runtime_status.name != 'RUNNING':
         # TODO update to use reuse_id_policy
         instance_id = wf_client.schedule_new_workflow(
diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py
index b9865344..6a24d343 100644
--- a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py
+++ b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py
@@ -24,6 +24,7 @@
 from dapr.ext.workflow.workflow_state import WorkflowState
 from dapr.ext.workflow.workflow_context import Workflow
 from dapr.ext.workflow.util import getAddress
+from grpc import RpcError
 
 from dapr.clients import DaprInternalError
 from dapr.clients.http.client import DAPR_API_TOKEN_HEADER
@@ -130,8 +131,17 @@ def get_workflow_state(
             exist.
 
         """
-        state = self.__obj.get_orchestration_state(instance_id, fetch_payloads=fetch_payloads)
-        return WorkflowState(state) if state else None
+        try:
+            state = self.__obj.get_orchestration_state(instance_id, fetch_payloads=fetch_payloads)
+            return WorkflowState(state) if state else None
+        except RpcError as error:
+            if 'no such instance exists' in error.details():
+                self._logger.warning(f'Workflow instance not found: {instance_id}')
+                return None
+            self._logger.error(
+                f'Unhandled RPC error while fetching workflow state: {error.code()} - {error.details()}'
+            )
+            raise
 
     def wait_for_workflow_start(
         self, instance_id: str, *, fetch_payloads: bool = False, timeout_in_seconds: int = 60
diff --git a/ext/dapr-ext-workflow/tests/test_workflow_client.py b/ext/dapr-ext-workflow/tests/test_workflow_client.py
index e1c9b772..984c97cd 100644
--- a/ext/dapr-ext-workflow/tests/test_workflow_client.py
+++ b/ext/dapr-ext-workflow/tests/test_workflow_client.py
@@ -21,6 +21,7 @@
 from dapr.ext.workflow.dapr_workflow_client import DaprWorkflowClient
 from durabletask import client
 import durabletask.internal.orchestrator_service_pb2 as pb
+from grpc import RpcError
 
 mock_schedule_result = 'workflow001'
 mock_raise_event_result = 'event001'
@@ -29,6 +30,19 @@
 mock_resume_result = 'resume001'
 mock_purge_result = 'purge001'
 mock_instance_id = 'instance001'
+wf_status = 'not-found'
+
+
+class SimulatedRpcError(RpcError):
+    def __init__(self, code, details):
+        self._code = code
+        self._details = details
+
+    def code(self):
+        return self._code
+
+    def details(self):
+        return self._details
 
 
 class FakeTaskHubGrpcClient:
@@ -43,7 +57,15 @@ def schedule_new_orchestration(
         return mock_schedule_result
 
     def get_orchestration_state(self, instance_id, fetch_payloads):
-        return self._inner_get_orchestration_state(instance_id, client.OrchestrationStatus.PENDING)
+        global wf_status
+        if wf_status == 'not-found':
+            raise SimulatedRpcError(code='UNKNOWN', details='no such instance exists')
+        elif wf_status == 'found':
+            return self._inner_get_orchestration_state(
+                instance_id, client.OrchestrationStatus.PENDING
+            )
+        else:
+            raise SimulatedRpcError(code='UNKNOWN', details='unknown error')
 
     def wait_for_orchestration_start(self, instance_id, fetch_payloads, timeout):
         return self._inner_get_orchestration_state(instance_id, client.OrchestrationStatus.RUNNING)
@@ -100,6 +122,20 @@ def test_client_functions(self):
         )
         assert actual_schedule_result == mock_schedule_result
 
+        global wf_status
+        wf_status = 'not-found'
+        actual_get_result = wfClient.get_workflow_state(
+            instance_id=mock_instance_id, fetch_payloads=True
+        )
+        assert actual_get_result is None
+
+        wf_status = 'error'
+        with self.assertRaises(RpcError):
+            wfClient.get_workflow_state(instance_id=mock_instance_id, fetch_payloads=True)
+
+        assert actual_get_result is None
+
+        wf_status = 'found'
+        actual_get_result = wfClient.get_workflow_state(
             instance_id=mock_instance_id, fetch_payloads=True
         )
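For reference, a minimal usage sketch of the changed lookup path, assuming a running Dapr sidecar; the instance id is illustrative and not part of this patch. With this change, get_workflow_state returns None for an unknown instance instead of surfacing the sidecar's "no such instance exists" RpcError, so callers can branch on the return value rather than wrapping the call in a bare try/except:

from dapr.ext.workflow import DaprWorkflowClient

wf_client = DaprWorkflowClient()

# 'job-monitor-001' is an illustrative instance id.
state = wf_client.get_workflow_state('job-monitor-001', fetch_payloads=False)
if state is None:
    # Unknown instance: previously this path raised an RpcError.
    print('workflow instance not found')
elif state.runtime_status.name != 'RUNNING':
    print(f'workflow is {state.runtime_status.name}')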