Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 67 additions & 84 deletions daprdocs/content/en/python-sdk-docs/python-client.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,73 @@ The base endpoint for gRPC calls is the one used for the client initialisation (
- For a full guide on service invocation visit [How-To: Invoke a service]({{< ref howto-invoke-discover-services.md >}}).
- Visit [Python SDK examples](https://github.com/dapr/python-sdk/tree/master/examples/invoke-simple) for code samples and instructions to try out service invocation.

### Workflow

```python
from time import sleep

import dapr.ext.workflow as wf


wfr = wf.WorkflowRuntime()


@wfr.workflow(name='random_workflow')
def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
try:
result1 = yield ctx.call_activity(step1, input=wf_input)
result2 = yield ctx.call_activity(step2, input=result1)
except Exception as e:
yield ctx.call_activity(error_handler, input=str(e))
raise
# TODO update to set custom status
return [result1, result2]


@wfr.activity(name='step1')
def step1(ctx, activity_input):
print(f'Step 1: Received input: {activity_input}.')
# Do some work
return activity_input + 1


@wfr.activity
def step2(ctx, activity_input):
print(f'Step 2: Received input: {activity_input}.')
# Do some work
return activity_input * 2

@wfr.activity
def error_handler(ctx, error):
print(f'Executing error handler: {error}.')
# Do some compensating work


if __name__ == '__main__':
wfr.start()
sleep(10) # wait for workflow runtime to start

wf_client = wf.DaprWorkflowClient()
instance_id = wf_client.schedule_new_workflow(workflow=task_chain_workflow, input=42)
print(f'Workflow started. Instance ID: {instance_id}')
state = wf_client.wait_for_workflow_completion(instance_id)
print(f'Workflow completed! Status: {state.runtime_status}')

wfr.shutdown()
```

- Learn more about authoring and managing workflows:
- [How-To: Author a workflow]({{< ref howto-author-workflow.md >}}).
- [How-To: Manage a workflow]({{< ref howto-manage-workflow.md >}}).
- Visit [Python SDK examples](https://github.com/dapr/python-sdk/tree/main/examples/workflow) for code samples and instructions to try out Dapr Workflow:
- [Task chaining example](https://github.com/dapr/python-sdk/blob/main/examples/workflow/task_chaining.py)
- [Fan-out/Fan-in example](https://github.com/dapr/python-sdk/blob/main/examples/workflow/fan_out_fan_in.py)
- [Child workflow example](https://github.com/dapr/python-sdk/blob/main/examples/workflow/child_workflow.py)
- [Human approval example](https://github.com/dapr/python-sdk/blob/main/examples/workflow/human_approval.py)
- [Monitor example](https://github.com/dapr/python-sdk/blob/main/examples/workflow/monitor.py)



### Save & get application state

```python
Expand Down Expand Up @@ -515,89 +582,5 @@ def main():
- For a full list of state operations visit [How-To: Use the cryptography APIs]({{< ref howto-cryptography.md >}}).
- Visit [Python SDK examples](https://github.com/dapr/python-sdk/tree/master/examples/crypto) for code samples and instructions to try out cryptography

### Workflow

```python
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext
from dapr.clients import DaprClient

instanceId = "exampleInstanceID"
workflowComponent = "dapr"
workflowName = "hello_world_wf"
eventName = "event1"
eventData = "eventData"

def main():
with DaprClient() as d:
host = settings.DAPR_RUNTIME_HOST
port = settings.DAPR_GRPC_PORT
workflowRuntime = WorkflowRuntime(host, port)
workflowRuntime = WorkflowRuntime()
workflowRuntime.register_workflow(hello_world_wf)
workflowRuntime.register_activity(hello_act)
workflowRuntime.start()

# Start the workflow
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
print(f"start_resp {start_resp.instance_id}")

# ...

# Pause Test
d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"Get response from {workflowName} after pause call: {getResponse.runtime_status}")

# Resume Test
d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"Get response from {workflowName} after resume call: {getResponse.runtime_status}")

sleep(1)
# Raise event
d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
event_name=eventName, event_data=eventData)

sleep(5)
# Purge Test
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
try:
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
except DaprInternalError as err:
if nonExistentIDError in err._message:
print("Instance Successfully Purged")


# Kick off another workflow for termination purposes
# This will also test using the same instance ID on a new workflow after
# the old instance was purged
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
print(f"start_resp {start_resp.instance_id}")

# Terminate Test
d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
sleep(1)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"Get response from {workflowName} after terminate call: {getResponse.runtime_status}")

# Purge Test
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
try:
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
except DaprInternalError as err:
if nonExistentIDError in err._message:
print("Instance Successfully Purged")

workflowRuntime.shutdown()
```

- Learn more about authoring and managing workflows:
- [How-To: Author a workflow]({{< ref howto-author-workflow.md >}}).
- [How-To: Manage a workflow]({{< ref howto-manage-workflow.md >}}).
- Visit [Python SDK examples](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py) for code samples and instructions to try out Dapr Workflow.


## Related links
[Python SDK examples](https://github.com/dapr/python-sdk/tree/master/examples)
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@ description: How to get up and running with the Dapr Workflow extension
no_list: true
---

{{% alert title="Note" color="primary" %}}
Dapr Workflow is currently in alpha.
{{% /alert %}}

The Dapr Python SDK provides a built in Dapr Workflow extension, `dapr.ext.workflow`, for creating Dapr services.
The Dapr Python SDK provides a built-in Dapr Workflow extension, `dapr.ext.workflow`, for creating Dapr services.

## Installation

Expand Down
Loading