Skip to content

Commit 910e30f

Browse files
committed
Use durabletask-go workflow vanity client
Signed-off-by: joshvanl <me@joshvanl.dev>
1 parent 04656fb commit 910e30f

File tree

8 files changed

+62
-278
lines changed

8 files changed

+62
-278
lines changed

examples/go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ require (
2525
github.com/go-chi/chi/v5 v5.2.2 // indirect
2626
github.com/go-logr/logr v1.4.3 // indirect
2727
github.com/go-logr/stdr v1.2.2 // indirect
28-
github.com/sirupsen/logrus v1.9.3 // indirect
2928
github.com/xhit/go-str2duration/v2 v2.1.0 // indirect
3029
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
3130
go.opentelemetry.io/otel v1.37.0 // indirect
@@ -36,5 +35,6 @@ require (
3635
golang.org/x/text v0.26.0 // indirect
3736
google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 // indirect
3837
gopkg.in/yaml.v3 v3.0.1 // indirect
39-
k8s.io/utils v0.0.0-20250502105355-0f33e8f1c979 // indirect
4038
)
39+
40+
replace github.com/dapr/durabletask-go => github.com/joshvanl/durabletask-go v0.0.0-20250909232152-78b35605272e

examples/go.sum

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,9 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF
88
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
99
github.com/dapr/dapr v1.16.0-rc.3 h1:D99V20GOhb+bZXH1PngME+wgzIZCcBFOvmaP7DOZxGo=
1010
github.com/dapr/dapr v1.16.0-rc.3/go.mod h1:uyKnxMohSg87LSFzZ/oyuiGSo0+qkzeR0eXncPyIV9c=
11-
github.com/dapr/durabletask-go v0.8.3 h1:ntcaUOigNtDLCvQ0pFqxJTziibU1q/ItrCh89Jyrw3E=
12-
github.com/dapr/durabletask-go v0.8.3/go.mod h1:0Ts4rXp74JyG19gDWPcwNo5V6NBZzhARzHF5XynmA7Q=
1311
github.com/dapr/kit v0.16.1 h1:MqLAhHVg8trPy2WJChMZFU7ToeondvxcNHYVvMDiVf4=
1412
github.com/dapr/kit v0.16.1/go.mod h1:40ZWs5P6xfYf7O59XgwqZkIyDldTIXlhTQhGop8QoSM=
1513
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
16-
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
1714
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
1815
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
1916
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
@@ -37,6 +34,8 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
3734
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
3835
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
3936
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
37+
github.com/joshvanl/durabletask-go v0.0.0-20250909232152-78b35605272e h1:a8MVh2/GrjbM+NhldSY+uiHfkKDC69D4fN+g13dLiVw=
38+
github.com/joshvanl/durabletask-go v0.0.0-20250909232152-78b35605272e/go.mod h1:0Ts4rXp74JyG19gDWPcwNo5V6NBZzhARzHF5XynmA7Q=
4039
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
4140
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
4241
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
@@ -52,11 +51,8 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI
5251
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
5352
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
5453
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
55-
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
56-
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
5754
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
5855
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
59-
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
6056
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
6157
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
6258
github.com/xhit/go-str2duration/v2 v2.1.0 h1:lxklc02Drh6ynqX+DdPyp5pCKLUQpRT8bp8Ydu2Bstc=
@@ -75,7 +71,6 @@ go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mx
7571
go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0=
7672
golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw=
7773
golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA=
78-
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
7974
golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw=
8075
golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
8176
golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M=
@@ -96,8 +91,5 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWD
9691
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
9792
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
9893
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
99-
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
10094
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
10195
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
102-
k8s.io/utils v0.0.0-20250502105355-0f33e8f1c979 h1:jgJW5IePPXLGB8e/1wvd0Ich9QE97RvvF3a8J3fP/Lg=
103-
k8s.io/utils v0.0.0-20250502105355-0f33e8f1c979/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=

examples/workflow-parallel/main.go

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,14 @@ import (
66
"log"
77
"time"
88

9-
"github.com/dapr/durabletask-go/api"
10-
"github.com/dapr/durabletask-go/backend"
11-
"github.com/dapr/durabletask-go/client"
12-
"github.com/dapr/durabletask-go/task"
9+
"github.com/dapr/durabletask-go/workflow"
1310
dapr "github.com/dapr/go-sdk/client"
1411
)
1512

1613
func main() {
17-
registry := task.NewTaskRegistry()
14+
registry := workflow.NewTaskRegistry()
1815

19-
if err := registry.AddOrchestrator(BatchProcessingWorkflow); err != nil {
16+
if err := registry.AddWorkflow(BatchProcessingWorkflow); err != nil {
2017
log.Fatalf("failed to register workflow: %v", err)
2118
}
2219
if err := registry.AddActivity(GetWorkBatch); err != nil {
@@ -37,70 +34,70 @@ func main() {
3734
log.Fatalf("failed to create Dapr client: %v", err)
3835
}
3936

40-
client := client.NewTaskHubGrpcClient(daprClient.GrpcClientConn(), backend.DefaultLogger())
41-
if err := client.StartWorkItemListener(ctx, registry); err != nil {
37+
wfclient := workflow.NewClient(daprClient.GrpcClientConn())
38+
if err := wfclient.StartWorker(ctx, registry); err != nil {
4239
log.Fatalf("failed to start work item listener: %v", err)
4340
}
4441

45-
id, err := client.ScheduleNewOrchestration(ctx, "BatchProcessingWorkflow", api.WithInput(10))
42+
id, err := wfclient.ScheduleNewWorkflow(ctx, "BatchProcessingWorkflow", workflow.WithInput(10))
4643
if err != nil {
4744
log.Fatalf("failed to schedule a new workflow: %v", err)
4845
}
4946

50-
metadata, err := client.WaitForOrchestrationCompletion(ctx, id)
47+
metadata, err := wfclient.WaitForWorkflowCompletion(ctx, id)
5148
if err != nil {
5249
log.Fatalf("failed to get workflow: %v", err)
5350
}
5451
fmt.Printf("workflow status: %s\n", metadata.RuntimeStatus.String())
5552

56-
err = client.TerminateOrchestration(ctx, id)
53+
err = wfclient.TerminateWorkflow(ctx, id)
5754
if err != nil {
5855
log.Fatalf("failed to terminate workflow: %v", err)
5956
}
6057
fmt.Println("workflow terminated")
6158

62-
err = client.PurgeOrchestrationState(ctx, id)
59+
err = wfclient.PurgeWorkflowState(ctx, id)
6360
if err != nil {
6461
log.Fatalf("failed to purge workflow: %v", err)
6562
}
6663
fmt.Println("workflow purged")
6764
}
6865

69-
func BatchProcessingWorkflow(ctx *task.OrchestrationContext) (any, error) {
66+
func BatchProcessingWorkflow(ctx *workflow.WorkflowContext) (any, error) {
7067
var input int
7168
if err := ctx.GetInput(&input); err != nil {
7269
return 0, err
7370
}
7471

7572
var workBatch []int
76-
if err := ctx.CallActivity(GetWorkBatch, task.WithActivityInput(input)).Await(&workBatch); err != nil {
73+
if err := ctx.CallActivity(GetWorkBatch, workflow.WithActivityInput(input)).Await(&workBatch); err != nil {
7774
return 0, err
7875
}
7976

80-
parallelTasks := make([]task.Task, len(workBatch))
77+
parallelTasks := make([]workflow.Task, len(workBatch))
8178
for i, workItem := range workBatch {
82-
parallelTasks[i] = ctx.CallActivity(ProcessWorkItem, task.WithActivityInput(workItem))
79+
parallelTasks[i] = ctx.CallActivity(ProcessWorkItem, workflow.WithActivityInput(workItem))
8380
}
8481

8582
var outputs int
86-
for _, task := range parallelTasks {
83+
for _, workflow := range parallelTasks {
8784
var output int
88-
err := task.Await(&output)
85+
err := workflow.Await(&output)
8986
if err == nil {
9087
outputs += output
9188
} else {
9289
return 0, err
9390
}
9491
}
9592

96-
if err := ctx.CallActivity(ProcessResults, task.WithActivityInput(outputs)).Await(nil); err != nil {
93+
if err := ctx.CallActivity(ProcessResults, workflow.WithActivityInput(outputs)).Await(nil); err != nil {
9794
return 0, err
9895
}
9996

10097
return 0, nil
10198
}
10299

103-
func GetWorkBatch(ctx task.ActivityContext) (any, error) {
100+
func GetWorkBatch(ctx workflow.ActivityContext) (any, error) {
104101
var batchSize int
105102
if err := ctx.GetInput(&batchSize); err != nil {
106103
return 0, err
@@ -112,7 +109,7 @@ func GetWorkBatch(ctx task.ActivityContext) (any, error) {
112109
return batch, nil
113110
}
114111

115-
func ProcessWorkItem(ctx task.ActivityContext) (any, error) {
112+
func ProcessWorkItem(ctx workflow.ActivityContext) (any, error) {
116113
var workItem int
117114
if err := ctx.GetInput(&workItem); err != nil {
118115
return 0, err
@@ -124,7 +121,7 @@ func ProcessWorkItem(ctx task.ActivityContext) (any, error) {
124121
return result, nil
125122
}
126123

127-
func ProcessResults(ctx task.ActivityContext) (any, error) {
124+
func ProcessResults(ctx workflow.ActivityContext) (any, error) {
128125
var finalResult int
129126
if err := ctx.GetInput(&finalResult); err != nil {
130127
return 0, err

examples/workflow-taskexecutionid/main.go

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,14 @@ import (
88
"sync/atomic"
99
"time"
1010

11-
"github.com/dapr/durabletask-go/api"
12-
"github.com/dapr/durabletask-go/backend"
13-
"github.com/dapr/durabletask-go/client"
14-
"github.com/dapr/durabletask-go/task"
11+
"github.com/dapr/durabletask-go/workflow"
1512
dapr "github.com/dapr/go-sdk/client"
1613
)
1714

1815
func main() {
19-
registry := task.NewTaskRegistry()
16+
registry := workflow.NewTaskRegistry()
2017

21-
if err := registry.AddOrchestrator(TaskExecutionIdWorkflow); err != nil {
18+
if err := registry.AddWorkflow(TaskExecutionIdWorkflow); err != nil {
2219
log.Fatalf("failed to register workflow: %v", err)
2320
}
2421
if err := registry.AddActivity(RetryN); err != nil {
@@ -31,32 +28,30 @@ func main() {
3128
log.Fatalf("failed to create Dapr client: %v", err)
3229
}
3330

34-
client := client.NewTaskHubGrpcClient(daprClient.GrpcClientConn(), backend.DefaultLogger())
35-
3631
ctx := context.Background()
37-
38-
if err := client.StartWorkItemListener(ctx, registry); err != nil {
32+
wfclient := workflow.NewClient(daprClient.GrpcClientConn())
33+
if err := wfclient.StartWorker(ctx, registry); err != nil {
3934
log.Fatalf("failed to start work item listener: %v", err)
4035
}
4136

42-
id, err := client.ScheduleNewOrchestration(ctx, "TaskExecutionIdWorkflow", api.WithInput(5))
37+
id, err := wfclient.ScheduleNewWorkflow(ctx, "TaskExecutionIdWorkflow", workflow.WithInput(5))
4338
if err != nil {
4439
log.Fatalf("failed to schedule a new workflow: %v", err)
4540
}
4641

47-
metadata, err := client.WaitForOrchestrationCompletion(ctx, id)
42+
metadata, err := wfclient.WaitForWorkflowCompletion(ctx, id)
4843
if err != nil {
4944
log.Fatalf("failed to get workflow: %v", err)
5045
}
5146
fmt.Printf("workflow status: %s\n", metadata.RuntimeStatus.String())
5247

53-
err = client.TerminateOrchestration(ctx, id)
48+
err = wfclient.TerminateWorkflow(ctx, id)
5449
if err != nil {
5550
log.Fatalf("failed to terminate workflow: %v", err)
5651
}
5752
fmt.Println("workflow terminated")
5853

59-
err = client.PurgeOrchestrationState(ctx, id)
54+
err = wfclient.PurgeWorkflowState(ctx, id)
6055
if err != nil {
6156
log.Fatalf("failed to purge workflow: %v", err)
6257
}
@@ -65,35 +60,35 @@ func main() {
6560

6661
var eMap = sync.Map{}
6762

68-
func TaskExecutionIdWorkflow(ctx *task.OrchestrationContext) (any, error) {
63+
func TaskExecutionIdWorkflow(ctx *workflow.WorkflowContext) (any, error) {
6964
var retries int
7065
if err := ctx.GetInput(&retries); err != nil {
7166
return 0, err
7267
}
7368

7469
var workBatch []int
75-
if err := ctx.CallActivity(RetryN, task.WithActivityRetryPolicy(&task.RetryPolicy{
70+
if err := ctx.CallActivity(RetryN, workflow.WithActivityRetryPolicy(&workflow.RetryPolicy{
7671
MaxAttempts: retries,
7772
InitialRetryInterval: 100 * time.Millisecond,
7873
BackoffCoefficient: 2,
7974
MaxRetryInterval: 1 * time.Second,
80-
}), task.WithActivityInput(retries)).Await(&workBatch); err != nil {
75+
}), workflow.WithActivityInput(retries)).Await(&workBatch); err != nil {
8176
return 0, err
8277
}
8378

84-
if err := ctx.CallActivity(RetryN, task.WithActivityRetryPolicy(&task.RetryPolicy{
79+
if err := ctx.CallActivity(RetryN, workflow.WithActivityRetryPolicy(&workflow.RetryPolicy{
8580
MaxAttempts: retries,
8681
InitialRetryInterval: 100 * time.Millisecond,
8782
BackoffCoefficient: 2,
8883
MaxRetryInterval: 1 * time.Second,
89-
}), task.WithActivityInput(retries)).Await(&workBatch); err != nil {
84+
}), workflow.WithActivityInput(retries)).Await(&workBatch); err != nil {
9085
return 0, err
9186
}
9287

9388
return 0, nil
9489
}
9590

96-
func RetryN(ctx task.ActivityContext) (any, error) {
91+
func RetryN(ctx workflow.ActivityContext) (any, error) {
9792
taskExecutionID := ctx.GetTaskExecutionID()
9893
counter, _ := eMap.LoadOrStore(taskExecutionID, &atomic.Int32{})
9994
var retries int32

0 commit comments

Comments
 (0)