Skip to content

Commit 90aa035

Browse files
committed
add more detail to retries
1 parent 5f9733c commit 90aa035

File tree

7 files changed

+73
-34
lines changed

7 files changed

+73
-34
lines changed

internal/partitions/partitions.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -621,7 +621,7 @@ func getMidIDBounds(
621621

622622
// Append the copied bound to the other mid _id bounds.
623623
midIDBounds = append(midIDBounds, bound)
624-
ri.NoteSuccess()
624+
ri.NoteSuccess("received an ID partition")
625625
}
626626

627627
return cursor.Err()

internal/retry/retry.go

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,14 @@ func (r *Retryer) runRetryLoop(
6969
li := &LoopInfo{
7070
durationLimit: r.retryLimit,
7171
}
72-
funcinfos := lo.RepeatBy(
73-
len(r.callbacks),
74-
func(_ int) *FuncInfo {
72+
funcinfos := lo.Map(
73+
r.callbacks,
74+
func(cb retryCallbackInfo, _ int) *FuncInfo {
7575
return &FuncInfo{
76-
lastResetTime: msync.NewTypedAtomic(startTime),
76+
lastReset: msync.NewTypedAtomic(lastResetInfo{
77+
time: startTime,
78+
}),
79+
description: cb.description,
7780
loopDescription: r.description,
7881
loopInfo: li,
7982
}
@@ -113,17 +116,25 @@ func (r *Retryer) runRetryLoop(
113116
defer ticker.Stop()
114117

115118
for {
116-
lastSuccessTime := funcinfos[i].lastResetTime.Load()
119+
lastReset := funcinfos[i].lastReset.Load()
117120

118121
select {
119122
case <-cbDoneChan:
120123
return
121124
case <-ticker.C:
122-
if funcinfos[i].lastResetTime.Load() == lastSuccessTime {
123-
logger.Warn().
125+
if funcinfos[i].lastReset.Load() == lastReset {
126+
event := logger.Warn().
124127
Str("callbackDescription", curCbInfo.description).
125-
Time("lastSuccessAt", lastSuccessTime).
126-
Str("elapsedTime", reportutils.DurationToHMS(time.Since(lastSuccessTime))).
128+
Time("since", lastReset.time)
129+
130+
if successDesc, hasDesc := lastReset.description.Get(); hasDesc {
131+
event.
132+
Str("successDescription", successDesc).
133+
Uint64("successesSoFar", lastReset.resetsSoFar)
134+
}
135+
136+
event.
137+
Str("elapsedTime", reportutils.DurationToHMS(time.Since(lastReset.time))).
127138
Msg("Operation has not reported success for a while.")
128139
}
129140
}
@@ -164,9 +175,11 @@ func (r *Retryer) runRetryLoop(
164175
}
165176

166177
failedFuncInfo := funcinfos[groupErr.funcNum]
178+
descriptions := failedFuncInfo.GetDescriptions()
179+
cbErr := groupErr.errFromCallback
167180

168181
// Not a transient error? Fail immediately.
169-
if !r.shouldRetryWithSleep(logger, sleepTime, r.callbacks[groupErr.funcNum], groupErr.errFromCallback) {
182+
if !r.shouldRetryWithSleep(logger, sleepTime, descriptions, cbErr) {
170183
return groupErr.errFromCallback
171184
}
172185

@@ -201,7 +214,7 @@ func (r *Retryer) runRetryLoop(
201214
// Set all of the funcs that did *not* fail as having just succeeded.
202215
for i, curInfo := range funcinfos {
203216
if i != groupErr.funcNum {
204-
curInfo.lastResetTime.Store(now)
217+
curInfo.lastReset.Store(lastResetInfo{time: now})
205218
}
206219
}
207220
}
@@ -235,7 +248,7 @@ func (r *Retryer) addDescriptionToEvent(event *zerolog.Event) *zerolog.Event {
235248
func (r *Retryer) shouldRetryWithSleep(
236249
logger *logger.Logger,
237250
sleepTime time.Duration,
238-
cbInfo retryCallbackInfo,
251+
descriptions []string,
239252
err error,
240253
) bool {
241254
if err == nil {
@@ -266,11 +279,7 @@ func (r *Retryer) shouldRetryWithSleep(
266279
),
267280
)
268281

269-
if loopDesc, hasLoopDesc := r.description.Get(); hasLoopDesc {
270-
event.Str("operationDescription", loopDesc)
271-
}
272-
273-
event.Str("callbackDescription", cbInfo.description).
282+
event.Strs("description", descriptions).
274283
Int("error code", util.GetErrorCode(err)).
275284
Err(err)
276285

internal/retry/retry_info.go

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package retry
22

33
import (
4+
"slices"
45
"time"
56

67
"github.com/10gen/migration-verifier/internal/reportutils"
8+
"github.com/10gen/migration-verifier/mslices"
79
"github.com/10gen/migration-verifier/msync"
810
"github.com/10gen/migration-verifier/option"
911
"github.com/rs/zerolog"
@@ -19,11 +21,20 @@ type LoopInfo struct {
1921
durationLimit time.Duration
2022
}
2123

24+
type lastResetInfo struct {
25+
time time.Time
26+
27+
// These go into logs to facilitate debugging.
28+
description option.Option[string]
29+
resetsSoFar uint64
30+
}
31+
2232
type FuncInfo struct {
2333
loopInfo *LoopInfo
2434
description string
2535
loopDescription option.Option[string]
26-
lastResetTime *msync.TypedAtomic[time.Time]
36+
37+
lastReset *msync.TypedAtomic[lastResetInfo]
2738
}
2839

2940
// Log will log a debug-level message for the current Info values and the provided strings.
@@ -69,7 +80,7 @@ func (fi *FuncInfo) GetAttemptNumber() int {
6980
// GetDurationSoFar returns the Info's current duration so far. This duration
7081
// applies to the duration of retrying for transient errors only.
7182
func (fi *FuncInfo) GetDurationSoFar() time.Duration {
72-
return time.Since(fi.lastResetTime.Load())
83+
return time.Since(fi.lastReset.Load().time)
7384
}
7485

7586
// NoteSuccess is used to tell the retry util to reset its measurement
@@ -78,6 +89,21 @@ func (fi *FuncInfo) GetDurationSoFar() time.Duration {
7889
//
7990
// Call this after every successful command in a multi-command callback.
8091
// (It’s useless--but harmless--in a single-command callback.)
81-
func (i *FuncInfo) NoteSuccess() {
82-
i.lastResetTime.Store(time.Now())
92+
func (i *FuncInfo) NoteSuccess(description string) {
93+
totalResets := i.lastReset.Load().resetsSoFar
94+
95+
i.lastReset.Store(lastResetInfo{
96+
description: option.Some(description),
97+
time: time.Now(),
98+
resetsSoFar: 1 + totalResets,
99+
})
100+
}
101+
102+
func (i *FuncInfo) GetDescriptions() []string {
103+
descriptions := mslices.Of(i.description)
104+
if loopDesc, hasDesc := i.loopDescription.Get(); hasDesc {
105+
descriptions = slices.Insert(descriptions, 0, loopDesc)
106+
}
107+
108+
return descriptions
83109
}

internal/retry/retryer_test.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"time"
99

1010
"github.com/10gen/migration-verifier/internal/util"
11+
"github.com/10gen/migration-verifier/option"
1112
"go.mongodb.org/mongo-driver/mongo"
1213
)
1314

@@ -99,8 +100,11 @@ func (suite *UnitTestSuite) TestRetryerDurationReset() {
99100
noSuccessIterations := 0
100101
f1 := func(_ context.Context, ri *FuncInfo) error {
101102
// Artificially advance how much time was taken.
102-
ri.lastResetTime.Store(
103-
ri.lastResetTime.Load().Add(-2 * ri.loopInfo.durationLimit),
103+
ri.lastReset.Store(
104+
lastResetInfo{
105+
time: ri.lastReset.Load().time.Add(-2 * ri.loopInfo.durationLimit),
106+
description: option.Some("artificially rewinding time"),
107+
},
104108
)
105109

106110
noSuccessIterations++
@@ -124,12 +128,13 @@ func (suite *UnitTestSuite) TestRetryerDurationReset() {
124128
successIterations := 0
125129
f2 := func(_ context.Context, ri *FuncInfo) error {
126130
// Artificially advance how much time was taken.
127-
ri.lastResetTime.Store(
128-
ri.lastResetTime.Load().Add(-2 * ri.loopInfo.durationLimit),
131+
ri.lastReset.Store(
132+
lastResetInfo{
133+
time: ri.lastReset.Load().time.Add(-2 * ri.loopInfo.durationLimit),
134+
description: option.Some("artificially rewinding time"),
135+
},
129136
)
130137

131-
ri.NoteSuccess()
132-
133138
successIterations++
134139
if successIterations == 1 {
135140
return someNetworkError
@@ -307,7 +312,7 @@ func (suite *UnitTestSuite) TestMulti_LongRunningSuccess() {
307312

308313
err := retryer.WithCallback(
309314
func(ctx context.Context, fi *FuncInfo) error {
310-
fi.NoteSuccess()
315+
fi.NoteSuccess("success right away")
311316

312317
if time.Now().Before(succeedPastTime) {
313318
time.Sleep(1 * time.Second)

internal/verifier/change_stream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
299299
eventsRead++
300300
}
301301

302-
ri.NoteSuccess()
302+
ri.NoteSuccess("received a batch of change events")
303303

304304
if eventsRead == 0 {
305305
return nil

internal/verifier/change_stream_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,6 @@ func (suite *IntegrationTestSuite) TestChangeStreamFilter_WithNamespaces() {
172172
events = append(events, newEvent)
173173
}
174174

175-
suite.T().Logf("Change stream op time (got event? %v): %v", gotEvent, csOpTime)
176175
if csOpTime.After(*changeStreamStopTime) {
177176
break
178177
}

internal/verifier/compare.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks(
324324
)
325325

326326
if err == nil {
327-
state.NoteSuccess()
327+
state.NoteSuccess("opened src find cursor")
328328

329329
err = errors.Wrap(
330330
iterateCursorToChannel(ctx, state, cursor, srcChannel),
@@ -350,7 +350,7 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks(
350350
)
351351

352352
if err == nil {
353-
state.NoteSuccess()
353+
state.NoteSuccess("opened dst find cursor")
354354

355355
err = errors.Wrap(
356356
iterateCursorToChannel(ctx, state, cursor, dstChannel),
@@ -376,7 +376,7 @@ func iterateCursorToChannel(
376376
writer chan<- bson.Raw,
377377
) error {
378378
for cursor.Next(ctx) {
379-
state.NoteSuccess()
379+
state.NoteSuccess("received a document")
380380
writer <- slices.Clone(cursor.Current)
381381
}
382382

0 commit comments

Comments
 (0)