Skip to content

Commit a58b996

Browse files
committed
Merge branch 'main' into rep-5219-batch-attempt-2
2 parents 91b1896 + 5639f59 commit a58b996

File tree

7 files changed

+457
-32
lines changed

7 files changed

+457
-32
lines changed

internal/verifier/change_stream.go

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ type ParsedEvent struct {
2525
}
2626

2727
func (pe *ParsedEvent) String() string {
28-
return fmt.Sprintf("{ OpType: %s, namespace: %s, docID: %v}", pe.OpType, pe.Ns, pe.DocKey.ID)
28+
return fmt.Sprintf("{OpType: %s, namespace: %s, docID: %v, clusterTime: %v}", pe.OpType, pe.Ns, pe.DocKey.ID, pe.ClusterTime)
2929
}
3030

3131
// DocKey is a deserialized form for the ChangeEvent documentKey field. We currently only care about
@@ -47,7 +47,7 @@ type UnknownEventError struct {
4747
}
4848

4949
func (uee UnknownEventError) Error() string {
50-
return fmt.Sprintf("Unknown event type: %#q", uee.Event.OpType)
50+
return fmt.Sprintf("Received event with unknown optype: %+v", uee.Event)
5151
}
5252

5353
// HandleChangeStreamEvent performs the necessary work for change stream events that occur during
@@ -88,8 +88,6 @@ func (verifier *Verifier) GetChangeStreamFilter() []bson.D {
8888
}
8989

9090
func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.ChangeStream) {
91-
var changeEvent ParsedEvent
92-
9391
var lastPersistedTime time.Time
9492

9593
persistResumeTokenIfNeeded := func() error {
@@ -109,10 +107,13 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
109107
eventsRead := 0
110108
for hasEventInBatch := true; hasEventInBatch; hasEventInBatch = cs.RemainingBatchLength() > 0 {
111109
gotEvent := cs.TryNext(ctx)
110+
112111
if !gotEvent {
113112
break
114113
}
115114

115+
var changeEvent ParsedEvent
116+
116117
if err := cs.Decode(&changeEvent); err != nil {
117118
return false, errors.Wrap(err, "failed to decode change event")
118119
}
@@ -147,24 +148,22 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
147148
// source writes are ended. This means we should exit rather than continue
148149
// reading the change stream since there should be no more events.
149150
case <-verifier.changeStreamEnderChan:
150-
var gotEvent bool
151+
verifier.logger.Debug().
152+
Msg("Change stream thread received shutdown request.")
151153

152154
changeStreamEnded = true
153155

154156
// Read all change events until the source reports no events.
155157
// (i.e., the `getMore` call returns empty)
156158
for {
159+
var gotEvent bool
157160
gotEvent, err = readAndHandleOneChangeEventBatch()
158161

159162
if !gotEvent || err != nil {
160163
break
161164
}
162165
}
163166

164-
if err != nil {
165-
break
166-
}
167-
168167
default:
169168
_, err = readAndHandleOneChangeEventBatch()
170169
}
@@ -174,7 +173,15 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
174173
}
175174

176175
if err != nil && !errors.Is(err, context.Canceled) {
176+
verifier.logger.Debug().
177+
Err(err).
178+
Msg("Sending change stream error.")
179+
177180
verifier.changeStreamErrChan <- err
181+
182+
if !changeStreamEnded {
183+
break
184+
}
178185
}
179186

180187
if changeStreamEnded {
@@ -187,11 +194,18 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
187194
// since we have started Recheck, we must signal that we have
188195
// finished the change stream changes so that Recheck can continue.
189196
verifier.changeStreamDoneChan <- struct{}{}
190-
// since the changeStream is exhausted, we now return
191-
verifier.logger.Debug().Msg("Change stream is done")
192-
return
197+
break
193198
}
194199
}
200+
201+
infoLog := verifier.logger.Info()
202+
if verifier.lastChangeEventTime == nil {
203+
infoLog = infoLog.Str("changeStreamStopTime", "none")
204+
} else {
205+
infoLog = infoLog.Interface("changeStreamStopTime", *verifier.lastChangeEventTime)
206+
}
207+
208+
infoLog.Msg("Change stream is done.")
195209
}
196210

197211
// StartChangeStream starts the change stream.

