Skip to content

Commit 0980cbd

Browse files
committed
changefeedccl: deflake tests that expect frequent span-level checkpoints
In a recent commit, the interval at which change aggregators would flush their frontiers to the change frontier was decoupled from the span-level checkpoint interval (which is configurable via a cluster setting). This caused some tests that only set the cluster setting (and not `min_checkpoint_frequency`) to a low value to sometimes flake. This patch updates all such tests to also configure the `min_checkpoint_frequency` option. Release note: None
1 parent 30a1eb7 commit 0980cbd

File tree

3 files changed

+16
-8
lines changed

3 files changed

+16
-8
lines changed

pkg/ccl/changefeedccl/alter_changefeed_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1542,7 +1542,8 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
15421542
context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize)
15431543

15441544
registry := s.Server.JobRegistry().(*jobs.Registry)
1545-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '100ms'`)
1545+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo
1546+
WITH resolved = '100ms', min_checkpoint_frequency='1ns'`)
15461547

15471548
g := ctxgroup.WithContext(context.Background())
15481549
g.Go(func() error {

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2802,7 +2802,8 @@ func TestChangefeedLaggingSpanCheckpointing(t *testing.T) {
28022802

28032803
var jobID jobspb.JobID
28042804
sqlDB.QueryRow(t,
2805-
`CREATE CHANGEFEED FOR foo INTO 'null://' WITH resolved='50ms', no_initial_scan, cursor=$1`, tsStr,
2805+
`CREATE CHANGEFEED FOR foo INTO 'null://'
2806+
WITH resolved='50ms', min_checkpoint_frequency='50ms', no_initial_scan, cursor=$1`, tsStr,
28062807
).Scan(&jobID)
28072808

28082809
// Helper to read job progress
@@ -2944,7 +2945,8 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) {
29442945
}
29452946

29462947
// Setup changefeed job details, avoid relying on initial scan functionality
2947-
baseFeed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved='100ms', min_checkpoint_frequency='100ms', no_initial_scan`)
2948+
baseFeed := feed(t, f, `CREATE CHANGEFEED FOR foo
2949+
WITH resolved='100ms', min_checkpoint_frequency='1ns', no_initial_scan`)
29482950
jobFeed := baseFeed.(cdctest.EnterpriseTestFeed)
29492951
jobRegistry := s.Server.JobRegistry().(*jobs.Registry)
29502952

@@ -9140,7 +9142,8 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {
91409142
context.Background(), &s.Server.ClusterSettings().SV, maxCheckpointSize)
91419143

91429144
registry := s.Server.JobRegistry().(*jobs.Registry)
9143-
foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved='100ms'`)
9145+
foo := feed(t, f, `CREATE CHANGEFEED FOR foo
9146+
WITH resolved='100ms', min_checkpoint_frequency='1ns'`)
91449147
// Some test feeds (kafka) are not buffered, so we have to consume messages.
91459148
var shouldDrain int32 = 1
91469149
g := ctxgroup.WithContext(context.Background())
@@ -12000,7 +12003,8 @@ func TestChangefeedProtectedTimestampUpdate(t *testing.T) {
1200012003
require.Equal(t, int64(0), managePTSCount)
1200112004
require.Equal(t, int64(0), managePTSErrorCount)
1200212005

12003-
createStmt := `CREATE CHANGEFEED FOR foo WITH resolved='10ms', no_initial_scan`
12006+
createStmt := `CREATE CHANGEFEED FOR foo
12007+
WITH resolved='10ms', min_checkpoint_frequency='10ms', no_initial_scan`
1200412008
testFeed := feed(t, f, createStmt)
1200512009
defer closeFeed(t, testFeed)
1200612010

@@ -12113,7 +12117,8 @@ func TestChangefeedProtectedTimestampUpdateError(t *testing.T) {
1211312117
return errors.New("test error")
1211412118
}
1211512119

12116-
createStmt := `CREATE CHANGEFEED FOR foo WITH resolved='10ms', no_initial_scan`
12120+
createStmt := `CREATE CHANGEFEED FOR foo
12121+
WITH resolved='10ms', min_checkpoint_frequency='10ms', no_initial_scan`
1211712122
testFeed := feed(t, f, createStmt)
1211812123
defer closeFeed(t, testFeed)
1211912124

pkg/ccl/changefeedccl/protected_timestamps_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -970,7 +970,8 @@ func TestChangefeedProtectedTimestampUpdateForMultipleTables(t *testing.T) {
970970
require.Equal(t, int64(0), managePTSCount)
971971
require.Equal(t, int64(0), managePTSErrorCount)
972972

973-
createStmt := `CREATE CHANGEFEED FOR foo, bar WITH resolved='10ms', initial_scan='no'`
973+
createStmt := `CREATE CHANGEFEED FOR foo, bar
974+
WITH resolved='10ms', min_checkpoint_frequency='100ms', initial_scan='no'`
974975
testFeed := feed(t, f, createStmt)
975976
defer closeFeed(t, testFeed)
976977

@@ -1114,7 +1115,8 @@ func TestChangefeedPerTableProtectedTimestampProgression(t *testing.T) {
11141115
}
11151116
}
11161117

1117-
createStmt := `CREATE CHANGEFEED FOR table1, table2, table3 WITH resolved='100ms'`
1118+
createStmt := `CREATE CHANGEFEED FOR table1, table2, table3
1119+
WITH resolved='100ms', min_checkpoint_frequency='100ms'`
11181120
testFeed := feed(t, f, createStmt)
11191121
defer closeFeed(t, testFeed)
11201122

0 commit comments

Comments
 (0)