@@ -1785,6 +1785,120 @@ func runMessageTooLarge(ctx context.Context, t test.Test, c cluster.Cluster) {
 	require.Regexp(t, `mvcc=[\d\.]+,\d+`, logStr, "log should include mvcc")
 }
 
+type multiTablePTSBenchmarkParams struct {
+	numTables int
+	numRows   int
+	duration  string
+}
+
+// runCDCMultiTablePTSBenchmark is a benchmark for changefeeds with multiple tables,
+// focusing on the performance of the PTS system. It starts a bank workload on every
+// table it creates and then runs a single changefeed that targets all of these bank tables.
+// Each of those workloads (there will be one per table) will run for the duration specified
+// in the params and have the number of rows specified in the params.
+func runCDCMultiTablePTSBenchmark(
+	ctx context.Context, t test.Test, c cluster.Cluster, params multiTablePTSBenchmarkParams,
+) {
+	ct := newCDCTester(ctx, t, c)
+	defer ct.Close()
+
+	startOpts := option.DefaultStartOpts()
+	startOpts.RoachprodOpts.ExtraArgs = append(startOpts.RoachprodOpts.ExtraArgs,
+		"--vmodule=changefeed=2,changefeed_processors=2,protected_timestamps=2",
+	)
+
+	db := ct.DB()
+	if err := configureDBForMultiTablePTSBenchmark(db); err != nil {
+		t.Fatalf("failed to set cluster settings: %v", err)
+	}
+
+	initCmd := fmt.Sprintf("./cockroach workload init bank --rows=%d --num-tables=%d {pgurl%s}",
+		params.numRows, params.numTables, ct.crdbNodes.RandNode())
+	if err := c.RunE(ctx, option.WithNodes(ct.workloadNode), initCmd); err != nil {
+		t.Fatalf("failed to initialize bank tables: %v", err)
+	}
+
+	ct.workloadWg.Add(1)
+	ct.mon.Go(func(ctx context.Context) error {
+		defer ct.workloadWg.Done()
+		workloadCmd := fmt.Sprintf("./cockroach workload run bank --rows=%d --duration=%s --num-tables=%d {pgurl%s}",
+			params.numRows, params.duration, params.numTables, ct.crdbNodes)
+		return c.RunE(ctx, option.WithNodes(ct.workloadNode), workloadCmd)
+	})
+
+	// We generate and run the changefeed, which requires rangefeeds to be enabled.
+	if _, err := db.Exec("SET CLUSTER SETTING kv.rangefeed.enabled = true"); err != nil {
+		t.Fatalf("failed to enable rangefeeds: %v", err)
+	}
+
+	targetNames := make([]string, 0, params.numTables)
+	for i := range params.numTables {
+		targetNames = append(targetNames, fmt.Sprintf("bank.bank_%d", i))
+	}
+
+	feed := ct.newChangefeed(feedArgs{
+		sinkType: nullSink,
+		targets:  targetNames,
+		opts: map[string]string{
+			"format":                   "'json'",
+			"resolved":                 "'1s'",
+			"full_table_name":          "",
+			"min_checkpoint_frequency": "'1s'",
+			"initial_scan":             "'no'",
+		},
+	})
+
+	t.Status("multi-table PTS benchmark running with jobId ", feed.jobID)
+
+	ct.waitForWorkload()
+
+	t.Status("workload finished, verifying metrics")
+
+	// These metrics are in nanoseconds, so we are asserting that both
+	// of these latency metrics are less than 10 milliseconds.
+	ct.verifyMetrics(ctx, verifyMetricsUnderThreshold([]string{
+		"changefeed_stage_pts_manage_latency",
+		"changefeed_stage_pts_create_latency",
+	}, float64(10*time.Millisecond)))
+
+	t.Status("multi-table PTS benchmark finished")
+}
+
+func configureDBForMultiTablePTSBenchmark(db *gosql.DB) error {
+	// This is used to trigger frequent garbage collection and
+	// protected timestamp updates.
+	if _, err := db.Exec("ALTER DATABASE defaultdb CONFIGURE ZONE USING gc.ttlseconds = 1"); err != nil {
+		return err
+	}
+	if _, err := db.Exec("SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'"); err != nil {
+		return err
+	}
+	if _, err := db.Exec("SET CLUSTER SETTING kv.protectedts.poll_interval = '10ms'"); err != nil {
+		return err
+	}
+
+	// This is used to trigger frequent protected timestamp updates.
+	if _, err := db.Exec("SET CLUSTER SETTING changefeed.protect_timestamp_interval = '10ms'"); err != nil {
+		return err
+	}
+	if _, err := db.Exec("SET CLUSTER SETTING changefeed.protect_timestamp.lag = '1ms'"); err != nil {
+		return err
+	}
+
+	// These settings are used to trigger frequent checkpoints since protected timestamp
+	// management happens on checkpointing.
+	if _, err := db.Exec("SET CLUSTER SETTING changefeed.span_checkpoint.interval = '1s'"); err != nil {
+		return err
+	}
+	if _, err := db.Exec("SET CLUSTER SETTING changefeed.frontier_highwater_lag_checkpoint_threshold = '100ms'"); err != nil {
+		return err
+	}
+	if _, err := db.Exec("SET CLUSTER SETTING changefeed.frontier_checkpoint_frequency = '1s'"); err != nil {
+		return err
+	}
+	return nil
+}
+
 func registerCDC(r registry.Registry) {
 	r.Add(registry.TestSpec{
 		Name: "cdc/initial-scan-only",
@@ -2782,6 +2896,23 @@ func registerCDC(r registry.Registry) {
 		CompatibleClouds: registry.AllExceptIBM,
 		Run:              runMessageTooLarge,
 	})
+	r.Add(registry.TestSpec{
+		Name:             "cdc/multi-table-pts-benchmark",
+		Owner:            registry.OwnerCDC,
+		Benchmark:        true,
+		Cluster:          r.MakeClusterSpec(4, spec.CPU(16), spec.WorkloadNode()),
+		CompatibleClouds: registry.AllClouds,
+		Suites:           registry.Suites(registry.Nightly),
+		Timeout:          1 * time.Hour,
+		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
+			params := multiTablePTSBenchmarkParams{
+				numTables: 500,
+				numRows:   10_000,
+				duration:  "20m",
+			}
+			runCDCMultiTablePTSBenchmark(ctx, t, c, params)
+		},
+	})
 }
 
 const (
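For context (not part of the diff): the nightly registration above drives the benchmark with 500 tables, 10,000 rows per table, and a 20 minute workload. When iterating on the test manually, one could call the same entry point with smaller parameters; the values below are purely illustrative and not used by the registered test:

// Hypothetical smaller parameterization for a quick manual run.
params := multiTablePTSBenchmarkParams{
	numTables: 10,
	numRows:   1_000,
	duration:  "1m",
}
runCDCMultiTablePTSBenchmark(ctx, t, c, params)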
@@ -4466,3 +4597,38 @@ func verifyMetricsNonZero(names ...string) func(metrics map[string]*prompb.Metri
 		return false
 	}
 }
+
+func verifyMetricsUnderThreshold(
+	names []string, threshold float64,
+) func(metrics map[string]*prompb.MetricFamily) (ok bool) {
+	namesMap := make(map[string]struct{}, len(names))
+	for _, name := range names {
+		namesMap[name] = struct{}{}
+	}
+
+	return func(metrics map[string]*prompb.MetricFamily) (ok bool) {
+		found := map[string]struct{}{}
+
+		for name, fam := range metrics {
+			if _, ok := namesMap[name]; !ok {
+				continue
+			}
+
+			for _, m := range fam.Metric {
+				if m.Histogram.GetSampleCount() == 0 {
+					continue
+				}
+
+				observedValue := m.Histogram.GetSampleSum() / float64(m.Histogram.GetSampleCount())
+				if observedValue < threshold {
+					found[name] = struct{}{}
+				}
+			}
+
+			if len(found) == len(names) {
+				return true
+			}
+		}
+		return false
+	}
+}
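Note on the check above (not part of the diff): verifyMetricsUnderThreshold uses each matching histogram's mean, sample sum divided by sample count, as the observed value and only reports success once every named metric has a mean under the threshold. Since the PTS stage latencies are recorded in nanoseconds, the benchmark's 10 ms bound is float64(10 * time.Millisecond), i.e. 1e7 ns. A self-contained sketch of that comparison, with a hypothetical helper name and made-up numbers for illustration:

package main

import (
	"fmt"
	"time"
)

// meanUnderThreshold reports whether the average observation (sum/count,
// both in nanoseconds here) stays below the threshold. Hypothetical
// helper for illustration only; it is not part of the PR.
func meanUnderThreshold(sampleSum float64, sampleCount uint64, threshold float64) bool {
	if sampleCount == 0 {
		return false // no samples recorded yet, nothing to assert
	}
	return sampleSum/float64(sampleCount) < threshold
}

func main() {
	// 2,000 PTS operations totalling 4s of latency => 2ms average,
	// comfortably under the 10ms (1e7 ns) bound.
	fmt.Println(meanUnderThreshold(float64(4*time.Second), 2000, float64(10*time.Millisecond))) // true
}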