Skip to content

Commit e7d8a4b

Browse files
committed
preliminary
1 parent 6b53825 commit e7d8a4b

File tree

12 files changed

+702
-50
lines changed

12 files changed

+702
-50
lines changed

internal/util/value.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package util
2+
3+
// ResetToZero overwrites the variable that ptr points to with the zero
// value of its type, so callers need not spell that type out themselves.
func ResetToZero[T any](ptr *T) {
	var zero T
	*ptr = zero
}

internal/verifier/change_stream.go

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -160,15 +160,21 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
160160
eventsRead++
161161
}
162162

163-
if eventsRead > 0 {
164-
verifier.logger.Debug().Int("eventsCount", eventsRead).Msgf("Received a batch of events.")
165-
err := verifier.HandleChangeStreamEvents(ctx, changeEventBatch)
166-
if err != nil {
167-
return false, errors.Wrap(err, "failed to handle change events")
168-
}
163+
if cs.Err() != nil {
164+
return false, errors.Wrap(cs.Err(), "change stream iteration failed")
165+
}
166+
167+
if eventsRead == 0 {
168+
return false, nil
169169
}
170170

171-
return eventsRead > 0, errors.Wrap(cs.Err(), "change stream iteration failed")
171+
verifier.logger.Debug().Int("eventsCount", eventsRead).Msgf("Received a batch of events.")
172+
err := verifier.HandleChangeStreamEvents(ctx, changeEventBatch)
173+
if err != nil {
174+
return false, errors.Wrap(err, "failed to handle change events")
175+
}
176+
177+
return true, nil
172178
}
173179

