From 10a5244d07b02c5efd6c371318f0fe0778f8c6d3 Mon Sep 17 00:00:00 2001 From: Aerin Freilich Date: Tue, 4 Nov 2025 14:46:25 -0500 Subject: [PATCH 1/2] changefeedccl: change db-level feed default to no initial scan This change the default initial scan behavior for db-level feeds to not do an initial scan if no option is specified. When an initial scan is specified, that is respected. We do not allow db-level changefeeds with initial_scan='only'. This includes a rewrite of existing db-level feeds tests which rely on default initial scans in their testing. Epic: CRDB-1421 Informs: #156484 Release note: None --- pkg/ccl/changefeedccl/changefeed_stmt.go | 22 +- pkg/ccl/changefeedccl/changefeed_test.go | 513 +++++++++++------- .../show_changefeed_jobs_test.go | 5 +- 3 files changed, 352 insertions(+), 188 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 7bbaa0a3e620..993c1fed7ee9 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -230,9 +230,18 @@ func changefeedPlanHook( return nil, nil, false, err } - // Treat all tables inside a database as if "split_column_families" is set. if changefeedStmt.Level == tree.ChangefeedLevelDatabase { + // Treat all tables inside a database as if "split_column_families" is set. rawOpts[changefeedbase.OptSplitColumnFamilies] = `yes` + + // The default behavior for a database-level changefeed is + // NOT to perform an initial scan, unlike table-level changefeeds. + _, initialScanSet := rawOpts[changefeedbase.OptInitialScan] + _, initialScanOnlySet := rawOpts[changefeedbase.OptInitialScanOnly] + _, noInitialScanSet := rawOpts[changefeedbase.OptNoInitialScan] + if !initialScanOnlySet && !noInitialScanSet && !initialScanSet { + rawOpts[changefeedbase.OptInitialScan] = `no` + } } opts := changefeedbase.MakeStatementOptions(rawOpts) @@ -1462,6 +1471,17 @@ func validateDetailsAndOptions( if err := opts.ValidateForCreateChangefeed(details.Select != ""); err != nil { return err } + targetSpecs := details.TargetSpecifications + if len(targetSpecs) != 0 && targetSpecs[0].Type == jobspb.ChangefeedTargetSpecification_DATABASE { + scanType, err := opts.GetInitialScanType() + if err != nil { + return err + } + if scanType == changefeedbase.OnlyInitialScan { + return errors.Errorf( + `cannot specify %s on a database level changefeed`, changefeedbase.OptInitialScanOnly) + } + } if opts.HasEndTime() { scanType, err := opts.GetInitialScanType() if err != nil { diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 219c14db4b08..e584df3d2dfc 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -172,22 +172,11 @@ func TestDatabaseLevelChangefeedBasics(t *testing.T) { testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { sqlDB := sqlutils.MakeSQLRunner(s.DB) sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) - sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`) - sqlDB.Exec(t, `UPSERT INTO foo VALUES (0, 'updated')`) sqlDB.Exec(t, `CREATE TABLE foo2 (a INT PRIMARY KEY, b STRING)`) - sqlDB.Exec(t, `INSERT INTO foo2 VALUES (0, 'initial')`) - sqlDB.Exec(t, `UPSERT INTO foo2 VALUES (0, 'updated')`) foo := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d`) defer closeFeed(t, foo) - // 'initial' is skipped because only the latest value ('updated') is - // emitted by the initial scan. - assertPayloads(t, foo, []string{ - `foo: [0]->{"after": {"a": 0, "b": "updated"}}`, - `foo2: [0]->{"after": {"a": 0, "b": "updated"}}`, - }) - sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a'), (2, 'b')`) assertPayloads(t, foo, []string{ `foo: [1]->{"after": {"a": 1, "b": "a"}}`, @@ -223,178 +212,244 @@ func TestDatabaseLevelChangefeedBasics(t *testing.T) { }) } - cdcTest(t, testFn) + // TODO(#158105): Add support for sinkless in db-level feeds tests. + cdcTest(t, testFn, feedTestEnterpriseSinks) } -func TestDatabaseLevelChangefeedWithIncludeFilter(t *testing.T) { +func TestDatabaseLevelChangefeedWithFilter(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { - sqlDB := sqlutils.MakeSQLRunner(s.DB) - for i := range 4 { - name := fmt.Sprintf("foo%d", i+1) - sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE %s (a INT PRIMARY KEY, b STRING)`, name)) - sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO %s VALUES (0, 'initial')`, name)) - sqlDB.Exec(t, fmt.Sprintf(`UPSERT INTO %s VALUES (0, 'updated')`, name)) - } - - sqlDB.Exec(t, `CREATE SCHEMA private`) - sqlDB.Exec(t, `CREATE TABLE private.foo1 (a INT PRIMARY KEY, b STRING)`) - sqlDB.Exec(t, `INSERT INTO private.foo1 VALUES (0, 'initial')`) - // Test that if there are multiple tables with the same name the correct - // one will still be found using the default schema of public. - feed1 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo1`) - defer closeFeed(t, feed1) - assertPayloads(t, feed1, []string{ - `foo1: [0]->{"after": {"a": 0, "b": "updated"}}`, - }) + type testCase struct { + name string + createStmt string + expectedPayloads []string + expectedErr string + } - // Test that we can include multiple tables. - feed2 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo1,foo2`) - defer closeFeed(t, feed2) - assertPayloads(t, feed2, []string{ - `foo1: [0]->{"after": {"a": 0, "b": "updated"}}`, - `foo2: [0]->{"after": {"a": 0, "b": "updated"}}`, - }) + // Each test case will create 5 tables: foo1, foo2, foo3, foo4, and private.foo1. + // Then the test case creates the changefeed with the statement shown and + // inserts a row into each table. + testCases := []testCase{ + { + name: "include_single_table", + createStmt: "CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo1", + expectedPayloads: []string{ + `foo1: [1]->{"after": {"a": 1, "b": "foo1"}}`, + }, + }, + { + name: "include_multiple_tables", + createStmt: "CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo1, foo2", + expectedPayloads: []string{ + `foo1: [1]->{"after": {"a": 1, "b": "foo1"}}`, + `foo2: [1]->{"after": {"a": 1, "b": "foo2"}}`, + }, + }, + { + name: "include_qualified_names", + createStmt: "CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES d.public.foo2, public.foo3, foo4", + expectedPayloads: []string{ + `foo2: [1]->{"after": {"a": 1, "b": "foo2"}}`, + `foo3: [1]->{"after": {"a": 1, "b": "foo3"}}`, + `foo4: [1]->{"after": {"a": 1, "b": "foo4"}}`, + }, + }, + { + name: "include_nonexistent_table", + createStmt: "CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo1, bob", + expectedPayloads: []string{ + `foo1: [1]->{"after": {"a": 1, "b": "foo1"}}`, + }, + }, + { + name: "include_partially_qualified_as_schema_table", + createStmt: "CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES d.foo1, public.foo2", + expectedPayloads: []string{ + `foo2: [1]->{"after": {"a": 1, "b": "foo2"}}`, + }, + }, + { + // Test that if there are multiple tables with the same name the correct + // one will still be found using the default schema of public. + name: "exclude_potentially_ambiguous_table_name", + createStmt: "CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo1", + expectedPayloads: []string{ + `foo1: [1]->{"after": {"a": 1, "b": "private.foo1"}}`, + `foo2: [1]->{"after": {"a": 1, "b": "foo2"}}`, + `foo3: [1]->{"after": {"a": 1, "b": "foo3"}}`, + `foo4: [1]->{"after": {"a": 1, "b": "foo4"}}`, + }, + }, + { + name: "exclude_single_table", + createStmt: "CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo2", + expectedPayloads: []string{ + `foo1: [1]->{"after": {"a": 1, "b": "foo1"}}`, + `foo1: [1]->{"after": {"a": 1, "b": "private.foo1"}}`, + `foo3: [1]->{"after": {"a": 1, "b": "foo3"}}`, + `foo4: [1]->{"after": {"a": 1, "b": "foo4"}}`, + }, + }, + { + name: "exclude_multiple_tables", + createStmt: "CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo2, foo3", + expectedPayloads: []string{ + `foo1: [1]->{"after": {"a": 1, "b": "foo1"}}`, + `foo1: [1]->{"after": {"a": 1, "b": "private.foo1"}}`, + `foo4: [1]->{"after": {"a": 1, "b": "foo4"}}`, + }, + }, + { + name: "exclude_qualified_names", + createStmt: "CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES d.public.foo2, public.foo3, foo4", + expectedPayloads: []string{ + `foo1: [1]->{"after": {"a": 1, "b": "foo1"}}`, + `foo1: [1]->{"after": {"a": 1, "b": "private.foo1"}}`, + }, + }, + { + name: "exclude_nonexistent_table", + createStmt: "CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES bob", + expectedPayloads: []string{ + `foo1: [1]->{"after": {"a": 1, "b": "foo1"}}`, + `foo1: [1]->{"after": {"a": 1, "b": "private.foo1"}}`, + `foo2: [1]->{"after": {"a": 1, "b": "foo2"}}`, + `foo3: [1]->{"after": {"a": 1, "b": "foo3"}}`, + `foo4: [1]->{"after": {"a": 1, "b": "foo4"}}`, + }, + }, + { + name: "include_error_table_outside_database", + createStmt: "CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES fizz.buzz.foo", + expectedErr: `table "fizz.buzz.foo" must be in target database "d"`, + }, + { + name: "include_error_table_pattern", + createStmt: "CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo.*", + expectedErr: `at or near "*": syntax error`, + }, + { + name: "exclude_error_table_outside_database", + createStmt: "CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES fizz.buzz.foo", + expectedErr: `table "fizz.buzz.foo" must be in target database "d"`, + }, + { + name: "exclude_error_table_pattern", + createStmt: "CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo.*", + expectedErr: `at or near "*": syntax error`, + }, + } - // Test that we can handle fully qualified, partially qualified, and unqualified - // table names. - feed3 := feed(t, f, - `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES d.public.foo2, public.foo3, foo4`) - defer closeFeed(t, feed3) - assertPayloads(t, feed3, []string{ - `foo2: [0]->{"after": {"a": 0, "b": "updated"}}`, - `foo3: [0]->{"after": {"a": 0, "b": "updated"}}`, - `foo4: [0]->{"after": {"a": 0, "b": "updated"}}`, - }) + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) - // Test that we can handle tables that don't exist. - feed4 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo1,bob`) - defer closeFeed(t, feed4) - assertPayloads(t, feed4, []string{ - `foo1: [0]->{"after": {"a": 0, "b": "updated"}}`, - }) + for i := range 4 { + name := fmt.Sprintf("foo%d", i+1) + sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE %s (a INT PRIMARY KEY, b STRING)`, name)) + } + sqlDB.Exec(t, `CREATE SCHEMA private`) + sqlDB.Exec(t, `CREATE TABLE private.foo1 (a INT PRIMARY KEY, b STRING)`) - // Test that fully qualified table names outside of the target database will - // cause an error. - expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES fizz.buzz.foo`, - `table "fizz.buzz.foo" must be in target database "d"`) + if tc.expectedErr != "" { + expectErrCreatingFeed(t, f, tc.createStmt, tc.expectedErr) + return + } - // Table patterns are not supported. - expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo.*`, - `at or near "*": syntax error`) + feed := feed(t, f, tc.createStmt) + defer closeFeed(t, feed) - // Test that name resolution is not dependent on search_path() or current DB. - sqlDB.Exec(t, `CREATE DATABASE notd`) - sqlDB.Exec(t, `USE notd`) - sqlDB.Exec(t, `SET search_path TO notpublic`) - feed5 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo1, private.foo1`) - defer closeFeed(t, feed5) - assertPayloads(t, feed5, []string{ - `foo1: [0]->{"after": {"a": 0, "b": "updated"}}`, - `foo1: [0]->{"after": {"a": 0, "b": "initial"}}`, - }) + for i := range 4 { + name := fmt.Sprintf("foo%d", i+1) + sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO %s VALUES (1, '%s')`, name, name)) + } + sqlDB.Exec(t, `INSERT INTO private.foo1 VALUES (1, 'private.foo1')`) - // Test that partially qualified table names are always assumed to be - // .. - feed6 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES d.foo1, public.foo2`) - defer closeFeed(t, feed6) - assertPayloads(t, feed6, []string{ - `foo2: [0]->{"after": {"a": 0, "b": "updated"}}`, + assertPayloads(t, feed, tc.expectedPayloads) + } + cdcTest(t, testFn, feedTestEnterpriseSinks) }) } - cdcTest(t, testFn, feedTestEnterpriseSinks) } -func TestDatabaseLevelChangefeedWithExcludeFilter(t *testing.T) { +func TestDatabaseLevelChangefeedNameResolutionIsSearchPathIndependent(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { - sqlDB := sqlutils.MakeSQLRunner(s.DB) - for i := range 4 { - name := fmt.Sprintf("foo%d", i+1) - sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE %s (a INT PRIMARY KEY, b STRING)`, name)) - sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO %s VALUES (0, 'initial')`, name)) - sqlDB.Exec(t, fmt.Sprintf(`UPSERT INTO %s VALUES (0, 'updated')`, name)) - } - - sqlDB.Exec(t, `CREATE SCHEMA private`) - sqlDB.Exec(t, `CREATE TABLE private.foo1 (a INT PRIMARY KEY, b STRING)`) - sqlDB.Exec(t, `INSERT INTO private.foo1 VALUES (0, 'initial')`) - // Test that if there are multiple tables with the same name the correct - // one will still be found using the default schema of public. - feed1 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo1`) - defer closeFeed(t, feed1) - assertPayloads(t, feed1, []string{ - `foo1: [0]->{"after": {"a": 0, "b": "initial"}}`, - `foo2: [0]->{"after": {"a": 0, "b": "updated"}}`, - `foo3: [0]->{"after": {"a": 0, "b": "updated"}}`, - `foo4: [0]->{"after": {"a": 0, "b": "updated"}}`, - }) - - feed2 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo2`) - defer closeFeed(t, feed2) - assertPayloads(t, feed2, []string{ - `foo1: [0]->{"after": {"a": 0, "b": "initial"}}`, - `foo1: [0]->{"after": {"a": 0, "b": "updated"}}`, - `foo3: [0]->{"after": {"a": 0, "b": "updated"}}`, - `foo4: [0]->{"after": {"a": 0, "b": "updated"}}`, - }) - - // Test that we can exclude multiple tables. - feed3 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo2,foo3`) - defer closeFeed(t, feed3) - assertPayloads(t, feed3, []string{ - `foo1: [0]->{"after": {"a": 0, "b": "initial"}}`, - `foo1: [0]->{"after": {"a": 0, "b": "updated"}}`, - `foo4: [0]->{"after": {"a": 0, "b": "updated"}}`, - }) - - // Test that we can handle fully qualified, partially qualified, and unqualified - // table names. - feed4 := feed(t, f, - `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES d.public.foo2, public.foo3, foo4`) - defer closeFeed(t, feed4) - assertPayloads(t, feed4, []string{ - `foo1: [0]->{"after": {"a": 0, "b": "initial"}}`, - `foo1: [0]->{"after": {"a": 0, "b": "updated"}}`, - }) - - // Test that we can handle tables that don't exist. - feed5 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES bob`) - defer closeFeed(t, feed5) - assertPayloads(t, feed5, []string{ - `foo1: [0]->{"after": {"a": 0, "b": "initial"}}`, - `foo1: [0]->{"after": {"a": 0, "b": "updated"}}`, - `foo2: [0]->{"after": {"a": 0, "b": "updated"}}`, - `foo3: [0]->{"after": {"a": 0, "b": "updated"}}`, - `foo4: [0]->{"after": {"a": 0, "b": "updated"}}`, - }) - - // Test that fully qualified table names outside of the target database will - // cause an error. - expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES fizz.buzz.foo`, - `table "fizz.buzz.foo" must be in target database "d"`) - - // Table patterns are not supported. - expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo.*`, - `at or near "*": syntax error`) - - // Test that name resolution is not dependent on search_path() or current DB - sqlDB.Exec(t, `CREATE DATABASE notd`) - sqlDB.Exec(t, `USE notd`) - sqlDB.Exec(t, `SET search_path TO notpublic`) - feed6 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo1, private.foo1`) - defer closeFeed(t, feed6) - assertPayloads(t, feed6, []string{ - `foo2: [0]->{"after": {"a": 0, "b": "updated"}}`, - `foo3: [0]->{"after": {"a": 0, "b": "updated"}}`, - `foo4: [0]->{"after": {"a": 0, "b": "updated"}}`, + type testCase struct { + name string + createStmt string + expectedPayloads []string + } + + // Each test case will create 5 tables: foo1, foo2, foo3, foo4, and private.foo1. + // Then the test case creates the changefeed with the statement shown and + // inserts a row into each table. + testCases := []testCase{ + { + name: "include_filter", + createStmt: "CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo1", + expectedPayloads: []string{ + `foo1: [1]->{"after": {"a": 1, "b": "foo1"}}`, + }, + }, + { + name: "include_filter_with_private_schema", + createStmt: "CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo1, private.foo1", + expectedPayloads: []string{ + `foo1: [1]->{"after": {"a": 1, "b": "foo1"}}`, + `foo1: [1]->{"after": {"a": 1, "b": "private.foo1"}}`, + }, + }, + { + name: "exclude_filter", + createStmt: "CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo1, private.foo1", + expectedPayloads: []string{ + `foo2: [1]->{"after": {"a": 1, "b": "foo2"}}`, + `foo3: [1]->{"after": {"a": 1, "b": "foo3"}}`, + `foo4: [1]->{"after": {"a": 1, "b": "foo4"}}`, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + + for i := range 4 { + name := fmt.Sprintf("foo%d", i+1) + sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE %s (a INT PRIMARY KEY, b STRING)`, name)) + } + sqlDB.Exec(t, `CREATE SCHEMA private`) + sqlDB.Exec(t, `CREATE TABLE private.foo1 (a INT PRIMARY KEY, b STRING)`) + + // Switch to a different database and search_path to show that it + // doesn't affect table resolution for the changefeed. + sqlDB.Exec(t, `CREATE DATABASE notd`) + sqlDB.Exec(t, `USE notd`) + sqlDB.Exec(t, `SET search_path TO notpublic`) + sqlDB.Exec(t, `CREATE TABLE notd.foo1 (a INT PRIMARY KEY, b STRING)`) + + // Create feed - should resolve tables relative to database d, not current search_path + feed := feed(t, f, tc.createStmt) + defer closeFeed(t, feed) + + sqlDB.Exec(t, `INSERT INTO notd.foo1 VALUES (0, 'notd.notpublic.foo1')`) + sqlDB.Exec(t, `INSERT INTO d.private.foo1 VALUES (1, 'private.foo1')`) + for i := range 4 { + name := fmt.Sprintf("foo%d", i+1) + sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO d.%s VALUES (1, '%s')`, name, name)) + } + + assertPayloads(t, feed, tc.expectedPayloads) + } + + cdcTest(t, testFn, feedTestEnterpriseSinks) }) } - cdcTest(t, testFn, feedTestEnterpriseSinks) } func TestChangefeedBasicQuery(t *testing.T) { @@ -1150,21 +1205,10 @@ func TestDatabaseLevelChangefeedDiff(t *testing.T) { testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { sqlDB := sqlutils.MakeSQLRunner(s.DB) sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) - sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`) - sqlDB.Exec(t, `UPSERT INTO foo VALUES (0, 'updated')`) sqlDB.Exec(t, `CREATE TABLE foo2 (a INT PRIMARY KEY, b STRING)`) - sqlDB.Exec(t, `INSERT INTO foo2 VALUES (0, 'initial')`) - sqlDB.Exec(t, `UPSERT INTO foo2 VALUES (0, 'updated')`) d := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d WITH diff`) defer closeFeed(t, d) - // 'initial' is skipped because only the latest value ('updated') is - // emitted by the initial scan. - assertPayloads(t, d, []string{ - `foo: [0]->{"after": {"a": 0, "b": "updated"}, "before": null}`, - `foo2: [0]->{"after": {"a": 0, "b": "updated"}, "before": null}`, - }) - sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a'), (2, 'b')`) assertPayloads(t, d, []string{ `foo: [1]->{"after": {"a": 1, "b": "a"}, "before": null}`, @@ -1209,7 +1253,7 @@ func TestDatabaseLevelChangefeedDiff(t *testing.T) { }) } - cdcTest(t, testFn) + cdcTest(t, testFn, feedTestEnterpriseSinks) } func TestChangefeedTenants(t *testing.T) { @@ -12686,7 +12730,7 @@ func TestChangefeedBareFullProtobuf(t *testing.T) { } } -func TestDatabaseRenameDuringDatabaseLevelChangefeed(t *testing.T) { +func TestDatabaseLevelChangefeedRenameDatabase(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -12694,12 +12738,14 @@ func TestDatabaseRenameDuringDatabaseLevelChangefeed(t *testing.T) { sqlDB := sqlutils.MakeSQLRunner(s.DB) sqlDB.Exec(t, `CREATE DATABASE foo;`) sqlDB.Exec(t, `CREATE TABLE foo.bar (id INT PRIMARY KEY);`) + + feed1 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE foo`) + defer closeFeed(t, feed1) + sqlDB.Exec(t, `INSERT INTO foo.bar VALUES (1);`) expectedRows := []string{ `bar: [1]->{"after": {"id": 1}}`, } - feed1 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE foo`) - defer closeFeed(t, feed1) assertPayloads(t, feed1, expectedRows) sqlDB.Exec(t, `ALTER DATABASE foo RENAME TO bar;`) @@ -12711,22 +12757,23 @@ func TestDatabaseRenameDuringDatabaseLevelChangefeed(t *testing.T) { } // TODO(#152196): Remove feedTestUseRootUserConnection once we have ALTER // DEFAULT PRIVILEGES for databases - cdcTest(t, testFn, feedTestUseRootUserConnection) + cdcTest(t, testFn, feedTestEnterpriseSinks, feedTestUseRootUserConnection) } -func TestTableRenameDuringDatabaseLevelChangefeed(t *testing.T) { +func TestDatabaseLevelChangefeedRenameTable(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { sqlDB := sqlutils.MakeSQLRunner(s.DB) sqlDB.Exec(t, `CREATE TABLE d.bar (id INT PRIMARY KEY);`) + feed1 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d`) + defer closeFeed(t, feed1) + sqlDB.Exec(t, `INSERT INTO d.bar VALUES (1);`) expectedRows := []string{ `bar: [1]->{"after": {"id": 1}}`, } - feed1 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d`) - defer closeFeed(t, feed1) assertPayloads(t, feed1, expectedRows) sqlDB.Exec(t, `ALTER TABLE d.bar RENAME TO foo;`) @@ -12736,7 +12783,7 @@ func TestTableRenameDuringDatabaseLevelChangefeed(t *testing.T) { } assertPayloads(t, feed1, expectedRows) } - cdcTest(t, testFn) + cdcTest(t, testFn, feedTestEnterpriseSinks) } func getChangefeedLoggingChannel(sv *settings.Values) logpb.Channel { @@ -12821,7 +12868,7 @@ func TestDatabaseLevelChangefeedChangingTableset(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `INSERT INTO bar VALUES (0, 'initial')`) }() - foo := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d`) + foo := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d WITH initial_scan='yes'`) defer closeFeed(t, foo) assertPayloads(t, foo, []string{ `foo: [0]->{"after": {"a": 0, "b": "updated"}}`, @@ -12913,3 +12960,103 @@ func TestDatabaseLevelChangefeedChangingTableset(t *testing.T) { } }) } + +// TestDatabaseLevelChangefeedWithInitialScanOptions tests the different initial +// scan options for a database-level changefeed. It makes sure that the default +// is NOT to perform an initial scan, unlike table-level changefeeds, but that +// it still respects the initial_scan_only option. No initial scan is not supported +// no matter how it is specified. +func TestDatabaseLevelChangefeedWithInitialScanOptions(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + type testCase struct { + name string + option string + expectedBehavior string + } + + tests := []testCase{ + { + option: "no_initial_scan", + expectedBehavior: "no", + }, + { + option: "initial_scan = 'only'", + expectedBehavior: "only (error)", + }, + { + option: "initial_scan_only", + expectedBehavior: "only (error)", + }, + { + option: "initial_scan = 'yes'", + expectedBehavior: "yes", + }, + { + option: "initial_scan", + expectedBehavior: "yes", + }, + { + name: "no_option_specified", + option: "", + expectedBehavior: "no", + }, + { + option: "initial_scan = 'no'", + expectedBehavior: "no", + }, + } + + for _, tc := range tests { + if tc.name == "" { + tc.name = tc.option + } + t.Run(tc.name, func(t *testing.T) { + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`) + sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO bar VALUES (0, 'initial')`) + + var createStmt string + if tc.option == "" { + createStmt = `CREATE CHANGEFEED FOR DATABASE d` + } else { + createStmt = fmt.Sprintf(`CREATE CHANGEFEED FOR DATABASE d WITH %s`, tc.option) + } + + if tc.expectedBehavior == "only (error)" { + expectedErr := fmt.Sprintf( + `cannot specify %s on a database level changefeed`, + changefeedbase.OptInitialScanOnly, + ) + sqlDB.ExpectErrWithTimeout( + t, expectedErr, createStmt, + ) + return + } + + feed := feed(t, f, createStmt) + defer closeFeed(t, feed) + + if tc.expectedBehavior == "yes" { + assertPayloads(t, feed, []string{ + `foo: [0]->{"after": {"a": 0, "b": "initial"}}`, + `bar: [0]->{"after": {"a": 0, "b": "initial"}}`, + }) + } + + sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'new')`) + sqlDB.Exec(t, `INSERT INTO bar VALUES (1, 'new')`) + + assertPayloads(t, feed, []string{ + `foo: [1]->{"after": {"a": 1, "b": "new"}}`, + `bar: [1]->{"after": {"a": 1, "b": "new"}}`, + }) + } + cdcTest(t, testFn, feedTestEnterpriseSinks) + }) + } +} diff --git a/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go b/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go index 6a763cdaeba8..9161916fad64 100644 --- a/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go +++ b/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go @@ -78,10 +78,7 @@ func TestShowChangefeedJobsDatabaseLevel(t *testing.T) { dbcf := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d`) defer closeFeed(t, dbcf) - assertPayloads(t, dbcf, []string{ - `foo: [0]->{"after": {"a": 0, "b": "initial"}}`, - `bar: [1]->{"after": {"a": 1, "b": "initial"}}`, - }) + // Database level changefeeds do not perform an initial scan. waitForJobState(sqlDB, t, dbcf.(cdctest.EnterpriseTestFeed).JobID(), jobs.StateRunning) t.Run("without watched tables", func(t *testing.T) { From ac46fc2e3de2b67ffc5c9c444320e18e08679558 Mon Sep 17 00:00:00 2001 From: Aerin Freilich Date: Tue, 4 Nov 2025 17:33:22 -0500 Subject: [PATCH 2/2] changefeedccl: add metamorphic testing for db-level feeds This adds metamorphic testing of db-level feeds, making table level feeds into db-level feeds unless the test explicitly opts out or is otherwise setting changefeed options that aren't compatible with db-level feeds. Epic: CRDB-1421 Fixes: #148858 --- .../changefeedccl/alter_changefeed_test.go | 52 ++++- pkg/ccl/changefeedccl/changefeed.go | 24 +- pkg/ccl/changefeedccl/changefeed_test.go | 213 ++++++++++++++---- pkg/ccl/changefeedccl/encoder_test.go | 34 ++- pkg/ccl/changefeedccl/helpers_test.go | 110 +++++++++ .../protected_timestamps_test.go | 4 +- .../show_changefeed_jobs_test.go | 24 +- 7 files changed, 388 insertions(+), 73 deletions(-) diff --git a/pkg/ccl/changefeedccl/alter_changefeed_test.go b/pkg/ccl/changefeedccl/alter_changefeed_test.go index 000970181306..776f1f08ec5d 100644 --- a/pkg/ccl/changefeedccl/alter_changefeed_test.go +++ b/pkg/ccl/changefeedccl/alter_changefeed_test.go @@ -261,7 +261,9 @@ func TestAlterChangefeedAddTargetAfterInitialScan(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, b INT)`) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`, optOutOfMetamorphicDBLevelChangefeed{ + reason: "db level changefeeds don't support ALTER CHANGEFEED commands with initial_scan", + }) defer closeFeed(t, testFeed) feed, ok := testFeed.(cdctest.EnterpriseTestFeed) @@ -493,7 +495,11 @@ func TestAlterChangefeedDropTargetAfterTableDrop(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH on_error='pause'`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH on_error='pause'`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs", + }, + ) defer closeFeed(t, testFeed) feed, ok := testFeed.(cdctest.EnterpriseTestFeed) @@ -668,7 +674,9 @@ func TestAlterChangefeedErrors(t *testing.T) { sqlDB := sqlutils.MakeSQLRunner(s.DB) sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`, optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, testFeed) feed, ok := testFeed.(cdctest.EnterpriseTestFeed) @@ -799,7 +807,10 @@ func TestAlterChangefeedTelemetry(t *testing.T) { // Reset the counts. _ = telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH diff`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH diff`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, testFeed) feed := testFeed.(cdctest.EnterpriseTestFeed) @@ -1068,7 +1079,10 @@ func TestAlterChangefeedDatabaseQualifiedNames(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE d.users (id INT PRIMARY KEY, name STRING)`) sqlDB.Exec(t, `INSERT INTO d.drivers VALUES (1, 'Alice')`) sqlDB.Exec(t, `INSERT INTO d.users VALUES (1, 'Bob')`) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR d.drivers WITH resolved = '100ms', diff`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR d.drivers WITH resolved = '100ms', diff`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, testFeed) assertPayloads(t, testFeed, []string{ @@ -1119,7 +1133,11 @@ func TestAlterChangefeedDatabaseScope(t *testing.T) { `INSERT INTO new_movr.drivers VALUES (1, 'Bob')`, ) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.drivers WITH diff`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.drivers WITH diff`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "changefeed watches tables not in the default database", + }, + ) defer closeFeed(t, testFeed) assertPayloads(t, testFeed, []string{ @@ -1162,7 +1180,10 @@ func TestAlterChangefeedDatabaseScopeUnqualifiedName(t *testing.T) { ) sqlDB.Exec(t, `USE movr`) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR drivers WITH diff, resolved = '100ms'`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR drivers WITH diff, resolved = '100ms'`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, testFeed) assertPayloads(t, testFeed, []string{ @@ -1211,6 +1232,9 @@ func TestAlterChangefeedColumnFamilyDatabaseScope(t *testing.T) { if _, ok := f.(*webhookFeedFactory); ok { args = append(args, optOutOfMetamorphicEnrichedEnvelope{reason: "metamorphic enriched envelope does not support column families for webhook sinks"}) } + args = append(args, optOutOfMetamorphicDBLevelChangefeed{ + reason: "changefeed watches tables not in the default database", + }) testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.drivers WITH diff, split_column_families`, args...) defer closeFeed(t, testFeed) @@ -1263,6 +1287,7 @@ func TestAlterChangefeedAlterTableName(t *testing.T) { if _, ok := f.(*webhookFeedFactory); ok { args = append(args, optOutOfMetamorphicEnrichedEnvelope{reason: "see comment"}) } + args = append(args, optOutOfMetamorphicDBLevelChangefeed{reason: "uses non default DB"}) testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.users WITH diff, resolved = '100ms'`, args...) defer closeFeed(t, testFeed) @@ -1551,7 +1576,10 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) { registry := s.Server.JobRegistry().(*jobs.Registry) testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo -WITH resolved = '100ms', min_checkpoint_frequency='1ns'`) +WITH resolved = '100ms', min_checkpoint_frequency='1ns'`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) g := ctxgroup.WithContext(context.Background()) g.Go(func() error { @@ -1861,7 +1889,9 @@ func TestAlterChangefeedAccessControl(t *testing.T) { rootDB := sqlutils.MakeSQLRunner(s.DB) createFeed := func(stmt string) (cdctest.EnterpriseTestFeed, func()) { - successfulFeed := feed(t, f, stmt) + successfulFeed := feed(t, f, stmt, optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) closeCf := func() { closeFeed(t, successfulFeed) } @@ -2050,7 +2080,9 @@ func TestAlterChangefeedRandomizedTargetChanges(t *testing.T) { createStmt := fmt.Sprintf( `CREATE CHANGEFEED FOR %s WITH updated`, strings.Join(initialTables, ", ")) t.Log(createStmt) - testFeed := feed(t, f, createStmt) + testFeed := feed(t, f, createStmt, optOutOfMetamorphicDBLevelChangefeed{ + reason: "db level feeds don't support ALTERing targets with ADD/DROP TARGETS", + }) defer closeFeed(t, testFeed) feed, ok := testFeed.(cdctest.EnterpriseTestFeed) diff --git a/pkg/ccl/changefeedccl/changefeed.go b/pkg/ccl/changefeedccl/changefeed.go index 8c8409754907..afa844a14850 100644 --- a/pkg/ccl/changefeedccl/changefeed.go +++ b/pkg/ccl/changefeedccl/changefeed.go @@ -79,7 +79,8 @@ func AllTargets( if len(cd.TargetSpecifications) > 1 { return changefeedbase.Targets{}, errors.AssertionFailedf("database-level changefeed is not supported with multiple targets") } - targets, err = getTargetsFromDatabaseSpec(ctx, ts, execCfg, timestamp) + _, useFullTableName := cd.Opts[changefeedbase.OptFullTableName] + targets, err = getTargetsFromDatabaseSpec(ctx, ts, execCfg, timestamp, useFullTableName) if err != nil { return changefeedbase.Targets{}, err } @@ -117,6 +118,7 @@ func getTargetsFromDatabaseSpec( ts jobspb.ChangefeedTargetSpecification, execCfg *sql.ExecutorConfig, timestamp hlc.Timestamp, + useFullTableName bool, ) (targets changefeedbase.Targets, err error) { err = sql.DescsTxn(ctx, execCfg, func( ctx context.Context, txn isql.Txn, descs *descs.Collection, @@ -172,15 +174,21 @@ func getTargetsFromDatabaseSpec( tableType = jobspb.ChangefeedTargetSpecification_EACH_FAMILY } + tableName := func() string { + if useFullTableName { + return fullyQualifiedTableName + } + return desc.GetName() + }() targets.Add(changefeedbase.Target{ Type: tableType, DescID: desc.GetID(), - StatementTimeName: changefeedbase.StatementTimeName(desc.GetName()), + StatementTimeName: changefeedbase.StatementTimeName(tableName), }) } case tree.IncludeFilter: - for name := range ts.FilterList.Tables { - tn, err := parser.ParseTableName(name) + for fullyQualifiedTableName := range ts.FilterList.Tables { + tn, err := parser.ParseTableName(fullyQualifiedTableName) if err != nil { return err } @@ -215,10 +223,16 @@ func getTargetsFromDatabaseSpec( tableType = jobspb.ChangefeedTargetSpecification_EACH_FAMILY } + tableName := func() string { + if useFullTableName { + return fullyQualifiedTableName + } + return desc.GetName() + }() targets.Add(changefeedbase.Target{ Type: tableType, DescID: tableID, - StatementTimeName: changefeedbase.StatementTimeName(desc.GetName()), + StatementTimeName: changefeedbase.StatementTimeName(tableName), }) } default: diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index e584df3d2dfc..8dd5fe395075 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -954,8 +954,14 @@ func TestChangefeedIdleness(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) sqlDB.Exec(t, `CREATE TABLE bar (b INT PRIMARY KEY)`) - cf1 := feed(t, f, "CREATE CHANGEFEED FOR TABLE foo WITH resolved='10ms'") // higher resolved frequency for faster test - cf2 := feed(t, f, "CREATE CHANGEFEED FOR TABLE bar WITH resolved='10ms'") + cf1 := feed(t, f, "CREATE CHANGEFEED FOR TABLE foo WITH resolved='10ms'", // higher resolved frequency for faster test + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) + cf2 := feed(t, f, "CREATE CHANGEFEED FOR TABLE bar WITH resolved='10ms'", + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, cf1) go workload() @@ -1418,6 +1424,30 @@ func TestChangefeedFullTableName(t *testing.T) { }) } +func TestDatabaseLevelChangefeedWithFullTableName(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) + + normal := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d WITH full_table_name`) + defer closeFeed(t, normal) + include := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo WITH full_table_name`) + defer closeFeed(t, include) + exclude := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES bar WITH full_table_name`) + defer closeFeed(t, exclude) + + sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a')`) + assertPayloads(t, normal, []string{`d.public.foo: [1]->{"after": {"a": 1, "b": "a"}}`}) + assertPayloads(t, include, []string{`d.public.foo: [1]->{"after": {"a": 1, "b": "a"}}`}) + assertPayloads(t, exclude, []string{`d.public.foo: [1]->{"after": {"a": 1, "b": "a"}}`}) + } + + cdcTest(t, testFn, feedTestEnterpriseSinks) +} + func TestChangefeedMultiTable(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -1883,12 +1913,10 @@ func TestChangefeedInitialScan(t *testing.T) { defer log.Scope(t).Close(t) noInitialScanTests := map[string]string{ - `no cursor - no initial scan`: `CREATE CHANGEFEED FOR no_initial_scan WITH no_initial_scan, resolved='1s'`, `no cursor - no initial backfill`: `CREATE CHANGEFEED FOR no_initial_scan WITH initial_scan = 'no', resolved='1s'`, } initialScanTests := map[string]string{ - `cursor - with initial scan`: `CREATE CHANGEFEED FOR initial_scan WITH initial_scan, resolved='1s', cursor='%s'`, `cursor - with initial backfill`: `CREATE CHANGEFEED FOR initial_scan WITH initial_scan = 'yes', resolved='1s', cursor='%s'`, } @@ -2470,6 +2498,9 @@ func TestChangefeedColumnDropsOnMultipleFamiliesWithTheSameName(t *testing.T) { if _, ok := f.(*webhookFeedFactory); ok { args = append(args, optOutOfMetamorphicEnrichedEnvelope{reason: "metamorphic enriched envelope does not support column families for webhook sinks"}) } + args = append(args, optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) // Open up the changefeed. cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE hasfams FAMILY b_and_c, TABLE alsohasfams FAMILY id_a`, args...) @@ -2721,7 +2752,10 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { sqlDB.Exec(t, `ALTER TABLE historical ADD COLUMN c INT`) sqlDB.Exec(t, `INSERT INTO historical (a) VALUES (3)`) sqlDB.Exec(t, `INSERT INTO historical (a, c) VALUES (4, 14)`) - historical := feed(t, f, `CREATE CHANGEFEED FOR historical WITH cursor=$1`, start) + historical := feed(t, f, `CREATE CHANGEFEED FOR historical WITH cursor=$1`, start, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, historical) assertPayloads(t, historical, []string{ `historical: [0]->{"after": {"a": 0, "b": "0"}}`, @@ -2736,7 +2770,10 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { // NB: the default is a nullable column sqlDB.Exec(t, `CREATE TABLE add_column (a INT PRIMARY KEY)`) sqlDB.Exec(t, `INSERT INTO add_column VALUES (1)`) - addColumn := feed(t, f, `CREATE CHANGEFEED FOR add_column`) + addColumn := feed(t, f, `CREATE CHANGEFEED FOR add_column`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, addColumn) assertPayloads(t, addColumn, []string{ `add_column: [1]->{"after": {"a": 1}}`, @@ -2751,7 +2788,10 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { t.Run(`rename column`, func(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE rename_column (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `INSERT INTO rename_column VALUES (1, '1')`) - renameColumn := feed(t, f, `CREATE CHANGEFEED FOR rename_column`) + renameColumn := feed(t, f, `CREATE CHANGEFEED FOR rename_column`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, renameColumn) assertPayloads(t, renameColumn, []string{ `rename_column: [1]->{"after": {"a": 1, "b": "1"}}`, @@ -2766,7 +2806,10 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { t.Run(`add default`, func(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE add_default (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `INSERT INTO add_default (a, b) VALUES (1, '1')`) - addDefault := feed(t, f, `CREATE CHANGEFEED FOR add_default`) + addDefault := feed(t, f, `CREATE CHANGEFEED FOR add_default`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, addDefault) sqlDB.Exec(t, `ALTER TABLE add_default ALTER COLUMN b SET DEFAULT 'd'`) sqlDB.Exec(t, `INSERT INTO add_default (a) VALUES (2)`) @@ -2779,7 +2822,10 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { t.Run(`drop default`, func(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE drop_default (a INT PRIMARY KEY, b STRING DEFAULT 'd')`) sqlDB.Exec(t, `INSERT INTO drop_default (a) VALUES (1)`) - dropDefault := feed(t, f, `CREATE CHANGEFEED FOR drop_default`) + dropDefault := feed(t, f, `CREATE CHANGEFEED FOR drop_default`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, dropDefault) sqlDB.Exec(t, `ALTER TABLE drop_default ALTER COLUMN b DROP DEFAULT`) sqlDB.Exec(t, `INSERT INTO drop_default (a) VALUES (2)`) @@ -2792,7 +2838,10 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { t.Run(`drop not null`, func(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE drop_notnull (a INT PRIMARY KEY, b STRING NOT NULL)`) sqlDB.Exec(t, `INSERT INTO drop_notnull VALUES (1, '1')`) - dropNotNull := feed(t, f, `CREATE CHANGEFEED FOR drop_notnull`) + dropNotNull := feed(t, f, `CREATE CHANGEFEED FOR drop_notnull`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, dropNotNull) sqlDB.Exec(t, `ALTER TABLE drop_notnull ALTER b DROP NOT NULL`) sqlDB.Exec(t, `INSERT INTO drop_notnull VALUES (2, NULL)`) @@ -2805,7 +2854,10 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { t.Run(`checks`, func(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE checks (a INT PRIMARY KEY)`) sqlDB.Exec(t, `INSERT INTO checks VALUES (1)`) - checks := feed(t, f, `CREATE CHANGEFEED FOR checks`) + checks := feed(t, f, `CREATE CHANGEFEED FOR checks`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, checks) sqlDB.Exec(t, `ALTER TABLE checks ADD CONSTRAINT c CHECK (a < 5) NOT VALID`) sqlDB.Exec(t, `INSERT INTO checks VALUES (2)`) @@ -2824,7 +2876,10 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { t.Run(`add index`, func(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE add_index (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `INSERT INTO add_index VALUES (1, '1')`) - addIndex := feed(t, f, `CREATE CHANGEFEED FOR add_index`) + addIndex := feed(t, f, `CREATE CHANGEFEED FOR add_index`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, addIndex) sqlDB.Exec(t, `CREATE INDEX b_idx ON add_index (b)`) sqlDB.Exec(t, `SELECT * FROM add_index@b_idx`) @@ -2838,7 +2893,10 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { t.Run(`unique`, func(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE "unique" (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `INSERT INTO "unique" VALUES (1, '1')`) - unique := feed(t, f, `CREATE CHANGEFEED FOR "unique"`) + unique := feed(t, f, `CREATE CHANGEFEED FOR "unique"`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, unique) sqlDB.Exec(t, `ALTER TABLE "unique" ADD CONSTRAINT u UNIQUE (b)`) sqlDB.Exec(t, `INSERT INTO "unique" VALUES (2, '2')`) @@ -2852,7 +2910,10 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { sqlDB.Exec( t, `CREATE TABLE alter_default (a INT PRIMARY KEY, b STRING DEFAULT 'before')`) sqlDB.Exec(t, `INSERT INTO alter_default (a) VALUES (1)`) - alterDefault := feed(t, f, `CREATE CHANGEFEED FOR alter_default`) + alterDefault := feed(t, f, `CREATE CHANGEFEED FOR alter_default`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, alterDefault) sqlDB.Exec(t, `ALTER TABLE alter_default ALTER COLUMN b SET DEFAULT 'after'`) sqlDB.Exec(t, `INSERT INTO alter_default (a) VALUES (2)`) @@ -2866,7 +2927,10 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { t.Run(`add column with DEFAULT NULL`, func(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE t (id INT PRIMARY KEY)`) sqlDB.Exec(t, `INSERT INTO t VALUES (1)`) - defaultNull := feed(t, f, `CREATE CHANGEFEED FOR t`) + defaultNull := feed(t, f, `CREATE CHANGEFEED FOR t`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, defaultNull) sqlDB.Exec(t, `ALTER TABLE t ADD COLUMN c INT DEFAULT NULL`) sqlDB.Exec(t, `INSERT INTO t VALUES (2, 2)`) @@ -3411,7 +3475,10 @@ func TestChangefeedSchemaChangeAllowBackfill_Legacy(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE add_col_comp (a INT PRIMARY KEY, b INT AS (a + 5) STORED)`) sqlDB.Exec(t, `INSERT INTO add_col_comp VALUES (1)`) sqlDB.Exec(t, `INSERT INTO add_col_comp (a) VALUES (2)`) - addColComp := feed(t, f, `CREATE CHANGEFEED FOR add_col_comp WITH updated`) + addColComp := feed(t, f, `CREATE CHANGEFEED FOR add_col_comp WITH updated`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, addColComp) assertPayloadsStripTs(t, addColComp, []string{ `add_col_comp: [1]->{"after": {"a": 1, "b": 6}}`, @@ -3437,7 +3504,10 @@ func TestChangefeedSchemaChangeAllowBackfill_Legacy(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE drop_column (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `INSERT INTO drop_column VALUES (1, '1')`) sqlDB.Exec(t, `INSERT INTO drop_column VALUES (2, '2')`) - dropColumn := feed(t, f, `CREATE CHANGEFEED FOR drop_column WITH updated`) + dropColumn := feed(t, f, `CREATE CHANGEFEED FOR drop_column WITH updated`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, dropColumn) assertPayloadsStripTs(t, dropColumn, []string{ `drop_column: [1]->{"after": {"a": 1, "b": "1"}}`, @@ -3479,7 +3549,10 @@ func TestChangefeedSchemaChangeAllowBackfill_Legacy(t *testing.T) { Changefeed.(*TestingKnobs) knobs.BeforeEmitRow = waitSinkHook - multipleAlters := feed(t, f, `CREATE CHANGEFEED FOR multiple_alters WITH updated`) + multipleAlters := feed(t, f, `CREATE CHANGEFEED FOR multiple_alters WITH updated`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, multipleAlters) assertPayloadsStripTs(t, multipleAlters, []string{ `multiple_alters: [1]->{"after": {"a": 1, "b": "1"}}`, @@ -3594,7 +3667,10 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE add_col_comp (a INT PRIMARY KEY, b INT AS (a + 5) STORED)`) sqlDB.Exec(t, `INSERT INTO add_col_comp VALUES (1)`) sqlDB.Exec(t, `INSERT INTO add_col_comp (a) VALUES (2)`) - addColComp := feed(t, f, `CREATE CHANGEFEED FOR add_col_comp WITH updated`) + addColComp := feed(t, f, `CREATE CHANGEFEED FOR add_col_comp WITH updated`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, addColComp) assertPayloadsStripTs(t, addColComp, []string{ `add_col_comp: [1]->{"after": {"a": 1, "b": 6}}`, @@ -3614,7 +3690,10 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE drop_column (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `INSERT INTO drop_column VALUES (1, '1')`) sqlDB.Exec(t, `INSERT INTO drop_column VALUES (2, '2')`) - dropColumn := feed(t, f, `CREATE CHANGEFEED FOR drop_column WITH updated`) + dropColumn := feed(t, f, `CREATE CHANGEFEED FOR drop_column WITH updated`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, dropColumn) assertPayloadsStripTs(t, dropColumn, []string{ `drop_column: [1]->{"after": {"a": 1, "b": "1"}}`, @@ -3654,7 +3733,10 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) { Changefeed.(*TestingKnobs) knobs.BeforeEmitRow = waitSinkHook - multipleAlters := feed(t, f, `CREATE CHANGEFEED FOR multiple_alters WITH updated`) + multipleAlters := feed(t, f, `CREATE CHANGEFEED FOR multiple_alters WITH updated`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, multipleAlters) assertPayloadsStripTs(t, multipleAlters, []string{ `multiple_alters: [1]->{"after": {"a": 1, "b": "1"}}`, @@ -4252,7 +4334,8 @@ func TestChangefeedJobControl(t *testing.T) { ChangefeedJobPermissionsTestSetup(t, s) createFeed := func(stmt string) (cdctest.EnterpriseTestFeed, func()) { - successfulFeed := feed(t, f, stmt) + successfulFeed := feed(t, f, stmt, optOutOfMetamorphicDBLevelChangefeed{ + reason: "tests table level changefeed permissions"}) closeCf := func() { closeFeed(t, successfulFeed) } @@ -5683,28 +5766,40 @@ func TestChangefeedLowFrequencyNotices(t *testing.T) { t.Run("no options specified", func(t *testing.T) { actual = "(no notice)" f := makeKafkaFeedFactory(t, s, dbWithHandler) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/'`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/'`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test requires split_column_families NOT to be set", + }) defer closeFeed(t, testFeed) require.Equal(t, `changefeed will emit to topic _u2603_`, actual) }) t.Run("normal resolved and min_checkpoint_frequency", func(t *testing.T) { actual = "(no notice)" f := makeKafkaFeedFactory(t, s, dbWithHandler) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH resolved='10s', min_checkpoint_frequency='10s'`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH resolved='10s', min_checkpoint_frequency='10s'`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test requires split_column_families NOT to be set", + }) defer closeFeed(t, testFeed) require.Equal(t, `changefeed will emit to topic _u2603_`, actual) }) t.Run("low resolved timestamp", func(t *testing.T) { actual = "(no notice)" f := makeKafkaFeedFactory(t, s, dbWithHandler) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH resolved='200ms'`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH resolved='200ms'`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test requires split_column_families NOT to be set", + }) defer closeFeed(t, testFeed) require.Equal(t, `the 'resolved' timestamp interval (200ms) is very low; consider increasing it to at least 500ms`, actual) }) t.Run("low min_checkpoint_frequency timestamp", func(t *testing.T) { actual = "(no notice)" f := makeKafkaFeedFactory(t, s, dbWithHandler) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH min_checkpoint_frequency='200ms'`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH min_checkpoint_frequency='200ms'`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test requires split_column_families NOT to be set", + }) defer closeFeed(t, testFeed) require.Equal(t, `the 'min_checkpoint_frequency' timestamp interval (200ms) is very low; consider increasing it to at least 500ms`, actual) }) @@ -5740,7 +5835,10 @@ func TestChangefeedOutputTopics(t *testing.T) { t.Run("kafka", func(t *testing.T) { actual = "(no notice)" f := makeKafkaFeedFactory(t, s, dbWithHandler) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/'`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/'`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test requires split_column_families NOT to be set", + }) defer closeFeed(t, testFeed) require.Equal(t, `changefeed will emit to topic _u2603_`, actual) }) @@ -5748,7 +5846,10 @@ func TestChangefeedOutputTopics(t *testing.T) { t.Run("pubsub v2", func(t *testing.T) { actual = "(no notice)" f := makePubsubFeedFactory(s, dbWithHandler) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'gcpubsub://does.not.matter/'`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'gcpubsub://does.not.matter/'`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test requires split_column_families NOT to be set", + }) defer closeFeed(t, testFeed) // Pubsub doesn't sanitize the topic name. require.Equal(t, `changefeed will emit to topic ☃`, actual) @@ -6535,9 +6636,15 @@ func TestChangefeedTruncateOrDrop(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE truncate_cascade (b INT PRIMARY KEY REFERENCES truncate (a)) WITH (schema_locked=false)`) sqlDB.Exec(t, `BEGIN; INSERT INTO truncate VALUES (1); INSERT INTO truncate_cascade VALUES (1); COMMIT`) - truncate := feed(t, f, `CREATE CHANGEFEED FOR truncate`) + truncate := feed(t, f, `CREATE CHANGEFEED FOR truncate`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, truncate) - truncateCascade := feed(t, f, `CREATE CHANGEFEED FOR truncate_cascade`) + truncateCascade := feed(t, f, `CREATE CHANGEFEED FOR truncate_cascade`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, truncateCascade) assertPayloads(t, truncate, []string{`truncate: [1]->{"after": {"a": 1}}`}) assertPayloads(t, truncateCascade, []string{`truncate_cascade: [1]->{"after": {"b": 1}}`}) @@ -6554,7 +6661,10 @@ func TestChangefeedTruncateOrDrop(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE drop (a INT PRIMARY KEY)`) sqlDB.Exec(t, `INSERT INTO drop VALUES (1)`) - drop := feed(t, f, `CREATE CHANGEFEED FOR drop`) + drop := feed(t, f, `CREATE CHANGEFEED FOR drop`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, drop) assertPayloads(t, drop, []string{`drop: [1]->{"after": {"a": 1}}`}) sqlDB.Exec(t, `DROP TABLE drop`) @@ -8181,10 +8291,16 @@ func TestChangefeedTelemetry(t *testing.T) { // Reset the counts. _ = telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts) - // Start some feeds (and read from them to make sure they've started. - foo := feed(t, f, `CREATE CHANGEFEED FOR foo`) + // Start some feeds (and read from them to make sure they've started). + foo := feed(t, f, `CREATE CHANGEFEED FOR foo`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, foo) - fooBar := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH format=json`) + fooBar := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH format=json`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, fooBar) assertPayloads(t, foo, []string{ `foo: [1]->{"after": {"a": 1}}`, @@ -8465,7 +8581,9 @@ func TestChangefeedHandlesDrainingNodes(t *testing.T) { defer closeSink() atomic.StoreInt32(&shouldDrain, 1) - feed := feed(t, f, "CREATE CHANGEFEED FOR foo") + feed := feed(t, f, "CREATE CHANGEFEED FOR foo", optOutOfMetamorphicDBLevelChangefeed{ + reason: "doesn't use the default DB", + }) defer closeFeed(t, feed) jobID := feed.(cdctest.EnterpriseTestFeed).JobID() @@ -8630,7 +8748,8 @@ func TestChangefeedHandlesRollingRestart(t *testing.T) { defer closeSink() proceed <- struct{}{} // Allow changefeed to start. - feed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH initial_scan='no', min_checkpoint_frequency='100ms'") + feed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH initial_scan='no', min_checkpoint_frequency='100ms'", + optOutOfMetamorphicDBLevelChangefeed{reason: "doesn't use the default DB"}) defer closeFeed(t, feed) jf := feed.(cdctest.EnterpriseTestFeed) @@ -9749,7 +9868,9 @@ func TestDistSenderRangeFeedPopulatesVirtualTable(t *testing.T) { var cf cdctest.TestFeed asUser(t, f, `feedCreator`, func(userDB *sqlutils.SQLRunner) { - cf = feed(t, f, `CREATE CHANGEFEED FOR table_a;`) + cf = feed(t, f, `CREATE CHANGEFEED FOR table_a;`, optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) }) defer closeFeed(t, cf) @@ -10035,7 +10156,9 @@ func TestChangefeedOnlyInitialScanCSV(t *testing.T) { sqlDB.CheckQueryResultsRetry(t, `SELECT count(*) FROM foo,bar`, [][]string{{`9`}}) - feed := feed(t, f, testData.changefeedStmt) + feed := feed(t, f, testData.changefeedStmt, optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) sqlDB.Exec(t, "INSERT INTO foo VALUES (4, 'Doug'), (5, 'Elaine'), (6, 'Fred')") sqlDB.Exec(t, "INSERT INTO bar VALUES (4, 'd'), (5, 'e'), (6, 'f')") @@ -10836,7 +10959,9 @@ func TestAlterChangefeedTelemetryLogs(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, b STRING)`) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar`, optOutOfMetamorphicDBLevelChangefeed{ + reason: "db-level changefeed doesn't support ALTER CHANGEFEED ADD/DROP", + }) defer closeFeed(t, testFeed) feed := testFeed.(cdctest.EnterpriseTestFeed) @@ -11124,7 +11249,10 @@ func TestChangefeedKafkaMessageTooLarge(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE large (a INT PRIMARY KEY)`) sqlDB.Exec(t, `INSERT INTO large (a) SELECT * FROM generate_series(1, 2000);`) - foo := feed(t, f, `CREATE CHANGEFEED FOR large WITH kafka_sink_config='{"Flush": {"MaxMessages": 1000}}'`) + foo := feed(t, f, `CREATE CHANGEFEED FOR large WITH kafka_sink_config='{"Flush": {"MaxMessages": 1000}}'`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, foo) rnd, _ := randutil.NewPseudoRand() @@ -11195,7 +11323,10 @@ func TestChangefeedKafkaMessageTooLarge(t *testing.T) { } { t.Run(fmt.Sprintf(`eventually surface error for retry: %s`, failTest.errMsg), func(t *testing.T) { knobs.kafkaInterceptor = failTest.failInterceptor - foo := feed(t, f, `CREATE CHANGEFEED FOR errors WITH kafka_sink_config='{"Flush": {"MaxMessages": 0}}'`) + foo := feed(t, f, `CREATE CHANGEFEED FOR errors WITH kafka_sink_config='{"Flush": {"MaxMessages": 0}}'`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, foo) feedJob := foo.(cdctest.EnterpriseTestFeed) diff --git a/pkg/ccl/changefeedccl/encoder_test.go b/pkg/ccl/changefeedccl/encoder_test.go index c9a6252b6fd7..4678be12c091 100644 --- a/pkg/ccl/changefeedccl/encoder_test.go +++ b/pkg/ccl/changefeedccl/encoder_test.go @@ -601,7 +601,9 @@ func TestAvroEnum(t *testing.T) { sqlDB.Exec(t, `INSERT INTO soft_deletes values (0, 'active')`) sd := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR soft_deletes `+ - `WITH format=%s`, changefeedbase.OptFormatAvro)) + `WITH format=%s`, changefeedbase.OptFormatAvro), + optOutOfMetamorphicDBLevelChangefeed{reason: "has unwatched foo table"}, + ) defer closeFeed(t, sd) assertPayloads(t, sd, []string{ `soft_deletes: {"a":{"long":0},"b":{"string":"active"}}->{"after":{"soft_deletes":{"a":{"long":0},"b":{"string":"active"},"c":{"long":0}}}}`, @@ -639,9 +641,11 @@ func TestAvroSchemaNaming(t *testing.T) { sqlDB.Exec(t, `INSERT INTO movr.drivers VALUES (1, 'Alice')`, ) - + optOutOfDBLevelChangefeed := optOutOfMetamorphicDBLevelChangefeed{ + reason: "changefeed watches tables not in the default database", + } movrFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR movr.drivers `+ - `WITH format=%s`, changefeedbase.OptFormatAvro)) + `WITH format=%s`, changefeedbase.OptFormatAvro), optOutOfDBLevelChangefeed) defer closeFeed(t, movrFeed) foo := movrFeed.(*kafkaFeed) @@ -656,7 +660,8 @@ func TestAvroSchemaNaming(t *testing.T) { }) fqnFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR movr.drivers `+ - `WITH format=%s, full_table_name`, changefeedbase.OptFormatAvro)) + `WITH format=%s, full_table_name`, changefeedbase.OptFormatAvro), + optOutOfDBLevelChangefeed) defer closeFeed(t, fqnFeed) foo = fqnFeed.(*kafkaFeed) @@ -671,8 +676,8 @@ func TestAvroSchemaNaming(t *testing.T) { }) prefixFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR movr.drivers `+ - `WITH format=%s, avro_schema_prefix=super`, - changefeedbase.OptFormatAvro)) + `WITH format=%s, avro_schema_prefix=super`, changefeedbase.OptFormatAvro), + optOutOfDBLevelChangefeed) defer closeFeed(t, prefixFeed) foo = prefixFeed.(*kafkaFeed) @@ -687,7 +692,8 @@ func TestAvroSchemaNaming(t *testing.T) { }) prefixFQNFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR movr.drivers `+ - `WITH format=%s, avro_schema_prefix=super, full_table_name`, changefeedbase.OptFormatAvro)) + `WITH format=%s, avro_schema_prefix=super, full_table_name`, changefeedbase.OptFormatAvro), + optOutOfDBLevelChangefeed) defer closeFeed(t, prefixFQNFeed) foo = prefixFQNFeed.(*kafkaFeed) @@ -707,7 +713,8 @@ func TestAvroSchemaNaming(t *testing.T) { sqlDB.Exec(t, `ALTER TABLE movr.drivers ADD COLUMN vehicle_id int CREATE FAMILY volatile`) multiFamilyFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR movr.drivers `+ - `WITH format=%s, %s`, changefeedbase.OptFormatAvro, changefeedbase.OptSplitColumnFamilies)) + `WITH format=%s, %s`, changefeedbase.OptFormatAvro, changefeedbase.OptSplitColumnFamilies), + optOutOfDBLevelChangefeed) defer closeFeed(t, multiFamilyFeed) foo = multiFamilyFeed.(*kafkaFeed) @@ -744,8 +751,11 @@ func TestAvroSchemaNamespace(t *testing.T) { `INSERT INTO movr.drivers VALUES (1, 'Alice')`, ) + optOutOfDBLevelChangefeed := optOutOfMetamorphicDBLevelChangefeed{ + reason: "changefeed watches tables not in the default database", + } noNamespaceFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR movr.drivers `+ - `WITH format=%s`, changefeedbase.OptFormatAvro)) + `WITH format=%s`, changefeedbase.OptFormatAvro), optOutOfDBLevelChangefeed) defer closeFeed(t, noNamespaceFeed) assertPayloads(t, noNamespaceFeed, []string{ @@ -758,7 +768,8 @@ func TestAvroSchemaNamespace(t *testing.T) { require.NotContains(t, foo.registry.SchemaForSubject(`drivers-value`), `namespace`) namespaceFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR movr.drivers `+ - `WITH format=%s, avro_schema_prefix=super`, changefeedbase.OptFormatAvro)) + `WITH format=%s, avro_schema_prefix=super`, changefeedbase.OptFormatAvro), + optOutOfDBLevelChangefeed) defer closeFeed(t, namespaceFeed) foo = namespaceFeed.(*kafkaFeed) @@ -787,7 +798,8 @@ func TestAvroSchemaHasExpectedTopLevelFields(t *testing.T) { ) foo := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR movr.drivers `+ - `WITH format=%s`, changefeedbase.OptFormatAvro)) + `WITH format=%s`, changefeedbase.OptFormatAvro), + optOutOfMetamorphicDBLevelChangefeed{reason: "uses non default DB"}) defer closeFeed(t, foo) assertPayloads(t, foo, []string{ diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index b21664222e74..9bffd40cf54c 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -1147,6 +1147,10 @@ type optOutOfMetamorphicEnrichedEnvelope struct { reason string } +type optOutOfMetamorphicDBLevelChangefeed struct { + reason string +} + func feed( t testing.TB, f cdctest.TestFeedFactory, create string, args ...interface{}, ) cdctest.TestFeed { @@ -1157,6 +1161,11 @@ func feed( t.Fatal(err) } + create, args, err = maybeForceDBLevelChangefeed(t, create, f, args) + if err != nil { + t.Fatal(err) + } + feed, err := f.Feed(create, args...) if err != nil { t.Fatal(err) @@ -1171,6 +1180,107 @@ func feed( return feed } +func maybeForceDBLevelChangefeed( + t testing.TB, create string, f cdctest.TestFeedFactory, args []any, +) (newCreate string, newArgs []any, err error) { + for i, arg := range args { + if o, ok := arg.(optOutOfMetamorphicDBLevelChangefeed); ok { + t.Logf("opted out of DB level changefeed for %s: %s", create, o.reason) + newArgs = slices.Clone(args) + newArgs = slices.Delete(newArgs, i, i+1) + return create, newArgs, nil + } + } + + switch f := f.(type) { + case *externalConnectionFeedFactory: + return maybeForceDBLevelChangefeed(t, create, f.TestFeedFactory, args) + case *sinklessFeedFactory: + t.Logf("did not force DB level changefeed for %s because %T is not supported", create, f) + return create, args, nil + } + + createStmt, err := parser.ParseOne(create) + if err != nil { + return "", nil, err + } + createAST := createStmt.AST.(*tree.CreateChangefeed) + if createAST.Select != nil { + t.Logf("did not force DB level changefeed for %s because it is a CDC query", create) + return create, args, nil + } + + if createAST.Level == tree.ChangefeedLevelDatabase { + t.Logf("did not force DB level changefeed for %s because it is already a DB level changefeed", create) + return create, args, nil + } + + opts := createAST.Options + // Read the initial scan type from the options so that we can force it to "yes" + // for DB level changefeeds in the default case. + initialScanType := `` + hasCursor := false + for _, opt := range opts { + key := opt.Key.String() + if strings.EqualFold(key, "initial_scan") { + if opt.Value != nil { + initialScanType = opt.Value.String() + } + if strings.EqualFold(initialScanType, "'only'") { + t.Logf("did not force DB level changefeed for %s because it set initial scan only", create) + return create, args, nil + } + } + if strings.EqualFold(key, "initial_scan_only") { + t.Logf("did not force DB level changefeed for %s because it set initial scan only", create) + return create, args, nil + } + if strings.EqualFold(key, "no_initial_scan") { + initialScanType = "no" + } + if strings.EqualFold(key, "cursor") { + hasCursor = true + } + // Since DB level feeds set split column families, and split column families is incompatible + // with resolved for kafka and pubsub sinks, we skip DB level feeds metamorphic testing in + // that case. + if strings.EqualFold(key, "resolved") { + switch f.(type) { + case *kafkaFeedFactory, *pubsubFeedFactory: + t.Logf("did not force DB level changefeed for %s because it set resolved", create) + return create, args, nil + } + } + } + + for _, target := range createAST.TableTargets { + if target.FamilyName != "" { + t.Logf("did not force DB level changefeed for %s because it has column family %s", create, target.FamilyName) + return create, args, nil + } + } + + // Keep the options as is but make it a DB level changefeed. + createStmt.AST.(*tree.CreateChangefeed).Level = tree.ChangefeedLevelDatabase + createStmt.AST.(*tree.CreateChangefeed).TableTargets = nil + createStmt.AST.(*tree.CreateChangefeed).DatabaseTarget = tree.ChangefeedDatabaseTarget("d") + + // Unlike table-level changefeeds, database-level changefeeds do not perform + // an initial scan by default. To convert a default table level feed into an + // equivalent DB level feed, we need to explicitly set the initial scan type. + if initialScanType == "" && !hasCursor { + createStmt.AST.(*tree.CreateChangefeed).Options = append(createStmt.AST.(*tree.CreateChangefeed).Options, tree.KVOption{ + Key: "initial_scan", + Value: tree.NewDString("yes"), + }) + } + t.Logf("forcing DB level changefeed for %s", create) + create = tree.AsStringWithFlags(createStmt.AST, tree.FmtShowPasswords) + + t.Logf("forced DB level changefeed result: %s", create) + return create, args, nil +} + func maybeForceEnrichedEnvelope( t testing.TB, create string, f cdctest.TestFeedFactory, args []any, ) (newCreate string, newArgs []any, forced bool, _ error) { diff --git a/pkg/ccl/changefeedccl/protected_timestamps_test.go b/pkg/ccl/changefeedccl/protected_timestamps_test.go index 875ea7b5aed8..024fadab8457 100644 --- a/pkg/ccl/changefeedccl/protected_timestamps_test.go +++ b/pkg/ccl/changefeedccl/protected_timestamps_test.go @@ -381,7 +381,9 @@ func TestChangefeedAlterPTS(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `CREATE TABLE foo2 (a INT PRIMARY KEY, b STRING)`) f2 := feed(t, f, `CREATE CHANGEFEED FOR table foo with protect_data_from_gc_on_pause, - resolved='1s', min_checkpoint_frequency='1s'`) + resolved='1s', min_checkpoint_frequency='1s'`, optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", + }) defer closeFeed(t, f2) getNumPTSRecords := func() int { diff --git a/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go b/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go index 9161916fad64..422f56713fd9 100644 --- a/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go +++ b/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go @@ -68,7 +68,11 @@ func TestShowChangefeedJobsDatabaseLevel(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `INSERT INTO bar VALUES (1, 'initial')`) - tcf := feed(t, f, `CREATE CHANGEFEED FOR d.foo, d.bar`) + tcf := feed(t, f, `CREATE CHANGEFEED FOR d.foo, d.bar`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test asserts how DB-level and table-level changefeeds differ", + }, + ) defer closeFeed(t, tcf) assertPayloads(t, tcf, []string{ `foo: [0]->{"after": {"a": 0, "b": "initial"}}`, @@ -158,7 +162,11 @@ func TestShowChangefeedJobsBasic(t *testing.T) { sqlDB := sqlutils.MakeSQLRunner(s.DB) sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) - foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH format='json'`) + foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH format='json'`, + optOutOfMetamorphicDBLevelChangefeed{ + // NB: We test WITH WATCHED_TABLES in another test. This one specifically does not. + reason: "db level changefeeds don't have full_table_names without WITH WATCHED_TABLES", + }) defer closeFeed(t, foo) type row struct { @@ -289,7 +297,9 @@ func TestShowChangefeedJobsRedacted(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { foo := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR TABLE foo INTO '%s'`, tc.uri), - optOutOfMetamorphicEnrichedEnvelope{reason: "compares text of changefeed statement"}) + optOutOfMetamorphicEnrichedEnvelope{reason: "compares text of changefeed statement"}, + optOutOfMetamorphicDBLevelChangefeed{reason: "compares text of changefeed statement"}, + ) defer closeFeed(t, foo) efoo, ok := foo.(cdctest.EnterpriseTestFeed) @@ -551,7 +561,10 @@ func TestShowChangefeedJobsAlterChangefeed(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`) - foo := feed(t, f, `CREATE CHANGEFEED FOR foo`, optOutOfMetamorphicEnrichedEnvelope{reason: "compares text of changefeed statement"}) + foo := feed(t, f, `CREATE CHANGEFEED FOR foo`, + optOutOfMetamorphicEnrichedEnvelope{reason: "compares text of changefeed statement"}, + optOutOfMetamorphicDBLevelChangefeed{reason: "compares text of changefeed statement"}, + ) defer closeFeed(t, foo) feed, ok := foo.(cdctest.EnterpriseTestFeed) @@ -648,7 +661,8 @@ func TestShowChangefeedJobsAuthorization(t *testing.T) { var jobID jobspb.JobID createFeed := func(stmt string) { - successfulFeed := feed(t, f, stmt) + successfulFeed := feed(t, f, stmt, optOutOfMetamorphicDBLevelChangefeed{ + reason: "tests table level changefeed permissions"}) defer closeFeed(t, successfulFeed) _, err := successfulFeed.Next() require.NoError(t, err)