From 2336cf0d5386be4990590fae4288be06e849a4a6 Mon Sep 17 00:00:00 2001 From: Elena Kolevska Date: Tue, 28 Jan 2025 12:32:39 +0000 Subject: [PATCH 1/6] Catch error in sdk when workflow instance not found Signed-off-by: Elena Kolevska --- examples/workflow/monitor.py | 6 +----- .../dapr/ext/workflow/dapr_workflow_client.py | 8 ++++++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/examples/workflow/monitor.py b/examples/workflow/monitor.py index 6bdb6cc3..ad6ff1ef 100644 --- a/examples/workflow/monitor.py +++ b/examples/workflow/monitor.py @@ -63,11 +63,7 @@ def send_alert(ctx, message: str): wf_client = wf.DaprWorkflowClient() job_id = 'job1' - status = None - try: - status = wf_client.get_workflow_state(job_id) - except Exception: - pass + status = wf_client.get_workflow_state(job_id) if not status or status.runtime_status.name != 'RUNNING': # TODO update to use reuse_id_policy instance_id = wf_client.schedule_new_workflow( diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py index b9865344..b41c15da 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py @@ -130,8 +130,12 @@ def get_workflow_state( exist. """ - state = self.__obj.get_orchestration_state(instance_id, fetch_payloads=fetch_payloads) - return WorkflowState(state) if state else None + try: + state = self.__obj.get_orchestration_state(instance_id, fetch_payloads=fetch_payloads) + return WorkflowState(state) if state else None + except Exception as error: + self._logger.error(f'Error fetching workflow state: {error}') + return None def wait_for_workflow_start( self, instance_id: str, *, fetch_payloads: bool = False, timeout_in_seconds: int = 60 From fb7ce53b1c562bd05431c35dd4863c1487ad1229 Mon Sep 17 00:00:00 2001 From: Elena Kolevska Date: Tue, 28 Jan 2025 12:38:36 +0000 Subject: [PATCH 2/6] fixes demo workflow example test Signed-off-by: Elena Kolevska --- examples/demo_workflow/app.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/examples/demo_workflow/app.py b/examples/demo_workflow/app.py index 8f67fdbd..8a59f0ed 100644 --- a/examples/demo_workflow/app.py +++ b/examples/demo_workflow/app.py @@ -139,6 +139,8 @@ def main(): # Pause Test d.pause_workflow(instance_id=instance_id, workflow_component=workflow_component) + sleep(3) + get_response = d.get_workflow( instance_id=instance_id, workflow_component=workflow_component ) @@ -146,6 +148,8 @@ def main(): # Resume Test d.resume_workflow(instance_id=instance_id, workflow_component=workflow_component) + sleep(3) + get_response = d.get_workflow( instance_id=instance_id, workflow_component=workflow_component ) From 538a926a0b45bf818174f49ebce987faeb9cc43d Mon Sep 17 00:00:00 2001 From: Elena Kolevska Date: Tue, 28 Jan 2025 13:52:13 +0000 Subject: [PATCH 3/6] Only return None for the correct error Signed-off-by: Elena Kolevska --- examples/workflow/monitor.py | 7 ++++++- .../dapr/ext/workflow/dapr_workflow_client.py | 10 +++++++--- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/examples/workflow/monitor.py b/examples/workflow/monitor.py index ad6ff1ef..6cf575cf 100644 --- a/examples/workflow/monitor.py +++ b/examples/workflow/monitor.py @@ -63,7 +63,12 @@ def send_alert(ctx, message: str): wf_client = wf.DaprWorkflowClient() job_id = 'job1' - status = wf_client.get_workflow_state(job_id) + status = None + try: + status = wf_client.get_workflow_state(job_id) + except Exception: + pass + if not status or status.runtime_status.name != 'RUNNING': # TODO update to use reuse_id_policy instance_id = wf_client.schedule_new_workflow( diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py index b41c15da..2a1792ae 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py @@ -24,6 +24,7 @@ from dapr.ext.workflow.workflow_state import WorkflowState from dapr.ext.workflow.workflow_context import Workflow from dapr.ext.workflow.util import getAddress +from grpc import RpcError from dapr.clients import DaprInternalError from dapr.clients.http.client import DAPR_API_TOKEN_HEADER @@ -133,9 +134,12 @@ def get_workflow_state( try: state = self.__obj.get_orchestration_state(instance_id, fetch_payloads=fetch_payloads) return WorkflowState(state) if state else None - except Exception as error: - self._logger.error(f'Error fetching workflow state: {error}') - return None + except RpcError as error: + if 'no such instance exists' in error.details(): + self._logger.warning(f"Workflow instance not found: {instance_id}") + return None + self._logger.error(f"Unhandled RPC error while fetching workflow state: {error.code()} - {error.details()}") + raise error def wait_for_workflow_start( self, instance_id: str, *, fetch_payloads: bool = False, timeout_in_seconds: int = 60 From 1d27a16f264fe0f6fd6c4296793ed87bbe7d2a99 Mon Sep 17 00:00:00 2001 From: Elena Kolevska Date: Tue, 28 Jan 2025 14:05:45 +0000 Subject: [PATCH 4/6] Adds test Signed-off-by: Elena Kolevska --- .../dapr/ext/workflow/dapr_workflow_client.py | 2 +- .../tests/test_workflow_client.py | 25 +++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py index 2a1792ae..0460669f 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py @@ -139,7 +139,7 @@ def get_workflow_state( self._logger.warning(f"Workflow instance not found: {instance_id}") return None self._logger.error(f"Unhandled RPC error while fetching workflow state: {error.code()} - {error.details()}") - raise error + raise def wait_for_workflow_start( self, instance_id: str, *, fetch_payloads: bool = False, timeout_in_seconds: int = 60 diff --git a/ext/dapr-ext-workflow/tests/test_workflow_client.py b/ext/dapr-ext-workflow/tests/test_workflow_client.py index e1c9b772..39bf5c94 100644 --- a/ext/dapr-ext-workflow/tests/test_workflow_client.py +++ b/ext/dapr-ext-workflow/tests/test_workflow_client.py @@ -21,6 +21,7 @@ from dapr.ext.workflow.dapr_workflow_client import DaprWorkflowClient from durabletask import client import durabletask.internal.orchestrator_service_pb2 as pb +from grpc import StatusCode, RpcError mock_schedule_result = 'workflow001' mock_raise_event_result = 'event001' @@ -29,6 +30,18 @@ mock_resume_result = 'resume001' mock_purge_result = 'purge001' mock_instance_id = 'instance001' +wf_exists = False + +class SimulatedRpcError(RpcError): + def __init__(self, code, details): + self._code = code + self._details = details + + def code(self): + return self._code + + def details(self): + return self._details class FakeTaskHubGrpcClient: @@ -43,8 +56,13 @@ def schedule_new_orchestration( return mock_schedule_result def get_orchestration_state(self, instance_id, fetch_payloads): + global wf_exists + if not wf_exists: + raise SimulatedRpcError(code="UNKNOWN", details="no such instance exists") + return self._inner_get_orchestration_state(instance_id, client.OrchestrationStatus.PENDING) + def wait_for_orchestration_start(self, instance_id, fetch_payloads, timeout): return self._inner_get_orchestration_state(instance_id, client.OrchestrationStatus.RUNNING) @@ -100,6 +118,13 @@ def test_client_functions(self): ) assert actual_schedule_result == mock_schedule_result + actual_get_result = wfClient.get_workflow_state(instance_id=mock_instance_id, + fetch_payloads=True) + assert actual_get_result is None + + + global wf_exists + wf_exists = True actual_get_result = wfClient.get_workflow_state( instance_id=mock_instance_id, fetch_payloads=True ) From 01c75b4ee9cbd47ce084061e2a46f9a3563f138c Mon Sep 17 00:00:00 2001 From: Elena Kolevska Date: Tue, 28 Jan 2025 14:26:54 +0000 Subject: [PATCH 5/6] Linter Signed-off-by: Elena Kolevska --- examples/demo_workflow/app.py | 2 +- .../dapr/ext/workflow/dapr_workflow_client.py | 6 ++++-- ext/dapr-ext-workflow/tests/test_workflow_client.py | 12 ++++++------ 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/examples/demo_workflow/app.py b/examples/demo_workflow/app.py index 8a59f0ed..cc82a81e 100644 --- a/examples/demo_workflow/app.py +++ b/examples/demo_workflow/app.py @@ -149,7 +149,7 @@ def main(): # Resume Test d.resume_workflow(instance_id=instance_id, workflow_component=workflow_component) sleep(3) - + get_response = d.get_workflow( instance_id=instance_id, workflow_component=workflow_component ) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py index 0460669f..6a24d343 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py @@ -136,9 +136,11 @@ def get_workflow_state( return WorkflowState(state) if state else None except RpcError as error: if 'no such instance exists' in error.details(): - self._logger.warning(f"Workflow instance not found: {instance_id}") + self._logger.warning(f'Workflow instance not found: {instance_id}') return None - self._logger.error(f"Unhandled RPC error while fetching workflow state: {error.code()} - {error.details()}") + self._logger.error( + f'Unhandled RPC error while fetching workflow state: {error.code()} - {error.details()}' + ) raise def wait_for_workflow_start( diff --git a/ext/dapr-ext-workflow/tests/test_workflow_client.py b/ext/dapr-ext-workflow/tests/test_workflow_client.py index 39bf5c94..689f6268 100644 --- a/ext/dapr-ext-workflow/tests/test_workflow_client.py +++ b/ext/dapr-ext-workflow/tests/test_workflow_client.py @@ -21,7 +21,7 @@ from dapr.ext.workflow.dapr_workflow_client import DaprWorkflowClient from durabletask import client import durabletask.internal.orchestrator_service_pb2 as pb -from grpc import StatusCode, RpcError +from grpc import RpcError mock_schedule_result = 'workflow001' mock_raise_event_result = 'event001' @@ -32,6 +32,7 @@ mock_instance_id = 'instance001' wf_exists = False + class SimulatedRpcError(RpcError): def __init__(self, code, details): self._code = code @@ -58,11 +59,10 @@ def schedule_new_orchestration( def get_orchestration_state(self, instance_id, fetch_payloads): global wf_exists if not wf_exists: - raise SimulatedRpcError(code="UNKNOWN", details="no such instance exists") + raise SimulatedRpcError(code='UNKNOWN', details='no such instance exists') return self._inner_get_orchestration_state(instance_id, client.OrchestrationStatus.PENDING) - def wait_for_orchestration_start(self, instance_id, fetch_payloads, timeout): return self._inner_get_orchestration_state(instance_id, client.OrchestrationStatus.RUNNING) @@ -118,11 +118,11 @@ def test_client_functions(self): ) assert actual_schedule_result == mock_schedule_result - actual_get_result = wfClient.get_workflow_state(instance_id=mock_instance_id, - fetch_payloads=True) + actual_get_result = wfClient.get_workflow_state( + instance_id=mock_instance_id, fetch_payloads=True + ) assert actual_get_result is None - global wf_exists wf_exists = True actual_get_result = wfClient.get_workflow_state( From 5fd9ae904388d786204088ea9fdae1496b53052d Mon Sep 17 00:00:00 2001 From: Elena Kolevska Date: Tue, 28 Jan 2025 22:14:50 +0000 Subject: [PATCH 6/6] Extends test Signed-off-by: Elena Kolevska --- .../tests/test_workflow_client.py | 25 +++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/ext/dapr-ext-workflow/tests/test_workflow_client.py b/ext/dapr-ext-workflow/tests/test_workflow_client.py index 689f6268..984c97cd 100644 --- a/ext/dapr-ext-workflow/tests/test_workflow_client.py +++ b/ext/dapr-ext-workflow/tests/test_workflow_client.py @@ -30,7 +30,7 @@ mock_resume_result = 'resume001' mock_purge_result = 'purge001' mock_instance_id = 'instance001' -wf_exists = False +wf_status = 'not-found' class SimulatedRpcError(RpcError): @@ -57,11 +57,15 @@ def schedule_new_orchestration( return mock_schedule_result def get_orchestration_state(self, instance_id, fetch_payloads): - global wf_exists - if not wf_exists: + global wf_status + if wf_status == 'not-found': raise SimulatedRpcError(code='UNKNOWN', details='no such instance exists') - - return self._inner_get_orchestration_state(instance_id, client.OrchestrationStatus.PENDING) + elif wf_status == 'found': + return self._inner_get_orchestration_state( + instance_id, client.OrchestrationStatus.PENDING + ) + else: + raise SimulatedRpcError(code='UNKNOWN', details='unknown error') def wait_for_orchestration_start(self, instance_id, fetch_payloads, timeout): return self._inner_get_orchestration_state(instance_id, client.OrchestrationStatus.RUNNING) @@ -118,13 +122,20 @@ def test_client_functions(self): ) assert actual_schedule_result == mock_schedule_result + global wf_status + wf_status = 'not-found' actual_get_result = wfClient.get_workflow_state( instance_id=mock_instance_id, fetch_payloads=True ) assert actual_get_result is None - global wf_exists - wf_exists = True + wf_status = 'error' + with self.assertRaises(RpcError): + wfClient.get_workflow_state(instance_id=mock_instance_id, fetch_payloads=True) + + assert actual_get_result is None + + wf_status = 'found' actual_get_result = wfClient.get_workflow_state( instance_id=mock_instance_id, fetch_payloads=True )