Skip to content

Commit fce3598

Browse files
committed
feat: Adds support for cross-app calls.
Signed-off-by: Albert Callarisa <[email protected]>
1 parent 99314a4 commit fce3598

File tree

8 files changed

+203
-19
lines changed

8 files changed

+203
-19
lines changed

dev-requirements.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ Flask>=1.1
1515
# needed for auto fix
1616
ruff===0.2.2
1717
# needed for dapr-ext-workflow
18-
durabletask-dapr >= 0.2.0a8
18+
# durabletask-dapr >= 0.2.0a8
19+
durabletask-dapr @ git+https://github.com/dapr/durabletask-python@main
1920
# needed for .env file loading in examples
2021
python-dotenv>=1.0.0
2122
# needed for enhanced schema generation from function features

examples/workflow/README.md

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ pip3 install -r requirements.txt
2020
Each of the examples in this directory can be run directly from the command line.
2121

2222
### Simple Workflow
23-
This example represents a workflow that manages counters through a series of activities and child workflows.
23+
This example represents a workflow that manages counters through a series of activities and child workflows.
2424
It shows several Dapr Workflow features including:
2525
- Basic activity execution with counter increments
2626
- Retryable activities with configurable retry policies
@@ -269,4 +269,50 @@ When you run the example, you will see output like this:
269269
*** Calling child workflow 29a7592a1e874b07aad2bb58de309a51-child
270270
*** Child workflow 6feadc5370184b4998e50875b20084f6 called
271271
...
272-
```
272+
```
273+
274+
275+
### Cross-app Workflow
276+
277+
This example demonstrates how to call workflows and activities in different apps. The Dapr CLI can be started using the following command:
278+
279+
280+
<!-- STEP
281+
name: Run apps
282+
expected_stdout_lines:
283+
- '== APP == app1 - triggering app1 workflow'
284+
- '== APP == app1 - received workflow call'
285+
- '== APP == app1 - triggering app2 workflow'
286+
- '== APP == app2 - received workflow call'
287+
- '== APP == app2 - triggering app3 activity'
288+
- '== APP == app3 - received activity call'
289+
- '== APP == app3 - returning activity result'
290+
- '== APP == app2 - received activity result'
291+
- '== APP == app2 - returning workflow result'
292+
- '== APP == app1 - received workflow result'
293+
- '== APP == app1 - returning workflow result'
294+
background: true
295+
sleep: 5
296+
-->
297+
298+
```sh
299+
pip install ./ext/dapr-ext-workflow
300+
dapr run --app-id wfexample3 --dapr-grpc-port 50003 python3 cross-app3.py &
301+
dapr run --app-id wfexample2 --dapr-grpc-port 50002 python3 cross-app2.py &
302+
dapr run --app-id wfexample1 --dapr-grpc-port 50001 python3 cross-app1.py
303+
```
304+
<!-- END_STEP -->
305+
306+
In a separate terminal window, run the following command to start the Python workflow app:
307+
308+
```sh
309+
python3 cross-app1.py
310+
```
311+
312+
When you run the example, you will see output like this:
313+
```
314+
...
315+
*** Calling child workflow 29a7592a1e874b07aad2bb58de309a51-child
316+
*** Child workflow 6feadc5370184b4998e50875b20084f6 called
317+
...
318+
```

examples/workflow/cross-app1.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2023 The Dapr Authors
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
13+
import dapr.ext.workflow as wf
14+
import time
15+
16+
wfr = wf.WorkflowRuntime()
17+
18+
19+
@wfr.workflow
20+
def app1_workflow(ctx: wf.DaprWorkflowContext):
21+
print(f'app1 - received workflow call', flush=True)
22+
print(f'app1 - triggering app2 workflow', flush=True)
23+
24+
# yield ctx.call_activity('app3_activity', input=None, app_id='wfexample3')
25+
yield ctx.call_child_workflow(
26+
workflow='app2_workflow',
27+
input=None,
28+
app_id='wfexample2',
29+
)
30+
print(f'app1 - received workflow result', flush=True)
31+
print(f'app1 - returning workflow result', flush=True)
32+
33+
return 1
34+
35+
if __name__ == '__main__':
36+
wfr.start()
37+
time.sleep(10) # wait for workflow runtime to start
38+
39+
wf_client = wf.DaprWorkflowClient()
40+
print(f'app1 - triggering app1 workflow', flush=True)
41+
instance_id = wf_client.schedule_new_workflow(workflow=app1_workflow)
42+
43+
# Wait for the workflow to complete
44+
time.sleep(5)
45+
46+
wfr.shutdown()

