Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions examples/demo_workflow/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,17 @@ def main():

# Pause Test
d.pause_workflow(instance_id=instance_id, workflow_component=workflow_component)
sleep(3)

get_response = d.get_workflow(
instance_id=instance_id, workflow_component=workflow_component
)
print(f'Get response from {workflow_name} after pause call: {get_response.runtime_status}')

# Resume Test
d.resume_workflow(instance_id=instance_id, workflow_component=workflow_component)
sleep(3)

get_response = d.get_workflow(
instance_id=instance_id, workflow_component=workflow_component
)
Expand Down
1 change: 1 addition & 0 deletions examples/workflow/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def send_alert(ctx, message: str):
status = wf_client.get_workflow_state(job_id)
except Exception:
pass

if not status or status.runtime_status.name != 'RUNNING':
# TODO update to use reuse_id_policy
instance_id = wf_client.schedule_new_workflow(
Expand Down
14 changes: 12 additions & 2 deletions ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from dapr.ext.workflow.workflow_state import WorkflowState
from dapr.ext.workflow.workflow_context import Workflow
from dapr.ext.workflow.util import getAddress
from grpc import RpcError

from dapr.clients import DaprInternalError
from dapr.clients.http.client import DAPR_API_TOKEN_HEADER
Expand Down Expand Up @@ -130,8 +131,17 @@
exist.
"""
state = self.__obj.get_orchestration_state(instance_id, fetch_payloads=fetch_payloads)
return WorkflowState(state) if state else None
try:
state = self.__obj.get_orchestration_state(instance_id, fetch_payloads=fetch_payloads)
return WorkflowState(state) if state else None
except RpcError as error:
if 'no such instance exists' in error.details():
self._logger.warning(f'Workflow instance not found: {instance_id}')
return None
self._logger.error(

Check warning on line 141 in ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py

View check run for this annotation

Codecov / codecov/patch

ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py#L141

Added line #L141 was not covered by tests
f'Unhandled RPC error while fetching workflow state: {error.code()} - {error.details()}'
)
raise

Check warning on line 144 in ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py

View check run for this annotation

Codecov / codecov/patch

ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py#L144

Added line #L144 was not covered by tests

def wait_for_workflow_start(
self, instance_id: str, *, fetch_payloads: bool = False, timeout_in_seconds: int = 60
Expand Down
25 changes: 25 additions & 0 deletions ext/dapr-ext-workflow/tests/test_workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from dapr.ext.workflow.dapr_workflow_client import DaprWorkflowClient
from durabletask import client
import durabletask.internal.orchestrator_service_pb2 as pb
from grpc import RpcError

mock_schedule_result = 'workflow001'
mock_raise_event_result = 'event001'
Expand All @@ -29,6 +30,19 @@
mock_resume_result = 'resume001'
mock_purge_result = 'purge001'
mock_instance_id = 'instance001'
wf_exists = False


class SimulatedRpcError(RpcError):
def __init__(self, code, details):
self._code = code
self._details = details

def code(self):
return self._code

Check warning on line 42 in ext/dapr-ext-workflow/tests/test_workflow_client.py

View check run for this annotation

Codecov / codecov/patch

ext/dapr-ext-workflow/tests/test_workflow_client.py#L42

Added line #L42 was not covered by tests

def details(self):
return self._details


class FakeTaskHubGrpcClient:
Expand All @@ -43,6 +57,10 @@
return mock_schedule_result

def get_orchestration_state(self, instance_id, fetch_payloads):
global wf_exists
if not wf_exists:
raise SimulatedRpcError(code='UNKNOWN', details='no such instance exists')

return self._inner_get_orchestration_state(instance_id, client.OrchestrationStatus.PENDING)

def wait_for_orchestration_start(self, instance_id, fetch_payloads, timeout):
Expand Down Expand Up @@ -100,6 +118,13 @@
)
assert actual_schedule_result == mock_schedule_result

actual_get_result = wfClient.get_workflow_state(
instance_id=mock_instance_id, fetch_payloads=True
)
assert actual_get_result is None

global wf_exists
wf_exists = True
actual_get_result = wfClient.get_workflow_state(
instance_id=mock_instance_id, fetch_payloads=True
)
Expand Down