@@ -11,15 +11,18 @@ import (
 	"math/rand"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/clusterstats"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/install"
+	"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/stretchr/testify/require"
 )
@@ -392,6 +395,312 @@ func runDiskStalledDetection(
 	c.Stop(ctx, t.L(), option.DefaultStopOpts(), c.CRDBNodes())
 }
 
+// registerDiskStalledWALFailoverWithProgress registers a test that induces
+// WAL failover while the workload is running. This test is similar to
+// disk-stalled/wal-failover/among-stores, but allows some progress to be
+// made while we are in failover. Specifically, we'll oscillate both the
+// workload and failover states in the following pattern, with some jitter in
+// the timing of each operation:
+//
+//	Time (minutes)  0    1    2    3    4    5    6    7    8    9    10   11   12   13   14   15
+//	Workload        |----|----|----|    |----|----|----|    |----|----|----|    |----|----|----|
+//	Disk Stalls     |----|----|----|    |----|----|----|    |----|----|----|
+//
+// Note that:
+//   - Every 4th run, the workload will run without any disk stalls.
+//   - Each workload and stall phase is 3m.
+//   - Each operation has a min 30s + random 0-2m wait after both operations finish.
+//
+// The workload run in this test is meant to ramp up to 50% of disk bandwidth.
+// See https://cloud.google.com/compute/docs/disks/performance for estimates of
+// disk performance. For a 100GB pd-ssd disk we get an estimated max performance of:
+//   - 6K IOPS (3K baseline + 30 IOPS/GB * 100GB disk).
+//   - 288 MiB/s (240 MiB/s baseline + 0.48 MiB/s per GB * 100GB disk).
+func registerDiskStalledWALFailoverWithProgress(r registry.Registry) {
+	r.Add(registry.TestSpec{
+		Name:  "disk-stalled/wal-failover/among-stores/with-progress",
+		Owner: registry.OwnerStorage,
+		Cluster: r.MakeClusterSpec(4,
+			spec.CPU(16),
+			spec.WorkloadNode(),
+			spec.ReuseNone(),
+			spec.DisableLocalSSD(),
+			spec.GCEVolumeCount(2),
+			spec.GCEVolumeType("pd-ssd"),
+			spec.VolumeSize(100),
+		),
+		CompatibleClouds:    registry.OnlyGCE,
+		Suites:              registry.Suites(registry.Nightly),
+		Timeout:             2 * time.Hour,
+		SkipPostValidations: registry.PostValidationNoDeadNodes,
+		EncryptionSupport:   registry.EncryptionMetamorphic,
+		Leases:              registry.MetamorphicLeases,
+		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
+			runDiskStalledWALFailoverWithProgress(ctx, t, c)
+		},
+	})
+}
+
+func runDiskStalledWALFailoverWithProgress(ctx context.Context, t test.Test, c cluster.Cluster) {
+	const (
+		testDuration = 1 * time.Hour
+		// We'll issue short stalls every stallInterval to keep us in the failover state.
+		stallInterval = 5 * time.Second
+		shortStallDur = 200 * time.Millisecond
+		// For each loop, each operation will start after a random wait in [30s, 150s).
+		operationWaitBase = 30 * time.Second
+		waitJitterMax     = 2 * time.Minute
+		operationDur      = 3 * time.Minute
+		// QPS sampling parameters.
+		sampleInterval = 10 * time.Second
+		errorTolerance = 0.2 // 20% tolerance for throughput variation.
+	)
+
+	t.Status("setting up disk staller")
+	// Use CgroupDiskStaller with readsToo=false to only stall writes.
+	s := roachtestutil.MakeCgroupDiskStaller(t, c, false /* readsToo */, false /* logsToo */)
+	s.Setup(ctx)
+	defer s.Cleanup(ctx)
+
+	t.Status("starting cluster")
+	startOpts := option.DefaultStartOpts()
+	startOpts.RoachprodOpts.WALFailover = "among-stores"
+	startOpts.RoachprodOpts.StoreCount = 2
+	startSettings := install.MakeClusterSettings()
+	c.Start(ctx, t.L(), startOpts, startSettings, c.CRDBNodes())
+
+	// Open a SQL connection to n1, the node that will be stalled.
+	n1Conn := c.Conn(ctx, t.L(), 1)
+	defer n1Conn.Close()
+	require.NoError(t, n1Conn.PingContext(ctx))
+	// Wait for upreplication.
+	require.NoError(t, roachtestutil.WaitFor3XReplication(ctx, t.L(), n1Conn))
+	adminUIAddrs, err := c.ExternalAdminUIAddr(ctx, t.L(), c.Nodes(2))
+	require.NoError(t, err)
+	adminURL := adminUIAddrs[0]
+	c.Run(ctx, option.WithNodes(c.WorkloadNode()), `./cockroach workload init kv --splits 1000 {pgurl:1}`)
+	_, err = n1Conn.ExecContext(ctx, `USE kv;`)
+	require.NoError(t, err)
+
+	t.Status("starting oscillating workload and disk stall pattern")
+	testStartedAt := timeutil.Now()
+	m := c.NewMonitor(ctx, c.CRDBNodes())
+
+	// Set up the stats collector.
+	promCfg := &prometheus.Config{}
+	promCfg.WithPrometheusNode(c.WorkloadNode().InstallNodes()[0]).
+		WithNodeExporter(c.CRDBNodes().InstallNodes()).
+		WithCluster(c.CRDBNodes().InstallNodes())
+	err = c.StartGrafana(ctx, t.L(), promCfg)
+	require.NoError(t, err)
+	cleanupFunc := func() {
+		if err := c.StopGrafana(ctx, t.L(), t.ArtifactsDir()); err != nil {
+			t.L().ErrorfCtx(ctx, "Error(s) shutting down prom/grafana %s", err)
+		}
+	}
+	defer cleanupFunc()
+
+	promClient, err := clusterstats.SetupCollectorPromClient(ctx, c, t.L(), promCfg)
+	require.NoError(t, err)
+	statCollector := clusterstats.NewStatsCollector(ctx, promClient)
+
+	// Track mean throughput for each iteration.
+	var iterationMeans []float64
+
+	iteration := 1
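+	// Each iteration starts a workload goroutine and a QPS sampler, plus (except
+	// on every 4th iteration) a disk-stall goroutine; once all of them finish we
+	// validate that the sampled throughput stayed within errorTolerance of the
+	// iteration's mean.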
+	for timeutil.Since(testStartedAt) < testDuration {
+		if t.Failed() {
+			t.Fatalf("test failed, stopping further iterations")
+			return
+		}
+
+		workloadWaitDur := operationWaitBase + time.Duration(rand.Int63n(int64(waitJitterMax)))
+		t.Status("next workload run in ", workloadWaitDur)
+
+		// Channels to signal workload state.
+		workloadStarted := make(chan struct{})
+		workloadFinished := make(chan struct{})
+
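+		// The WaitGroup gates the per-iteration validation below; the cluster
+		// monitor (m) is only waited on once, after the control loop exits.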
+		var wg sync.WaitGroup
+		wg.Add(1)
+		m.Go(func(ctx context.Context) error {
+			defer wg.Done()
+
+			select {
+			case <-ctx.Done():
+				t.Fatalf("context done before workload started: %s", ctx.Err())
+			case <-time.After(workloadWaitDur):
+				t.Status("starting workload")
+				close(workloadStarted)
+				workloadCmd := `./cockroach workload run kv --read-percent 0 ` +
+					fmt.Sprintf(`--duration %s --concurrency 4096 --max-rate=2048 --tolerate-errors `, operationDur.String()) +
+					`--min-block-bytes=4096 --max-block-bytes=4096 --timeout 1s {pgurl:1-3}`
+				c.Run(ctx, option.WithNodes(c.WorkloadNode()), workloadCmd)
+				close(workloadFinished)
+				return nil
+			}
+			return nil
+		})
+
+		// Collect QPS samples while the workload is running and verify that the
+		// throughput is within errorTolerance of the mean.
+		var samples []float64
+		wg.Add(1)
+		m.Go(func(ctx context.Context) error {
+			defer wg.Done()
+
+			// Wait for the workload to start.
+			select {
+			case <-ctx.Done():
+				t.Fatalf("context done before workload started: %s", ctx.Err())
+			case <-workloadStarted:
+			}
+
+			// Wait 20s after the workload starts before beginning sampling.
+			select {
+			case <-ctx.Done():
+				t.Fatalf("context done before sampling started: %s", ctx.Err())
+			case <-time.After(20 * time.Second):
+				t.Status("starting QPS sampling")
+			}
+
+			// Calculate approximately how many samples we can take before the workload
+			// ends. We want to stop sampling 10s before the workload ends to avoid
+			// sampling during shutdown.
+			samplingDuration := operationDur - 30*time.Second // 20s initial wait + 10s buffer at workload end
+			sampleCount := int(samplingDuration / sampleInterval)
+
+			sampleTimer := time.NewTicker(sampleInterval)
+			defer sampleTimer.Stop()
+
+			done := false
+			for i := 0; i < sampleCount && !done; i++ {
+				select {
+				case <-ctx.Done():
+					t.Fatalf("context done while sampling: %s", ctx.Err())
+				case <-workloadFinished:
+					done = true
+				case <-sampleTimer.C:
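+					// The Prometheus query sums the 30s rates of SQL selects, inserts,
+					// and updates; CollectPoint returns one value per node under the
+					// "node" label, which we add up into a cluster-wide QPS figure.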
+					metric := `rate(sql_select_count[30s]) + rate(sql_insert_count[30s]) + rate(sql_update_count[30s])`
+					stats, err := statCollector.CollectPoint(ctx, t.L(), timeutil.Now(), metric)
+					if err != nil {
+						t.Errorf("failed to collect throughput stats: %v", err)
+						continue
+					}
+					var clusterQPS float64
+					if nodeStats, ok := stats["node"]; ok {
+						for _, stat := range nodeStats {
+							clusterQPS += stat.Value
+						}
+					} else {
+						t.Status("no node stats found for throughput metric ", metric)
+						continue
+					}
+					t.Status("sampled cluster QPS: ", clusterQPS)
+					samples = append(samples, clusterQPS)
+				}
+			}
+
+			t.Status(fmt.Sprintf("workload finished, %d samples collected", len(samples)))
+			return nil
+		})
+
+		// Every 4th iteration, we'll skip the disk stall phase.
+		if iteration%4 != 0 {
+			// Calculate the next stall phase with jitter.
+			diskStallWaitDur := operationWaitBase + time.Duration(rand.Int63n(int64(waitJitterMax)))
+			t.Status("next stall phase in ", diskStallWaitDur)
+
+			wg.Add(1)
+			m.Go(func(ctx context.Context) error {
+				defer wg.Done()
+				select {
+				case <-ctx.Done():
+					t.Fatalf("context done before stall started: %s", ctx.Err())
+				case <-time.After(diskStallWaitDur):
+					t.Status("starting disk stall")
+				}
+				stallStart := timeutil.Now()
+				// Execute short stalls of shortStallDur every stallInterval.
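+				// Each unstall runs under a fresh context with its own timeout so the
+				// stall is always lifted even if the surrounding context is cancelled.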
+				for timeutil.Since(stallStart) < operationDur {
+					select {
+					case <-ctx.Done():
+						t.Fatalf("context done while stall induced: %s", ctx.Err())
+					case <-time.After(stallInterval):
+						func() {
+							s.Stall(ctx, c.Node(1))
+							t.Status("short disk stall on n1")
+							defer func() {
+								ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+								defer cancel()
+								s.Unstall(ctx, c.Node(1))
+							}()
+							select {
+							case <-ctx.Done():
+								t.Fatalf("context done while stall induced: %s", ctx.Err())
+							case <-time.After(shortStallDur):
+								return
+							}
+						}()
+					}
+				}
+
+				return nil
+			})
+		} else {
+			t.Status("skipping disk stall phase for this iteration")
+		}
+
+		// Wait for all goroutines to complete.
+		wg.Wait()
+
+		// Validate that the throughput samples are within tolerance.
+		meanThroughput := roachtestutil.GetMeanOverLastN(len(samples), samples)
+		t.Status("mean throughput for iteration ", iteration, ": ", meanThroughput)
+		for _, sample := range samples {
+			require.InEpsilonf(t, meanThroughput, sample, errorTolerance,
+				"sample %f is not within tolerance of mean %f", sample, meanThroughput)
+		}
+		iterationMeans = append(iterationMeans, meanThroughput)
+		iteration++
+	}
+
+	t.Status("exited control loop")
+
+	time.Sleep(1 * time.Second)
+	exit, ok := getProcessExitMonotonic(ctx, t, c, 1)
+	if ok && exit > 0 {
+		t.Fatal("process exited unexpectedly")
+	}
+
+	// Validate overall throughput consistency across iterations.
+	overallMean := roachtestutil.GetMeanOverLastN(len(iterationMeans), iterationMeans)
+	for _, mean := range iterationMeans {
+		require.InEpsilonf(t, overallMean, mean, errorTolerance,
+			"iteration mean %f is not within tolerance of overall mean %f", mean, overallMean)
+	}
+
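+	// Query the cumulative WAL failover duration for store 1 over the test
+	// window, skipping the first 5m and the final minute of the run.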
+	data := mustGetMetrics(ctx, c, t, adminURL, install.SystemInterfaceName,
+		testStartedAt.Add(5*time.Minute),
+		timeutil.Now().Add(-time.Minute),
+		[]tsQuery{
+			{name: "cr.store.storage.wal.failover.secondary.duration", queryType: total, sources: []string{"1"}},
+		})
+
+	// Over the course of the 1h test, we expect many short stalls. Assert that
+	// the total time spent writing to the secondary is at least 10m.
+	durInFailover := time.Duration(data.Results[0].Datapoints[len(data.Results[0].Datapoints)-1].Value)
+	t.L().PrintfCtx(ctx, "duration s1 spent writing to secondary %s", durInFailover)
+	if durInFailover < 10*time.Minute {
+		t.Errorf("expected s1 to spend at least 10m writing to secondary, but spent %s", durInFailover)
+	}
+
+	// Wait for the workload to finish (if it hasn't already).
+	m.Wait()
+
+	// Shut down the nodes, allowing any devices to be unmounted during cleanup.
+	c.Stop(ctx, t.L(), option.DefaultStopOpts(), c.CRDBNodes())
+}
+
 func getProcessStartMonotonic(
 	ctx context.Context, t test.Test, c cluster.Cluster, nodeID int,
 ) (since time.Duration, ok bool) {