Skip to content

Commit 2213a72

Browse files
committed
update types etc.
1 parent 3726807 commit 2213a72

File tree

4 files changed

+48
-37
lines changed

4 files changed

+48
-37
lines changed

.github/workflows/all.yml

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,12 @@ jobs:
6060
with:
6161
go-version: stable
6262

63-
- name: Install m
64-
run: npm install -g m mongosh
63+
- name: Install m and mtools
64+
run: |-
65+
{
66+
echo npm install -g m
67+
echo pipx install 'mtools[all]'
68+
} | parallel
6569
6670
- name: Install MongoDB ${{ matrix.mongodb_versions[0] }} (source)
6771
run: yes | m ${{ matrix.mongodb_versions[0] }} && dirname $(readlink $(which mongod)) > .srcpath
@@ -72,18 +76,15 @@ jobs:
7276
- name: Install latest stable MongoDB (metadata)
7377
run: yes | m stable && dirname $(readlink $(which mongod)) > .metapath
7478

75-
- name: Install mtools
76-
run: pipx install 'mtools[all]'
77-
7879
- name: Build
7980
run: go build main/migration_verifier.go
8081

8182
- name: Start clusters
8283
run: |-
8384
{
84-
echo "mlaunch init --binarypath $(cat .srcpath) --port 27020 --dir src --replicaset ${{ matrix.topology.args }}"
85-
echo "mlaunch init --binarypath $(cat .dstpath) --port 27030 --dir dst --replicaset ${{ matrix.topology.args }}"
86-
echo "mlaunch init --binarypath $(cat .metapath) --port 27040 --dir meta --replicaset --nodes 1"
85+
echo mlaunch init --binarypath $(cat .srcpath) --port 27020 --dir src --replicaset ${{ matrix.topology.args }}
86+
echo mlaunch init --binarypath $(cat .dstpath) --port 27030 --dir dst --replicaset ${{ matrix.topology.args }}
87+
echo mlaunch init --binarypath $(cat .metapath) --port 27040 --dir meta --replicaset --nodes 1
8788
} | parallel
8889
8990
- name: Test

internal/verifier/change_stream.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ const (
4242
)
4343

4444
type UnknownEventError struct {
45+
Event ParsedEvent
4546
RawEvent bson.Raw
4647
}
4748

@@ -96,7 +97,10 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []
9697
dataSizes[i] = len(changeEvent.FullDocument)
9798
}
9899
default:
99-
return UnknownEventError{RawEvent: rawChangeEvent}
100+
return UnknownEventError{
101+
Event: changeEvent,
102+
RawEvent: rawChangeEvent,
103+
}
100104
}
101105
}
102106

@@ -138,7 +142,7 @@ func (verifier *Verifier) readAndHandleOneChangeEventBatch(
138142
writesOffTs *primitive.Timestamp,
139143
) error {
140144
eventsRead := 0
141-
var changeEventBatch []ParsedEvent
145+
var changeEventBatch []bson.Raw
142146

143147
for hasEventInBatch := true; hasEventInBatch; hasEventInBatch = cs.RemainingBatchLength() > 0 {
144148
// Once the change stream reaches the writesOff timestamp we should stop reading.
@@ -163,12 +167,10 @@ func (verifier *Verifier) readAndHandleOneChangeEventBatch(
163167
}
164168

165169
if changeEventBatch == nil {
166-
changeEventBatch = make([]ParsedEvent, cs.RemainingBatchLength()+1)
170+
changeEventBatch = make([]bson.Raw, cs.RemainingBatchLength()+1)
167171
}
168172

169-
if err := cs.Decode(&changeEventBatch[eventsRead]); err != nil {
170-
return errors.Wrap(err, "failed to decode change event")
171-
}
173+
copy(changeEventBatch[eventsRead], cs.Current)
172174

173175
eventsRead++
174176
}

internal/verifier/migration_verifier_test.go

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -145,28 +145,36 @@ func (suite *IntegrationTestSuite) TestGetNamespaceStatistics_Recheck() {
145145

146146
err := verifier.HandleChangeStreamEvents(
147147
ctx,
148-
[]ParsedEvent{{
149-
OpType: "insert",
150-
Ns: &Namespace{DB: "mydb", Coll: "coll2"},
151-
DocKey: DocKey{
152-
ID: "heyhey",
153-
},
154-
}},
148+
[]bson.Raw{
149+
testutil.MustMarshal(
150+
ParsedEvent{
151+
OpType: "insert",
152+
Ns: &Namespace{DB: "mydb", Coll: "coll2"},
153+
DocKey: DocKey{
154+
ID: "heyhey",
155+
},
156+
},
157+
),
158+
},
155159
)
156160
suite.Require().NoError(err)
157161

158162
err = verifier.HandleChangeStreamEvents(
159163
ctx,
160-
[]ParsedEvent{{
161-
ID: bson.M{
162-
"docID": "ID/docID",
163-
},
164-
OpType: "insert",
165-
Ns: &Namespace{DB: "mydb", Coll: "coll1"},
166-
DocKey: DocKey{
167-
ID: "hoohoo",
168-
},
169-
}},
164+
[]bson.Raw{
165+
testutil.MustMarshal(
166+
ParsedEvent{
167+
ID: bson.M{
168+
"docID": "ID/docID",
169+
},
170+
OpType: "insert",
171+
Ns: &Namespace{DB: "mydb", Coll: "coll1"},
172+
DocKey: DocKey{
173+
ID: "hoohoo",
174+
},
175+
},
176+
),
177+
},
170178
)
171179
suite.Require().NoError(err)
172180

@@ -408,19 +416,19 @@ func (suite *IntegrationTestSuite) TestFailedVerificationTaskInsertions() {
408416
},
409417
}
410418

411-
err = verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event})
419+
err = verifier.HandleChangeStreamEvents(ctx, []bson.Raw{testutil.MustMarshal(event)})
412420
suite.Require().NoError(err)
413421
event.OpType = "insert"
414-
err = verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event})
422+
err = verifier.HandleChangeStreamEvents(ctx, []bson.Raw{testutil.MustMarshal(event)})
415423
suite.Require().NoError(err)
416424
event.OpType = "replace"
417-
err = verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event})
425+
err = verifier.HandleChangeStreamEvents(ctx, []bson.Raw{testutil.MustMarshal(event)})
418426
suite.Require().NoError(err)
419427
event.OpType = "update"
420-
err = verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event})
428+
err = verifier.HandleChangeStreamEvents(ctx, []bson.Raw{testutil.MustMarshal(event)})
421429
suite.Require().NoError(err)
422430
event.OpType = "flibbity"
423-
err = verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event})
431+
err = verifier.HandleChangeStreamEvents(ctx, []bson.Raw{testutil.MustMarshal(event)})
424432
badEventErr := UnknownEventError{}
425433
suite.Require().ErrorAs(err, &badEventErr)
426434
suite.Assert().Equal("flibbity", badEventErr.Event.OpType)

internal/verifier/recheck_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() {
5252
FullDocument: testutil.MustMarshal(bson.D{{"foo", 1}}),
5353
}
5454

55-
err := verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event})
55+
err := verifier.HandleChangeStreamEvents(ctx, []bson.Raw{testutil.MustMarshal(event)})
5656
suite.Require().NoError(err)
5757

5858
recheckDocs = suite.fetchRecheckDocs(ctx, verifier)

0 commit comments

Comments
 (0)