Skip to content

Commit be5509d

Browse files
committed
changefeedccl: add metamorphic testing for db-level feeds
This adds metamorphic testing of db-level feeds, making table level feeds into db-level feeds unless the test explicitly opts out or is otherwise setting changefeed options that aren't compatible with db-level feeds. Epic: CRDB-1421 Fixes: #148858
1 parent bf0742d commit be5509d

File tree

6 files changed

+414
-75
lines changed

6 files changed

+414
-75
lines changed

pkg/ccl/changefeedccl/alter_changefeed_test.go

Lines changed: 72 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,9 @@ func TestAlterChangefeedAddTarget(t *testing.T) {
221221
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
222222
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)
223223

224-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`)
224+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`, optOutOfMetamorphicDBLevelChangefeed{
225+
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
226+
})
225227
defer closeFeed(t, testFeed)
226228

227229
feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
@@ -261,7 +263,9 @@ func TestAlterChangefeedAddTargetAfterInitialScan(t *testing.T) {
261263
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
262264
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, b INT)`)
263265

264-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`)
266+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`, optOutOfMetamorphicDBLevelChangefeed{
267+
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
268+
})
265269
defer closeFeed(t, testFeed)
266270

267271
feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
@@ -458,7 +462,9 @@ func TestAlterChangefeedDropTarget(t *testing.T) {
458462
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
459463
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)
460464

461-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar`)
465+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar`, optOutOfMetamorphicDBLevelChangefeed{
466+
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
467+
})
462468
defer closeFeed(t, testFeed)
463469

464470
feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
@@ -493,7 +499,11 @@ func TestAlterChangefeedDropTargetAfterTableDrop(t *testing.T) {
493499
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
494500
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)
495501

496-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH on_error='pause'`)
502+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH on_error='pause'`,
503+
optOutOfMetamorphicDBLevelChangefeed{
504+
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
505+
},
506+
)
497507
defer closeFeed(t, testFeed)
498508

499509
feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
@@ -668,7 +678,9 @@ func TestAlterChangefeedErrors(t *testing.T) {
668678
sqlDB := sqlutils.MakeSQLRunner(s.DB)
669679
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
670680
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)
671-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`)
681+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`, optOutOfMetamorphicDBLevelChangefeed{
682+
reason: "test initializes multiple tables but doesn't watch all of them",
683+
})
672684
defer closeFeed(t, testFeed)
673685

674686
feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
@@ -765,7 +777,9 @@ func TestAlterChangefeedDropAllTargetsError(t *testing.T) {
765777
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
766778
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)
767779

768-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar`)
780+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar`, optOutOfMetamorphicDBLevelChangefeed{
781+
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
782+
})
769783
defer closeFeed(t, testFeed)
770784

771785
feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
@@ -799,7 +813,10 @@ func TestAlterChangefeedTelemetry(t *testing.T) {
799813
// Reset the counts.
800814
_ = telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts)
801815

802-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH diff`)
816+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH diff`,
817+
optOutOfMetamorphicDBLevelChangefeed{
818+
reason: "test initializes multiple tables but doesn't watch all of them",
819+
})
803820
defer closeFeed(t, testFeed)
804821
feed := testFeed.(cdctest.EnterpriseTestFeed)
805822

@@ -994,7 +1011,10 @@ func TestAlterChangefeedAddTargetErrors(t *testing.T) {
9941011
return true, nil
9951012
}
9961013

997-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '100ms'`)
1014+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '100ms'`,
1015+
optOutOfMetamorphicDBLevelChangefeed{
1016+
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
1017+
})
9981018

9991019
// Kafka feeds are not buffered, so we have to consume messages.
10001020
g := ctxgroup.WithContext(context.Background())
@@ -1068,7 +1088,10 @@ func TestAlterChangefeedDatabaseQualifiedNames(t *testing.T) {
10681088
sqlDB.Exec(t, `CREATE TABLE d.users (id INT PRIMARY KEY, name STRING)`)
10691089
sqlDB.Exec(t, `INSERT INTO d.drivers VALUES (1, 'Alice')`)
10701090
sqlDB.Exec(t, `INSERT INTO d.users VALUES (1, 'Bob')`)
1071-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR d.drivers WITH resolved = '100ms', diff`)
1091+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR d.drivers WITH resolved = '100ms', diff`,
1092+
optOutOfMetamorphicDBLevelChangefeed{
1093+
reason: "test initializes multiple tables but doesn't watch all of them",
1094+
})
10721095
defer closeFeed(t, testFeed)
10731096

10741097
assertPayloads(t, testFeed, []string{
@@ -1119,7 +1142,11 @@ func TestAlterChangefeedDatabaseScope(t *testing.T) {
11191142
`INSERT INTO new_movr.drivers VALUES (1, 'Bob')`,
11201143
)
11211144

1122-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.drivers WITH diff`)
1145+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.drivers WITH diff`,
1146+
optOutOfMetamorphicDBLevelChangefeed{
1147+
reason: "changefeed watches tables not in the default database",
1148+
},
1149+
)
11231150
defer closeFeed(t, testFeed)
11241151

11251152
assertPayloads(t, testFeed, []string{
@@ -1162,7 +1189,10 @@ func TestAlterChangefeedDatabaseScopeUnqualifiedName(t *testing.T) {
11621189
)
11631190

11641191
sqlDB.Exec(t, `USE movr`)
1165-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR drivers WITH diff, resolved = '100ms'`)
1192+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR drivers WITH diff, resolved = '100ms'`,
1193+
optOutOfMetamorphicDBLevelChangefeed{
1194+
reason: "changefeed watches tables not in the default database",
1195+
})
11661196
defer closeFeed(t, testFeed)
11671197

11681198
assertPayloads(t, testFeed, []string{
@@ -1211,6 +1241,9 @@ func TestAlterChangefeedColumnFamilyDatabaseScope(t *testing.T) {
12111241
if _, ok := f.(*webhookFeedFactory); ok {
12121242
args = append(args, optOutOfMetamorphicEnrichedEnvelope{reason: "metamorphic enriched envelope does not support column families for webhook sinks"})
12131243
}
1244+
args = append(args, optOutOfMetamorphicDBLevelChangefeed{
1245+
reason: "changefeed watches tables not in the default database",
1246+
})
12141247
testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.drivers WITH diff, split_column_families`, args...)
12151248
defer closeFeed(t, testFeed)
12161249

