Skip to content

Commit 9e5e28e

Browse files
committed
feat: Adds support for cross-app calls.
Signed-off-by: Albert Callarisa <[email protected]>
1 parent 99314a4 commit 9e5e28e

File tree

8 files changed

+196
-19
lines changed

8 files changed

+196
-19
lines changed

dev-requirements.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ Flask>=1.1
1515
# needed for auto fix
1616
ruff===0.2.2
1717
# needed for dapr-ext-workflow
18-
durabletask-dapr >= 0.2.0a8
18+
# durabletask-dapr >= 0.2.0a8
19+
durabletask-dapr @ git+https://github.com/dapr/durabletask-python@main
1920
# needed for .env file loading in examples
2021
python-dotenv>=1.0.0
2122
# needed for enhanced schema generation from function features

examples/workflow/README.md

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ pip3 install -r requirements.txt
2020
Each of the examples in this directory can be run directly from the command line.
2121

2222
### Simple Workflow
23-
This example represents a workflow that manages counters through a series of activities and child workflows.
23+
This example represents a workflow that manages counters through a series of activities and child workflows.
2424
It shows several Dapr Workflow features including:
2525
- Basic activity execution with counter increments
2626
- Retryable activities with configurable retry policies
@@ -269,4 +269,44 @@ When you run the example, you will see output like this:
269269
*** Calling child workflow 29a7592a1e874b07aad2bb58de309a51-child
270270
*** Child workflow 6feadc5370184b4998e50875b20084f6 called
271271
...
272-
```
272+
```
273+
274+
275+
### Cross-app Workflow
276+
277+
This example demonstrates how to call child workflows and activities in different apps. The multiple Dapr CLI instances can be started using the following commands:
278+
279+
<!-- STEP
280+
name: Run apps
281+
expected_stdout_lines:
282+
- '== APP == app1 - triggering app1 workflow'
283+
- '== APP == app1 - received workflow call'
284+
- '== APP == app1 - triggering app2 workflow'
285+
- '== APP == app2 - received workflow call'
286+
- '== APP == app2 - triggering app3 activity'
287+
- '== APP == app3 - received activity call'
288+
- '== APP == app3 - returning activity result'
289+
- '== APP == app2 - received activity result'
290+
- '== APP == app2 - returning workflow result'
291+
- '== APP == app1 - received workflow result'
292+
- '== APP == app1 - returning workflow result'
293+
background: true
294+
sleep: 5
295+
-->
296+
297+
```sh
298+
pip install ./ext/dapr-ext-workflow
299+
dapr run --app-id wfexample3 --dapr-grpc-port 50003 python3 cross-app3.py &
300+
dapr run --app-id wfexample2 --dapr-grpc-port 50002 python3 cross-app2.py &
301+
dapr run --app-id wfexample1 --dapr-grpc-port 50001 python3 cross-app1.py
302+
```
303+
<!-- END_STEP -->
304+
305+
When you run the apps, you will see output like this:
306+
```
307+
...
308+
app1 - triggering app2 workflow
309+
app2 - triggering app3 activity
310+
...
311+
```
312+
among others. This shows that the workflow calls are working as expected.

examples/workflow/cross-app1.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2025 The Dapr Authors
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
13+
import dapr.ext.workflow as wf
14+
import time
15+
16+
wfr = wf.WorkflowRuntime()
17+
18+
19+
@wfr.workflow
20+
def app1_workflow(ctx: wf.DaprWorkflowContext):
21+
print(f'app1 - received workflow call', flush=True)
22+
print(f'app1 - triggering app2 workflow', flush=True)
23+
24+
yield ctx.call_child_workflow(
25+
workflow='app2_workflow',
26+
input=None,
27+
app_id='wfexample2',
28+
)
29+
print(f'app1 - received workflow result', flush=True)
30+
print(f'app1 - returning workflow result', flush=True)
31+
32+
return 1
33+
34+
if __name__ == '__main__':
35+
wfr.start()
36+
time.sleep(10) # wait for workflow runtime to start
37+
38+
wf_client = wf.DaprWorkflowClient()
39+
print(f'app1 - triggering app1 workflow', flush=True)
40+
instance_id = wf_client.schedule_new_workflow(workflow=app1_workflow)
41+
42+
# Wait for the workflow to complete
43+
time.sleep(5)
44+
45+
wfr.shutdown()

