Skip to content

Commit 2c7fc0c

Browse files
authored
Add a worker tracker to migration-verifier’s progress logs. (#60)
This changeset adds a table of worker thread statuses to the progress logs. This will allow at-a-glance monitoring of how long different tasks have been in progress and will usefully surface any hung threads. This also removes the “No tasks found. Sleeping.” log messages, which convey little but add a lot of clutter.
1 parent 4f3d3aa commit 2c7fc0c

File tree

4 files changed

+132
-0
lines changed

4 files changed

+132
-0
lines changed

internal/verifier/check.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,8 @@ func (verifier *Verifier) work(ctx context.Context, workerNum int) error {
448448
)
449449
}
450450

451+
verifier.workerTracker.Set(workerNum, *task)
452+
451453
switch task.Type {
452454
case verificationTaskVerifyCollection:
453455
err := verifier.ProcessCollectionVerificationTask(ctx, workerNum, task)

internal/verifier/migration_verifier.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,8 @@ type Verifier struct {
142142

143143
pprofInterval time.Duration
144144

145+
workerTracker *WorkerTracker
146+
145147
verificationStatusCheckInterval time.Duration
146148
}
147149

@@ -206,6 +208,8 @@ func NewVerifier(settings VerifierSettings) *Verifier {
206208
// here in case the change streams gets an event before then.
207209
eventRecorder: NewEventRecorder(),
208210

211+
workerTracker: NewWorkerTracker(NumWorkers),
212+
209213
verificationStatusCheckInterval: 15 * time.Second,
210214
}
211215
}
@@ -1515,6 +1519,14 @@ func (verifier *Verifier) PrintVerificationSummary(ctx context.Context, genstatu
15151519

15161520
verifier.printChangeEventStatistics(strBuilder)
15171521

1522+
// Only print the worker status table if debug logging is enabled.
1523+
if verifier.logger.Debug().Enabled() {
1524+
switch genstatus {
1525+
case Gen0MetadataAnalysisComplete, GenerationInProgress:
1526+
verifier.printWorkerStatus(strBuilder)
1527+
}
1528+
}
1529+
15181530
var statusLine string
15191531

15201532
if hasTasks {

internal/verifier/summary.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/10gen/migration-verifier/internal/reportutils"
1616
"github.com/10gen/migration-verifier/internal/types"
1717
"github.com/olekukonko/tablewriter"
18+
"go.mongodb.org/mongo-driver/bson/primitive"
1819
"golang.org/x/exp/maps"
1920
)
2021

@@ -425,3 +426,48 @@ func (verifier *Verifier) printChangeEventStatistics(builder *strings.Builder) {
425426
builder.WriteString("\nMost frequently-changing namespaces:\n")
426427
table.Render()
427428
}
429+
430+
func (verifier *Verifier) printWorkerStatus(builder *strings.Builder) {
431+
432+
table := tablewriter.NewWriter(builder)
433+
table.SetHeader([]string{"Thread #", "Namespace", "Task", "Time Elapsed"})
434+
435+
wsmap := verifier.workerTracker.Load()
436+
437+
activeThreadCount := 0
438+
for w := 0; w <= verifier.numWorkers; w++ {
439+
if wsmap[w].TaskID == nil {
440+
continue
441+
}
442+
443+
activeThreadCount++
444+
445+
var taskIdStr string
446+
447+
switch id := wsmap[w].TaskID.(type) {
448+
case primitive.ObjectID:
449+
theBytes, _ := id.MarshalText()
450+
451+
taskIdStr = string(theBytes)
452+
default:
453+
taskIdStr = fmt.Sprintf("%s", wsmap[w].TaskID)
454+
}
455+
456+
table.Append(
457+
[]string{
458+
strconv.Itoa(w),
459+
wsmap[w].Namespace,
460+
taskIdStr,
461+
reportutils.DurationToHMS(time.Since(wsmap[w].StartTime)),
462+
},
463+
)
464+
}
465+
466+
builder.WriteString(fmt.Sprintf(
467+
"\nActive worker threads (%d of %d):\n",
468+
activeThreadCount,
469+
verifier.numWorkers,
470+
))
471+
472+
table.Render()
473+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package verifier
2+
3+
import (
4+
"time"
5+
6+
"github.com/10gen/migration-verifier/msync"
7+
"golang.org/x/exp/maps"
8+
)
9+
10+
// WorkerTracker holds certain data points about each worker thread
11+
// in a check generation. It is thread-safe.
12+
type WorkerTracker struct {
13+
guard *msync.DataGuard[WorkerStatusMap]
14+
}
15+
16+
// WorkerStatusMap represents the status of each worker,
17+
// indexed by worker number (which start at 0).
18+
type WorkerStatusMap = map[int]WorkerStatus
19+
20+
// WorkerStatus details the work that an individual worker thread
21+
// is doing.
22+
type WorkerStatus struct {
23+
TaskID any
24+
TaskType verificationTaskType
25+
Namespace string
26+
StartTime time.Time
27+
}
28+
29+
// NewWorkerTracker creates and returns a WorkerTracker.
30+
func NewWorkerTracker(workersCount int) *WorkerTracker {
31+
wsmap := WorkerStatusMap{}
32+
for i := 0; i < workersCount; i++ {
33+
wsmap[i] = WorkerStatus{}
34+
}
35+
return &WorkerTracker{
36+
guard: msync.NewDataGuard(wsmap),
37+
}
38+
}
39+
40+
// Set updates the worker’s state in the WorkerTracker.
41+
func (wt *WorkerTracker) Set(workerNum int, task VerificationTask) {
42+
wt.guard.Store(func(m WorkerStatusMap) WorkerStatusMap {
43+
m[workerNum] = WorkerStatus{
44+
TaskID: task.PrimaryKey,
45+
TaskType: task.Type,
46+
Namespace: task.QueryFilter.Namespace,
47+
StartTime: time.Now(),
48+
}
49+
50+
return m
51+
})
52+
}
53+
54+
// Unset tells the WorkerTracker that the worker is now inactive.
55+
func (wt *WorkerTracker) Unset(workerNum int) {
56+
wt.guard.Store(func(m WorkerStatusMap) WorkerStatusMap {
57+
m[workerNum] = WorkerStatus{}
58+
59+
return m
60+
})
61+
}
62+
63+
// Load duplicates and returns the WorkerTracker’s internal
64+
// state map.
65+
func (wt *WorkerTracker) Load() WorkerStatusMap {
66+
var wtmap WorkerStatusMap
67+
wt.guard.Load(func(m map[int]WorkerStatus) {
68+
wtmap = maps.Clone(m)
69+
})
70+
71+
return wtmap
72+
}

0 commit comments

Comments
 (0)