174180
for {
@@ -184,15 +190,31 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
184190
// If the changeStreamEnderChan has a message, the user has indicated that
185191
// source writes are ended. This means we should exit rather than continue
186192
// reading the change stream since there should be no more events.
187-
case <-verifier.changeStreamEnderChan:
193+
case finalTs := <-verifier.changeStreamFinalTsChan:
188194
verifier.logger.Debug().
189-
Msg("Change stream thread received shutdown request.")
195+
Interface("finalTimestamp", finalTs).
196+
Msg("Change stream thread received final timestamp.")
190197

191198
changeStreamEnded = true
192199

193200
// Read all change events until the source reports no events.
194201
// (i.e., the `getMore` call returns empty)
195202
for {
203+
curTs, err := extractTimestampFromResumeToken(cs.ResumeToken())
204+
if err != nil {
205+
err = errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
206+
break
207+
}
208+
209+
if curTs.Compare(finalTs) >= 0 {
210+
verifier.logger.Debug().
211+
Interface("currentTimestamp", curTs).
212+
Interface("finalTimestamp", finalTs).
213+
Msg("Change stream has reached the final timestamp. Shutting down.")
214+
215+
break
216+
}
217+
196218
var gotEvent bool
197219
gotEvent, err = readAndHandleOneChangeEventBatch()
198220

internal/verifier/check.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
mapset "github.com/deckarep/golang-set/v2"
1010
"github.com/pkg/errors"
1111
"go.mongodb.org/mongo-driver/bson"
12+
"go.mongodb.org/mongo-driver/bson/primitive"
1213
"go.mongodb.org/mongo-driver/mongo"
1314
)
1415

@@ -42,13 +43,13 @@ func (verifier *Verifier) Check(ctx context.Context, filter map[string]any) {
4243
verifier.MaybeStartPeriodicHeapProfileCollection(ctx)
4344
}
4445

45-
func (verifier *Verifier) waitForChangeStream() error {
46+
func (verifier *Verifier) finalizeChangeStream(finalTs primitive.Timestamp) error {
4647
verifier.mux.RLock()
4748
csRunning := verifier.changeStreamRunning
4849
verifier.mux.RUnlock()
4950
if csRunning {
5051
verifier.logger.Debug().Msg("Changestream still running, signalling that writes are done and waiting for change stream to exit")
51-
verifier.changeStreamEnderChan <- struct{}{}
52+
verifier.changeStreamFinalTsChan <- finalTs
5253
select {
5354
case err := <-verifier.changeStreamErrChan:
5455
verifier.logger.Warn().Err(err).
@@ -238,11 +239,11 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
238239
// will result in an extra iteration. The odds of this are lower and the user should be
239240
// paying attention. Also, this should not matter too much because any failures will be
240241
// caught again on the next iteration.
241-
if verifier.writesOff {
242+
if ts, writesOff := verifier.writesOffTs.Get(); writesOff {
242243
// It's necessary to wait for the change stream to finish before incrementing the
243244
// generation number, or the last changes will not be checked.
244245
verifier.mux.Unlock()
245-
err := verifier.waitForChangeStream()
246+
err := verifier.finalizeChangeStream(ts)
246247
if err != nil {
247248
return err
248249
}

internal/verifier/clustertime.go

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -54,23 +54,13 @@ func GetClusterTime(
5454
// OPTIMIZATION: We now append an oplog entry--this time for real!--to
5555
// cause any lagging shards to output their events.
5656
//
57-
// Since this is just an optimization, failures here are nonfatal.
58-
err = retryer.RunForTransientErrorsOnly(
59-
ctx,
60-
logger,
61-
func(_ *retry.Info) error {
62-
var err error
63-
optime, err = runAppendOplogNote(ctx, client, false)
64-
if err != nil {
65-
err = fmt.Errorf("%w: %w", retry.CustomTransientErr, err)
66-
}
67-
return err
68-
},
69-
)
57+
// Since this is just an optimization, failures here are nonfatal,
58+
// and we might as well forgo the retryer.
59+
_, err = runAppendOplogNote(ctx, client, false)
7060
if err != nil {
7161
// This isn't serious enough even to warn on, so leave it at info-level.
7262
logger.Info().Err(err).
73-
Msg("Failed to append oplog note; change stream may need extra time to finish.")
63+
Msg("Failed to update oplog; change stream may need extra time to finish.")
7464
}
7565

7666
return optime, nil

internal/verifier/migration_verifier.go

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ import (
2121
"github.com/10gen/migration-verifier/internal/reportutils"
2222
"github.com/10gen/migration-verifier/internal/retry"
2323
"github.com/10gen/migration-verifier/internal/types"
24+
"github.com/10gen/migration-verifier/internal/util"
2425
"github.com/10gen/migration-verifier/internal/uuidutil"
26+
"github.com/10gen/migration-verifier/option"
2527
"github.com/olekukonko/tablewriter"
2628
"github.com/pkg/errors"
2729
"github.com/rs/zerolog"
@@ -78,7 +80,7 @@ var timeFormat = time.RFC3339
7880

7981
// Verifier is the main state for the migration verifier
8082
type Verifier struct {
81-
writesOff bool
83+
writesOffTs option.Option[primitive.Timestamp]
8284
lastGeneration bool
8385
running bool
8486
generation int
@@ -123,12 +125,12 @@ type Verifier struct {
123125
metaDBName string
124126
srcStartAtTs *primitive.Timestamp
125127

126-
mux sync.RWMutex
127-
changeStreamRunning bool
128-
changeStreamEnderChan chan struct{}
129-
changeStreamErrChan chan error
130-
changeStreamDoneChan chan struct{}
131-
lastChangeEventTime *primitive.Timestamp
128+
mux sync.RWMutex
129+
changeStreamRunning bool
130+
changeStreamFinalTsChan chan primitive.Timestamp
131+
changeStreamErrChan chan error
132+
changeStreamDoneChan chan struct{}
133+
lastChangeEventTime *primitive.Timestamp
132134

133135
readConcernSetting ReadConcernSetting
134136

@@ -188,15 +190,15 @@ func NewVerifier(settings VerifierSettings) *Verifier {
188190
}
189191

190192
return &Verifier{
191-
phase: Idle,
192-
numWorkers: NumWorkers,
193-
readPreference: readpref.Primary(),
194-
partitionSizeInBytes: 400 * 1024 * 1024,
195-
failureDisplaySize: DefaultFailureDisplaySize,
196-
changeStreamEnderChan: make(chan struct{}),
197-
changeStreamErrChan: make(chan error),
198-
changeStreamDoneChan: make(chan struct{}),
199-
readConcernSetting: readConcern,
193+
phase: Idle,
194+
numWorkers: NumWorkers,
195+
readPreference: readpref.Primary(),
196+
partitionSizeInBytes: 400 * 1024 * 1024,
197+
failureDisplaySize: DefaultFailureDisplaySize,
198+
changeStreamFinalTsChan: make(chan primitive.Timestamp),
199+
changeStreamErrChan: make(chan error),
200+
changeStreamDoneChan: make(chan struct{}),
201+
readConcernSetting: readConcern,
200202

201203
// This will get recreated once gen0 starts, but we want it
202204
// here in case the change stream gets an event before then.
@@ -228,18 +230,30 @@ func (verifier *Verifier) SetFailureDisplaySize(size int64) {
228230
verifier.failureDisplaySize = size
229231
}
230232

231-
func (verifier *Verifier) WritesOff(ctx context.Context) {
233+
func (verifier *Verifier) WritesOff(ctx context.Context) error {
232234
verifier.logger.Debug().
233235
Msg("WritesOff called.")
234236

235237
verifier.mux.Lock()
236-
verifier.writesOff = true
237-
verifier.mux.Unlock()
238+
defer verifier.mux.Unlock()
239+
writesOffTs, err := GetClusterTime(
240+
ctx,
241+
verifier.logger,
242+
verifier.srcClient,
243+
)
244+
245+
if err != nil {
246+
return errors.Wrapf(err, "failed to fetch source's cluster time")
247+
}
248+
249+
verifier.writesOffTs = option.Some(writesOffTs)
250+
251+
return nil
238252
}
239253

240254
func (verifier *Verifier) WritesOn(ctx context.Context) {
241255
verifier.mux.Lock()
242-
verifier.writesOff = false
256+
util.ResetToZero(&verifier.writesOffTs)
243257
verifier.mux.Unlock()
244258
}
245259

internal/verifier/web_server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ const RequestInProgressErrorDescription = "Another request is currently in progr
2222
// MigrationVerifierAPI represents the interaction webserver with mongosync
2323
type MigrationVerifierAPI interface {
2424
Check(ctx context.Context, filter map[string]any)
25-
WritesOff(ctx context.Context)
25+
WritesOff(ctx context.Context) error
2626
WritesOn(ctx context.Context)
2727
GetProgress(ctx context.Context) (Progress, error)
2828
}

internal/verifier/web_server_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ func NewMockVerifier() *MockVerifier {
3030
func (verifier *MockVerifier) Check(ctx context.Context, filter map[string]any) {
3131
verifier.filter = filter
3232
}
33-
func (verifier *MockVerifier) WritesOff(ctx context.Context) {}
34-
func (verifier *MockVerifier) WritesOn(ctx context.Context) {}
33+
func (verifier *MockVerifier) WritesOff(ctx context.Context) error { return nil }
34+
func (verifier *MockVerifier) WritesOn(ctx context.Context) {}
3535
func (verifier *MockVerifier) GetProgress(ctx context.Context) (Progress, error) {
3636
return Progress{}, nil
3737
}

option/bson.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package option
2+
3+
import (
4+
"github.com/pkg/errors"
5+
"go.mongodb.org/mongo-driver/bson"
6+
"go.mongodb.org/mongo-driver/bson/bsontype"
7+
"go.mongodb.org/mongo-driver/bson/primitive"
8+
)
9+
10+
// MarshalBSONValue implements bson.ValueMarshaler.
11+
func (o Option[T]) MarshalBSONValue() (bsontype.Type, []byte, error) {
12+
val, exists := o.Get()
13+
if !exists {
14+
return bson.MarshalValue(primitive.Null{})
15+
}
16+
17+
return bson.MarshalValue(val)
18+
}
19+
20+
// UnmarshalBSONValue implements bson.ValueUnmarshaler.
21+
func (o *Option[T]) UnmarshalBSONValue(bType bsontype.Type, raw []byte) error {
22+
23+
switch bType {
24+
case bson.TypeNull:
25+
o.val = nil
26+
27+
default:
28+
valPtr := new(T)
29+
30+
err := bson.UnmarshalValue(bType, raw, &valPtr)
31+
if err != nil {
32+
return errors.Wrapf(err, "failed to unmarshal %T", *o)
33+
}
34+
35+
// This may not even be possible, but we should still check.
36+
if isNil(*valPtr) {
37+
return errors.Wrapf(err, "refuse to unmarshal nil %T value", *o)
38+
}
39+
40+
o.val = valPtr
41+
}
42+
43+
return nil
44+
}
45+
46+
// IsZero implements bsoncodec.Zeroer.
47+
func (o Option[T]) IsZero() bool {
48+
return o.IsNone()
49+
}

option/json.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package option
2+
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
)
7+
8+
var _ json.Marshaler = &Option[int]{}
9+
var _ json.Unmarshaler = &Option[int]{}
10+
11+
// MarshalJSON encodes Option into json.
12+
func (o Option[T]) MarshalJSON() ([]byte, error) {
13+
val, exists := o.Get()
14+
if exists {
15+
return json.Marshal(val)
16+
}
17+
18+
return json.Marshal(nil)
19+
}
20+
21+
// UnmarshalJSON decodes Option from json.
22+
func (o *Option[T]) UnmarshalJSON(b []byte) error {
23+
if bytes.Equal(b, []byte("null")) {
24+
o.val = nil
25+
} else {
26+
val := *new(T)
27+
28+
err := json.Unmarshal(b, &val)
29+
if err != nil {
30+
return err
31+
}
32+
33+
o.val = &val
34+
}
35+
36+
return nil
37+
}

0 commit comments

Comments
 (0)