examples/workflow/cross-app2.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2023 The Dapr Authors
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
13+
import dapr.ext.workflow as wf
14+
import time
15+
16+
wfr = wf.WorkflowRuntime()
17+
18+
@wfr.workflow
19+
def app2_workflow(ctx: wf.DaprWorkflowContext):
20+
print(f'app2 - received workflow call', flush=True)
21+
print(f'app2 - triggering app3 activity', flush=True)
22+
yield ctx.call_activity('app3_activity', input=None, app_id='wfexample3')
23+
print(f'app2 - received activity result', flush=True)
24+
print(f'app2 - returning workflow result', flush=True)
25+
26+
return 2
27+
28+
if __name__ == '__main__':
29+
wfr.start()
30+
time.sleep(15) # wait for workflow runtime to start
31+
wfr.shutdown()

examples/workflow/cross-app3.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2023 The Dapr Authors
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
13+
import dapr.ext.workflow as wf
14+
import time
15+
16+
wfr = wf.WorkflowRuntime()
17+
18+
19+
@wfr.activity
20+
def app3_activity(ctx: wf.DaprWorkflowContext) -> int:
21+
print(f'app3 - received activity call', flush=True)
22+
print(f'app3 - returning activity result', flush=True)
23+
return 3
24+
25+
26+
if __name__ == '__main__':
27+
wfr.start()
28+
time.sleep(15) # wait for workflow runtime to start
29+
wfr.shutdown()

ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,29 +63,56 @@ def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task:
6363

6464
def call_activity(
6565
self,
66-
activity: Callable[[WorkflowActivityContext, TInput], TOutput],
66+
activity: Union[Callable[[WorkflowActivityContext, TInput], TOutput], str],
6767
*,
6868
input: TInput = None,
6969
retry_policy: Optional[RetryPolicy] = None,
70+
app_id: Optional[str] = None,
7071
) -> task.Task[TOutput]:
72+
# Handle string activity names for cross-app scenarios
73+
if isinstance(activity, str):
74+
activity_name = activity
75+
if app_id is not None:
76+
self._logger.debug(f'{self.instance_id}: Creating cross-app activity {activity_name} for app {app_id}')
77+
else:
78+
self._logger.debug(f'{self.instance_id}: Creating activity {activity_name}')
79+
80+
if retry_policy is None:
81+
return self.__obj.call_activity(activity=activity_name, input=input, app_id=app_id)
82+
return self.__obj.call_activity(activity=activity_name, input=input, retry_policy=retry_policy.obj, app_id=app_id)
83+
84+
# Handle function activity objects (original behavior)
7185
self._logger.debug(f'{self.instance_id}: Creating activity {activity.__name__}')
7286
if hasattr(activity, '_dapr_alternate_name'):
7387
act = activity.__dict__['_dapr_alternate_name']
7488
else:
7589
# this case should ideally never happen
7690
act = activity.__name__
7791
if retry_policy is None:
78-
return self.__obj.call_activity(activity=act, input=input)
79-
return self.__obj.call_activity(activity=act, input=input, retry_policy=retry_policy.obj)
92+
return self.__obj.call_activity(activity=act, input=input, app_id=app_id)
93+
return self.__obj.call_activity(activity=act, input=input, retry_policy=retry_policy.obj, app_id=app_id)
8094

