Skip to content

Commit 2bfd8c3

Browse files
committed
changefeedccl: support INCLUDE TABLES filter for DB-level changefeed
Support the ability to watch specific tables in a database-level feed. An example usage is: CREATE DATABASE FOR CHANGEFEED defaultdb INCLUDE TABLES foo,fizz.buzz,defaultdb.bar.tab,notdefaultdb.bar.tab; The table names above in the INCLUDE TABLES list will be resolved as such: Non-qualified table name foo: - defaultdb.public.foo Partially-qualified table name fizz.buzz: - defaultdb.fizz.buzz Fully-qualified table name defaultdb.bar.tab: - defaultdb.bar.tab Fully-qualified table name notdefaultdb.bar.tab: - Statement error; changefeed job will not be created Note that partially qualified table names in the form <database>.<table> are not supported. Partially qualfied table names are always assumed to be <schema>.<table>. Resolves: #147424 Release note: none
1 parent ffca318 commit 2bfd8c3

File tree

6 files changed

+178
-74
lines changed

6 files changed

+178
-74
lines changed

pkg/ccl/changefeedccl/changefeed.go

Lines changed: 82 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@ import (
1515
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1616
"github.com/cockroachdb/cockroach/pkg/sql"
1717
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
18+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
1819
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
1920
"github.com/cockroachdb/cockroach/pkg/sql/isql"
21+
"github.com/cockroachdb/cockroach/pkg/sql/parser"
2022
"github.com/cockroachdb/cockroach/pkg/sql/protoreflect"
23+
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
2124
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2225
"github.com/cockroachdb/cockroach/pkg/util/log"
2326
"github.com/cockroachdb/cockroach/pkg/util/tracing"
@@ -109,7 +112,9 @@ func AllTargets(
109112
func getTargetsFromDatabaseSpec(
110113
ctx context.Context, ts jobspb.ChangefeedTargetSpecification, execCfg *sql.ExecutorConfig,
111114
) (targets changefeedbase.Targets, err error) {
112-
err = sql.DescsTxn(ctx, execCfg, func(ctx context.Context, txn isql.Txn, descs *descs.Collection) error {
115+
err = sql.DescsTxn(ctx, execCfg, func(
116+
ctx context.Context, txn isql.Txn, descs *descs.Collection,
117+
) error {
113118
databaseDescriptor, err := descs.ByIDWithLeased(txn.KV()).Get().Database(ctx, ts.DescID)
114119
if err != nil {
115120
return err
@@ -120,16 +125,23 @@ func getTargetsFromDatabaseSpec(
120125
return err
121126
}
122127
dbName := databaseDescriptor.GetName()
123-
for _, desc := range tables.OrderedDescriptors() {
124-
tableDesc, ok := desc.(catalog.TableDescriptor)
125-
if !ok {
126-
return errors.AssertionFailedf("expected table descriptor, got %T", desc)
127-
}
128-
// Skip virtual tables
129-
if !tableDesc.IsPhysicalTable() {
130-
continue
131-
}
132-
if ts.FilterList != nil && ts.FilterList.FilterType == jobspb.FilterList_EXCLUDE_TABLES {
128+
// DB-level feeds should have a filter list, even if the table list is empty.
129+
// By default, it would be an ExcludeFilter with an empty table list.
130+
if ts.FilterList == nil {
131+
return errors.AssertionFailedf("filter list is nil")
132+
}
133+
switch ts.FilterList.FilterType {
134+
case tree.ExcludeFilter:
135+
for _, desc := range tables.OrderedDescriptors() {
136+
tableDesc, ok := desc.(catalog.TableDescriptor)
137+
if !ok {
138+
return errors.AssertionFailedf("expected table descriptor, got %T", desc)
139+
}
140+
// Skip virtual tables
141+
if !tableDesc.IsPhysicalTable() {
142+
continue
143+
}
144+
133145
if _, ok := tableDescToSchemaName[tableDesc]; !ok {
134146
schemaID := tableDesc.GetParentSchemaID()
135147
schema, err := descs.ByIDWithLeased(txn.KV()).Get().Schema(ctx, schemaID)
@@ -138,23 +150,70 @@ func getTargetsFromDatabaseSpec(
138150
}
139151
tableDescToSchemaName[tableDesc] = schema.GetName()
140152
}
141-
fullyQualifiedTableName := fmt.Sprintf("%s.%s.%s", dbName, tableDescToSchemaName[tableDesc], tableDesc.GetName())
153+
fullyQualifiedTableName := fmt.Sprintf(
154+
"%s.%s.%s", dbName, tableDescToSchemaName[tableDesc], tableDesc.GetName())
142155
if _, ok := ts.FilterList.Tables[fullyQualifiedTableName]; ok {
143156
continue
144157
}
158+
159+
var tableType jobspb.ChangefeedTargetSpecification_TargetType
160+
if len(tableDesc.GetFamilies()) == 1 {
161+
tableType = jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY
162+
} else {
163+
tableType = jobspb.ChangefeedTargetSpecification_EACH_FAMILY
164+
}
165+
166+
targets.Add(changefeedbase.Target{
167+
Type: tableType,
168+
DescID: desc.GetID(),
169+
StatementTimeName: changefeedbase.StatementTimeName(desc.GetName()),
170+
})
145171
}
146-
var tableType jobspb.ChangefeedTargetSpecification_TargetType
147-
if len(tableDesc.GetFamilies()) == 1 {
148-
tableType = jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY
149-
} else {
150-
tableType = jobspb.ChangefeedTargetSpecification_EACH_FAMILY
151-
}
172+
case tree.IncludeFilter:
173+
for name := range ts.FilterList.Tables {
174+
tn, err := parser.ParseTableName(name)
175+
if err != nil {
176+
return err
177+
}
152178

153-
targets.Add(changefeedbase.Target{
154-
Type: tableType,
155-
DescID: desc.GetID(),
156-
StatementTimeName: changefeedbase.StatementTimeName(desc.GetName()),
157-
})
179+
schemaID, err := descs.LookupSchemaID(ctx, txn.KV(), ts.DescID, tn.Schema())
180+
if err != nil {
181+
return err
182+
}
183+
// Schema is not found in the database.
184+
if schemaID == descpb.InvalidID {
185+
continue
186+
}
187+
188+
tableID, err := descs.LookupObjectID(ctx, txn.KV(), ts.DescID, schemaID, tn.Object())
189+
if err != nil {
190+
return err
191+
}
192+
// Table is not found in the database.
193+
if tableID == descpb.InvalidID {
194+
continue
195+
}
196+
197+
desc, err := descs.ByIDWithLeased(txn.KV()).Get().Table(ctx, tableID)
198+
if err != nil {
199+
return err
200+
}
201+
202+
var tableType jobspb.ChangefeedTargetSpecification_TargetType
203+
if len(desc.GetFamilies()) == 1 {
204+
tableType = jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY
205+
} else {
206+
tableType = jobspb.ChangefeedTargetSpecification_EACH_FAMILY
207+
}
208+
209+
targets.Add(changefeedbase.Target{
210+
Type: tableType,
211+
DescID: tableID,
212+
StatementTimeName: changefeedbase.StatementTimeName(desc.GetName()),
213+
})
214+
}
215+
default:
216+
return errors.AssertionFailedf("invalid changefeed filter type")
158217
}
159218
return nil
160219
})

pkg/ccl/changefeedccl/changefeed_stmt.go

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -637,15 +637,14 @@ func createChangefeedJobRecord(
637637
if targetDatabaseDesc.GetID() == keys.SystemDatabaseID {
638638
return nil, changefeedbase.Targets{}, errors.Errorf("changefeed cannot target the system database")
639639
}
640-
if changefeedStmt.FilterOption != nil {
641-
fqTableNames, err := getFullyQualifiedTableNames(
642-
targetDatabaseDesc.GetName(), changefeedStmt.FilterOption.Tables,
643-
)
644-
if err != nil {
645-
return nil, changefeedbase.Targets{}, err
646-
}
647-
changefeedStmt.FilterOption.Tables = fqTableNames
640+
fqTableNames, err := getFullyQualifiedTableNames(
641+
targetDatabaseDesc.GetName(), changefeedStmt.FilterOption.Tables,
642+
)
643+
if err != nil {
644+
return nil, changefeedbase.Targets{}, err
648645
}
646+
changefeedStmt.FilterOption.Tables = fqTableNames
647+
649648
targetSpec := getDatabaseTargetSpec(targetDatabaseDesc, changefeedStmt.FilterOption)
650649
details = jobspb.ChangefeedDetails{
651650
TargetSpecifications: []jobspb.ChangefeedTargetSpecification{targetSpec},
@@ -1158,22 +1157,20 @@ func getTargetsAndTables(
11581157
}
11591158

11601159
func getDatabaseTargetSpec(
1161-
targetDatabaseDesc catalog.DatabaseDescriptor, filterOpt *tree.ChangefeedFilterOption,
1160+
targetDatabaseDesc catalog.DatabaseDescriptor, filterOpt tree.ChangefeedFilterOption,
11621161
) jobspb.ChangefeedTargetSpecification {
11631162
target := jobspb.ChangefeedTargetSpecification{
11641163
DescID: targetDatabaseDesc.GetID(),
11651164
Type: jobspb.ChangefeedTargetSpecification_DATABASE,
11661165
StatementTimeName: targetDatabaseDesc.GetName(),
11671166
}
1168-
if filterOpt != nil {
1169-
filterTables := make(map[string]pbtypes.Empty)
1170-
for _, table := range filterOpt.Tables {
1171-
filterTables[table.FQString()] = pbtypes.Empty{}
1172-
}
1173-
target.FilterList = &jobspb.FilterList{
1174-
FilterType: jobspb.FilterList_FilterType(filterOpt.FilterType),
1175-
Tables: filterTables,
1176-
}
1167+
filterTables := make(map[string]pbtypes.Empty)
1168+
for _, table := range filterOpt.Tables {
1169+
filterTables[table.FQString()] = pbtypes.Empty{}
1170+
}
1171+
target.FilterList = &jobspb.FilterList{
1172+
FilterType: filterOpt.FilterType,
1173+
Tables: filterTables,
11771174
}
11781175
return target
11791176
}

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 72 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -229,26 +229,78 @@ func TestDatabaseLevelChangefeedWithIncludeFilter(t *testing.T) {
229229
defer log.Scope(t).Close(t)
230230

231231
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
232-
expectSuccess := func(stmt string) {
233-
successfulFeed := feed(t, f, stmt)
234-
defer closeFeed(t, successfulFeed)
235-
_, err := successfulFeed.Next()
236-
require.NoError(t, err)
237-
}
238232
sqlDB := sqlutils.MakeSQLRunner(s.DB)
239-
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
240-
sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`)
241-
sqlDB.Exec(t, `UPSERT INTO foo VALUES (0, 'updated')`)
242-
sqlDB.Exec(t, `CREATE TABLE foo2 (a INT PRIMARY KEY, b STRING)`)
243-
sqlDB.Exec(t, `INSERT INTO foo2 VALUES (0, 'initial')`)
244-
sqlDB.Exec(t, `UPSERT INTO foo2 VALUES (0, 'updated')`)
233+
for i := range 4 {
234+
name := fmt.Sprintf("foo%d", i+1)
235+
sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE %s (a INT PRIMARY KEY, b STRING)`, name))
236+
sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO %s VALUES (0, 'initial')`, name))
237+
sqlDB.Exec(t, fmt.Sprintf(`UPSERT INTO %s VALUES (0, 'updated')`, name))
238+
}
239+
240+
sqlDB.Exec(t, `CREATE SCHEMA private`)
241+
sqlDB.Exec(t, `CREATE TABLE private.foo1 (a INT PRIMARY KEY, b STRING)`)
242+
sqlDB.Exec(t, `INSERT INTO private.foo1 VALUES (0, 'initial')`)
243+
// Test that if there are multiple tables with the same name the correct
244+
// one will still be found using the default schema of public.
245+
feed1 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo1`)
246+
defer closeFeed(t, feed1)
247+
assertPayloads(t, feed1, []string{
248+
`foo1: [0]->{"after": {"a": 0, "b": "updated"}}`,
249+
})
250+
251+
// Test that we can include multiple tables.
252+
feed2 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo1,foo2`)
253+
defer closeFeed(t, feed2)
254+
assertPayloads(t, feed2, []string{
255+
`foo1: [0]->{"after": {"a": 0, "b": "updated"}}`,
256+
`foo2: [0]->{"after": {"a": 0, "b": "updated"}}`,
257+
})
245258

246-
expectSuccess(`CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo`)
247-
expectSuccess(`CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo,foo2`)
248-
expectSuccess(`CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES d.bar.fizz, foo.foo2, foo`)
259+
// Test that we can handle fully qualified, partially qualified, and unqualified
260+
// table names.
261+
feed3 := feed(t, f,
262+
`CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES d.public.foo2, public.foo3, foo4`)
263+
defer closeFeed(t, feed3)
264+
assertPayloads(t, feed3, []string{
265+
`foo2: [0]->{"after": {"a": 0, "b": "updated"}}`,
266+
`foo3: [0]->{"after": {"a": 0, "b": "updated"}}`,
267+
`foo4: [0]->{"after": {"a": 0, "b": "updated"}}`,
268+
})
269+
270+
// Test that we can handle tables that don't exist.
271+
feed4 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo1,bob`)
272+
defer closeFeed(t, feed4)
273+
assertPayloads(t, feed4, []string{
274+
`foo1: [0]->{"after": {"a": 0, "b": "updated"}}`,
275+
})
276+
277+
// Test that fully qualified table names outside of the target database will
278+
// cause an error.
279+
expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES fizz.buzz.foo`,
280+
`table "fizz.buzz.foo" must be in target database "d"`)
281+
282+
// Table patterns are not supported.
249283
expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo.*`,
250284
`at or near "*": syntax error`)
251-
// TODO(#147421): Assert payload once the filter works
285+
286+
// Test that name resolution is not dependent on search_path() or current DB.
287+
sqlDB.Exec(t, `CREATE DATABASE notd`)
288+
sqlDB.Exec(t, `USE notd`)
289+
sqlDB.Exec(t, `SET search_path TO notpublic`)
290+
feed5 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo1, private.foo1`)
291+
defer closeFeed(t, feed5)
292+
assertPayloads(t, feed5, []string{
293+
`foo1: [0]->{"after": {"a": 0, "b": "updated"}}`,
294+
`foo1: [0]->{"after": {"a": 0, "b": "initial"}}`,
295+
})
296+
297+
// Test that partially qualified table names are always assumed to be
298+
// <schema>.<table>.
299+
feed6 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES d.foo1, public.foo2`)
300+
defer closeFeed(t, feed6)
301+
assertPayloads(t, feed6, []string{
302+
`foo2: [0]->{"after": {"a": 0, "b": "updated"}}`,
303+
})
252304
}
253305
cdcTest(t, testFn, feedTestEnterpriseSinks)
254306
}
@@ -298,8 +350,10 @@ func TestDatabaseLevelChangefeedWithExcludeFilter(t *testing.T) {
298350
`foo4: [0]->{"after": {"a": 0, "b": "updated"}}`,
299351
})
300352

301-
// Test that we can handle fully qualfied, partially qualified, and unqualified table names.
302-
feed4 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES d.public.foo2, public.foo3, foo4`)
353+
// Test that we can handle fully qualified, partially qualified, and unqualified
354+
// table names.
355+
feed4 := feed(t, f,
356+
`CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES d.public.foo2, public.foo3, foo4`)
303357
defer closeFeed(t, feed4)
304358
assertPayloads(t, feed4, []string{
305359
`foo1: [0]->{"after": {"a": 0, "b": "initial"}}`,

pkg/jobs/jobspb/jobs.proto

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1113,10 +1113,7 @@ message ChangefeedTargetSpecification {
11131113
}
11141114

11151115
message FilterList {
1116-
enum FilterType {
1117-
EXCLUDE_TABLES = 0;
1118-
}
1119-
FilterType filter_type = 1;
1116+
uint32 filter_type = 1 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/sem/tree.FilterType"];
11201117
map<string, google.protobuf.Empty> tables = 2 [(gogoproto.nullable) = false];
11211118
}
11221119

pkg/sql/parser/sql.y

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -965,11 +965,8 @@ func (u *sqlSymUnion) doBlockOptions() tree.DoBlockOptions {
965965
func (u *sqlSymUnion) doBlockOption() tree.DoBlockOption {
966966
return u.val.(tree.DoBlockOption)
967967
}
968-
func (u *sqlSymUnion) changefeedFilterOption() *tree.ChangefeedFilterOption {
969-
if filterOption, ok := u.val.(*tree.ChangefeedFilterOption); ok {
970-
return filterOption
971-
}
972-
return nil
968+
func (u *sqlSymUnion) changefeedFilterOption() tree.ChangefeedFilterOption {
969+
return u.val.(tree.ChangefeedFilterOption)
973970
}
974971

975972
%}
@@ -1610,7 +1607,7 @@ func (u *sqlSymUnion) changefeedFilterOption() *tree.ChangefeedFilterOption {
16101607
%type <tree.ReturningClause> returning_clause
16111608
%type <tree.TableExprs> opt_using_clause
16121609
%type <tree.RefreshDataOption> opt_clear_data
1613-
%type <*tree.ChangefeedFilterOption> db_level_changefeed_filter_option
1610+
%type <tree.ChangefeedFilterOption> db_level_changefeed_filter_option
16141611

16151612
%type <tree.BatchParam> batch_param
16161613
%type <[]tree.BatchParam> batch_param_list
@@ -6510,15 +6507,15 @@ opt_using_clause:
65106507
db_level_changefeed_filter_option:
65116508
EXCLUDE TABLES table_name_list
65126509
{
6513-
$$.val = &tree.ChangefeedFilterOption{Tables: $3.tableNames(), FilterType: tree.ExcludeFilter}
6510+
$$.val = tree.ChangefeedFilterOption{Tables: $3.tableNames(), FilterType: tree.ExcludeFilter}
65146511
}
65156512
| INCLUDE TABLES table_name_list
65166513
{
6517-
$$.val = &tree.ChangefeedFilterOption{Tables: $3.tableNames(), FilterType: tree.IncludeFilter}
6514+
$$.val = tree.ChangefeedFilterOption{Tables: $3.tableNames(), FilterType: tree.IncludeFilter}
65186515
}
65196516
| /* EMPTY */
65206517
{
6521-
$$.val = nil
6518+
$$.val = tree.ChangefeedFilterOption{}
65226519
}
65236520

65246521

pkg/sql/sem/tree/changefeed.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type CreateChangefeed struct {
4646
TableTargets ChangefeedTableTargets
4747
DatabaseTarget ChangefeedDatabaseTarget
4848
Level ChangefeedLevel
49-
FilterOption *ChangefeedFilterOption
49+
FilterOption ChangefeedFilterOption
5050
SinkURI Expr
5151
Options KVOptions
5252
Select *SelectClause
@@ -73,7 +73,7 @@ func (node *CreateChangefeed) Format(ctx *FmtCtx) {
7373
ctx.FormatNode(&node.TableTargets)
7474
} else {
7575
ctx.FormatNode(&node.DatabaseTarget)
76-
if node.FilterOption != nil {
76+
if len(node.FilterOption.Tables) > 0 {
7777
ctx.WriteString(" ")
7878
ctx.WriteString(node.FilterOption.FilterType.String())
7979
ctx.WriteString(" TABLES ")

0 commit comments

Comments
 (0)