internal/verifier/check.go

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,12 @@ func (verifier *Verifier) waitForChangeStream() error {
5151
verifier.changeStreamEnderChan <- struct{}{}
5252
select {
5353
case err := <-verifier.changeStreamErrChan:
54+
verifier.logger.Warn().Err(err).
55+
Msg("Received error from change stream.")
5456
return err
5557
case <-verifier.changeStreamDoneChan:
58+
verifier.logger.Debug().
59+
Msg("Received completion signal from change stream.")
5660
break
5761
}
5862
}
@@ -149,6 +153,17 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
149153
if err != nil {
150154
return err
151155
}
156+
157+
err = verifier.doInMetaTransaction(
158+
ctx,
159+
func(ctx context.Context, sCtx mongo.SessionContext) error {
160+
return verifier.ResetInProgressTasks(sCtx)
161+
},
162+
)
163+
if err != nil {
164+
return errors.Wrap(err, "failed to reset any in-progress tasks")
165+
}
166+
152167
verifier.logger.Debug().Msg("Starting Check")
153168

154169
verifier.phase = Check
@@ -188,16 +203,31 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
188203
return err
189204
}
190205
// we will only coordinate when the number of channels is exactly 2.
191-
// * Channel 0 signals a generation is done
192-
// * Channel 1 signals to check to continue the next generation
206+
// * Channel 0 informs the test of a generation boundary.
207+
// * Block until the test (via channel 1) tells us to do the
208+
// next generation.
193209
if len(testChan) == 2 {
210+
211+
verifier.logger.Debug().
212+
Msg("Telling test about generation boundary.")
194213
testChan[0] <- struct{}{}
214+
215+
verifier.logger.Debug().
216+
Msg("Awaiting test's signal to continue.")
195217
<-testChan[1]
218+
219+
verifier.logger.Debug().
220+
Msg("Received test's signal. Continuing.")
196221
}
197222
time.Sleep(verifier.generationPauseDelayMillis * time.Millisecond)
198223
verifier.mux.Lock()
199224
if verifier.lastGeneration {
200225
verifier.mux.Unlock()
226+
227+
verifier.logger.Debug().
228+
Int("generation", verifier.generation).
229+
Msg("Final generation done.")
230+
201231
return nil
202232
}
203233
// TODO: wait here until writesOff is hit or enough time has passed, so we don't spin

internal/verifier/migration_verifier_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1581,6 +1581,10 @@ func (suite *MultiDataVersionTestSuite) TestVerifierWithFilter() {
15811581

15821582
// Turn writes off.
15831583
verifier.WritesOff(ctx)
1584+
1585+
// Tell CheckDriver to do one more pass. This should terminate the change stream.
1586+
checkContinueChan <- struct{}{}
1587+
<-checkDoneChan
15841588
}
15851589

