@@ -41,16 +41,24 @@ const (
4141 // when creating the changefeed
4242 resolvedInterval = "5s"
4343
44+ // minCheckpointFrequency is the value to use for `min_checkpoint_frequency`
45+ // when creating changefeeds.
46+ minCheckpointFrequency = "1s"
47+
4448 // kafkaBufferMessageSize is the number of messages from kafka
4549 // we allow to be buffered in memory before validating them.
4650 kafkaBufferMessageSize = 1 << 16 // 65,536 messages
4751)
4852
49- var (
53+ const (
5054 // the CDC target, DB and table. We're running the bank workload in
5155 // this test.
52- targetDB = "bank"
53- targetTable = "bank"
56+ // NB: We increase the number of ranges/rows from the defaults to
57+ // allow for more interesting DistSQL plans.
58+ targetDB = "bank"
59+ targetTable = "bank"
60+ targetTableRanges = 100
61+ targetTableRows = 10_000
5462
5563 // teamcityAgentZone is the zone used in this test. Since this test
5664 // runs a lot of queries from the TeamCity agent to CRDB nodes, we
@@ -74,6 +82,18 @@ func registerCDCMixedVersions(r registry.Registry) {
7482 runCDCMixedVersions (ctx , t , c )
7583 },
7684 })
85+ r .Add (registry.TestSpec {
86+ Name : "cdc/mixed-version/checkpointing" ,
87+ Owner : registry .OwnerCDC ,
88+ Cluster : r .MakeClusterSpec (5 , spec .WorkloadNode (), spec .GCEZones (teamcityAgentZone ), spec .Arch (vm .ArchAMD64 )),
89+ Timeout : 3 * time .Hour ,
90+ CompatibleClouds : registry .OnlyGCE ,
91+ Suites : registry .Suites (registry .MixedVersion , registry .Nightly ),
92+ Randomized : true ,
93+ Run : func (ctx context.Context , t test.Test , c cluster.Cluster ) {
94+ runCDCMixedVersionCheckpointing (ctx , t , c )
95+ },
96+ })
7797}
7898
7999// cdcMixedVersionTester implements mixed-version/upgrade testing for
@@ -351,8 +371,9 @@ func (cmvt *cdcMixedVersionTester) createChangeFeed(
351371 l .Printf ("starting changefeed on node %d (updating system settings via node %d)" , node , systemNode )
352372
353373 options := map [string ]string {
354- "updated" : "" ,
355- "resolved" : fmt .Sprintf ("'%s'" , resolvedInterval ),
374+ "updated" : "" ,
375+ "resolved" : fmt .Sprintf ("'%s'" , resolvedInterval ),
376+ "min_checkpoint_frequency" : fmt .Sprintf ("'%s'" , minCheckpointFrequency ),
356377 }
357378
358379 var ff cdcFeatureFlags
@@ -386,6 +407,8 @@ func (cmvt *cdcMixedVersionTester) createChangeFeed(
386407// runWorkloadCmd returns the command that runs the workload.
387408func (cmvt * cdcMixedVersionTester ) runWorkloadCmd (r * rand.Rand ) * roachtestutil.Command {
388409 return roachtestutil .NewCommand ("%s workload run bank" , test .DefaultCockroachPath ).
410+ Flag ("ranges" , targetTableRanges ).
411+ Flag ("rows" , targetTableRows ).
389412 // Since all rows are placed in a buffer, setting a low rate of 2 operations / sec
390413 // helps ensure that we don't exceed the buffer capacity.
391414 Flag ("max-rate" , 2 ).
@@ -403,6 +426,8 @@ func (cmvt *cdcMixedVersionTester) initWorkload(
403426 }
404427
405428 bankInit := roachtestutil .NewCommand ("%s workload init bank" , test .DefaultCockroachPath ).
429+ Flag ("ranges" , targetTableRanges ).
430+ Flag ("rows" , targetTableRows ).
406431 Flag ("seed" , r .Int63 ()).
407432 Arg ("{pgurl%s}" , cmvt .crdbNodes )
408433
@@ -561,3 +586,67 @@ func runCDCMixedVersions(ctx context.Context, t test.Test, c cluster.Cluster) {
561586 mvt .AfterUpgradeFinalized ("wait and validate" , tester .waitAndValidate )
562587 mvt .Run ()
563588}
589+
590+ // runCDCMixedVersionCheckpointing tests that the span-level checkpoint code
591+ // introduced in 25.2 works correctly while the cluster is in a mixed-version state.
592+ //
593+ // Checkpoint writes are forced explicitly through cluster settings (see the
594+ // forceCheckpointing hook below); checkpoint restores happen implicitly after
595+ // each of the rolling restarts performed during the mixed-version test run.
596+ func runCDCMixedVersionCheckpointing (ctx context.Context , t test.Test , c cluster.Cluster ) {
597+ tester := newCDCMixedVersionTester (ctx , c )
598+
599+ mvt := mixedversion .NewTest (
600+ ctx , t , t .L (), c , tester .crdbNodes ,
601+ // Only versions that can upgrade to 25.2 (i.e. 24.3 and 25.1) are
602+ // relevant for mixed-version compatibility here, because 25.2 is the
603+ // first version with the new span-level checkpoint format.
604+ mixedversion .MinimumSupportedVersion ("v24.3.0" ),
605+ )
606+
607+ cleanupKafka := tester .StartKafka (t , c )
608+ defer cleanupKafka ()
609+
610+ forceCheckpointing := func (
611+ ctx context.Context , l * logger.Logger , r * rand.Rand , h * mixedversion.Helper ,
612+ ) error {
613+ // NB: We use the 24.3 names for the cluster settings so that the cluster
614+ // can understand them at startup time. The first statement that fails aborts the hook.
615+ for _ , stmt := range []string {
616+ `SET CLUSTER SETTING changefeed.frontier_checkpoint_frequency = '1s'` ,
617+ `SET CLUSTER SETTING changefeed.frontier_highwater_lag_checkpoint_threshold = '1us'` ,
618+ } {
619+ if err := h .Exec (r , stmt ); err != nil {
620+ return err
621+ }
622+ }
623+ return nil
624+ }
625+
626+ scatter := func (
627+ ctx context.Context , l * logger.Logger , r * rand.Rand , h * mixedversion.Helper ,
628+ ) error {
629+ return h .Exec (r , `ALTER TABLE bank.bank SCATTER` )
630+ }
631+
632+ // Test setup: hooks registered to run on cluster startup.
633+ mvt .OnStartup ("start changefeed" , tester .createChangeFeed )
634+ mvt .OnStartup ("create validator" , tester .setupValidator )
635+ mvt .OnStartup ("init workload" , tester .initWorkload )
636+ mvt .OnStartup ("set checkpointing settings" , forceCheckpointing )
637+
638+ // Run the workload and the kafka consumer in the background; the values returned by the registration calls are discarded.
639+ runWorkloadCmd := tester .runWorkloadCmd (mvt .RNG ())
640+ _ = mvt .BackgroundCommand ("run workload" , tester .workloadNodes , runWorkloadCmd )
641+ _ = mvt .BackgroundFunc ("run kafka consumer" , tester .runKafkaConsumer )
642+
643+ // Scatter the table's ranges while in mixed-version state to make it more
644+ // likely that every node will participate in the changefeed.
645+ mvt .InMixedVersion ("scatter ranges" , scatter )
646+
647+ // Validate the changefeed's output both during and after any upgrades.
648+ mvt .InMixedVersion ("wait and validate" , tester .waitAndValidate )
649+ mvt .AfterUpgradeFinalized ("wait and validate" , tester .waitAndValidate )
650+
651+ mvt .Run ()
652+ }
0 commit comments