@@ -17,6 +17,8 @@ import (
1717 "encoding/json"
1818 "fmt"
1919 "path/filepath"
20+ "strconv"
21+ "sync/atomic"
2022 "time"
2123
2224 "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
@@ -26,6 +28,7 @@ import (
2628 "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
2729 "github.com/cockroachdb/cockroach/pkg/jobs"
2830 "github.com/cockroachdb/cockroach/pkg/roachprod/install"
31+ "github.com/cockroachdb/cockroach/pkg/util/randutil"
2932 "github.com/cockroachdb/cockroach/pkg/util/timeutil"
3033 "github.com/cockroachdb/cockroach/pkg/workload/histogram"
3134 "github.com/cockroachdb/errors"
@@ -34,6 +37,7 @@ import (
 )
 
 type cdcBenchScanType string
+type cdcBenchServer string
 type cdcBenchProtocol string
 
 const (
@@ -56,6 +60,10 @@ const (
 	// practice it can.
 	cdcBenchColdCatchupScan cdcBenchScanType = "catchup-cold"
 
+	cdcBenchNoServer        cdcBenchServer = ""
+	cdcBenchProcessorServer cdcBenchServer = "processor" // legacy processor
+	cdcBenchSchedulerServer cdcBenchServer = "scheduler" // new scheduler
+
 	cdcBenchNoProtocol        cdcBenchProtocol = ""
 	cdcBenchRangefeedProtocol cdcBenchProtocol = "rangefeed" // basic rangefeed protocol
 	cdcBenchMuxProtocol       cdcBenchProtocol = "mux"       // multiplexing rangefeed protocol
@@ -64,6 +72,7 @@ const (
 var (
 	cdcBenchScanTypes = []cdcBenchScanType{
 		cdcBenchInitialScan, cdcBenchCatchupScan, cdcBenchColdCatchupScan}
+	cdcBenchServers   = []cdcBenchServer{cdcBenchProcessorServer} // TODO(erikgrinaker): scheduler
 	cdcBenchProtocols = []cdcBenchProtocol{cdcBenchRangefeedProtocol, cdcBenchMuxProtocol}
 )
 
@@ -99,6 +108,53 @@ func registerCDCBench(r registry.Registry) {
 			}
 		}
 	}
+
+	// Workload impact benchmarks.
+	for _, readPercent := range []int{0, 100} {
+		for _, ranges := range []int64{100, 100000} {
+			readPercent, ranges := readPercent, ranges // pin loop variables
+			const (
+				nodes  = 5 // excluding coordinator and workload nodes
+				cpus   = 16
+				format = "json"
+			)
+
+			// Control run that only runs the workload, with no changefeed.
+			r.Add(registry.TestSpec{
+				Name: fmt.Sprintf(
+					"cdc/workload/kv%d/nodes=%d/cpu=%d/ranges=%d/control",
+					readPercent, nodes, cpus, ranges),
+				Owner:           registry.OwnerCDC,
+				Benchmark:       true,
+				Cluster:         r.MakeClusterSpec(nodes+2, spec.CPU(cpus)),
+				RequiresLicense: true,
+				Timeout:         time.Hour,
+				Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
+					runCDCBenchWorkload(ctx, t, c, ranges, readPercent, "", "", "")
+				},
+			})
+
+			// Workloads with a concurrent changefeed running.
+			for _, server := range cdcBenchServers {
+				for _, protocol := range cdcBenchProtocols {
+					server, protocol := server, protocol // pin loop variables
+					r.Add(registry.TestSpec{
+						Name: fmt.Sprintf(
+							"cdc/workload/kv%d/nodes=%d/cpu=%d/ranges=%d/server=%s/protocol=%s/format=%s/sink=null",
+							readPercent, nodes, cpus, ranges, server, protocol, format),
+						Owner:           registry.OwnerCDC,
+						Benchmark:       true,
+						Cluster:         r.MakeClusterSpec(nodes+2, spec.CPU(cpus)),
+						RequiresLicense: true,
+						Timeout:         time.Hour,
+						Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
+							runCDCBenchWorkload(ctx, t, c, ranges, readPercent, server, protocol, format)
+						},
+					})
+				}
+			}
+		}
+	}
 }
 
 func formatSI(num int64) string {
@@ -263,6 +319,209 @@ func runCDCBenchScan(
 	m.Wait()
 }
 
+// runCDCBenchWorkload runs a KV workload on top of a changefeed, measuring the
+// workload throughput and latency. Rangefeeds are configured to backpressure
+// writers, which yields reliable results for the full write+emission cost.
+// The workload results (throughput and latency) can be compared to separate
+// control runs that only run the workload without changefeeds and rangefeeds.
+//
+// It sets up a cluster with N-2 data nodes, and a separate changefeed
+// coordinator node and workload runner.
+func runCDCBenchWorkload(
+	ctx context.Context,
+	t test.Test,
+	c cluster.Cluster,
+	numRanges int64,
+	readPercent int,
+	server cdcBenchServer,
+	protocol cdcBenchProtocol,
+	format string,
+) {
+	const sink = "null://"
+	var (
+		numNodes  = c.Spec().NodeCount
+		nData     = c.Range(1, numNodes-2)
+		nCoord    = c.Node(numNodes - 1)
+		nWorkload = c.Node(numNodes)
+
+		workloadSeed = randutil.NewPseudoSeed()
+		concurrency  = len(nData) * 64
+		duration     = 20 * time.Minute
+		insertCount  = int64(0)
+		cdcEnabled   = true
+	)
+	if readPercent == 100 {
+		insertCount = 1_000_000 // ingest some data to read
+	}
+	// Either of these will disable changefeeds. Make sure they're all disabled.
+	if server == "" || protocol == "" || format == "" {
+		require.Empty(t, server)
+		require.Empty(t, protocol)
+		require.Empty(t, format)
+		cdcEnabled = false
+	}
+
+	// Start data nodes first to place data on them. We'll start the changefeed
+	// coordinator later, since we don't want any data on it.
+	opts, settings := makeCDCBenchOptions()
+	settings.ClusterSettings["kv.rangefeed.enabled"] = strconv.FormatBool(cdcEnabled)
+
+	switch protocol {
+	case cdcBenchMuxProtocol:
+		settings.ClusterSettings["changefeed.mux_rangefeed.enabled"] = "true"
+	case cdcBenchRangefeedProtocol:
+		settings.ClusterSettings["changefeed.mux_rangefeed.enabled"] = "false"
+	case cdcBenchNoProtocol:
+	default:
+		t.Fatalf("unknown protocol %q", protocol)
+	}
+
+	c.Put(ctx, t.Cockroach(), "./cockroach")
+	c.Start(ctx, t.L(), opts, settings, nData)
+	m := c.NewMonitor(ctx, nData.Merge(nCoord))
+
+	conn := c.Conn(ctx, t.L(), nData[0])
+	defer conn.Close()
+
+	// Prohibit ranges on the changefeed coordinator.
+	t.L().Printf("configuring zones")
+	for _, target := range getAllZoneTargets(ctx, t, conn) {
+		_, err := conn.ExecContext(ctx, fmt.Sprintf(
+			`ALTER %s CONFIGURE ZONE USING num_replicas=3, constraints='[-node%d]'`, target, nCoord[0]))
+		require.NoError(t, err)
+	}
+
+	// Wait for system ranges to upreplicate.
+	require.NoError(t, WaitFor3XReplication(ctx, t, conn))
+
+	// Create and split the workload table.
+	//
+	// NB: don't scatter -- the ranges end up fairly well-distributed anyway, and
+	// the scatter can often fail with 100k ranges.
+	t.L().Printf("creating table with %s ranges", humanize.Comma(numRanges))
+	c.Run(ctx, nWorkload, fmt.Sprintf(
+		`./cockroach workload init kv --splits %d {pgurl:%d}`, numRanges, nData[0]))
+	require.NoError(t, WaitFor3XReplication(ctx, t, conn))
+
+	// For read-only workloads, ingest some data. init --insert-count does not use
+	// the standard key generator that the read workload uses, so we have to write
+	// them with a separate write workload first, see:
+	// https://github.com/cockroachdb/cockroach/issues/107874
+	if insertCount > 0 {
+		const batchSize = 1000
+		batches := (insertCount-1)/batchSize + 1 // ceiling division
+		t.L().Printf("ingesting %s rows", humanize.Comma(insertCount))
+		c.Run(ctx, nWorkload, fmt.Sprintf(
+			`./cockroach workload run kv --seed %d --read-percent 0 --batch %d --max-ops %d {pgurl:%d}`,
+			workloadSeed, batchSize, batches, nData[0]))
+	}
+
+	// Now that the ranges are placed, start the changefeed coordinator.
+	t.L().Printf("starting coordinator node")
+	c.Start(ctx, t.L(), opts, settings, nCoord)
+
+	conn = c.Conn(ctx, t.L(), nCoord[0])
+	defer conn.Close()
+
+	// Start the changefeed if enabled. We disable the initial scan, since we
+	// don't care about the historical data.
+	var jobID int
+	var done atomic.Value // time.Time
+	if cdcEnabled {
+		t.L().Printf("starting changefeed")
+		require.NoError(t, conn.QueryRowContext(ctx, fmt.Sprintf(
+			`CREATE CHANGEFEED FOR kv.kv INTO '%s' WITH format = '%s', initial_scan = 'no'`,
+			sink, format)).
+			Scan(&jobID))
+
+		// Monitor the changefeed for failures. When the workload finishes, it will
+		// store the completion timestamp in done, and we'll wait for the
+		// changefeed's watermark to reach it.
+		//
+		// The watermark and lag isn't recorded by the benchmark, but we make sure
+		// all data is eventually emitted. It is also helpful for inspection, and we
+		// may want to track or assert on it later. Initially, this asserted that
+		// the changefeed wasn't lagging by more than 1-2 minutes, but with 100k
+		// ranges it was found to sometimes lag by over 8 minutes.
+		m.Go(func(ctx context.Context) error {
+			info, err := waitForChangefeed(ctx, conn, jobID, func(info changefeedInfo) (bool, error) {
+				switch jobs.Status(info.status) {
+				case jobs.StatusPending, jobs.StatusRunning:
+					doneValue := done.Load()
+					return doneValue != nil && info.highwaterTime.After(doneValue.(time.Time)), nil
+				default:
+					return false, errors.Errorf("unexpected changefeed status %s", info.status)
+				}
+			})
+			if err != nil {
+				return err
+			}
+			t.L().Printf("changefeed watermark is %s", info.highwaterTime.Format(time.RFC3339))
+			return nil
+		})
+
+		// Wait for a stable changefeed before starting the workload, by waiting for
+		// the watermark to reach the current time.
+		now := timeutil.Now()
+		t.L().Printf("waiting for changefeed watermark to reach current time (%s)",
+			now.Format(time.RFC3339))
+		info, err := waitForChangefeed(ctx, conn, jobID, func(info changefeedInfo) (bool, error) {
+			switch jobs.Status(info.status) {
+			case jobs.StatusPending, jobs.StatusRunning:
+				return info.highwaterTime.After(now), nil
+			default:
+				return false, errors.Errorf("unexpected changefeed status %s", info.status)
+			}
+		})
+		require.NoError(t, err)
+		t.L().Printf("changefeed watermark is %s", info.highwaterTime.Format(time.RFC3339))
+
+	} else {
+		t.L().Printf("control run, not starting changefeed")
+	}
+
+	// Run the workload and record stats. Make sure to use the same seed, so we
+	// read any rows we wrote above.
+	m.Go(func(ctx context.Context) error {
+		// If there's more than 10,000 replicas per node they may struggle to
+		// maintain RPC connections or liveness, which occasionally fails client
+		// write requests with ambiguous errors. We tolerate errors in this case
+		// until we optimize rangefeeds.
+		//
+		// TODO(erikgrinaker): remove this when benchmarks are stable.
+		var extra string
+		if readPercent < 100 && (numRanges/int64(len(nData))) >= 10000 {
+			extra += ` --tolerate-errors`
+		}
+		t.L().Printf("running workload")
+		err := c.RunE(ctx, nWorkload, fmt.Sprintf(
+			`./cockroach workload run kv --seed %d --histograms=%s/stats.json ` +
+				`--concurrency %d --duration %s --write-seq R%d --read-percent %d %s {pgurl:%d-%d}`,
+			workloadSeed, t.PerfArtifactsDir(), concurrency, duration, insertCount, readPercent, extra,
+			nData[0], nData[len(nData)-1]))
+		if err != nil {
+			return err
+		}
+		t.L().Printf("workload completed")
+
+		// When the workload completes, signal the completion time to the changefeed
+		// monitor via done, which will wait for it to fully catch up.
+		if cdcEnabled {
+			now := timeutil.Now()
+			done.Store(now)
+			info, err := getChangefeedInfo(conn, jobID)
+			if err != nil {
+				return err
+			}
+			t.L().Printf("waiting for changefeed watermark to reach %s (lagging by %s)",
+				now.Format(time.RFC3339), now.Sub(info.highwaterTime).Truncate(time.Second))
+		}
+		return nil
+	})
+
+	m.Wait()
+}
+
 // getAllZoneTargets returns all zone targets (e.g. "RANGE default", "DATABASE
 // system", etc).
 func getAllZoneTargets(ctx context.Context, t test.Test, conn *gosql.DB) []string {
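
Not part of the diff above: a minimal, self-contained sketch of the completion handshake that runCDCBenchWorkload sets up between the workload goroutine and the changefeed monitor. The test stores the workload's finish time in an atomic.Value and the monitor waits (via waitForChangefeed) for the changefeed highwater timestamp to pass it; in this standalone sketch the highwater timestamp is replaced by a plain wall-clock reading so the pattern can run on its own, and the sleeps and intervals are arbitrary illustrative values.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

func main() {
	var done atomic.Value // holds a time.Time once the "workload" finishes
	var wg sync.WaitGroup

	// Monitor goroutine: poll a stand-in "watermark" until it passes the
	// completion time published in done, then report and exit. In the
	// roachtest, the watermark is the changefeed's highwater timestamp and
	// the polling happens inside waitForChangefeed.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			watermark := time.Now() // stand-in for the changefeed highwater timestamp
			if d := done.Load(); d != nil && watermark.After(d.(time.Time)) {
				fmt.Println("watermark caught up at", watermark.Format(time.RFC3339))
				return
			}
			time.Sleep(10 * time.Millisecond)
		}
	}()

	// "Workload": do some work, then publish the completion time via done,
	// mirroring done.Store(timeutil.Now()) at the end of the workload task.
	time.Sleep(50 * time.Millisecond)
	done.Store(time.Now())

	wg.Wait()
}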