Skip to content

Commit b13ab9c

Browse files
authored
no retry when context is done (#241)
* no retry when context is done * adding unit test
1 parent 815dd18 commit b13ab9c

File tree

6 files changed

+66
-26
lines changed

6 files changed

+66
-26
lines changed

common/backoff/retry.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package backoff
2222

2323
import (
24+
"context"
2425
"sync"
2526
"time"
2627
)
@@ -86,11 +87,12 @@ func NewConcurrentRetrier(retryPolicy RetryPolicy) *ConcurrentRetrier {
8687
}
8788

8889
// Retry function can be used to wrap any call with retry logic using the passed in policy
89-
func Retry(operation Operation, policy RetryPolicy, isRetryable IsRetryable) error {
90+
func Retry(ctx context.Context, operation Operation, policy RetryPolicy, isRetryable IsRetryable) error {
9091
var err error
9192
var next time.Duration
9293

9394
r := NewRetrier(policy, SystemClock)
95+
Retry_Loop:
9496
for {
9597
// operation completed successfully. No need to retry.
9698
if err = operation(); err == nil {
@@ -106,6 +108,18 @@ func Retry(operation Operation, policy RetryPolicy, isRetryable IsRetryable) err
106108
return err
107109
}
108110

111+
// check if ctx is done
112+
if ctxDone := ctx.Done(); ctxDone != nil {
113+
timer := time.NewTimer(next)
114+
select {
115+
case <-ctxDone:
116+
return err
117+
case <-timer.C:
118+
continue Retry_Loop
119+
}
120+
}
121+
122+
// ctx is not cancellable
109123
time.Sleep(next)
110124
}
111125
}

common/backoff/retry_test.go

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package backoff
2222

2323
import (
24+
"context"
2425
"fmt"
2526
"testing"
2627
"time"
@@ -62,11 +63,35 @@ func (s *RetrySuite) TestRetrySuccess() {
6263
policy.SetMaximumInterval(5 * time.Millisecond)
6364
policy.SetMaximumAttempts(10)
6465

65-
err := Retry(op, policy, nil)
66+
err := Retry(context.Background(), op, policy, nil)
6667
s.NoError(err)
6768
s.Equal(5, i)
6869
}
6970

71+
func (s *RetrySuite) TestNoRetryAfterContextDone() {
72+
i := 0
73+
op := func() error {
74+
i++
75+
76+
if i == 5 {
77+
return nil
78+
}
79+
80+
return &someError{}
81+
}
82+
83+
policy := NewExponentialRetryPolicy(1 * time.Millisecond)
84+
policy.SetMaximumInterval(5 * time.Millisecond)
85+
policy.SetMaximumAttempts(10)
86+
87+
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*5)
88+
defer cancel()
89+
90+
err := Retry(ctx, op, policy, nil)
91+
s.Error(err)
92+
s.True(i >= 2) // verify that we did retry
93+
}
94+
7095
func (s *RetrySuite) TestRetryFailed() {
7196
i := 0
7297
op := func() error {
@@ -83,7 +108,7 @@ func (s *RetrySuite) TestRetryFailed() {
83108
policy.SetMaximumInterval(5 * time.Millisecond)
84109
policy.SetMaximumAttempts(5)
85110

86-
err := Retry(op, policy, nil)
111+
err := Retry(context.Background(), op, policy, nil)
87112
s.Error(err)
88113
}
89114

@@ -111,7 +136,7 @@ func (s *RetrySuite) TestIsRetryableSuccess() {
111136
policy.SetMaximumInterval(5 * time.Millisecond)
112137
policy.SetMaximumAttempts(10)
113138

114-
err := Retry(op, policy, isRetryable)
139+
err := Retry(context.Background(), op, policy, isRetryable)
115140
s.NoError(err, "Retry count: %v", i)
116141
s.Equal(5, i)
117142
}
@@ -132,7 +157,7 @@ func (s *RetrySuite) TestIsRetryableFailure() {
132157
policy.SetMaximumInterval(5 * time.Millisecond)
133158
policy.SetMaximumAttempts(10)
134159

135-
err := Retry(op, policy, IgnoreErrors([]error{&someError{}}))
160+
err := Retry(context.Background(), op, policy, IgnoreErrors([]error{&someError{}}))
136161
s.Error(err)
137162
s.Equal(1, i)
138163
}

internal_task_handlers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1042,7 +1042,7 @@ func recordActivityHeartbeat(
10421042
Identity: common.StringPtr(identity)}
10431043

10441044
var heartbeatResponse *s.RecordActivityTaskHeartbeatResponse
1045-
heartbeatErr := backoff.Retry(
1045+
heartbeatErr := backoff.Retry(ctx,
10461046
func() error {
10471047
tchCtx, cancel := newTChannelContext(ctx)
10481048
defer cancel()

internal_task_pollers.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -161,11 +161,12 @@ func (wtp *workflowTaskPoller) ProcessTask(task interface{}) error {
161161
}
162162
wtp.metricsScope.Timer(metrics.DecisionExecutionLatency).Record(time.Now().Sub(executionStartTime))
163163

164+
ctx := context.Background()
164165
responseStartTime := time.Now()
165166
// Respond task completion.
166-
err = backoff.Retry(
167+
err = backoff.Retry(ctx,
167168
func() error {
168-
tchCtx, cancel := newTChannelContext(context.Background())
169+
tchCtx, cancel := newTChannelContext(ctx)
169170
defer cancel()
170171
var err1 error
171172
switch request := completedRequest.(type) {
@@ -255,7 +256,7 @@ func newGetHistoryPageFunc(
255256
metricsScope.Counter(metrics.WorkflowGetHistoryCounter).Inc(1)
256257
startTime := time.Now()
257258
var resp *s.GetWorkflowExecutionHistoryResponse
258-
err := backoff.Retry(
259+
err := backoff.Retry(ctx,
259260
func() error {
260261
tchCtx, cancel := newTChannelContext(ctx)
261262
defer cancel()
@@ -401,17 +402,17 @@ func reportActivityComplete(ctx context.Context, service m.TChanWorkflowService,
401402
var reportErr error
402403
switch request := request.(type) {
403404
case *s.RespondActivityTaskCanceledRequest:
404-
reportErr = backoff.Retry(
405+
reportErr = backoff.Retry(ctx,
405406
func() error {
406407
return service.RespondActivityTaskCanceled(tchCtx, request)
407408
}, serviceOperationRetryPolicy, isServiceTransientError)
408409
case *s.RespondActivityTaskFailedRequest:
409-
reportErr = backoff.Retry(
410+
reportErr = backoff.Retry(ctx,
410411
func() error {
411412
return service.RespondActivityTaskFailed(tchCtx, request)
412413
}, serviceOperationRetryPolicy, isServiceTransientError)
413414
case *s.RespondActivityTaskCompletedRequest:
414-
reportErr = backoff.Retry(
415+
reportErr = backoff.Retry(ctx,
415416
func() error {
416417
return service.RespondActivityTaskCompleted(tchCtx, request)
417418
}, serviceOperationRetryPolicy, isServiceTransientError)

internal_worker.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,9 @@ func ensureRequiredParams(params *workerExecutionParameters) {
162162
// It returns an error if the server returns an EntityNotExist or BadRequest error
163163
// On any other transient error, this method will just return success
164164
func verifyDomainExist(client m.TChanWorkflowService, domain string, logger *zap.Logger) error {
165-
165+
ctx := context.Background()
166166
descDomainOp := func() error {
167-
tchCtx, cancel := newTChannelContext(context.Background())
167+
tchCtx, cancel := newTChannelContext(ctx)
168168
defer cancel()
169169
_, err := client.DescribeDomain(tchCtx, &shared.DescribeDomainRequest{Name: &domain})
170170
if err != nil {
@@ -187,7 +187,7 @@ func verifyDomainExist(client m.TChanWorkflowService, domain string, logger *zap
187187
}
188188

189189
// exponential backoff retry for up to a minute
190-
return backoff.Retry(descDomainOp, serviceOperationRetryPolicy, isServiceTransientError)
190+
return backoff.Retry(ctx, descDomainOp, serviceOperationRetryPolicy, isServiceTransientError)
191191
}
192192

193193
func newWorkflowWorkerInternal(

internal_workflow_client.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ func (wc *workflowClient) StartWorkflow(
117117
var response *s.StartWorkflowExecutionResponse
118118

119119
// Start creating workflow request.
120-
err = backoff.Retry(
120+
err = backoff.Retry(ctx,
121121
func() error {
122122
tchCtx, cancel := newTChannelContext(ctx)
123123
defer cancel()
@@ -162,7 +162,7 @@ func (wc *workflowClient) SignalWorkflow(ctx context.Context, workflowID string,
162162
Identity: common.StringPtr(wc.identity),
163163
}
164164

165-
return backoff.Retry(
165+
return backoff.Retry(ctx,
166166
func() error {
167167
tchCtx, cancel := newTChannelContext(ctx)
168168
defer cancel()
@@ -181,7 +181,7 @@ func (wc *workflowClient) CancelWorkflow(ctx context.Context, workflowID string,
181181
Identity: common.StringPtr(wc.identity),
182182
}
183183

184-
return backoff.Retry(
184+
return backoff.Retry(ctx,
185185
func() error {
186186
tchCtx, cancel := newTChannelContext(ctx)
187187
defer cancel()
@@ -203,7 +203,7 @@ func (wc *workflowClient) TerminateWorkflow(ctx context.Context, workflowID stri
203203
Identity: common.StringPtr(wc.identity),
204204
}
205205

206-
err := backoff.Retry(
206+
err := backoff.Retry(ctx,
207207
func() error {
208208
tchCtx, cancel := newTChannelContext(ctx)
209209
defer cancel()
@@ -231,7 +231,7 @@ GetHistoryLoop:
231231
}
232232

233233
var response *s.GetWorkflowExecutionHistoryResponse
234-
err := backoff.Retry(
234+
err := backoff.Retry(ctx,
235235
func() error {
236236
var err1 error
237237
tchCtx, cancel := newTChannelContext(ctx)
@@ -352,7 +352,7 @@ func (wc *workflowClient) ListClosedWorkflow(ctx context.Context, request *s.Lis
352352
request.Domain = common.StringPtr(wc.domain)
353353
}
354354
var response *s.ListClosedWorkflowExecutionsResponse
355-
err := backoff.Retry(
355+
err := backoff.Retry(ctx,
356356
func() error {
357357
var err1 error
358358
tchCtx, cancel := newTChannelContext(ctx)
@@ -376,7 +376,7 @@ func (wc *workflowClient) ListOpenWorkflow(ctx context.Context, request *s.ListO
376376
request.Domain = common.StringPtr(wc.domain)
377377
}
378378
var response *s.ListOpenWorkflowExecutionsResponse
379-
err := backoff.Retry(
379+
err := backoff.Retry(ctx,
380380
func() error {
381381
var err1 error
382382
tchCtx, cancel := newTChannelContext(ctx)
@@ -423,7 +423,7 @@ func (wc *workflowClient) QueryWorkflow(ctx context.Context, workflowID string,
423423
}
424424

425425
var resp *s.QueryWorkflowResponse
426-
err := backoff.Retry(
426+
err := backoff.Retry(ctx,
427427
func() error {
428428
tchCtx, cancel := newTChannelContext(ctx)
429429
defer cancel()
@@ -444,7 +444,7 @@ func (wc *workflowClient) QueryWorkflow(ctx context.Context, workflowID string,
444444
// - BadRequestError
445445
// - InternalServiceError
446446
func (dc *domainClient) Register(ctx context.Context, request *s.RegisterDomainRequest) error {
447-
return backoff.Retry(
447+
return backoff.Retry(ctx,
448448
func() error {
449449
tchCtx, cancel := newTChannelContext(ctx)
450450
defer cancel()
@@ -465,7 +465,7 @@ func (dc *domainClient) Describe(ctx context.Context, name string) (*s.DomainInf
465465
}
466466

467467
var response *s.DescribeDomainResponse
468-
err := backoff.Retry(
468+
err := backoff.Retry(ctx,
469469
func() error {
470470
tchCtx, cancel := newTChannelContext(ctx)
471471
defer cancel()
@@ -493,7 +493,7 @@ func (dc *domainClient) Update(ctx context.Context, name string, domainInfo *s.U
493493
Configuration: domainConfig,
494494
}
495495

496-
return backoff.Retry(
496+
return backoff.Retry(ctx,
497497
func() error {
498498
tchCtx, cancel := newTChannelContext(ctx)
499499
defer cancel()

0 commit comments

Comments
 (0)