Skip to content

Commit 4f28164

Browse files
committed
changefeedccl: implement INCLUDE TABLES syntax in DB-level feeds
The syntax for INCLUDE TABLES will be CREATE CHANGEFEED FOR DATABASE FOO EXCLUDE TABLES fizz,buzz; Resolves #147420 Release note: None
1 parent c931ab6 commit 4f28164

File tree

5 files changed

+55
-3
lines changed

5 files changed

+55
-3
lines changed

docs/generated/sql/bnf/stmt_block.bnf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1953,6 +1953,7 @@ opt_changefeed_sink ::=
19531953

19541954
db_level_changefeed_filter_option ::=
19551955
'EXCLUDE' 'TABLES' table_name_list
1956+
| 'INCLUDE' 'TABLES' table_name_list
19561957
|
19571958

19581959
target_list ::=

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,36 @@ func TestDatabaseLevelChangefeedBasics(t *testing.T) {
224224
cdcTest(t, testFn)
225225
}
226226

227-
func TestDatabaseLevelChangefeedWithFilter(t *testing.T) {
227+
func TestDatabaseLevelChangefeedWithIncludeFilter(t *testing.T) {
228+
defer leaktest.AfterTest(t)()
229+
defer log.Scope(t).Close(t)
230+
231+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
232+
expectSuccess := func(stmt string) {
233+
successfulFeed := feed(t, f, stmt)
234+
defer closeFeed(t, successfulFeed)
235+
_, err := successfulFeed.Next()
236+
require.NoError(t, err)
237+
}
238+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
239+
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
240+
sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`)
241+
sqlDB.Exec(t, `UPSERT INTO foo VALUES (0, 'updated')`)
242+
sqlDB.Exec(t, `CREATE TABLE foo2 (a INT PRIMARY KEY, b STRING)`)
243+
sqlDB.Exec(t, `INSERT INTO foo2 VALUES (0, 'initial')`)
244+
sqlDB.Exec(t, `UPSERT INTO foo2 VALUES (0, 'updated')`)
245+
246+
expectSuccess(`CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo`)
247+
expectSuccess(`CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo,foo2`)
248+
expectSuccess(`CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo.bar.fizz, foo.foo2, foo`)
249+
expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo.*`,
250+
`at or near "*": syntax error`)
251+
// TODO(#147421): Assert payload once the filter works
252+
}
253+
cdcTest(t, testFn, feedTestEnterpriseSinks)
254+
}
255+
256+
func TestDatabaseLevelChangefeedWithExcludeFilter(t *testing.T) {
228257
defer leaktest.AfterTest(t)()
229258
defer log.Scope(t).Close(t)
230259

pkg/sql/parser/sql.y

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6501,6 +6501,10 @@ db_level_changefeed_filter_option:
65016501
{
65026502
$$.val = &tree.ChangefeedFilterOption{Tables: $3.tableNames(), FilterType: tree.ExcludeFilter}
65036503
}
6504+
| INCLUDE TABLES table_name_list
6505+
{
6506+
$$.val = &tree.ChangefeedFilterOption{Tables: $3.tableNames(), FilterType: tree.IncludeFilter}
6507+
}
65046508
| /* EMPTY */
65056509
{
65066510
$$.val = nil

pkg/sql/parser/testdata/changefeed

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,3 +196,21 @@ CREATE CHANGEFEED FOR DATABASE d1 EXCLUDE TABLES foo, bar INTO ('*****') -- full
196196
CREATE CHANGEFEED FOR DATABASE d1 EXCLUDE TABLES foo, bar INTO '_' -- literals removed
197197
CREATE CHANGEFEED FOR DATABASE _ EXCLUDE TABLES _, _ INTO '*****' -- identifiers removed
198198
CREATE CHANGEFEED FOR DATABASE d1 EXCLUDE TABLES foo, bar INTO 'foo' -- passwords exposed
199+
200+
parse
201+
CREATE CHANGEFEED FOR DATABASE d1 INCLUDE TABLES bar INTO 'foo'
202+
----
203+
CREATE CHANGEFEED FOR DATABASE d1 INCLUDE TABLES bar INTO '*****' -- normalized!
204+
CREATE CHANGEFEED FOR DATABASE d1 INCLUDE TABLES bar INTO ('*****') -- fully parenthesized
205+
CREATE CHANGEFEED FOR DATABASE d1 INCLUDE TABLES bar INTO '_' -- literals removed
206+
CREATE CHANGEFEED FOR DATABASE _ INCLUDE TABLES _ INTO '*****' -- identifiers removed
207+
CREATE CHANGEFEED FOR DATABASE d1 INCLUDE TABLES bar INTO 'foo' -- passwords exposed
208+
209+
parse
210+
CREATE CHANGEFEED FOR DATABASE d1 INCLUDE TABLES foo,bar INTO 'foo'
211+
----
212+
CREATE CHANGEFEED FOR DATABASE d1 INCLUDE TABLES foo, bar INTO '*****' -- normalized!
213+
CREATE CHANGEFEED FOR DATABASE d1 INCLUDE TABLES foo, bar INTO ('*****') -- fully parenthesized
214+
CREATE CHANGEFEED FOR DATABASE d1 INCLUDE TABLES foo, bar INTO '_' -- literals removed
215+
CREATE CHANGEFEED FOR DATABASE _ INCLUDE TABLES _, _ INTO '*****' -- identifiers removed
216+
CREATE CHANGEFEED FOR DATABASE d1 INCLUDE TABLES foo, bar INTO 'foo' -- passwords exposed

pkg/sql/sem/tree/changefeed.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ const (
2323

2424
type FilterType int
2525

26-
// Needs to be the same as
2726
const (
2827
ExcludeFilter FilterType = iota
28+
IncludeFilter
2929
)
3030

3131
type ChangefeedFilterOption struct {
@@ -38,7 +38,7 @@ func (l ChangefeedLevel) String() string {
3838
}
3939

4040
func (l FilterType) String() string {
41-
return []string{"EXCLUDE"}[l]
41+
return []string{"EXCLUDE", "INCLUDE"}[l]
4242
}
4343

4444
// CreateChangefeed represents a CREATE CHANGEFEED statement.

0 commit comments

Comments
 (0)