Skip to content

Commit bff6d51

Browse files
committed
changefeedccl: implement db-level changefeed EXCLUDE TABLES syntax
The syntax for EXCLUDE TABLES will be CREATE CHANGEFEED FOR DATABASE FOO EXCLUDE TABLES fizz,buzz; Resolves: #147423 Release note: none
1 parent 5154683 commit bff6d51

File tree

6 files changed

+108
-15
lines changed

6 files changed

+108
-15
lines changed

docs/generated/sql/bnf/create_changefeed_stmt.bnf

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@ create_changefeed_stmt ::=
44
| 'CREATE' 'CHANGEFEED' 'FOR' changefeed_table_target ( ( ',' changefeed_table_target ) )* 'INTO' sink 'WITH' option '=' value ( ( ',' ( option '=' value | option | option '=' value | option ) ) )*
55
| 'CREATE' 'CHANGEFEED' 'FOR' changefeed_table_target ( ( ',' changefeed_table_target ) )* 'INTO' sink 'WITH' option ( ( ',' ( option '=' value | option | option '=' value | option ) ) )*
66
| 'CREATE' 'CHANGEFEED' 'FOR' changefeed_table_target ( ( ',' changefeed_table_target ) )* 'INTO' sink
7-
| 'CREATE_CHANGEFEED_FOR_DATABASE' 'CHANGEFEED' 'FOR' 'DATABASE' database_option 'INTO' sink 'WITH' option '=' value ( ( ',' ( option '=' value | option | option '=' value | option ) ) )*
8-
| 'CREATE_CHANGEFEED_FOR_DATABASE' 'CHANGEFEED' 'FOR' 'DATABASE' database_option 'INTO' sink 'WITH' option ( ( ',' ( option '=' value | option | option '=' value | option ) ) )*
9-
| 'CREATE_CHANGEFEED_FOR_DATABASE' 'CHANGEFEED' 'FOR' 'DATABASE' database_option 'INTO' sink 'WITH' option '=' value ( ( ',' ( option '=' value | option | option '=' value | option ) ) )*
10-
| 'CREATE_CHANGEFEED_FOR_DATABASE' 'CHANGEFEED' 'FOR' 'DATABASE' database_option 'INTO' sink 'WITH' option ( ( ',' ( option '=' value | option | option '=' value | option ) ) )*
11-
| 'CREATE_CHANGEFEED_FOR_DATABASE' 'CHANGEFEED' 'FOR' 'DATABASE' database_option 'INTO' sink
7+
| 'CREATE_CHANGEFEED_FOR_DATABASE' 'CHANGEFEED' 'FOR' 'DATABASE' database_option db_level_changefeed_filter_option 'INTO' sink 'WITH' option '=' value ( ( ',' ( option '=' value | option | option '=' value | option ) ) )*
8+
| 'CREATE_CHANGEFEED_FOR_DATABASE' 'CHANGEFEED' 'FOR' 'DATABASE' database_option db_level_changefeed_filter_option 'INTO' sink 'WITH' option ( ( ',' ( option '=' value | option | option '=' value | option ) ) )*
9+
| 'CREATE_CHANGEFEED_FOR_DATABASE' 'CHANGEFEED' 'FOR' 'DATABASE' database_option db_level_changefeed_filter_option 'INTO' sink 'WITH' option '=' value ( ( ',' ( option '=' value | option | option '=' value | option ) ) )*
10+
| 'CREATE_CHANGEFEED_FOR_DATABASE' 'CHANGEFEED' 'FOR' 'DATABASE' database_option db_level_changefeed_filter_option 'INTO' sink 'WITH' option ( ( ',' ( option '=' value | option | option '=' value | option ) ) )*
11+
| 'CREATE_CHANGEFEED_FOR_DATABASE' 'CHANGEFEED' 'FOR' 'DATABASE' database_option db_level_changefeed_filter_option 'INTO' sink
1212
| 'CREATE' 'CHANGEFEED' 'INTO' sink 'WITH' option '=' value ( ( ',' ( option '=' value | option | option '=' value | option ) ) )* 'AS' 'SELECT' target_list 'FROM' changefeed_target_expr opt_where_clause
1313
| 'CREATE' 'CHANGEFEED' 'INTO' sink 'WITH' option ( ( ',' ( option '=' value | option | option '=' value | option ) ) )* 'AS' 'SELECT' target_list 'FROM' changefeed_target_expr opt_where_clause
1414
| 'CREATE' 'CHANGEFEED' 'INTO' sink 'WITH' option '=' value ( ( ',' ( option '=' value | option | option '=' value | option ) ) )* 'AS' 'SELECT' target_list 'FROM' changefeed_target_expr opt_where_clause

