Skip to content

Commit 9162f5d

Browse files
authored
Allow arbitrary DDL on the destination. (#125)
Some DDL operations are internal to a migration. We currently allow `modify` events for so that capped collections can have their limits set at the end of a migration. We also need to allow `createIndexes` to accommodate migrations where the destination’s indexes are created after initial sync. Rather than allowing specific events, though, this changeset generalizes tolerance of all unknown events—which should be DDL—on the destination. Any time such an event appears in the destination’s change stream, we now log about it then discard it. We still fail if a DDL event happens on the source. This allows migrator tools flexibility in when they create collections or indexes while still making verification fail if an unsupported event happens on the source.
1 parent 9431222 commit 9162f5d

File tree

2 files changed

+30
-10
lines changed

2 files changed

+30
-10
lines changed

internal/verifier/change_stream.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,12 @@ import (
2323
"golang.org/x/exp/slices"
2424
)
2525

26-
type modifyEventHandling string
26+
type ddlEventHandling string
2727

2828
const (
2929
fauxDocSizeForDeleteEvents = 1024
3030

31-
modifyEventType = "modify"
32-
33-
onModifyEventIgnore modifyEventHandling = "ignore"
31+
onDDLEventAllow ddlEventHandling = "allow"
3432
)
3533

3634
var supportedEventOpTypes = mapset.NewSet(
@@ -101,7 +99,7 @@ type ChangeStreamReader struct {
10199

102100
lag *msync.TypedAtomic[option.Option[time.Duration]]
103101

104-
onModifyEvent modifyEventHandling
102+
onDDLEvent ddlEventHandling
105103
}
106104

107105
func (verifier *Verifier) initializeChangeStreamReaders() {
@@ -134,7 +132,7 @@ func (verifier *Verifier) initializeChangeStreamReaders() {
134132
handlerError: util.NewEventual[error](),
135133
doneChan: make(chan struct{}),
136134
lag: msync.NewTypedAtomic(option.None[time.Duration]()),
137-
onModifyEvent: onModifyEventIgnore,
135+
onDDLEvent: onDDLEventAllow,
138136
}
139137
}
140138

@@ -381,14 +379,16 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
381379
opType := changeEvents[eventsRead].OpType
382380
if !supportedEventOpTypes.Contains(opType) {
383381

384-
// We expect modify events on the destination as part of finalizing
382+
// We expect certain DDL events on the destination as part of
385383
// a migration. For example, mongosync enables indexes’ uniqueness
386-
// constraints and sets capped collection sizes.
387-
if opType == modifyEventType && csr.onModifyEvent == onModifyEventIgnore {
384+
// constraints and sets capped collection sizes, and sometimes
385+
// indexes are created after initial sync.
386+
387+
if csr.onDDLEvent == onDDLEventAllow {
388388
csr.logger.Info().
389389
Stringer("changeStream", csr).
390390
Stringer("event", cs.Current).
391-
Msg("This event is probably internal to the migration. Ignoring.")
391+
Msg("Ignoring event with unrecognized type on destination. (It’s assumedly internal to the migration.)")
392392

393393
// Discard this event, then keep reading.
394394
changeEvents = changeEvents[:len(changeEvents)-1]

internal/verifier/change_stream_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -784,6 +784,20 @@ func (suite *IntegrationTestSuite) TestTolerateDestinationCollMod() {
784784
"should alter capped size",
785785
)
786786

787+
_, err = suite.dstMongoClient.
788+
Database(suite.DBNameForTest()).
789+
Collection("mycoll").
790+
Indexes().CreateOne(
791+
ctx,
792+
mongo.IndexModel{
793+
Keys: bson.D{
794+
{"foo", 1},
795+
{"barbar", 1},
796+
},
797+
},
798+
)
799+
suite.Require().NoError(err, "should create index")
800+
787801
err = verifier.WritesOff(ctx)
788802
if err == nil {
789803
err = verifierRunner.Await()
@@ -796,6 +810,12 @@ func (suite *IntegrationTestSuite) TestTolerateDestinationCollMod() {
796810
"cappedSize",
797811
"modify event should be recorded in log",
798812
)
813+
814+
suite.Assert().Contains(
815+
logBuffer.String(),
816+
"barbar_1",
817+
"createIndexes event should be recorded in log",
818+
)
799819
}
800820

801821
func (suite *IntegrationTestSuite) TestRecheckDocsWithDstChangeEvents() {

0 commit comments

Comments
 (0)