Skip to content

Commit f612ab7

Browse files
authored
Merge pull request #4563 from hhunter-ms/issue_4410
Python SDK workflow updates
2 parents dd0fd38 + 79d0320 commit f612ab7

File tree

4 files changed

+216
-131
lines changed

4 files changed

+216
-131
lines changed

daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md

Lines changed: 178 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,14 @@ The Dapr sidecar doesn’t load any workflow definitions. Rather, the sidecar si
3939
Define the workflow activities you'd like your workflow to perform. Activities are a function definition and can take inputs and outputs. The following example creates a counter (activity) called `hello_act` that notifies users of the current counter value. `hello_act` is a function derived from a class called `WorkflowActivityContext`.
4040

4141
```python
42-
def hello_act(ctx: WorkflowActivityContext, input):
42+
@wfr.activity(name='hello_act')
43+
def hello_act(ctx: WorkflowActivityContext, wf_input):
4344
global counter
44-
counter += input
45+
counter += wf_input
4546
print(f'New counter value is: {counter}!', flush=True)
4647
```
4748

48-
[See the `hello_act` workflow activity in context.](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py#LL40C1-L43C59)
49+
[See the task chaining workflow activity in context.](https://github.com/dapr/python-sdk/blob/main/examples/workflow/simple.py)
4950

5051

5152
{{% /codetab %}}
@@ -226,19 +227,32 @@ Next, register and call the activites in a workflow.
226227

227228
<!--python-->
228229

229-
The `hello_world_wf` function is derived from a class called `DaprWorkflowContext` with input and output parameter types. It also includes a `yield` statement that does the heavy lifting of the workflow and calls the workflow activities.
230+
The `hello_world_wf` function is a function derived from a class called `DaprWorkflowContext` with input and output parameter types. It also includes a `yield` statement that does the heavy lifting of the workflow and calls the workflow activities.
230231

231232
```python
232-
def hello_world_wf(ctx: DaprWorkflowContext, input):
233-
print(f'{input}')
233+
@wfr.workflow(name='hello_world_wf')
234+
def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
235+
print(f'{wf_input}')
234236
yield ctx.call_activity(hello_act, input=1)
235237
yield ctx.call_activity(hello_act, input=10)
236-
yield ctx.wait_for_external_event("event1")
238+
yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy)
239+
yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy)
240+
241+
# Change in event handling: Use when_any to handle both event and timeout
242+
event = ctx.wait_for_external_event(event_name)
243+
timeout = ctx.create_timer(timedelta(seconds=30))
244+
winner = yield when_any([event, timeout])
245+
246+
if winner == timeout:
247+
print('Workflow timed out waiting for event')
248+
return 'Timeout'
249+
237250
yield ctx.call_activity(hello_act, input=100)
238251
yield ctx.call_activity(hello_act, input=1000)
252+
return 'Completed'
239253
```
240254

241-
[See the `hello_world_wf` workflow in context.](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py#LL32C1-L38C51)
255+
[See the `hello_world_wf` workflow in context.](https://github.com/dapr/python-sdk/blob/main/examples/workflow/simple.py)
242256

243257

244258
{{% /codetab %}}
@@ -405,89 +419,177 @@ Finally, compose the application using the workflow.
405419

406420
<!--python-->
407421

408-
[In the following example](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py), for a basic Python hello world application using the Python SDK, your project code would include:
422+
[In the following example](https://github.com/dapr/python-sdk/blob/main/examples/workflow/simple.py), for a basic Python hello world application using the Python SDK, your project code would include:
409423

410424
- A Python package called `DaprClient` to receive the Python SDK capabilities.
411425
- A builder with extensions called:
412-
- `WorkflowRuntime`: Allows you to register workflows and workflow activities
426+
- `WorkflowRuntime`: Allows you to register the workflow runtime.
413427
- `DaprWorkflowContext`: Allows you to [create workflows]({{< ref "#write-the-workflow" >}})
414428
- `WorkflowActivityContext`: Allows you to [create workflow activities]({{< ref "#write-the-workflow-activities" >}})
415-
- API calls. In the example below, these calls start, pause, resume, purge, and terminate the workflow.
429+
- API calls. In the example below, these calls start, pause, resume, purge, and completing the workflow.
416430

417431
```python
418-
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext
419-
from dapr.clients import DaprClient
432+
from datetime import timedelta
433+
from time import sleep
434+
from dapr.ext.workflow import (
435+
WorkflowRuntime,
436+
DaprWorkflowContext,
437+
WorkflowActivityContext,
438+
RetryPolicy,
439+
DaprWorkflowClient,
440+
when_any,
441+
)
442+
from dapr.conf import Settings
443+
from dapr.clients.exceptions import DaprInternalError
444+
445+
settings = Settings()
446+
447+
counter = 0
448+
retry_count = 0
449+
child_orchestrator_count = 0
450+
child_orchestrator_string = ''
451+
child_act_retry_count = 0
452+
instance_id = 'exampleInstanceID'
453+
child_instance_id = 'childInstanceID'
454+
workflow_name = 'hello_world_wf'
455+
child_workflow_name = 'child_wf'
456+
input_data = 'Hi Counter!'
457+
event_name = 'event1'
458+
event_data = 'eventData'
459+
non_existent_id_error = 'no such instance exists'
460+
461+
retry_policy = RetryPolicy(
462+
first_retry_interval=timedelta(seconds=1),
463+
max_number_of_attempts=3,
464+
backoff_coefficient=2,
465+
max_retry_interval=timedelta(seconds=10),
466+
retry_timeout=timedelta(seconds=100),
467+
)
468+
469+
wfr = WorkflowRuntime()
470+
471+
472+
@wfr.workflow(name='hello_world_wf')
473+
def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
474+
print(f'{wf_input}')
475+
yield ctx.call_activity(hello_act, input=1)
476+
yield ctx.call_activity(hello_act, input=10)
477+
yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy)
478+
yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy)
479+
480+
# Change in event handling: Use when_any to handle both event and timeout
481+
event = ctx.wait_for_external_event(event_name)
482+
timeout = ctx.create_timer(timedelta(seconds=30))
483+
winner = yield when_any([event, timeout])
484+
485+
if winner == timeout:
486+
print('Workflow timed out waiting for event')
487+
return 'Timeout'
488+
489+
yield ctx.call_activity(hello_act, input=100)
490+
yield ctx.call_activity(hello_act, input=1000)
491+
return 'Completed'
492+
493+
494+
@wfr.activity(name='hello_act')
495+
def hello_act(ctx: WorkflowActivityContext, wf_input):
496+
global counter
497+
counter += wf_input
498+
print(f'New counter value is: {counter}!', flush=True)
499+
500+
501+
@wfr.activity(name='hello_retryable_act')
502+
def hello_retryable_act(ctx: WorkflowActivityContext):
503+
global retry_count
504+
if (retry_count % 2) == 0:
505+
print(f'Retry count value is: {retry_count}!', flush=True)
506+
retry_count += 1
507+
raise ValueError('Retryable Error')
508+
print(f'Retry count value is: {retry_count}! This print statement verifies retry', flush=True)
509+
retry_count += 1
510+
511+
512+
@wfr.workflow(name='child_retryable_wf')
513+
def child_retryable_wf(ctx: DaprWorkflowContext):
514+
global child_orchestrator_string, child_orchestrator_count
515+
if not ctx.is_replaying:
516+
child_orchestrator_count += 1
517+
print(f'Appending {child_orchestrator_count} to child_orchestrator_string!', flush=True)
518+
child_orchestrator_string += str(child_orchestrator_count)
519+
yield ctx.call_activity(
520+
act_for_child_wf, input=child_orchestrator_count, retry_policy=retry_policy
521+
)
522+
if child_orchestrator_count < 3:
523+
raise ValueError('Retryable Error')
524+
525+
526+
@wfr.activity(name='act_for_child_wf')
527+
def act_for_child_wf(ctx: WorkflowActivityContext, inp):
528+
global child_orchestrator_string, child_act_retry_count
529+
inp_char = chr(96 + inp)
530+
print(f'Appending {inp_char} to child_orchestrator_string!', flush=True)
531+
child_orchestrator_string += inp_char
532+
if child_act_retry_count % 2 == 0:
533+
child_act_retry_count += 1
534+
raise ValueError('Retryable Error')
535+
child_act_retry_count += 1
420536

421-
# ...
422537

423538
def main():
424-
with DaprClient() as d:
425-
host = settings.DAPR_RUNTIME_HOST
426-
port = settings.DAPR_GRPC_PORT
427-
workflowRuntime = WorkflowRuntime(host, port)
428-
workflowRuntime = WorkflowRuntime()
429-
workflowRuntime.register_workflow(hello_world_wf)
430-
workflowRuntime.register_activity(hello_act)
431-
workflowRuntime.start()
432-
433-
# Start workflow
434-
print("==========Start Counter Increase as per Input:==========")
435-
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
436-
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
437-
print(f"start_resp {start_resp.instance_id}")
438-
439-
# ...
440-
441-
# Pause workflow
442-
d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
443-
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
444-
print(f"Get response from {workflowName} after pause call: {getResponse.runtime_status}")
445-
446-
# Resume workflow
447-
d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
448-
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
449-
print(f"Get response from {workflowName} after resume call: {getResponse.runtime_status}")
450-
451-
sleep(1)
452-
# Raise workflow
453-
d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
454-
event_name=eventName, event_data=eventData)
455-
456-
sleep(5)
457-
# Purge workflow
458-
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
459-
try:
460-
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
461-
except DaprInternalError as err:
462-
if nonExistentIDError in err._message:
463-
print("Instance Successfully Purged")
464-
465-
# Kick off another workflow for termination purposes
466-
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
467-
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
468-
print(f"start_resp {start_resp.instance_id}")
469-
470-
# Terminate workflow
471-
d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
472-
sleep(1)
473-
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
474-
print(f"Get response from {workflowName} after terminate call: {getResponse.runtime_status}")
475-
476-
# Purge workflow
477-
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
478-
try:
479-
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
480-
except DaprInternalError as err:
481-
if nonExistentIDError in err._message:
482-
print("Instance Successfully Purged")
483-
484-
workflowRuntime.shutdown()
539+
wfr.start()
540+
wf_client = DaprWorkflowClient()
541+
542+
print('==========Start Counter Increase as per Input:==========')
543+
wf_client.schedule_new_workflow(
544+
workflow=hello_world_wf, input=input_data, instance_id=instance_id
545+
)
546+
547+
wf_client.wait_for_workflow_start(instance_id)
548+
549+
# Sleep to let the workflow run initial activities
550+
sleep(12)
551+
552+
assert counter == 11
553+
assert retry_count == 2
554+
assert child_orchestrator_string == '1aa2bb3cc'
555+
556+
# Pause Test
557+
wf_client.pause_workflow(instance_id=instance_id)
558+
metadata = wf_client.get_workflow_state(instance_id=instance_id)
559+
print(f'Get response from {workflow_name} after pause call: {metadata.runtime_status.name}')
560+
561+
# Resume Test
562+
wf_client.resume_workflow(instance_id=instance_id)
563+
metadata = wf_client.get_workflow_state(instance_id=instance_id)
564+
print(f'Get response from {workflow_name} after resume call: {metadata.runtime_status.name}')
565+
566+
sleep(2) # Give the workflow time to reach the event wait state
567+
wf_client.raise_workflow_event(instance_id=instance_id, event_name=event_name, data=event_data)
568+
569+
print('========= Waiting for Workflow completion', flush=True)
570+
try:
571+
state = wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
572+
if state.runtime_status.name == 'COMPLETED':
573+
print('Workflow completed! Result: {}'.format(state.serialized_output.strip('"')))
574+
else:
575+
print(f'Workflow failed! Status: {state.runtime_status.name}')
576+
except TimeoutError:
577+
print('*** Workflow timed out!')
578+
579+
wf_client.purge_workflow(instance_id=instance_id)
580+
try:
581+
wf_client.get_workflow_state(instance_id=instance_id)
582+
except DaprInternalError as err:
583+
if non_existent_id_error in err._message:
584+
print('Instance Successfully Purged')
585+
586+
wfr.shutdown()
587+
485588

486589
if __name__ == '__main__':
487590
main()
488591
```
489592

490-
491593
{{% /codetab %}}
492594

493595
{{% codetab %}}

daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@ Now that you've [authored the workflow and its activities in your application]({
1414
{{% codetab %}}
1515

1616
Manage your workflow within your code. In the workflow example from the [Author a workflow]({{< ref "howto-author-workflow.md#write-the-application" >}}) guide, the workflow is registered in the code using the following APIs:
17-
- **start_workflow**: Start an instance of a workflow
18-
- **get_workflow**: Get information on the status of the workflow
17+
- **schedule_new_workflow**: Start an instance of a workflow
18+
- **get_workflow_state**: Get information on the status of the workflow
1919
- **pause_workflow**: Pauses or suspends a workflow instance that can later be resumed
2020
- **resume_workflow**: Resumes a paused workflow instance
2121
- **raise_workflow_event**: Raise an event on a workflow
2222
- **purge_workflow**: Removes all metadata related to a specific workflow instance
23-
- **terminate_workflow**: Terminate or stop a particular instance of a workflow
23+
- **wait_for_workflow_completion**: Complete a particular instance of a workflow
2424

2525
```python
2626
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext
@@ -34,27 +34,28 @@ eventName = "event1"
3434
eventData = "eventData"
3535

3636
# Start the workflow
37-
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
38-
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
37+
wf_client.schedule_new_workflow(
38+
workflow=hello_world_wf, input=input_data, instance_id=instance_id
39+
)
3940

4041
# Get info on the workflow
41-
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
42+
wf_client.get_workflow_state(instance_id=instance_id)
4243

4344
# Pause the workflow
44-
d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
45+
wf_client.pause_workflow(instance_id=instance_id)
46+
metadata = wf_client.get_workflow_state(instance_id=instance_id)
4547

4648
# Resume the workflow
47-
d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
49+
wf_client.resume_workflow(instance_id=instance_id)
4850

4951
# Raise an event on the workflow.
50-
d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
51-
event_name=eventName, event_data=eventData)
52+
wf_client.raise_workflow_event(instance_id=instance_id, event_name=event_name, data=event_data)
5253

5354
# Purge the workflow
54-
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
55+
wf_client.purge_workflow(instance_id=instance_id)
5556

56-
# Terminate the workflow
57-
d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
57+
# Wait for workflow completion
58+
wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
5859
```
5960

6061
{{% /codetab %}}

0 commit comments

Comments
 (0)