docs/generated/sql/bnf/stmt_block.bnf

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -643,7 +643,7 @@ create_stats_stmt ::=
643643

644644
create_changefeed_stmt ::=
645645
'CREATE' 'CHANGEFEED' 'FOR' changefeed_table_targets opt_changefeed_sink opt_with_options
646-
| 'CREATE_CHANGEFEED_FOR_DATABASE' 'CHANGEFEED' 'FOR' 'DATABASE' database_name opt_changefeed_sink opt_with_options
646+
| 'CREATE_CHANGEFEED_FOR_DATABASE' 'CHANGEFEED' 'FOR' 'DATABASE' database_name db_level_changefeed_filter_option opt_changefeed_sink opt_with_options
647647
| 'CREATE' 'CHANGEFEED' opt_changefeed_sink opt_with_options 'AS' 'SELECT' target_list 'FROM' changefeed_target_expr opt_where_clause
648648

649649
create_extension_stmt ::=
@@ -1951,6 +1951,10 @@ changefeed_table_targets ::=
19511951
opt_changefeed_sink ::=
19521952
'INTO' string_or_placeholder
19531953

1954+
db_level_changefeed_filter_option ::=
1955+
'EXCLUDE' 'TABLES' table_name_list
1956+
|
1957+
19541958
target_list ::=
19551959
( target_elem ) ( ( ',' target_elem ) )*
19561960

@@ -2856,6 +2860,9 @@ create_stats_option_list ::=
28562860
changefeed_table_target ::=
28572861
opt_table_prefix table_name opt_changefeed_family
28582862

2863+
table_name_list ::=
2864+
db_object_name_list
2865+
28592866
target_elem ::=
28602867
a_expr 'AS' target_name
28612868
| a_expr bare_col_label
@@ -2940,9 +2947,6 @@ row_or_rows ::=
29402947
table_index_name_list ::=
29412948
( table_index_name ) ( ( ',' table_index_name ) )*
29422949

2943-
table_name_list ::=
2944-
db_object_name_list
2945-
29462950
view_name_list ::=
29472951
db_object_name_list
29482952

@@ -3533,6 +3537,9 @@ opt_changefeed_family ::=
35333537
'FAMILY' family_name
35343538
|
35353539

3540+
db_object_name_list ::=
3541+
( db_object_name ) ( ( ',' db_object_name ) )*
3542+
35363543
target_name ::=
35373544
unrestricted_name
35383545

@@ -3606,9 +3613,6 @@ only_signed_fconst ::=
36063613
'+' 'FCONST'
36073614
| '-' 'FCONST'
36083615

3609-
db_object_name_list ::=
3610-
( db_object_name ) ( ( ',' db_object_name ) )*
3611-
36123616
inspect_option ::=
36133617
'INDEX' 'ALL'
36143618
| 'INDEX' '(' table_index_name_list ')'

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,35 @@ func TestDatabaseLevelChangefeedBasics(t *testing.T) {
224224
cdcTest(t, testFn)
225225
}
226226

