Skip to content

Commit da4576d

Browse files
committed
initial code
1 parent a2fc487 commit da4576d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

95 files changed

+37701
-2815
lines changed

go.mod

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,24 +3,25 @@ module github.com/10gen/migration-verifier
33
go 1.18
44

55
require (
6+
github.com/cespare/permute/v2 v2.0.0-beta2
67
github.com/deckarep/golang-set/v2 v2.3.0
78
github.com/dustin/go-humanize v1.0.1
89
github.com/gin-gonic/gin v1.8.1
910
github.com/google/uuid v1.3.0
1011
github.com/olekukonko/tablewriter v0.0.5
1112
github.com/pkg/errors v0.9.1
1213
github.com/rs/zerolog v1.28.0
14+
github.com/samber/lo v1.47.0
1315
github.com/stretchr/testify v1.8.0
1416
github.com/urfave/cli v1.22.9
1517
go.mongodb.org/mongo-driver v1.10.2
16-
golang.org/x/exp v0.0.0-20230321023759-10a507213a29
17-
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
18+
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c
19+
golang.org/x/sync v0.8.0
1820
gopkg.in/natefinch/lumberjack.v2 v2.0.0
1921
)
2022

2123
require (
2224
github.com/BurntSushi/toml v0.3.1 // indirect
23-
github.com/cespare/permute/v2 v2.0.0-beta2 // indirect
2425
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
2526
github.com/davecgh/go-spew v1.1.1 // indirect
2627
github.com/gin-contrib/sse v0.1.0 // indirect
@@ -49,8 +50,7 @@ require (
4950
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
5051
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect
5152
golang.org/x/sys v0.1.0 // indirect
52-
golang.org/x/text v0.3.7 // indirect
53-
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
53+
golang.org/x/text v0.16.0 // indirect
5454
google.golang.org/protobuf v1.28.0 // indirect
5555
gopkg.in/yaml.v2 v2.4.0 // indirect
5656
gopkg.in/yaml.v3 v3.0.1 // indirect

internal/verifier/change_stream.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@ package verifier
22

33
import (
44
"context"
5-
"errors"
65
"fmt"
76
"time"
87

8+
"github.com/pkg/errors"
9+
910
"github.com/10gen/migration-verifier/internal/keystring"
1011
"go.mongodb.org/mongo-driver/bson"
1112
"go.mongodb.org/mongo-driver/bson/primitive"
@@ -48,6 +49,10 @@ func (verifier *Verifier) HandleChangeStreamEvent(ctx context.Context, changeEve
4849
case "replace":
4950
fallthrough
5051
case "update":
52+
if err := verifier.generationEventRecorder.AddEvent(changeEvent); err != nil {
53+
return errors.Wrapf(err, "failed to augment stats with change event: %+v", *changeEvent)
54+
}
55+
5156
return verifier.InsertChangeEventRecheckDoc(ctx, changeEvent)
5257
default:
5358
return errors.New(`Not supporting: "` + changeEvent.OpType + `" events`)

internal/verifier/check.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
190190
// Now enter the multi-generational steady check state
191191
for {
192192
verifier.generationStartTime = time.Now()
193+
verifier.generationEventRecorder = NewEventRecorder()
193194

194195
err := verifier.CheckWorker(ctx)
195196
if err != nil {
@@ -227,6 +228,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
227228
verifier.lastGeneration = true
228229
}
229230
verifier.generation++
231+
230232
verifier.phase = Recheck
231233
err = verifier.GenerateRecheckTasks(ctx)
232234
if err != nil {
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package verifier
2+
3+
import (
4+
"github.com/10gen/migration-verifier/msync"
5+
"github.com/pkg/errors"
6+
"golang.org/x/exp/maps"
7+
)
8+
9+
type eventRecorderMap = map[string]PerNamespaceStats
10+
11+
// EventRecorder maintains statistics on change events.
12+
type EventRecorder struct {
13+
guard *msync.DataGuard[eventRecorderMap]
14+
}
15+
16+
// PerNamespaceStats holds, for a single namespace, the number of change
// events seen for each optype.
type PerNamespaceStats struct {
	Insert, Update, Replace, Delete int
}

// Total returns the number of events recorded across all optypes.
func (pns PerNamespaceStats) Total() int {
	sum := pns.Insert
	sum += pns.Update
	sum += pns.Replace
	sum += pns.Delete

	return sum
}
27+
28+
// NewEventRecorder creates and returns a new EventRecorder.
29+
func NewEventRecorder() *EventRecorder {
30+
return &EventRecorder{
31+
guard: msync.NewDataGuard(eventRecorderMap{}),
32+
}
33+
}
34+
35+
// AddEvent adds a ParsedEvent to the EventRecorder’s statistics.
36+
func (et EventRecorder) AddEvent(changeEvent *ParsedEvent) error {
37+
// This shouldn’t happen, but just in case:
38+
if changeEvent.Ns == nil {
39+
return errors.Errorf("Change event lacks a namespace: %+v", changeEvent)
40+
}
41+
42+
nsStr := changeEvent.Ns.DB + "." + changeEvent.Ns.Coll
43+
44+
var err error
45+
46+
et.guard.Store(func(m eventRecorderMap) eventRecorderMap {
47+
if _, exists := m[nsStr]; !exists {
48+
m[nsStr] = PerNamespaceStats{}
49+
}
50+
51+
nsStats := m[nsStr]
52+
53+
switch changeEvent.OpType {
54+
case "insert":
55+
nsStats.Insert++
56+
case "update":
57+
nsStats.Update++
58+
case "replace":
59+
nsStats.Replace++
60+
case "delete":
61+
nsStats.Delete++
62+
default:
63+
err = errors.Errorf("Event recorder received event with unknown optype: %+v", *changeEvent)
64+
}
65+
66+
m[nsStr] = nsStats
67+
68+
return m
69+
})
70+
71+
return err
72+
}
73+
74+
// Read returns a map of the tracked change events. The map
75+
// indexes on namespace then event optype. Each namespace will
76+
// have `insert`, `update`
77+
func (et EventRecorder) Read() eventRecorderMap {
78+
var theCopy eventRecorderMap
79+
80+
et.guard.Load(func(m eventRecorderMap) {
81+
theCopy = maps.Clone(m)
82+
})
83+
84+
return theCopy
85+
}

internal/verifier/migration_verifier.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ type Verifier struct {
9595
numWorkers int
9696
failureDisplaySize int64
9797

98+
generationEventRecorder *EventRecorder
99+
98100
// Used only with generation 0 to defer the first
99101
// progress report until after we’ve finished partitioning
100102
// every collection.
@@ -1400,6 +1402,8 @@ func (verifier *Verifier) PrintVerificationSummary(ctx context.Context, genstatu
14001402
verifier.logger.Err(err).Msgf("Failed to report per-namespace statistics")
14011403
}
14021404

1405+
verifier.printChangeEventStatistics(strBuilder)
1406+
14031407
var statusLine string
14041408

14051409
if hasTasks {

internal/verifier/summary.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,16 @@ package verifier
77
import (
88
"context"
99
"fmt"
10+
"slices"
11+
"strconv"
1012
"strings"
1113
"time"
1214

1315
"github.com/10gen/migration-verifier/internal/reportutils"
1416
"github.com/10gen/migration-verifier/internal/types"
1517
"github.com/olekukonko/tablewriter"
18+
"github.com/samber/lo"
19+
"golang.org/x/exp/maps"
1620
)
1721

1822
// NOTE: Each of the following should print one trailing and one final
@@ -362,3 +366,67 @@ func (verifier *Verifier) printMismatchInvestigationNotes(strBuilder *strings.Bu
362366
strBuilder.WriteString(line + "\n")
363367
}
364368
}
369+
370+
func (verifier *Verifier) printChangeEventStatistics(builder *strings.Builder) {
371+
nsStats := verifier.generationEventRecorder.Read()
372+
373+
activeNamespacesCount := len(nsStats)
374+
375+
totalEvents := 0
376+
nsTotals := map[string]int{}
377+
for ns, events := range nsStats {
378+
nsTotals[ns] = events.Total()
379+
totalEvents += nsTotals[ns]
380+
}
381+
382+
eventsDescr := lo.Ternary(
383+
totalEvents == 0,
384+
"0",
385+
fmt.Sprintf("%d total, across %d namespace(s)", totalEvents, activeNamespacesCount),
386+
)
387+
builder.WriteString(fmt.Sprintf("Change events this generation: %s\n", eventsDescr))
388+
389+
if totalEvents == 0 {
390+
return
391+
}
392+
393+
sortedNamespaces := maps.Keys(nsTotals)
394+
slices.SortFunc(
395+
sortedNamespaces,
396+
func(ns1, ns2 string) int {
397+
if nsTotals[ns1] < nsTotals[ns2] {
398+
return 1
399+
}
400+
401+
if nsTotals[ns1] > nsTotals[ns2] {
402+
return -1
403+
}
404+
405+
return 0
406+
},
407+
)
408+
409+
// Only report the busiest namespaces.
410+
sortedNamespaces = sortedNamespaces[:10]
411+
412+
table := tablewriter.NewWriter(builder)
413+
table.SetHeader([]string{"Namespace", "Insert", "Update", "Replace", "Delete", "Total"})
414+
415+
for _, ns := range sortedNamespaces {
416+
curNsStats := nsStats[ns]
417+
418+
table.Append(
419+
append(
420+
[]string{ns},
421+
strconv.Itoa(curNsStats.Insert),
422+
strconv.Itoa(curNsStats.Update),
423+
strconv.Itoa(curNsStats.Replace),
424+
strconv.Itoa(curNsStats.Delete),
425+
strconv.Itoa(curNsStats.Total()),
426+
),
427+
)
428+
}
429+
430+
builder.WriteString("\nMost frequently-changing namespaces:\n")
431+
table.Render()
432+
}

msync/dataguard.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package msync
2+
3+
import "sync"
4+
5+
// DataGuard pairs a value with a read/write mutex. The value is reachable
// only through Load and Store, each of which holds the mutex for the
// duration of the callback it is given.
type DataGuard[T any] struct {
	mutex sync.RWMutex
	value T
}

// NewDataGuard returns a new DataGuard whose stored value is val.
func NewDataGuard[T any](val T) *DataGuard[T] {
	guard := new(DataGuard[T])
	guard.value = val

	return guard
}

// Load invokes cb with the stored value while holding the read lock.
func (g *DataGuard[T]) Load(cb func(T)) {
	g.mutex.RLock()
	defer g.mutex.RUnlock()

	cb(g.value)
}

// Store invokes cb with the stored value while holding the write lock,
// then replaces the stored value with whatever cb returns.
func (g *DataGuard[T]) Store(cb func(T) T) {
	g.mutex.Lock()
	defer g.mutex.Unlock()

	updated := cb(g.value)
	g.value = updated
}

msync/dataguard_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package msync
2+
3+
import (
4+
"sync"
5+
"testing"
6+
7+
"github.com/stretchr/testify/suite"
8+
)
9+
10+
type unitTestSuite struct {
11+
suite.Suite
12+
}
13+
14+
func TestUnitTestSuite(t *testing.T) {
15+
suite.Run(t, &unitTestSuite{})
16+
}
17+
18+
func (s *unitTestSuite) TestDataGuard() {
19+
l := NewDataGuard(42)
20+
21+
var wg sync.WaitGroup
22+
for i := 0; i < 100; i++ {
23+
var delta int
24+
if i%2 == 0 {
25+
delta = 2
26+
} else {
27+
delta = -1
28+
}
29+
30+
wg.Add(1)
31+
go func() {
32+
defer wg.Done()
33+
l.Store(func(v int) int {
34+
return v + delta
35+
})
36+
}()
37+
}
38+
wg.Wait()
39+
40+
l.Load(func(v int) {
41+
s.Require().Equal(92, v)
42+
})
43+
}

vendor/github.com/samber/lo/.gitignore

Lines changed: 38 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/samber/lo/.travis.yml

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)