Skip to content

Commit 33522b5

Browse files
committed
changefeedccl: add metamorphic testing for db-level feeds
This adds metamorphic testing of db-level feeds, making table level feeds into db-level feeds unless the test explicitly opts out or is otherwise setting changefeed options that aren't compatible with db-level feeds. Epic: CRDB-1421 Fixes: #148858
1 parent bf0742d commit 33522b5

File tree

6 files changed

+382
-67
lines changed

6 files changed

+382
-67
lines changed

pkg/ccl/changefeedccl/alter_changefeed_test.go

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,9 @@ func TestAlterChangefeedAddTargetAfterInitialScan(t *testing.T) {
261261
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
262262
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, b INT)`)
263263

264-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`)
264+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`, optOutOfMetamorphicDBLevelChangefeed{
265+
reason: "db level changefeeds don't support ALTER CHANGEFEED commands with initial_scan",
266+
})
265267
defer closeFeed(t, testFeed)
266268

267269
feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
@@ -493,7 +495,11 @@ func TestAlterChangefeedDropTargetAfterTableDrop(t *testing.T) {
493495
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
494496
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)
495497

496-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH on_error='pause'`)
498+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH on_error='pause'`,
499+
optOutOfMetamorphicDBLevelChangefeed{
500+
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
501+
},
502+
)
497503
defer closeFeed(t, testFeed)
498504

499505
feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
@@ -668,7 +674,9 @@ func TestAlterChangefeedErrors(t *testing.T) {
668674
sqlDB := sqlutils.MakeSQLRunner(s.DB)
669675
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
670676
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)
671-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`)
677+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`, optOutOfMetamorphicDBLevelChangefeed{
678+
reason: "test initializes multiple tables but doesn't watch all of them",
679+
})
672680
defer closeFeed(t, testFeed)
673681

674682
feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
@@ -799,7 +807,10 @@ func TestAlterChangefeedTelemetry(t *testing.T) {
799807
// Reset the counts.
800808
_ = telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts)
801809

802-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH diff`)
810+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH diff`,
811+
optOutOfMetamorphicDBLevelChangefeed{
812+
reason: "test initializes multiple tables but doesn't watch all of them",
813+
})
803814
defer closeFeed(t, testFeed)
804815
feed := testFeed.(cdctest.EnterpriseTestFeed)
805816

@@ -1068,7 +1079,10 @@ func TestAlterChangefeedDatabaseQualifiedNames(t *testing.T) {
10681079
sqlDB.Exec(t, `CREATE TABLE d.users (id INT PRIMARY KEY, name STRING)`)
10691080
sqlDB.Exec(t, `INSERT INTO d.drivers VALUES (1, 'Alice')`)
10701081
sqlDB.Exec(t, `INSERT INTO d.users VALUES (1, 'Bob')`)
1071-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR d.drivers WITH resolved = '100ms', diff`)
1082+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR d.drivers WITH resolved = '100ms', diff`,
1083+
optOutOfMetamorphicDBLevelChangefeed{
1084+
reason: "test initializes multiple tables but doesn't watch all of them",
1085+
})
10721086
defer closeFeed(t, testFeed)
10731087

10741088
assertPayloads(t, testFeed, []string{
@@ -1119,7 +1133,11 @@ func TestAlterChangefeedDatabaseScope(t *testing.T) {
11191133
`INSERT INTO new_movr.drivers VALUES (1, 'Bob')`,
11201134
)
11211135

1122-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.drivers WITH diff`)
1136+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.drivers WITH diff`,
1137+
optOutOfMetamorphicDBLevelChangefeed{
1138+
reason: "changefeed watches tables not in the default database",
1139+
},
1140+
)
11231141
defer closeFeed(t, testFeed)
11241142

11251143
assertPayloads(t, testFeed, []string{
@@ -1162,7 +1180,10 @@ func TestAlterChangefeedDatabaseScopeUnqualifiedName(t *testing.T) {
11621180
)
11631181

11641182
sqlDB.Exec(t, `USE movr`)
1165-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR drivers WITH diff, resolved = '100ms'`)
1183+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR drivers WITH diff, resolved = '100ms'`,
1184+
optOutOfMetamorphicDBLevelChangefeed{
1185+
reason: "test initializes multiple tables but doesn't watch all of them",
1186+
})
11661187
defer closeFeed(t, testFeed)
11671188

11681189
assertPayloads(t, testFeed, []string{
@@ -1211,6 +1232,9 @@ func TestAlterChangefeedColumnFamilyDatabaseScope(t *testing.T) {
12111232
if _, ok := f.(*webhookFeedFactory); ok {
12121233
args = append(args, optOutOfMetamorphicEnrichedEnvelope{reason: "metamorphic enriched envelope does not support column families for webhook sinks"})
12131234
}
1235+
args = append(args, optOutOfMetamorphicDBLevelChangefeed{
1236+
reason: "changefeed watches tables not in the default database",
1237+
})
12141238
testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.drivers WITH diff, split_column_families`, args...)
12151239
defer closeFeed(t, testFeed)
12161240

@@ -1263,6 +1287,9 @@ func TestAlterChangefeedAlterTableName(t *testing.T) {
12631287
if _, ok := f.(*webhookFeedFactory); ok {
12641288
args = append(args, optOutOfMetamorphicEnrichedEnvelope{reason: "see comment"})
12651289
}
1290+
args = append(args, optOutOfMetamorphicDBLevelChangefeed{
1291+
reason: "changefeed watches tables not in the default database",
1292+
})
12661293

12671294
testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.users WITH diff, resolved = '100ms'`, args...)
12681295
defer closeFeed(t, testFeed)
@@ -1551,7 +1578,10 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
15511578

15521579
registry := s.Server.JobRegistry().(*jobs.Registry)
15531580
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo
1554-
WITH resolved = '100ms', min_checkpoint_frequency='1ns'`)
1581+
WITH resolved = '100ms', min_checkpoint_frequency='1ns'`,
1582+
optOutOfMetamorphicDBLevelChangefeed{
1583+
reason: "test initializes multiple tables but doesn't watch all of them",
1584+
})
15551585

15561586
g := ctxgroup.WithContext(context.Background())
15571587
g.Go(func() error {
@@ -1861,7 +1891,9 @@ func TestAlterChangefeedAccessControl(t *testing.T) {
18611891
rootDB := sqlutils.MakeSQLRunner(s.DB)
18621892

18631893
createFeed := func(stmt string) (cdctest.EnterpriseTestFeed, func()) {
1864-
successfulFeed := feed(t, f, stmt)
1894+
successfulFeed := feed(t, f, stmt, optOutOfMetamorphicDBLevelChangefeed{
1895+
reason: "test initializes multiple tables but doesn't watch all of them",
1896+
})
18651897
closeCf := func() {
18661898
closeFeed(t, successfulFeed)
18671899
}
@@ -2050,7 +2082,9 @@ func TestAlterChangefeedRandomizedTargetChanges(t *testing.T) {
20502082
createStmt := fmt.Sprintf(
20512083
`CREATE CHANGEFEED FOR %s WITH updated`, strings.Join(initialTables, ", "))
20522084
t.Log(createStmt)
2053-
testFeed := feed(t, f, createStmt)
2085+
testFeed := feed(t, f, createStmt, optOutOfMetamorphicDBLevelChangefeed{
2086+
reason: "db level feeds don't support ALTERing targets with ADD/DROP TARGETS",
2087+
})
20542088
defer closeFeed(t, testFeed)
20552089

20562090
feed, ok := testFeed.(cdctest.EnterpriseTestFeed)

0 commit comments

Comments
 (0)