@@ -7,56 +7,59 @@ import (
77 "time"
88
99 "github.com/dapr/durabletask-go/workflow"
10- dapr "github.com/dapr/go-sdk/client"
10+ "github.com/dapr/go-sdk/client"
1111)
1212
1313func main () {
14- registry := workflow .NewTaskRegistry ()
14+ r := workflow .NewRegistry ()
1515
16- if err := registry .AddWorkflow (BatchProcessingWorkflow ); err != nil {
16+ if err := r .AddWorkflow (BatchProcessingWorkflow ); err != nil {
1717 log .Fatalf ("failed to register workflow: %v" , err )
1818 }
19- if err := registry .AddActivity (GetWorkBatch ); err != nil {
19+ if err := r .AddActivity (GetWorkBatch ); err != nil {
2020 log .Fatalf ("failed to register activity: %v" , err )
2121 }
22- if err := registry .AddActivity (ProcessWorkItem ); err != nil {
22+ if err := r .AddActivity (ProcessWorkItem ); err != nil {
2323 log .Fatalf ("failed to register activity: %v" , err )
2424 }
25- if err := registry .AddActivity (ProcessResults ); err != nil {
25+ if err := r .AddActivity (ProcessResults ); err != nil {
2626 log .Fatalf ("failed to register activity: %v" , err )
2727 }
2828 fmt .Println ("Workflow(s) and activities registered." )
2929
30- ctx := context .Background ()
31-
32- daprClient , err := dapr .NewClient ()
30+ dclient , err := client .NewClient ()
3331 if err != nil {
34- log .Fatalf ( "failed to create Dapr client: %v" , err )
32+ log .Fatal ( err )
3533 }
3634
37- wfclient := workflow .NewClient (daprClient .GrpcClientConn ())
38- if err := wfclient .StartWorker (ctx , registry ); err != nil {
39- log .Fatalf ("failed to start work item listener: %v" , err )
35+ wclient := workflow .NewClient (dclient .GrpcClientConn ())
36+ fmt .Println ("Worker initialized" )
37+
38+ ctx , cancel := context .WithCancel (context .Background ())
39+ defer cancel ()
40+
41+ if err = wclient .StartWorker (ctx , r ); err != nil {
42+ log .Fatal (err )
4043 }
4144
42- id , err := wfclient . ScheduleNewWorkflow (ctx , "BatchProcessingWorkflow" , workflow .WithInput (10 ))
45+ id , err := wclient . StartWorkflow (ctx , "BatchProcessingWorkflow" , workflow .WithInput (10 ))
4346 if err != nil {
4447 log .Fatalf ("failed to schedule a new workflow: %v" , err )
4548 }
4649
47- metadata , err := wfclient .WaitForWorkflowCompletion (ctx , id )
50+ metadata , err := wclient .WaitForWorkflowCompletion (ctx , id )
4851 if err != nil {
4952 log .Fatalf ("failed to get workflow: %v" , err )
5053 }
5154 fmt .Printf ("workflow status: %s\n " , metadata .RuntimeStatus .String ())
5255
53- err = wfclient .TerminateWorkflow (ctx , id )
56+ err = wclient .TerminateWorkflow (ctx , id )
5457 if err != nil {
5558 log .Fatalf ("failed to terminate workflow: %v" , err )
5659 }
5760 fmt .Println ("workflow terminated" )
5861
59- err = wfclient .PurgeWorkflowState (ctx , id )
62+ err = wclient .PurgeWorkflowState (ctx , id )
6063 if err != nil {
6164 log .Fatalf ("failed to purge workflow: %v" , err )
6265 }
@@ -80,9 +83,9 @@ func BatchProcessingWorkflow(ctx *workflow.WorkflowContext) (any, error) {
8083 }
8184
8285 var outputs int
83- for _ , workflow := range parallelTasks {
86+ for _ , task := range parallelTasks {
8487 var output int
85- err := workflow .Await (& output )
88+ err := task .Await (& output )
8689 if err == nil {
8790 outputs += output
8891 } else {
0 commit comments