Skip to content

Commit f33561f

Browse files
authored
Merge pull request #154059 from KeithCh/disable-db-level-feeds
release-25.4: changefeedccl: disable DB-level feeds
2 parents b3d1e3a + baa403a commit f33561f

File tree

2 files changed

+47
-5
lines changed

2 files changed

+47
-5
lines changed

pkg/ccl/changefeedccl/changefeed_stmt.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"time"
1414

1515
"github.com/cockroachdb/cockroach/pkg/backup/backupresolver"
16+
"github.com/cockroachdb/cockroach/pkg/build"
1617
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
1718
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
1819
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedvalidators"
@@ -191,6 +192,13 @@ func changefeedPlanHook(
191192
if changefeedStmt == nil {
192193
return nil, nil, false, nil
193194
}
195+
if changefeedStmt.Level != tree.ChangefeedLevelTable {
196+
return nil, nil, false,
197+
errors.UnimplementedError(
198+
errors.IssueLink{IssueURL: build.MakeIssueURL(154053)},
199+
"database-level changefeed is not implemented yet",
200+
)
201+
}
194202

195203
exprEval := p.ExprEvaluator("CREATE CHANGEFEED")
196204
var sinkURI string

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ func TestChangefeedBasics(t *testing.T) {
166166
func TestDatabaseLevelChangefeedBasics(t *testing.T) {
167167
defer leaktest.AfterTest(t)()
168168
defer log.Scope(t).Close(t)
169+
skip.WithIssue(t, 154053, "unreleased feature")
169170

170171
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
171172
sqlDB := sqlutils.MakeSQLRunner(s.DB)
@@ -227,6 +228,7 @@ func TestDatabaseLevelChangefeedBasics(t *testing.T) {
227228
func TestDatabaseLevelChangefeedWithIncludeFilter(t *testing.T) {
228229
defer leaktest.AfterTest(t)()
229230
defer log.Scope(t).Close(t)
231+
skip.WithIssue(t, 154053, "unreleased feature")
230232

231233
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
232234
expectSuccess := func(stmt string) {
@@ -256,6 +258,7 @@ func TestDatabaseLevelChangefeedWithIncludeFilter(t *testing.T) {
256258
func TestDatabaseLevelChangefeedWithExcludeFilter(t *testing.T) {
257259
defer leaktest.AfterTest(t)()
258260
defer log.Scope(t).Close(t)
261+
skip.WithIssue(t, 154053, "unreleased feature")
259262

260263
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
261264
expectSuccess := func(stmt string) {
@@ -1031,6 +1034,7 @@ func TestChangefeedDiff(t *testing.T) {
10311034
func TestDatabaseLevelChangefeedDiff(t *testing.T) {
10321035
defer leaktest.AfterTest(t)()
10331036
defer log.Scope(t).Close(t)
1037+
skip.WithIssue(t, 154053, "unreleased feature")
10341038

10351039
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
10361040
sqlDB := sqlutils.MakeSQLRunner(s.DB)
@@ -1164,6 +1168,8 @@ func TestMissingTableErr(t *testing.T) {
11641168
func TestChangefeedMissingDatabaseErr(t *testing.T) {
11651169
defer leaktest.AfterTest(t)()
11661170
defer log.Scope(t).Close(t)
1171+
skip.WithIssue(t, 154053, "unreleased feature")
1172+
11671173
cdcTest(t, func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
11681174
expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR DATABASE foo`, `database "foo" does not exist`)
11691175
})
@@ -1172,6 +1178,8 @@ func TestChangefeedMissingDatabaseErr(t *testing.T) {
11721178
func TestChangefeedCannotTargetSystemDatabaseErr(t *testing.T) {
11731179
defer leaktest.AfterTest(t)()
11741180
defer log.Scope(t).Close(t)
1181+
skip.WithIssue(t, 154053, "unreleased feature")
1182+
11751183
cdcTest(t, func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
11761184
expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR DATABASE system`, `changefeed cannot target the system database`)
11771185
})
@@ -4003,7 +4011,7 @@ func TestChangefeedCreateAuthorizationWithChangefeedPriv(t *testing.T) {
40034011
"CREATE CHANGEFEED FOR table_a, table_b INTO 'external://nope'",
40044012
)
40054013
userDB.ExpectErr(t,
4006-
`user "user1" requires the CHANGEFEED privilege on the target database to be able to run an enterprise changefeed`,
4014+
`database-level changefeed is not implemented yet`,
40074015
"CREATE CHANGEFEED FOR DATABASE defaultdb INTO 'kafka://nope'",
40084016
)
40094017
})
@@ -4014,7 +4022,7 @@ func TestChangefeedCreateAuthorizationWithChangefeedPriv(t *testing.T) {
40144022
"CREATE CHANGEFEED FOR table_a, table_b INTO 'external://nope'",
40154023
)
40164024
userDB.ExpectErr(t,
4017-
`user "user1" requires the CHANGEFEED privilege on the target database to be able to run an enterprise changefeed`,
4025+
`database-level changefeed is not implemented yet`,
40184026
"CREATE CHANGEFEED FOR DATABASE defaultdb INTO 'kafka://nope'",
40194027
)
40204028
})
@@ -4024,7 +4032,7 @@ func TestChangefeedCreateAuthorizationWithChangefeedPriv(t *testing.T) {
40244032
"CREATE CHANGEFEED FOR table_a, table_b INTO 'external://nope'",
40254033
)
40264034
userDB.ExpectErr(t,
4027-
`user "user1" requires the CHANGEFEED privilege on the target database to be able to run an enterprise changefeed`,
4035+
`database-level changefeed is not implemented yet`,
40284036
"CREATE CHANGEFEED FOR DATABASE defaultdb INTO 'kafka://nope'",
40294037
)
40304038
})
@@ -4038,7 +4046,7 @@ func TestChangefeedCreateAuthorizationWithChangefeedPriv(t *testing.T) {
40384046
"CREATE CHANGEFEED FOR table_a, table_b INTO 'kafka://nope'",
40394047
)
40404048
userDB.ExpectErr(t,
4041-
"pq: the CHANGEFEED privilege on the target database can only be used with external connection sinks",
4049+
"database-level changefeed is not implemented yet",
40424050
"CREATE CHANGEFEED FOR DATABASE defaultdb INTO 'kafka://nope'",
40434051
)
40444052
})
@@ -4049,7 +4057,8 @@ func TestChangefeedCreateAuthorizationWithChangefeedPriv(t *testing.T) {
40494057
)
40504058
})
40514059
withUser(t, "user1", func(userDB *sqlutils.SQLRunner) {
4052-
userDB.Exec(t,
4060+
userDB.ExpectErr(t,
4061+
"database-level changefeed is not implemented yet",
40534062
"CREATE CHANGEFEED FOR DATABASE defaultdb INTO 'external://nope'",
40544063
)
40554064
})
@@ -12526,6 +12535,7 @@ func TestChangefeedBareFullProtobuf(t *testing.T) {
1252612535
func TestDatabaseRenameDuringDatabaseLevelChangefeed(t *testing.T) {
1252712536
defer leaktest.AfterTest(t)()
1252812537
defer log.Scope(t).Close(t)
12538+
skip.WithIssue(t, 154053, "unreleased feature")
1252912539

1253012540
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
1253112541
sqlDB := sqlutils.MakeSQLRunner(s.DB)
@@ -12554,6 +12564,7 @@ func TestDatabaseRenameDuringDatabaseLevelChangefeed(t *testing.T) {
1255412564
func TestTableRenameDuringDatabaseLevelChangefeed(t *testing.T) {
1255512565
defer leaktest.AfterTest(t)()
1255612566
defer log.Scope(t).Close(t)
12567+
skip.WithIssue(t, 154053, "unreleased feature")
1255712568

1255812569
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
1255912570
sqlDB := sqlutils.MakeSQLRunner(s.DB)
@@ -12609,3 +12620,26 @@ func TestCreateTableLevelChangefeedWithDBPrivilege(t *testing.T) {
1260912620
}
1261012621
cdcTest(t, testFn, feedTestEnterpriseSinks)
1261112622
}
12623+
12624+
func TestDatabaseLevelChangefeedUnimplemented(t *testing.T) {
12625+
defer leaktest.AfterTest(t)()
12626+
defer log.Scope(t).Close(t)
12627+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
12628+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
12629+
sqlDB.Exec(t, `CREATE TABLE bar (id INT PRIMARY KEY);`)
12630+
sqlDB.Exec(t, `INSERT INTO bar VALUES (1);`)
12631+
expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR DATABASE d`,
12632+
`database-level changefeed is not implemented yet`)
12633+
expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR DATABASE d WITH diff`,
12634+
`database-level changefeed is not implemented yet`)
12635+
expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo`,
12636+
`database-level changefeed is not implemented yet`)
12637+
expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo`,
12638+
`database-level changefeed is not implemented yet`)
12639+
expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR DATABASE system`,
12640+
`database-level changefeed is not implemented yet`)
12641+
expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR DATABASE doesnotexist`,
12642+
`database-level changefeed is not implemented yet`)
12643+
}
12644+
cdcTest(t, testFn, feedTestEnterpriseSinks)
12645+
}

0 commit comments

Comments
 (0)