Commit cf30717

Andrew Baptist committed
roachtest: modernize gracefuldraining test
Previously the test attempted to determine throughput from the metrics timeseries web API while the test was running. This caused high variance in the measurements even though the test runner itself reported lower variance. This change queries the `crdb_internal.node_metrics` table during the test instead, which produces more reliable results.

Epic: none
Informs: cockroachdb#106490
Release note: None
1 parent 955e7e0 commit cf30717
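To illustrate the new approach, here is a minimal sketch of how per-node QPS can be derived from `crdb_internal.node_metrics`: sample the cumulative `sql.query.count` counter on each connection, wait a fixed interval, sample again, and divide the delta by the elapsed seconds. The package name, helper names, and the single-row assumption for the node-level metric are illustrative only; the test itself relies on a `measureQPS` helper (called in the diff below), not this exact code.

package qpssketch

import (
	"context"
	gosql "database/sql"
	"time"
)

// sampleQueryCount reads the cumulative SQL query counter from each node's
// crdb_internal.node_metrics virtual table and returns the sum. The metric
// name mirrors what the old code read via /ts/query as
// cr.node.sql.query.count; treating it as a single node-level row is an
// assumption made for this sketch.
func sampleQueryCount(ctx context.Context, dbs ...*gosql.DB) (float64, error) {
	var total float64
	for _, db := range dbs {
		var v float64
		if err := db.QueryRowContext(ctx,
			`SELECT value FROM crdb_internal.node_metrics WHERE name = 'sql.query.count'`,
		).Scan(&v); err != nil {
			return 0, err
		}
		total += v
	}
	return total, nil
}

// measureQPSSketch returns the average queries per second observed across
// the given connections over the supplied interval.
func measureQPSSketch(
	ctx context.Context, interval time.Duration, dbs ...*gosql.DB,
) (float64, error) {
	before, err := sampleQueryCount(ctx, dbs...)
	if err != nil {
		return 0, err
	}
	select {
	case <-ctx.Done():
		return 0, ctx.Err()
	case <-time.After(interval):
	}
	after, err := sampleQueryCount(ctx, dbs...)
	if err != nil {
		return 0, err
	}
	return (after - before) / interval.Seconds(), nil
}

The idea is that reading the counters directly gives the test a measurement over exactly the window it chooses, rather than relying on the timeseries endpoint's sampled datapoints.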

File tree

1 file changed: +78 −151 lines

  • pkg/cmd/roachtest/tests

pkg/cmd/roachtest/tests/kv.go

Lines changed: 78 additions & 151 deletions
@@ -15,15 +15,13 @@ import (
 	gosql "database/sql"
 	"fmt"
 	"math/rand"
-	"net/http"
 	"os"
 	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
 
-	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
@@ -32,9 +30,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/install"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
-	"github.com/cockroachdb/cockroach/pkg/ts/tspb"
-	"github.com/cockroachdb/cockroach/pkg/util/httputil"
-	"github.com/cockroachdb/cockroach/pkg/util/retry"
+	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/assert"
@@ -518,50 +514,52 @@ func registerKVGracefulDraining(r registry.Registry) {
 		Cluster: r.MakeClusterSpec(4),
 		Leases: registry.MetamorphicLeases,
 		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
+			c.Put(ctx, t.Cockroach(), "./cockroach", c.Range(1, c.Spec().NodeCount))
 			nodes := c.Spec().NodeCount - 1
-			c.Put(ctx, t.Cockroach(), "./cockroach", c.Range(1, nodes))
-			c.Put(ctx, t.DeprecatedWorkload(), "./workload", c.Node(nodes+1))
 
 			t.Status("starting cluster")
-
 			// If the test ever fails, the person who investigates the
 			// failure will likely be thankful for this additional logging.
 			startOpts := option.DefaultStartOpts()
 			startOpts.RoachprodOpts.ExtraArgs = append(startOpts.RoachprodOpts.ExtraArgs, "--vmodule=store=2,store_rebalancer=2")
 			c.Start(ctx, t.L(), startOpts, install.MakeClusterSettings(), c.Range(1, nodes))
 
-			db := c.Conn(ctx, t.L(), 1)
-			defer db.Close()
+			db1 := c.Conn(ctx, t.L(), 1)
+			defer db1.Close()
+			db2 := c.Conn(ctx, t.L(), 2)
+			defer db2.Close()
 
-			err := WaitFor3XReplication(ctx, t, db)
+			err := WaitFor3XReplication(ctx, t, db1)
 			require.NoError(t, err)
 
 			t.Status("initializing workload")
 
 			// Initialize the database with a lot of ranges so that there are
 			// definitely a large number of leases on the node that we shut down
 			// before it starts draining.
-			splitCmd := "./workload run kv --init --max-ops=1 --splits 100 {pgurl:1}"
-			c.Run(ctx, c.Node(nodes+1), splitCmd)
+			c.Run(ctx, c.Node(1), "./cockroach workload init kv --splits 100")
 
 			m := c.NewMonitor(ctx, c.Nodes(1, nodes))
+			m.ExpectDeath()
 
 			// specifiedQPS is going to be the --max-rate for the kv workload.
-			const specifiedQPS = 1000
+			specifiedQPS := 1000
+			if c.IsLocal() {
+				specifiedQPS = 100
+			}
 			// Because we're specifying a --max-rate well less than what cockroach
 			// should be capable of, draining one of the three nodes should have no
 			// effect on performance at all, meaning that a fairly aggressive
 			// threshold here should be ok.
-			expectedQPS := specifiedQPS * 0.9
+			expectedQPS := float64(specifiedQPS) * .9
 
 			t.Status("starting workload")
 			workloadStartTime := timeutil.Now()
 			desiredRunDuration := 5 * time.Minute
 			m.Go(func(ctx context.Context) error {
 				cmd := fmt.Sprintf(
-					"./workload run kv --duration=%s --read-percent=0 --tolerate-errors --max-rate=%d {pgurl:1-%d}",
-					desiredRunDuration,
-					specifiedQPS, nodes-1)
+					"./cockroach workload run kv --duration=%s --read-percent=0 --max-rate=%d {pgurl:1-%d}",
+					desiredRunDuration, specifiedQPS, nodes-1)
 				t.WorkerStatus(cmd)
 				defer func() {
 					t.WorkerStatus("workload command completed")
@@ -570,152 +568,81 @@ func registerKVGracefulDraining(r registry.Registry) {
 				return c.RunE(ctx, c.Node(nodes+1), cmd)
 			})
 
-			m.Go(func(ctx context.Context) error {
-				defer t.WorkerStatus()
-
-				t.WorkerStatus("waiting for perf to stabilize")
-				// Before we start shutting down nodes, wait for the performance
-				// of the workload to stabilize at the expected allowed level.
-
-				adminURLs, err := c.ExternalAdminUIAddr(ctx, t.L(), c.Node(1))
-				if err != nil {
-					return err
-				}
-				url := "http://" + adminURLs[0] + "/ts/query"
-				getQPSTimeSeries := func(start, end time.Time) ([]tspb.TimeSeriesDatapoint, error) {
-					request := tspb.TimeSeriesQueryRequest{
-						StartNanos: start.UnixNano(),
-						EndNanos: end.UnixNano(),
-						// Check the performance in each timeseries sample interval.
-						SampleNanos: base.DefaultMetricsSampleInterval.Nanoseconds(),
-						Queries: []tspb.Query{
-							{
-								Name: "cr.node.sql.query.count",
-								Downsampler: tspb.TimeSeriesQueryAggregator_AVG.Enum(),
-								SourceAggregator: tspb.TimeSeriesQueryAggregator_SUM.Enum(),
-								Derivative: tspb.TimeSeriesQueryDerivative_NON_NEGATIVE_DERIVATIVE.Enum(),
-							},
-						},
-					}
-					var response tspb.TimeSeriesQueryResponse
-					if err := httputil.PostJSON(http.Client{}, url, &request, &response); err != nil {
-						return nil, err
-					}
-					if len(response.Results[0].Datapoints) <= 1 {
-						return nil, errors.Newf("not enough datapoints in timeseries query response: %+v", response)
-					}
-					return response.Results[0].Datapoints, nil
+			verifyQPS := func(ctx context.Context) error {
+				if qps := measureQPS(ctx, t, time.Second, db1, db2); qps < expectedQPS {
+					return errors.Newf(
+						"QPS of %.2f at time %v is below minimum allowable QPS of %.2f",
+						qps, timeutil.Now(), expectedQPS)
 				}
+				return nil
+			}
 
-				waitBegin := timeutil.Now()
-				// Nb: we could want to use testutil.SucceedSoonError() here,
-				// however that has a hardcoded timeout of 45 seconds, and
-				// empirically we see this loop needs ~40 seconds to get enough
-				// samples to succeed. This would be too close to call, so
-				// we're using our own timeout instead.
-				if err := retry.ForDuration(1*time.Minute, func() (err error) {
-					defer func() {
-						if timeutil.Since(waitBegin) > 3*time.Second && err != nil {
-							t.Status(fmt.Sprintf("perf not stable yet: %v", err))
-						}
-					}()
-					now := timeutil.Now()
-					datapoints, err := getQPSTimeSeries(workloadStartTime, now)
-					if err != nil {
-						return err
-					}
+			t.Status("waiting for perf to stabilize")
+			testutils.SucceedsSoon(t, func() error { return verifyQPS(ctx) })
 
-					// Examine the last data point. As the retry.ForDuration loop
-					// iterates, this will only consider the last 10 seconds of
-					// measurement.
-					dp := datapoints[len(datapoints)-1]
-					if qps := dp.Value; qps < expectedQPS {
-						return errors.Newf(
-							"QPS of %.2f at time %v is below minimum allowable QPS of %.2f; entire timeseries: %+v",
-							qps, timeutil.Unix(0, dp.TimestampNanos), expectedQPS, datapoints)
+			// Begin the monitoring goroutine to track QPS every second.
+			m.Go(func(ctx context.Context) error {
+				t.Status("starting watcher to verify QPS during the test")
+				defer t.WorkerStatus()
+				for {
+					// Measure QPS every second throughout the test. verifyQPS takes time
+					// to run so we don't sleep between invocations.
+					require.NoError(t, verifyQPS(ctx))
+					// Stop measuring 10 seconds before we stop the workload.
+					if timeutil.Since(workloadStartTime) > desiredRunDuration-10*time.Second {
+						return nil
					}
-
-					// The desired performance has been reached by the
-					// workload. We're ready to start exercising shutdowns.
-					return nil
-				}); err != nil {
-					t.Fatal(err)
 				}
-				t.Status("detected stable perf before restarts: OK")
-
-				// The time at which we know the performance has become stable already.
-				stablePerfStartTime := timeutil.Now()
-
-				t.WorkerStatus("gracefully draining and restarting nodes")
-				// Gracefully shut down the third node, let the cluster run for a
-				// while, then restart it. Then repeat for good measure.
-				for i := 0; i < 2; i++ {
-					if i > 0 {
-						// No need to wait extra during the first iteration: we
-						// have already waited for the perf to become stable
-						// above.
-						t.Status("letting workload run with all nodes")
-						select {
-						case <-ctx.Done():
-							return nil
-						case <-time.After(1 * time.Minute):
-						}
-					}
-					m.ExpectDeath()
-					// Graceful drain: send SIGTERM, which should be sufficient
-					// to stop the node, followed by a non-graceful SIGKILL a
-					// bit later to clean up should the process have become
-					// stuck.
-					stopOpts := option.DefaultStopOpts()
-					stopOpts.RoachprodOpts.Sig = 15
-					stopOpts.RoachprodOpts.Wait = true
-					stopOpts.RoachprodOpts.MaxWait = 30
-					c.Stop(ctx, t.L(), stopOpts, c.Node(nodes))
-					c.Stop(ctx, t.L(), option.DefaultStopOpts(), c.Node(nodes))
-					t.Status("letting workload run with one node down")
+			})
+
+			t.Status("gracefully draining and restarting nodes")
+			// Gracefully shut down the third node, let the cluster run for a
+			// while, then restart it. Then repeat for good measure.
+			for i := 0; i < 2; i++ {
+				if i > 0 {
+					// No need to wait extra during the first iteration: we
+					// have already waited for the perf to become stable
+					// above.
+					t.Status("letting workload run with all nodes")
 					select {
 					case <-ctx.Done():
-						return nil
+						return
 					case <-time.After(1 * time.Minute):
 					}
-					c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(), c.Node(nodes))
-					m.ResetDeaths()
 				}
-
-				// Let the test run for nearly the entire duration of the kv command.
-				// The key is that we want the workload command to still be running when
-				// we look at the performance below. Given that the workload was set
-				// to run for 5 minutes, we should be fine here, however we want to guarantee
-				// there's at least 10s left to go. Check this.
-				t.WorkerStatus("checking workload is still running")
-				runDuration := timeutil.Since(workloadStartTime)
-				if runDuration > desiredRunDuration-10*time.Second {
-					t.Fatalf("not enough workload time left to reliably determine performance (%s left)",
-						desiredRunDuration-runDuration)
-				}
-
-				t.WorkerStatus("checking for perf throughout the test")
-
-				// Check that the QPS has been at the expected max rate for the entire
-				// test duration, even as one of the nodes was being stopped and started.
-				endTestTime := timeutil.Now()
-				datapoints, err := getQPSTimeSeries(stablePerfStartTime, endTestTime)
-				if err != nil {
-					t.Fatal(err)
+				// Graceful drain: send SIGTERM, which should be sufficient
+				// to stop the node, followed by a non-graceful SIGKILL a
+				// bit later to clean up should the process have become
+				// stuck.
+				stopOpts := option.DefaultStopOpts()
+				stopOpts.RoachprodOpts.Sig = 15
+				stopOpts.RoachprodOpts.Wait = true
+				stopOpts.RoachprodOpts.MaxWait = 30
+				c.Stop(ctx, t.L(), stopOpts, c.Node(nodes))
+				c.Stop(ctx, t.L(), option.DefaultStopOpts(), c.Node(nodes))
+				t.Status("letting workload run with one node down")
+				select {
+				case <-ctx.Done():
+					return
+				case <-time.After(1 * time.Minute):
 				}
+				c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(), c.Node(nodes))
+				m.ResetDeaths()
+			}
 
-				for _, dp := range datapoints {
-					if qps := dp.Value; qps < expectedQPS {
-						t.Fatalf(
-							"QPS of %.2f at time %v is below minimum allowable QPS of %.2f; entire timeseries: %+v",
-							qps, timeutil.Unix(0, dp.TimestampNanos), expectedQPS, datapoints)
-					}
-				}
-				t.Status("perf is OK!")
-				t.WorkerStatus("waiting for workload to complete")
-				return nil
-			})
+			// Let the test run for nearly the entire duration of the kv command.
+			// The key is that we want the workload command to still be running when
+			// we look at the performance below. Given that the workload was set
+			// to run for 5 minutes, we should be fine here, however we want to guarantee
+			// there's at least 10s left to go. Check this.
+			t.Status("checking workload is still running")
+			runDuration := timeutil.Since(workloadStartTime)
+			if runDuration > desiredRunDuration-10*time.Second {
+				t.Fatalf("not enough workload time left to reliably determine performance (%s left)",
+					desiredRunDuration-runDuration)
+			}
 
+			t.Status("waiting for workload to complete")
 			m.Wait()
 		},
 	})
