2 changes: 1 addition & 1 deletion dev-requirements.txt
@@ -15,4 +15,4 @@ Flask>=1.1
# needed for auto fix
ruff===0.2.2
# needed for dapr-ext-workflow
durabletask-dapr >= 0.2.0a4
durabletask-dapr >= 0.2.0a7
66 changes: 66 additions & 0 deletions examples/workflow/README.md
@@ -19,6 +19,72 @@ pip3 install -r requirements.txt

Each of the examples in this directory can be run directly from the command line.

### Simple Workflow
This example demonstrates a workflow that manages counters through a series of activities and child workflows.
It shows several Dapr Workflow features, including:
- Basic activity execution with counter increments
- Retryable activities with configurable retry policies
- Child workflow orchestration with retry logic
- External event handling with timeouts
- Workflow state management (pause/resume)
- Activity error handling and retry backoff
- Global state tracking across workflow components
- Workflow lifecycle management (start, pause, resume, purge)
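The retry policy used in this example (1s first interval, backoff coefficient 2, 10s cap, 3 attempts) implies a bounded exponential backoff schedule. As a rough illustration only — this is plain arithmetic, not Dapr's actual scheduling code — the waits between attempts can be computed like this:

```python
from datetime import timedelta


def backoff_schedule(first_interval, coefficient, max_interval, attempts):
    """Return the wait before each retry attempt, capped at max_interval."""
    delays = []
    delay = first_interval
    for _ in range(attempts - 1):  # no delay before the first attempt
        delays.append(min(delay, max_interval))
        delay = delay * coefficient
    return delays


# Policy from this example: 1s first interval, coefficient 2, 10s cap, 3 attempts
# -> two retries, with waits of 1s then 2s
print(backoff_schedule(timedelta(seconds=1), 2, timedelta(seconds=10), 3))
```

With more attempts the delays double each time until they hit the 10-second cap, which is why `max_retry_interval` matters for long retry sequences.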

<!--STEP
name: Run the simple workflow example
expected_stdout_lines:
- "== APP == Hi Counter!"
- "== APP == New counter value is: 1!"
- "== APP == New counter value is: 11!"
- "== APP == Retry count value is: 0!"
- "== APP == Retry count value is: 1! This print statement verifies retry"
- "== APP == Appending 1 to child_orchestrator_string!"
- "== APP == Appending a to child_orchestrator_string!"
- "== APP == Appending a to child_orchestrator_string!"
- "== APP == Appending 2 to child_orchestrator_string!"
- "== APP == Appending b to child_orchestrator_string!"
- "== APP == Appending b to child_orchestrator_string!"
- "== APP == Appending 3 to child_orchestrator_string!"
- "== APP == Appending c to child_orchestrator_string!"
- "== APP == Appending c to child_orchestrator_string!"
- "== APP == Get response from hello_world_wf after pause call: SUSPENDED"
- "== APP == Get response from hello_world_wf after resume call: RUNNING"
- "== APP == New counter value is: 111!"
- "== APP == New counter value is: 1111!"
- "== APP == Workflow completed! Result: Completed"
timeout_seconds: 30
-->

```sh
dapr run --app-id wf-simple-example --dapr-grpc-port 50001 -- python3 simple.py
```
<!--END_STEP-->

The output of this example should look like this:

```
== APP == Hi Counter!
== APP == New counter value is: 1!
== APP == New counter value is: 11!
== APP == Retry count value is: 0!
== APP == Retry count value is: 1! This print statement verifies retry
== APP == Appending 1 to child_orchestrator_string!
== APP == Appending a to child_orchestrator_string!
== APP == Appending a to child_orchestrator_string!
== APP == Appending 2 to child_orchestrator_string!
== APP == Appending b to child_orchestrator_string!
== APP == Appending b to child_orchestrator_string!
== APP == Appending 3 to child_orchestrator_string!
== APP == Appending c to child_orchestrator_string!
== APP == Appending c to child_orchestrator_string!
== APP == Get response from hello_world_wf after pause call: SUSPENDED
== APP == Get response from hello_world_wf after resume call: RUNNING
== APP == New counter value is: 111!
== APP == New counter value is: 1111!
== APP == Workflow completed! Result: Completed
```
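The `1aa2bb3cc` string is built up by the child workflow appending its iteration counter, and the child activity appending the matching letter twice — once on the failed attempt and once on the successful retry. The letter itself is plain `chr` arithmetic, as this standalone sketch shows:

```python
# chr(96 + n) maps 1 -> 'a', 2 -> 'b', 3 -> 'c' (96 is ord('a') - 1)
parts = []
for n in (1, 2, 3):
    letter = chr(96 + n)
    parts.append(str(n) + letter * 2)  # letter appears twice: one failed attempt + one retry
print(''.join(parts))  # → 1aa2bb3cc
```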

### Task Chaining

This example demonstrates how to chain "activity" tasks together in a workflow. You can run this sample using the following command:
170 changes: 170 additions & 0 deletions examples/workflow/simple.py
@@ -0,0 +1,170 @@
# -*- coding: utf-8 -*-
# Copyright 2025 The Dapr Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import timedelta
from time import sleep
from dapr.ext.workflow import (
WorkflowRuntime,
DaprWorkflowContext,
WorkflowActivityContext,
RetryPolicy,
DaprWorkflowClient,
when_any,
)
from dapr.conf import Settings
from dapr.clients.exceptions import DaprInternalError

settings = Settings()

counter = 0
retry_count = 0
child_orchestrator_count = 0
child_orchestrator_string = ''
child_act_retry_count = 0
instance_id = 'exampleInstanceID'
child_instance_id = 'childInstanceID'
workflow_name = 'hello_world_wf'
child_workflow_name = 'child_wf'
input_data = 'Hi Counter!'
event_name = 'event1'
event_data = 'eventData'
non_existent_id_error = 'no such instance exists'

retry_policy = RetryPolicy(
first_retry_interval=timedelta(seconds=1),
max_number_of_attempts=3,
backoff_coefficient=2,
max_retry_interval=timedelta(seconds=10),
retry_timeout=timedelta(seconds=100),
)

wfr = WorkflowRuntime()


@wfr.workflow(name='hello_world_wf')
def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
print(f'{wf_input}')
yield ctx.call_activity(hello_act, input=1)
yield ctx.call_activity(hello_act, input=10)
yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy)
yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy)

# Use when_any to race the external event against a timeout timer
event = ctx.wait_for_external_event(event_name)
timeout = ctx.create_timer(timedelta(seconds=30))
winner = yield when_any([event, timeout])

if winner == timeout:
print('Workflow timed out waiting for event')
return 'Timeout'

yield ctx.call_activity(hello_act, input=100)
yield ctx.call_activity(hello_act, input=1000)
return 'Completed'


@wfr.activity(name='hello_act')
def hello_act(ctx: WorkflowActivityContext, wf_input):
global counter
counter += wf_input
print(f'New counter value is: {counter}!', flush=True)


@wfr.activity(name='hello_retryable_act')
def hello_retryable_act(ctx: WorkflowActivityContext):
global retry_count
if (retry_count % 2) == 0:
print(f'Retry count value is: {retry_count}!', flush=True)
retry_count += 1
raise ValueError('Retryable Error')
print(f'Retry count value is: {retry_count}! This print statement verifies retry', flush=True)
retry_count += 1


@wfr.workflow(name='child_retryable_wf')
def child_retryable_wf(ctx: DaprWorkflowContext):
global child_orchestrator_string, child_orchestrator_count
if not ctx.is_replaying:
child_orchestrator_count += 1
print(f'Appending {child_orchestrator_count} to child_orchestrator_string!', flush=True)
child_orchestrator_string += str(child_orchestrator_count)
yield ctx.call_activity(
act_for_child_wf, input=child_orchestrator_count, retry_policy=retry_policy
)
if child_orchestrator_count < 3:
raise ValueError('Retryable Error')


@wfr.activity(name='act_for_child_wf')
def act_for_child_wf(ctx: WorkflowActivityContext, inp):
global child_orchestrator_string, child_act_retry_count
inp_char = chr(96 + inp)
print(f'Appending {inp_char} to child_orchestrator_string!', flush=True)
child_orchestrator_string += inp_char
if child_act_retry_count % 2 == 0:
child_act_retry_count += 1
raise ValueError('Retryable Error')
child_act_retry_count += 1


def main():
wfr.start()
wf_client = DaprWorkflowClient()

print('==========Start Counter Increase as per Input:==========')
wf_client.schedule_new_workflow(
workflow=hello_world_wf, input=input_data, instance_id=instance_id
)

wf_client.wait_for_workflow_start(instance_id)

# Sleep to let the workflow run initial activities
sleep(12)

assert counter == 11
assert retry_count == 2
assert child_orchestrator_string == '1aa2bb3cc'

# Pause Test
wf_client.pause_workflow(instance_id=instance_id)
metadata = wf_client.get_workflow_state(instance_id=instance_id)
print(f'Get response from {workflow_name} after pause call: {metadata.runtime_status.name}')

# Resume Test
wf_client.resume_workflow(instance_id=instance_id)
metadata = wf_client.get_workflow_state(instance_id=instance_id)
print(f'Get response from {workflow_name} after resume call: {metadata.runtime_status.name}')

sleep(2) # Give the workflow time to reach the event wait state
wf_client.raise_workflow_event(instance_id=instance_id, event_name=event_name, data=event_data)

print('========= Waiting for Workflow completion', flush=True)
try:
state = wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
if state.runtime_status.name == 'COMPLETED':
print('Workflow completed! Result: {}'.format(state.serialized_output.strip('"')))
else:
print(f'Workflow failed! Status: {state.runtime_status.name}')
except TimeoutError:
print('*** Workflow timed out!')

wf_client.purge_workflow(instance_id=instance_id)
try:
wf_client.get_workflow_state(instance_id=instance_id)
except DaprInternalError as err:
if non_existent_id_error in err._message:
print('Instance Successfully Purged')

wfr.shutdown()


if __name__ == '__main__':
main()
@@ -122,7 +122,7 @@ def get_workflow_state(
"""Fetches runtime state for the specified workflow instance.

Args:
instanceId: The unique ID of the workflow instance to fetch.
instance_id: The unique ID of the workflow instance to fetch.
fetch_payloads: If true, fetches the input, output payloads and custom status
for the workflow instance. Defaults to true.

@@ -144,7 +144,7 @@ def get_workflow_state(
raise

def wait_for_workflow_start(
self, instance_id: str, *, fetch_payloads: bool = False, timeout_in_seconds: int = 60
self, instance_id: str, *, fetch_payloads: bool = False, timeout_in_seconds: int = 0
) -> Optional[WorkflowState]:
"""Waits for a workflow to start running and returns a WorkflowState object that contains
metadata about the started workflow.
@@ -158,7 +158,7 @@ def wait_for_workflow_start(
fetch_payloads: If true, fetches the input, output payloads and custom status for
the workflow instance. Defaults to false.
timeout_in_seconds: The maximum time to wait for the workflow instance to start running.
Defaults to 60 seconds.
Defaults to 0, meaning no timeout.

Returns:
WorkflowState record that describes the workflow instance and its execution status.
@@ -170,7 +170,7 @@ def wait_for_workflow_start(
return WorkflowState(state) if state else None

def wait_for_workflow_completion(
self, instance_id: str, *, fetch_payloads: bool = True, timeout_in_seconds: int = 60
self, instance_id: str, *, fetch_payloads: bool = True, timeout_in_seconds: int = 0
) -> Optional[WorkflowState]:
"""Waits for a workflow to complete and returns a WorkflowState object that contains
metadata about the started instance.
@@ -192,7 +192,7 @@ def wait_for_workflow_completion(
fetch_payloads: If true, fetches the input, output payloads and custom status
for the workflow instance. Defaults to true.
timeout_in_seconds: The maximum time in seconds to wait for the workflow instance to
complete. Defaults to 60 seconds.
complete. Defaults to 0 seconds, meaning no timeout.

Returns:
WorkflowState record that describes the workflow instance and its execution status.
@@ -222,8 +222,8 @@ def raise_workflow_event(
discarded.

Args:
instanceId: The ID of the workflow instance that will handle the event.
eventName: The name of the event. Event names are case-insensitive.
instance_id: The ID of the workflow instance that will handle the event.
event_name: The name of the event. Event names are case-insensitive.
data: The serializable data payload to include with the event.
"""
return self.__obj.raise_orchestration_event(instance_id, event_name, data=data)
2 changes: 1 addition & 1 deletion ext/dapr-ext-workflow/setup.cfg
@@ -25,7 +25,7 @@ packages = find_namespace:
include_package_data = True
install_requires =
dapr >= 1.15.0rc2
durabletask-dapr >= 0.2.0a4
durabletask-dapr >= 0.2.0a7

[options.packages.find]
include =