Skip to content

Commit c199009

Browse files
committed
revert retryer changes
1 parent 490b2d6 commit c199009

File tree

6 files changed

+36
-89
lines changed

6 files changed

+36
-89
lines changed

internal/partitions/partitions.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -621,7 +621,7 @@ func getMidIDBounds(
621621

622622
// Append the copied bound to the other mid _id bounds.
623623
midIDBounds = append(midIDBounds, bound)
624-
ri.NoteSuccess("received an ID partition")
624+
ri.NoteSuccess()
625625
}
626626

627627
return cursor.Err()

internal/retry/retry.go

Lines changed: 20 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,11 @@ func (r *Retryer) runRetryLoop(
6969
li := &LoopInfo{
7070
durationLimit: r.retryLimit,
7171
}
72-
funcinfos := lo.Map(
73-
r.callbacks,
74-
func(cb retryCallbackInfo, _ int) *FuncInfo {
72+
funcinfos := lo.RepeatBy(
73+
len(r.callbacks),
74+
func(_ int) *FuncInfo {
7575
return &FuncInfo{
76-
lastReset: msync.NewTypedAtomic(lastResetInfo{
77-
time: startTime,
78-
}),
79-
description: cb.description,
76+
lastResetTime: msync.NewTypedAtomic(startTime),
8077
loopDescription: r.description,
8178
loopInfo: li,
8279
}
@@ -116,25 +113,17 @@ func (r *Retryer) runRetryLoop(
116113
defer ticker.Stop()
117114

118115
for {
119-
lastReset := funcinfos[i].lastReset.Load()
116+
lastSuccessTime := funcinfos[i].lastResetTime.Load()
120117

121118
select {
122119
case <-cbDoneChan:
123120
return
124121
case <-ticker.C:
125-
if funcinfos[i].lastReset.Load() == lastReset {
126-
event := logger.Warn().
122+
if funcinfos[i].lastResetTime.Load() == lastSuccessTime {
123+
logger.Warn().
127124
Str("callbackDescription", curCbInfo.description).
128-
Time("noSuccessSince", lastReset.time).
129-
Uint64("successesSoFar", lastReset.resetsSoFar)
130-
131-
if successDesc, hasDesc := lastReset.description.Get(); hasDesc {
132-
event.
133-
Str("lastSuccessDescription", successDesc)
134-
}
135-
136-
event.
137-
Str("elapsedTime", reportutils.DurationToHMS(time.Since(lastReset.time))).
125+
Time("lastSuccessAt", lastSuccessTime).
126+
Str("elapsedTime", reportutils.DurationToHMS(time.Since(lastSuccessTime))).
138127
Msg("Operation has not reported success for a while.")
139128
}
140129
}
@@ -175,11 +164,9 @@ func (r *Retryer) runRetryLoop(
175164
}
176165

177166
failedFuncInfo := funcinfos[groupErr.funcNum]
178-
descriptions := failedFuncInfo.GetDescriptions()
179-
cbErr := groupErr.errFromCallback
180167

181168
// Not a transient error? Fail immediately.
182-
if !r.shouldRetryWithSleep(logger, sleepTime, descriptions, cbErr) {
169+
if !r.shouldRetryWithSleep(logger, sleepTime, *failedFuncInfo, groupErr.errFromCallback) {
183170
return groupErr.errFromCallback
184171
}
185172

@@ -214,7 +201,7 @@ func (r *Retryer) runRetryLoop(
214201
// Set all of the funcs that did *not* fail as having just succeeded.
215202
for i, curInfo := range funcinfos {
216203
if i != groupErr.funcNum {
217-
curInfo.lastReset.Store(lastResetInfo{time: now})
204+
curInfo.lastResetTime.Store(now)
218205
}
219206
}
220207
}
@@ -248,7 +235,7 @@ func (r *Retryer) addDescriptionToEvent(event *zerolog.Event) *zerolog.Event {
248235
func (r *Retryer) shouldRetryWithSleep(
249236
logger *logger.Logger,
250237
sleepTime time.Duration,
251-
descriptions []string,
238+
funcinfo FuncInfo,
252239
err error,
253240
) bool {
254241
if err == nil {
@@ -263,35 +250,26 @@ func (r *Retryer) shouldRetryWithSleep(
263250
)
264251

265252
event := logger.WithLevel(
266-
lo.Ternary(
267-
// If it’s transient, surface it as info.
268-
isTransient,
269-
zerolog.InfoLevel,
270-
271-
lo.Ternary(
272-
// Context cancellation is unimportant, so debug.
273-
errors.Is(err, context.Canceled),
274-
zerolog.DebugLevel,
275-
276-
// Other non-retryables are serious, so warn.
277-
zerolog.WarnLevel,
278-
),
279-
),
253+
lo.Ternary(isTransient, zerolog.InfoLevel, zerolog.WarnLevel),
280254
)
281255

282-
event.Strs("description", descriptions).
256+
if loopDesc, hasLoopDesc := r.description.Get(); hasLoopDesc {
257+
event.Str("operationDescription", loopDesc)
258+
}
259+
260+
event.Str("callbackDescription", funcinfo.description).
283261
Int("error code", util.GetErrorCode(err)).
284262
Err(err)
285263

286264
if isTransient {
287265
event.
288266
Stringer("delay", sleepTime).
289-
Msg("Got retryable error. Pausing, then will retry.")
267+
Msg("Pausing before retrying after transient error.")
290268

291269
return true
292270
}
293271

294-
event.Msg("Non-retryable error occurred.")
272+
event.Msg("Non-transient error occurred.")
295273

296274
return false
297275
}

internal/retry/retry_info.go

Lines changed: 4 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
package retry
22

33
import (
4-
"slices"
54
"time"
65

76
"github.com/10gen/migration-verifier/internal/reportutils"
8-
"github.com/10gen/migration-verifier/mslices"
97
"github.com/10gen/migration-verifier/msync"
108
"github.com/10gen/migration-verifier/option"
119
"github.com/rs/zerolog"
@@ -21,20 +19,11 @@ type LoopInfo struct {
2119
durationLimit time.Duration
2220
}
2321

24-
type lastResetInfo struct {
25-
time time.Time
26-
27-
// These go into logs to facilitate debugging.
28-
description option.Option[string]
29-
resetsSoFar uint64
30-
}
31-
3222
type FuncInfo struct {
3323
loopInfo *LoopInfo
3424
description string
3525
loopDescription option.Option[string]
36-
37-
lastReset *msync.TypedAtomic[lastResetInfo]
26+
lastResetTime *msync.TypedAtomic[time.Time]
3827
}
3928

4029
// Log will log a debug-level message for the current Info values and the provided strings.
@@ -80,7 +69,7 @@ func (fi *FuncInfo) GetAttemptNumber() int {
8069
// GetDurationSoFar returns the Info's current duration so far. This duration
8170
// applies to the duration of retrying for transient errors only.
8271
func (fi *FuncInfo) GetDurationSoFar() time.Duration {
83-
return time.Since(fi.lastReset.Load().time)
72+
return time.Since(fi.lastResetTime.Load())
8473
}
8574

8675
// NoteSuccess is used to tell the retry util to reset its measurement
@@ -89,21 +78,6 @@ func (fi *FuncInfo) GetDurationSoFar() time.Duration {
8978
//
9079
// Call this after every successful command in a multi-command callback.
9180
// (It’s useless--but harmless--in a single-command callback.)
92-
func (i *FuncInfo) NoteSuccess(description string) {
93-
totalResets := i.lastReset.Load().resetsSoFar
94-
95-
i.lastReset.Store(lastResetInfo{
96-
description: option.Some(description),
97-
time: time.Now(),
98-
resetsSoFar: 1 + totalResets,
99-
})
100-
}
101-
102-
func (i *FuncInfo) GetDescriptions() []string {
103-
descriptions := mslices.Of(i.description)
104-
if loopDesc, hasDesc := i.loopDescription.Get(); hasDesc {
105-
descriptions = slices.Insert(descriptions, 0, loopDesc)
106-
}
107-
108-
return descriptions
81+
func (i *FuncInfo) NoteSuccess() {
82+
i.lastResetTime.Store(time.Now())
10983
}

internal/retry/retryer_test.go

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"time"
99

1010
"github.com/10gen/migration-verifier/internal/util"
11-
"github.com/10gen/migration-verifier/option"
1211
"go.mongodb.org/mongo-driver/mongo"
1312
)
1413

@@ -100,11 +99,8 @@ func (suite *UnitTestSuite) TestRetryerDurationReset() {
10099
noSuccessIterations := 0
101100
f1 := func(_ context.Context, ri *FuncInfo) error {
102101
// Artificially advance how much time was taken.
103-
ri.lastReset.Store(
104-
lastResetInfo{
105-
time: ri.lastReset.Load().time.Add(-2 * ri.loopInfo.durationLimit),
106-
description: option.Some("artificially rewinding time"),
107-
},
102+
ri.lastResetTime.Store(
103+
ri.lastResetTime.Load().Add(-2 * ri.loopInfo.durationLimit),
108104
)
109105

110106
noSuccessIterations++
@@ -128,13 +124,12 @@ func (suite *UnitTestSuite) TestRetryerDurationReset() {
128124
successIterations := 0
129125
f2 := func(_ context.Context, ri *FuncInfo) error {
130126
// Artificially advance how much time was taken.
131-
ri.lastReset.Store(
132-
lastResetInfo{
133-
time: ri.lastReset.Load().time.Add(-2 * ri.loopInfo.durationLimit),
134-
description: option.Some("artificially rewinding time"),
135-
},
127+
ri.lastResetTime.Store(
128+
ri.lastResetTime.Load().Add(-2 * ri.loopInfo.durationLimit),
136129
)
137130

131+
ri.NoteSuccess()
132+
138133
successIterations++
139134
if successIterations == 1 {
140135
return someNetworkError
@@ -312,7 +307,7 @@ func (suite *UnitTestSuite) TestMulti_LongRunningSuccess() {
312307

313308
err := retryer.WithCallback(
314309
func(ctx context.Context, fi *FuncInfo) error {
315-
fi.NoteSuccess("success right away")
310+
fi.NoteSuccess()
316311

317312
if time.Now().Before(succeedPastTime) {
318313
time.Sleep(1 * time.Second)

internal/verifier/change_stream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
298298
eventsRead++
299299
}
300300

301-
ri.NoteSuccess("received a batch of change events")
301+
ri.NoteSuccess()
302302

303303
if eventsRead == 0 {
304304
return nil

internal/verifier/compare.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks(
324324
)
325325

326326
if err == nil {
327-
state.NoteSuccess("opened src find cursor")
327+
state.NoteSuccess()
328328

329329
err = errors.Wrap(
330330
iterateCursorToChannel(ctx, state, cursor, srcChannel),
@@ -350,7 +350,7 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks(
350350
)
351351

352352
if err == nil {
353-
state.NoteSuccess("opened dst find cursor")
353+
state.NoteSuccess()
354354

355355
err = errors.Wrap(
356356
iterateCursorToChannel(ctx, state, cursor, dstChannel),
@@ -378,7 +378,7 @@ func iterateCursorToChannel(
378378
defer close(writer)
379379

380380
for cursor.Next(ctx) {
381-
state.NoteSuccess("received a document")
381+
state.NoteSuccess()
382382

383383
select {
384384
case <-ctx.Done():

0 commit comments

Comments
 (0)