Skip to content

Commit e074c9e

Browse files
committed
feat: Adds support for cross-app calls.
Signed-off-by: Albert Callarisa <[email protected]>
1 parent 99314a4 commit e074c9e

File tree

9 files changed

+219
-21
lines changed

9 files changed

+219
-21
lines changed

dev-requirements.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ Flask>=1.1
1515
# needed for auto fix
1616
ruff===0.2.2
1717
# needed for dapr-ext-workflow
18-
durabletask-dapr >= 0.2.0a8
18+
# durabletask-dapr >= 0.2.0a8
19+
durabletask-dapr @ git+https://github.com/dapr/durabletask-python@main
1920
# needed for .env file loading in examples
2021
python-dotenv>=1.0.0
2122
# needed for enhanced schema generation from function features

examples/workflow/README.md

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ pip3 install -r requirements.txt
2020
Each of the examples in this directory can be run directly from the command line.
2121

2222
### Simple Workflow
23-
This example represents a workflow that manages counters through a series of activities and child workflows.
23+
This example represents a workflow that manages counters through a series of activities and child workflows.
2424
It shows several Dapr Workflow features including:
2525
- Basic activity execution with counter increments
2626
- Retryable activities with configurable retry policies
@@ -269,4 +269,44 @@ When you run the example, you will see output like this:
269269
*** Calling child workflow 29a7592a1e874b07aad2bb58de309a51-child
270270
*** Child workflow 6feadc5370184b4998e50875b20084f6 called
271271
...
272-
```
272+
```
273+
274+
275+
### Cross-app Workflow
276+
277+
This example demonstrates how to call child workflows and activities in different apps. The multiple Dapr CLI instances can be started using the following commands:
278+
279+
<!-- STEP
280+
name: Run apps
281+
expected_stdout_lines:
282+
- '== APP == app1 - triggering app1 workflow'
283+
- '== APP == app1 - received workflow call'
284+
- '== APP == app1 - triggering app2 workflow'
285+
- '== APP == app2 - received workflow call'
286+
- '== APP == app2 - triggering app3 activity'
287+
- '== APP == app3 - received activity call'
288+
- '== APP == app3 - returning activity result'
289+
- '== APP == app2 - received activity result'
290+
- '== APP == app2 - returning workflow result'
291+
- '== APP == app1 - received workflow result'
292+
- '== APP == app1 - returning workflow result'
293+
background: true
294+
sleep: 5
295+
-->
296+
297+
```sh
298+
pip install ./ext/dapr-ext-workflow
299+
dapr run --app-id wfexample3 --dapr-grpc-port 50003 python3 cross-app3.py &
300+
dapr run --app-id wfexample2 --dapr-grpc-port 50002 python3 cross-app2.py &
301+
dapr run --app-id wfexample1 --dapr-grpc-port 50001 python3 cross-app1.py
302+
```
303+
<!-- END_STEP -->
304+
305+
When you run the apps, you will see output like this:
306+
```
307+
...
308+
app1 - triggering app2 workflow
309+
app2 - triggering app3 activity
310+
...
311+
```
312+
among others. This shows that the workflow calls are working as expected.

examples/workflow/cross-app1.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2025 The Dapr Authors
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
13+
import dapr.ext.workflow as wf
14+
import time
15+
16+
wfr = wf.WorkflowRuntime()
17+
18+
19+
@wfr.workflow
20+
def app1_workflow(ctx: wf.DaprWorkflowContext):
21+
print(f'app1 - received workflow call', flush=True)
22+
print(f'app1 - triggering app2 workflow', flush=True)
23+
24+
yield ctx.call_child_workflow(
25+
workflow='app2_workflow',
26+
input=None,
27+
app_id='wfexample2',
28+
)
29+
print(f'app1 - received workflow result', flush=True)
30+
print(f'app1 - returning workflow result', flush=True)
31+
32+
return 1
33+
34+
35+
if __name__ == '__main__':
36+
wfr.start()
37+
time.sleep(10) # wait for workflow runtime to start
38+
39+
wf_client = wf.DaprWorkflowClient()
40+
print(f'app1 - triggering app1 workflow', flush=True)
41+
instance_id = wf_client.schedule_new_workflow(workflow=app1_workflow)
42+
43+
# Wait for the workflow to complete
44+
time.sleep(5)
45+
46+
wfr.shutdown()