227+
func TestDatabaseLevelChangefeedWithFilter(t *testing.T) {
228+
defer leaktest.AfterTest(t)()
229+
defer log.Scope(t).Close(t)
230+
231+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
232+
expectSuccess := func(stmt string) {
233+
successfulFeed := feed(t, f, stmt)
234+
defer closeFeed(t, successfulFeed)
235+
_, err := successfulFeed.Next()
236+
require.NoError(t, err)
237+
}
238+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
239+
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
240+
sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`)
241+
sqlDB.Exec(t, `UPSERT INTO foo VALUES (0, 'updated')`)
242+
sqlDB.Exec(t, `CREATE TABLE foo2 (a INT PRIMARY KEY, b STRING)`)
243+
sqlDB.Exec(t, `INSERT INTO foo2 VALUES (0, 'initial')`)
244+
sqlDB.Exec(t, `UPSERT INTO foo2 VALUES (0, 'updated')`)
245+
246+
expectSuccess(`CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo`)
247+
expectSuccess(`CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo,foo2`)
248+
expectSuccess(`CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo.bar.fizz, foo.foo2, foo`)
249+
expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo.*`,
250+
`at or near "*": syntax error`)
251+
// TODO(#147421): Assert payload once the filter works
252+
}
253+
cdcTest(t, testFn, feedTestEnterpriseSinks)
254+
}
255+
227256
func TestChangefeedBasicQuery(t *testing.T) {
228257
defer leaktest.AfterTest(t)()
229258
defer log.Scope(t).Close(t)

pkg/sql/parser/sql.y

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -959,6 +959,13 @@ func (u *sqlSymUnion) doBlockOptions() tree.DoBlockOptions {
959959
func (u *sqlSymUnion) doBlockOption() tree.DoBlockOption {
960960
return u.val.(tree.DoBlockOption)
961961
}
962+
func (u *sqlSymUnion) changefeedFilterOption() *tree.ChangefeedFilterOption {
963+
if filterOption, ok := u.val.(*tree.ChangefeedFilterOption); ok {
964+
return filterOption
965+
}
966+
return nil
967+
}
968+
962969
%}
963970

964971
// NB: the %token definitions must come before the %type definitions in this
@@ -1592,6 +1599,7 @@ func (u *sqlSymUnion) doBlockOption() tree.DoBlockOption {
15921599
%type <tree.ReturningClause> returning_clause
15931600
%type <tree.TableExprs> opt_using_clause
15941601
%type <tree.RefreshDataOption> opt_clear_data
1602+
%type <*tree.ChangefeedFilterOption> db_level_changefeed_filter_option
15951603

15961604
%type <tree.BatchParam> batch_param
15971605
%type <[]tree.BatchParam> batch_param_list
@@ -6257,12 +6265,13 @@ create_changefeed_stmt:
62576265
Level: tree.ChangefeedLevelTable,
62586266
}
62596267
}
6260-
| CREATE_CHANGEFEED_FOR_DATABASE CHANGEFEED FOR DATABASE database_name opt_changefeed_sink opt_with_options
6268+
| CREATE_CHANGEFEED_FOR_DATABASE CHANGEFEED FOR DATABASE database_name db_level_changefeed_filter_option opt_changefeed_sink opt_with_options
62616269
{
62626270
$$.val = &tree.CreateChangefeed{
62636271
DatabaseTarget: tree.ChangefeedDatabaseTarget($5),
6264-
SinkURI: $6.expr(),
6265-
Options: $7.kvOptions(),
6272+
FilterOption: $6.changefeedFilterOption(),
6273+
SinkURI: $7.expr(),
6274+
Options: $8.kvOptions(),
62666275
Level: tree.ChangefeedLevelDatabase,
62676276
}
62686277
}
@@ -6487,6 +6496,16 @@ opt_using_clause:
64876496
$$.val = tree.TableExprs{}
64886497
}
64896498

6499+
db_level_changefeed_filter_option:
6500+
EXCLUDE TABLES table_name_list
6501+
{
6502+
$$.val = &tree.ChangefeedFilterOption{Tables: $3.tableNames(), FilterType: tree.ExcludeFilter}
6503+
}
6504+
| /* EMPTY */
6505+
{
6506+
$$.val = nil
6507+
}
6508+
64906509