15861590
func (suite *MultiDataVersionTestSuite) TestPartitionWithFilter() {

internal/verifier/reset.go

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
package verifier
2+
3+
import (
4+
"github.com/pkg/errors"
5+
"go.mongodb.org/mongo-driver/bson"
6+
"go.mongodb.org/mongo-driver/mongo"
7+
)
8+
9+
// defaultTaskUpdate is the update document applied to tasks that are being
// reset: it sets "status" to verificationTaskAdded and removes "begin_time".
var defaultTaskUpdate = bson.M{
10+
"$set": bson.M{"status": verificationTaskAdded},
11+
"$unset": bson.M{"begin_time": 1},
12+
}
13+
14+
func (verifier *Verifier) ResetInProgressTasks(ctx mongo.SessionContext) error {
15+
didReset, err := verifier.handleIncompletePrimary(ctx)
16+
17+
if err == nil {
18+
if didReset {
19+
return nil
20+
}
21+
22+
err = verifier.resetCollectionTasksIfNeeded(ctx)
23+
}
24+
25+
if err == nil {
26+
err = verifier.resetPartitionTasksIfNeeded(ctx)
27+
}
28+
29+
return err
30+
}
31+
32+
func (verifier *Verifier) handleIncompletePrimary(ctx mongo.SessionContext) (bool, error) {
33+
taskColl := verifier.verificationTaskCollection()
34+
35+
cursor, err := taskColl.Find(
36+
ctx,
37+
bson.M{
38+
"type": verificationTaskPrimary,
39+
"status": bson.M{"$ne": verificationTaskCompleted},
40+
},
41+
)
42+
if err != nil {
43+
return false, errors.Wrapf(err, "failed to fetch incomplete %#q task", verificationTaskPrimary)
44+
}
45+
46+
var incompletePrimaries []VerificationTask
47+
err = cursor.All(ctx, &incompletePrimaries)
48+
if err != nil {
49+
return false, errors.Wrapf(err, "failed to read incomplete %#q task", verificationTaskPrimary)
50+
}
51+
52+
switch len(incompletePrimaries) {
53+
case 0:
54+
// Nothing to do.
55+
case 1:
56+
// Invariant: task status should be “added”.
57+
if incompletePrimaries[0].Status != verificationTaskAdded {
58+
verifier.logger.Panic().
59+
Interface("task", incompletePrimaries[0]).
60+
Msg("Primary task status has invalid state.")
61+
}
62+
63+
verifier.logger.Info().
64+
Msg("Previous verifier run left primary task incomplete. Deleting non-primary tasks.")
65+
66+
deleted, err := taskColl.DeleteMany(
67+
ctx,
68+
bson.M{
69+
"type": bson.M{
70+
"$ne": verificationTaskPrimary,
71+
},
72+
},
73+
)
74+
if err != nil {
75+
return false, errors.Wrapf(err, "failed to delete non-%#q tasks", verificationTaskPrimary)
76+
}
77+
78+
verifier.logger.Info().
79+
Int64("deletedTasksCount", deleted.DeletedCount).
80+
Msg("Found and deleted non-primary tasks.")
81+
82+
return true, nil
83+
default:
84+
verifier.logger.Panic().
85+
Interface("tasks", incompletePrimaries).
86+
Msg("Found multiple incomplete primary tasks; there should only be 1.")
87+
}
88+
89+
return false, nil
90+
}
91+
92+
func (verifier *Verifier) resetCollectionTasksIfNeeded(ctx mongo.SessionContext) error {
93+
taskColl := verifier.verificationTaskCollection()
94+
95+
cursor, err := taskColl.Find(
96+
ctx,
97+
bson.M{
98+
"type": verificationTaskVerifyCollection,
99+
"status": verificationTaskProcessing,
100+
},
101+
)
102+
if err != nil {
103+
return errors.Wrapf(err, "failed to find incomplete %#q tasks", verificationTaskVerifyCollection)
104+
}
105+
var incompleteCollTasks []VerificationTask
106+
err = cursor.All(ctx, &incompleteCollTasks)
107+
if err != nil {
108+
return errors.Wrapf(err, "failed to read incomplete %#q tasks", verificationTaskVerifyCollection)
109+
}
110+
111+
if len(incompleteCollTasks) > 0 {
112+
verifier.logger.Info().
113+
Int("count", len(incompleteCollTasks)).
114+
Msg("Previous verifier run left collection-level verification task(s) pending. Resetting.")
115+
}
116+
117+
for _, task := range incompleteCollTasks {
118+
_, err := taskColl.DeleteMany(
119+
ctx,
120+
bson.M{
121+
"type": verificationTaskVerifyDocuments,
122+
"query_filter.namespace": task.QueryFilter.Namespace,
123+
},
124+
)
125+
if err != nil {
126+
return errors.Wrapf(err, "failed to delete namespace %#q's %#q tasks", task.QueryFilter.Namespace, verificationTaskVerifyDocuments)
127+
}
128+
129+
_, err = taskColl.UpdateOne(
130+
ctx,
131+
bson.M{
132+
"type": verificationTaskVerifyCollection,
133+
"query_filter.namespace": task.QueryFilter.Namespace,
134+
},
135+
defaultTaskUpdate,
136+
)
137+
if err != nil {
138+
return errors.Wrapf(err, "failed to reset namespace %#q's %#q task", task.QueryFilter.Namespace, verificationTaskVerifyCollection)
139+
}
140+
}
141+
142+
return nil
143+
}
144+
145+
func (verifier *Verifier) resetPartitionTasksIfNeeded(ctx mongo.SessionContext) error {
146+
taskColl := verifier.verificationTaskCollection()
147+
148+
_, err := taskColl.UpdateMany(
149+
ctx,
150+
bson.M{
151+
"type": verificationTaskVerifyDocuments,
152+
"status": verificationTaskProcessing,
153+
},
154+
defaultTaskUpdate,
155+
)
156+
if err != nil {
157+
return errors.Wrapf(err, "failed to reset in-progress %#q tasks", verificationTaskVerifyDocuments)
158+
}
159+
160+
return nil
161+
}

0 commit comments

Comments
 (0)