Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Flask>=1.1
# needed for auto fix
ruff===0.2.2
# needed for dapr-ext-workflow
durabletask-dapr >= 0.2.0a8
durabletask-dapr >= 0.2.0a9
# needed for .env file loading in examples
python-dotenv>=1.0.0
# needed for enhanced schema generation from function features
Expand Down
44 changes: 42 additions & 2 deletions examples/workflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pip3 install -r requirements.txt
Each of the examples in this directory can be run directly from the command line.

### Simple Workflow
This example represents a workflow that manages counters through a series of activities and child workflows.
This example represents a workflow that manages counters through a series of activities and child workflows.
It shows several Dapr Workflow features including:
- Basic activity execution with counter increments
- Retryable activities with configurable retry policies
Expand Down Expand Up @@ -269,4 +269,44 @@ When you run the example, you will see output like this:
*** Calling child workflow 29a7592a1e874b07aad2bb58de309a51-child
*** Child workflow 6feadc5370184b4998e50875b20084f6 called
...
```
```


### Cross-app Workflow

This example demonstrates how to call child workflows and activities in different apps. The multiple Dapr CLI instances can be started using the following commands:

<!-- STEP
name: Run apps
expected_stdout_lines:
- '== APP == app1 - triggering app1 workflow'
- '== APP == app1 - received workflow call'
- '== APP == app1 - triggering app2 workflow'
- '== APP == app2 - received workflow call'
- '== APP == app2 - triggering app3 activity'
- '== APP == app3 - received activity call'
- '== APP == app3 - returning activity result'
- '== APP == app2 - received activity result'
- '== APP == app2 - returning workflow result'
- '== APP == app1 - received workflow result'
- '== APP == app1 - returning workflow result'
background: true
sleep: 5
-->

```sh
pip install ./ext/dapr-ext-workflow
dapr run --app-id wfexample3 --dapr-grpc-port 50003 python3 cross-app3.py &
dapr run --app-id wfexample2 --dapr-grpc-port 50002 python3 cross-app2.py &
dapr run --app-id wfexample1 --dapr-grpc-port 50001 python3 cross-app1.py
```
<!-- END_STEP -->

When you run the apps, you will see output like this:
```
...
app1 - triggering app2 workflow
app2 - triggering app3 activity
...
```
among others. This shows that the workflow calls are working as expected.
46 changes: 46 additions & 0 deletions examples/workflow/cross-app1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
# Copyright 2025 The Dapr Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import dapr.ext.workflow as wf
import time

wfr = wf.WorkflowRuntime()


@wfr.workflow
def app1_workflow(ctx: wf.DaprWorkflowContext):
print(f'app1 - received workflow call', flush=True)
print(f'app1 - triggering app2 workflow', flush=True)

yield ctx.call_child_workflow(
workflow='app2_workflow',
input=None,
app_id='wfexample2',
)
print(f'app1 - received workflow result', flush=True)
print(f'app1 - returning workflow result', flush=True)

return 1


if __name__ == '__main__':
wfr.start()
time.sleep(10) # wait for workflow runtime to start

wf_client = wf.DaprWorkflowClient()
print(f'app1 - triggering app1 workflow', flush=True)
instance_id = wf_client.schedule_new_workflow(workflow=app1_workflow)

# Wait for the workflow to complete
time.sleep(5)

wfr.shutdown()
33 changes: 33 additions & 0 deletions examples/workflow/cross-app2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
# Copyright 2025 The Dapr Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import dapr.ext.workflow as wf
import time

wfr = wf.WorkflowRuntime()


@wfr.workflow
def app2_workflow(ctx: wf.DaprWorkflowContext):
print(f'app2 - received workflow call', flush=True)
print(f'app2 - triggering app3 activity', flush=True)
yield ctx.call_activity('app3_activity', input=None, app_id='wfexample3')
print(f'app2 - received activity result', flush=True)
print(f'app2 - returning workflow result', flush=True)

return 2


if __name__ == '__main__':
wfr.start()
time.sleep(15) # wait for workflow runtime to start
wfr.shutdown()
29 changes: 29 additions & 0 deletions examples/workflow/cross-app3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# -*- coding: utf-8 -*-
# Copyright 2025 The Dapr Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import dapr.ext.workflow as wf
import time

wfr = wf.WorkflowRuntime()


@wfr.activity
def app3_activity(ctx: wf.DaprWorkflowContext) -> int:
print(f'app3 - received activity call', flush=True)
print(f'app3 - returning activity result', flush=True)
return 3


if __name__ == '__main__':
wfr.start()
time.sleep(15) # wait for workflow runtime to start
wfr.shutdown()
Original file line number Diff line number Diff line change
Expand Up @@ -63,29 +63,68 @@ def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task:

def call_activity(
self,
activity: Callable[[WorkflowActivityContext, TInput], TOutput],
activity: Union[Callable[[WorkflowActivityContext, TInput], TOutput], str],
*,
input: TInput = None,
retry_policy: Optional[RetryPolicy] = None,
app_id: Optional[str] = None,
) -> task.Task[TOutput]:
# Handle string activity names for cross-app scenarios
if isinstance(activity, str):
activity_name = activity
if app_id is not None:
self._logger.debug(
f'{self.instance_id}: Creating cross-app activity {activity_name} for app {app_id}'
)
else:
self._logger.debug(f'{self.instance_id}: Creating activity {activity_name}')