examples/workflow/cross-app2.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2025 The Dapr Authors
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
13+
import dapr.ext.workflow as wf
14+
import time
15+
16+
wfr = wf.WorkflowRuntime()
17+
18+
@wfr.workflow
19+
def app2_workflow(ctx: wf.DaprWorkflowContext):
20+
print(f'app2 - received workflow call', flush=True)
21+
print(f'app2 - triggering app3 activity', flush=True)
22+
yield ctx.call_activity('app3_activity', input=None, app_id='wfexample3')
23+
print(f'app2 - received activity result', flush=True)
24+
print(f'app2 - returning workflow result', flush=True)
25+
26+
return 2
27+
28+
if __name__ == '__main__':
29+
wfr.start()
30+
time.sleep(15) # wait for workflow runtime to start
31+
wfr.shutdown()

examples/workflow/cross-app3.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2025 The Dapr Authors
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
13+
import dapr.ext.workflow as wf
14+
import time
15+
16+
wfr = wf.WorkflowRuntime()
17+
18+
19+
@wfr.activity
20+
def app3_activity(ctx: wf.DaprWorkflowContext) -> int:
21+
print(f'app3 - received activity call', flush=True)
22+
print(f'app3 - returning activity result', flush=True)
23+
return 3
24+
25+
26+
if __name__ == '__main__':
27+
wfr.start()
28+
time.sleep(15) # wait for workflow runtime to start
29+
wfr.shutdown()

ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,29 +63,56 @@ def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task:
6363

6464
def call_activity(
6565
self,
66-
activity: Callable[[WorkflowActivityContext, TInput], TOutput],
66+
activity: Union[Callable[[WorkflowActivityContext, TInput], TOutput], str],
6767
*,
6868
input: TInput = None,
6969
retry_policy: Optional[RetryPolicy] = None,
70+
app_id: Optional[str] = None,
7071
) -> task.Task[TOutput]:
72+
# Handle string activity names for cross-app scenarios
73+
if isinstance(activity, str):
74+
activity_name = activity
75+
if app_id is not None:
76+
self._logger.debug(f'{self.instance_id}: Creating cross-app activity {activity_name} for app {app_id}')
77+
else:
78+
self._logger.debug(f'{self.instance_id}: Creating activity {activity_name}')
79+
80+
if retry_policy is None:
81+
return self.__obj.call_activity(activity=activity_name, input=input, app_id=app_id)
82+
return self.__obj.call_activity(activity=activity_name, input=input, retry_policy=retry_policy.obj, app_id=app_id)
83+
84+
# Handle function activity objects (original behavior)
7185
self._logger.debug(f'{self.instance_id}: Creating activity {activity.__name__}')
7286
if hasattr(activity, '_dapr_alternate_name'):
7387
act = activity.__dict__['_dapr_alternate_name']
7488
else:
7589
# this case should ideally never happen
7690
act = activity.__name__
7791
if retry_policy is None:
78-
return self.__obj.call_activity(activity=act, input=input)
79-
return self.__obj.call_activity(activity=act, input=input, retry_policy=retry_policy.obj)
92+
return self.__obj.call_activity(activity=act, input=input, app_id=app_id)
93+
return self.__obj.call_activity(activity=act, input=input, retry_policy=retry_policy.obj, app_id=app_id)
8094