examples/workflow/cross-app2.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2025 The Dapr Authors
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
13+
import dapr.ext.workflow as wf
14+
import time
15+
16+
wfr = wf.WorkflowRuntime()
17+
18+
19+
@wfr.workflow
20+
def app2_workflow(ctx: wf.DaprWorkflowContext):
21+
print(f'app2 - received workflow call', flush=True)
22+
print(f'app2 - triggering app3 activity', flush=True)
23+
yield ctx.call_activity('app3_activity', input=None, app_id='wfexample3')
24+
print(f'app2 - received activity result', flush=True)
25+
print(f'app2 - returning workflow result', flush=True)
26+
27+
return 2
28+
29+
30+
if __name__ == '__main__':
31+
wfr.start()
32+
time.sleep(15) # wait for workflow runtime to start
33+
wfr.shutdown()

examples/workflow/cross-app3.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2025 The Dapr Authors
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
13+
import dapr.ext.workflow as wf
14+
import time
15+
16+
wfr = wf.WorkflowRuntime()
17+
18+
19+
@wfr.activity
20+
def app3_activity(ctx: wf.DaprWorkflowContext) -> int:
21+
print(f'app3 - received activity call', flush=True)
22+
print(f'app3 - returning activity result', flush=True)
23+
return 3
24+
25+
26+
if __name__ == '__main__':
27+
wfr.start()
28+
time.sleep(15) # wait for workflow runtime to start
29+
wfr.shutdown()

ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,29 +63,68 @@ def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task:
6363

6464
def call_activity(
6565
self,
66-
activity: Callable[[WorkflowActivityContext, TInput], TOutput],
66+
activity: Union[Callable[[WorkflowActivityContext, TInput], TOutput], str],
6767
*,
6868
input: TInput = None,
6969
retry_policy: Optional[RetryPolicy] = None,
70+
app_id: Optional[str] = None,
7071
) -> task.Task[TOutput]:
72+
# Handle string activity names for cross-app scenarios
73+
if isinstance(activity, str):
74+
activity_name = activity
75+
if app_id is not None:
76+
self._logger.debug(
77+
f'{self.instance_id}: Creating cross-app activity {activity_name} for app {app_id}'
78+
)
79+
else:
80+
self._logger.debug(f'{self.instance_id}: Creating activity {activity_name}')
81+
82+
if retry_policy is None:
83+
return self.__obj.call_activity(activity=activity_name, input=input, app_id=app_id)
84+
return self.__obj.call_activity(
85+
activity=activity_name, input=input, retry_policy=retry_policy.obj, app_id=app_id
86+
)
87+
88+
# Handle function activity objects (original behavior)
7189
self._logger.debug(f'{self.instance_id}: Creating activity {activity.__name__}')
7290
if hasattr(activity, '_dapr_alternate_name'):
7391
act = activity.__dict__['_dapr_alternate_name']
7492
else:
7593
# this case should ideally never happen
7694
act = activity.__name__
7795
if retry_policy is None:
78-
return self.__obj.call_activity(activity=act, input=input)
79-
return self.__obj.call_activity(activity=act, input=input, retry_policy=retry_policy.obj)
96+
return self.__obj.call_activity(activity=act, input=input, app_id=app_id)
97+
return self.__obj.call_activity(
98+
activity=act, input=input, retry_policy=retry_policy.obj, app_id=app_id
99+
)
80100

81101
def call_child_workflow(
82102
self,
83-
workflow: Workflow,
103+
workflow: Union[Workflow, str],
84104
*,
85105
input: Optional[TInput] = None,
86106
instance_id: Optional[str] = None,
87107
retry_policy: Optional[RetryPolicy] = None,
108+
app_id: Optional[str] = None,
88109
) -> task.Task[TOutput]:
110+
# Handle string workflow names for cross-app scenarios
111+
if isinstance(workflow, str):
112+
workflow_name = workflow
113+
self._logger.debug(f'{self.instance_id}: Creating child workflow {workflow_name}')
114+
115+
if retry_policy is None:
116+
return self.__obj.call_sub_orchestrator(
117+
workflow_name, input=input, instance_id=instance_id, app_id=app_id
118+
)
119+
return self.__obj.call_sub_orchestrator(
120+
workflow_name,
121+
input=input,
122+
instance_id=instance_id,
123+
retry_policy=retry_policy.obj,
124+
app_id=app_id,
125+
)
126+
127+
# Handle function workflow objects (original behavior)
89128
self._logger.debug(f'{self.instance_id}: Creating child workflow {workflow.__name__}')
90129

