@@ -518,85 +518,68 @@ def main():
518518### Workflow
519519
520520``` python
521- from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext
522- from dapr.clients import DaprClient
521+ from time import sleep
523522
524- instanceId = " exampleInstanceID"
525- workflowComponent = " dapr"
526- workflowName = " hello_world_wf"
527- eventName = " event1"
528- eventData = " eventData"
523+ import dapr.ext.workflow as wf
529524
530- def main ():
531- with DaprClient() as d:
532- host = settings.DAPR_RUNTIME_HOST
533- port = settings.DAPR_GRPC_PORT
534- workflowRuntime = WorkflowRuntime(host, port)
535- workflowRuntime = WorkflowRuntime()
536- workflowRuntime.register_workflow(hello_world_wf)
537- workflowRuntime.register_activity(hello_act)
538- workflowRuntime.start()
539-
540- # Start the workflow
541- start_resp = d.start_workflow(instance_id = instanceId, workflow_component = workflowComponent,
542- workflow_name = workflowName, input = inputData, workflow_options = workflowOptions)
543- print (f " start_resp { start_resp.instance_id} " )
544-
545- # ...
546-
547- # Pause Test
548- d.pause_workflow(instance_id = instanceId, workflow_component = workflowComponent)
549- getResponse = d.get_workflow(instance_id = instanceId, workflow_component = workflowComponent)
550- print (f " Get response from { workflowName} after pause call: { getResponse.runtime_status} " )
551-
552- # Resume Test
553- d.resume_workflow(instance_id = instanceId, workflow_component = workflowComponent)
554- getResponse = d.get_workflow(instance_id = instanceId, workflow_component = workflowComponent)
555- print (f " Get response from { workflowName} after resume call: { getResponse.runtime_status} " )
556-
557- sleep(1 )
558- # Raise event
559- d.raise_workflow_event(instance_id = instanceId, workflow_component = workflowComponent,
560- event_name = eventName, event_data = eventData)
561525
562- sleep(5 )
563- # Purge Test
564- d.purge_workflow(instance_id = instanceId, workflow_component = workflowComponent)
565- try :
566- getResponse = d.get_workflow(instance_id = instanceId, workflow_component = workflowComponent)
567- except DaprInternalError as err:
568- if nonExistentIDError in err._message:
569- print (" Instance Successfully Purged" )
570-
571-
572- # Kick off another workflow for termination purposes
573- # This will also test using the same instance ID on a new workflow after
574- # the old instance was purged
575- start_resp = d.start_workflow(instance_id = instanceId, workflow_component = workflowComponent,
576- workflow_name = workflowName, input = inputData, workflow_options = workflowOptions)
577- print (f " start_resp { start_resp.instance_id} " )
578-
579- # Terminate Test
580- d.terminate_workflow(instance_id = instanceId, workflow_component = workflowComponent)
581- sleep(1 )
582- getResponse = d.get_workflow(instance_id = instanceId, workflow_component = workflowComponent)
583- print (f " Get response from { workflowName} after terminate call: { getResponse.runtime_status} " )
584-
585- # Purge Test
586- d.purge_workflow(instance_id = instanceId, workflow_component = workflowComponent)
587- try :
588- getResponse = d.get_workflow(instance_id = instanceId, workflow_component = workflowComponent)
589- except DaprInternalError as err:
590- if nonExistentIDError in err._message:
591- print (" Instance Successfully Purged" )
526+ wfr = wf.WorkflowRuntime()
527+
528+
529+ @wfr.workflow (name = ' random_workflow' )
530+ def task_chain_workflow (ctx : wf.DaprWorkflowContext, wf_input : int ):
531+ try :
532+ result1 = yield ctx.call_activity(step1, input = wf_input)
533+ result2 = yield ctx.call_activity(step2, input = result1)
534+ except Exception as e:
535+ yield ctx.call_activity(error_handler, input = str (e))
536+ raise
537+ # TODO update to set custom status
538+ return [result1, result2]
539+
592540
593- workflowRuntime.shutdown()
541+ @wfr.activity (name = ' step1' )
542+ def step1 (ctx , activity_input ):
543+ print (f ' Step 1: Received input: { activity_input} . ' )
544+ # Do some work
545+ return activity_input + 1
546+
547+
548+ @wfr.activity
549+ def step2 (ctx , activity_input ):
550+ print (f ' Step 2: Received input: { activity_input} . ' )
551+ # Do some work
552+ return activity_input * 2
553+
554+ @wfr.activity
555+ def error_handler (ctx , error ):
556+ print (f ' Executing error handler: { error} . ' )
557+ # Do some compensating work
558+
559+
560+ if __name__ == ' __main__' :
561+ wfr.start()
562+ sleep(10 ) # wait for workflow runtime to start
563+
564+ wf_client = wf.DaprWorkflowClient()
565+ instance_id = wf_client.schedule_new_workflow(workflow = task_chain_workflow, input = 42 )
566+ print (f ' Workflow started. Instance ID: { instance_id} ' )
567+ state = wf_client.wait_for_workflow_completion(instance_id)
568+ print (f ' Workflow completed! Status: { state.runtime_status} ' )
569+
570+ wfr.shutdown()
594571```
595572
596573- Learn more about authoring and managing workflows:
597574 - [ How-To: Author a workflow] ({{< ref howto-author-workflow.md >}}).
598575 - [ How-To: Manage a workflow] ({{< ref howto-manage-workflow.md >}}).
599- - Visit [ Python SDK examples] ( https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py ) for code samples and instructions to try out Dapr Workflow.
576+ - Visit [ Python SDK examples] ( https://github.com/dapr/python-sdk/tree/main/examples/workflow ) for code samples and instructions to try out Dapr Workflow:
577+ - [ Task chaining example] ( https://github.com/dapr/python-sdk/blob/main/examples/workflow/task_chaining.py )
578+ - [ Fan-out/Fan-in example] ( https://github.com/dapr/python-sdk/blob/main/examples/workflow/fan_out_fan_in.py )
579+ - [ Child workflow example] ( https://github.com/dapr/python-sdk/blob/main/examples/workflow/child_workflow.py )
580+ - [ Human approval example] ( https://github.com/dapr/python-sdk/blob/main/examples/workflow/human_approval.py )
581+ - [ Monitor example] ( https://github.com/dapr/python-sdk/blob/main/examples/workflow/monitor.py )
582+
600583
601584
602585## Related links
0 commit comments