Skip to content

Commit c12c959

Browse files
famartingmikeee
andauthored
worflows: activity retry policy (#644)
* worflows: activity retry policy Signed-off-by: Fabian Martinez <[email protected]> * adjust name Signed-off-by: Fabian Martinez <[email protected]> * fix build Signed-off-by: Fabian Martinez <[email protected]> * add tests Signed-off-by: Fabian Martinez <[email protected]> * register activity Signed-off-by: Fabian Martinez <[email protected]> --------- Signed-off-by: Fabian Martinez <[email protected]> Co-authored-by: Mike Nguyen <[email protected]>
1 parent 59acca4 commit c12c959

File tree

9 files changed

+92
-8
lines changed

9 files changed

+92
-8
lines changed

examples/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ require (
2626
github.com/golang/protobuf v1.5.4 // indirect
2727
github.com/kr/pretty v0.3.1 // indirect
2828
github.com/marusama/semaphore/v2 v2.5.0 // indirect
29-
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d // indirect
29+
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428 // indirect
3030
github.com/xhit/go-str2duration/v2 v2.1.0 // indirect
3131
go.opentelemetry.io/otel v1.27.0 // indirect
3232
go.opentelemetry.io/otel/metric v1.27.0 // indirect

examples/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
3939
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
4040
github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM=
4141
github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ=
42-
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d h1:Phnx8/wPd9BM6RPIjlqNl8kAaFjtU+Sdw9CzmZd8Wsw=
43-
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
42+
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428 h1:I1yeX4tWqOdBzpRzSbY1TnHU2NI25Pdu6OXUm39emm0=
43+
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
4444
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
4545
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
4646
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=

examples/workflow/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@ expected_stdout_lines:
1515
- '== APP == Worker initialized'
1616
- '== APP == TestWorkflow registered'
1717
- '== APP == TestActivity registered'
18+
- '== APP == FailActivity registered'
1819
- '== APP == runner started'
1920
- '== APP == workflow started with id: a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9'
2021
- '== APP == workflow paused'
2122
- '== APP == workflow resumed'
2223
- '== APP == stage: 1'
2324
- '== APP == workflow event raised'
2425
- '== APP == stage: 2'
26+
- '== APP == fail activity executions: 3'
2527
- '== APP == workflow status: COMPLETED'
2628
- '== APP == workflow purged'
2729
- '== APP == stage: 2'

examples/workflow/main.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package main
1616

1717
import (
1818
"context"
19+
"errors"
1920
"fmt"
2021
"log"
2122
"time"
@@ -24,6 +25,7 @@ import (
2425
)
2526

2627
var stage = 0
28+
var failActivityTries = 0
2729

2830
func main() {
2931
w, err := workflow.NewWorker()
@@ -43,6 +45,11 @@ func main() {
4345
}
4446
fmt.Println("TestActivity registered")
4547

48+
if err := w.RegisterActivity(FailActivity); err != nil {
49+
log.Fatal(err)
50+
}
51+
fmt.Println("FailActivity registered")
52+
4653
// Start workflow runner
4754
if err := w.Start(); err != nil {
4855
log.Fatal(err)
@@ -112,6 +119,15 @@ func main() {
112119

113120
fmt.Printf("stage: %d\n", stage)
114121

122+
waitCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
123+
_, err = wfClient.WaitForWorkflowCompletion(waitCtx, instanceID)
124+
cancel()
125+
if err != nil {
126+
log.Fatalf("failed to wait for workflow: %v", err)
127+
}
128+
129+
fmt.Printf("fail activity executions: %d\n", failActivityTries)
130+
115131
respFetch, err = wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
116132
if err != nil {
117133
log.Fatalf("failed to get workflow: %v", err)
@@ -186,6 +202,15 @@ func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
186202
return nil, err
187203
}
188204

205+
if err := ctx.CallActivity(FailActivity, workflow.ActivityRetryPolicy(workflow.RetryPolicy{
206+
MaxAttempts: 3,
207+
InitialRetryInterval: 100 * time.Millisecond,
208+
BackoffCoefficient: 2,
209+
MaxRetryInterval: 1 * time.Second,
210+
})).Await(nil); err == nil {
211+
return nil, fmt.Errorf("unexpected no error executing fail activity")
212+
}
213+
189214
return output, nil
190215
}
191216

@@ -199,3 +224,8 @@ func TestActivity(ctx workflow.ActivityContext) (any, error) {
199224

200225
return fmt.Sprintf("Stage: %d", stage), nil
201226
}
227+
228+
func FailActivity(ctx workflow.ActivityContext) (any, error) {
229+
failActivityTries += 1
230+
return nil, errors.New("dummy activity error")
231+
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ require (
77
github.com/go-chi/chi/v5 v5.1.0
88
github.com/golang/mock v1.6.0
99
github.com/google/uuid v1.6.0
10-
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d
10+
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428
1111
github.com/stretchr/testify v1.9.0
1212
google.golang.org/grpc v1.65.0
1313
google.golang.org/protobuf v1.34.2

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
2828
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
2929
github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM=
3030
github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ=
31-
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d h1:Phnx8/wPd9BM6RPIjlqNl8kAaFjtU+Sdw9CzmZd8Wsw=
32-
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
31+
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428 h1:I1yeX4tWqOdBzpRzSbY1TnHU2NI25Pdu6OXUm39emm0=
32+
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
3333
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
3434
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
3535
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=

workflow/activity_context.go

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package workflow
1717
import (
1818
"context"
1919
"encoding/json"
20+
"time"
2021

2122
"google.golang.org/protobuf/types/known/wrapperspb"
2223

@@ -38,7 +39,16 @@ func (wfac *ActivityContext) Context() context.Context {
3839
type callActivityOption func(*callActivityOptions) error
3940

4041
type callActivityOptions struct {
41-
rawInput *wrapperspb.StringValue
42+
rawInput *wrapperspb.StringValue
43+
retryPolicy *RetryPolicy
44+
}
45+
46+
type RetryPolicy struct {
47+
MaxAttempts int
48+
InitialRetryInterval time.Duration
49+
BackoffCoefficient float64
50+
MaxRetryInterval time.Duration
51+
RetryTimeout time.Duration
4252
}
4353

4454
// ActivityInput is an option to pass a JSON-serializable input
@@ -61,6 +71,26 @@ func ActivityRawInput(input string) callActivityOption {
6171
}
6272
}
6373

74+
func ActivityRetryPolicy(policy RetryPolicy) callActivityOption {
75+
return func(opts *callActivityOptions) error {
76+
opts.retryPolicy = &policy
77+
return nil
78+
}
79+
}
80+
81+
func (opts *callActivityOptions) getRetryPolicy() *task.ActivityRetryPolicy {
82+
if opts.retryPolicy == nil {
83+
return nil
84+
}
85+
return &task.ActivityRetryPolicy{
86+
MaxAttempts: opts.retryPolicy.MaxAttempts,
87+
InitialRetryInterval: opts.retryPolicy.InitialRetryInterval,
88+
BackoffCoefficient: opts.retryPolicy.BackoffCoefficient,
89+
MaxRetryInterval: opts.retryPolicy.MaxRetryInterval,
90+
RetryTimeout: opts.retryPolicy.RetryTimeout,
91+
}
92+
}
93+
6494
func marshalData(input any) ([]byte, error) {
6595
if input == nil {
6696
return nil, nil

workflow/activity_context_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ import (
1919
"encoding/json"
2020
"fmt"
2121
"testing"
22+
"time"
2223

24+
"github.com/microsoft/durabletask-go/task"
2325
"github.com/stretchr/testify/assert"
2426
"github.com/stretchr/testify/require"
2527
)
@@ -69,6 +71,26 @@ func TestCallActivityOptions(t *testing.T) {
6971
opts := returnCallActivityOptions(ActivityRawInput("test"))
7072
assert.Equal(t, "test", opts.rawInput.GetValue())
7173
})
74+
75+
t.Run("activity retry policy - set", func(t *testing.T) {
76+
opts := returnCallActivityOptions(ActivityRetryPolicy(RetryPolicy{
77+
MaxAttempts: 3,
78+
InitialRetryInterval: 100 * time.Millisecond,
79+
BackoffCoefficient: 2,
80+
MaxRetryInterval: 2 * time.Second,
81+
}))
82+
assert.Equal(t, &task.ActivityRetryPolicy{
83+
MaxAttempts: 3,
84+
InitialRetryInterval: 100 * time.Millisecond,
85+
BackoffCoefficient: 2,
86+
MaxRetryInterval: 2 * time.Second,
87+
}, opts.getRetryPolicy())
88+
})
89+
90+
t.Run("activity retry policy - empty", func(t *testing.T) {
91+
opts := returnCallActivityOptions()
92+
assert.Empty(t, opts.getRetryPolicy())
93+
})
7294
}
7395

7496
func returnCallActivityOptions(opts ...callActivityOption) callActivityOptions {

workflow/context.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func (wfc *WorkflowContext) CallActivity(activity interface{}, opts ...callActiv
6868
}
6969
}
7070

71-
return wfc.orchestrationContext.CallActivity(activity, task.WithRawActivityInput(options.rawInput.GetValue()))
71+
return wfc.orchestrationContext.CallActivity(activity, task.WithRawActivityInput(options.rawInput.GetValue()), task.WithRetryPolicy(options.getRetryPolicy()))
7272
}
7373

7474
// CallChildWorkflow returns a completable task for a given workflow.

0 commit comments

Comments
 (0)