Skip to content

Commit d005a15

Browse files
authored
Implement multi-app workflows (#844)
* feat: Adds support for cross-app calls. Signed-off-by: Albert Callarisa <[email protected]> * Use durabletask alpha.9 Signed-off-by: Albert Callarisa <[email protected]> * Added examples for error scenarios in multi-app workflow Signed-off-by: Albert Callarisa <[email protected]> * Remove unnecessary hardcoded ports Signed-off-by: Albert Callarisa <[email protected]> --------- Signed-off-by: Albert Callarisa <[email protected]>
1 parent 99314a4 commit d005a15

File tree

9 files changed

+340
-27
lines changed

9 files changed

+340
-27
lines changed

dev-requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ Flask>=1.1
1515
# needed for auto fix
1616
ruff===0.2.2
1717
# needed for dapr-ext-workflow
18-
durabletask-dapr >= 0.2.0a8
18+
durabletask-dapr >= 0.2.0a9
1919
# needed for .env file loading in examples
2020
python-dotenv>=1.0.0
2121
# needed for enhanced schema generation from function features

examples/workflow/README.md

Lines changed: 133 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ pip3 install -r requirements.txt
2020
Each of the examples in this directory can be run directly from the command line.
2121

2222
### Simple Workflow
23-
This example represents a workflow that manages counters through a series of activities and child workflows.
23+
This example represents a workflow that manages counters through a series of activities and child workflows.
2424
It shows several Dapr Workflow features including:
2525
- Basic activity execution with counter increments
2626
- Retryable activities with configurable retry policies
@@ -57,7 +57,7 @@ timeout_seconds: 30
5757
-->
5858

5959
```sh
60-
dapr run --app-id wf-simple-example --dapr-grpc-port 50001 -- python3 simple.py
60+
dapr run --app-id wf-simple-example -- python3 simple.py
6161
```
6262
<!--END_STEP-->
6363

@@ -99,7 +99,7 @@ timeout_seconds: 30
9999
-->
100100

101101
```sh
102-
dapr run --app-id wfexample --dapr-grpc-port 50001 -- python3 task_chaining.py
102+
dapr run --app-id wfexample -- python3 task_chaining.py
103103
```
104104
<!--END_STEP-->
105105

@@ -146,7 +146,7 @@ timeout_seconds: 30
146146
-->
147147

148148
```sh
149-
dapr run --app-id wfexample --dapr-grpc-port 50001 -- python3 fan_out_fan_in.py
149+
dapr run --app-id wfexample -- python3 fan_out_fan_in.py
150150
```
151151
<!--END_STEP-->
152152

@@ -186,7 +186,7 @@ This example demonstrates how to use a workflow to interact with a human user. T
186186
The Dapr CLI can be started using the following command:
187187

188188
```sh
189-
dapr run --app-id wfexample --dapr-grpc-port 50001
189+
dapr run --app-id wfexample
190190
```
191191

192192
In a separate terminal window, run the following command to start the Python workflow app:
@@ -222,7 +222,7 @@ This example demonstrates how to eternally running workflow that polls an endpoi
222222
The Dapr CLI can be started using the following command:
223223

224224
```sh
225-
dapr run --app-id wfexample --dapr-grpc-port 50001
225+
dapr run --app-id wfexample
226226
```
227227

228228
In a separate terminal window, run the following command to start the Python workflow app:
@@ -254,7 +254,7 @@ This workflow runs forever or until you press `ENTER` to stop it. Starting the a
254254
This example demonstrates how to call a child workflow. The Dapr CLI can be started using the following command:
255255

256256
```sh
257-
dapr run --app-id wfexample --dapr-grpc-port 50001
257+
dapr run --app-id wfexample
258258
```
259259

260260
In a separate terminal window, run the following command to start the Python workflow app:
@@ -269,4 +269,129 @@ When you run the example, you will see output like this:
269269
*** Calling child workflow 29a7592a1e874b07aad2bb58de309a51-child
270270
*** Child workflow 6feadc5370184b4998e50875b20084f6 called
271271
...
272-
```
272+
```
273+
274+
275+
### Cross-app Workflow
276+
277+
This example demonstrates how to call child workflows and activities in different apps. The multiple Dapr CLI instances can be started using the following commands:
278+
279+
<!-- STEP
280+
name: Run apps
281+
expected_stdout_lines:
282+
- '== APP == app1 - triggering app1 workflow'
283+
- '== APP == app1 - received workflow call'
284+
- '== APP == app1 - triggering app2 workflow'
285+
- '== APP == app2 - received workflow call'
286+
- '== APP == app2 - triggering app3 activity'
287+
- '== APP == app3 - received activity call'
288+
- '== APP == app3 - returning activity result'
289+
- '== APP == app2 - received activity result'
290+
- '== APP == app2 - returning workflow result'
291+
- '== APP == app1 - received workflow result'
292+
- '== APP == app1 - returning workflow result'
293+
background: true
294+
sleep: 20
295+
-->
296+
297+
```sh
298+
dapr run --app-id wfexample3 python3 cross-app3.py &
299+
dapr run --app-id wfexample2 python3 cross-app2.py &
300+
dapr run --app-id wfexample1 python3 cross-app1.py
301+
```
302+
<!-- END_STEP -->
303+
304+
When you run the apps, you will see output like this:
305+
```
306+
...
307+
app1 - triggering app2 workflow
308+
app2 - triggering app3 activity
309+
...
310+
```
311+
among others. This shows that the workflow calls are working as expected.
312+
313+
314+
#### Error handling on activity calls
315+
316+
This example demonstrates how the error handling works on activity calls across apps.
317+
318+
Error handling on activity calls across apps works as normal workflow activity calls.
319+
320+
In this example we run `app3` in failing mode, which makes the activity call return error constantly. The activity call from `app2` will fail after the retry policy is exhausted.
321+
322+
<!-- STEP
323+
name: Run apps
324+
expected_stdout_lines:
325+
- '== APP == app1 - triggering app1 workflow'
326+
- '== APP == app1 - received workflow call'
327+
- '== APP == app1 - triggering app2 workflow'
328+
- '== APP == app2 - received workflow call'
329+
- '== APP == app2 - triggering app3 activity'
330+
- '== APP == app3 - received activity call'
331+
- '== APP == app3 - raising error in activity due to error mode being enabled'
332+
- '== APP == app2 - received activity error from app3'
333+
- '== APP == app2 - returning workflow result'
334+
- '== APP == app1 - received workflow result'
335+
- '== APP == app1 - returning workflow result'
336+
sleep: 20
337+
-->
338+
339+
```sh
340+
export ERROR_ACTIVITY_MODE=true
341+
dapr run --app-id wfexample3 python3 cross-app3.py &
342+
dapr run --app-id wfexample2 python3 cross-app2.py &
343+
dapr run --app-id wfexample1 python3 cross-app1.py
344+
```
345+
<!-- END_STEP -->
346+
347+
348+
When you run the apps with the `ERROR_ACTIVITY_MODE` environment variable set, you will see output like this:
349+
```
350+
...
351+
app3 - received activity call
352+
app3 - raising error in activity due to error mode being enabled
353+
app2 - received activity error from app3
354+
...
355+
```
356+
among others. This shows that the activity calls are failing as expected, and they are being handled as expected too.
357+
358+
359+
#### Error handling on workflow calls
360+
361+
This example demonstrates how the error handling works on workflow calls across apps.
362+
363+
Error handling on workflow calls across apps works as normal workflow calls.
364+
365+
In this example we run `app2` in failing mode, which makes the workflow call return error constantly. The workflow call from `app1` will fail after the retry policy is exhausted.
366+
367+
<!-- STEP
368+
name: Run apps
369+
expected_stdout_lines:
370+
- '== APP == app1 - triggering app1 workflow'
371+
- '== APP == app1 - received workflow call'
372+
- '== APP == app1 - triggering app2 workflow'
373+
- '== APP == app2 - received workflow call'
374+
- '== APP == app2 - raising error in workflow due to error mode being enabled'
375+
- '== APP == app1 - received workflow error from app2'
376+
- '== APP == app1 - returning workflow result'
377+
sleep: 20
378+
-->
379+
380+
```sh
381+
export ERROR_WORKFLOW_MODE=true
382+
dapr run --app-id wfexample3 python3 cross-app3.py &
383+
dapr run --app-id wfexample2 python3 cross-app2.py &
384+
dapr run --app-id wfexample1 python3 cross-app1.py
385+
```
386+
<!-- END_STEP -->
387+
388+
When you run the apps with the `ERROR_WORKFLOW_MODE` environment variable set, you will see output like this:
389+
```
390+
...
391+
app2 - received workflow call
392+
app2 - raising error in workflow due to error mode being enabled
393+
app1 - received workflow error from app2
394+
...
395+
```
396+
among others. This shows that the workflow calls are failing as expected, and they are being handled as expected too.
397+

examples/workflow/cross-app1.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2025 The Dapr Authors
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
13+
from datetime import timedelta
14+
15+
from durabletask.task import TaskFailedError
16+
import dapr.ext.workflow as wf
17+
import time
18+
19+
wfr = wf.WorkflowRuntime()
20+
21+
22+
@wfr.workflow
23+
def app1_workflow(ctx: wf.DaprWorkflowContext):
24+
print(f'app1 - received workflow call', flush=True)
25+
print(f'app1 - triggering app2 workflow', flush=True)
26+
27+
try:
28+
retry_policy = wf.RetryPolicy(
29+
max_number_of_attempts=2,
30+
first_retry_interval=timedelta(milliseconds=100),
31+
max_retry_interval=timedelta(seconds=3),
32+
)
33+
yield ctx.call_child_workflow(
34+
workflow='app2_workflow',
35+
input=None,
36+
app_id='wfexample2',
37+
retry_policy=retry_policy,
38+
)
39+
print(f'app1 - received workflow result', flush=True)
40+
except TaskFailedError as e:
41+
print(f'app1 - received workflow error from app2', flush=True)
42+
43+
print(f'app1 - returning workflow result', flush=True)
44+
return 1
45+
46+
47+
if __name__ == '__main__':
48+
wfr.start()
49+
time.sleep(10) # wait for workflow runtime to start
50+
51+
wf_client = wf.DaprWorkflowClient()
52+
print(f'app1 - triggering app1 workflow', flush=True)
53+
instance_id = wf_client.schedule_new_workflow(workflow=app1_workflow)
54+
55+
# Wait for the workflow to complete
56+
time.sleep(7)
57+
58+
wfr.shutdown()

examples/workflow/cross-app2.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2025 The Dapr Authors
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
13+
from datetime import timedelta
14+
import os
15+
16+
from durabletask.task import TaskFailedError
17+
import dapr.ext.workflow as wf
18+
import time
19+
20+
wfr = wf.WorkflowRuntime()
21+
22+
23+
@wfr.workflow
24+
def app2_workflow(ctx: wf.DaprWorkflowContext):
25+
print(f'app2 - received workflow call', flush=True)
26+
if os.getenv('ERROR_WORKFLOW_MODE', 'false') == 'true':
27+
print(f'app2 - raising error in workflow due to error mode being enabled', flush=True)
28+
raise ValueError('Error in workflow due to error mode being enabled')
29+
print(f'app2 - triggering app3 activity', flush=True)
30+
try:
31+
retry_policy = wf.RetryPolicy(
32+
max_number_of_attempts=2,
33+
first_retry_interval=timedelta(milliseconds=100),
34+
max_retry_interval=timedelta(seconds=3),
35+
)
36+
result = yield ctx.call_activity(
37+
'app3_activity', input=None, app_id='wfexample3', retry_policy=retry_policy
38+
)
39+
print(f'app2 - received activity result', flush=True)
40+
except TaskFailedError as e:
41+
print(f'app2 - received activity error from app3', flush=True)
42+
43+
print(f'app2 - returning workflow result', flush=True)
44+
return 2
45+
46+
47+
if __name__ == '__main__':
48+
wfr.start()
49+
time.sleep(15) # wait for workflow runtime to start
50+
wfr.shutdown()

examples/workflow/cross-app3.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2025 The Dapr Authors
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
import os
13+
import dapr.ext.workflow as wf
14+
import time
15+
16+
wfr = wf.WorkflowRuntime()
17+
18+
19+
@wfr.activity
20+
def app3_activity(ctx: wf.DaprWorkflowContext) -> int:
21+
print(f'app3 - received activity call', flush=True)
22+
if os.getenv('ERROR_ACTIVITY_MODE', 'false') == 'true':
23+
print(f'app3 - raising error in activity due to error mode being enabled', flush=True)
24+
raise ValueError('Error in activity due to error mode being enabled')
25+
print(f'app3 - returning activity result', flush=True)
26+
return 3
27+
28+
29+
if __name__ == '__main__':
30+
wfr.start()
31+
time.sleep(15) # wait for workflow runtime to start
32+
wfr.shutdown()

0 commit comments

Comments
 (0)