@@ -1263,6 +1296,9 @@ func TestAlterChangefeedAlterTableName(t *testing.T) {
12631296
if _, ok := f.(*webhookFeedFactory); ok {
12641297
args = append(args, optOutOfMetamorphicEnrichedEnvelope{reason: "see comment"})
12651298
}
1299+
args = append(args, optOutOfMetamorphicDBLevelChangefeed{
1300+
reason: "changefeed watches tables not in the default database",
1301+
})
12661302

12671303
testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.users WITH diff, resolved = '100ms'`, args...)
12681304
defer closeFeed(t, testFeed)
@@ -1350,7 +1386,10 @@ func TestAlterChangefeedAddTargetsDuringSchemaChangeError(t *testing.T) {
13501386
}
13511387

13521388
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo
1353-
WITH resolved = '1s', no_initial_scan, min_checkpoint_frequency='1ns'`)
1389+
WITH resolved = '1s', no_initial_scan, min_checkpoint_frequency='1ns'`,
1390+
optOutOfMetamorphicDBLevelChangefeed{
1391+
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
1392+
})
13541393
jobFeed := testFeed.(cdctest.EnterpriseTestFeed)
13551394
jobRegistry := s.Server.JobRegistry().(*jobs.Registry)
13561395

@@ -1551,7 +1590,10 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
15511590

15521591
registry := s.Server.JobRegistry().(*jobs.Registry)
15531592
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo
1554-
WITH resolved = '100ms', min_checkpoint_frequency='1ns'`)
1593+
WITH resolved = '100ms', min_checkpoint_frequency='1ns'`,
1594+
optOutOfMetamorphicDBLevelChangefeed{
1595+
reason: "test initializes multiple tables but doesn't watch all of them",
1596+
})
15551597

15561598
g := ctxgroup.WithContext(context.Background())
15571599
g.Go(func() error {
@@ -1691,7 +1733,10 @@ func TestAlterChangefeedDropTargetDuringInitialScan(t *testing.T) {
16911733
if rnd.Intn(2) == 0 {
16921734
targets = "bar, foo"
16931735
}
1694-
testFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED for %s`, targets))
1736+
testFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED for %s`, targets),
1737+
optOutOfMetamorphicDBLevelChangefeed{
1738+
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
1739+
})
16951740
defer closeFeed(t, testFeed)
16961741

16971742
// Wait for all spans to have been resolved.
@@ -1742,7 +1787,10 @@ func TestAlterChangefeedInitialScan(t *testing.T) {
17421787
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)
17431788
sqlDB.Exec(t, `INSERT INTO bar VALUES (1), (2), (3)`)
17441789

1745-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '1s', no_initial_scan`)
1790+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '1s', no_initial_scan`,
1791+
optOutOfMetamorphicDBLevelChangefeed{
1792+
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
1793+
})
17461794
defer closeFeed(t, testFeed)
17471795

17481796
expectResolvedTimestamp(t, testFeed)
@@ -1861,7 +1909,9 @@ func TestAlterChangefeedAccessControl(t *testing.T) {
18611909
rootDB := sqlutils.MakeSQLRunner(s.DB)
18621910

18631911
createFeed := func(stmt string) (cdctest.EnterpriseTestFeed, func()) {
1864-
successfulFeed := feed(t, f, stmt)
1912+
successfulFeed := feed(t, f, stmt, optOutOfMetamorphicDBLevelChangefeed{
1913+
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
1914+
})
18651915
closeCf := func() {
18661916
closeFeed(t, successfulFeed)
18671917
}
@@ -1933,7 +1983,9 @@ func TestAlterChangefeedAddDropSameTarget(t *testing.T) {
19331983
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
19341984
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)
19351985

1936-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`)
1986+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`, optOutOfMetamorphicDBLevelChangefeed{
1987+
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
1988+
})
19371989
defer closeFeed(t, testFeed)
19381990

19391991
feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
@@ -2050,7 +2102,9 @@ func TestAlterChangefeedRandomizedTargetChanges(t *testing.T) {
20502102
createStmt := fmt.Sprintf(
20512103
`CREATE CHANGEFEED FOR %s WITH updated`, strings.Join(initialTables, ", "))
20522104
t.Log(createStmt)
2053-
testFeed := feed(t, f, createStmt)
2105+
testFeed := feed(t, f, createStmt, optOutOfMetamorphicDBLevelChangefeed{
2106+
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
2107+
})
20542108
defer closeFeed(t, testFeed)
20552109

20562110
feed, ok := testFeed.(cdctest.EnterpriseTestFeed)

0 commit comments

Comments
 (0)