@@ -11,17 +11,18 @@ import (
11
11
"math/rand"
12
12
"strconv"
13
13
"strings"
14
- "sync"
15
14
"time"
16
15
17
16
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
18
17
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/clusterstats"
19
18
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
20
19
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
21
20
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
21
+ "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task"
22
22
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
23
23
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
24
24
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
25
+ "github.com/cockroachdb/cockroach/pkg/roachprod/logger"
25
26
"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
26
27
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
27
28
"github.com/stretchr/testify/require"
@@ -435,6 +436,7 @@ func registerDiskStalledWALFailoverWithProgress(r registry.Registry) {
435
436
SkipPostValidations : registry .PostValidationNoDeadNodes ,
436
437
EncryptionSupport : registry .EncryptionMetamorphic ,
437
438
Leases : registry .MetamorphicLeases ,
439
+ Monitor : true ,
438
440
Run : func (ctx context.Context , t test.Test , c cluster.Cluster ) {
439
441
runDiskStalledWALFailoverWithProgress (ctx , t , c )
440
442
},
@@ -484,7 +486,7 @@ func runDiskStalledWALFailoverWithProgress(ctx context.Context, t test.Test, c c
484
486
485
487
t .Status ("starting oscillating workload and disk stall pattern" )
486
488
testStartedAt := timeutil .Now ()
487
- m := c . NewMonitor ( ctx , c . CRDBNodes ( ))
489
+ g := t . NewGroup ( task . WithContext ( ctx ))
488
490
489
491
// Setup stats collector.
490
492
promCfg := & prometheus.Config {}
@@ -511,7 +513,6 @@ func runDiskStalledWALFailoverWithProgress(ctx context.Context, t test.Test, c c
511
513
for timeutil .Since (testStartedAt ) < testDuration {
512
514
if t .Failed () {
513
515
t .Fatalf ("test failed, stopping further iterations" )
514
- return
515
516
}
516
517
517
518
workloadWaitDur := operationWaitBase + time .Duration (rand .Int63n (int64 (waitJitterMax )))
@@ -521,11 +522,7 @@ func runDiskStalledWALFailoverWithProgress(ctx context.Context, t test.Test, c c
521
522
workloadStarted := make (chan struct {})
522
523
workloadFinished := make (chan struct {})
523
524
524
- var wg sync.WaitGroup
525
- wg .Add (1 )
526
- m .Go (func (ctx context.Context ) error {
527
- defer wg .Done ()
528
-
525
+ g .Go (func (ctx context.Context , _ * logger.Logger ) error {
529
526
select {
530
527
case <- ctx .Done ():
531
528
t .Fatalf ("context done before workload started: %s" , ctx .Err ())
@@ -540,14 +537,12 @@ func runDiskStalledWALFailoverWithProgress(ctx context.Context, t test.Test, c c
540
537
return nil
541
538
}
542
539
return nil
543
- })
540
+ }, task . Name ( "workload-run" ) )
544
541
545
542
// Collecting QPS samples while the workload is running and verify
546
543
// that the throughput is within errorTolerance of the mean.
547
544
var samples []float64
548
- wg .Add (1 )
549
- m .Go (func (ctx context.Context ) error {
550
- defer wg .Done ()
545
+ g .Go (func (ctx context.Context , _ * logger.Logger ) error {
551
546
552
547
// Wait for workload to start.
553
548
select {
@@ -602,17 +597,15 @@ func runDiskStalledWALFailoverWithProgress(ctx context.Context, t test.Test, c c
602
597
603
598
t .Status (fmt .Sprintf ("workload finished, %d samples collected" , len (samples )))
604
599
return nil
605
- })
600
+ }, task . Name ( "qps-sampling" ) )
606
601
607
602
// Every 4th iteration, we'll skip the disk stall phase.
608
603
if iteration % 4 != 0 {
609
604
// Calculate next stall phase with jitter.
610
605
diskStallWaitDur := operationWaitBase + time .Duration (rand .Int63n (int64 (waitJitterMax )))
611
606
t .Status ("next stall phase in " , diskStallWaitDur )
612
607
613
- wg .Add (1 )
614
- m .Go (func (ctx context.Context ) error {
615
- defer wg .Done ()
608
+ g .Go (func (ctx context.Context , _ * logger.Logger ) error {
616
609
select {
617
610
case <- ctx .Done ():
618
611
t .Fatalf ("context done before stall started: %s" , ctx .Err ())
@@ -650,13 +643,13 @@ func runDiskStalledWALFailoverWithProgress(ctx context.Context, t test.Test, c c
650
643
}
651
644
652
645
return nil
653
- })
646
+ }, task . Name ( "disk-stall-phase" ) )
654
647
} else {
655
648
t .Status ("skipping disk stall phase for this iteration" )
656
649
}
657
650
658
651
// Wait for all goroutines to complete.
659
- wg .Wait ()
652
+ g .Wait ()
660
653
661
654
// Validate throughput samples are within tolerance.
662
655
meanThroughput := roachtestutil .GetMeanOverLastN (len (samples ), samples )
@@ -698,12 +691,6 @@ func runDiskStalledWALFailoverWithProgress(ctx context.Context, t test.Test, c c
698
691
if durInFailover < 10 * time .Minute {
699
692
t .Errorf ("expected s1 to spend at least 10m writing to secondary, but spent %s" , durInFailover )
700
693
}
701
-
702
- // Wait for the workload to finish (if it hasn't already).
703
- m .Wait ()
704
-
705
- // Shut down the nodes, allowing any devices to be unmounted during cleanup.
706
- c .Stop (ctx , t .L (), option .DefaultStopOpts (), c .CRDBNodes ())
707
694
}
708
695
709
696
func getProcessStartMonotonic (
0 commit comments