8195
def call_child_workflow(
8296
self,
83-
workflow: Workflow,
97+
workflow: Union[Workflow, str],
8498
*,
8599
input: Optional[TInput] = None,
86100
instance_id: Optional[str] = None,
87101
retry_policy: Optional[RetryPolicy] = None,
102+
app_id: Optional[str] = None,
88103
) -> task.Task[TOutput]:
104+
# Handle string workflow names for cross-app scenarios
105+
if isinstance(workflow, str):
106+
workflow_name = workflow
107+
self._logger.debug(f'{self.instance_id}: Creating child workflow {workflow_name}')
108+
109+
if retry_policy is None:
110+
return self.__obj.call_sub_orchestrator(workflow_name, input=input, instance_id=instance_id, app_id=app_id)
111+
return self.__obj.call_sub_orchestrator(
112+
workflow_name, input=input, instance_id=instance_id, retry_policy=retry_policy.obj, app_id=app_id
113+
)
114+
115+
# Handle function workflow objects (original behavior)
89116
self._logger.debug(f'{self.instance_id}: Creating child workflow {workflow.__name__}')
90117

91118
def wf(ctx: task.OrchestrationContext, inp: TInput):
@@ -100,9 +127,9 @@ def wf(ctx: task.OrchestrationContext, inp: TInput):
100127
# this case should ideally never happen
101128
wf.__name__ = workflow.__name__
102129
if retry_policy is None:
103-
return self.__obj.call_sub_orchestrator(wf, input=input, instance_id=instance_id)
130+
return self.__obj.call_sub_orchestrator(wf, input=input, instance_id=instance_id, app_id=app_id)
104131
return self.__obj.call_sub_orchestrator(
105-
wf, input=input, instance_id=instance_id, retry_policy=retry_policy.obj
132+
wf, input=input, instance_id=instance_id, retry_policy=retry_policy.obj, app_id=app_id
106133
)
107134

108135
def wait_for_external_event(self, name: str) -> task.Task:

ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -107,18 +107,18 @@ def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task:
107107

108108
@abstractmethod
109109
def call_activity(
110-
self, activity: Activity[TOutput], *, input: Optional[TInput] = None
110+
self, activity: Union[Activity[TOutput], str], *, input: Optional[TInput] = None, app_id: Optional[str] = None
111111
) -> task.Task[TOutput]:
112112
"""Schedule an activity for execution.
113113
114114
Parameters
115115
----------
116-
activity: Activity[TInput, TOutput]
117-
A reference to the activity function to call.
116+
activity: Activity[TInput, TOutput] | str
117+
A reference to the activity function to call, or a string name for cross-app activities.
118118
input: TInput | None
119119
The JSON-serializable input (or None) to pass to the activity.
120-
return_type: task.Task[TOutput]
121-
The JSON-serializable output type to expect from the activity result.
120+
app_id: str | None
121+
The AppID that will execute the activity.
122122
123123
Returns
124124
-------
@@ -130,22 +130,25 @@ def call_activity(
130130
@abstractmethod
131131
def call_child_workflow(
132132
self,
133-
orchestrator: Workflow[TOutput],
133+
orchestrator: Union[Workflow[TOutput], str],
134134
*,
135135
input: Optional[TInput] = None,
136136
instance_id: Optional[str] = None,
137+
app_id: Optional[str] = None,
137138
) -> task.Task[TOutput]:
138139
"""Schedule child-workflow function for execution.
139140
140141
Parameters
141142
----------
142-
orchestrator: Orchestrator[TInput, TOutput]
143-
A reference to the orchestrator function to call.
143+
orchestrator: Orchestrator[TInput, TOutput] | str
144+
A reference to the orchestrator function to call, or a string name for cross-app workflows.
144145
input: TInput
145146
The optional JSON-serializable input to pass to the orchestrator function.
146147
instance_id: str
147148
A unique ID to use for the sub-orchestration instance. If not specified, a
148149
random UUID will be used.
150+
app_id: str
151+
The AppID that will execute the workflow.
149152
150153
Returns
151154
-------

ext/dapr-ext-workflow/setup.cfg

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,12 @@ packages = find_namespace:
2525
include_package_data = True
2626
install_requires =
2727
dapr >= 1.16.0
28-
durabletask-dapr >= 0.2.0a8
28+
# durabletask-dapr >= 0.2.0a8
29+
durabletask-dapr @ git+https://github.com/dapr/durabletask-python@main
2930

3031
[options.packages.find]
3132
include =
3233
dapr.*
3334

34-
exclude =
35+
exclude =
3536
tests

0 commit comments

Comments
 (0)