Skip to content

Commit 516360e

Browse files
committed
Fix localactivity should not retry on CancelError (#890)
1 parent 5a70ce8 commit 516360e

File tree

5 files changed

+130
-10
lines changed

5 files changed

+130
-10
lines changed

internal/error.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,12 @@ func NewCanceledError(details ...interface{}) *CanceledError {
195195
return &CanceledError{details: ErrorDetailsValues(details)}
196196
}
197197

198+
// IsCanceledError return whether error in CanceledError
199+
func IsCanceledError(err error) bool {
200+
_, ok := err.(*CanceledError)
201+
return ok
202+
}
203+
198204
// NewContinueAsNewError creates ContinueAsNewError instance
199205
// If the workflow main function returns this error then the current execution is ended and
200206
// the new execution with same workflow ID is started automatically with options

internal/error_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,35 @@ func Test_CanceledError(t *testing.T) {
343343
require.Equal(t, testErrorDetails3, b3)
344344
}
345345

346+
func Test_IsCanceledError(t *testing.T) {
347+
348+
tests := []struct {
349+
name string
350+
err error
351+
expected bool
352+
}{
353+
{
354+
name: "empty detail",
355+
err: NewCanceledError(),
356+
expected: true,
357+
},
358+
{
359+
name: "with detail",
360+
err: NewCanceledError("details"),
361+
expected: true,
362+
},
363+
{
364+
name: "not canceled error",
365+
err: errors.New("details"),
366+
expected: false,
367+
},
368+
}
369+
370+
for _, test := range tests {
371+
require.Equal(t, test.expected, IsCanceledError(test.err))
372+
}
373+
}
374+
346375
func TestErrorDetailsValues(t *testing.T) {
347376
e := ErrorDetailsValues{}
348377
require.Equal(t, ErrNoData, e.Get())

internal/internal_task_handlers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -879,7 +879,7 @@ func (w *workflowExecutionContextImpl) ProcessLocalActivityResult(workflowTask *
879879
}
880880

881881
func (w *workflowExecutionContextImpl) retryLocalActivity(lar *localActivityResult) bool {
882-
if lar.task.retryPolicy == nil || lar.err == nil || lar.err == ErrCanceled {
882+
if lar.task.retryPolicy == nil || lar.err == nil || IsCanceledError(lar.err) {
883883
return false
884884
}
885885

internal/internal_workflow_testsuite_test.go

Lines changed: 93 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ func (s *WorkflowTestSuiteUnitTest) SetupSuite() {
6565
RegisterActivityWithOptions(testActivityHello, RegisterActivityOptions{Name: "testActivityHello"})
6666
RegisterActivity(testActivityContext)
6767
RegisterActivity(testActivityHeartbeat)
68+
RegisterActivity(testActivityCanceled)
6869
}
6970

7071
func TestUnitTestSuite(t *testing.T) {
@@ -494,6 +495,14 @@ func testActivityContext(ctx context.Context) (string, error) {
494495
return "", fmt.Errorf("context did not propagate to workflow")
495496
}
496497

498+
func testActivityCanceled(ctx context.Context) (int32, error) {
499+
info := GetActivityInfo(ctx)
500+
if info.Attempt < 2 {
501+
return int32(-1), NewCanceledError("details")
502+
}
503+
return info.Attempt, nil
504+
}
505+
497506
func testWorkflowHeartbeat(ctx Context, msg string, waitTime time.Duration) (string, error) {
498507
ao := ActivityOptions{
499508
ScheduleToStartTimeout: time.Minute,
@@ -2214,15 +2223,15 @@ func (s *WorkflowTestSuiteUnitTest) Test_ActivityHeartbeatRetry() {
22142223

22152224
func (s *WorkflowTestSuiteUnitTest) Test_LocalActivityRetry() {
22162225

2217-
localActivityFn := func(ctx context.Context) (string, error) {
2226+
localActivityFn := func(ctx context.Context) (int32, error) {
22182227
info := GetActivityInfo(ctx)
22192228
if info.Attempt < 2 {
2220-
return "", NewCustomError("bad-luck")
2229+
return int32(-1), NewCustomError("bad-luck")
22212230
}
2222-
return "retry-done", nil
2231+
return info.Attempt, nil
22232232
}
22242233

2225-
workflowFn := func(ctx Context) (string, error) {
2234+
workflowFn := func(ctx Context) (int32, error) {
22262235
lao := LocalActivityOptions{
22272236
ScheduleToCloseTimeout: time.Minute,
22282237
RetryPolicy: &RetryPolicy{
@@ -2236,10 +2245,10 @@ func (s *WorkflowTestSuiteUnitTest) Test_LocalActivityRetry() {
22362245
}
22372246
ctx = WithLocalActivityOptions(ctx, lao)
22382247

2239-
var result string
2248+
var result int32
22402249
err := ExecuteLocalActivity(ctx, localActivityFn).Get(ctx, &result)
22412250
if err != nil {
2242-
return "", err
2251+
return int32(-1), err
22432252
}
22442253
return result, nil
22452254
}
@@ -2250,9 +2259,85 @@ func (s *WorkflowTestSuiteUnitTest) Test_LocalActivityRetry() {
22502259

22512260
s.True(env.IsWorkflowCompleted())
22522261
s.NoError(env.GetWorkflowError())
2253-
var result string
2262+
var result int32
22542263
s.NoError(env.GetWorkflowResult(&result))
2255-
s.Equal("retry-done", result)
2264+
s.Equal(int32(2), result)
2265+
}
2266+
2267+
func (s *WorkflowTestSuiteUnitTest) Test_LocalActivityRetryOnCancel() {
2268+
attempts := 0
2269+
localActivityFn := func(ctx context.Context) (int32, error) {
2270+
attempts++
2271+
info := GetActivityInfo(ctx)
2272+
if info.Attempt < 2 {
2273+
return int32(-1), NewCanceledError("details")
2274+
}
2275+
return info.Attempt, nil
2276+
}
2277+
2278+
workflowFn := func(ctx Context) (int32, error) {
2279+
lao := LocalActivityOptions{
2280+
ScheduleToCloseTimeout: time.Minute,
2281+
RetryPolicy: &RetryPolicy{
2282+
MaximumAttempts: 3,
2283+
InitialInterval: time.Second,
2284+
MaximumInterval: time.Second * 10,
2285+
BackoffCoefficient: 2,
2286+
NonRetriableErrorReasons: []string{"bad-bug"},
2287+
ExpirationInterval: time.Minute,
2288+
},
2289+
}
2290+
ctx = WithLocalActivityOptions(ctx, lao)
2291+
2292+
var result int32
2293+
err := ExecuteLocalActivity(ctx, localActivityFn).Get(ctx, &result)
2294+
if err != nil {
2295+
return int32(-1), err
2296+
}
2297+
return result, nil
2298+
}
2299+
2300+
env := s.NewTestWorkflowEnvironment()
2301+
RegisterWorkflow(workflowFn)
2302+
env.ExecuteWorkflow(workflowFn)
2303+
2304+
s.True(env.IsWorkflowCompleted())
2305+
s.Error(env.GetWorkflowError())
2306+
s.True(IsCanceledError(env.GetWorkflowError()))
2307+
s.Equal(1, attempts)
2308+
}
2309+
2310+
func (s *WorkflowTestSuiteUnitTest) Test_ActivityRetryOnCancel() {
2311+
workflowFn := func(ctx Context) (int32, error) {
2312+
ao := ActivityOptions{
2313+
ScheduleToStartTimeout: time.Minute,
2314+
StartToCloseTimeout: time.Minute,
2315+
RetryPolicy: &RetryPolicy{
2316+
MaximumAttempts: 3,
2317+
InitialInterval: time.Second,
2318+
MaximumInterval: time.Second * 10,
2319+
BackoffCoefficient: 2,
2320+
NonRetriableErrorReasons: []string{"bad-bug"},
2321+
ExpirationInterval: time.Minute,
2322+
},
2323+
}
2324+
ctx = WithActivityOptions(ctx, ao)
2325+
2326+
var result int32
2327+
err := ExecuteActivity(ctx, testActivityCanceled).Get(ctx, &result)
2328+
if err != nil {
2329+
return int32(-1), err
2330+
}
2331+
return result, nil
2332+
}
2333+
2334+
env := s.NewTestWorkflowEnvironment()
2335+
RegisterWorkflow(workflowFn)
2336+
env.ExecuteWorkflow(workflowFn)
2337+
2338+
s.True(env.IsWorkflowCompleted())
2339+
s.Error(env.GetWorkflowError())
2340+
s.True(IsCanceledError(env.GetWorkflowError()))
22562341
}
22572342

22582343
func (s *WorkflowTestSuiteUnitTest) Test_ChildWorkflowRetry() {

internal/workflow.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,7 @@ func scheduleLocalActivity(ctx Context, params *executeLocalActivityParams) Futu
514514
ctxDone.removeReceiveCallback(cancellationCallback)
515515
}
516516

517-
if lar.err == nil || lar.backoff <= 0 {
517+
if lar.err == nil || IsCanceledError(lar.err) || lar.backoff <= 0 {
518518
f.Set(lar.result, lar.err)
519519
return
520520
}

0 commit comments

Comments
 (0)