Skip to content

Commit 60d5332

Browse files
committed
update author and manage workflow how-tos
Signed-off-by: Hannah Hunter <[email protected]>
1 parent b74c247 commit 60d5332

File tree

2 files changed

+200
-110
lines changed

2 files changed

+200
-110
lines changed

daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md

Lines changed: 186 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -36,31 +36,17 @@ The Dapr sidecar doesn’t load any workflow definitions. Rather, the sidecar si
3636

3737
<!--python-->
3838

39-
Define the workflow activities you'd like your workflow to perform. Activities are a function definition and can take inputs and outputs. The following example creates task chaining activities that receive input
39+
Define the workflow activities you'd like your workflow to perform. Activities are a function definition and can take inputs and outputs. The following example creates a counter (activity) called `hello_act` that notifies users of the current counter value. `hello_act` is a function derived from a class called `WorkflowActivityContext`.
4040

4141
```python
42-
@wfr.activity(name='step10')
43-
def step1(ctx, activity_input):
44-
print(f'Step 1: Received input: {activity_input}.')
45-
# Do some work
46-
return activity_input + 1
47-
48-
49-
@wfr.activity
50-
def step2(ctx, activity_input):
51-
print(f'Step 2: Received input: {activity_input}.')
52-
# Do some work
53-
return activity_input * 2
54-
55-
56-
@wfr.activity
57-
def step3(ctx, activity_input):
58-
print(f'Step 3: Received input: {activity_input}.')
59-
# Do some work
60-
return activity_input ^ 2
42+
@wfr.activity(name='hello_act')
43+
def hello_act(ctx: WorkflowActivityContext, wf_input):
44+
global counter
45+
counter += wf_input
46+
print(f'New counter value is: {counter}!', flush=True)
6147
```
6248