8195
def call_child_workflow(
8296
self,
83-
workflow: Workflow,
97+
workflow: Union[Workflow, str],
8498
*,
8599
input: Optional[TInput] = None,
86100
instance_id: Optional[str] = None,
87101
retry_policy: Optional[RetryPolicy] = None,
102+
app_id: Optional[str] = None,
88103
) -> task.Task[TOutput]:
104+
# Handle string workflow names for cross-app scenarios
105+
if isinstance(workflow, str):
106+
workflow_name = workflow
107+
self._logger.debug(f'{self.instance_id}: Creating child workflow {workflow_name}')
108+
109+
if retry_policy is None:
110+
return self.__obj.call_sub_orchestrator(workflow_name, input=input, instance_id=instance_id, app_id=app_id)
111+
return self.__obj.call_sub_orchestrator(
112+
workflow_name, input=input, instance_id=instance_id, retry_policy=retry_policy.obj, app_id=app_id
113+
)
114+
115+
# Handle function workflow objects (original behavior)
89116
self._logger.debug(f'{self.instance_id}: Creating child workflow {workflow.__name__}')
90117

91118
def wf(ctx: task.OrchestrationContext, inp: TInput):
@@ -100,9 +127,9 @@ def wf(ctx: task.OrchestrationContext, inp: TInput):
100127
# this case should ideally never happen
101128
wf.__name__ = workflow.__name__
102129
if retry_policy is None:
103-
return self.__obj.call_sub_orchestrator(wf, input=input, instance_id=instance_id)
130+
return self.__obj.call_sub_orchestrator(wf, input=input, instance_id=instance_id, app_id=app_id)
104131
return self.__obj.call_sub_orchestrator(
105-
wf, input=input, instance_id=instance_id, retry_policy=retry_policy.obj
132+
wf, input=input, instance_id=instance_id, retry_policy=retry_policy.obj, app_id=app_id
106133
)
107134

108135
def wait_for_external_event(self, name: str) -> task.Task:

ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -107,18 +107,18 @@ def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task:
107107

108108
@abstractmethod
109109
def call_activity(
110-
self, activity: Activity[TOutput], *, input: Optional[TInput] = None
110+
self, activity: Union[Activity[TOutput], str], *, input: Optional[TInput] = None, app_id: Optional[str] = None
111111
) -> task.Task[TOutput]:
112112
"""Schedule an activity for execution.
113113
114114
Parameters
115115
----------
116-
activity: Activity[TInput, TOutput]
117-
A reference to the activity function to call.
116+
activity: Activity[TInput, TOutput] | str
117+
A reference to the activity function to call, or a string name for cross-app activities.
118118
input: TInput | None
119119
The JSON-serializable input (or None) to pass to the activity.
120-
return_type: task.Task[TOutput]
121-
The JSON-serializable output type to expect from the activity result.
120+
app_id: str | None
121+
The AppID that will execute the activity.
122122
123123
Returns
124124
-------
@@ -130,22 +130,25 @@ def call_activity(
130130
@abstractmethod
131131
def call_child_workflow(
132132
self,
133-
orchestrator: Workflow[TOutput],
133+
orchestrator: Union[Workflow[TOutput], str],
134134
*,
135135
input: Optional[TInput] = None,
136136
instance_id: Optional[str] = None,
137+
app_id: Optional[str] = None,
137138
) -> task.Task[TOutput]:
138139
"""Schedule child-workflow function for execution.
139140
140141
Parameters
141142
----------
142-
orchestrator: Orchestrator[TInput, TOutput]
143-
A reference to the orchestrator function to call.
143+
orchestrator: Orchestrator[TInput, TOutput] | str
144+
A reference to the orchestrator function to call, or a string name for cross-app workflows.
144145
input: TInput
145146
The optional JSON-serializable input to pass to the orchestrator function.
146147
instance_id: str
147148
A unique ID to use for the sub-orchestration instance. If not specified, a
148149
random UUID will be used.
150+
app_id: str
151+
The AppID that will execute the workflow.
149152
150153
Returns
151154
-------

ext/dapr-ext-workflow/setup.cfg

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,12 @@ packages = find_namespace:
2525
include_package_data = True
2626
install_requires =
2727
dapr >= 1.16.0
28-
durabletask-dapr >= 0.2.0a8
28+
# durabletask-dapr >= 0.2.0a8
29+
durabletask-dapr @ git+https://github.com/dapr/durabletask-python@main
2930

3031
[options.packages.find]
3132
include =
3233
dapr.*
3334

34-
exclude =
35+
exclude =
3536
tests

0 commit comments

Comments
 (0)