diff --git a/pkg/ccl/changefeedccl/alter_changefeed_test.go b/pkg/ccl/changefeedccl/alter_changefeed_test.go index 000970181306..a20d40b548bf 100644 --- a/pkg/ccl/changefeedccl/alter_changefeed_test.go +++ b/pkg/ccl/changefeedccl/alter_changefeed_test.go @@ -221,7 +221,9 @@ func TestAlterChangefeedAddTarget(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`, optOutOfMetamorphicDBLevelChangefeed{ + reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs", + }) defer closeFeed(t, testFeed) feed, ok := testFeed.(cdctest.EnterpriseTestFeed) @@ -261,7 +263,9 @@ func TestAlterChangefeedAddTargetAfterInitialScan(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, b INT)`) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`, optOutOfMetamorphicDBLevelChangefeed{ + reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs", + }) defer closeFeed(t, testFeed) feed, ok := testFeed.(cdctest.EnterpriseTestFeed) @@ -458,7 +462,9 @@ func TestAlterChangefeedDropTarget(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar`, optOutOfMetamorphicDBLevelChangefeed{ + reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs", + }) defer closeFeed(t, testFeed) feed, ok := testFeed.(cdctest.EnterpriseTestFeed) @@ -493,7 +499,11 @@ func TestAlterChangefeedDropTargetAfterTableDrop(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH on_error='pause'`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH on_error='pause'`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs", + }, + ) defer closeFeed(t, testFeed) feed, ok := testFeed.(cdctest.EnterpriseTestFeed) @@ -668,7 +678,9 @@ func TestAlterChangefeedErrors(t *testing.T) { sqlDB := sqlutils.MakeSQLRunner(s.DB) sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`, optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, testFeed) feed, ok := testFeed.(cdctest.EnterpriseTestFeed) @@ -765,7 +777,9 @@ func TestAlterChangefeedDropAllTargetsError(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar`, optOutOfMetamorphicDBLevelChangefeed{ + reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs", + }) defer closeFeed(t, testFeed) feed, ok := testFeed.(cdctest.EnterpriseTestFeed) @@ -799,7 +813,10 @@ func TestAlterChangefeedTelemetry(t *testing.T) { // Reset the counts. _ = telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH diff`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH diff`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, testFeed) feed := testFeed.(cdctest.EnterpriseTestFeed) @@ -994,7 +1011,10 @@ func TestAlterChangefeedAddTargetErrors(t *testing.T) { return true, nil } - testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '100ms'`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '100ms'`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs", + }) // Kafka feeds are not buffered, so we have to consume messages. g := ctxgroup.WithContext(context.Background()) @@ -1068,7 +1088,10 @@ func TestAlterChangefeedDatabaseQualifiedNames(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE d.users (id INT PRIMARY KEY, name STRING)`) sqlDB.Exec(t, `INSERT INTO d.drivers VALUES (1, 'Alice')`) sqlDB.Exec(t, `INSERT INTO d.users VALUES (1, 'Bob')`) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR d.drivers WITH resolved = '100ms', diff`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR d.drivers WITH resolved = '100ms', diff`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, testFeed) assertPayloads(t, testFeed, []string{ @@ -1119,7 +1142,11 @@ func TestAlterChangefeedDatabaseScope(t *testing.T) { `INSERT INTO new_movr.drivers VALUES (1, 'Bob')`, ) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.drivers WITH diff`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.drivers WITH diff`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "changefeed watches tables not in the default database", + }, + ) defer closeFeed(t, testFeed) assertPayloads(t, testFeed, []string{ @@ -1162,7 +1189,10 @@ func TestAlterChangefeedDatabaseScopeUnqualifiedName(t *testing.T) { ) sqlDB.Exec(t, `USE movr`) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR drivers WITH diff, resolved = '100ms'`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR drivers WITH diff, resolved = '100ms'`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "changefeed watches tables not in the default database", + }) defer closeFeed(t, testFeed) assertPayloads(t, testFeed, []string{ @@ -1211,6 +1241,9 @@ func TestAlterChangefeedColumnFamilyDatabaseScope(t *testing.T) { if _, ok := f.(*webhookFeedFactory); ok { args = append(args, optOutOfMetamorphicEnrichedEnvelope{reason: "metamorphic enriched envelope does not support column families for webhook sinks"}) } + args = append(args, optOutOfMetamorphicDBLevelChangefeed{ + reason: "changefeed watches tables not in the default database", + }) testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.drivers WITH diff, split_column_families`, args...) defer closeFeed(t, testFeed) @@ -1263,6 +1296,9 @@ func TestAlterChangefeedAlterTableName(t *testing.T) { if _, ok := f.(*webhookFeedFactory); ok { args = append(args, optOutOfMetamorphicEnrichedEnvelope{reason: "see comment"}) } + args = append(args, optOutOfMetamorphicDBLevelChangefeed{ + reason: "changefeed watches tables not in the default database", + }) testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.users WITH diff, resolved = '100ms'`, args...) defer closeFeed(t, testFeed) @@ -1350,7 +1386,10 @@ func TestAlterChangefeedAddTargetsDuringSchemaChangeError(t *testing.T) { } testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo -WITH resolved = '1s', no_initial_scan, min_checkpoint_frequency='1ns'`) +WITH resolved = '1s', no_initial_scan, min_checkpoint_frequency='1ns'`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs", + }) jobFeed := testFeed.(cdctest.EnterpriseTestFeed) jobRegistry := s.Server.JobRegistry().(*jobs.Registry) @@ -1551,7 +1590,10 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) { registry := s.Server.JobRegistry().(*jobs.Registry) testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo -WITH resolved = '100ms', min_checkpoint_frequency='1ns'`) +WITH resolved = '100ms', min_checkpoint_frequency='1ns'`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) g := ctxgroup.WithContext(context.Background()) g.Go(func() error { @@ -1691,7 +1733,10 @@ func TestAlterChangefeedDropTargetDuringInitialScan(t *testing.T) { if rnd.Intn(2) == 0 { targets = "bar, foo" } - testFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED for %s`, targets)) + testFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED for %s`, targets), + optOutOfMetamorphicDBLevelChangefeed{ + reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs", + }) defer closeFeed(t, testFeed) // Wait for all spans to have been resolved. @@ -1742,7 +1787,10 @@ func TestAlterChangefeedInitialScan(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`) sqlDB.Exec(t, `INSERT INTO bar VALUES (1), (2), (3)`) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '1s', no_initial_scan`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '1s', no_initial_scan`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs", + }) defer closeFeed(t, testFeed) expectResolvedTimestamp(t, testFeed) @@ -1861,7 +1909,9 @@ func TestAlterChangefeedAccessControl(t *testing.T) { rootDB := sqlutils.MakeSQLRunner(s.DB) createFeed := func(stmt string) (cdctest.EnterpriseTestFeed, func()) { - successfulFeed := feed(t, f, stmt) + successfulFeed := feed(t, f, stmt, optOutOfMetamorphicDBLevelChangefeed{ + reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs", + }) closeCf := func() { closeFeed(t, successfulFeed) } @@ -1933,7 +1983,9 @@ func TestAlterChangefeedAddDropSameTarget(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`, optOutOfMetamorphicDBLevelChangefeed{ + reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs", + }) defer closeFeed(t, testFeed) feed, ok := testFeed.(cdctest.EnterpriseTestFeed) @@ -2050,7 +2102,9 @@ func TestAlterChangefeedRandomizedTargetChanges(t *testing.T) { createStmt := fmt.Sprintf( `CREATE CHANGEFEED FOR %s WITH updated`, strings.Join(initialTables, ", ")) t.Log(createStmt) - testFeed := feed(t, f, createStmt) + testFeed := feed(t, f, createStmt, optOutOfMetamorphicDBLevelChangefeed{ + reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs", + }) defer closeFeed(t, testFeed) feed, ok := testFeed.(cdctest.EnterpriseTestFeed) diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index dc0c310c78be..05cd7bc7ee14 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -954,8 +954,14 @@ func TestChangefeedIdleness(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) sqlDB.Exec(t, `CREATE TABLE bar (b INT PRIMARY KEY)`) - cf1 := feed(t, f, "CREATE CHANGEFEED FOR TABLE foo WITH resolved='10ms'") // higher resolved frequency for faster test - cf2 := feed(t, f, "CREATE CHANGEFEED FOR TABLE bar WITH resolved='10ms'") + cf1 := feed(t, f, "CREATE CHANGEFEED FOR TABLE foo WITH resolved='10ms'", // higher resolved frequency for faster test + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) + cf2 := feed(t, f, "CREATE CHANGEFEED FOR TABLE bar WITH resolved='10ms'", + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, cf1) go workload() @@ -2494,6 +2500,9 @@ func TestChangefeedColumnDropsOnMultipleFamiliesWithTheSameName(t *testing.T) { if _, ok := f.(*webhookFeedFactory); ok { args = append(args, optOutOfMetamorphicEnrichedEnvelope{reason: "metamorphic enriched envelope does not support column families for webhook sinks"}) } + args = append(args, optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) // Open up the changefeed. cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE hasfams FAMILY b_and_c, TABLE alsohasfams FAMILY id_a`, args...) @@ -2760,7 +2769,10 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { // NB: the default is a nullable column sqlDB.Exec(t, `CREATE TABLE add_column (a INT PRIMARY KEY)`) sqlDB.Exec(t, `INSERT INTO add_column VALUES (1)`) - addColumn := feed(t, f, `CREATE CHANGEFEED FOR add_column`) + addColumn := feed(t, f, `CREATE CHANGEFEED FOR add_column`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, addColumn) assertPayloads(t, addColumn, []string{ `add_column: [1]->{"after": {"a": 1}}`, @@ -2775,7 +2787,10 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { t.Run(`rename column`, func(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE rename_column (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `INSERT INTO rename_column VALUES (1, '1')`) - renameColumn := feed(t, f, `CREATE CHANGEFEED FOR rename_column`) + renameColumn := feed(t, f, `CREATE CHANGEFEED FOR rename_column`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, renameColumn) assertPayloads(t, renameColumn, []string{ `rename_column: [1]->{"after": {"a": 1, "b": "1"}}`, @@ -2790,7 +2805,10 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { t.Run(`add default`, func(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE add_default (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `INSERT INTO add_default (a, b) VALUES (1, '1')`) - addDefault := feed(t, f, `CREATE CHANGEFEED FOR add_default`) + addDefault := feed(t, f, `CREATE CHANGEFEED FOR add_default`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, addDefault) sqlDB.Exec(t, `ALTER TABLE add_default ALTER COLUMN b SET DEFAULT 'd'`) sqlDB.Exec(t, `INSERT INTO add_default (a) VALUES (2)`) @@ -2803,7 +2821,10 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { t.Run(`drop default`, func(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE drop_default (a INT PRIMARY KEY, b STRING DEFAULT 'd')`) sqlDB.Exec(t, `INSERT INTO drop_default (a) VALUES (1)`) - dropDefault := feed(t, f, `CREATE CHANGEFEED FOR drop_default`) + dropDefault := feed(t, f, `CREATE CHANGEFEED FOR drop_default`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, dropDefault) sqlDB.Exec(t, `ALTER TABLE drop_default ALTER COLUMN b DROP DEFAULT`) sqlDB.Exec(t, `INSERT INTO drop_default (a) VALUES (2)`) @@ -2816,7 +2837,10 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { t.Run(`drop not null`, func(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE drop_notnull (a INT PRIMARY KEY, b STRING NOT NULL)`) sqlDB.Exec(t, `INSERT INTO drop_notnull VALUES (1, '1')`) - dropNotNull := feed(t, f, `CREATE CHANGEFEED FOR drop_notnull`) + dropNotNull := feed(t, f, `CREATE CHANGEFEED FOR drop_notnull`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, dropNotNull) sqlDB.Exec(t, `ALTER TABLE drop_notnull ALTER b DROP NOT NULL`) sqlDB.Exec(t, `INSERT INTO drop_notnull VALUES (2, NULL)`) @@ -2829,7 +2853,10 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { t.Run(`checks`, func(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE checks (a INT PRIMARY KEY)`) sqlDB.Exec(t, `INSERT INTO checks VALUES (1)`) - checks := feed(t, f, `CREATE CHANGEFEED FOR checks`) + checks := feed(t, f, `CREATE CHANGEFEED FOR checks`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, checks) sqlDB.Exec(t, `ALTER TABLE checks ADD CONSTRAINT c CHECK (a < 5) NOT VALID`) sqlDB.Exec(t, `INSERT INTO checks VALUES (2)`) @@ -2848,7 +2875,10 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { t.Run(`add index`, func(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE add_index (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `INSERT INTO add_index VALUES (1, '1')`) - addIndex := feed(t, f, `CREATE CHANGEFEED FOR add_index`) + addIndex := feed(t, f, `CREATE CHANGEFEED FOR add_index`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, addIndex) sqlDB.Exec(t, `CREATE INDEX b_idx ON add_index (b)`) sqlDB.Exec(t, `SELECT * FROM add_index@b_idx`) @@ -2862,7 +2892,10 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { t.Run(`unique`, func(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE "unique" (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `INSERT INTO "unique" VALUES (1, '1')`) - unique := feed(t, f, `CREATE CHANGEFEED FOR "unique"`) + unique := feed(t, f, `CREATE CHANGEFEED FOR "unique"`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, unique) sqlDB.Exec(t, `ALTER TABLE "unique" ADD CONSTRAINT u UNIQUE (b)`) sqlDB.Exec(t, `INSERT INTO "unique" VALUES (2, '2')`) @@ -2876,7 +2909,10 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { sqlDB.Exec( t, `CREATE TABLE alter_default (a INT PRIMARY KEY, b STRING DEFAULT 'before')`) sqlDB.Exec(t, `INSERT INTO alter_default (a) VALUES (1)`) - alterDefault := feed(t, f, `CREATE CHANGEFEED FOR alter_default`) + alterDefault := feed(t, f, `CREATE CHANGEFEED FOR alter_default`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, alterDefault) sqlDB.Exec(t, `ALTER TABLE alter_default ALTER COLUMN b SET DEFAULT 'after'`) sqlDB.Exec(t, `INSERT INTO alter_default (a) VALUES (2)`) @@ -2890,7 +2926,10 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { t.Run(`add column with DEFAULT NULL`, func(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE t (id INT PRIMARY KEY)`) sqlDB.Exec(t, `INSERT INTO t VALUES (1)`) - defaultNull := feed(t, f, `CREATE CHANGEFEED FOR t`) + defaultNull := feed(t, f, `CREATE CHANGEFEED FOR t`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, defaultNull) sqlDB.Exec(t, `ALTER TABLE t ADD COLUMN c INT DEFAULT NULL`) sqlDB.Exec(t, `INSERT INTO t VALUES (2, 2)`) @@ -3435,7 +3474,10 @@ func TestChangefeedSchemaChangeAllowBackfill_Legacy(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE add_col_comp (a INT PRIMARY KEY, b INT AS (a + 5) STORED)`) sqlDB.Exec(t, `INSERT INTO add_col_comp VALUES (1)`) sqlDB.Exec(t, `INSERT INTO add_col_comp (a) VALUES (2)`) - addColComp := feed(t, f, `CREATE CHANGEFEED FOR add_col_comp WITH updated`) + addColComp := feed(t, f, `CREATE CHANGEFEED FOR add_col_comp WITH updated`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, addColComp) assertPayloadsStripTs(t, addColComp, []string{ `add_col_comp: [1]->{"after": {"a": 1, "b": 6}}`, @@ -3461,7 +3503,10 @@ func TestChangefeedSchemaChangeAllowBackfill_Legacy(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE drop_column (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `INSERT INTO drop_column VALUES (1, '1')`) sqlDB.Exec(t, `INSERT INTO drop_column VALUES (2, '2')`) - dropColumn := feed(t, f, `CREATE CHANGEFEED FOR drop_column WITH updated`) + dropColumn := feed(t, f, `CREATE CHANGEFEED FOR drop_column WITH updated`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, dropColumn) assertPayloadsStripTs(t, dropColumn, []string{ `drop_column: [1]->{"after": {"a": 1, "b": "1"}}`, @@ -3503,7 +3548,10 @@ func TestChangefeedSchemaChangeAllowBackfill_Legacy(t *testing.T) { Changefeed.(*TestingKnobs) knobs.BeforeEmitRow = waitSinkHook - multipleAlters := feed(t, f, `CREATE CHANGEFEED FOR multiple_alters WITH updated`) + multipleAlters := feed(t, f, `CREATE CHANGEFEED FOR multiple_alters WITH updated`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, multipleAlters) assertPayloadsStripTs(t, multipleAlters, []string{ `multiple_alters: [1]->{"after": {"a": 1, "b": "1"}}`, @@ -3618,7 +3666,10 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE add_col_comp (a INT PRIMARY KEY, b INT AS (a + 5) STORED)`) sqlDB.Exec(t, `INSERT INTO add_col_comp VALUES (1)`) sqlDB.Exec(t, `INSERT INTO add_col_comp (a) VALUES (2)`) - addColComp := feed(t, f, `CREATE CHANGEFEED FOR add_col_comp WITH updated`) + addColComp := feed(t, f, `CREATE CHANGEFEED FOR add_col_comp WITH updated`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, addColComp) assertPayloadsStripTs(t, addColComp, []string{ `add_col_comp: [1]->{"after": {"a": 1, "b": 6}}`, @@ -3638,7 +3689,10 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE drop_column (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `INSERT INTO drop_column VALUES (1, '1')`) sqlDB.Exec(t, `INSERT INTO drop_column VALUES (2, '2')`) - dropColumn := feed(t, f, `CREATE CHANGEFEED FOR drop_column WITH updated`) + dropColumn := feed(t, f, `CREATE CHANGEFEED FOR drop_column WITH updated`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, dropColumn) assertPayloadsStripTs(t, dropColumn, []string{ `drop_column: [1]->{"after": {"a": 1, "b": "1"}}`, @@ -3678,7 +3732,10 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) { Changefeed.(*TestingKnobs) knobs.BeforeEmitRow = waitSinkHook - multipleAlters := feed(t, f, `CREATE CHANGEFEED FOR multiple_alters WITH updated`) + multipleAlters := feed(t, f, `CREATE CHANGEFEED FOR multiple_alters WITH updated`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, multipleAlters) assertPayloadsStripTs(t, multipleAlters, []string{ `multiple_alters: [1]->{"after": {"a": 1, "b": "1"}}`, @@ -4276,7 +4333,8 @@ func TestChangefeedJobControl(t *testing.T) { ChangefeedJobPermissionsTestSetup(t, s) createFeed := func(stmt string) (cdctest.EnterpriseTestFeed, func()) { - successfulFeed := feed(t, f, stmt) + successfulFeed := feed(t, f, stmt, optOutOfMetamorphicDBLevelChangefeed{ + reason: "tests table level changefeed permissions"}) closeCf := func() { closeFeed(t, successfulFeed) } @@ -5707,28 +5765,40 @@ func TestChangefeedLowFrequencyNotices(t *testing.T) { t.Run("no options specified", func(t *testing.T) { actual = "(no notice)" f := makeKafkaFeedFactory(t, s, dbWithHandler) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/'`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/'`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test requires split_column_families NOT to be set", + }) defer closeFeed(t, testFeed) require.Equal(t, `changefeed will emit to topic _u2603_`, actual) }) t.Run("normal resolved and min_checkpoint_frequency", func(t *testing.T) { actual = "(no notice)" f := makeKafkaFeedFactory(t, s, dbWithHandler) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH resolved='10s', min_checkpoint_frequency='10s'`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH resolved='10s', min_checkpoint_frequency='10s'`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test requires split_column_families NOT to be set", + }) defer closeFeed(t, testFeed) require.Equal(t, `changefeed will emit to topic _u2603_`, actual) }) t.Run("low resolved timestamp", func(t *testing.T) { actual = "(no notice)" f := makeKafkaFeedFactory(t, s, dbWithHandler) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH resolved='200ms'`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH resolved='200ms'`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test requires split_column_families NOT to be set", + }) defer closeFeed(t, testFeed) require.Equal(t, `the 'resolved' timestamp interval (200ms) is very low; consider increasing it to at least 500ms`, actual) }) t.Run("low min_checkpoint_frequency timestamp", func(t *testing.T) { actual = "(no notice)" f := makeKafkaFeedFactory(t, s, dbWithHandler) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH min_checkpoint_frequency='200ms'`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH min_checkpoint_frequency='200ms'`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test requires split_column_families NOT to be set", + }) defer closeFeed(t, testFeed) require.Equal(t, `the 'min_checkpoint_frequency' timestamp interval (200ms) is very low; consider increasing it to at least 500ms`, actual) }) @@ -5764,7 +5834,10 @@ func TestChangefeedOutputTopics(t *testing.T) { t.Run("kafka", func(t *testing.T) { actual = "(no notice)" f := makeKafkaFeedFactory(t, s, dbWithHandler) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/'`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/'`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test requires split_column_families NOT to be set", + }) defer closeFeed(t, testFeed) require.Equal(t, `changefeed will emit to topic _u2603_`, actual) }) @@ -5772,7 +5845,10 @@ func TestChangefeedOutputTopics(t *testing.T) { t.Run("pubsub v2", func(t *testing.T) { actual = "(no notice)" f := makePubsubFeedFactory(s, dbWithHandler) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'gcpubsub://does.not.matter/'`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'gcpubsub://does.not.matter/'`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test requires split_column_families NOT to be set", + }) defer closeFeed(t, testFeed) // Pubsub doesn't sanitize the topic name. require.Equal(t, `changefeed will emit to topic ☃`, actual) @@ -6559,9 +6635,15 @@ func TestChangefeedTruncateOrDrop(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE truncate_cascade (b INT PRIMARY KEY REFERENCES truncate (a)) WITH (schema_locked=false)`) sqlDB.Exec(t, `BEGIN; INSERT INTO truncate VALUES (1); INSERT INTO truncate_cascade VALUES (1); COMMIT`) - truncate := feed(t, f, `CREATE CHANGEFEED FOR truncate`) + truncate := feed(t, f, `CREATE CHANGEFEED FOR truncate`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, truncate) - truncateCascade := feed(t, f, `CREATE CHANGEFEED FOR truncate_cascade`) + truncateCascade := feed(t, f, `CREATE CHANGEFEED FOR truncate_cascade`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, truncateCascade) assertPayloads(t, truncate, []string{`truncate: [1]->{"after": {"a": 1}}`}) assertPayloads(t, truncateCascade, []string{`truncate_cascade: [1]->{"after": {"b": 1}}`}) @@ -6578,7 +6660,10 @@ func TestChangefeedTruncateOrDrop(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE drop (a INT PRIMARY KEY)`) sqlDB.Exec(t, `INSERT INTO drop VALUES (1)`) - drop := feed(t, f, `CREATE CHANGEFEED FOR drop`) + drop := feed(t, f, `CREATE CHANGEFEED FOR drop`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, drop) assertPayloads(t, drop, []string{`drop: [1]->{"after": {"a": 1}}`}) sqlDB.Exec(t, `DROP TABLE drop`) @@ -8205,10 +8290,16 @@ func TestChangefeedTelemetry(t *testing.T) { // Reset the counts. _ = telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts) - // Start some feeds (and read from them to make sure they've started. - foo := feed(t, f, `CREATE CHANGEFEED FOR foo`) + // Start some feeds (and read from them to make sure they've started). + foo := feed(t, f, `CREATE CHANGEFEED FOR foo`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, foo) - fooBar := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH format=json`) + fooBar := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH format=json`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, fooBar) assertPayloads(t, foo, []string{ `foo: [1]->{"after": {"a": 1}}`, @@ -8489,7 +8580,9 @@ func TestChangefeedHandlesDrainingNodes(t *testing.T) { defer closeSink() atomic.StoreInt32(&shouldDrain, 1) - feed := feed(t, f, "CREATE CHANGEFEED FOR foo") + feed := feed(t, f, "CREATE CHANGEFEED FOR foo", optOutOfMetamorphicDBLevelChangefeed{ + reason: "changefeed watches tables not in the default database", + }) defer closeFeed(t, feed) jobID := feed.(cdctest.EnterpriseTestFeed).JobID() @@ -8654,7 +8747,10 @@ func TestChangefeedHandlesRollingRestart(t *testing.T) { defer closeSink() proceed <- struct{}{} // Allow changefeed to start. - feed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH initial_scan='no', min_checkpoint_frequency='100ms'") + feed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH initial_scan='no', min_checkpoint_frequency='100ms'", + optOutOfMetamorphicDBLevelChangefeed{ + reason: "changefeed watches tables not in the default database", + }) defer closeFeed(t, feed) jf := feed.(cdctest.EnterpriseTestFeed) @@ -9773,7 +9869,9 @@ func TestDistSenderRangeFeedPopulatesVirtualTable(t *testing.T) { var cf cdctest.TestFeed asUser(t, f, `feedCreator`, func(userDB *sqlutils.SQLRunner) { - cf = feed(t, f, `CREATE CHANGEFEED FOR table_a;`) + cf = feed(t, f, `CREATE CHANGEFEED FOR table_a;`, optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) }) defer closeFeed(t, cf) @@ -10059,7 +10157,9 @@ func TestChangefeedOnlyInitialScanCSV(t *testing.T) { sqlDB.CheckQueryResultsRetry(t, `SELECT count(*) FROM foo,bar`, [][]string{{`9`}}) - feed := feed(t, f, testData.changefeedStmt) + feed := feed(t, f, testData.changefeedStmt, optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) sqlDB.Exec(t, "INSERT INTO foo VALUES (4, 'Doug'), (5, 'Elaine'), (6, 'Fred')") sqlDB.Exec(t, "INSERT INTO bar VALUES (4, 'd'), (5, 'e'), (6, 'f')") @@ -10860,7 +10960,9 @@ func TestAlterChangefeedTelemetryLogs(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, b STRING)`) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar`, optOutOfMetamorphicDBLevelChangefeed{ + reason: "db-level changefeed doesn't support ALTER CHANGEFEED ADD/DROP", + }) defer closeFeed(t, testFeed) feed := testFeed.(cdctest.EnterpriseTestFeed) @@ -11148,7 +11250,10 @@ func TestChangefeedKafkaMessageTooLarge(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE large (a INT PRIMARY KEY)`) sqlDB.Exec(t, `INSERT INTO large (a) SELECT * FROM generate_series(1, 2000);`) - foo := feed(t, f, `CREATE CHANGEFEED FOR large WITH kafka_sink_config='{"Flush": {"MaxMessages": 1000}}'`) + foo := feed(t, f, `CREATE CHANGEFEED FOR large WITH kafka_sink_config='{"Flush": {"MaxMessages": 1000}}'`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, foo) rnd, _ := randutil.NewPseudoRand() @@ -11219,7 +11324,10 @@ func TestChangefeedKafkaMessageTooLarge(t *testing.T) { } { t.Run(fmt.Sprintf(`eventually surface error for retry: %s`, failTest.errMsg), func(t *testing.T) { knobs.kafkaInterceptor = failTest.failInterceptor - foo := feed(t, f, `CREATE CHANGEFEED FOR errors WITH kafka_sink_config='{"Flush": {"MaxMessages": 0}}'`) + foo := feed(t, f, `CREATE CHANGEFEED FOR errors WITH kafka_sink_config='{"Flush": {"MaxMessages": 0}}'`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, foo) feedJob := foo.(cdctest.EnterpriseTestFeed) diff --git a/pkg/ccl/changefeedccl/encoder_test.go b/pkg/ccl/changefeedccl/encoder_test.go index c9a6252b6fd7..4bb82138d0fe 100644 --- a/pkg/ccl/changefeedccl/encoder_test.go +++ b/pkg/ccl/changefeedccl/encoder_test.go @@ -601,7 +601,11 @@ func TestAvroEnum(t *testing.T) { sqlDB.Exec(t, `INSERT INTO soft_deletes values (0, 'active')`) sd := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR soft_deletes `+ - `WITH format=%s`, changefeedbase.OptFormatAvro)) + `WITH format=%s`, changefeedbase.OptFormatAvro), + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }, + ) defer closeFeed(t, sd) assertPayloads(t, sd, []string{ `soft_deletes: {"a":{"long":0},"b":{"string":"active"}}->{"after":{"soft_deletes":{"a":{"long":0},"b":{"string":"active"},"c":{"long":0}}}}`, @@ -639,9 +643,10 @@ func TestAvroSchemaNaming(t *testing.T) { sqlDB.Exec(t, `INSERT INTO movr.drivers VALUES (1, 'Alice')`, ) - movrFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR movr.drivers `+ - `WITH format=%s`, changefeedbase.OptFormatAvro)) + `WITH format=%s`, changefeedbase.OptFormatAvro), optOutOfMetamorphicDBLevelChangefeed{ + reason: "changefeed watches tables not in the default database", + }) defer closeFeed(t, movrFeed) foo := movrFeed.(*kafkaFeed) @@ -656,7 +661,10 @@ func TestAvroSchemaNaming(t *testing.T) { }) fqnFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR movr.drivers `+ - `WITH format=%s, full_table_name`, changefeedbase.OptFormatAvro)) + `WITH format=%s, full_table_name`, changefeedbase.OptFormatAvro), + optOutOfMetamorphicDBLevelChangefeed{ + reason: "changefeed watches tables not in the default database", + }) defer closeFeed(t, fqnFeed) foo = fqnFeed.(*kafkaFeed) @@ -671,8 +679,10 @@ func TestAvroSchemaNaming(t *testing.T) { }) prefixFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR movr.drivers `+ - `WITH format=%s, avro_schema_prefix=super`, - changefeedbase.OptFormatAvro)) + `WITH format=%s, avro_schema_prefix=super`, changefeedbase.OptFormatAvro), + optOutOfMetamorphicDBLevelChangefeed{ + reason: "changefeed watches tables not in the default database", + }) defer closeFeed(t, prefixFeed) foo = prefixFeed.(*kafkaFeed) @@ -687,7 +697,10 @@ func TestAvroSchemaNaming(t *testing.T) { }) prefixFQNFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR movr.drivers `+ - `WITH format=%s, avro_schema_prefix=super, full_table_name`, changefeedbase.OptFormatAvro)) + `WITH format=%s, avro_schema_prefix=super, full_table_name`, changefeedbase.OptFormatAvro), + optOutOfMetamorphicDBLevelChangefeed{ + reason: "changefeed watches tables not in the default database", + }) defer closeFeed(t, prefixFQNFeed) foo = prefixFQNFeed.(*kafkaFeed) @@ -707,7 +720,10 @@ func TestAvroSchemaNaming(t *testing.T) { sqlDB.Exec(t, `ALTER TABLE movr.drivers ADD COLUMN vehicle_id int CREATE FAMILY volatile`) multiFamilyFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR movr.drivers `+ - `WITH format=%s, %s`, changefeedbase.OptFormatAvro, changefeedbase.OptSplitColumnFamilies)) + `WITH format=%s, %s`, changefeedbase.OptFormatAvro, changefeedbase.OptSplitColumnFamilies), + optOutOfMetamorphicDBLevelChangefeed{ + reason: "changefeed watches tables not in the default database", + }) defer closeFeed(t, multiFamilyFeed) foo = multiFamilyFeed.(*kafkaFeed) @@ -744,8 +760,15 @@ func TestAvroSchemaNamespace(t *testing.T) { `INSERT INTO movr.drivers VALUES (1, 'Alice')`, ) - noNamespaceFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR movr.drivers `+ - `WITH format=%s`, changefeedbase.OptFormatAvro)) + noNamespaceFeed := feed(t, f, + fmt.Sprintf( + `CREATE CHANGEFEED FOR movr.drivers WITH format=%s`, + changefeedbase.OptFormatAvro, + ), + optOutOfMetamorphicDBLevelChangefeed{ + reason: "changefeed watches tables not in the default database", + }, + ) defer closeFeed(t, noNamespaceFeed) assertPayloads(t, noNamespaceFeed, []string{ @@ -758,7 +781,10 @@ func TestAvroSchemaNamespace(t *testing.T) { require.NotContains(t, foo.registry.SchemaForSubject(`drivers-value`), `namespace`) namespaceFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR movr.drivers `+ - `WITH format=%s, avro_schema_prefix=super`, changefeedbase.OptFormatAvro)) + `WITH format=%s, avro_schema_prefix=super`, changefeedbase.OptFormatAvro), + optOutOfMetamorphicDBLevelChangefeed{ + reason: "changefeed watches tables not in the default database", + }) defer closeFeed(t, namespaceFeed) foo = namespaceFeed.(*kafkaFeed) @@ -787,7 +813,10 @@ func TestAvroSchemaHasExpectedTopLevelFields(t *testing.T) { ) foo := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR movr.drivers `+ - `WITH format=%s`, changefeedbase.OptFormatAvro)) + `WITH format=%s`, changefeedbase.OptFormatAvro), + optOutOfMetamorphicDBLevelChangefeed{ + reason: "changefeed watches tables not in the default database", + }) defer closeFeed(t, foo) assertPayloads(t, foo, []string{ diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index b21664222e74..f554da511eaa 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -1147,6 +1147,13 @@ type optOutOfMetamorphicEnrichedEnvelope struct { reason string } +// Set this to always be true for now while we find flakes. +var forceDBLevelChangefeed = true + +type optOutOfMetamorphicDBLevelChangefeed struct { + reason string +} + func feed( t testing.TB, f cdctest.TestFeedFactory, create string, args ...interface{}, ) cdctest.TestFeed { @@ -1157,6 +1164,11 @@ func feed( t.Fatal(err) } + create, args, err = maybeForceDBLevelChangefeed(t, create, f, args) + if err != nil { + t.Fatal(err) + } + feed, err := f.Feed(create, args...) if err != nil { t.Fatal(err) @@ -1171,6 +1183,123 @@ func feed( return feed } +func maybeForceDBLevelChangefeed( + t testing.TB, create string, f cdctest.TestFeedFactory, args []any, +) (newCreate string, newArgs []any, err error) { + if !forceDBLevelChangefeed { + return create, args, nil + } + + for i, arg := range args { + if o, ok := arg.(optOutOfMetamorphicDBLevelChangefeed); ok { + t.Logf("opted out of DB level changefeed for %s: %s", create, o.reason) + newArgs = slices.Clone(args) + newArgs = slices.Delete(newArgs, i, i+1) + return create, newArgs, nil + } + } + + switch f := f.(type) { + case *externalConnectionFeedFactory: + return maybeForceDBLevelChangefeed(t, create, f.TestFeedFactory, args) + case *sinklessFeedFactory: + t.Logf("did not force DB level changefeed for %s because %T is not supported", create, f) + return create, args, nil + } + + createStmt, err := parser.ParseOne(create) + if err != nil { + return "", nil, err + } + createAST := createStmt.AST.(*tree.CreateChangefeed) + if createAST.Select != nil { + t.Logf("did not force DB level changefeed for %s because it is a CDC query", create) + return create, args, nil + } + + if createAST.Level == tree.ChangefeedLevelDatabase { + t.Logf("did not force DB level changefeed for %s because it is already a DB level changefeed", create) + return create, args, nil + } + + opts := createAST.Options + for _, opt := range opts { + key := opt.Key.String() + if strings.EqualFold(key, "initial_scan") { + if opt.Value != nil && strings.EqualFold(opt.Value.String(), "'only'") { + t.Logf("did not force DB level changefeed for %s because it set initial scan only", create) + return create, args, nil + } + } + if strings.EqualFold(key, "initial_scan_only") { + t.Logf("did not force DB level changefeed for %s because it set initial scan only", create) + return create, args, nil + } + // Since DB level feeds set split column families, and split column families is incompatible + // with resolved for kafka and pubsub sinks, we skip DB level feeds metamorphic testing in + // that case. + if strings.EqualFold(key, "resolved") { + switch f.(type) { + case *kafkaFeedFactory, *pubsubFeedFactory: + t.Logf("did not force DB level changefeed for %s because it set resolved", create) + return create, args, nil + } + } + } + + for _, target := range createAST.TableTargets { + if target.FamilyName != "" { + t.Logf("did not force DB level changefeed for %s because it has column family %s", create, target.FamilyName) + return create, args, nil + } + } + + // Keep the options as is but make it a DB level changefeed. + createStmt.AST.(*tree.CreateChangefeed).Level = tree.ChangefeedLevelDatabase + createStmt.AST.(*tree.CreateChangefeed).TableTargets = nil + createStmt.AST.(*tree.CreateChangefeed).DatabaseTarget = tree.ChangefeedDatabaseTarget("d") + + // Unlike table-level changefeeds, database-level changefeeds do not perform + // an initial scan by default. To convert a default table level feed into an + // equivalent DB level feed, we need to explicitly set the initial scan type. + if isUsingDefaultInitialScan(opts) { + createStmt.AST.(*tree.CreateChangefeed).Options = append(createStmt.AST.(*tree.CreateChangefeed).Options, tree.KVOption{ + Key: "initial_scan", + Value: tree.NewDString("yes"), + }) + } + + create = tree.AsStringWithFlags(createStmt.AST, tree.FmtShowPasswords) + t.Logf("forcing DB level changefeed for %T - %s", f, create) + return create, args, nil +} + +// isUsingDefaultInitialScan returns true if the changefeed is does not specify +// an initial scan type and does not have a cursor, resulting in the default +// behavior of the initial scan. +func isUsingDefaultInitialScan(opts []tree.KVOption) bool { + initialScanType := "" + hasCursor := false + for _, opt := range opts { + key := opt.Key.String() + if strings.EqualFold(key, "initial_scan") { + if opt.Value != nil { + initialScanType = opt.Value.String() + } + } + if strings.EqualFold(key, "no_initial_scan") { + initialScanType = "'no'" + } + if strings.EqualFold(key, "initial_scan_only") { + initialScanType = "'only'" + } + if strings.EqualFold(key, "cursor") { + hasCursor = true + } + } + return initialScanType == "" && !hasCursor +} + func maybeForceEnrichedEnvelope( t testing.TB, create string, f cdctest.TestFeedFactory, args []any, ) (newCreate string, newArgs []any, forced bool, _ error) { diff --git a/pkg/ccl/changefeedccl/protected_timestamps_test.go b/pkg/ccl/changefeedccl/protected_timestamps_test.go index 875ea7b5aed8..ca80743221b4 100644 --- a/pkg/ccl/changefeedccl/protected_timestamps_test.go +++ b/pkg/ccl/changefeedccl/protected_timestamps_test.go @@ -380,8 +380,13 @@ func TestChangefeedAlterPTS(t *testing.T) { sqlDB := sqlutils.MakeSQLRunner(s.DB) sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `CREATE TABLE foo2 (a INT PRIMARY KEY, b STRING)`) - f2 := feed(t, f, `CREATE CHANGEFEED FOR table foo with protect_data_from_gc_on_pause, - resolved='1s', min_checkpoint_frequency='1s'`) + f2 := feed(t, f, + `CREATE CHANGEFEED FOR table foo with protect_data_from_gc_on_pause, + resolved='1s', min_checkpoint_frequency='1s'`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "changefeed creates multiple tables but doesn't watch all of them", + }, + ) defer closeFeed(t, f2) getNumPTSRecords := func() int { diff --git a/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go b/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go index 2401163b0227..204e5841a1e7 100644 --- a/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go +++ b/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go @@ -68,7 +68,11 @@ func TestShowChangefeedJobsDatabaseLevel(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `INSERT INTO bar VALUES (1, 'initial')`) - tcf := feed(t, f, `CREATE CHANGEFEED FOR d.foo, d.bar`) + tcf := feed(t, f, `CREATE CHANGEFEED FOR d.foo, d.bar`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test asserts how DB-level and table-level changefeeds differ", + }, + ) defer closeFeed(t, tcf) assertPayloads(t, tcf, []string{ `foo: [0]->{"after": {"a": 0, "b": "initial"}}`, @@ -159,7 +163,11 @@ func TestShowChangefeedJobsBasic(t *testing.T) { sqlDB := sqlutils.MakeSQLRunner(s.DB) sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) - foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH format='json'`) + foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH format='json'`, + optOutOfMetamorphicDBLevelChangefeed{ + // NB: We test WITH WATCHED_TABLES in another test. This one specifically does not. + reason: "db level changefeeds don't have full_table_names without WITH WATCHED_TABLES", + }) defer closeFeed(t, foo) type row struct { @@ -290,7 +298,9 @@ func TestShowChangefeedJobsRedacted(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { foo := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR TABLE foo INTO '%s'`, tc.uri), - optOutOfMetamorphicEnrichedEnvelope{reason: "compares text of changefeed statement"}) + optOutOfMetamorphicEnrichedEnvelope{reason: "compares text of changefeed statement"}, + optOutOfMetamorphicDBLevelChangefeed{reason: "compares text of changefeed statement"}, + ) defer closeFeed(t, foo) efoo, ok := foo.(cdctest.EnterpriseTestFeed) @@ -552,7 +562,10 @@ func TestShowChangefeedJobsAlterChangefeed(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`) - foo := feed(t, f, `CREATE CHANGEFEED FOR foo`, optOutOfMetamorphicEnrichedEnvelope{reason: "compares text of changefeed statement"}) + foo := feed(t, f, `CREATE CHANGEFEED FOR foo`, + optOutOfMetamorphicEnrichedEnvelope{reason: "compares text of changefeed statement"}, + optOutOfMetamorphicDBLevelChangefeed{reason: "compares text of changefeed statement"}, + ) defer closeFeed(t, foo) feed, ok := foo.(cdctest.EnterpriseTestFeed) @@ -649,7 +662,8 @@ func TestShowChangefeedJobsAuthorization(t *testing.T) { var jobID jobspb.JobID createFeed := func(stmt string) { - successfulFeed := feed(t, f, stmt) + successfulFeed := feed(t, f, stmt, optOutOfMetamorphicDBLevelChangefeed{ + reason: "tests table level changefeed permissions"}) defer closeFeed(t, successfulFeed) _, err := successfulFeed.Next() require.NoError(t, err)