Commit ce814ba

REP-5329: Rework retryer internals to compute duration from a start time.
1 parent 5534bcc

4 files changed: +73 -45 lines changed

internal/retry/error.go

Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
+package retry
+
+import (
+    "fmt"
+    "time"
+
+    "github.com/10gen/migration-verifier/internal/reportutils"
+)
+
+type RetryDurationLimitExceededErr struct {
+    lastErr  error
+    attempts int
+    duration time.Duration
+}
+
+func (rde RetryDurationLimitExceededErr) Error() string {
+    return fmt.Sprintf(
+        "retryable function did not succeed after %d attempt(s) over %s; last error was: %v",
+        rde.attempts,
+        reportutils.DurationToHMS(rde.duration),
+        rde.lastErr,
+    )
+}
+
+func (rde RetryDurationLimitExceededErr) Unwrap() error {
+    return rde.lastErr
+}
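With this file, hitting the duration limit surfaces as a typed error that wraps the last transient error, so callers can detect it with errors.As / errors.Is instead of matching the old errors.Wrapf message. A minimal sketch of that call-site pattern (not part of this commit; the classifyRetryOutcome helper is hypothetical, and only the exported error type added here is assumed):

// Sketch: classifying the outcome of a retried operation.
// classifyRetryOutcome is hypothetical; RetryDurationLimitExceededErr is the
// type added in this commit.
package example

import (
    "errors"
    "fmt"

    "github.com/10gen/migration-verifier/internal/retry"
)

func classifyRetryOutcome(err error) string {
    var limitErr retry.RetryDurationLimitExceededErr
    switch {
    case err == nil:
        return "succeeded"
    case errors.As(err, &limitErr):
        // The duration limit was exceeded; the last transient error is
        // available via Unwrap (as the new test asserts with ErrorIs).
        return fmt.Sprintf("retries exhausted: %v", errors.Unwrap(limitErr))
    default:
        return fmt.Sprintf("failed without retrying: %v", err)
    }
}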

internal/retry/retry.go

Lines changed: 30 additions & 31 deletions
@@ -6,9 +6,7 @@ import (
     "time"

     "github.com/10gen/migration-verifier/internal/logger"
-    "github.com/10gen/migration-verifier/internal/reportutils"
     "github.com/10gen/migration-verifier/internal/util"
-    "github.com/pkg/errors"
 )

 // Run retries f() whenever a transient error happens, up to the retryer's
@@ -41,48 +39,49 @@ func (r *Retryer) runRetryLoop(

     ri := &Info{
         durationLimit: r.retryLimit,
+        lastResetTime: time.Now(),
     }
     sleepTime := minSleepTime

-    for ri.durationSoFar <= ri.durationLimit {
-        retryStartTime := time.Now()
-
-        // Run f() with the current collection name.
+    for {
         err = f(ri)

         // If f() returned a transient error, sleep and increase the sleep
         // time for the next retry, maxing out at the maxSleepTime.
-        if r.shouldRetryWithSleep(logger, sleepTime, err) {
-            select {
-            case <-ctx.Done():
-                logger.Error().Err(ctx.Err()).Msg("Context was canceled. Aborting retry loop.")
-                return ctx.Err()
-            case <-time.After(sleepTime):
-                sleepTime *= sleepTimeMultiplier
-                if sleepTime > maxSleepTime {
-                    sleepTime = maxSleepTime
-                }
-            }
+        if err == nil {
+            return nil
+        }

-            ri.attemptNumber++
+        if !r.shouldRetryWithSleep(logger, sleepTime, err) {
+            return err
+        }

-            if ri.shouldResetDuration {
-                ri.durationSoFar = 0
-                ri.shouldResetDuration = false
-            } else {
-                ri.durationSoFar += time.Since(retryStartTime)
+        select {
+        case <-ctx.Done():
+            logger.Error().Err(ctx.Err()).Msg("Context was canceled. Aborting retry loop.")
+            return ctx.Err()
+        case <-time.After(sleepTime):
+            sleepTime *= sleepTimeMultiplier
+            if sleepTime > maxSleepTime {
+                sleepTime = maxSleepTime
             }
-            continue
         }

-        return err
-    }
+        ri.attemptNumber++

-    return errors.Wrapf(err,
-        "retryable function did not succeed after %d attempt(s) over %s",
-        ri.attemptNumber,
-        reportutils.DurationToHMS(ri.durationSoFar),
-    )
+        if ri.shouldResetDuration {
+            ri.lastResetTime = time.Now()
+            ri.shouldResetDuration = false
+        }
+
+        if ri.GetDurationSoFar() > ri.durationLimit {
+            return RetryDurationLimitExceededErr{
+                attempts: ri.attemptNumber,
+                duration: ri.GetDurationSoFar(),
+                lastErr:  err,
+            }
+        }
+    }
 }

 //
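The loop change is easiest to see as a shift in how elapsed time is measured: the old code summed time.Since(retryStartTime) into durationSoFar only on the retry path, while the new code always compares time.Since(lastResetTime) against the limit, so time spent inside f() itself is now charged as well. A self-contained sketch of the two measurement styles (the types below are illustrative, not the retryer's internals):

// Illustrative only: isolates the old and new duration-tracking styles.
package example

import "time"

// accumulated mirrors the old approach: spans are summed explicitly, so only
// the intervals passed to charge() count toward the total.
type accumulated struct {
    soFar time.Duration
}

func (a *accumulated) charge(start time.Time) { a.soFar += time.Since(start) }
func (a *accumulated) elapsed() time.Duration { return a.soFar }

// fromStart mirrors the new approach: elapsed time is whatever has passed
// since the last reset, so nothing between resets escapes the measurement.
type fromStart struct {
    lastReset time.Time
}

func (f *fromStart) reset()                 { f.lastReset = time.Now() }
func (f *fromStart) elapsed() time.Duration { return time.Since(f.lastReset) }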

internal/retry/retry_info.go

Lines changed: 3 additions & 3 deletions
@@ -15,7 +15,7 @@ import (
 type Info struct {
     attemptNumber int

-    durationSoFar time.Duration
+    lastResetTime time.Time
     durationLimit time.Duration

     // Used to reset the time elapsed for long running operations.
@@ -52,7 +52,7 @@ func (ri *Info) Log(logger *zerolog.Logger, cmdName string, clientType string, d
     }
     event.Str("context", msg).
         Int("attemptNumber", ri.attemptNumber).
-        Str("durationSoFar", reportutils.DurationToHMS(ri.durationSoFar)).
+        Str("durationSoFar", reportutils.DurationToHMS(ri.GetDurationSoFar())).
         Str("durationLimit", reportutils.DurationToHMS(ri.durationLimit)).
         Msg("Running retryable function")
 }
@@ -65,7 +65,7 @@ func (ri *Info) GetAttemptNumber() int {
 // GetDurationSoFar returns the Info's current duration so far. This duration
 // applies to the duration of retrying for transient errors only.
 func (ri *Info) GetDurationSoFar() time.Duration {
-    return ri.durationSoFar
+    return time.Since(ri.lastResetTime)
 }

 // IterationSuccess is used to tell the retry util to reset its measurement
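With GetDurationSoFar now computed on demand from lastResetTime, "resetting" the measurement reduces to stamping a new start time the next time the loop handles a failure. A sketch of how a long-running retryable function would report progress via IterationSuccess (the func(ri *Info) error shape comes from this commit's tests; processBatch and its inputs are hypothetical):

// Sketch: a retryable function that reports forward progress.
// The *retry.Info parameter and IterationSuccess come from this commit;
// processBatch and batches are hypothetical stand-ins.
package example

import (
    "context"

    "github.com/10gen/migration-verifier/internal/retry"
)

func makeRetryable(
    ctx context.Context,
    batches []string,
    processBatch func(context.Context, string) error,
) func(*retry.Info) error {
    return func(ri *retry.Info) error {
        for _, b := range batches {
            if err := processBatch(ctx, b); err != nil {
                // A transient error sends control back to the retry loop,
                // which now compares time.Since(lastResetTime) to the limit.
                return err
            }
            // Forward progress: ask the retryer to reset its measurement,
            // which after this commit means stamping a fresh lastResetTime.
            ri.IterationSuccess()
        }
        return nil
    }
}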

internal/retry/retryer_test.go

Lines changed: 13 additions & 11 deletions
@@ -73,7 +73,7 @@ func (suite *UnitTestSuite) TestRetryerDurationLimitIsZero() {
     retryer := New(0)

     attemptNumber := -1
-    cmdErr := mongo.CommandError{
+    cmdErr := &mongo.CommandError{
         Labels: []string{"NetworkError"},
         Name:   "NetworkError",
     }
@@ -83,8 +83,8 @@ func (suite *UnitTestSuite) TestRetryerDurationLimitIsZero() {
     }

     err := retryer.Run(suite.Context(), suite.Logger(), f)
-    suite.Equal(cmdErr, err)
-    suite.Equal(0, attemptNumber)
+    suite.Assert().ErrorIs(err, cmdErr)
+    suite.Assert().Equal(0, attemptNumber)
 }

 func (suite *UnitTestSuite) TestRetryerDurationReset() {
@@ -95,7 +95,7 @@ func (suite *UnitTestSuite) TestRetryerDurationReset() {
     // to execute. (f will artificially advance the time to greater than the
     // durationLimit.)

-    transientNetworkError := mongo.CommandError{
+    transientNetworkError := &mongo.CommandError{
         Labels: []string{"NetworkError"},
         Name:   "NetworkError",
     }
@@ -105,7 +105,7 @@ func (suite *UnitTestSuite) TestRetryerDurationReset() {
     noSuccessIterations := 0
     f1 := func(ri *Info) error {
         // Artificially advance how much time was taken.
-        ri.durationSoFar += 2 * ri.durationLimit
+        ri.lastResetTime = ri.lastResetTime.Add(-2 * ri.durationLimit)

         noSuccessIterations++
         if noSuccessIterations == 1 {
@@ -116,17 +116,19 @@ func (suite *UnitTestSuite) TestRetryerDurationReset() {
     }

     err := retryer.Run(suite.Context(), logger, f1)
-    // err is not nil and err is not the network error thrown by the function.
-    suite.Error(err)
-    suite.NotEqual(err, transientNetworkError)
+
+    // The error should be the limit-exceeded error, with the
+    // last-noted error being the transient error.
+    suite.Assert().ErrorAs(err, &RetryDurationLimitExceededErr{})
+    suite.Assert().ErrorIs(err, transientNetworkError)
     suite.Equal(1, noSuccessIterations)

     // 2) Calling IterationSuccess() means f will run more than once because the
     // duration should be reset.
     successIterations := 0
     f2 := func(ri *Info) error {
         // Artificially advance how much time was taken.
-        ri.durationSoFar += 2 * ri.durationLimit
+        ri.lastResetTime = ri.lastResetTime.Add(-2 * ri.durationLimit)

         ri.IterationSuccess()

@@ -139,8 +141,8 @@ func (suite *UnitTestSuite) TestRetryerDurationReset() {
     }

     err = retryer.Run(suite.Context(), logger, f2)
-    suite.NoError(err)
-    suite.Equal(2, successIterations)
+    suite.Assert().NoError(err)
+    suite.Assert().Equal(2, successIterations)
 }

 func (suite *UnitTestSuite) TestCancelViaContext() {
