Skip to content

Commit b7a46a6

Browse files
authored
REP-6438 Include currently-running task stats in baseline logs. (#130)
Currently the non-debug logs offer no insight into currently-active tasks’ progress. So if tasks take a long time, non-debug logging will show now progress until one of those tasks finishes. This amends logging thus: - Document & byte tallies now incorporate in-progress tasks’ document & byte counts. - “Active document comparison threads” is now printed. - Each line of the “Namespaces in progress” table shows that namespace’s count of active threads.
1 parent 87b4c56 commit b7a46a6

File tree

5 files changed

+98
-43
lines changed

5 files changed

+98
-43
lines changed

internal/verifier/compare.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"github.com/10gen/migration-verifier/chanutil"
1010
"github.com/10gen/migration-verifier/contextplus"
1111
"github.com/10gen/migration-verifier/dockey"
12-
"github.com/10gen/migration-verifier/internal/reportutils"
1312
"github.com/10gen/migration-verifier/internal/retry"
1413
"github.com/10gen/migration-verifier/internal/types"
1514
"github.com/10gen/migration-verifier/internal/util"
@@ -231,13 +230,10 @@ func (verifier *Verifier) compareDocsFromChannels(
231230

232231
srcDocCount++
233232
srcByteCount += types.ByteCount(len(srcDocWithTs.doc))
234-
verifier.workerTracker.SetDetail(
233+
verifier.workerTracker.SetSrcCounts(
235234
workerNum,
236-
fmt.Sprintf(
237-
"%s documents (%s)",
238-
reportutils.FmtReal(srcDocCount),
239-
reportutils.FmtBytes(srcByteCount),
240-
),
235+
srcDocCount,
236+
srcByteCount,
241237
)
242238
}
243239

internal/verifier/migration_verifier_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,7 @@ func (suite *IntegrationTestSuite) TestVerifierFetchDocuments() {
471471
)
472472
}
473473

474-
func (suite *IntegrationTestSuite) TestGetNamespaceStatistics_Recheck() {
474+
func (suite *IntegrationTestSuite) TestGetPersistedNamespaceStatistics_Recheck() {
475475
ctx := suite.Context()
476476
verifier := suite.BuildVerifier()
477477

@@ -522,7 +522,7 @@ func (suite *IntegrationTestSuite) TestGetNamespaceStatistics_Recheck() {
522522
suite.Require().NoError(verifier.GenerateRecheckTasksWhileLocked(ctx))
523523
}()
524524

525-
stats, err := verifier.GetNamespaceStatistics(ctx)
525+
stats, err := verifier.GetPersistedNamespaceStatistics(ctx)
526526
suite.Require().NoError(err)
527527

528528
suite.Assert().Equal(
@@ -547,7 +547,7 @@ func (suite *IntegrationTestSuite) TestGetNamespaceStatistics_Gen0() {
547547
ctx := suite.Context()
548548
verifier := suite.BuildVerifier()
549549

550-
stats, err := verifier.GetNamespaceStatistics(ctx)
550+
stats, err := verifier.GetPersistedNamespaceStatistics(ctx)
551551
suite.Require().NoError(err)
552552

553553
suite.Assert().Equal(
@@ -565,7 +565,7 @@ func (suite *IntegrationTestSuite) TestGetNamespaceStatistics_Gen0() {
565565
task1, err := verifier.InsertCollectionVerificationTask(ctx, "mydb.coll1")
566566
suite.Require().NoError(err)
567567

568-
stats, err = verifier.GetNamespaceStatistics(ctx)
568+
stats, err = verifier.GetPersistedNamespaceStatistics(ctx)
569569
suite.Require().NoError(err)
570570

571571
suite.Assert().Equal(
@@ -593,7 +593,7 @@ func (suite *IntegrationTestSuite) TestGetNamespaceStatistics_Gen0() {
593593
err = verifier.UpdateVerificationTask(ctx, task1)
594594
suite.Require().NoError(err)
595595

596-
stats, err = verifier.GetNamespaceStatistics(ctx)
596+
stats, err = verifier.GetPersistedNamespaceStatistics(ctx)
597597
suite.Require().NoError(err)
598598

599599
suite.Assert().Equal(
@@ -643,7 +643,7 @@ func (suite *IntegrationTestSuite) TestGetNamespaceStatistics_Gen0() {
643643
task2parts[i] = task2part
644644
}
645645

646-
stats, err = verifier.GetNamespaceStatistics(ctx)
646+
stats, err = verifier.GetPersistedNamespaceStatistics(ctx)
647647
suite.Require().NoError(err)
648648

649649
suite.Assert().Equal(
@@ -671,7 +671,7 @@ func (suite *IntegrationTestSuite) TestGetNamespaceStatistics_Gen0() {
671671
err = verifier.UpdateVerificationTask(ctx, task1parts[0])
672672
suite.Require().NoError(err)
673673

674-
stats, err = verifier.GetNamespaceStatistics(ctx)
674+
stats, err = verifier.GetPersistedNamespaceStatistics(ctx)
675675
suite.Require().NoError(err)
676676

677677
suite.Assert().Equal(
@@ -710,7 +710,7 @@ func (suite *IntegrationTestSuite) TestGetNamespaceStatistics_Gen0() {
710710
err = verifier.UpdateVerificationTask(ctx, task2parts[1])
711711
suite.Require().NoError(err)
712712

713-
stats, err = verifier.GetNamespaceStatistics(ctx)
713+
stats, err = verifier.GetPersistedNamespaceStatistics(ctx)
714714
suite.Require().NoError(err)
715715

716716
suite.Assert().Equal(

internal/verifier/statistics.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,10 +168,10 @@ const perNsStatsPipelineTemplate = `[
168168
var templateOnce sync.Once
169169
var jsonTemplate *template.Template
170170

171-
// GetNamespaceStatistics queries the verifier’s metadata for statistics
171+
// GetPersistedNamespaceStatistics queries the verifier’s metadata for statistics
172172
// on progress for each namespace. The returned array is sorted by
173173
// Namespace and contains one entry for each namespace.
174-
func (verifier *Verifier) GetNamespaceStatistics(ctx context.Context) ([]NamespaceStats, error) {
174+
func (verifier *Verifier) GetPersistedNamespaceStatistics(ctx context.Context) ([]NamespaceStats, error) {
175175
generation, _ := verifier.getGeneration()
176176

177177
templateOnce.Do(func() {

internal/verifier/summary.go

Lines changed: 75 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ OUTB:
240240

241241
// Boolean returned indicates whether this generation has any tasks.
242242
func (verifier *Verifier) printNamespaceStatistics(ctx context.Context, strBuilder *strings.Builder, now time.Time) (bool, error) {
243-
stats, err := verifier.GetNamespaceStatistics(ctx)
243+
stats, err := verifier.GetPersistedNamespaceStatistics(ctx)
244244
if err != nil {
245245
return false, err
246246
}
@@ -282,6 +282,25 @@ func (verifier *Verifier) printNamespaceStatistics(ctx context.Context, strBuild
282282

283283
elapsed := now.Sub(verifier.generationStartTime)
284284

285+
var activeWorkers int
286+
perNamespaceWorkerStats := verifier.getPerNamespaceWorkerStats()
287+
for _, nsWorkerStats := range perNamespaceWorkerStats {
288+
for _, workerStats := range nsWorkerStats {
289+
activeWorkers++
290+
comparedDocs += workerStats.SrcDocCount
291+
comparedBytes += workerStats.SrcByteCount
292+
}
293+
}
294+
295+
if activeWorkers > 0 {
296+
fmt.Fprintf(
297+
strBuilder,
298+
"Active document comparison threads: %d of %d\n",
299+
activeWorkers,
300+
verifier.numWorkers,
301+
)
302+
}
303+
285304
docsPerSecond := float64(comparedDocs) / elapsed.Seconds()
286305
bytesPerSecond := float64(comparedBytes) / elapsed.Seconds()
287306
perSecondDataUnit := reportutils.FindBestUnit(bytesPerSecond)
@@ -336,7 +355,7 @@ func (verifier *Verifier) printNamespaceStatistics(ctx context.Context, strBuild
336355

337356
table := tablewriter.NewWriter(strBuilder)
338357

339-
headers := []string{"Src Namespace", "Src Docs Compared"}
358+
headers := []string{"Src Namespace", "Threads", "Src Docs Compared"}
340359
if showDataTotals {
341360
headers = append(headers, "Src Data Compared")
342361
}
@@ -351,18 +370,32 @@ func (verifier *Verifier) printNamespaceStatistics(ctx context.Context, strBuild
351370

352371
tableHasRows = true
353372

354-
row := []string{result.Namespace}
373+
var threads int
374+
375+
docsCompared := result.DocsCompared
376+
bytesCompared := result.BytesCompared
377+
378+
if nsWorkerStats, ok := perNamespaceWorkerStats[result.Namespace]; ok {
379+
threads = len(nsWorkerStats)
380+
381+
for _, workerStats := range nsWorkerStats {
382+
docsCompared += workerStats.SrcDocCount
383+
bytesCompared += workerStats.SrcByteCount
384+
}
385+
}
386+
387+
row := []string{result.Namespace, reportutils.FmtReal(threads)}
355388

356389
var docsCell string
357390

358391
if result.TotalDocs > 0 {
359392
docsCell = fmt.Sprintf("%s of %s (%s%%)",
360-
reportutils.FmtReal(result.DocsCompared),
393+
reportutils.FmtReal(docsCompared),
361394
reportutils.FmtReal(result.TotalDocs),
362-
reportutils.FmtPercent(result.DocsCompared, result.TotalDocs),
395+
reportutils.FmtPercent(docsCompared, result.TotalDocs),
363396
)
364397
} else {
365-
docsCell = reportutils.FmtReal(result.DocsCompared)
398+
docsCell = reportutils.FmtReal(docsCompared)
366399
}
367400

368401
row = append(row, docsCell)
@@ -374,16 +407,16 @@ func (verifier *Verifier) printNamespaceStatistics(ctx context.Context, strBuild
374407
dataUnit := reportutils.FindBestUnit(result.TotalBytes)
375408

376409
dataCell = fmt.Sprintf("%s of %s %s (%s%%)",
377-
reportutils.BytesToUnit(result.BytesCompared, dataUnit),
410+
reportutils.BytesToUnit(bytesCompared, dataUnit),
378411
reportutils.BytesToUnit(result.TotalBytes, dataUnit),
379412
dataUnit,
380-
reportutils.FmtPercent(result.BytesCompared, result.TotalBytes),
413+
reportutils.FmtPercent(bytesCompared, result.TotalBytes),
381414
)
382415
} else {
383-
dataUnit := reportutils.FindBestUnit(result.BytesCompared)
416+
dataUnit := reportutils.FindBestUnit(bytesCompared)
384417

385418
dataCell = fmt.Sprintf("%s %s",
386-
reportutils.BytesToUnit(result.BytesCompared, dataUnit),
419+
reportutils.BytesToUnit(bytesCompared, dataUnit),
387420
dataUnit,
388421
)
389422
}
@@ -403,7 +436,7 @@ func (verifier *Verifier) printNamespaceStatistics(ctx context.Context, strBuild
403436
}
404437

405438
func (verifier *Verifier) printEndOfGenerationStatistics(ctx context.Context, strBuilder *strings.Builder, now time.Time) (bool, error) {
406-
stats, err := verifier.GetNamespaceStatistics(ctx)
439+
stats, err := verifier.GetPersistedNamespaceStatistics(ctx)
407440
if err != nil {
408441
return false, err
409442
}
@@ -564,6 +597,25 @@ func (verifier *Verifier) printChangeEventStatistics(builder *strings.Builder, n
564597
}
565598
}
566599

600+
func (verifier *Verifier) getPerNamespaceWorkerStats() map[string][]WorkerStatus {
601+
wsmap := verifier.workerTracker.Load()
602+
603+
retMap := map[string][]WorkerStatus{}
604+
605+
for _, workerStats := range wsmap {
606+
if workerStats.TaskID == nil {
607+
continue
608+
}
609+
610+
retMap[workerStats.Namespace] = append(
611+
retMap[workerStats.Namespace],
612+
workerStats,
613+
)
614+
}
615+
616+
return retMap
617+
}
618+
567619
func (verifier *Verifier) printWorkerStatus(builder *strings.Builder, now time.Time) {
568620

569621
table := tablewriter.NewWriter(builder)
@@ -572,7 +624,7 @@ func (verifier *Verifier) printWorkerStatus(builder *strings.Builder, now time.T
572624
wsmap := verifier.workerTracker.Load()
573625

574626
activeThreadCount := 0
575-
for w := 0; w <= verifier.numWorkers; w++ {
627+
for w := range verifier.numWorkers {
576628
if wsmap[w].TaskID == nil {
577629
continue
578630
}
@@ -590,23 +642,27 @@ func (verifier *Verifier) printWorkerStatus(builder *strings.Builder, now time.T
590642
taskIdStr = fmt.Sprintf("%s", wsmap[w].TaskID)
591643
}
592644

645+
var detail string
646+
if wsmap[w].TaskType == verificationTaskVerifyDocuments {
647+
detail = fmt.Sprintf(
648+
"%s documents (%s)",
649+
reportutils.FmtReal(wsmap[w].SrcDocCount),
650+
reportutils.FmtBytes(wsmap[w].SrcByteCount),
651+
)
652+
}
653+
593654
table.Append(
594655
[]string{
595656
reportutils.FmtReal(w),
596657
wsmap[w].Namespace,
597658
taskIdStr,
598659
reportutils.DurationToHMS(now.Sub(wsmap[w].StartTime)),
599-
wsmap[w].Detail,
660+
detail,
600661
},
601662
)
602663
}
603664

604-
fmt.Fprintf(
605-
builder,
606-
"\nActive worker threads (%s of %s):\n",
607-
reportutils.FmtReal(activeThreadCount),
608-
reportutils.FmtReal(verifier.numWorkers),
609-
)
665+
fmt.Fprintf(builder, "\nWorker thread details:\n")
610666

611667
table.Render()
612668
}

internal/verifier/worker_tracker.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package verifier
33
import (
44
"time"
55

6+
"github.com/10gen/migration-verifier/internal/types"
67
"github.com/10gen/migration-verifier/msync"
78
"golang.org/x/exp/maps"
89
)
@@ -20,11 +21,12 @@ type WorkerStatusMap = map[int]WorkerStatus
2021
// WorkerStatus details the work that an individual worker thread
2122
// is doing.
2223
type WorkerStatus struct {
23-
TaskID any
24-
TaskType verificationTaskType
25-
Namespace string
26-
StartTime time.Time
27-
Detail string
24+
TaskID any
25+
StartTime time.Time
26+
TaskType verificationTaskType
27+
Namespace string
28+
SrcDocCount types.DocumentCount
29+
SrcByteCount types.ByteCount
2830
}
2931

3032
// NewWorkerTracker creates and returns a WorkerTracker.
@@ -52,10 +54,11 @@ func (wt *WorkerTracker) Set(workerNum int, task VerificationTask) {
5254
})
5355
}
5456

55-
func (wt *WorkerTracker) SetDetail(workerNum int, detail string) {
57+
func (wt *WorkerTracker) SetSrcCounts(workerNum int, docs types.DocumentCount, bytes types.ByteCount) {
5658
wt.guard.Store(func(m WorkerStatusMap) WorkerStatusMap {
5759
status := m[workerNum]
58-
status.Detail = detail
60+
status.SrcDocCount = docs
61+
status.SrcByteCount = bytes
5962
m[workerNum] = status
6063

6164
return m

0 commit comments

Comments
 (0)