Skip to content

Commit b74c247

Browse files
committed
update python examples for workflow; update conversation quickstart to python sdk
Signed-off-by: Hannah Hunter <[email protected]>
1 parent 3688a1d commit b74c247

File tree

3 files changed

+121
-126
lines changed

3 files changed

+121
-126
lines changed

daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md

Lines changed: 97 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,31 @@ The Dapr sidecar doesn’t load any workflow definitions. Rather, the sidecar si
3636

3737
<!--python-->
3838

39-
Define the workflow activities you'd like your workflow to perform. Activities are a function definition and can take inputs and outputs. The following example creates a counter (activity) called `hello_act` that notifies users of the current counter value. `hello_act` is a function derived from a class called `WorkflowActivityContext`.
39+
Define the workflow activities you'd like your workflow to perform. Activities are a function definition and can take inputs and outputs. The following example creates task chaining activities that receive input
4040

4141
```python
42-
def hello_act(ctx: WorkflowActivityContext, input):
43-
global counter
44-
counter += input
45-
print(f'New counter value is: {counter}!', flush=True)
42+
@wfr.activity(name='step10')
43+
def step1(ctx, activity_input):
44+
print(f'Step 1: Received input: {activity_input}.')
45+
# Do some work
46+
return activity_input + 1
47+
48+
49+
@wfr.activity
50+
def step2(ctx, activity_input):
51+
print(f'Step 2: Received input: {activity_input}.')
52+
# Do some work
53+
return activity_input * 2
54+
55+
56+
@wfr.activity
57+
def step3(ctx, activity_input):
58+
print(f'Step 3: Received input: {activity_input}.')
59+
# Do some work
60+
return activity_input ^ 2
4661
```
4762

