Skip to content

Commit 026c572

Browse files
committed
Creation of DTS example and passing of completionToken
Signed-off-by: Ryan Lettieri <[email protected]>
1 parent b92dc52 commit 026c572

File tree

3 files changed

+114
-11
lines changed

3 files changed

+114
-11
lines changed

durabletask/worker.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -143,9 +143,11 @@ def run_loop():
143143
request_type = work_item.WhichOneof('request')
144144
self._logger.debug(f'Received "{request_type}" work item')
145145
if work_item.HasField('orchestratorRequest'):
146-
executor.submit(self._execute_orchestrator, work_item.orchestratorRequest, stub)
146+
executor.submit(self._execute_orchestrator, work_item.orchestratorRequest, stub, work_item.completionToken)
147147
elif work_item.HasField('activityRequest'):
148-
executor.submit(self._execute_activity, work_item.activityRequest, stub)
148+
executor.submit(self._execute_activity, work_item.activityRequest, stub, work_item.completionToken)
149+
elif work_item.HasField('healthPing'):
150+
pass # no-op
149151
else:
150152
self._logger.warning(f'Unexpected work item type: {request_type}')
151153

@@ -184,39 +186,42 @@ def stop(self):
184186
self._logger.info("Worker shutdown completed")
185187
self._is_running = False
186188

187-
def _execute_orchestrator(self, req: pb.OrchestratorRequest, stub: stubs.TaskHubSidecarServiceStub):
189+
def _execute_orchestrator(self, req: pb.OrchestratorRequest, stub: stubs.TaskHubSidecarServiceStub, completionToken):
188190
try:
189191
executor = _OrchestrationExecutor(self._registry, self._logger)
190192
result = executor.execute(req.instanceId, req.pastEvents, req.newEvents)
191193
res = pb.OrchestratorResponse(
192194
instanceId=req.instanceId,
193195
actions=result.actions,
194-
customStatus=pbh.get_string_value(result.encoded_custom_status))
196+
customStatus=pbh.get_string_value(result.encoded_custom_status),
197+
completionToken=completionToken)
195198
except Exception as ex:
196199
self._logger.exception(f"An error occurred while trying to execute instance '{req.instanceId}': {ex}")
197200
failure_details = pbh.new_failure_details(ex)
198201
actions = [pbh.new_complete_orchestration_action(-1, pb.ORCHESTRATION_STATUS_FAILED, "", failure_details)]
199-
res = pb.OrchestratorResponse(instanceId=req.instanceId, actions=actions)
202+
res = pb.OrchestratorResponse(instanceId=req.instanceId, actions=actions, completionToken=completionToken)
200203

201204
try:
202205
stub.CompleteOrchestratorTask(res)
203206
except Exception as ex:
204207
self._logger.exception(f"Failed to deliver orchestrator response for '{req.instanceId}' to sidecar: {ex}")
205208

206-
def _execute_activity(self, req: pb.ActivityRequest, stub: stubs.TaskHubSidecarServiceStub):
209+
def _execute_activity(self, req: pb.ActivityRequest, stub: stubs.TaskHubSidecarServiceStub, completionToken):
207210
instance_id = req.orchestrationInstance.instanceId
208211
try:
209212
executor = _ActivityExecutor(self._registry, self._logger)
210213
result = executor.execute(instance_id, req.name, req.taskId, req.input.value)
211214
res = pb.ActivityResponse(
212215
instanceId=instance_id,
213216
taskId=req.taskId,
214-
result=pbh.get_string_value(result))
217+
result=pbh.get_string_value(result),
218+
completionToken=completionToken)
215219
except Exception as ex:
216220
res = pb.ActivityResponse(
217221
instanceId=instance_id,
218222
taskId=req.taskId,
219-
failureDetails=pbh.new_failure_details(ex))
223+
failureDetails=pbh.new_failure_details(ex),
224+
completionToken=completionToken)
220225

221226
try:
222227
stub.CompleteActivityTask(res)

examples/README.md

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,39 @@
11
# Examples
22

3-
This directory contains examples of how to author durable orchestrations using the Durable Task Python SDK.
3+
This directory contains examples of how to author durable orchestrations using the Durable Task Python SDK. There are two backends that are compatible with the Durable Task Python SDK: The Dapr sidecar, and the Durable Task Scheduler (DTS)
44

5-
## Prerequisites
5+
## Prerequisites for using Dapr
66

77
All the examples assume that you have a Durable Task-compatible sidecar running locally. There are two options for this:
88