63-
[See the task chaining workflow activity in context.](https://github.com/dapr/python-sdk/blob/main/examples/workflow/task_chaining.py)
49+
[See the task chaining workflow activity in context.](https://github.com/dapr/python-sdk/blob/main/examples/workflow/simple.py)
6450

6551

6652
{{% /codetab %}}
@@ -241,22 +227,32 @@ Next, register and call the activites in a workflow.
241227

242228
<!--python-->
243229

244-
The `random_workflow` function is a task chaining workflow pattern derived from a class called `DaprWorkflowContext` with input and output parameter types. It also includes a `yield` statement that does the heavy lifting of the workflow and calls the workflow activities.
230+
The `hello_world_wf` function is a function derived from a class called `DaprWorkflowContext` with input and output parameter types. It also includes a `yield` statement that does the heavy lifting of the workflow and calls the workflow activities.
245231

246232
```python
247-
@wfr.workflow(name='random_workflow')
248-
def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
249-
try:
250-
result1 = yield ctx.call_activity(step1, input=wf_input)
251-
result2 = yield ctx.call_activity(step2, input=result1)
252-
result3 = yield ctx.call_activity(step3, input=result2)
253-
except Exception as e:
254-
yield ctx.call_activity(error_handler, input=str(e))
255-
raise
256-
return [result1, result2, result3]
233+
@wfr.workflow(name='hello_world_wf')
234+
def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
235+
print(f'{wf_input}')
236+
yield ctx.call_activity(hello_act, input=1)
237+
yield ctx.call_activity(hello_act, input=10)
238+
yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy)
239+
yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy)
240+
241+
# Change in event handling: Use when_any to handle both event and timeout
242+
event = ctx.wait_for_external_event(event_name)
243+
timeout = ctx.create_timer(timedelta(seconds=30))
244+
winner = yield when_any([event, timeout])
245+
246+
if winner == timeout:
247+
print('Workflow timed out waiting for event')
248+
return 'Timeout'
249+
250+
yield ctx.call_activity(hello_act, input=100)
251+
yield ctx.call_activity(hello_act, input=1000)
252+
return 'Completed'
257253
```
258254

259-
[See the `hello_world_wf` workflow in context.](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py#LL32C1-L38C51)
255+
[See the `hello_world_wf` workflow in context.](https://github.com/dapr/python-sdk/blob/main/examples/workflow/simple.py)
260256

261257

262258
{{% /codetab %}}
@@ -423,84 +419,177 @@ Finally, compose the application using the workflow.
423419

424420
<!--python-->
425421

426-
[In the following example](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py), for a basic Python hello world application using the Python SDK, your project code would include:
422+
[In the following example](https://github.com/dapr/python-sdk/blob/main/examples/workflow/simple.py), for a basic Python hello world application using the Python SDK, your project code would include:
427423

428424
- A Python package called `DaprClient` to receive the Python SDK capabilities.
429425
- A builder with extensions called:
430426
- `WorkflowRuntime`: Allows you to register the workflow runtime.
431-
- `DaprWorkflowContext`: Allows you to [create workflows and workflow activities]({{< ref "#write-the-workflow" >}})
427+
- `DaprWorkflowContext`: Allows you to [create workflows]({{< ref "#write-the-workflow" >}})
432428
- `WorkflowActivityContext`: Allows you to [create workflow activities]({{< ref "#write-the-workflow-activities" >}})
433-
- API calls. In the example below, these calls start, pause, resume, purge, and terminate the workflow.
429+
- API calls. In the example below, these calls start, pause, resume, purge, and completing the workflow.
434430

435431
```python
436-
from durabletask import worker, task
437-
438-
from dapr.ext.workflow.workflow_context import Workflow
439-
from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext
440-
from dapr.ext.workflow.workflow_activity_context import Activity, WorkflowActivityContext
441-
from dapr.ext.workflow.util import getAddress
442-
443-
from dapr.clients import DaprInternalError
444-
from dapr.clients.http.client import DAPR_API_TOKEN_HEADER
445-
from dapr.conf import settings
446-
from dapr.conf.helpers import GrpcEndpoint
447-
from dapr.ext.workflow.logger import LoggerOptions, Logger
448-
449-
wfr = wf.WorkflowRuntime()
450-
451-
@wfr.workflow(name='hello_world_wf')
452-
def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
453-
# Workflow definition...
454-
455-
@wfr.activity(name='hello_act')
456-
def hello_act(ctx: WorkflowActivityContext, wf_input):
457-
# Activity definition...
458-
459-
# Start workflow
460-
wfr = WorkflowRuntime()
461-
wfr.start()
462-
wf_client = DaprWorkflowClient()
463-
464-
# ...
465-
466-
# Pause workflow
467-
wf_client.pause_workflow(instance_id=instance_id)
468-
metadata = wf_client.get_workflow_state(instance_id=instance_id)
469-
# ... check status ...
470-
wf_client.resume_workflow(instance_id=instance_id)
471-
472-
sleep(1)
473-
474-
# Raise workflow
475-
wf_client.raise_workflow_event(
476-
instance_id=instance_id,
477-
event_name=event_name,
478-
data=event_data
479-
)
480-
481-
# Purge workflow
482-
state = wf_client.wait_for_workflow_completion(
483-
instance_id,
484-
timeout_in_seconds=30
485-
)
486-
wf_client.purge_workflow(instance_id=instance_id)
487-
488-
workflowRuntime.shutdown()
432+
from datetime import timedelta
433+
from time import sleep
434+
from dapr.ext.workflow import (
435+
WorkflowRuntime,
436+
DaprWorkflowContext,
437+
WorkflowActivityContext,
438+
RetryPolicy,
439+
DaprWorkflowClient,
440+
when_any,
441+
)
442+
from dapr.conf import Settings
443+
from dapr.clients.exceptions import DaprInternalError
444+
445+
settings = Settings()
446+
447+
counter = 0
448+
retry_count = 0
449+
child_orchestrator_count = 0
450+
child_orchestrator_string = ''
451+
child_act_retry_count = 0
452+
instance_id = 'exampleInstanceID'
453+
child_instance_id = 'childInstanceID'
454+
workflow_name = 'hello_world_wf'
455+
child_workflow_name = 'child_wf'
456+
input_data = 'Hi Counter!'
457+
event_name = 'event1'
458+
event_data = 'eventData'
459+
non_existent_id_error = 'no such instance exists'
460+
461+
retry_policy = RetryPolicy(
462+
first_retry_interval=timedelta(seconds=1),
463+
max_number_of_attempts=3,
464+
backoff_coefficient=2,
465+
max_retry_interval=timedelta(seconds=10),
466+
retry_timeout=timedelta(seconds=100),
467+
)
489468

490-
if __name__ == '__main__':
469+
wfr = WorkflowRuntime()
470+
471+
472+
@wfr.workflow(name='hello_world_wf')
473+
def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
474+
print(f'{wf_input}')
475+
yield ctx.call_activity(hello_act, input=1)
476+
yield ctx.call_activity(hello_act, input=10)
477+
yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy)
478+
yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy)
479+
480+
# Change in event handling: Use when_any to handle both event and timeout
481+
event = ctx.wait_for_external_event(event_name)
482+
timeout = ctx.create_timer(timedelta(seconds=30))
483+
winner = yield when_any([event, timeout])
484+
485+
if winner == timeout:
486+
print('Workflow timed out waiting for event')
487+
return 'Timeout'
488+
489+
yield ctx.call_activity(hello_act, input=100)
490+
yield ctx.call_activity(hello_act, input=1000)
491+
return 'Completed'
492+
493+
494+
@wfr.activity(name='hello_act')
495+
def hello_act(ctx: WorkflowActivityContext, wf_input):
496+
global counter
497+
counter += wf_input
498+
print(f'New counter value is: {counter}!', flush=True)
499+
500+
501+
@wfr.activity(name='hello_retryable_act')
502+
def hello_retryable_act(ctx: WorkflowActivityContext):
503+
global retry_count
504+
if (retry_count % 2) == 0:
505+
print(f'Retry count value is: {retry_count}!', flush=True)
506+
retry_count += 1
507+
raise ValueError('Retryable Error')
508+
print(f'Retry count value is: {retry_count}! This print statement verifies retry', flush=True)
509+
retry_count += 1
510+
511+
512+
@wfr.workflow(name='child_retryable_wf')
513+
def child_retryable_wf(ctx: DaprWorkflowContext):
514+
global child_orchestrator_string, child_orchestrator_count
515+
if not ctx.is_replaying:
516+
child_orchestrator_count += 1
517+
print(f'Appending {child_orchestrator_count} to child_orchestrator_string!', flush=True)
518+
child_orchestrator_string += str(child_orchestrator_count)
519+
yield ctx.call_activity(
520+
act_for_child_wf, input=child_orchestrator_count, retry_policy=retry_policy
521+
)
522+
if child_orchestrator_count < 3:
523+
raise ValueError('Retryable Error')
524+
525+
526+
@wfr.activity(name='act_for_child_wf')
527+
def act_for_child_wf(ctx: WorkflowActivityContext, inp):
528+
global child_orchestrator_string, child_act_retry_count
529+
inp_char = chr(96 + inp)
530+
print(f'Appending {inp_char} to child_orchestrator_string!', flush=True)
531+
child_orchestrator_string += inp_char
532+
if child_act_retry_count % 2 == 0:
533+
child_act_retry_count += 1
534+
raise ValueError('Retryable Error')
535+
child_act_retry_count += 1
536+
537+
538+
def main():
491539
wfr.start()
492-
sleep(10) # wait for workflow runtime to start
540+
wf_client = DaprWorkflowClient()
541+
542+
print('==========Start Counter Increase as per Input:==========')
543+
wf_client.schedule_new_workflow(
544+
workflow=hello_world_wf, input=input_data, instance_id=instance_id
545+
)
546+
547+
wf_client.wait_for_workflow_start(instance_id)
493548

494-
wf_client = wf.DaprWorkflowClient()
495-
instance_id = wf_client.schedule_new_workflow(workflow=task_chain_workflow, input=42)
496-
print(f'Workflow started. Instance ID: {instance_id}')
497-
state = wf_client.wait_for_workflow_completion(instance_id)
498-
print(f'Workflow completed! Status: {state.runtime_status}')
549+
# Sleep to let the workflow run initial activities
550+
sleep(12)
551+
552+
assert counter == 11
553+
assert retry_count == 2
554+
assert child_orchestrator_string == '1aa2bb3cc'
555+
556+
# Pause Test
557+
wf_client.pause_workflow(instance_id=instance_id)
558+
metadata = wf_client.get_workflow_state(instance_id=instance_id)
559+
print(f'Get response from {workflow_name} after pause call: {metadata.runtime_status.name}')
560+
561+
# Resume Test
562+
wf_client.resume_workflow(instance_id=instance_id)
563+
metadata = wf_client.get_workflow_state(instance_id=instance_id)
564+
print(f'Get response from {workflow_name} after resume call: {metadata.runtime_status.name}')
565+
566+
sleep(2) # Give the workflow time to reach the event wait state
567+
wf_client.raise_workflow_event(instance_id=instance_id, event_name=event_name, data=event_data)
568+
569+
print('========= Waiting for Workflow completion', flush=True)
570+
try:
571+
state = wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
572+
if state.runtime_status.name == 'COMPLETED':
573+
print('Workflow completed! Result: {}'.format(state.serialized_output.strip('"')))
574+
else:
575+
print(f'Workflow failed! Status: {state.runtime_status.name}')
576+
except TimeoutError:
577+
print('*** Workflow timed out!')
578+
579+
wf_client.purge_workflow(instance_id=instance_id)
580+
try:
581+
wf_client.get_workflow_state(instance_id=instance_id)
582+
except DaprInternalError as err:
583+
if non_existent_id_error in err._message:
584+
print('Instance Successfully Purged')
499585

500586
wfr.shutdown()
501-
```
502587

503588

589+
if __name__ == '__main__':
590+
main()
591+
```
592+
504593
{{% /codetab %}}
505594

506595
{{% codetab %}}

daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@ Now that you've [authored the workflow and its activities in your application]({
1414
{{% codetab %}}
1515

1616
Manage your workflow within your code. In the workflow example from the [Author a workflow]({{< ref "howto-author-workflow.md#write-the-application" >}}) guide, the workflow is registered in the code using the following APIs:
17-
- **start_workflow**: Start an instance of a workflow
18-
- **get_workflow**: Get information on the status of the workflow
17+
- **schedule_new_workflow**: Start an instance of a workflow
18+
- **get_workflow_state**: Get information on the status of the workflow
1919
- **pause_workflow**: Pauses or suspends a workflow instance that can later be resumed
2020
- **resume_workflow**: Resumes a paused workflow instance
2121
- **raise_workflow_event**: Raise an event on a workflow
2222
- **purge_workflow**: Removes all metadata related to a specific workflow instance
23-
- **terminate_workflow**: Terminate or stop a particular instance of a workflow
23+
- **wait_for_workflow_completion**: Complete a particular instance of a workflow
2424

2525
```python
2626
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext
@@ -34,27 +34,28 @@ eventName = "event1"
3434
eventData = "eventData"
3535

3636
# Start the workflow
37-
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
38-
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
37+
wf_client.schedule_new_workflow(
38+
workflow=hello_world_wf, input=input_data, instance_id=instance_id
39+
)
3940

4041
# Get info on the workflow
41-
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
42+
wf_client.get_workflow_state(instance_id=instance_id)
4243

4344
# Pause the workflow
44-
d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
45+
wf_client.pause_workflow(instance_id=instance_id)
46+
metadata = wf_client.get_workflow_state(instance_id=instance_id)
4547

4648
# Resume the workflow
47-
d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
49+
wf_client.resume_workflow(instance_id=instance_id)
4850

4951
# Raise an event on the workflow.
50-
d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
51-
event_name=eventName, event_data=eventData)
52+
wf_client.raise_workflow_event(instance_id=instance_id, event_name=event_name, data=event_data)
5253

5354
# Purge the workflow
54-
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
55+
wf_client.purge_workflow(instance_id=instance_id)
5556

56-
# Terminate the workflow
57-
d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
57+
# Wait for workflow completion
58+
wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
5859
```
5960

6061
{{% /codetab %}}

0 commit comments

Comments
 (0)