Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 72 additions & 18 deletions pkg/ccl/changefeedccl/alter_changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,9 @@ func TestAlterChangefeedAddTarget(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)

testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`, optOutOfMetamorphicDBLevelChangefeed{
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
})
defer closeFeed(t, testFeed)

feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
Expand Down Expand Up @@ -261,7 +263,9 @@ func TestAlterChangefeedAddTargetAfterInitialScan(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, b INT)`)

testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`, optOutOfMetamorphicDBLevelChangefeed{
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
})
defer closeFeed(t, testFeed)

feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
Expand Down Expand Up @@ -458,7 +462,9 @@ func TestAlterChangefeedDropTarget(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)

testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar`)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar`, optOutOfMetamorphicDBLevelChangefeed{
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
})
defer closeFeed(t, testFeed)

feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
Expand Down Expand Up @@ -493,7 +499,11 @@ func TestAlterChangefeedDropTargetAfterTableDrop(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)

testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH on_error='pause'`)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH on_error='pause'`,
optOutOfMetamorphicDBLevelChangefeed{
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
},
)
defer closeFeed(t, testFeed)

feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
Expand Down Expand Up @@ -668,7 +678,9 @@ func TestAlterChangefeedErrors(t *testing.T) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`, optOutOfMetamorphicDBLevelChangefeed{
reason: "test initializes multiple tables but doesn't watch all of them",
})
defer closeFeed(t, testFeed)

feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
Expand Down Expand Up @@ -765,7 +777,9 @@ func TestAlterChangefeedDropAllTargetsError(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)

testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar`)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar`, optOutOfMetamorphicDBLevelChangefeed{
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
})
defer closeFeed(t, testFeed)

feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
Expand Down Expand Up @@ -799,7 +813,10 @@ func TestAlterChangefeedTelemetry(t *testing.T) {
// Reset the counts.
_ = telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts)

testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH diff`)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH diff`,
optOutOfMetamorphicDBLevelChangefeed{
reason: "test initializes multiple tables but doesn't watch all of them",
})
defer closeFeed(t, testFeed)
feed := testFeed.(cdctest.EnterpriseTestFeed)

Expand Down Expand Up @@ -994,7 +1011,10 @@ func TestAlterChangefeedAddTargetErrors(t *testing.T) {
return true, nil
}

testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '100ms'`)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '100ms'`,
optOutOfMetamorphicDBLevelChangefeed{
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
})

// Kafka feeds are not buffered, so we have to consume messages.
g := ctxgroup.WithContext(context.Background())
Expand Down Expand Up @@ -1068,7 +1088,10 @@ func TestAlterChangefeedDatabaseQualifiedNames(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE d.users (id INT PRIMARY KEY, name STRING)`)
sqlDB.Exec(t, `INSERT INTO d.drivers VALUES (1, 'Alice')`)
sqlDB.Exec(t, `INSERT INTO d.users VALUES (1, 'Bob')`)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR d.drivers WITH resolved = '100ms', diff`)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR d.drivers WITH resolved = '100ms', diff`,
optOutOfMetamorphicDBLevelChangefeed{
reason: "test initializes multiple tables but doesn't watch all of them",
})
defer closeFeed(t, testFeed)

assertPayloads(t, testFeed, []string{
Expand Down Expand Up @@ -1119,7 +1142,11 @@ func TestAlterChangefeedDatabaseScope(t *testing.T) {
`INSERT INTO new_movr.drivers VALUES (1, 'Bob')`,
)

testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.drivers WITH diff`)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.drivers WITH diff`,
optOutOfMetamorphicDBLevelChangefeed{
reason: "changefeed watches tables not in the default database",
},
)
defer closeFeed(t, testFeed)

assertPayloads(t, testFeed, []string{
Expand Down Expand Up @@ -1162,7 +1189,10 @@ func TestAlterChangefeedDatabaseScopeUnqualifiedName(t *testing.T) {
)

sqlDB.Exec(t, `USE movr`)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR drivers WITH diff, resolved = '100ms'`)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR drivers WITH diff, resolved = '100ms'`,
optOutOfMetamorphicDBLevelChangefeed{
reason: "changefeed watches tables not in the default database",
})
defer closeFeed(t, testFeed)

assertPayloads(t, testFeed, []string{
Expand Down Expand Up @@ -1211,6 +1241,9 @@ func TestAlterChangefeedColumnFamilyDatabaseScope(t *testing.T) {
if _, ok := f.(*webhookFeedFactory); ok {
args = append(args, optOutOfMetamorphicEnrichedEnvelope{reason: "metamorphic enriched envelope does not support column families for webhook sinks"})
}
args = append(args, optOutOfMetamorphicDBLevelChangefeed{
reason: "changefeed watches tables not in the default database",
})
testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.drivers WITH diff, split_column_families`, args...)
defer closeFeed(t, testFeed)

