Skip to content

Commit 8c1af46

Browse files
committed
refactored appendOplogNote (& al.)
1 parent 75e49b0 commit 8c1af46

File tree

2 files changed

+98
-77
lines changed

2 files changed

+98
-77
lines changed

internal/verifier/change_stream.go

Lines changed: 37 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -126,60 +126,60 @@ func (verifier *Verifier) GetChangeStreamFilter() []bson.D {
126126
return []bson.D{stage}
127127
}
128128

129-
func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.ChangeStream) {
130-
defer cs.Close(ctx)
129+
func (verifier *Verifier) readAndHandleOneChangeEventBatch(ctx context.Context, cs *mongo.ChangeStream) error {
130+
eventsRead := 0
131+
var changeEventBatch []ParsedEvent
131132

132-
var lastPersistedTime time.Time
133+
for hasEventInBatch := true; hasEventInBatch; hasEventInBatch = cs.RemainingBatchLength() > 0 {
134+
gotEvent := cs.TryNext(ctx)
133135

134-
persistResumeTokenIfNeeded := func() error {
135-
if time.Since(lastPersistedTime) <= minChangeStreamPersistInterval {
136-
return nil
136+
if cs.Err() != nil {
137+
return errors.Wrap(cs.Err(), "change stream iteration failed")
137138
}
138139

139-
err := verifier.persistChangeStreamResumeToken(ctx, cs)
140-
if err == nil {
141-
lastPersistedTime = time.Now()
140+
if !gotEvent {
141+
break
142142
}
143143

144-
return err
145-
}
144+
if changeEventBatch == nil {
145+
changeEventBatch = make([]ParsedEvent, cs.RemainingBatchLength()+1)
146+
}
146147

147-
readAndHandleOneChangeEventBatch := func() error {
148-
eventsRead := 0
149-
var changeEventBatch []ParsedEvent
148+
if err := cs.Decode(&changeEventBatch[eventsRead]); err != nil {
149+
return errors.Wrap(err, "failed to decode change event")
150+
}
150151

151-
for hasEventInBatch := true; hasEventInBatch; hasEventInBatch = cs.RemainingBatchLength() > 0 {
152-
gotEvent := cs.TryNext(ctx)
152+
eventsRead++
153+
}
153154

154-
if cs.Err() != nil {
155-
return errors.Wrap(cs.Err(), "change stream iteration failed")
156-
}
155+
if eventsRead == 0 {
156+
return nil
157+
}
157158

158-
if !gotEvent {
159-
break
160-
}
159+
err := verifier.HandleChangeStreamEvents(ctx, changeEventBatch)
160+
if err != nil {
161+
return errors.Wrap(err, "failed to handle change events")
162+
}
161163

162-
if changeEventBatch == nil {
163-
changeEventBatch = make([]ParsedEvent, cs.RemainingBatchLength()+1)
164-
}
164+
return nil
165+
}
165166

166-
if err := cs.Decode(&changeEventBatch[eventsRead]); err != nil {
167-
return errors.Wrap(err, "failed to decode change event")
168-
}
167+
func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.ChangeStream) {
168+
defer cs.Close(ctx)
169169

170-
eventsRead++
171-
}
170+
var lastPersistedTime time.Time
172171

173-
if eventsRead == 0 {
172+
persistResumeTokenIfNeeded := func() error {
173+
if time.Since(lastPersistedTime) <= minChangeStreamPersistInterval {
174174
return nil
175175
}
176176

177-
err := verifier.HandleChangeStreamEvents(ctx, changeEventBatch)
178-
if err != nil {
179-
return errors.Wrap(err, "failed to handle change events")
177+
err := verifier.persistChangeStreamResumeToken(ctx, cs)
178+
if err == nil {
179+
lastPersistedTime = time.Now()
180180
}
181181

182-
return nil
182+
return err
183183
}
184184

185185
for {
@@ -224,15 +224,15 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
224224
break
225225
}
226226

227-
err = readAndHandleOneChangeEventBatch()
227+
err = verifier.readAndHandleOneChangeEventBatch(ctx, cs)
228228

229229
if err != nil {
230230
break
231231
}
232232
}
233233

234234
default:
235-
err = readAndHandleOneChangeEventBatch()
235+
err = verifier.readAndHandleOneChangeEventBatch(ctx, cs)
236236

237237
if err == nil {
238238
err = persistResumeTokenIfNeeded()

internal/verifier/clustertime.go

Lines changed: 61 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ const opTimeKeyInServerResponse = "operationTime"
2323
func GetClusterTime(
2424
ctx context.Context,
2525
logger *logger.Logger,
26-
client *mongo.Client, // could change to type=any if we need
26+
client *mongo.Client,
2727
) (primitive.Timestamp, error) {
2828
retryer := retry.New(retry.DefaultDurationLimit)
2929

@@ -39,7 +39,7 @@ func GetClusterTime(
3939
logger,
4040
func(_ *retry.Info) error {
4141
var err error
42-
optime, err = runAppendOplogNote(ctx, client, true)
42+
optime, err = syncClusterTimeAcrossShards(ctx, client)
4343
if err != nil {
4444
err = fmt.Errorf("%w: %w", retry.CustomTransientErr, err)
4545
}
@@ -51,16 +51,17 @@ func GetClusterTime(
5151
return primitive.Timestamp{}, err
5252
}
5353

54-
// OPTIMIZATION: We now append an oplog entry--this time for real!--to
55-
// cause any lagging shards to output their events.
54+
// OPTIMIZATION FOR SHARDED CLUSTERS: We append another oplog entry to
55+
// bring all shards to a cluster time that’s *after* the optime that we’ll
56+
// return. That way any events *at* that optime get output by lagging shards.
5657
//
5758
// Since this is just an optimization, failures here are nonfatal.
5859
err = retryer.RunForTransientErrorsOnly(
5960
ctx,
6061
logger,
6162
func(_ *retry.Info) error {
6263
var err error
63-
optime, err = runAppendOplogNote(ctx, client, false)
64+
optime, err = syncClusterTimeAcrossShards(ctx, client)
6465
if err != nil {
6566
err = fmt.Errorf("%w: %w", retry.CustomTransientErr, err)
6667
}
@@ -76,55 +77,75 @@ func GetClusterTime(
7677
return optime, nil
7778
}
7879

79-
func runAppendOplogNote(
80+
// Use this when we just need the correct cluster time without
81+
// actually changing any shards’ oplogs.
82+
func fetchClusterTime(
8083
ctx context.Context,
8184
client *mongo.Client,
82-
doomed bool,
8385
) (primitive.Timestamp, error) {
84-
// We don’t need to write to any shards’ oplogs; we just
85-
// need to fetch the cluster time.
86-
cmd := bson.D{
87-
{"appendOplogNote", 1},
88-
{"data", bson.D{
89-
{"migration-verifier", "expect StaleClusterTime error"},
90-
}},
91-
}
86+
cmd, rawResponse, err := runAppendOplogNote(
87+
ctx,
88+
client,
89+
"expect StaleClusterTime error",
90+
bson.E{"maxClusterTime", primitive.Timestamp{1, 0}},
91+
)
9292

93-
if doomed {
94-
cmd = append(cmd, bson.E{"maxClusterTime", primitive.Timestamp{1, 0}})
93+
// We expect an error here; if we didn't get one then something is
94+
// amiss on the server.
95+
if err == nil {
96+
return primitive.Timestamp{}, errors.Errorf("server request unexpectedly succeeded: %v", cmd)
9597
}
9698

97-
resp := client.Database("admin").RunCommand(ctx, cmd)
98-
99-
err := resp.Err()
100-
101-
if doomed {
102-
// We expect an error here; if we didn't get one then something is
103-
// amiss on the server.
104-
if err == nil {
105-
return primitive.Timestamp{}, errors.Errorf("server request unexpectedly succeeded: %v", cmd)
106-
}
107-
108-
if !util.IsStaleClusterTimeError(err) {
109-
return primitive.Timestamp{}, errors.Errorf(
110-
"unexpected error (expected StaleClusterTime) from request (%v): %v",
111-
err,
112-
cmd,
113-
)
114-
}
115-
} else if err != nil {
116-
return primitive.Timestamp{}, errors.Wrapf(
99+
if !util.IsStaleClusterTimeError(err) {
100+
return primitive.Timestamp{}, errors.Wrap(
117101
err,
118-
"command (%v) failed unexpectedly",
119-
cmd,
102+
"unexpected error (expected StaleClusterTime) from request",
120103
)
121104
}
122105

123-
rawResponse, _ := resp.Raw()
106+
return getOpTimeFromRawResponse(rawResponse)
107+
}
108+
109+
func syncClusterTimeAcrossShards(
110+
ctx context.Context,
111+
client *mongo.Client,
112+
) (primitive.Timestamp, error) {
113+
_, rawResponse, err := runAppendOplogNote(ctx, client, "syncing cluster time")
114+
115+
if err != nil {
116+
return primitive.Timestamp{}, err
117+
}
124118

125119
return getOpTimeFromRawResponse(rawResponse)
126120
}
127121

122+
func runAppendOplogNote(
123+
ctx context.Context,
124+
client *mongo.Client,
125+
note string,
126+
extraPieces ...bson.E,
127+
) (bson.D, bson.Raw, error) {
128+
cmd := append(
129+
bson.D{
130+
{"appendOplogNote", 1},
131+
{"data", bson.D{
132+
{"migration-verifier", note},
133+
}},
134+
},
135+
extraPieces...,
136+
)
137+
138+
resp := client.Database("admin").RunCommand(ctx, cmd)
139+
140+
raw, err := resp.Raw()
141+
142+
return cmd, raw, errors.Wrapf(
143+
err,
144+
"command (%v) failed unexpectedly",
145+
cmd,
146+
)
147+
}
148+
128149
func getOpTimeFromRawResponse(rawResponse bson.Raw) (primitive.Timestamp, error) {
129150
// Get the `operationTime` from the response and return it.
130151
var optime primitive.Timestamp

0 commit comments

Comments
 (0)