Skip to content

Commit 0df73fc

Browse files
authored
add onRetryCallback callback function (#405)
* add onRetryCallback callback function * use only one OnRetryCallback function, instead of one with and one without context * move retry attempt trace log to be the default onRetryCallback behaviour * use parentCtx when calling onRetryCallback function
1 parent 524ce8f commit 0df73fc

File tree

3 files changed

+54
-8
lines changed

3 files changed

+54
-8
lines changed

interceptors/retry/options.go

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ var (
2626
backoffFunc: BackoffFuncContext(func(ctx context.Context, attempt uint) time.Duration {
2727
return BackoffLinearWithJitter(50*time.Millisecond /*jitter*/, 0.10)(attempt)
2828
}),
29+
onRetryCallback: OnRetryCallback(func(ctx context.Context, attempt uint, err error) {
30+
logTrace(ctx, "grpc_retry attempt: %d, backoff for %v", attempt, err)
31+
}),
2932
}
3033
)
3134

@@ -45,6 +48,9 @@ type BackoffFunc func(attempt uint) time.Duration
4548
// with the next iteration. The context can be used to extract request scoped metadata and context values.
4649
type BackoffFuncContext func(ctx context.Context, attempt uint) time.Duration
4750

51+
// OnRetryCallback is the type of function called when a retry occurs.
52+
type OnRetryCallback func(ctx context.Context, attempt uint, err error)
53+
4854
// Disable disables the retry behaviour on this call, or this interceptor.
4955
//
5056
// Its semantically the same to `WithMax`
@@ -75,6 +81,15 @@ func WithBackoffContext(bf BackoffFuncContext) CallOption {
7581
}}
7682
}
7783

84+
// WithOnRetryCallback sets the callback to use when a retry occurs.
85+
//
86+
// By default, when no callback function provided, we will just print a log to trace
87+
func WithOnRetryCallback(fn OnRetryCallback) CallOption {
88+
return CallOption{applyFunc: func(o *options) {
89+
o.onRetryCallback = fn
90+
}}
91+
}
92+
7893
// WithCodes sets which codes should be retried.
7994
//
8095
// Please *use with care*, as you may be retrying non-idempotent calls.
@@ -105,11 +120,12 @@ func WithPerRetryTimeout(timeout time.Duration) CallOption {
105120
}
106121

107122
type options struct {
108-
max uint
109-
perCallTimeout time.Duration
110-
includeHeader bool
111-
codes []codes.Code
112-
backoffFunc BackoffFuncContext
123+
max uint
124+
perCallTimeout time.Duration
125+
includeHeader bool
126+
codes []codes.Code
127+
backoffFunc BackoffFuncContext
128+
onRetryCallback OnRetryCallback
113129
}
114130

115131
// CallOption is a grpc.CallOption that is local to grpc_retry.

interceptors/retry/retry.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func UnaryClientInterceptor(optFuncs ...CallOption) grpc.UnaryClientInterceptor
4848
if lastErr == nil {
4949
return nil
5050
}
51-
logTrace(parentCtx, "grpc_retry attempt: %d, got err: %v", attempt, lastErr)
51+
callOpts.onRetryCallback(parentCtx, attempt, lastErr)
5252
if isContextError(lastErr) {
5353
if parentCtx.Err() != nil {
5454
logTrace(parentCtx, "grpc_retry attempt: %d, parent context error: %v", attempt, parentCtx.Err())
@@ -110,8 +110,7 @@ func StreamClientInterceptor(optFuncs ...CallOption) grpc.StreamClientIntercepto
110110
}
111111
return retryingStreamer, nil
112112
}
113-
114-
logTrace(parentCtx, "grpc_retry attempt: %d, got err: %v", attempt, lastErr)
113+
callOpts.onRetryCallback(parentCtx, attempt, lastErr)
115114
if isContextError(lastErr) {
116115
if parentCtx.Err() != nil {
117116
logTrace(parentCtx, "grpc_retry attempt: %d, parent context error: %v", attempt, parentCtx.Err())
@@ -189,6 +188,7 @@ func (s *serverStreamingRetryingStream) RecvMsg(m interface{}) error {
189188
if err := waitRetryBackoff(attempt, s.parentCtx, s.callOpts); err != nil {
190189
return err
191190
}
191+
s.callOpts.onRetryCallback(s.parentCtx, attempt, lastErr)
192192
// TODO(bwplotka): Close cancel as it might leak some resources.
193193
callCtx, _ := perCallContext(s.parentCtx, s.callOpts, attempt) //nolint
194194
newStream, err := s.reestablishStreamAndResendBuffer(callCtx)

interceptors/retry/retry_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,21 @@ func (s *RetrySuite) TestUnary_PerCallDeadline_FailsOnParent() {
215215
require.Equal(s.T(), codes.DeadlineExceeded, status.Code(err), "failre code must be a gRPC error of Deadline class")
216216
}
217217

218+
func (s *RetrySuite) TestUnary_OnRetryCallbackCalled() {
219+
retryCallbackCount := 0
220+
221+
s.srv.resetFailingConfiguration(3, codes.Unavailable, noSleep) // see retriable_errors
222+
out, err := s.Client.Ping(s.SimpleCtx(), testpb.GoodPing,
223+
retry.WithOnRetryCallback(func(ctx context.Context, attempt uint, err error) {
224+
retryCallbackCount++
225+
}),
226+
)
227+
228+
require.NoError(s.T(), err, "the third invocation should succeed")
229+
require.NotNil(s.T(), out, "Pong must be not nil")
230+
require.EqualValues(s.T(), 2, retryCallbackCount, "two retry callbacks should be called")
231+
}
232+
218233
func (s *RetrySuite) TestServerStream_SucceedsOnRetriableError() {
219234
s.srv.resetFailingConfiguration(3, codes.DataLoss, noSleep) // see retriable_errors
220235
stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList)
@@ -266,6 +281,21 @@ func (s *RetrySuite) TestServerStream_PerCallDeadline_FailsOnParent() {
266281
require.Equal(s.T(), codes.DeadlineExceeded, status.Code(err), "failre code must be a gRPC error of Deadline class")
267282
}
268283

284+
func (s *RetrySuite) TestServerStream_OnRetryCallbackCalled() {
285+
retryCallbackCount := 0
286+
287+
s.srv.resetFailingConfiguration(3, codes.Unavailable, noSleep) // see retriable_errors
288+
stream, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList,
289+
retry.WithOnRetryCallback(func(ctx context.Context, attempt uint, err error) {
290+
retryCallbackCount++
291+
}),
292+
)
293+
294+
require.NoError(s.T(), err, "establishing the connection must always succeed")
295+
s.assertPingListWasCorrect(stream)
296+
require.EqualValues(s.T(), 2, retryCallbackCount, "two retry callbacks should be called")
297+
}
298+
269299
func (s *RetrySuite) TestServerStream_CallFailsOnOutOfRetries() {
270300
restarted := s.RestartServer(3 * retryTimeout)
271301
_, err := s.Client.PingList(s.SimpleCtx(), testpb.GoodPingList)

0 commit comments

Comments
 (0)