Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
72 commits
Select commit Hold shift + click to select a range
da4576d
initial code
FGasper Oct 23, 2024
903730a
tweaks
FGasper Oct 23, 2024
7e28f9e
add GH Action to ensure a build with Go 1.18
FGasper Oct 23, 2024
28befad
with
FGasper Oct 23, 2024
00e5e0e
add slices
FGasper Oct 23, 2024
a11f3a4
version file
FGasper Oct 23, 2024
34ea01a
1.19
FGasper Oct 23, 2024
5e7f19f
avoid exp
FGasper Oct 23, 2024
1f6bfc1
revert
FGasper Oct 23, 2024
8d76053
samber/lo
FGasper Oct 23, 2024
2dc8a16
tidy
FGasper Oct 23, 2024
1959209
vendor
FGasper Oct 23, 2024
1d1a5e9
tidy
FGasper Oct 23, 2024
0a59f4d
lint issues
FGasper Oct 23, 2024
cf5cce9
try linting
FGasper Oct 23, 2024
b2dbd82
names
FGasper Oct 23, 2024
484f5b6
lint after build
FGasper Oct 23, 2024
df6e98d
comment
FGasper Oct 23, 2024
6aecd28
event reporter working
FGasper Oct 29, 2024
26a724a
Merge branch 'main' into REP-5140-report-change-events
FGasper Nov 5, 2024
83ea752
revert samber/lo a bit
FGasper Nov 5, 2024
e9eecb0
remove lo
FGasper Nov 5, 2024
5728496
revert
FGasper Nov 5, 2024
539cab7
rename
FGasper Nov 5, 2024
0810360
Merge branch 'main' into REP-5140-report-change-events
FGasper Nov 6, 2024
38bbe0b
Merge branch 'main' into REP-5140-report-change-events
FGasper Nov 6, 2024
e399017
remove sleep …?
FGasper Nov 6, 2024
eece56a
trigger CI
FGasper Nov 7, 2024
a1c1d0c
move NewEventRecorder
FGasper Nov 7, 2024
8b058fb
fix lock
FGasper Nov 7, 2024
d54f561
one got away
FGasper Nov 7, 2024
12e75ea
add optype
FGasper Nov 7, 2024
e740f00
prevent blocking
FGasper Nov 7, 2024
4df1856
warn the error
FGasper Nov 7, 2024
8f17555
provide missing return
FGasper Nov 7, 2024
d7e788d
remove warning
FGasper Nov 7, 2024
bbd5321
detect hang & fail
FGasper Nov 7, 2024
6bab15a
add race testing
FGasper Nov 7, 2024
7d0379c
show full event
FGasper Nov 7, 2024
d91a5cb
different db names per test
FGasper Nov 7, 2024
0e6410f
race with recorder
FGasper Nov 7, 2024
f681fde
try another generation for writesoff?
FGasper Nov 7, 2024
d593640
5-minute timeout
FGasper Nov 7, 2024
b330e18
skip print
FGasper Nov 7, 2024
460a03f
don’t tally events
FGasper Nov 7, 2024
0f52df5
revert check.go … ?
FGasper Nov 7, 2024
77eaf87
revert change_stream
FGasper Nov 7, 2024
a87bebc
revert test
FGasper Nov 7, 2024
c927ea8
no race
FGasper Nov 7, 2024
2c4d7af
Revert "revert test"
FGasper Nov 7, 2024
7b3397a
remove extra channel ops
FGasper Nov 7, 2024
ca9d995
no worker delay
FGasper Nov 7, 2024
bb72776
Revert "revert change_stream"
FGasper Nov 7, 2024
668567c
restore extra break
FGasper Nov 7, 2024
07b4292
remove timeout
FGasper Nov 7, 2024
d21c153
remove DBs after each
FGasper Nov 8, 2024
75df85c
save
FGasper Nov 8, 2024
49689c1
Revert "revert change_stream"
FGasper Nov 8, 2024
344e275
Revert "don’t tally events"
FGasper Nov 8, 2024
81da7d9
comment out test; annotate teardown steps
FGasper Nov 8, 2024
64ff828
debug
FGasper Nov 8, 2024
491fe80
more debug
FGasper Nov 8, 2024
9a2990c
try 1s worker delay
FGasper Nov 8, 2024
47f1aad
more diag
FGasper Nov 8, 2024
6fc9692
debug messages
FGasper Nov 8, 2024
5cfb231
millis=0
FGasper Nov 8, 2024
1965ed2
not printf
FGasper Nov 8, 2024
068ae34
Merge branch 'main' into REP-5140-report-change-events
FGasper Nov 8, 2024
1e4bf10
revert unneeded changes
FGasper Nov 8, 2024
6787bba
Merge branch 'main' into REP-5140-report-change-events
FGasper Nov 8, 2024
b494610
Merge branch 'main' into REP-5140-report-change-events
FGasper Nov 8, 2024
e658032
remove unused
FGasper Nov 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ metaURI: mongodb://localhost:28012
1. After launching the verifier (see above), you can send it requests to get it to start verifying. The verification process is started by using the `check` command. An [optional `filter` parameter](#document-filtering) can be passed within the `check` request body to only check documents within that filter. The verification process will keep running until you tell the verifier to stop. It will keep track of the inconsistencies it has found and will keep checking those inconsistencies hoping that eventually they will resolve.

```
curl -H "Content-Type: application/json" -X POST -d '{}' http://127.0.0.1:27020/api/v1/check
curl -H "Content-Type: application/json" -d '{}' http://127.0.0.1:27020/api/v1/check
```


Expand Down
4 changes: 4 additions & 0 deletions internal/verifier/change_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func (verifier *Verifier) HandleChangeStreamEvent(ctx context.Context, changeEve
case "replace":
fallthrough
case "update":
if err := verifier.eventRecorder.AddEvent(changeEvent); err != nil {
return errors.Wrapf(err, "failed to augment stats with change event: %+v", *changeEvent)
}

return verifier.InsertChangeEventRecheckDoc(ctx, changeEvent)
default:
return UnknownEventError{Event: changeEvent}
Expand Down
1 change: 1 addition & 0 deletions internal/verifier/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
// Now enter the multi-generational steady check state
for {
verifier.generationStartTime = time.Now()
verifier.eventRecorder.Reset()

err := verifier.CheckWorker(ctx)
if err != nil {
Expand Down
91 changes: 91 additions & 0 deletions internal/verifier/event_recorder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package verifier

import (
"github.com/10gen/migration-verifier/msync"
"github.com/pkg/errors"
"golang.org/x/exp/maps"
)

// eventRecorderMap maps a namespace string ("<db>.<coll>") to that
// namespace’s change-event counters.
type eventRecorderMap = map[string]PerNamespaceStats

// EventRecorder maintains statistics on change events.
type EventRecorder struct {
	// guard holds the per-namespace statistics map. The methods in this
	// file only touch the map through guard’s Load and Store callbacks.
	// NOTE(review): presumably DataGuard serializes concurrent access;
	// confirm against the msync package.
	guard *msync.DataGuard[eventRecorderMap]
}

// PerNamespaceStats holds one namespace’s change-event counts, broken
// down by optype.
type PerNamespaceStats struct {
	// Insert counts events whose optype is "insert".
	Insert int
	// Update counts events whose optype is "update".
	Update int
	// Replace counts events whose optype is "replace".
	Replace int
	// Delete counts events whose optype is "delete".
	Delete int
}

// Total returns the sum of the counters for all four optypes.
func (pns PerNamespaceStats) Total() int {
	total := pns.Insert
	total += pns.Update
	total += pns.Replace
	total += pns.Delete

	return total
}

// NewEventRecorder returns a pointer to a fresh EventRecorder whose
// statistics map starts out empty.
func NewEventRecorder() *EventRecorder {
	recorder := new(EventRecorder)
	recorder.guard = msync.NewDataGuard(make(eventRecorderMap))

	return recorder
}

// Reset discards all recorded statistics by replacing the stored map
// with a new, empty one.
func (er EventRecorder) Reset() {
	er.guard.Store(func(_ eventRecorderMap) eventRecorderMap {
		// The previous contents are intentionally ignored.
		return make(eventRecorderMap)
	})
}

// AddEvent adds a ParsedEvent to the EventRecorder’s statistics by
// incrementing the counter that matches the event’s optype under the
// event’s namespace ("<db>.<coll>").
//
// It returns an error, and leaves the statistics untouched, if the event
// lacks a namespace or if its optype is anything other than "insert",
// "update", "replace", or "delete".
func (er EventRecorder) AddEvent(changeEvent *ParsedEvent) error {
	// This shouldn’t happen, but just in case:
	if changeEvent.Ns == nil {
		return errors.Errorf("Change event lacks a namespace: %+v", changeEvent)
	}

	// Pick the counter before touching the map so that an unknown optype
	// cannot leave behind an all-zero entry for its namespace.
	var increment func(*PerNamespaceStats)

	switch changeEvent.OpType {
	case "insert":
		increment = func(s *PerNamespaceStats) { s.Insert++ }
	case "update":
		increment = func(s *PerNamespaceStats) { s.Update++ }
	case "replace":
		increment = func(s *PerNamespaceStats) { s.Replace++ }
	case "delete":
		increment = func(s *PerNamespaceStats) { s.Delete++ }
	default:
		return errors.Errorf("Event recorder received event with unknown optype: %+v", *changeEvent)
	}

	nsStr := changeEvent.Ns.DB + "." + changeEvent.Ns.Coll

	er.guard.Store(func(m eventRecorderMap) eventRecorderMap {
		// Indexing a missing key yields the zero value, so a namespace
		// seen for the first time needs no special handling.
		nsStats := m[nsStr]
		increment(&nsStats)
		m[nsStr] = nsStats

		return m
	})

	return nil
}

// Read returns a copy of the tracked change-event statistics, keyed by
// namespace. Each value is a PerNamespaceStats with that namespace’s
// insert, update, replace, and delete counts. Because the copy is made
// with maps.Clone and the values are plain structs, modifying the
// returned map does not alter the recorder’s own map.
func (er EventRecorder) Read() eventRecorderMap {
	var snapshot eventRecorderMap

	er.guard.Load(func(current eventRecorderMap) {
		snapshot = maps.Clone(current)
	})

	return snapshot
}
11 changes: 11 additions & 0 deletions internal/verifier/migration_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ type Verifier struct {
numWorkers int
failureDisplaySize int64

eventRecorder *EventRecorder

// Used only with generation 0 to defer the first
// progress report until after we’ve finished partitioning
// every collection.
Expand Down Expand Up @@ -195,6 +197,10 @@ func NewVerifier(settings VerifierSettings) *Verifier {
changeStreamErrChan: make(chan error),
changeStreamDoneChan: make(chan struct{}),
readConcernSetting: readConcern,

// This will get recreated once gen0 starts, but we want it
// here in case the change stream gets an event before then.
eventRecorder: NewEventRecorder(),
}
}

Expand Down Expand Up @@ -223,6 +229,9 @@ func (verifier *Verifier) SetFailureDisplaySize(size int64) {
}

func (verifier *Verifier) WritesOff(ctx context.Context) {
verifier.logger.Debug().
Msg("WritesOff called.")

verifier.mux.Lock()
verifier.writesOff = true
verifier.mux.Unlock()
Expand Down Expand Up @@ -1300,6 +1309,8 @@ func (verifier *Verifier) PrintVerificationSummary(ctx context.Context, genstatu
verifier.logger.Err(err).Msgf("Failed to report per-namespace statistics")
}

verifier.printChangeEventStatistics(strBuilder)

var statusLine string

if hasTasks {
Expand Down
20 changes: 15 additions & 5 deletions internal/verifier/recheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/10gen/migration-verifier/internal/types"
"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
Expand Down Expand Up @@ -34,7 +35,11 @@ type RecheckDoc struct {
func (verifier *Verifier) InsertFailedCompareRecheckDocs(
namespace string, documentIDs []interface{}, dataSizes []int) error {
dbName, collName := SplitNamespace(namespace)
return verifier.insertRecheckDocs(context.Background(),

verifier.mux.Lock()
defer verifier.mux.Unlock()

return verifier.insertRecheckDocsUnderLock(context.Background(),
dbName, collName, documentIDs, dataSizes)
}

Expand All @@ -48,15 +53,20 @@ func (verifier *Verifier) InsertChangeEventRecheckDoc(ctx context.Context, chang
// total data size for noninitial generations in the log.
dataSizes := []int{maxBSONObjSize}

return verifier.insertRecheckDocs(
verifier.mux.Lock()
defer verifier.mux.Unlock()

if err := verifier.eventRecorder.AddEvent(changeEvent); err != nil {
return errors.Wrapf(err, "failed to augment stats with change event: %+v", *changeEvent)
}

return verifier.insertRecheckDocsUnderLock(
ctx, changeEvent.Ns.DB, changeEvent.Ns.Coll, documentIDs, dataSizes)
}

func (verifier *Verifier) insertRecheckDocs(
func (verifier *Verifier) insertRecheckDocsUnderLock(
ctx context.Context,
dbName, collName string, documentIDs []interface{}, dataSizes []int) error {
verifier.mux.Lock()
defer verifier.mux.Unlock()

generation, _ := verifier.getGenerationWhileLocked()

Expand Down
32 changes: 23 additions & 9 deletions internal/verifier/recheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (suite *MultiMetaVersionTestSuite) TestFailedCompareThenReplace() {
)

event := ParsedEvent{
OpType: "insert",
DocKey: DocKey{
ID: "theDocID",
},
Expand Down Expand Up @@ -95,7 +96,7 @@ func (suite *MultiMetaVersionTestSuite) TestLargeIDInsertions() {
id3 := strings.Repeat("c", overlyLarge)
ids := []interface{}{id1, id2, id3}
dataSizes := []int{overlyLarge, overlyLarge, overlyLarge}
err := verifier.insertRecheckDocs(ctx, "testDB", "testColl", ids, dataSizes)
err := insertRecheckDocs(ctx, verifier, "testDB", "testColl", ids, dataSizes)
suite.Require().NoError(err)

d1 := RecheckDoc{
Expand Down Expand Up @@ -156,7 +157,7 @@ func (suite *MultiMetaVersionTestSuite) TestLargeDataInsertions() {
id3 := "c"
ids := []interface{}{id1, id2, id3}
dataSizes := []int{400 * 1024, 700 * 1024, 1024}
err := verifier.insertRecheckDocs(ctx, "testDB", "testColl", ids, dataSizes)
err := insertRecheckDocs(ctx, verifier, "testDB", "testColl", ids, dataSizes)
suite.Require().NoError(err)
d1 := RecheckDoc{
PrimaryKey: RecheckPrimaryKey{
Expand Down Expand Up @@ -217,13 +218,13 @@ func (suite *MultiMetaVersionTestSuite) TestMultipleNamespaces() {
id3 := "c"
ids := []interface{}{id1, id2, id3}
dataSizes := []int{1000, 1000, 1000}
err := verifier.insertRecheckDocs(ctx, "testDB1", "testColl1", ids, dataSizes)
err := insertRecheckDocs(ctx, verifier, "testDB1", "testColl1", ids, dataSizes)
suite.Require().NoError(err)
err = verifier.insertRecheckDocs(ctx, "testDB1", "testColl2", ids, dataSizes)
err = insertRecheckDocs(ctx, verifier, "testDB1", "testColl2", ids, dataSizes)
suite.Require().NoError(err)
err = verifier.insertRecheckDocs(ctx, "testDB2", "testColl1", ids, dataSizes)
err = insertRecheckDocs(ctx, verifier, "testDB2", "testColl1", ids, dataSizes)
suite.Require().NoError(err)
err = verifier.insertRecheckDocs(ctx, "testDB2", "testColl2", ids, dataSizes)
err = insertRecheckDocs(ctx, verifier, "testDB2", "testColl2", ids, dataSizes)
suite.Require().NoError(err)

verifier.generation++
Expand Down Expand Up @@ -267,17 +268,17 @@ func (suite *MultiMetaVersionTestSuite) TestGenerationalClear() {
id2 := "b"
ids := []interface{}{id1, id2}
dataSizes := []int{1000, 1000}
err := verifier.insertRecheckDocs(ctx, "testDB", "testColl", ids, dataSizes)
err := insertRecheckDocs(ctx, verifier, "testDB", "testColl", ids, dataSizes)
suite.Require().NoError(err)

verifier.generation++

err = verifier.insertRecheckDocs(ctx, "testDB", "testColl", ids, dataSizes)
err = insertRecheckDocs(ctx, verifier, "testDB", "testColl", ids, dataSizes)
suite.Require().NoError(err)

verifier.generation++

err = verifier.insertRecheckDocs(ctx, "testDB", "testColl", ids, dataSizes)
err = insertRecheckDocs(ctx, verifier, "testDB", "testColl", ids, dataSizes)
suite.Require().NoError(err)

d1 := RecheckDoc{
Expand Down Expand Up @@ -326,3 +327,16 @@ func (suite *MultiMetaVersionTestSuite) TestGenerationalClear() {
results = suite.fetchRecheckDocs(ctx, verifier)
suite.ElementsMatch([]interface{}{}, results)
}

// insertRecheckDocs is a test helper that acquires the verifier’s mutex
// and, while holding it, forwards its arguments to
// insertRecheckDocsUnderLock. It returns whatever that call returns.
func insertRecheckDocs(ctx context.Context, verifier *Verifier, dbName, collName string, documentIDs []any, dataSizes []int) error {
	verifier.mux.Lock()
	defer verifier.mux.Unlock()

	return verifier.insertRecheckDocsUnderLock(
		ctx,
		dbName,
		collName,
		documentIDs,
		dataSizes,
	)
}
63 changes: 63 additions & 0 deletions internal/verifier/summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,19 @@ package verifier
import (
"context"
"fmt"
"sort"
"strconv"
"strings"
"time"

"github.com/10gen/migration-verifier/internal/reportutils"
"github.com/10gen/migration-verifier/internal/types"
"github.com/olekukonko/tablewriter"
"golang.org/x/exp/maps"
)

const changeEventsTableMaxSize = 10

// NOTE: Each of the following should print one leading and one trailing
// newline.

Expand Down Expand Up @@ -362,3 +367,61 @@ func (verifier *Verifier) printMismatchInvestigationNotes(strBuilder *strings.Bu
strBuilder.WriteString(line + "\n")
}
}

// printChangeEventStatistics appends a change-event report to builder,
// based on a snapshot read from the verifier’s event recorder.
//
// It always writes a one-line summary (the total event count and the
// number of namespaces, or "none"). If there was at least one event it
// also writes a table of per-optype counts for the busiest namespaces,
// limited to changeEventsTableMaxSize rows. Rows are ordered by
// descending total; ties are broken by namespace name so that the
// output is the same from one call to the next.
func (verifier *Verifier) printChangeEventStatistics(builder *strings.Builder) {
	nsStats := verifier.eventRecorder.Read()

	totalEvents := 0
	nsTotals := make(map[string]int, len(nsStats))
	for ns, events := range nsStats {
		nsTotal := events.Total()
		nsTotals[ns] = nsTotal
		totalEvents += nsTotal
	}

	eventsDescr := "none"
	if totalEvents > 0 {
		eventsDescr = fmt.Sprintf("%d total, across %d namespace(s)", totalEvents, len(nsStats))
	}

	fmt.Fprintf(builder, "\nChange events this generation: %s\n", eventsDescr)

	if totalEvents == 0 {
		return
	}

	reverseSortedNamespaces := maps.Keys(nsTotals)
	sort.Slice(
		reverseSortedNamespaces,
		func(i, j int) bool {
			nsI, nsJ := reverseSortedNamespaces[i], reverseSortedNamespaces[j]

			if nsTotals[nsI] != nsTotals[nsJ] {
				return nsTotals[nsI] > nsTotals[nsJ]
			}

			// Map iteration order is random and sort.Slice is not
			// stable, so equal totals need an explicit tie-break.
			return nsI < nsJ
		},
	)

	// Only report the busiest namespaces.
	if len(reverseSortedNamespaces) > changeEventsTableMaxSize {
		reverseSortedNamespaces = reverseSortedNamespaces[:changeEventsTableMaxSize]
	}

	table := tablewriter.NewWriter(builder)
	table.SetHeader([]string{"Namespace", "Insert", "Update", "Replace", "Delete", "Total"})

	for _, ns := range reverseSortedNamespaces {
		curNsStats := nsStats[ns]

		table.Append([]string{
			ns,
			strconv.Itoa(curNsStats.Insert),
			strconv.Itoa(curNsStats.Update),
			strconv.Itoa(curNsStats.Replace),
			strconv.Itoa(curNsStats.Delete),
			strconv.Itoa(curNsStats.Total()),
		})
	}

	// The table writes nothing until Render, so this heading precedes it.
	builder.WriteString("\nMost frequently-changing namespaces:\n")
	table.Render()
}
4 changes: 4 additions & 0 deletions internal/verifier/unit_test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,16 @@ func (suite *WithMongodsTestSuite) TearDownSuite() {
}

func (suite *WithMongodsTestSuite) TearDownTest() {
suite.T().Logf("Tearing down test %#q", suite.T().Name())

ctx := context.Background()
for _, client := range []*mongo.Client{suite.srcMongoClient, suite.dstMongoClient, suite.metaMongoClient} {
dbNames, err := client.ListDatabaseNames(ctx, bson.D{})
suite.Require().NoError(err)
for _, dbName := range dbNames {
if !suite.initialDbNames[dbName] {
suite.T().Logf("Dropping database %#q, which seems to have been created during test %#q.", dbName, suite.T().Name())

err = client.Database(dbName).Drop(ctx)
suite.Require().NoError(err)
}
Expand Down
Loading
Loading