Skip to content

Commit af6b700

Browse files
committed
changefeedccl: deflake tests that expect frequent span-level checkpoints
In a recent commit, the interval at which change aggregators would flush their frontiers to the change frontier was decoupled from the span-level checkpoint interval (which is configurable via a cluster setting). This caused some tests that only set the cluster setting (and not `min_checkpoint_frequency`) to a low value to sometimes flake. This patch updates all such tests to also configure the `min_checkpoint_frequency` option. Release note: None
1 parent a52f8ce commit af6b700

File tree

3 files changed

+16
-8
lines changed

3 files changed

+16
-8
lines changed

pkg/ccl/changefeedccl/alter_changefeed_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1542,7 +1542,8 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
15421542
context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize)
15431543

15441544
registry := s.Server.JobRegistry().(*jobs.Registry)
1545-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '100ms'`)
1545+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo
1546+
WITH resolved = '100ms', min_checkpoint_frequency='1ns'`)
15461547

15471548
g := ctxgroup.WithContext(context.Background())
15481549
g.Go(func() error {

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2810,7 +2810,8 @@ func TestChangefeedLaggingSpanCheckpointing(t *testing.T) {
28102810

28112811
var jobID jobspb.JobID
28122812
sqlDB.QueryRow(t,
2813-
`CREATE CHANGEFEED FOR foo INTO 'null://' WITH resolved='50ms', no_initial_scan, cursor=$1`, tsStr,
2813+
`CREATE CHANGEFEED FOR foo INTO 'null://'
2814+
WITH resolved='50ms', min_checkpoint_frequency='50ms', no_initial_scan, cursor=$1`, tsStr,
28142815
).Scan(&jobID)
28152816

28162817
// Helper to read job progress
@@ -2952,7 +2953,8 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) {
29522953
}
29532954

29542955
// Setup changefeed job details, avoid relying on initial scan functionality
2955-
baseFeed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved='100ms', min_checkpoint_frequency='100ms', no_initial_scan`)
2956+
baseFeed := feed(t, f, `CREATE CHANGEFEED FOR foo
2957+
WITH resolved='100ms', min_checkpoint_frequency='1ns', no_initial_scan`)
29562958
jobFeed := baseFeed.(cdctest.EnterpriseTestFeed)
29572959
jobRegistry := s.Server.JobRegistry().(*jobs.Registry)
29582960

@@ -9149,7 +9151,8 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {
91499151
context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize)
91509152

91519153
registry := s.Server.JobRegistry().(*jobs.Registry)
9152-
foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved='100ms'`)
9154+
foo := feed(t, f, `CREATE CHANGEFEED FOR foo
9155+
WITH resolved='100ms', min_checkpoint_frequency='1ns'`)
91539156
// Some test feeds (kafka) are not buffered, so we have to consume messages.
91549157
var shouldDrain int32 = 1
91559158
g := ctxgroup.WithContext(context.Background())
@@ -12009,7 +12012,8 @@ func TestChangefeedProtectedTimestampUpdate(t *testing.T) {
1200912012
require.Equal(t, int64(0), managePTSCount)
1201012013
require.Equal(t, int64(0), managePTSErrorCount)
1201112014

12012-
createStmt := `CREATE CHANGEFEED FOR foo WITH resolved='10ms', no_initial_scan`
12015+
createStmt := `CREATE CHANGEFEED FOR foo
12016+
WITH resolved='10ms', min_checkpoint_frequency='10ms', no_initial_scan`
1201312017
testFeed := feed(t, f, createStmt)
1201412018
defer closeFeed(t, testFeed)
1201512019

@@ -12122,7 +12126,8 @@ func TestChangefeedProtectedTimestampUpdateError(t *testing.T) {
1212212126
return errors.New("test error")
1212312127
}
1212412128

12125-
createStmt := `CREATE CHANGEFEED FOR foo WITH resolved='10ms', no_initial_scan`
12129+
createStmt := `CREATE CHANGEFEED FOR foo
12130+
WITH resolved='10ms', min_checkpoint_frequency='10ms', no_initial_scan`
1212612131
testFeed := feed(t, f, createStmt)
1212712132
defer closeFeed(t, testFeed)
1212812133

pkg/ccl/changefeedccl/protected_timestamps_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -970,7 +970,8 @@ func TestChangefeedProtectedTimestampUpdateForMultipleTables(t *testing.T) {
970970
require.Equal(t, int64(0), managePTSCount)
971971
require.Equal(t, int64(0), managePTSErrorCount)
972972

973-
createStmt := `CREATE CHANGEFEED FOR foo, bar WITH resolved='10ms', initial_scan='no'`
973+
createStmt := `CREATE CHANGEFEED FOR foo, bar
974+
WITH resolved='10ms', min_checkpoint_frequency='100ms', initial_scan='no'`
974975
testFeed := feed(t, f, createStmt)
975976
defer closeFeed(t, testFeed)
976977

@@ -1114,7 +1115,8 @@ func TestChangefeedPerTableProtectedTimestampProgression(t *testing.T) {
11141115
}
11151116
}
11161117

1117-
createStmt := `CREATE CHANGEFEED FOR table1, table2, table3 WITH resolved='100ms'`
1118+
createStmt := `CREATE CHANGEFEED FOR table1, table2, table3
1119+
WITH resolved='100ms', min_checkpoint_frequency='100ms'`
11181120
testFeed := feed(t, f, createStmt)
11191121
defer closeFeed(t, testFeed)
11201122

0 commit comments

Comments
 (0)