Skip to content

Commit 063c8e5

Browse files
authored
REP-5140 Report change events in progress logs. (#25)
Change events can cause confusion due to how the verifier tallies them alongside missing documents. This changeset tries to add clarity by logging the change events seen during a given generation. A bit of extra debug logging is added here as well.
1 parent 5639f59 commit 063c8e5

File tree

11 files changed

+290
-15
lines changed

11 files changed

+290
-15
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ metaURI: mongodb://localhost:28012
7878
1. After launching the verifier (see above), you can send it requests to get it to start verifying. The verification process is started by using the `check` command. An [optional `filter` parameter](#document-filtering) can be passed within the `check` request body to only check documents within that filter. The verification process will keep running until you tell the verifier to stop. It will keep track of the inconsistencies it has found and will keep checking those inconsistencies hoping that eventually they will resolve.
7979

8080
```
81-
curl -H "Content-Type: application/json" -X POST -d '{}' http://127.0.0.1:27020/api/v1/check
81+
curl -H "Content-Type: application/json" -d '{}' http://127.0.0.1:27020/api/v1/check
8282
```
8383
8484

internal/verifier/change_stream.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ func (verifier *Verifier) HandleChangeStreamEvent(ctx context.Context, changeEve
6363
case "replace":
6464
fallthrough
6565
case "update":
66+
if err := verifier.eventRecorder.AddEvent(changeEvent); err != nil {
67+
return errors.Wrapf(err, "failed to augment stats with change event: %+v", *changeEvent)
68+
}
69+
6670
return verifier.InsertChangeEventRecheckDoc(ctx, changeEvent)
6771
default:
6872
return UnknownEventError{Event: changeEvent}

internal/verifier/check.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
197197
// Now enter the multi-generational steady check state
198198
for {
199199
verifier.generationStartTime = time.Now()
200+
verifier.eventRecorder.Reset()
200201

201202
err := verifier.CheckWorker(ctx)
202203
if err != nil {
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package verifier
2+
3+
import (
4+
"github.com/10gen/migration-verifier/msync"
5+
"github.com/pkg/errors"
6+
"golang.org/x/exp/maps"
7+
)
8+
9+
// eventRecorderMap maps a namespace string ("<db>.<coll>", as built in
// AddEvent) to that namespace's event totals.
type eventRecorderMap = map[string]PerNamespaceStats
10+
11+
// EventRecorder maintains statistics on change events: for each namespace,
// a count of the events seen, broken down by operation type.
12+
type EventRecorder struct {
13+
// guard wraps the namespace-to-stats map. All access in this file goes
// through its Load/Store callbacks.
// NOTE(review): presumably DataGuard serializes those callbacks; confirm
// against the msync package.
guard *msync.DataGuard[eventRecorderMap]
14+
}
15+
16+
// PerNamespaceStats holds the number of change events seen for a single
// namespace, with one counter per operation type.
type PerNamespaceStats struct {
	Insert  int
	Update  int
	Replace int
	Delete  int
}

// Total returns the sum of all per-optype counters.
func (pns PerNamespaceStats) Total() int {
	sum := 0
	for _, count := range [...]int{pns.Insert, pns.Update, pns.Replace, pns.Delete} {
		sum += count
	}

	return sum
}
27+
28+
// NewEventRecorder creates and returns a new EventRecorder.
29+
func NewEventRecorder() *EventRecorder {
30+
return &EventRecorder{
31+
guard: msync.NewDataGuard(eventRecorderMap{}),
32+
}
33+
}
34+
35+
func (er EventRecorder) Reset() {
36+
er.guard.Store(func(m eventRecorderMap) eventRecorderMap {
37+
return eventRecorderMap{}
38+
})
39+
}
40+
41+
// AddEvent adds a ParsedEvent to the EventRecorder’s statistics.
42+
func (er EventRecorder) AddEvent(changeEvent *ParsedEvent) error {
43+
// This shouldn’t happen, but just in case:
44+
if changeEvent.Ns == nil {
45+
return errors.Errorf("Change event lacks a namespace: %+v", changeEvent)
46+
}
47+
48+
nsStr := changeEvent.Ns.DB + "." + changeEvent.Ns.Coll
49+
50+
var err error
51+
52+
er.guard.Store(func(m eventRecorderMap) eventRecorderMap {
53+
if _, exists := m[nsStr]; !exists {
54+
m[nsStr] = PerNamespaceStats{}
55+
}
56+
57+
nsStats := m[nsStr]
58+
59+
switch changeEvent.OpType {
60+
case "insert":
61+
nsStats.Insert++
62+
case "update":
63+
nsStats.Update++
64+
case "replace":
65+
nsStats.Replace++
66+
case "delete":
67+
nsStats.Delete++
68+
default:
69+
err = errors.Errorf("Event recorder received event with unknown optype: %+v", *changeEvent)
70+
}
71+
72+
m[nsStr] = nsStats
73+
74+
return m
75+
})
76+
77+
return err
78+
}
79+
80+
// Read returns a map of the tracked change events. The map
81+
// indexes on namespace then event optype. Each namespace will
82+
// have `insert`, `update`
83+
func (er EventRecorder) Read() eventRecorderMap {
84+
var theCopy eventRecorderMap
85+
86+
er.guard.Load(func(m eventRecorderMap) {
87+
theCopy = maps.Clone(m)
88+
})
89+
90+
return theCopy
91+
}

internal/verifier/migration_verifier.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ type Verifier struct {
9393
numWorkers int
9494
failureDisplaySize int64
9595

96+
eventRecorder *EventRecorder
97+
9698
// Used only with generation 0 to defer the first
9799
// progress report until after we’ve finished partitioning
98100
// every collection.
@@ -195,6 +197,10 @@ func NewVerifier(settings VerifierSettings) *Verifier {
195197
changeStreamErrChan: make(chan error),
196198
changeStreamDoneChan: make(chan struct{}),
197199
readConcernSetting: readConcern,
200+
201+
// This will get recreated once gen0 starts, but we want it
202+
// here in case the change stream gets an event before then.
203+
eventRecorder: NewEventRecorder(),
198204
}
199205
}
200206

@@ -223,6 +229,9 @@ func (verifier *Verifier) SetFailureDisplaySize(size int64) {
223229
}
224230

225231
func (verifier *Verifier) WritesOff(ctx context.Context) {
232+
verifier.logger.Debug().
233+
Msg("WritesOff called.")
234+
226235
verifier.mux.Lock()
227236
verifier.writesOff = true
228237
verifier.mux.Unlock()
@@ -1300,6 +1309,8 @@ func (verifier *Verifier) PrintVerificationSummary(ctx context.Context, genstatu
13001309
verifier.logger.Err(err).Msgf("Failed to report per-namespace statistics")
13011310
}
13021311

1312+
verifier.printChangeEventStatistics(strBuilder)
1313+
13031314
var statusLine string
13041315

13051316
if hasTasks {

internal/verifier/recheck.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55

66
"github.com/10gen/migration-verifier/internal/types"
7+
"github.com/pkg/errors"
78
"go.mongodb.org/mongo-driver/bson"
89
"go.mongodb.org/mongo-driver/mongo"
910
"go.mongodb.org/mongo-driver/mongo/options"
@@ -34,7 +35,11 @@ type RecheckDoc struct {
3435
func (verifier *Verifier) InsertFailedCompareRecheckDocs(
3536
namespace string, documentIDs []interface{}, dataSizes []int) error {
3637
dbName, collName := SplitNamespace(namespace)
37-
return verifier.insertRecheckDocs(context.Background(),
38+
39+
verifier.mux.Lock()
40+
defer verifier.mux.Unlock()
41+
42+
return verifier.insertRecheckDocsUnderLock(context.Background(),
3843
dbName, collName, documentIDs, dataSizes)
3944
}
4045

@@ -48,15 +53,20 @@ func (verifier *Verifier) InsertChangeEventRecheckDoc(ctx context.Context, chang
4853
// total data size for noninitial generations in the log.
4954
dataSizes := []int{maxBSONObjSize}
5055

51-
return verifier.insertRecheckDocs(
56+
verifier.mux.Lock()
57+
defer verifier.mux.Unlock()
58+
59+
if err := verifier.eventRecorder.AddEvent(changeEvent); err != nil {
60+
return errors.Wrapf(err, "failed to augment stats with change event: %+v", *changeEvent)
61+
}
62+
63+
return verifier.insertRecheckDocsUnderLock(
5264
ctx, changeEvent.Ns.DB, changeEvent.Ns.Coll, documentIDs, dataSizes)
5365
}
5466

55-
func (verifier *Verifier) insertRecheckDocs(
67+
func (verifier *Verifier) insertRecheckDocsUnderLock(
5668
ctx context.Context,
5769
dbName, collName string, documentIDs []interface{}, dataSizes []int) error {
58-
verifier.mux.Lock()
59-
defer verifier.mux.Unlock()
6070

6171
generation, _ := verifier.getGenerationWhileLocked()
6272

internal/verifier/recheck_test.go

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ func (suite *MultiMetaVersionTestSuite) TestFailedCompareThenReplace() {
4040
)
4141

4242
event := ParsedEvent{
43+
OpType: "insert",
4344
DocKey: DocKey{
4445
ID: "theDocID",
4546
},
@@ -95,7 +96,7 @@ func (suite *MultiMetaVersionTestSuite) TestLargeIDInsertions() {
9596
id3 := strings.Repeat("c", overlyLarge)
9697
ids := []interface{}{id1, id2, id3}
9798
dataSizes := []int{overlyLarge, overlyLarge, overlyLarge}
98-
err := verifier.insertRecheckDocs(ctx, "testDB", "testColl", ids, dataSizes)
99+
err := insertRecheckDocs(ctx, verifier, "testDB", "testColl", ids, dataSizes)
99100
suite.Require().NoError(err)
100101

101102
d1 := RecheckDoc{
@@ -156,7 +157,7 @@ func (suite *MultiMetaVersionTestSuite) TestLargeDataInsertions() {
156157
id3 := "c"
157158
ids := []interface{}{id1, id2, id3}
158159
dataSizes := []int{400 * 1024, 700 * 1024, 1024}
159-
err := verifier.insertRecheckDocs(ctx, "testDB", "testColl", ids, dataSizes)
160+
err := insertRecheckDocs(ctx, verifier, "testDB", "testColl", ids, dataSizes)
160161
suite.Require().NoError(err)
161162
d1 := RecheckDoc{
162163
PrimaryKey: RecheckPrimaryKey{
@@ -217,13 +218,13 @@ func (suite *MultiMetaVersionTestSuite) TestMultipleNamespaces() {
217218
id3 := "c"
218219
ids := []interface{}{id1, id2, id3}
219220
dataSizes := []int{1000, 1000, 1000}
220-
err := verifier.insertRecheckDocs(ctx, "testDB1", "testColl1", ids, dataSizes)
221+
err := insertRecheckDocs(ctx, verifier, "testDB1", "testColl1", ids, dataSizes)
221222
suite.Require().NoError(err)
222-
err = verifier.insertRecheckDocs(ctx, "testDB1", "testColl2", ids, dataSizes)
223+
err = insertRecheckDocs(ctx, verifier, "testDB1", "testColl2", ids, dataSizes)
223224
suite.Require().NoError(err)
224-
err = verifier.insertRecheckDocs(ctx, "testDB2", "testColl1", ids, dataSizes)
225+
err = insertRecheckDocs(ctx, verifier, "testDB2", "testColl1", ids, dataSizes)
225226
suite.Require().NoError(err)
226-
err = verifier.insertRecheckDocs(ctx, "testDB2", "testColl2", ids, dataSizes)
227+
err = insertRecheckDocs(ctx, verifier, "testDB2", "testColl2", ids, dataSizes)
227228
suite.Require().NoError(err)
228229

229230
verifier.generation++
@@ -267,17 +268,17 @@ func (suite *MultiMetaVersionTestSuite) TestGenerationalClear() {
267268
id2 := "b"
268269
ids := []interface{}{id1, id2}
269270
dataSizes := []int{1000, 1000}
270-
err := verifier.insertRecheckDocs(ctx, "testDB", "testColl", ids, dataSizes)
271+
err := insertRecheckDocs(ctx, verifier, "testDB", "testColl", ids, dataSizes)
271272
suite.Require().NoError(err)
272273

273274
verifier.generation++
274275

275-
err = verifier.insertRecheckDocs(ctx, "testDB", "testColl", ids, dataSizes)
276+
err = insertRecheckDocs(ctx, verifier, "testDB", "testColl", ids, dataSizes)
276277
suite.Require().NoError(err)
277278

278279
verifier.generation++
279280

280-
err = verifier.insertRecheckDocs(ctx, "testDB", "testColl", ids, dataSizes)
281+
err = insertRecheckDocs(ctx, verifier, "testDB", "testColl", ids, dataSizes)
281282
suite.Require().NoError(err)
282283

283284
d1 := RecheckDoc{
@@ -326,3 +327,16 @@ func (suite *MultiMetaVersionTestSuite) TestGenerationalClear() {
326327
results = suite.fetchRecheckDocs(ctx, verifier)
327328
suite.ElementsMatch([]interface{}{}, results)
328329
}
330+
331+
func insertRecheckDocs(
332+
ctx context.Context,
333+
verifier *Verifier,
334+
dbName, collName string,
335+
documentIDs []any,
336+
dataSizes []int,
337+
) error {
338+
verifier.mux.Lock()
339+
defer verifier.mux.Unlock()
340+
341+
return verifier.insertRecheckDocsUnderLock(ctx, dbName, collName, documentIDs, dataSizes)
342+
}

internal/verifier/summary.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,19 @@ package verifier
77
import (
88
"context"
99
"fmt"
10+
"sort"
11+
"strconv"
1012
"strings"
1113
"time"
1214

1315
"github.com/10gen/migration-verifier/internal/reportutils"
1416
"github.com/10gen/migration-verifier/internal/types"
1517
"github.com/olekukonko/tablewriter"
18+
"golang.org/x/exp/maps"
1619
)
1720

21+
// changeEventsTableMaxSize caps how many namespaces
// printChangeEventStatistics lists in its table.
const changeEventsTableMaxSize = 10
22+
1823
// NOTE: Each of the following should print one trailing and one final
1924
// newline.
2025

@@ -362,3 +367,61 @@ func (verifier *Verifier) printMismatchInvestigationNotes(strBuilder *strings.Bu
362367
strBuilder.WriteString(line + "\n")
363368
}
364369
}
370+
371+
func (verifier *Verifier) printChangeEventStatistics(builder *strings.Builder) {
372+
nsStats := verifier.eventRecorder.Read()
373+
374+
activeNamespacesCount := len(nsStats)
375+
376+
totalEvents := 0
377+
nsTotals := map[string]int{}
378+
for ns, events := range nsStats {
379+
nsTotals[ns] = events.Total()
380+
totalEvents += nsTotals[ns]
381+
}
382+
383+
eventsDescr := "none"
384+
if totalEvents > 0 {
385+
eventsDescr = fmt.Sprintf("%d total, across %d namespace(s)", totalEvents, activeNamespacesCount)
386+
}
387+
388+
builder.WriteString(fmt.Sprintf("\nChange events this generation: %s\n", eventsDescr))
389+
390+
if totalEvents == 0 {
391+
return
392+
}
393+
394+
reverseSortedNamespaces := maps.Keys(nsTotals)
395+
sort.Slice(
396+
reverseSortedNamespaces,
397+
func(i, j int) bool {
398+
return nsTotals[reverseSortedNamespaces[i]] > nsTotals[reverseSortedNamespaces[j]]
399+
},
400+
)
401+
402+
// Only report the busiest namespaces.
403+
if len(reverseSortedNamespaces) > changeEventsTableMaxSize {
404+
reverseSortedNamespaces = reverseSortedNamespaces[:changeEventsTableMaxSize]
405+
}
406+
407+
table := tablewriter.NewWriter(builder)
408+
table.SetHeader([]string{"Namespace", "Insert", "Update", "Replace", "Delete", "Total"})
409+
410+
for _, ns := range reverseSortedNamespaces {
411+
curNsStats := nsStats[ns]
412+
413+
table.Append(
414+
append(
415+
[]string{ns},
416+
strconv.Itoa(curNsStats.Insert),
417+
strconv.Itoa(curNsStats.Update),
418+
strconv.Itoa(curNsStats.Replace),
419+
strconv.Itoa(curNsStats.Delete),
420+
strconv.Itoa(curNsStats.Total()),
421+
),
422+
)
423+
}
424+
425+
builder.WriteString("\nMost frequently-changing namespaces:\n")
426+
table.Render()
427+
}

internal/verifier/unit_test_util.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,12 +153,16 @@ func (suite *WithMongodsTestSuite) TearDownSuite() {
153153
}
154154

155155
func (suite *WithMongodsTestSuite) TearDownTest() {
156+
suite.T().Logf("Tearing down test %#q", suite.T().Name())
157+
156158
ctx := context.Background()
157159
for _, client := range []*mongo.Client{suite.srcMongoClient, suite.dstMongoClient, suite.metaMongoClient} {
158160
dbNames, err := client.ListDatabaseNames(ctx, bson.D{})
159161
suite.Require().NoError(err)
160162
for _, dbName := range dbNames {
161163
if !suite.initialDbNames[dbName] {
164+
suite.T().Logf("Dropping database %#q, which seems to have been created during test %#q.", dbName, suite.T().Name())
165+
162166
err = client.Database(dbName).Drop(ctx)
163167
suite.Require().NoError(err)
164168
}

0 commit comments

Comments
 (0)