if retry_policy is None:
return self.__obj.call_activity(activity=activity_name, input=input, app_id=app_id)
return self.__obj.call_activity(
activity=activity_name, input=input, retry_policy=retry_policy.obj, app_id=app_id
)

# Handle function activity objects (original behavior)
self._logger.debug(f'{self.instance_id}: Creating activity {activity.__name__}')
if hasattr(activity, '_dapr_alternate_name'):
act = activity.__dict__['_dapr_alternate_name']
else:
# this case should ideally never happen
act = activity.__name__
if retry_policy is None:
return self.__obj.call_activity(activity=act, input=input)
return self.__obj.call_activity(activity=act, input=input, retry_policy=retry_policy.obj)
return self.__obj.call_activity(activity=act, input=input, app_id=app_id)
return self.__obj.call_activity(
activity=act, input=input, retry_policy=retry_policy.obj, app_id=app_id
)

def call_child_workflow(
self,
workflow: Workflow,
workflow: Union[Workflow, str],
*,
input: Optional[TInput] = None,
instance_id: Optional[str] = None,
retry_policy: Optional[RetryPolicy] = None,
app_id: Optional[str] = None,
) -> task.Task[TOutput]:
# Handle string workflow names for cross-app scenarios
if isinstance(workflow, str):
workflow_name = workflow
self._logger.debug(f'{self.instance_id}: Creating child workflow {workflow_name}')

if retry_policy is None:
return self.__obj.call_sub_orchestrator(
workflow_name, input=input, instance_id=instance_id, app_id=app_id
)
return self.__obj.call_sub_orchestrator(
workflow_name,
input=input,
instance_id=instance_id,
retry_policy=retry_policy.obj,
app_id=app_id,
)

# Handle function workflow objects (original behavior)
self._logger.debug(f'{self.instance_id}: Creating child workflow {workflow.__name__}')

def wf(ctx: task.OrchestrationContext, inp: TInput):
Expand All @@ -100,9 +139,11 @@ def wf(ctx: task.OrchestrationContext, inp: TInput):
# this case should ideally never happen
wf.__name__ = workflow.__name__
if retry_policy is None:
return self.__obj.call_sub_orchestrator(wf, input=input, instance_id=instance_id)
return self.__obj.call_sub_orchestrator(
wf, input=input, instance_id=instance_id, app_id=app_id
)
return self.__obj.call_sub_orchestrator(
wf, input=input, instance_id=instance_id, retry_policy=retry_policy.obj
wf, input=input, instance_id=instance_id, retry_policy=retry_policy.obj, app_id=app_id
)

def wait_for_external_event(self, name: str) -> task.Task:
Expand Down
23 changes: 15 additions & 8 deletions ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,18 +107,22 @@ def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task:

@abstractmethod
def call_activity(
self, activity: Activity[TOutput], *, input: Optional[TInput] = None
self,
activity: Union[Activity[TOutput], str],
*,
input: Optional[TInput] = None,
app_id: Optional[str] = None,
) -> task.Task[TOutput]:
"""Schedule an activity for execution.

Parameters
----------
activity: Activity[TInput, TOutput]
A reference to the activity function to call.
activity: Activity[TInput, TOutput] | str
A reference to the activity function to call, or a string name for cross-app activities.
input: TInput | None
The JSON-serializable input (or None) to pass to the activity.
return_type: task.Task[TOutput]
The JSON-serializable output type to expect from the activity result.
app_id: str | None
The AppID that will execute the activity.

Returns
-------
Expand All @@ -130,22 +134,25 @@ def call_activity(
@abstractmethod
def call_child_workflow(
self,
orchestrator: Workflow[TOutput],
orchestrator: Union[Workflow[TOutput], str],
*,
input: Optional[TInput] = None,
instance_id: Optional[str] = None,
app_id: Optional[str] = None,
) -> task.Task[TOutput]:
"""Schedule child-workflow function for execution.

Parameters
----------
orchestrator: Orchestrator[TInput, TOutput]
A reference to the orchestrator function to call.
orchestrator: Orchestrator[TInput, TOutput] | str
A reference to the orchestrator function to call, or a string name for cross-app workflows.
input: TInput
The optional JSON-serializable input to pass to the orchestrator function.
instance_id: str
A unique ID to use for the sub-orchestration instance. If not specified, a
random UUID will be used.
app_id: str
The AppID that will execute the workflow.

Returns
-------
Expand Down
4 changes: 2 additions & 2 deletions ext/dapr-ext-workflow/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ packages = find_namespace:
include_package_data = True
install_requires =
dapr >= 1.16.0
durabletask-dapr >= 0.2.0a8
durabletask-dapr >= 0.2.0a9

[options.packages.find]
include =
dapr.*

exclude =
exclude =
tests
4 changes: 2 additions & 2 deletions ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ def __init__(self):
def create_timer(self, fire_at):
return mock_create_timer

def call_activity(self, activity, input):
def call_activity(self, activity, input, app_id):
return mock_call_activity

def call_sub_orchestrator(self, orchestrator, input, instance_id):
def call_sub_orchestrator(self, orchestrator, input, instance_id, app_id):
return mock_call_sub_orchestrator

def set_custom_status(self, custom_status):
Expand Down