99
1. Install the latest version of the [Dapr CLI](https://docs.dapr.io/getting-started/install-dapr-cli/), which contains and exposes an embedded version of the Durable Task engine. The setup process (which requires Docker) will configure the workflow engine to store state in a local Redis container.
1010

11-
1. Clone and run the [Durable Task Sidecar](https://github.com/microsoft/durabletask-go) project locally (requires Go 1.18 or higher). Orchestration state will be stored in a local sqlite database.
11+
2. Clone and run the [Durable Task Sidecar](https://github.com/microsoft/durabletask-go) project locally (requires Go 1.18 or higher). Orchestration state will be stored in a local sqlite database.
12+
13+
14+
## Prerequisites for using DTS
15+
16+
All the examples assume that you have a Durable Task Scheduler taskhub created.
17+
18+
The simplest way to create a taskhub is by using the az cli commands:
19+
20+
1. Create a scheduler:
21+
az durabletask scheduler create --resource-group <testrg> --name <testscheduler> --location <eastus> --ip-allowlist "[0.0.0.0/0]" --sku-capacity 1, --sku-name "Dedicated" --tags "{}"
22+
23+
2. Create your taskhub
24+
az durabletask taskhub create --resource-group <testrg> --scheduler-name <testscheduler> --name <testtaskhub>
25+
26+
3. Retrieve the endpoint for the taskhub. This can be done by locating the taskhub in the portal.
27+
28+
4. Set the appropriate environment variables for the TASKHUB and ENDPOINT
29+
30+
```sh
31+
export TASKHUB=<taskhubname>
32+
```
33+
34+
```sh
35+
export ENDPOINT=<taskhubEndpoint>
36+
```
1237

1338
## Running the examples
1439

examples/dts_activity_sequence.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import os
2+
from azure.identity import DefaultAzureCredential
3+
4+
"""End-to-end sample that demonstrates how to configure an orchestrator
5+
that calls an activity function in a sequence and prints the outputs."""
6+
from durabletask import client, task, worker
7+
8+
9+
def hello(ctx: task.ActivityContext, name: str) -> str:
10+
"""Activity function that returns a greeting"""
11+
return f'Hello {name}!'
12+
13+
14+
def sequence(ctx: task.OrchestrationContext, _):
15+
"""Orchestrator function that calls the 'hello' activity function in a sequence"""
16+
# call "hello" activity function in a sequence
17+
result1 = yield ctx.call_activity(hello, input='Tokyo')
18+
result2 = yield ctx.call_activity(hello, input='Seattle')
19+
result3 = yield ctx.call_activity(hello, input='London')
20+
21+
# return an array of results
22+
return [result1, result2, result3]
23+
24+
25+
# Read the environment variable
26+
taskhub_name = os.getenv("TASKHUB")
27+
28+
# Check if the variable exists
29+
if taskhub_name:
30+
print(f"The value of TASKHUB is: {taskhub_name}")
31+
else:
32+
print("TASKHUB is not set. Please set the TASKHUB environment variable to the name of the taskhub you wish to use")
33+
print("If you are using windows powershell, run the following: $env:TASKHUB=\"<taskhubname>\"")
34+
print("If you are using bash, run the following: export TASKHUB=\"<taskhubname>\"")
35+
exit()
36+
37+
# Read the environment variable
38+
endpoint = os.getenv("ENDPOINT")
39+
40+
# Check if the variable exists
41+
if endpoint:
42+
print(f"The value of ENDPOINT is: {endpoint}")
43+
else:
44+
print("ENDPOINT is not set. Please set the ENDPOINT environment variable to the endpoint of the taskhub")
45+
print("If you are using windows powershell, run the following: $env:ENDPOINT=\"<taskhubEndpoint>\"")
46+
print("If you are using bash, run the following: export ENDPOINT=\"<taskhubEndpoint>\"")
47+
exit()
48+
49+
50+
default_credential = DefaultAzureCredential()
51+
# Define the scope for Azure Resource Manager (ARM)
52+
arm_scope = "https://durabletask.io/.default"
53+
54+
# Retrieve the access token
55+
access_token = "Bearer " + default_credential.get_token(arm_scope).token
56+
# create a client, start an orchestration, and wait for it to finish
57+
metaData: list[tuple[str, str]] = [
58+
("taskhub", taskhub_name), # Hardcode for now, just the taskhub name
59+
("authorization", access_token) # use azure identity sdk for python
60+
]
61+
# configure and start the worker
62+
with worker.TaskHubGrpcWorker(host_address=endpoint, metadata=metaData, secure_channel=True) as w:
63+
w.add_orchestrator(sequence)
64+
w.add_activity(hello)
65+
w.start()
66+
67+
c = client.TaskHubGrpcClient(host_address=endpoint, metadata=metaData, secure_channel=True)
68+
instance_id = c.schedule_new_orchestration(sequence)
69+
state = c.wait_for_orchestration_completion(instance_id, timeout=45)
70+
if state and state.runtime_status == client.OrchestrationStatus.COMPLETED:
71+
print(f'Orchestration completed! Result: {state.serialized_output}')
72+
elif state:
73+
print(f'Orchestration failed: {state.failure_details}')

0 commit comments

Comments
 (0)