91130
def wf(ctx: task.OrchestrationContext, inp: TInput):
@@ -100,9 +139,11 @@ def wf(ctx: task.OrchestrationContext, inp: TInput):
100139
# this case should ideally never happen
101140
wf.__name__ = workflow.__name__
102141
if retry_policy is None:
103-
return self.__obj.call_sub_orchestrator(wf, input=input, instance_id=instance_id)
142+
return self.__obj.call_sub_orchestrator(
143+
wf, input=input, instance_id=instance_id, app_id=app_id
144+
)
104145
return self.__obj.call_sub_orchestrator(
105-
wf, input=input, instance_id=instance_id, retry_policy=retry_policy.obj
146+
wf, input=input, instance_id=instance_id, retry_policy=retry_policy.obj, app_id=app_id
106147
)
107148

108149
def wait_for_external_event(self, name: str) -> task.Task:

ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -107,18 +107,22 @@ def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task:
107107

108108
@abstractmethod
109109
def call_activity(
110-
self, activity: Activity[TOutput], *, input: Optional[TInput] = None
110+
self,
111+
activity: Union[Activity[TOutput], str],
112+
*,
113+
input: Optional[TInput] = None,
114+
app_id: Optional[str] = None,
111115
) -> task.Task[TOutput]:
112116
"""Schedule an activity for execution.
113117
114118
Parameters
115119
----------
116-
activity: Activity[TInput, TOutput]
117-
A reference to the activity function to call.
120+
activity: Activity[TInput, TOutput] | str
121+
A reference to the activity function to call, or a string name for cross-app activities.
118122
input: TInput | None
119123
The JSON-serializable input (or None) to pass to the activity.
120-
return_type: task.Task[TOutput]
121-
The JSON-serializable output type to expect from the activity result.
124+
app_id: str | None
125+
The AppID that will execute the activity.
122126
123127
Returns
124128
-------
@@ -130,22 +134,25 @@ def call_activity(
130134
@abstractmethod
131135
def call_child_workflow(
132136
self,
133-
orchestrator: Workflow[TOutput],
137+
orchestrator: Union[Workflow[TOutput], str],
134138
*,
135139
input: Optional[TInput] = None,
136140
instance_id: Optional[str] = None,
141+
app_id: Optional[str] = None,
137142
) -> task.Task[TOutput]:
138143
"""Schedule child-workflow function for execution.
139144
140145
Parameters
141146
----------
142-
orchestrator: Orchestrator[TInput, TOutput]
143-
A reference to the orchestrator function to call.
147+
orchestrator: Orchestrator[TInput, TOutput] | str
148+
A reference to the orchestrator function to call, or a string name for cross-app workflows.
144149
input: TInput
145150
The optional JSON-serializable input to pass to the orchestrator function.
146151
instance_id: str
147152
A unique ID to use for the sub-orchestration instance. If not specified, a
148153
random UUID will be used.
154+
app_id: str
155+
The AppID that will execute the workflow.
149156
150157
Returns
151158
-------

ext/dapr-ext-workflow/setup.cfg

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,12 @@ packages = find_namespace:
2525
include_package_data = True
2626
install_requires =
2727
dapr >= 1.16.0
28-
durabletask-dapr >= 0.2.0a8
28+
# durabletask-dapr >= 0.2.0a8
29+
durabletask-dapr @ git+https://github.com/dapr/durabletask-python@main
2930

3031
[options.packages.find]
3132
include =
3233
dapr.*
3334

34-
exclude =
35+
exclude =
3536
tests

ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,10 @@ def __init__(self):
3636
def create_timer(self, fire_at):
3737
return mock_create_timer
3838

39-
def call_activity(self, activity, input):
39+
def call_activity(self, activity, input, app_id):
4040
return mock_call_activity
4141

42-
def call_sub_orchestrator(self, orchestrator, input, instance_id):
42+
def call_sub_orchestrator(self, orchestrator, input, instance_id, app_id):
4343
return mock_call_sub_orchestrator
4444

4545
def set_custom_status(self, custom_status):

0 commit comments

Comments
 (0)