Skip to content

Commit df028f4

Browse files
committed
Merge branch 'main' into REP-5329-retry-consistently
2 parents c55f11d + 2c7fc0c commit df028f4

File tree

4 files changed

+132
-0
lines changed

4 files changed

+132
-0
lines changed

internal/verifier/check.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,8 @@ func (verifier *Verifier) work(ctx context.Context, workerNum int) error {
484484
continue
485485
}
486486

487+
verifier.workerTracker.Set(workerNum, *task)
488+
487489
switch task.Type {
488490
case verificationTaskVerifyCollection:
489491
err := verifier.ProcessCollectionVerificationTask(ctx, workerNum, &task)

internal/verifier/migration_verifier.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,8 @@ type Verifier struct {
142142

143143
pprofInterval time.Duration
144144

145+
workerTracker *WorkerTracker
146+
145147
verificationStatusCheckInterval time.Duration
146148
}
147149

@@ -206,6 +208,8 @@ func NewVerifier(settings VerifierSettings) *Verifier {
206208
// here in case the change streams gets an event before then.
207209
eventRecorder: NewEventRecorder(),
208210

211+
workerTracker: NewWorkerTracker(NumWorkers),
212+
209213
verificationStatusCheckInterval: 15 * time.Second,
210214
}
211215
}
@@ -1518,6 +1522,14 @@ func (verifier *Verifier) PrintVerificationSummary(ctx context.Context, genstatu
15181522

15191523
verifier.printChangeEventStatistics(strBuilder)
15201524

1525+
// Only print the worker status table if debug logging is enabled.
1526+
if verifier.logger.Debug().Enabled() {
1527+
switch genstatus {
1528+
case Gen0MetadataAnalysisComplete, GenerationInProgress:
1529+
verifier.printWorkerStatus(strBuilder)
1530+
}
1531+
}
1532+
15211533
var statusLine string
15221534

15231535
if hasTasks {

internal/verifier/summary.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/10gen/migration-verifier/internal/reportutils"
1616
"github.com/10gen/migration-verifier/internal/types"
1717
"github.com/olekukonko/tablewriter"
18+
"go.mongodb.org/mongo-driver/bson/primitive"
1819
"golang.org/x/exp/maps"
1920
)
2021

@@ -436,3 +437,48 @@ func (verifier *Verifier) printChangeEventStatistics(builder *strings.Builder) {
436437
builder.WriteString("\nMost frequently-changing namespaces:\n")
437438
table.Render()
438439
}
440+
441+
func (verifier *Verifier) printWorkerStatus(builder *strings.Builder) {
442+
443+
table := tablewriter.NewWriter(builder)
444+
table.SetHeader([]string{"Thread #", "Namespace", "Task", "Time Elapsed"})
445+
446+
wsmap := verifier.workerTracker.Load()
447+
448+
activeThreadCount := 0
449+
for w := 0; w <= verifier.numWorkers; w++ {
450+
if wsmap[w].TaskID == nil {
451+
continue
452+
}
453+
454+
activeThreadCount++
455+
456+
var taskIdStr string
457+
458+
switch id := wsmap[w].TaskID.(type) {
459+
case primitive.ObjectID:
460+
theBytes, _ := id.MarshalText()
461+
462+
taskIdStr = string(theBytes)
463+
default:
464+
taskIdStr = fmt.Sprintf("%s", wsmap[w].TaskID)
465+
}
466+
467+
table.Append(
468+
[]string{
469+
strconv.Itoa(w),
470+
wsmap[w].Namespace,
471+
taskIdStr,
472+
reportutils.DurationToHMS(time.Since(wsmap[w].StartTime)),
473+
},
474+
)
475+
}
476+
477+
builder.WriteString(fmt.Sprintf(
478+
"\nActive worker threads (%d of %d):\n",
479+
activeThreadCount,
480+
verifier.numWorkers,
481+
))
482+
483+
table.Render()
484+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package verifier
2+
3+
import (
4+
"time"
5+
6+
"github.com/10gen/migration-verifier/msync"
7+
"golang.org/x/exp/maps"
8+
)
9+
10+
// WorkerTracker holds certain data points about each worker thread
11+
// in a check generation. It is thread-safe.
12+
type WorkerTracker struct {
13+
guard *msync.DataGuard[WorkerStatusMap]
14+
}
15+
16+
// WorkerStatusMap represents the status of each worker,
17+
// indexed by worker number (which start at 0).
18+
type WorkerStatusMap = map[int]WorkerStatus
19+
20+
// WorkerStatus details the work that an individual worker thread
21+
// is doing.
22+
type WorkerStatus struct {
23+
TaskID any
24+
TaskType verificationTaskType
25+
Namespace string
26+
StartTime time.Time
27+
}
28+
29+
// NewWorkerTracker creates and returns a WorkerTracker.
30+
func NewWorkerTracker(workersCount int) *WorkerTracker {
31+
wsmap := WorkerStatusMap{}
32+
for i := 0; i < workersCount; i++ {
33+
wsmap[i] = WorkerStatus{}
34+
}
35+
return &WorkerTracker{
36+
guard: msync.NewDataGuard(wsmap),
37+
}
38+
}
39+
40+
// Set updates the worker’s state in the WorkerTracker.
41+
func (wt *WorkerTracker) Set(workerNum int, task VerificationTask) {
42+
wt.guard.Store(func(m WorkerStatusMap) WorkerStatusMap {
43+
m[workerNum] = WorkerStatus{
44+
TaskID: task.PrimaryKey,
45+
TaskType: task.Type,
46+
Namespace: task.QueryFilter.Namespace,
47+
StartTime: time.Now(),
48+
}
49+
50+
return m
51+
})
52+
}
53+
54+
// Unset tells the WorkerTracker that the worker is now inactive.
55+
func (wt *WorkerTracker) Unset(workerNum int) {
56+
wt.guard.Store(func(m WorkerStatusMap) WorkerStatusMap {
57+
m[workerNum] = WorkerStatus{}
58+
59+
return m
60+
})
61+
}
62+
63+
// Load duplicates and returns the WorkerTracker’s internal
64+
// state map.
65+
func (wt *WorkerTracker) Load() WorkerStatusMap {
66+
var wtmap WorkerStatusMap
67+
wt.guard.Load(func(m map[int]WorkerStatus) {
68+
wtmap = maps.Clone(m)
69+
})
70+
71+
return wtmap
72+
}

0 commit comments

Comments
 (0)