48-
[See the `hello_act` workflow activity in context.](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py#LL40C1-L43C59)
63+
[See the task chaining workflow activity in context.](https://github.com/dapr/python-sdk/blob/main/examples/workflow/task_chaining.py)
4964

5065

5166
{{% /codetab %}}
@@ -226,16 +241,19 @@ Next, register and call the activites in a workflow.
226241

227242
<!--python-->
228243

229-
The `hello_world_wf` function is derived from a class called `DaprWorkflowContext` with input and output parameter types. It also includes a `yield` statement that does the heavy lifting of the workflow and calls the workflow activities.
244+
The `random_workflow` function is a task chaining workflow pattern derived from a class called `DaprWorkflowContext` with input and output parameter types. It also includes a `yield` statement that does the heavy lifting of the workflow and calls the workflow activities.
230245

231246
```python
232-
def hello_world_wf(ctx: DaprWorkflowContext, input):
233-
print(f'{input}')
234-
yield ctx.call_activity(hello_act, input=1)
235-
yield ctx.call_activity(hello_act, input=10)
236-
yield ctx.wait_for_external_event("event1")
237-
yield ctx.call_activity(hello_act, input=100)
238-
yield ctx.call_activity(hello_act, input=1000)
247+
@wfr.workflow(name='random_workflow')
248+
def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
249+
try:
250+
result1 = yield ctx.call_activity(step1, input=wf_input)
251+
result2 = yield ctx.call_activity(step2, input=result1)
252+
result3 = yield ctx.call_activity(step3, input=result2)
253+
except Exception as e:
254+
yield ctx.call_activity(error_handler, input=str(e))
255+
raise
256+
return [result1, result2, result3]
239257
```
240258

241259
[See the `hello_world_wf` workflow in context.](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py#LL32C1-L38C51)
@@ -409,82 +427,77 @@ Finally, compose the application using the workflow.
409427

410428
- A Python package called `DaprClient` to receive the Python SDK capabilities.
411429
- A builder with extensions called:
412-
- `WorkflowRuntime`: Allows you to register workflows and workflow activities
413-
- `DaprWorkflowContext`: Allows you to [create workflows]({{< ref "#write-the-workflow" >}})
430+
- `WorkflowRuntime`: Allows you to register the workflow runtime.
431+
- `DaprWorkflowContext`: Allows you to [create workflows and workflow activities]({{< ref "#write-the-workflow" >}})
414432
- `WorkflowActivityContext`: Allows you to [create workflow activities]({{< ref "#write-the-workflow-activities" >}})
415433
- API calls. In the example below, these calls start, pause, resume, purge, and terminate the workflow.
416434

417435
```python
418-
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext
419-
from dapr.clients import DaprClient
420-
421-
# ...
422-
423-
def main():
424-
with DaprClient() as d:
425-
host = settings.DAPR_RUNTIME_HOST
426-
port = settings.DAPR_GRPC_PORT
427-
workflowRuntime = WorkflowRuntime(host, port)
428-
workflowRuntime = WorkflowRuntime()
429-
workflowRuntime.register_workflow(hello_world_wf)
430-
workflowRuntime.register_activity(hello_act)
431-
workflowRuntime.start()
432-
433-
# Start workflow
434-
print("==========Start Counter Increase as per Input:==========")
435-
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
436-
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
437-
print(f"start_resp {start_resp.instance_id}")
438-
439-
# ...
440-
441-
# Pause workflow
442-
d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
443-
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
444-
print(f"Get response from {workflowName} after pause call: {getResponse.runtime_status}")
445-
446-
# Resume workflow
447-
d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
448-
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
449-
print(f"Get response from {workflowName} after resume call: {getResponse.runtime_status}")
450-
451-
sleep(1)
452-
# Raise workflow
453-
d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
454-
event_name=eventName, event_data=eventData)
455-
456-
sleep(5)
457-
# Purge workflow
458-
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
459-
try:
460-
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
461-
except DaprInternalError as err:
462-
if nonExistentIDError in err._message:
463-
print("Instance Successfully Purged")
464-
465-
# Kick off another workflow for termination purposes
466-
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
467-
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
468-
print(f"start_resp {start_resp.instance_id}")
469-
470-
# Terminate workflow
471-
d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
472-
sleep(1)
473-
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
474-
print(f"Get response from {workflowName} after terminate call: {getResponse.runtime_status}")
475-
476-
# Purge workflow
477-
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
478-
try:
479-
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
480-
except DaprInternalError as err:
481-
if nonExistentIDError in err._message:
482-
print("Instance Successfully Purged")
483-
484-
workflowRuntime.shutdown()
436+
from durabletask import worker, task
437+
438+
from dapr.ext.workflow.workflow_context import Workflow
439+
from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext
440+
from dapr.ext.workflow.workflow_activity_context import Activity, WorkflowActivityContext
441+
from dapr.ext.workflow.util import getAddress
442+
443+
from dapr.clients import DaprInternalError
444+
from dapr.clients.http.client import DAPR_API_TOKEN_HEADER
445+
from dapr.conf import settings
446+
from dapr.conf.helpers import GrpcEndpoint
447+
from dapr.ext.workflow.logger import LoggerOptions, Logger
448+
449+
wfr = wf.WorkflowRuntime()
450+
451+
@wfr.workflow(name='hello_world_wf')
452+
def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
453+
# Workflow definition...
454+
455+
@wfr.activity(name='hello_act')
456+
def hello_act(ctx: WorkflowActivityContext, wf_input):
457+
# Activity definition...
458+
459+
# Start workflow
460+
wfr = WorkflowRuntime()
461+
wfr.start()
462+
wf_client = DaprWorkflowClient()
463+
464+
# ...
465+
466+
# Pause workflow
467+
wf_client.pause_workflow(instance_id=instance_id)
468+
metadata = wf_client.get_workflow_state(instance_id=instance_id)
469+
# ... check status ...
470+
wf_client.resume_workflow(instance_id=instance_id)
471+
472+
sleep(1)
473+
474+
# Raise workflow
475+
wf_client.raise_workflow_event(
476+
instance_id=instance_id,
477+
event_name=event_name,
478+
data=event_data
479+
)
480+
481+
# Purge workflow
482+
state = wf_client.wait_for_workflow_completion(
483+
instance_id,
484+
timeout_in_seconds=30
485+
)
486+
wf_client.purge_workflow(instance_id=instance_id)
487+
488+
workflowRuntime.shutdown()
485489

486490
if __name__ == '__main__':
487-
main()
491+
wfr.start()
492+
sleep(10) # wait for workflow runtime to start
493+
494+
wf_client = wf.DaprWorkflowClient()
495+
instance_id = wf_client.schedule_new_workflow(workflow=task_chain_workflow, input=42)
496+
print(f'Workflow started. Instance ID: {instance_id}')
497+
state = wf_client.wait_for_workflow_completion(instance_id)
498+
print(f'Workflow completed! Status: {state.runtime_status}')
499+
500+
wfr.shutdown()
488501
```
489502

490503

daprdocs/content/en/getting-started/quickstarts/conversation-quickstart.md

Lines changed: 24 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,15 @@ description: Get started with the Dapr conversation building block
1010
The conversation building block is currently in **alpha**.
1111
{{% /alert %}}
1212

13-
Let's take a look at how the [Dapr conversation building block]({{< ref conversation-overview.md >}}) makes interacting with Large Language Models (LLMs) easier. In this quickstart, you use the echo component to communicate with the mock LLM and ask it for a poem about Dapr.
13+
Let's take a look at how the [Dapr conversation building block]({{< ref conversation-overview.md >}}) makes interacting with Large Language Models (LLMs) easier. In this quickstart, you use the echo component to communicate with the mock LLM and ask it to define Dapr.
1414

1515
You can try out this conversation quickstart by either:
1616

1717
- [Running the application in this sample with the Multi-App Run template file]({{< ref "#run-the-app-with-the-template-file" >}}), or
1818
- [Running the application without the template]({{< ref "#run-the-app-without-the-template" >}})
1919

2020
{{% alert title="Note" color="primary" %}}
21-
Currently, only the HTTP quickstart sample is available in Python and JavaScript.
21+
Currently, you can only use JavaScript for the quickstart sample using HTTP, not the JavaScript SDK.
2222
{{% /alert %}}
2323

2424
## Run the app with the template file
@@ -50,7 +50,7 @@ git clone https://github.com/dapr/quickstarts.git
5050
From the root of the Quickstarts directory, navigate into the conversation directory:
5151

5252
```bash
53-
cd conversation/python/http/conversation
53+
cd conversation/python/sdk/conversation
5454
```
5555

5656
Install the dependencies:
@@ -61,7 +61,7 @@ pip3 install -r requirements.txt
6161

6262
### Step 3: Launch the conversation service
6363

64-
Navigate back to the `http` directory and start the conversation service with the following command:
64+
Navigate back to the `sdk` directory and start the conversation service with the following command:
6565

6666
```bash
6767
dapr run -f .
@@ -117,37 +117,28 @@ In the application code:
117117
- The mock LLM echoes "What is dapr?".
118118

119119
```python
120-
import logging
121-
import requests
122-
import os
123-
124-
logging.basicConfig(level=logging.INFO)
125-
126-
base_url = os.getenv('BASE_URL', 'http://localhost') + ':' + os.getenv(
127-
'DAPR_HTTP_PORT', '3500')
128-
129-
CONVERSATION_COMPONENT_NAME = 'echo'
130-
131-
input = {
132-
'name': 'echo',
133-
'inputs': [{'message':'What is dapr?'}],
134-
'parameters': {},
135-
'metadata': {}
120+
from dapr.clients import DaprClient
121+
from dapr.clients.grpc._request import ConversationInput
122+
123+
with DaprClient() as d:
124+
inputs = [
125+
ConversationInput(content="What is dapr?", role='user', scrub_pii=True),
126+
]
127+
128+
metadata = {
129+
'model': 'modelname',
130+
'key': 'authKey',
131+
'cacheTTL': '10m',
136132
}
137133
138-
# Send input to conversation endpoint
139-
result = requests.post(
140-
url='%s/v1.0-alpha1/conversation/%s/converse' % (base_url, CONVERSATION_COMPONENT_NAME),
141-
json=input
142-
)
143-
144-
logging.info('Input sent: What is dapr?')
134+
print('Input sent: What is dapr?')
145135
146-
# Parse conversation output
147-
data = result.json()
148-
output = data["outputs"][0]["result"]
136+
response = d.converse_alpha1(
137+
name='echo', inputs=inputs, temperature=0.7, context_id='chat-123', metadata=metadata
138+
)
149139
150-
logging.info('Output response: ' + output)
140+
for output in response.outputs:
141+
print(f'Output response: {output.result}')
151142
```
152143

153144
{{% /codetab %}}
@@ -575,7 +566,7 @@ git clone https://github.com/dapr/quickstarts.git
575566
From the root of the Quickstarts directory, navigate into the conversation directory:
576567

577568
```bash
578-
cd conversation/python/http/conversation
569+
cd conversation/python/sdk/conversation
579570
```
580571

581572
Install the dependencies:
@@ -586,7 +577,7 @@ pip3 install -r requirements.txt
586577

587578
### Step 3: Launch the conversation service
588579

589-
Navigate back to the `http` directory and start the conversation service with the following command:
580+
Navigate back to the `sdk` directory and start the conversation service with the following command:
590581

591582
```bash
592583
dapr run --app-id conversation --resources-path ../../../components -- python3 app.py

daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,6 @@ class WorkflowConsoleApp:
251251
if __name__ == '__main__':
252252
app = WorkflowConsoleApp()
253253
app.main()
254-
255254
```
256255

257256
#### `order-processor/workflow.py`
@@ -276,7 +275,6 @@ wfr = WorkflowRuntime()
276275

277276
logging.basicConfig(level=logging.INFO)
278277

279-
280278
@wfr.workflow(name="order_processing_workflow")
281279
def order_processing_workflow(ctx: DaprWorkflowContext, order_payload_str: str):
282280
"""Defines the order processing workflow.
@@ -343,7 +341,6 @@ def notify_activity(ctx: WorkflowActivityContext, input: Notification):
343341
logger = logging.getLogger('NotifyActivity')
344342
logger.info(input.message)
345343

346-
347344
@wfr.activity(name="process_payment_activity")
348345
def process_payment_activity(ctx: WorkflowActivityContext, input: PaymentRequest):
349346
"""Defines Process Payment Activity.This is used by the workflow to process a payment"""
@@ -353,7 +350,6 @@ def process_payment_activity(ctx: WorkflowActivityContext, input: PaymentRequest
353350
+' USD')
354351
logger.info(f'Payment for request ID {input.request_id} processed successfully')
355352

356-
357353
@wfr.activity(name="verify_inventory_activity")
358354
def verify_inventory_activity(ctx: WorkflowActivityContext,
359355
input: InventoryRequest) -> InventoryResult:
@@ -377,8 +373,6 @@ def verify_inventory_activity(ctx: WorkflowActivityContext,
377373
return InventoryResult(True, inventory_item)
378374
return InventoryResult(False, None)
379375

380-
381-
382376
@wfr.activity(name="update_inventory_activity")
383377
def update_inventory_activity(ctx: WorkflowActivityContext,
384378
input: PaymentRequest) -> InventoryResult:
@@ -401,8 +395,6 @@ def update_inventory_activity(ctx: WorkflowActivityContext,
401395
client.save_state(store_name, input.item_being_purchased, new_val)
402396
logger.info(f'There are now {new_quantity} {input.item_being_purchased} left in stock')
403397

404-
405-
406398
@wfr.activity(name="request_approval_activity")
407399
def request_approval_activity(ctx: WorkflowActivityContext,
408400
input: OrderPayload):
@@ -413,7 +405,6 @@ def request_approval_activity(ctx: WorkflowActivityContext,
413405

414406
logger.info('Requesting approval for payment of '+f'{input["total_cost"]}'+' USD for '
415407
+f'{input["quantity"]}' +' ' +f'{input["item_name"]}')
416-
417408
```
418409
{{% /codetab %}}
419410

0 commit comments

Comments
 (0)