Expand Down Expand Up @@ -1263,6 +1296,9 @@ func TestAlterChangefeedAlterTableName(t *testing.T) {
if _, ok := f.(*webhookFeedFactory); ok {
args = append(args, optOutOfMetamorphicEnrichedEnvelope{reason: "see comment"})
}
args = append(args, optOutOfMetamorphicDBLevelChangefeed{
reason: "changefeed watches tables not in the default database",
})

testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.users WITH diff, resolved = '100ms'`, args...)
defer closeFeed(t, testFeed)
Expand Down Expand Up @@ -1350,7 +1386,10 @@ func TestAlterChangefeedAddTargetsDuringSchemaChangeError(t *testing.T) {
}

testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo
WITH resolved = '1s', no_initial_scan, min_checkpoint_frequency='1ns'`)
WITH resolved = '1s', no_initial_scan, min_checkpoint_frequency='1ns'`,
optOutOfMetamorphicDBLevelChangefeed{
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
})
jobFeed := testFeed.(cdctest.EnterpriseTestFeed)
jobRegistry := s.Server.JobRegistry().(*jobs.Registry)

Expand Down Expand Up @@ -1551,7 +1590,10 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {

registry := s.Server.JobRegistry().(*jobs.Registry)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo
WITH resolved = '100ms', min_checkpoint_frequency='1ns'`)
WITH resolved = '100ms', min_checkpoint_frequency='1ns'`,
optOutOfMetamorphicDBLevelChangefeed{
reason: "test initializes multiple tables but doesn't watch all of them",
})

g := ctxgroup.WithContext(context.Background())
g.Go(func() error {
Expand Down Expand Up @@ -1691,7 +1733,10 @@ func TestAlterChangefeedDropTargetDuringInitialScan(t *testing.T) {
if rnd.Intn(2) == 0 {
targets = "bar, foo"
}
testFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED for %s`, targets))
testFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED for %s`, targets),
optOutOfMetamorphicDBLevelChangefeed{
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
})
defer closeFeed(t, testFeed)

// Wait for all spans to have been resolved.
Expand Down Expand Up @@ -1742,7 +1787,10 @@ func TestAlterChangefeedInitialScan(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO bar VALUES (1), (2), (3)`)

testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '1s', no_initial_scan`)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '1s', no_initial_scan`,
optOutOfMetamorphicDBLevelChangefeed{
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
})
defer closeFeed(t, testFeed)

expectResolvedTimestamp(t, testFeed)
Expand Down Expand Up @@ -1861,7 +1909,9 @@ func TestAlterChangefeedAccessControl(t *testing.T) {
rootDB := sqlutils.MakeSQLRunner(s.DB)

createFeed := func(stmt string) (cdctest.EnterpriseTestFeed, func()) {
successfulFeed := feed(t, f, stmt)
successfulFeed := feed(t, f, stmt, optOutOfMetamorphicDBLevelChangefeed{
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
})
closeCf := func() {
closeFeed(t, successfulFeed)
}
Expand Down Expand Up @@ -1933,7 +1983,9 @@ func TestAlterChangefeedAddDropSameTarget(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)

testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`, optOutOfMetamorphicDBLevelChangefeed{
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
})
defer closeFeed(t, testFeed)

feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
Expand Down Expand Up @@ -2050,7 +2102,9 @@ func TestAlterChangefeedRandomizedTargetChanges(t *testing.T) {
createStmt := fmt.Sprintf(
`CREATE CHANGEFEED FOR %s WITH updated`, strings.Join(initialTables, ", "))
t.Log(createStmt)
testFeed := feed(t, f, createStmt)
testFeed := feed(t, f, createStmt, optOutOfMetamorphicDBLevelChangefeed{
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
})
defer closeFeed(t, testFeed)

feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
Expand Down
Loading
Loading