Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 44 additions & 10 deletions pkg/ccl/changefeedccl/alter_changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,9 @@ func TestAlterChangefeedAddTargetAfterInitialScan(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, b INT)`)

testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`, optOutOfMetamorphicDBLevelChangefeed{
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
})
defer closeFeed(t, testFeed)

feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
Expand Down Expand Up @@ -493,7 +495,11 @@ func TestAlterChangefeedDropTargetAfterTableDrop(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)

testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH on_error='pause'`)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH on_error='pause'`,
optOutOfMetamorphicDBLevelChangefeed{
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
},
)
defer closeFeed(t, testFeed)

feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
Expand Down Expand Up @@ -668,7 +674,9 @@ func TestAlterChangefeedErrors(t *testing.T) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`, optOutOfMetamorphicDBLevelChangefeed{
reason: "test initializes multiple tables but doesn't watch all of them",
})
defer closeFeed(t, testFeed)

feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
Expand Down Expand Up @@ -799,7 +807,10 @@ func TestAlterChangefeedTelemetry(t *testing.T) {
// Reset the counts.
_ = telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts)

testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH diff`)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH diff`,
optOutOfMetamorphicDBLevelChangefeed{
reason: "test initializes multiple tables but doesn't watch all of them",
})
defer closeFeed(t, testFeed)
feed := testFeed.(cdctest.EnterpriseTestFeed)

Expand Down Expand Up @@ -1068,7 +1079,10 @@ func TestAlterChangefeedDatabaseQualifiedNames(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE d.users (id INT PRIMARY KEY, name STRING)`)
sqlDB.Exec(t, `INSERT INTO d.drivers VALUES (1, 'Alice')`)
sqlDB.Exec(t, `INSERT INTO d.users VALUES (1, 'Bob')`)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR d.drivers WITH resolved = '100ms', diff`)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR d.drivers WITH resolved = '100ms', diff`,
optOutOfMetamorphicDBLevelChangefeed{
reason: "test initializes multiple tables but doesn't watch all of them",
})
defer closeFeed(t, testFeed)

assertPayloads(t, testFeed, []string{
Expand Down Expand Up @@ -1119,7 +1133,11 @@ func TestAlterChangefeedDatabaseScope(t *testing.T) {
`INSERT INTO new_movr.drivers VALUES (1, 'Bob')`,
)

testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.drivers WITH diff`)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.drivers WITH diff`,
optOutOfMetamorphicDBLevelChangefeed{
reason: "changefeed watches tables not in the default database",
},
)
defer closeFeed(t, testFeed)

assertPayloads(t, testFeed, []string{
Expand Down Expand Up @@ -1162,7 +1180,10 @@ func TestAlterChangefeedDatabaseScopeUnqualifiedName(t *testing.T) {
)

sqlDB.Exec(t, `USE movr`)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR drivers WITH diff, resolved = '100ms'`)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR drivers WITH diff, resolved = '100ms'`,
optOutOfMetamorphicDBLevelChangefeed{
reason: "test initializes multiple tables but doesn't watch all of them",
})
defer closeFeed(t, testFeed)

assertPayloads(t, testFeed, []string{
Expand Down Expand Up @@ -1211,6 +1232,9 @@ func TestAlterChangefeedColumnFamilyDatabaseScope(t *testing.T) {
if _, ok := f.(*webhookFeedFactory); ok {
args = append(args, optOutOfMetamorphicEnrichedEnvelope{reason: "metamorphic enriched envelope does not support column families for webhook sinks"})
}
args = append(args, optOutOfMetamorphicDBLevelChangefeed{
reason: "changefeed watches tables not in the default database",
})
testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.drivers WITH diff, split_column_families`, args...)
defer closeFeed(t, testFeed)

Expand Down Expand Up @@ -1263,6 +1287,9 @@ func TestAlterChangefeedAlterTableName(t *testing.T) {
if _, ok := f.(*webhookFeedFactory); ok {
args = append(args, optOutOfMetamorphicEnrichedEnvelope{reason: "see comment"})
}
args = append(args, optOutOfMetamorphicDBLevelChangefeed{
reason: "changefeed watches tables not in the default database",
})

testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.users WITH diff, resolved = '100ms'`, args...)
defer closeFeed(t, testFeed)
Expand Down Expand Up @@ -1551,7 +1578,10 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {

registry := s.Server.JobRegistry().(*jobs.Registry)
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo
WITH resolved = '100ms', min_checkpoint_frequency='1ns'`)
WITH resolved = '100ms', min_checkpoint_frequency='1ns'`,
optOutOfMetamorphicDBLevelChangefeed{
reason: "test initializes multiple tables but doesn't watch all of them",
})

g := ctxgroup.WithContext(context.Background())
g.Go(func() error {
Expand Down Expand Up @@ -1861,7 +1891,9 @@ func TestAlterChangefeedAccessControl(t *testing.T) {
rootDB := sqlutils.MakeSQLRunner(s.DB)

createFeed := func(stmt string) (cdctest.EnterpriseTestFeed, func()) {
successfulFeed := feed(t, f, stmt)
successfulFeed := feed(t, f, stmt, optOutOfMetamorphicDBLevelChangefeed{
reason: "test initializes multiple tables but doesn't watch all of them",
})
closeCf := func() {
closeFeed(t, successfulFeed)
}
Expand Down Expand Up @@ -2050,7 +2082,9 @@ func TestAlterChangefeedRandomizedTargetChanges(t *testing.T) {
createStmt := fmt.Sprintf(
`CREATE CHANGEFEED FOR %s WITH updated`, strings.Join(initialTables, ", "))
t.Log(createStmt)
testFeed := feed(t, f, createStmt)
testFeed := feed(t, f, createStmt, optOutOfMetamorphicDBLevelChangefeed{
reason: "db level feeds don't support ALTERing targets with ADD/DROP TARGETS",
})
defer closeFeed(t, testFeed)

feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
Expand Down
Loading
Loading