Skip to content

Commit 3425929

Browse files
committed
move things a bit & document
1 parent 40cf7da commit 3425929

File tree

6 files changed

+47
-38
lines changed

6 files changed

+47
-38
lines changed

internal/partitions/partitions.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -613,7 +613,7 @@ func getMidIDBounds(
613613

614614
// Append the copied bound to the other mid _id bounds.
615615
midIDBounds = append(midIDBounds, bound)
616-
ri.IterationSuccess()
616+
ri.NoteSuccess()
617617
}
618618

619619
return cursor.Err()

internal/retry/error.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,15 @@ func (rde RetryDurationLimitExceededErr) Error() string {
2525
func (rde RetryDurationLimitExceededErr) Unwrap() error {
2626
return rde.lastErr
2727
}
28+
29+
// errgroupErr is an internal error type that we return from errgroup
30+
// callbacks. It allows us to know (reliably) which error is the one
31+
// that triggers the errgroup's failure
32+
type errgroupErr struct {
33+
funcNum int
34+
errFromCallback error
35+
}
36+
37+
func (ege errgroupErr) Error() string {
38+
return fmt.Sprintf("func %d failed: %v", ege.funcNum, ege.errFromCallback)
39+
}

internal/retry/retry.go

Lines changed: 26 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,20 @@ import (
1515

1616
type RetryCallback = func(context.Context, *FuncInfo) error
1717

18-
// Run retries f() whenever a transient error happens, up to the retryer's
19-
// configured duration limit.
18+
// Run() runs each given callback in parallel. If none of them fail,
19+
// then no error is returned.
20+
//
21+
// If one of them fails, the other allbacks' contexts are canceled.
22+
// If the error is non-transient, it's returned. If the error is transient,
23+
// though, then every function will be retried.
24+
//
25+
// The retries last until a function fails and it's exceeded the retryer's
26+
// limit without either a success or being canceled (i.e., because another
27+
// thread fails).
28+
//
29+
// Note that, if a given callback runs multiple potentially-retryable reqeusts,
30+
// each successful request should be noted in the callback's FuncInfo.
31+
// See that struct's documentation for more details.
2032
//
2133
// IMPORTANT: This function should generally NOT be used within a transaction
2234
// callback. It may be used within a transaction callback if and only if:
@@ -35,18 +47,6 @@ func (r *Retryer) Run(
3547
return r.runRetryLoop(ctx, logger, f)
3648
}
3749

38-
type errgroupErr struct {
39-
funcNum int
40-
err error
41-
}
42-
43-
func (ege errgroupErr) Error() string {
44-
return fmt.Sprintf("func %d failed: %v", ege.funcNum, ege.err)
45-
}
46-
func (ege errgroupErr) Unwrap() error {
47-
return ege.err
48-
}
49-
5050
// runRetryLoop contains the core logic for the retry loops.
5151
func (r *Retryer) runRetryLoop(
5252
ctx context.Context,
@@ -82,8 +82,8 @@ func (r *Retryer) runRetryLoop(
8282

8383
if err != nil {
8484
return errgroupErr{
85-
funcNum: i,
86-
err: err,
85+
funcNum: i,
86+
errFromCallback: err,
8787
}
8888
}
8989

@@ -97,26 +97,25 @@ func (r *Retryer) runRetryLoop(
9797
return nil
9898
}
9999

100-
// Not a transient error? Fail immediately.
101-
if !r.shouldRetryWithSleep(logger, sleepTime, err) {
102-
return err
103-
}
104-
105-
// Our error is transient. First we learn which function failed.
106-
// We have to get this information from the error itself in order
107-
// for it to be fully reliable.
100+
// Let's get the actual error from the function.
108101
groupErr := errgroupErr{}
109102
if !errors.As(err, &groupErr) {
110103
panic(fmt.Sprintf("Error should be a %T, not %T: %v", groupErr, err, err))
111104
}
112-
failedFuncInfo := funcinfos[groupErr.funcNum]
113105

114-
// If we've exhausted the allowed time then fail.
106+
// Not a transient error? Fail immediately.
107+
if !r.shouldRetryWithSleep(logger, sleepTime, groupErr.errFromCallback) {
108+
return groupErr.errFromCallback
109+
}
110+
111+
// Our error is transient. If we've exhausted the allowed time
112+
// then fail.
113+
failedFuncInfo := funcinfos[groupErr.funcNum]
115114
if failedFuncInfo.GetDurationSoFar() > li.durationLimit {
116115
return RetryDurationLimitExceededErr{
117116
attempts: li.attemptNumber,
118117
duration: failedFuncInfo.GetDurationSoFar(),
119-
lastErr: groupErr.err,
118+
lastErr: groupErr.errFromCallback,
120119
}
121120
}
122121

internal/retry/retry_info.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,12 @@ func (fi *FuncInfo) GetDurationSoFar() time.Duration {
6969
return time.Since(fi.lastResetTime)
7070
}
7171

72-
// IterationSuccess is used to tell the retry util to reset its measurement
72+
// NoteSuccess is used to tell the retry util to reset its measurement
7373
// of how long the closure has been running for. This is useful for long
7474
// running operations that might run successfully for a few days and then fail.
75-
// Essentially, calling this function tells the retry util not to include the
76-
// closure's run time as a part of the overall measurement of how long the
77-
// closure took including retries, since that measurement is used to determine
78-
// whether we want to retry the operation or not. (If the measurement is greater
79-
// than the retry time, we will not retry.)
80-
func (i *FuncInfo) IterationSuccess() {
75+
//
76+
// Call this after every successful command in a multi-command callback.
77+
// (It’s useless--but harmless--in a single-command callback.)
78+
func (i *FuncInfo) NoteSuccess() {
8179
i.lastResetTime = time.Now()
8280
}

internal/retry/retryer_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func (suite *UnitTestSuite) TestRetryerDurationReset() {
124124
// Artificially advance how much time was taken.
125125
ri.lastResetTime = ri.lastResetTime.Add(-2 * ri.loopInfo.durationLimit)
126126

127-
ri.IterationSuccess()
127+
ri.NoteSuccess()
128128

129129
successIterations++
130130
if successIterations == 1 {
@@ -308,7 +308,7 @@ func (suite *UnitTestSuite) TestMulti_LongRunningSuccess() {
308308
ctx,
309309
logger,
310310
func(ctx context.Context, fi *FuncInfo) error {
311-
fi.IterationSuccess()
311+
fi.NoteSuccess()
312312

313313
if time.Now().Before(succeedPastTime) {
314314
time.Sleep(1 * time.Second)

internal/verifier/change_stream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ func (verifier *Verifier) readAndHandleOneChangeEventBatch(
188188
eventsRead++
189189
}
190190

191-
ri.IterationSuccess()
191+
ri.NoteSuccess()
192192

193193
if eventsRead == 0 {
194194
return nil

0 commit comments

Comments
 (0)