Commit 124aee2

craig[bot] and kev-cao committed

Merge #148551

148551: util: support max duration in retry loop r=stevendanna a=kev-cao

This commit teaches `Retry` to also respect a specified maximum duration. Once the maximum duration has elapsed since the `Start` of a retry loop, no more retry attempts will be made.

Informs: #148028

Release note: None

Co-authored-by: Kevin Cao <[email protected]>
2 parents 71bcba2 + 5a07ac9 commit 124aee2
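For context, here is a minimal sketch of how a caller might use the new `MaxDuration` knob with the existing `Next` loop pattern. The `doWork` helper and the chosen durations are illustrative assumptions, not part of this commit:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/retry"
)

// doWork is a hypothetical fallible operation used only for illustration.
func doWork() error {
	return errors.New("transient failure")
}

func main() {
	opts := retry.Options{
		InitialBackoff: 100 * time.Millisecond,
		MaxBackoff:     time.Second,
		// New in this commit: stop retrying once 5s have elapsed since Start.
		MaxDuration: 5 * time.Second,
	}

	var err error
	for r := retry.StartWithCtx(context.Background(), opts); r.Next(); {
		if err = doWork(); err == nil {
			break
		}
		fmt.Printf("attempt %d failed: %v\n", r.CurrentAttempt(), err)
	}
	if err != nil {
		fmt.Printf("giving up: %v\n", err)
	}
}
```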

File tree

6 files changed: +499, −71 lines


pkg/kv/kvserver/asim/state/state.go

Lines changed: 4 additions & 0 deletions
@@ -291,6 +291,10 @@ func (m *ManualSimClock) Since(t time.Time) time.Duration {
 	return m.Now().Sub(t)
 }
 
+func (m *ManualSimClock) Until(t time.Time) time.Duration {
+	return t.Sub(m.Now())
+}
+
 func (m *ManualSimClock) NewTimer() timeutil.TimerI {
 	panic("unimplemented")
 }

pkg/util/retry/BUILD.bazel

Lines changed: 2 additions & 0 deletions
@@ -25,7 +25,9 @@ go_test(
     ],
     embed = [":retry"],
     deps = [
+        "//pkg/testutils/skip",
         "//pkg/util/log",
+        "//pkg/util/timeutil",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_stretchr_testify//require",
     ],

pkg/util/retry/retry.go

Lines changed: 128 additions & 24 deletions
@@ -21,10 +21,22 @@ type Options struct {
 	InitialBackoff time.Duration // Default retry backoff interval
 	MaxBackoff     time.Duration // Maximum retry backoff interval
 	Multiplier     float64       // Default backoff constant
-	// Maximum number of retries; attempts = MaxRetries + 1. (0 for infinite)
-	MaxRetries          int
-	RandomizationFactor float64 // Randomize the backoff interval by constant
+	// Randomize the backoff interval by constant. Set to -1 to disable.
+	RandomizationFactor float64
 	Closer <-chan struct{} // Optionally end retry loop channel close
+	// Maximum number of retries; attempts = MaxRetries + 1. (0 for infinite)
+	MaxRetries int
+	// MaxDuration is the maximum duration for which the retry loop will make
+	// attempts. Once the deadline has elapsed, the loop will stop attempting
+	// retries.
+	// The loop will run for at least one iteration. (0 for infinite)
+	MaxDuration time.Duration
+	// PreemptivelyCancel indicates whether the retry loop should cancel itself if
+	// it determines that the next backoff would exceed the MaxDuration.
+	PreemptivelyCancel bool
+	// Clock is used to control the time source for the retry loop. Intended for
+	// testing purposes. Should be nil in production code.
+	Clock timeutil.TimeSource
 }
 
 // Retry implements the public methods necessary to control an exponential-
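Taken together, the new fields compose roughly as in the hedged sketch below. The values are made up, and the test clock assumes `timeutil.NewManualTime` is available as the manual `TimeSource` used elsewhere in the tree:

```go
package retry_test

import (
	"context"
	"testing"
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/retry"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// TestMaxDurationOptionsSketch only shows how the new knobs are wired up; it
// does not exercise the retry loop itself.
func TestMaxDurationOptionsSketch(t *testing.T) {
	clock := timeutil.NewManualTime(timeutil.Unix(0, 123)) // assumed manual TimeSource
	opts := retry.Options{
		InitialBackoff:      time.Second,
		MaxBackoff:          4 * time.Second,
		RandomizationFactor: -1,               // -1 now disables jitter entirely
		MaxRetries:          10,               // still bounded by attempt count...
		MaxDuration:         10 * time.Second, // ...and now also by elapsed time
		PreemptivelyCancel:  true,             // give up early if the next backoff would overshoot
		Clock:               clock,            // test-only; leave nil in production
	}
	_ = retry.StartWithCtx(context.Background(), opts)
}
```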
@@ -34,6 +46,13 @@ type Retry struct {
 	ctx            context.Context
 	currentAttempt int
 	isReset        bool
+	deadline       time.Time // Deadline for the retry loop if MaxDuration is set.
+
+	// Testing hook that is called when the retry loop is waiting for the backoff.
+	// If no max duration is set, deadline will be zero. Passes in the actual wait
+	// time between each retry attempt (accounting for max duration).
+	// Set here instead of options to allow Options to be compared.
+	backingOffHook func(backoff time.Duration)
 }
 
 // Start returns a new Retry initialized to some default values. The Retry can
@@ -55,10 +74,15 @@ func StartWithCtx(ctx context.Context, opts Options) Retry {
 	}
 	if opts.RandomizationFactor == 0 {
 		opts.RandomizationFactor = 0.15
+	} else if opts.RandomizationFactor < 0 {
+		opts.RandomizationFactor = 0 // Disable randomization.
 	}
 	if opts.Multiplier == 0 {
 		opts.Multiplier = 2
 	}
+	if opts.Clock == nil {
+		opts.Clock = timeutil.DefaultTimeSource{}
+	}
 
 	var r Retry
 	r.opts = opts
@@ -87,8 +111,14 @@ func (r *Retry) Reset() {
 func (r *Retry) mustReset() {
 	r.currentAttempt = 0
 	r.isReset = true
+	if r.opts.MaxDuration != 0 {
+		r.deadline = r.opts.Clock.Now().Add(r.opts.MaxDuration)
+	} else {
+		r.deadline = time.Time{}
+	}
 }
 
+// retryIn returns the duration to wait before the next retry attempt.
 func (r Retry) retryIn() time.Duration {
 	backoff := float64(r.opts.InitialBackoff) * math.Pow(r.opts.Multiplier, float64(r.currentAttempt))
 	if maxBackoff := float64(r.opts.MaxBackoff); backoff > maxBackoff {
@@ -102,6 +132,23 @@ func (r Retry) retryIn() time.Duration {
 	return time.Duration(backoff - delta + rand.Float64()*(2*delta+1))
 }
 
+// calcDurationScopedBackoff calculates the duration to wait before the next
+// attempt, taking into account the MaxDuration option. It returns the computed
+// backoff duration, the actual wait duration (if the backoff exceeds the max
+// duration), and a boolean indicating whether the retry should be attempted.
+func (r Retry) calcDurationScopedBackoff() (time.Duration, time.Duration, bool) {
+	backoff := r.retryIn()
+	actualWait := backoff
+	shouldAttempt := true
+	if r.opts.MaxDuration != 0 && !r.opts.Clock.Now().Add(backoff).Before(r.deadline) {
+		// If the backoff would exceed the deadline, we return the remaining time
+		// until the deadline instead.
+		shouldAttempt = false
+		actualWait = max(r.deadline.Sub(r.opts.Clock.Now()), 0)
+	}
+	return backoff, actualWait, shouldAttempt
+}
+
 // Next returns whether the retry loop should continue, and blocks for the
 // appropriate length of time before yielding back to the caller.
 //
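To make the three return values concrete, the following standalone sketch mirrors (rather than calls) the unexported capping logic; the numbers are made up:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	now := time.Now()
	deadline := now.Add(1500 * time.Millisecond) // MaxDuration nearly used up
	backoff := 4 * time.Second                   // what retryIn() would have produced

	actualWait, shouldAttempt := backoff, true
	if !now.Add(backoff).Before(deadline) {
		// Waiting the full backoff would overshoot the deadline, so wait only
		// until the deadline and report that no further attempt should be made.
		shouldAttempt = false
		actualWait = max(deadline.Sub(now), 0)
	}
	fmt.Println(actualWait, shouldAttempt) // 1.5s false
}
```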
@@ -115,50 +162,107 @@ func (r *Retry) Next() bool {
 		r.isReset = false
 		return true
 	}
+	if r.retryLimitReached() {
+		return false
+	}
 
-	if r.opts.MaxRetries > 0 && r.currentAttempt >= r.opts.MaxRetries {
+	backoff, actualWait, shouldAttempt := r.calcDurationScopedBackoff()
+
+	if !shouldAttempt && r.opts.PreemptivelyCancel {
+		log.VEventf(
+			r.ctx, 2 /* level */, "preemptively canceling retry loop as backoff would exceed MaxDuration",
+		)
 		return false
 	}
 
-	// Wait before retry.
-	d := r.retryIn()
-	if d > 0 {
-		log.VEventfDepth(r.ctx, 1 /* depth */, 2 /* level */, "will retry after %s", d)
+	timer := r.opts.Clock.NewTimer()
+	timer.Reset(actualWait)
+	timerCh := timer.Ch()
+	defer timer.Stop()
+
+	log.VEventfDepth(r.ctx, 1 /* depth */, 2 /* level */, "will retry after %s", backoff)
+
+	if r.backingOffHook != nil {
+		r.backingOffHook(actualWait)
 	}
+
 	select {
-	case <-time.After(d):
-		r.currentAttempt++
-		return true
 	case <-r.opts.Closer:
 		return false
 	case <-r.ctx.Done():
 		return false
+	case <-timerCh:
+		if shouldAttempt {
+			r.currentAttempt++
+		}
+		return shouldAttempt
 	}
 }
 
-// closedC is returned from Retry.NextCh whenever a retry
-// can begin immediately.
-var closedC = func() chan time.Time {
-	c := make(chan time.Time)
+func (r *Retry) retryLimitReached() bool {
+	return (r.opts.MaxRetries > 0 && r.currentAttempt >= r.opts.MaxRetries) ||
+		(r.opts.MaxDuration > 0 && !r.opts.Clock.Now().Before(r.deadline))
+}
+
+// immediateCh creates a channel that is immediately written to with the
+// provided value and then closed.
+func immediateCh(v bool) chan bool {
+	c := make(chan bool, 1)
+	c <- v
 	close(c)
 	return c
-}()
+}
+
+var closedCh = immediateCh(false)
 
 // NextCh returns a channel which will receive when the next retry
-// interval has expired.
-func (r *Retry) NextCh() <-chan time.Time {
+// interval has expired. If the received value is true, it indicates a retry
+// should be made. If the received value is false, it indicates that no retry
+// should be made.
+// Note: This does not respect the Closer or context cancellation and it is the
+// caller's responsibility to manage the lifecycle.
+func (r *Retry) NextCh() <-chan bool {
 	if r.isReset {
 		r.isReset = false
-		return closedC
+		return immediateCh(true)
+	}
+	if r.retryLimitReached() {
+		return closedCh
+	}
+
+	backoff, actualWait, shouldAttempt := r.calcDurationScopedBackoff()
+
+	if !shouldAttempt && r.opts.PreemptivelyCancel {
+		log.VEventf(
+			r.ctx, 2 /* level */, "preemptively canceling retry loop as backoff would exceed MaxDuration",
+		)
+		return closedCh
+	}
+
+	timer := r.opts.Clock.NewTimer()
+	timer.Reset(actualWait)
+	timerCh := timer.Ch()
+
+	if r.backingOffHook != nil {
+		r.backingOffHook(actualWait)
 	}
-	r.currentAttempt++
-	if r.opts.MaxRetries > 0 && r.currentAttempt > r.opts.MaxRetries {
-		return nil
+
+	log.VEventfDepth(r.ctx, 1 /* depth */, 2 /* level */, "will retry after %s", backoff)
+
+	ch := make(chan bool, 1)
+	if shouldAttempt {
+		r.currentAttempt++
 	}
-	return time.After(r.retryIn())
+	go func() {
+		defer timer.Stop()
+		<-timerCh
+		ch <- shouldAttempt
+	}()
+
+	return ch
 }
 
-// CurrentAttempt returns the current attempt
+// CurrentAttempt returns the current attempt (0-based index)
 func (r *Retry) CurrentAttempt() int {
 	return r.currentAttempt
 }
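Since `NextCh` now yields a bool and no longer watches the Closer or the context, a caller-driven loop might look like the following sketch; `runWithNextCh` and its error message are illustrative assumptions, not part of the package:

```go
package retryexample

import (
	"context"
	"errors"

	"github.com/cockroachdb/cockroach/pkg/util/retry"
)

// runWithNextCh drives a retry loop through NextCh. The caller, not the retry
// loop, is responsible for observing ctx cancellation.
func runWithNextCh(ctx context.Context, r *retry.Retry, work func() error) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case shouldRetry := <-r.NextCh():
			if !shouldRetry {
				// MaxRetries or MaxDuration exhausted; illustrative error.
				return errors.New("retry budget exhausted")
			}
			if err := work(); err == nil {
				return nil
			}
		}
	}
}
```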
