@@ -1731,6 +1731,120 @@ func runMessageTooLarge(ctx context.Context, t test.Test, c cluster.Cluster) {
1731
1731
require .Regexp (t , `mvcc=[\d\.]+,\d+` , logStr , "log should include mvcc" )
1732
1732
}
1733
1733
1734
// multiTablePTSBenchmarkParams configures the multi-table PTS benchmark:
// how many bank tables to create, how many rows each table holds, and how
// long the per-table workloads run (a workload duration string, e.g. "20m").
type multiTablePTSBenchmarkParams struct {
	numTables int
	numRows   int
	duration  string
}
1739
+
1740
+ // runCDCMultiTablePTSBenchmark is a benchmark for changefeeds with multiple tables,
1741
+ // focusing on the performance of the PTS system. It starts a bank workload on every
1742
+ // table it creates and then runs a single changefeed that targets all of these bank tables.
1743
+ // Each of those workloads (there will be one per table) will run for the duration specified
1744
+ // in the params and have the number of rows specified in the params.
1745
+ func runCDCMultiTablePTSBenchmark (
1746
+ ctx context.Context , t test.Test , c cluster.Cluster , params multiTablePTSBenchmarkParams ,
1747
+ ) {
1748
+ ct := newCDCTester (ctx , t , c )
1749
+ defer ct .Close ()
1750
+
1751
+ startOpts := option .DefaultStartOpts ()
1752
+ startOpts .RoachprodOpts .ExtraArgs = append (startOpts .RoachprodOpts .ExtraArgs ,
1753
+ "--vmodule=changefeed=2,changefeed_processors=2,protected_timestamps=2" ,
1754
+ )
1755
+
1756
+ db := ct .DB ()
1757
+ if err := configureDBForMultiTablePTSBenchmark (db ); err != nil {
1758
+ t .Fatalf ("failed to set cluster settings: %v" , err )
1759
+ }
1760
+
1761
+ initCmd := fmt .Sprintf ("./cockroach workload init bank --rows=%d --num-tables=%d {pgurl%s}" ,
1762
+ params .numRows , params .numTables , ct .crdbNodes .RandNode ())
1763
+ if err := c .RunE (ctx , option .WithNodes (ct .workloadNode ), initCmd ); err != nil {
1764
+ t .Fatalf ("failed to initialize bank tables: %v" , err )
1765
+ }
1766
+
1767
+ ct .workloadWg .Add (1 )
1768
+ ct .mon .Go (func (ctx context.Context ) error {
1769
+ defer ct .workloadWg .Done ()
1770
+ workloadCmd := fmt .Sprintf ("./cockroach workload run bank --rows=%d --duration=%s --num-tables=%d {pgurl%s}" ,
1771
+ params .numRows , params .duration , params .numTables , ct .crdbNodes )
1772
+ return c .RunE (ctx , option .WithNodes (ct .workloadNode ), workloadCmd )
1773
+ })
1774
+
1775
+ // We generate and run the changefeed, which requires rangefeeds to be enabled.
1776
+ if _ , err := db .Exec ("SET CLUSTER SETTING kv.rangefeed.enabled = true" ); err != nil {
1777
+ t .Fatalf ("failed to enable rangefeeds: %v" , err )
1778
+ }
1779
+
1780
+ targetNames := make ([]string , 0 , params .numTables )
1781
+ for i := range params .numTables {
1782
+ targetNames = append (targetNames , fmt .Sprintf ("bank.bank_%d" , i ))
1783
+ }
1784
+
1785
+ feed := ct .newChangefeed (feedArgs {
1786
+ sinkType : nullSink ,
1787
+ targets : targetNames ,
1788
+ opts : map [string ]string {
1789
+ "format" : "'json'" ,
1790
+ "resolved" : "'1s'" ,
1791
+ "full_table_name" : "" ,
1792
+ "min_checkpoint_frequency" : "'1s'" ,
1793
+ "initial_scan" : "'no'" ,
1794
+ },
1795
+ })
1796
+
1797
+ t .Status ("multi-table PTS benchmark running with jobId " , feed .jobID )
1798
+
1799
+ ct .waitForWorkload ()
1800
+
1801
+ t .Status ("workload finished, verifying metrics" )
1802
+
1803
+ // These metrics are in nanoseconds, so we are asserting that both
1804
+ // of these latency metrics are less than 10 milliseconds.
1805
+ ct .verifyMetrics (ctx , verifyMetricsUnderThreshold ([]string {
1806
+ "changefeed_stage_pts_manage_latency" ,
1807
+ "changefeed_stage_pts_create_latency" ,
1808
+ }, float64 (10 * time .Millisecond )))
1809
+
1810
+ t .Status ("multi-table PTS benchmark finished" )
1811
+ }
1812
+
1813
// configureDBForMultiTablePTSBenchmark applies the zone config and cluster
// settings that the multi-table PTS benchmark relies on to exercise the
// protected timestamp system aggressively. It returns the first execution
// error, annotated with the statement that failed.
func configureDBForMultiTablePTSBenchmark(db *gosql.DB) error {
	stmts := []string{
		// These are used to trigger frequent garbage collection and
		// protected timestamp updates.
		"ALTER DATABASE defaultdb CONFIGURE ZONE USING gc.ttlseconds = 1",
		"SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'",
		"SET CLUSTER SETTING kv.protectedts.poll_interval = '10ms'",

		// This is used to trigger frequent protected timestamp updates.
		"SET CLUSTER SETTING changefeed.protect_timestamp_interval = '10ms'",
		"SET CLUSTER SETTING changefeed.protect_timestamp.lag = '1ms'",

		// These settings are used to trigger frequent checkpoints since
		// protected timestamp management happens on checkpointing.
		"SET CLUSTER SETTING changefeed.span_checkpoint.interval = '1s'",
		"SET CLUSTER SETTING changefeed.frontier_highwater_lag_checkpoint_threshold = '100ms'",
		"SET CLUSTER SETTING changefeed.frontier_checkpoint_frequency = '1s'",
	}
	for _, stmt := range stmts {
		if _, err := db.Exec(stmt); err != nil {
			return fmt.Errorf("executing %q: %w", stmt, err)
		}
	}
	return nil
}
1847
+
1734
1848
func registerCDC (r registry.Registry ) {
1735
1849
r .Add (registry.TestSpec {
1736
1850
Name : "cdc/initial-scan-only" ,
@@ -2719,6 +2833,23 @@ func registerCDC(r registry.Registry) {
2719
2833
CompatibleClouds : registry .AllExceptIBM ,
2720
2834
Run : runMessageTooLarge ,
2721
2835
})
2836
+ r .Add (registry.TestSpec {
2837
+ Name : "cdc/multi-table-pts-benchmark" ,
2838
+ Owner : registry .OwnerCDC ,
2839
+ Benchmark : true ,
2840
+ Cluster : r .MakeClusterSpec (4 , spec .CPU (16 ), spec .WorkloadNode ()),
2841
+ CompatibleClouds : registry .AllClouds ,
2842
+ Suites : registry .Suites (registry .Nightly ),
2843
+ Timeout : 1 * time .Hour ,
2844
+ Run : func (ctx context.Context , t test.Test , c cluster.Cluster ) {
2845
+ params := multiTablePTSBenchmarkParams {
2846
+ numTables : 500 ,
2847
+ numRows : 10_000 ,
2848
+ duration : "20m" ,
2849
+ }
2850
+ runCDCMultiTablePTSBenchmark (ctx , t , c , params )
2851
+ },
2852
+ })
2722
2853
}
2723
2854
2724
2855
const (
@@ -4403,3 +4534,38 @@ func verifyMetricsNonZero(names ...string) func(metrics map[string]*prompb.Metri
4403
4534
return false
4404
4535
}
4405
4536
}
4537
+
4538
+ func verifyMetricsUnderThreshold (
4539
+ names []string , threshold float64 ,
4540
+ ) func (metrics map [string ]* prompb.MetricFamily ) (ok bool ) {
4541
+ namesMap := make (map [string ]struct {}, len (names ))
4542
+ for _ , name := range names {
4543
+ namesMap [name ] = struct {}{}
4544
+ }
4545
+
4546
+ return func (metrics map [string ]* prompb.MetricFamily ) (ok bool ) {
4547
+ found := map [string ]struct {}{}
4548
+
4549
+ for name , fam := range metrics {
4550
+ if _ , ok := namesMap [name ]; ! ok {
4551
+ continue
4552
+ }
4553
+
4554
+ for _ , m := range fam .Metric {
4555
+ if m .Histogram .GetSampleCount () == 0 {
4556
+ continue
4557
+ }
4558
+
4559
+ observedValue := m .Histogram .GetSampleSum () / float64 (m .Histogram .GetSampleCount ())
4560
+ if observedValue < threshold {
4561
+ found [name ] = struct {}{}
4562
+ }
4563
+ }
4564
+
4565
+ if len (found ) == len (names ) {
4566
+ return true
4567
+ }
4568
+ }
4569
+ return false
4570
+ }
4571
+ }
0 commit comments