64916510
// %Help: DISCARD - reset the session to its initial state
64926511
// %Category: Cfg

pkg/sql/parser/testdata/changefeed

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,3 +178,21 @@ at or near "into": syntax error
178178
DETAIL: source SQL:
179179
CREATE CHANGEFEED FOR database INTO 'foo'
180180
^
181+
182+
parse
183+
CREATE CHANGEFEED FOR DATABASE d1 EXCLUDE TABLES bar INTO 'foo'
184+
----
185+
CREATE CHANGEFEED FOR DATABASE d1 EXCLUDE TABLES bar INTO '*****' -- normalized!
186+
CREATE CHANGEFEED FOR DATABASE d1 EXCLUDE TABLES bar INTO ('*****') -- fully parenthesized
187+
CREATE CHANGEFEED FOR DATABASE d1 EXCLUDE TABLES bar INTO '_' -- literals removed
188+
CREATE CHANGEFEED FOR DATABASE _ EXCLUDE TABLES _ INTO '*****' -- identifiers removed
189+
CREATE CHANGEFEED FOR DATABASE d1 EXCLUDE TABLES bar INTO 'foo' -- passwords exposed
190+
191+
parse
192+
CREATE CHANGEFEED FOR DATABASE d1 EXCLUDE TABLES foo,bar INTO 'foo'
193+
----
194+
CREATE CHANGEFEED FOR DATABASE d1 EXCLUDE TABLES foo, bar INTO '*****' -- normalized!
195+
CREATE CHANGEFEED FOR DATABASE d1 EXCLUDE TABLES foo, bar INTO ('*****') -- fully parenthesized
196+
CREATE CHANGEFEED FOR DATABASE d1 EXCLUDE TABLES foo, bar INTO '_' -- literals removed
197+
CREATE CHANGEFEED FOR DATABASE _ EXCLUDE TABLES _, _ INTO '*****' -- identifiers removed
198+
CREATE CHANGEFEED FOR DATABASE d1 EXCLUDE TABLES foo, bar INTO 'foo' -- passwords exposed

pkg/sql/sem/tree/changefeed.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,32 @@ const (
2121
ChangefeedLevelDatabase
2222
)
2323

24+
type FilterType int
25+
26+
// Needs to be the same as
27+
const (
28+
ExcludeFilter FilterType = iota
29+
)
30+
31+
type ChangefeedFilterOption struct {
32+
Tables TableNames
33+
FilterType FilterType
34+
}
35+
2436
func (l ChangefeedLevel) String() string {
2537
return []string{"TABLE", "DATABASE"}[l]
2638
}
2739

40+
func (l FilterType) String() string {
41+
return []string{"EXCLUDE"}[l]
42+
}
43+
2844
// CreateChangefeed represents a CREATE CHANGEFEED statement.
2945
type CreateChangefeed struct {
3046
TableTargets ChangefeedTableTargets
3147
DatabaseTarget ChangefeedDatabaseTarget
3248
Level ChangefeedLevel
49+
FilterOption *ChangefeedFilterOption
3350
SinkURI Expr
3451
Options KVOptions
3552
Select *SelectClause
@@ -56,6 +73,12 @@ func (node *CreateChangefeed) Format(ctx *FmtCtx) {
5673
ctx.FormatNode(&node.TableTargets)
5774
} else {
5875
ctx.FormatNode(&node.DatabaseTarget)
76+
if node.FilterOption != nil {
77+
ctx.WriteString(" ")
78+
ctx.WriteString(node.FilterOption.FilterType.String())
79+
ctx.WriteString(" TABLES ")
80+
ctx.FormatNode(&node.FilterOption.Tables)
81+
}
5982
}
6083
if node.SinkURI != nil {
6184
ctx.WriteString(" INTO ")

0 commit comments

Comments
 (0)