
Commit 1c359f8

craig[bot], xinhaoz, and OriSavir committed
146771: roachtest: add sustained wal failover test r=xinhaoz a=xinhaoz

Add the `disk-stalled/wal-failover/among-stores/with-progress` roachtest. This test is similar to the existing WAL failover among-stores test, but instead of stalling the disk completely every 10m, we induce a sustained WAL failover with short disk stalls so that some operations can continue while in the failover state.

Closes: #146095

147432: codelabs: edit testing to correct command r=OriSavir a=OriSavir

Previously, the codelab said to run the Datadriven test; the correct name is DataDriven. The command is now corrected.

Epic: None
Release note: None

Co-authored-by: Xin Hao Zhang <[email protected]>
Co-authored-by: Oriel Savir <[email protected]>
3 parents 59e98df + 3aa6881 + c82ea38 commit 1c359f8
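For orientation, the cadence described in #146771 reduces to the loop sketched below. This is illustrative only: it reuses the timing constants from the test added in pkg/cmd/roachtest/tests/disk_stall.go (see the diff further down) but merely prints the schedule instead of driving a cluster or a disk staller.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	// Timing constants mirrored from the roachtest added below.
	const (
		operationWaitBase = 30 * time.Second
		waitJitterMax     = 2 * time.Minute
		operationDur      = 3 * time.Minute
	)
	// Each phase starts after a 30s minimum wait plus up to 2m of jitter.
	jitter := func() time.Duration {
		return operationWaitBase + time.Duration(rand.Int63n(int64(waitJitterMax)))
	}
	for iteration := 1; iteration <= 4; iteration++ {
		fmt.Printf("iteration %d: workload starts after %s and runs for %s\n",
			iteration, jitter(), operationDur)
		if iteration%4 != 0 {
			fmt.Printf("iteration %d: stall phase starts after %s, pulsing 200ms stalls every 5s for %s\n",
				iteration, jitter(), operationDur)
		} else {
			// Every 4th iteration runs without stalls as a baseline.
			fmt.Printf("iteration %d: no disk stalls (baseline run)\n", iteration)
		}
	}
}
```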

File tree

4 files changed: +313, -3 lines changed


docs/codelabs/01-sql-statement.md

Lines changed: 2 additions & 2 deletions

@@ -493,8 +493,8 @@ FROBNICATE ALL
 ```
 
 Back to the terminal, make sure you are at `~/go/src/github.com/cockroachdb/cockroach`,
-and run `./dev test pkg/sql/parser -f TestParseDatadriven --rewrite`.
-The flag `--rewrite` is meant to automatically rewrite the datadriven test with the output it received.
+and run `./dev test pkg/sql/parser -f TestParseDataDriven --rewrite`.
+The flag `--rewrite` is meant to automatically rewrite the DataDriven test with the output it received.
 
 Wait until the test command finishes, and open `pkg/sql/parser/testdata/frobnicate`, and you would expect:
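For context, `TestParseDataDriven` walks the files under `pkg/sql/parser/testdata` with the datadriven framework: each case is a directive (for example `parse`), the input SQL, a `----` separator, and the expected output, and `--rewrite` regenerates that output section from whatever the handler returns. The following is a rough sketch of the wiring, assuming the cockroachdb/datadriven API; the real test's handler does much more than echo the input.

```go
package parser_test

import (
	"testing"

	"github.com/cockroachdb/datadriven"
)

// TestParseDataDrivenSketch is a hedged, simplified sketch of how a
// datadriven parser test is typically wired up; it is not the real
// TestParseDataDriven from pkg/sql/parser.
func TestParseDataDrivenSketch(t *testing.T) {
	datadriven.Walk(t, "testdata", func(t *testing.T, path string) {
		datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
			switch d.Cmd {
			case "parse":
				// The real test parses d.Input and returns the statement's
				// round-tripped formatting; here we simply echo the input.
				// Running with --rewrite replaces the text below the ----
				// separator with this return value.
				return d.Input + "\n"
			default:
				t.Fatalf("unknown directive: %s", d.Cmd)
				return ""
			}
		})
	})
}
```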

pkg/cmd/roachtest/spec/cluster_spec.go

Lines changed: 1 addition & 1 deletion

@@ -268,7 +268,7 @@ func getGCEOpts(
 		opts.PDVolumeCount = volumeCount
 	}
 	opts.SSDCount = localSSDCount
-	if localSSD && localSSDCount > 0 {
+	if (localSSD && localSSDCount > 0) || (!localSSD && volumeCount > 1) {
 		// NB: As the default behavior for _roachprod_ (at least in AWS/GCP) is
 		// to mount multiple disks as a single store using a RAID 0 array, we
 		// must explicitly ask for multiple stores to be enabled, _unless_ the
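For reference, the broadened condition can be read on its own as a small predicate. This is a paraphrase of the diff above, not the actual roachprod code; the function name is made up for illustration.

```go
package main

import "fmt"

// wantsMultipleStores paraphrases the condition changed in getGCEOpts above
// (illustrative only). Roachprod otherwise mounts multiple disks as a single
// RAID 0 store, so one store per disk must be requested explicitly: either
// when local SSDs are in use, or (the new clause) when there is no local SSD
// but more than one persistent volume, e.g. the DisableLocalSSD +
// GCEVolumeCount(2) spec used by the new roachtest in this commit.
func wantsMultipleStores(localSSD bool, localSSDCount, volumeCount int) bool {
	return (localSSD && localSSDCount > 0) || (!localSSD && volumeCount > 1)
}

func main() {
	fmt.Println(wantsMultipleStores(false, 0, 2)) // true: two PD volumes, no local SSD
	fmt.Println(wantsMultipleStores(true, 2, 1))  // true: local SSDs in use
	fmt.Println(wantsMultipleStores(false, 0, 1)) // false: a single volume needs no RAID 0
}
```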

pkg/cmd/roachtest/tests/disk_stall.go

Lines changed: 309 additions & 0 deletions

@@ -11,15 +11,18 @@ import (
 	"math/rand"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/clusterstats"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/install"
+	"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/stretchr/testify/require"
 )
@@ -392,6 +395,312 @@ func runDiskStalledDetection(
 	c.Stop(ctx, t.L(), option.DefaultStopOpts(), c.CRDBNodes())
 }
 
+// registerDiskStalledWALFailoverWithProgress registers a test that induces
+// WAL failover while the workload is running. This test is similar to
+// disk-stalled/wal-failover/among-stores, but allows some progress to be
+// made while we are in failover. Specifically, we'll oscillate both the
+// workload and failover states in the following pattern, with some jitter in
+// the timing of each operation:
+//
+// Time (minutes)  0    1    2    3    4    5    6    7    8    9    10   11   12   13   14   15
+// Workload        |----|----|----|    |----|----|----|    |----|----|----|    |----|----|----|
+// Disk Stalls     |----|----|----|    |----|----|----|    |----|----|----|
+//
+// Note that:
+// - Every 4th run, the workload runs without any disk stalls.
+// - Each workload and stall phase is 3m.
+// - Each operation starts after a minimum 30s wait plus a random 0-2m jitter
+//   once the previous pair of operations finishes.
+//
+// The workload run in this test is meant to ramp up to 50% disk bandwidth.
+// See: https://cloud.google.com/compute/docs/disks/performance for estimations on disk performance.
+// For a 100GB pd-ssd disk we get an estimated max performance of:
+// - 6K IOPS (3K baseline + 30 ops * 100GB disk).
+// - 288 MiB/s (240 MiB/s baseline + 0.48 * 100GB disk).
+func registerDiskStalledWALFailoverWithProgress(r registry.Registry) {
+	r.Add(registry.TestSpec{
+		Name:  "disk-stalled/wal-failover/among-stores/with-progress",
+		Owner: registry.OwnerStorage,
+		Cluster: r.MakeClusterSpec(4,
+			spec.CPU(16),
+			spec.WorkloadNode(),
+			spec.ReuseNone(),
+			spec.DisableLocalSSD(),
+			spec.GCEVolumeCount(2),
+			spec.GCEVolumeType("pd-ssd"),
+			spec.VolumeSize(100),
+		),
+		CompatibleClouds:    registry.OnlyGCE,
+		Suites:              registry.Suites(registry.Nightly),
+		Timeout:             2 * time.Hour,
+		SkipPostValidations: registry.PostValidationNoDeadNodes,
+		EncryptionSupport:   registry.EncryptionMetamorphic,
+		Leases:              registry.MetamorphicLeases,
+		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
+			runDiskStalledWALFailoverWithProgress(ctx, t, c)
+		},
+	})
+}
+
+func runDiskStalledWALFailoverWithProgress(ctx context.Context, t test.Test, c cluster.Cluster) {
+	const (
+		testDuration = 1 * time.Hour
+		// We'll issue short stalls every 5s to keep us in the failover state.
+		stallInterval = 5 * time.Second
+		shortStallDur = 200 * time.Millisecond
+		// For each loop, each operation will start after a random wait between [30s, 150s).
+		operationWaitBase = 30 * time.Second
+		waitJitterMax     = 2 * time.Minute
+		operationDur      = 3 * time.Minute
+		// QPS sampling parameters.
+		sampleInterval = 10 * time.Second
+		errorTolerance = 0.2 // 20% tolerance for throughput variation.
+	)
+
+	t.Status("setting up disk staller")
+	// Use CgroupDiskStaller with readsToo=false to only stall writes.
+	s := roachtestutil.MakeCgroupDiskStaller(t, c, false /* readsToo */, false /* logsToo */)
+	s.Setup(ctx)
+	defer s.Cleanup(ctx)
+
+	t.Status("starting cluster")
+	startOpts := option.DefaultStartOpts()
+	startOpts.RoachprodOpts.WALFailover = "among-stores"
+	startOpts.RoachprodOpts.StoreCount = 2
+	startSettings := install.MakeClusterSettings()
+	c.Start(ctx, t.L(), startOpts, startSettings, c.CRDBNodes())
+
+	// Open a SQL connection to n1, the node that will be stalled.
+	n1Conn := c.Conn(ctx, t.L(), 1)
+	defer n1Conn.Close()
+	require.NoError(t, n1Conn.PingContext(ctx))
+	// Wait for upreplication.
+	require.NoError(t, roachtestutil.WaitFor3XReplication(ctx, t.L(), n1Conn))
+	adminUIAddrs, err := c.ExternalAdminUIAddr(ctx, t.L(), c.Nodes(2))
+	require.NoError(t, err)
+	adminURL := adminUIAddrs[0]
+	c.Run(ctx, option.WithNodes(c.WorkloadNode()), `./cockroach workload init kv --splits 1000 {pgurl:1}`)
+	_, err = n1Conn.ExecContext(ctx, `USE kv;`)
+	require.NoError(t, err)
+
+	t.Status("starting oscillating workload and disk stall pattern")
+	testStartedAt := timeutil.Now()
+	m := c.NewMonitor(ctx, c.CRDBNodes())
+
+	// Set up the stats collector.
+	promCfg := &prometheus.Config{}
+	promCfg.WithPrometheusNode(c.WorkloadNode().InstallNodes()[0]).
+		WithNodeExporter(c.CRDBNodes().InstallNodes()).
+		WithCluster(c.CRDBNodes().InstallNodes())
+	err = c.StartGrafana(ctx, t.L(), promCfg)
+	require.NoError(t, err)
+	cleanupFunc := func() {
+		if err := c.StopGrafana(ctx, t.L(), t.ArtifactsDir()); err != nil {
+			t.L().ErrorfCtx(ctx, "Error(s) shutting down prom/grafana %s", err)
+		}
+	}
+	defer cleanupFunc()
+
+	promClient, err := clusterstats.SetupCollectorPromClient(ctx, c, t.L(), promCfg)
+	require.NoError(t, err)
+	statCollector := clusterstats.NewStatsCollector(ctx, promClient)
+
+	// Track mean throughput for each iteration.
+	var iterationMeans []float64
+
+	iteration := 1
+	for timeutil.Since(testStartedAt) < testDuration {
+		if t.Failed() {
+			t.Fatalf("test failed, stopping further iterations")
+			return
+		}
+
+		workloadWaitDur := operationWaitBase + time.Duration(rand.Int63n(int64(waitJitterMax)))
+		t.Status("next workload run in ", workloadWaitDur)
+
+		// Channels to signal workload state.
+		workloadStarted := make(chan struct{})
+		workloadFinished := make(chan struct{})
+
+		var wg sync.WaitGroup
+		wg.Add(1)
+		m.Go(func(ctx context.Context) error {
+			defer wg.Done()
+
+			select {
+			case <-ctx.Done():
+				t.Fatalf("context done before workload started: %s", ctx.Err())
+			case <-time.After(workloadWaitDur):
+				t.Status("starting workload")
+				close(workloadStarted)
+				workloadCmd := `./cockroach workload run kv --read-percent 0 ` +
+					fmt.Sprintf(`--duration %s --concurrency 4096 --max-rate=2048 --tolerate-errors `, operationDur.String()) +
+					`--min-block-bytes=4096 --max-block-bytes=4096 --timeout 1s {pgurl:1-3}`
+				c.Run(ctx, option.WithNodes(c.WorkloadNode()), workloadCmd)
+				close(workloadFinished)
+				return nil
+			}
+			return nil
+		})
+
+		// Collect QPS samples while the workload is running and verify
+		// that the throughput is within errorTolerance of the mean.
+		var samples []float64
+		wg.Add(1)
+		m.Go(func(ctx context.Context) error {
+			defer wg.Done()
+
+			// Wait for workload to start.
+			select {
+			case <-ctx.Done():
+				t.Fatalf("context done before workload started: %s", ctx.Err())
+			case <-workloadStarted:
+			}
+
+			// Wait 20s after workload starts before beginning sampling.
+			select {
+			case <-ctx.Done():
+				t.Fatalf("context done before sampling started: %s", ctx.Err())
+			case <-time.After(20 * time.Second):
+				t.Status("starting QPS sampling")
+			}
+
+			// Calculate approx how many samples we can take before workload ends.
+			// We want to stop sampling 10s before workload ends to avoid sampling during shutdown.
+			samplingDuration := operationDur - 30*time.Second // 20s initial wait + 10s buffer at workload end
+			sampleCount := int(samplingDuration / sampleInterval)
+
+			sampleTimer := time.NewTicker(sampleInterval)
+			defer sampleTimer.Stop()
+
+			done := false
+			for i := 0; i < sampleCount && !done; i++ {
+				select {
+				case <-ctx.Done():
+					t.Fatalf("context done while sampling: %s", ctx.Err())
+				case <-workloadFinished:
+					done = true
+				case <-sampleTimer.C:
+					metric := `rate(sql_select_count[30s]) + rate(sql_insert_count[30s]) + rate(sql_update_count[30s])`
+					stats, err := statCollector.CollectPoint(ctx, t.L(), timeutil.Now(), metric)
+					if err != nil {
+						t.Errorf("failed to collect throughput stats: %v", err)
+						continue
+					}
+					var clusterQPS float64
+					if nodeStats, ok := stats["node"]; ok {
+						for _, stat := range nodeStats {
+							clusterQPS += stat.Value
+						}
+					} else {
+						t.Status("no node stats found for throughput metric ", metric)
+						continue
+					}
+					t.Status("sampled cluster QPS: ", clusterQPS)
+					samples = append(samples, clusterQPS)
+				}
+			}
+
+			t.Status(fmt.Sprintf("workload finished, %d samples collected", len(samples)))
+			return nil
+		})
+
+		// Every 4th iteration, we'll skip the disk stall phase.
+		if iteration%4 != 0 {
+			// Calculate next stall phase with jitter.
+			diskStallWaitDur := operationWaitBase + time.Duration(rand.Int63n(int64(waitJitterMax)))
+			t.Status("next stall phase in ", diskStallWaitDur)
+
+			wg.Add(1)
+			m.Go(func(ctx context.Context) error {
+				defer wg.Done()
+				select {
+				case <-ctx.Done():
+					t.Fatalf("context done before stall started: %s", ctx.Err())
+				case <-time.After(diskStallWaitDur):
+					t.Status("starting disk stall")
+				}
+				stallStart := timeutil.Now()
+				// Execute short 200ms stalls every 5s.
+				for timeutil.Since(stallStart) < operationDur {
+					select {
+					case <-ctx.Done():
+						t.Fatalf("context done while stall induced: %s", ctx.Err())
+					case <-time.After(stallInterval):
+						func() {
+							s.Stall(ctx, c.Node(1))
+							t.Status("short disk stall on n1")
+							defer func() {
+								ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+								defer cancel()
+								s.Unstall(ctx, c.Node(1))
+							}()
+							select {
+							case <-ctx.Done():
+								t.Fatalf("context done while stall induced: %s", ctx.Err())
+							case <-time.After(shortStallDur):
+								return
+							}
+						}()
+					}
+				}
+
+				return nil
+			})
+		} else {
+			t.Status("skipping disk stall phase for this iteration")
+		}
+
+		// Wait for all goroutines to complete.
+		wg.Wait()
+
+		// Validate throughput samples are within tolerance.
+		meanThroughput := roachtestutil.GetMeanOverLastN(len(samples), samples)
+		t.Status("mean throughput for iteration ", iteration, ": ", meanThroughput)
+		for _, sample := range samples {
+			require.InEpsilonf(t, meanThroughput, sample, errorTolerance,
+				"sample %f is not within tolerance of mean %f", sample, meanThroughput)
+		}
+		iterationMeans = append(iterationMeans, meanThroughput)
+		iteration++
+	}
+
+	t.Status("exited control loop")
+
+	time.Sleep(1 * time.Second)
+	exit, ok := getProcessExitMonotonic(ctx, t, c, 1)
+	if ok && exit > 0 {
+		t.Fatal("process exited unexpectedly")
+	}
+
+	// Validate overall throughput consistency across iterations.
+	overallMean := roachtestutil.GetMeanOverLastN(len(iterationMeans), iterationMeans)
+	for _, mean := range iterationMeans {
+		require.InEpsilonf(t, overallMean, mean, errorTolerance,
+			"iteration mean %f is not within tolerance of overall mean %f", mean, overallMean)
+	}
+
+	data := mustGetMetrics(ctx, c, t, adminURL, install.SystemInterfaceName,
+		testStartedAt.Add(5*time.Minute),
+		timeutil.Now().Add(-time.Minute),
+		[]tsQuery{
+			{name: "cr.store.storage.wal.failover.secondary.duration", queryType: total, sources: []string{"1"}},
+		})
+
+	// Over the course of the 1h test, we expect many short stalls. Assert that
+	// the total time spent writing to the secondary is at least 10m.
+	durInFailover := time.Duration(data.Results[0].Datapoints[len(data.Results[0].Datapoints)-1].Value)
+	t.L().PrintfCtx(ctx, "duration s1 spent writing to secondary %s", durInFailover)
+	if durInFailover < 10*time.Minute {
+		t.Errorf("expected s1 to spend at least 10m writing to secondary, but spent %s", durInFailover)
+	}
+
+	// Wait for the workload to finish (if it hasn't already).
+	m.Wait()
+
+	// Shut down the nodes, allowing any devices to be unmounted during cleanup.
+	c.Stop(ctx, t.L(), option.DefaultStopOpts(), c.CRDBNodes())
+}
+
 func getProcessStartMonotonic(
 	ctx context.Context, t test.Test, c cluster.Cluster, nodeID int,
 ) (since time.Duration, ok bool) {
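As a quick sanity check of the disk-performance figures quoted in the new test's doc comment, the numbers follow directly from the comment's own per-GB formulas for a GCE pd-ssd (taken from the comment and the linked GCE docs, not measured here):

```go
package main

import "fmt"

func main() {
	// Formulas quoted in the test's doc comment, applied to the 100 GB
	// pd-ssd volumes the test provisions (not independently verified).
	const diskSizeGB = 100
	iops := 3000 + 30*diskSizeGB                 // 3K baseline + 30 IOPS per GB = 6,000
	throughput := 240 + 0.48*float64(diskSizeGB) // 240 MiB/s baseline + 0.48 MiB/s per GB = 288 MiB/s
	fmt.Printf("estimated max: %d IOPS, %.0f MiB/s\n", iops, throughput)
}
```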

pkg/cmd/roachtest/tests/registry.go

Lines changed: 1 addition & 0 deletions

@@ -53,6 +53,7 @@ func RegisterTests(r registry.Registry) {
 	registerDiskFull(r)
 	registerDiskStalledDetection(r)
 	registerDiskStalledWALFailover(r)
+	registerDiskStalledWALFailoverWithProgress(r)
 	registerDjango(r)
 	registerDrain(r)
 	registerDrop(r)
