Skip to content

Commit 9f71acc

Browse files
craig[bot]KeithCh
andcommitted
Merge #154187
154187: changefeedccl: support INCLUDE TABLES filter for DB-level changefeed r=andyyang890,aerfrei a=KeithCh Support the ability to watch specific tables in a database-level feed. An example usage is: CREATE DATABASE FOR CHANGEFEED defaultdb INCLUDE TABLES foo,fizz.buzz,defaultdb.bar.tab,notdefaultdb.bar.tab; The table names above in the INCLUDE TABLES list will be resolved as such: Non-qualified table name foo: - defaultdb.public.foo Partially-qualified table name fizz.buzz: - defaultdb.fizz.buzz Fully-qualified table name defaultdb.bar.tab: - defaultdb.bar.tab Fully-qualified table name notdefaultdb.bar.tab: - Statement error; changefeed job will not be created Note that partially qualified table names in the form \<database\>.\<table\> are not supported. Partially qualfied table names are always assumed to be \<schema\>.\<table\>. Resolves: #147422 Release note: none Co-authored-by: Keith Chow <[email protected]>
2 parents f01d098 + 2bfd8c3 commit 9f71acc

File tree

6 files changed

+178
-74
lines changed

6 files changed

+178
-74
lines changed

pkg/ccl/changefeedccl/changefeed.go

Lines changed: 82 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@ import (
1515
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1616
"github.com/cockroachdb/cockroach/pkg/sql"
1717
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
18+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
1819
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
1920
"github.com/cockroachdb/cockroach/pkg/sql/isql"
21+
"github.com/cockroachdb/cockroach/pkg/sql/parser"
2022
"github.com/cockroachdb/cockroach/pkg/sql/protoreflect"
23+
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
2124
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2225
"github.com/cockroachdb/cockroach/pkg/util/log"
2326
"github.com/cockroachdb/cockroach/pkg/util/tracing"
@@ -109,7 +112,9 @@ func AllTargets(
109112
func getTargetsFromDatabaseSpec(
110113
ctx context.Context, ts jobspb.ChangefeedTargetSpecification, execCfg *sql.ExecutorConfig,
111114
) (targets changefeedbase.Targets, err error) {
112-
err = sql.DescsTxn(ctx, execCfg, func(ctx context.Context, txn isql.Txn, descs *descs.Collection) error {
115+
err = sql.DescsTxn(ctx, execCfg, func(
116+
ctx context.Context, txn isql.Txn, descs *descs.Collection,
117+
) error {
113118
databaseDescriptor, err := descs.ByIDWithLeased(txn.KV()).Get().Database(ctx, ts.DescID)
114119
if err != nil {
115120
return err
@@ -120,16 +125,23 @@ func getTargetsFromDatabaseSpec(
120125
return err
121126
}
122127
dbName := databaseDescriptor.GetName()
123-
for _, desc := range tables.OrderedDescriptors() {
124-
tableDesc, ok := desc.(catalog.TableDescriptor)
125-
if !ok {
126-
return errors.AssertionFailedf("expected table descriptor, got %T", desc)
127-
}
128-
// Skip virtual tables
129-
if !tableDesc.IsPhysicalTable() {
130-
continue
131-
}
132-
if ts.FilterList != nil && ts.FilterList.FilterType == jobspb.FilterList_EXCLUDE_TABLES {
128+
// DB-level feeds should have a filter list, even if the table list is empty.
129+
// By default, it would be an ExcludeFilter with an empty table list.
130+
if ts.FilterList == nil {
131+
return errors.AssertionFailedf("filter list is nil")
132+
}
133+
switch ts.FilterList.FilterType {
134+
case tree.ExcludeFilter:
135+
for _, desc := range tables.OrderedDescriptors() {
136+
tableDesc, ok := desc.(catalog.TableDescriptor)
137+
if !ok {
138+
return errors.AssertionFailedf("expected table descriptor, got %T", desc)
139+
}
140+
// Skip virtual tables
141+
if !tableDesc.IsPhysicalTable() {
142+
continue
143+
}
144+
133145
if _, ok := tableDescToSchemaName[tableDesc]; !ok {
134146
schemaID := tableDesc.GetParentSchemaID()
135147
schema, err := descs.ByIDWithLeased(txn.KV()).Get().Schema(ctx, schemaID)
@@ -138,23 +150,70 @@ func getTargetsFromDatabaseSpec(
138150
}
139151
tableDescToSchemaName[tableDesc] = schema.GetName()
140152
}
141-
fullyQualifiedTableName := fmt.Sprintf("%s.%s.%s", dbName, tableDescToSchemaName[tableDesc], tableDesc.GetName())
153+
fullyQualifiedTableName := fmt.Sprintf(
154+
"%s.%s.%s", dbName, tableDescToSchemaName[tableDesc], tableDesc.GetName())
142155
if _, ok := ts.FilterList.Tables[fullyQualifiedTableName]; ok {
143156
continue
144157
}
158+
159+
var tableType jobspb.ChangefeedTargetSpecification_TargetType
160+
if len(tableDesc.GetFamilies()) == 1 {
161+
tableType = jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY
162+
} else {
163+
tableType = jobspb.ChangefeedTargetSpecification_EACH_FAMILY
164+
}
165+
166+
targets.Add(changefeedbase.Target{
167+
Type: tableType,
168+
DescID: desc.GetID(),
169+
StatementTimeName: changefeedbase.StatementTimeName(desc.GetName()),
170+
})
145171
}
146-
var tableType jobspb.ChangefeedTargetSpecification_TargetType
147-
if len(tableDesc.GetFamilies()) == 1 {
148-
tableType = jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY
149-
} else {
150-
tableType = jobspb.ChangefeedTargetSpecification_EACH_FAMILY
151-
}
172+
case tree.IncludeFilter:
173+
for name := range ts.FilterList.Tables {
174+
tn, err := parser.ParseTableName(name)
175+
if err != nil {
176+
return err
177+
}
152178

153-
targets.Add(changefeedbase.Target{
154-
Type: tableType,
155-
DescID: desc.GetID(),
156-
StatementTimeName: changefeedbase.StatementTimeName(desc.GetName()),
157-
})
179+
schemaID, err := descs.LookupSchemaID(ctx, txn.KV(), ts.DescID, tn.Schema())
180+
if err != nil {
181+
return err
182+
}
183+
// Schema is not found in the database.
184+
if schemaID == descpb.InvalidID {
185+
continue
186+
}
187+
188+
tableID, err := descs.LookupObjectID(ctx, txn.KV(), ts.DescID, schemaID, tn.Object())
189+
if err != nil {
190+
return err
191+
}
192+
// Table is not found in the database.
193+
if tableID == descpb.InvalidID {
194+
continue
195+
}
196+
197+
desc, err := descs.ByIDWithLeased(txn.KV()).Get().Table(ctx, tableID)
198+
if err != nil {
199+
return err
200+
}
201+
202+
var tableType jobspb.ChangefeedTargetSpecification_TargetType
203+
if len(desc.GetFamilies()) == 1 {
204+
tableType = jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY
205+
} else {
206+
tableType = jobspb.ChangefeedTargetSpecification_EACH_FAMILY
207+
}
208+
209+
targets.Add(changefeedbase.Target{
210+
Type: tableType,
211+
DescID: tableID,
212+
StatementTimeName: changefeedbase.StatementTimeName(desc.GetName()),
213+
})
214+
}
215+
default:
216+
return errors.AssertionFailedf("invalid changefeed filter type")
158217
}
159218
return nil
160219
})

pkg/ccl/changefeedccl/changefeed_stmt.go

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -706,15 +706,14 @@ func createChangefeedJobRecord(
706706
if targetDatabaseDesc.GetID() == keys.SystemDatabaseID {
707707
return nil, changefeedbase.Targets{}, errors.Errorf("changefeed cannot target the system database")
708708
}
709-
if changefeedStmt.FilterOption != nil {
710-
fqTableNames, err := getFullyQualifiedTableNames(
711-
targetDatabaseDesc.GetName(), changefeedStmt.FilterOption.Tables,
712-
)
713-
if err != nil {
714-
return nil, changefeedbase.Targets{}, err
715-
}
716-
changefeedStmt.FilterOption.Tables = fqTableNames
709+
fqTableNames, err := getFullyQualifiedTableNames(
710+
targetDatabaseDesc.GetName(), changefeedStmt.FilterOption.Tables,
711+
)
712+
if err != nil {
713+
return nil, changefeedbase.Targets{}, err
717714
}
715+
changefeedStmt.FilterOption.Tables = fqTableNames
716+
718717
targetSpec := getDatabaseTargetSpec(targetDatabaseDesc, changefeedStmt.FilterOption)
719718
details = jobspb.ChangefeedDetails{
720719
TargetSpecifications: []jobspb.ChangefeedTargetSpecification{targetSpec},
@@ -1227,22 +1226,20 @@ func getTargetsAndTables(
12271226
}
12281227

12291228
func getDatabaseTargetSpec(
1230-
targetDatabaseDesc catalog.DatabaseDescriptor, filterOpt *tree.ChangefeedFilterOption,
1229+
targetDatabaseDesc catalog.DatabaseDescriptor, filterOpt tree.ChangefeedFilterOption,
12311230
) jobspb.ChangefeedTargetSpecification {
12321231
target := jobspb.ChangefeedTargetSpecification{
12331232
DescID: targetDatabaseDesc.GetID(),
12341233
Type: jobspb.ChangefeedTargetSpecification_DATABASE,
12351234
StatementTimeName: targetDatabaseDesc.GetName(),
12361235
}
1237-
if filterOpt != nil {
1238-
filterTables := make(map[string]pbtypes.Empty)
1239-
for _, table := range filterOpt.Tables {
1240-
filterTables[table.FQString()] = pbtypes.Empty{}
1241-
}
1242-
target.FilterList = &jobspb.FilterList{
1243-
FilterType: jobspb.FilterList_FilterType(filterOpt.FilterType),
1244-
Tables: filterTables,
1245-
}
1236+
filterTables := make(map[string]pbtypes.Empty)
1237+
for _, table := range filterOpt.Tables {
1238+
filterTables[table.FQString()] = pbtypes.Empty{}
1239+
}
1240+
target.FilterList = &jobspb.FilterList{
1241+
FilterType: filterOpt.FilterType,
1242+
Tables: filterTables,
12461243
}
12471244
return target
12481245
}

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 72 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -231,26 +231,78 @@ func TestDatabaseLevelChangefeedWithIncludeFilter(t *testing.T) {
231231
defer log.Scope(t).Close(t)
232232

233233
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
234-
expectSuccess := func(stmt string) {
235-
successfulFeed := feed(t, f, stmt)
236-
defer closeFeed(t, successfulFeed)
237-
_, err := successfulFeed.Next()
238-
require.NoError(t, err)
239-
}
240234
sqlDB := sqlutils.MakeSQLRunner(s.DB)
241-
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
242-
sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`)
243-
sqlDB.Exec(t, `UPSERT INTO foo VALUES (0, 'updated')`)
244-
sqlDB.Exec(t, `CREATE TABLE foo2 (a INT PRIMARY KEY, b STRING)`)
245-
sqlDB.Exec(t, `INSERT INTO foo2 VALUES (0, 'initial')`)
246-
sqlDB.Exec(t, `UPSERT INTO foo2 VALUES (0, 'updated')`)
235+
for i := range 4 {
236+
name := fmt.Sprintf("foo%d", i+1)
237+
sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE %s (a INT PRIMARY KEY, b STRING)`, name))
238+
sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO %s VALUES (0, 'initial')`, name))
239+
sqlDB.Exec(t, fmt.Sprintf(`UPSERT INTO %s VALUES (0, 'updated')`, name))
240+
}
241+
242+
sqlDB.Exec(t, `CREATE SCHEMA private`)
243+
sqlDB.Exec(t, `CREATE TABLE private.foo1 (a INT PRIMARY KEY, b STRING)`)
244+
sqlDB.Exec(t, `INSERT INTO private.foo1 VALUES (0, 'initial')`)
245+
// Test that if there are multiple tables with the same name the correct
246+
// one will still be found using the default schema of public.
247+
feed1 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo1`)
248+
defer closeFeed(t, feed1)
249+
assertPayloads(t, feed1, []string{
250+
`foo1: [0]->{"after": {"a": 0, "b": "updated"}}`,
251+
})
252+
253+
// Test that we can include multiple tables.
254+
feed2 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo1,foo2`)
255+
defer closeFeed(t, feed2)
256+
assertPayloads(t, feed2, []string{
257+
`foo1: [0]->{"after": {"a": 0, "b": "updated"}}`,
258+
`foo2: [0]->{"after": {"a": 0, "b": "updated"}}`,
259+
})
247260

248-
expectSuccess(`CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo`)
249-
expectSuccess(`CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo,foo2`)
250-
expectSuccess(`CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES d.bar.fizz, foo.foo2, foo`)
261+
// Test that we can handle fully qualified, partially qualified, and unqualified
262+
// table names.
263+
feed3 := feed(t, f,
264+
`CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES d.public.foo2, public.foo3, foo4`)
265+
defer closeFeed(t, feed3)
266+
assertPayloads(t, feed3, []string{
267+
`foo2: [0]->{"after": {"a": 0, "b": "updated"}}`,
268+
`foo3: [0]->{"after": {"a": 0, "b": "updated"}}`,
269+
`foo4: [0]->{"after": {"a": 0, "b": "updated"}}`,
270+
})
271+
272+
// Test that we can handle tables that don't exist.
273+
feed4 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo1,bob`)
274+
defer closeFeed(t, feed4)
275+
assertPayloads(t, feed4, []string{
276+
`foo1: [0]->{"after": {"a": 0, "b": "updated"}}`,
277+
})
278+
279+
// Test that fully qualified table names outside of the target database will
280+
// cause an error.
281+
expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES fizz.buzz.foo`,
282+
`table "fizz.buzz.foo" must be in target database "d"`)
283+
284+
// Table patterns are not supported.
251285
expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo.*`,
252286
`at or near "*": syntax error`)
253-
// TODO(#147421): Assert payload once the filter works
287+
288+
// Test that name resolution is not dependent on search_path() or current DB.
289+
sqlDB.Exec(t, `CREATE DATABASE notd`)
290+
sqlDB.Exec(t, `USE notd`)
291+
sqlDB.Exec(t, `SET search_path TO notpublic`)
292+
feed5 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo1, private.foo1`)
293+
defer closeFeed(t, feed5)
294+
assertPayloads(t, feed5, []string{
295+
`foo1: [0]->{"after": {"a": 0, "b": "updated"}}`,
296+
`foo1: [0]->{"after": {"a": 0, "b": "initial"}}`,
297+
})
298+
299+
// Test that partially qualified table names are always assumed to be
300+
// <schema>.<table>.
301+
feed6 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES d.foo1, public.foo2`)
302+
defer closeFeed(t, feed6)
303+
assertPayloads(t, feed6, []string{
304+
`foo2: [0]->{"after": {"a": 0, "b": "updated"}}`,
305+
})
254306
}
255307
cdcTest(t, testFn, feedTestEnterpriseSinks)
256308
}
@@ -300,8 +352,10 @@ func TestDatabaseLevelChangefeedWithExcludeFilter(t *testing.T) {
300352
`foo4: [0]->{"after": {"a": 0, "b": "updated"}}`,
301353
})
302354

303-
// Test that we can handle fully qualfied, partially qualified, and unqualified table names.
304-
feed4 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES d.public.foo2, public.foo3, foo4`)
355+
// Test that we can handle fully qualified, partially qualified, and unqualified
356+
// table names.
357+
feed4 := feed(t, f,
358+
`CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES d.public.foo2, public.foo3, foo4`)
305359
defer closeFeed(t, feed4)
306360
assertPayloads(t, feed4, []string{
307361
`foo1: [0]->{"after": {"a": 0, "b": "initial"}}`,

pkg/jobs/jobspb/jobs.proto

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1113,10 +1113,7 @@ message ChangefeedTargetSpecification {
11131113
}
11141114

11151115
message FilterList {
1116-
enum FilterType {
1117-
EXCLUDE_TABLES = 0;
1118-
}
1119-
FilterType filter_type = 1;
1116+
uint32 filter_type = 1 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/sem/tree.FilterType"];
11201117
map<string, google.protobuf.Empty> tables = 2 [(gogoproto.nullable) = false];
11211118
}
11221119

pkg/sql/parser/sql.y

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -965,11 +965,8 @@ func (u *sqlSymUnion) doBlockOptions() tree.DoBlockOptions {
965965
func (u *sqlSymUnion) doBlockOption() tree.DoBlockOption {
966966
return u.val.(tree.DoBlockOption)
967967
}
968-
func (u *sqlSymUnion) changefeedFilterOption() *tree.ChangefeedFilterOption {
969-
if filterOption, ok := u.val.(*tree.ChangefeedFilterOption); ok {
970-
return filterOption
971-
}
972-
return nil
968+
func (u *sqlSymUnion) changefeedFilterOption() tree.ChangefeedFilterOption {
969+
return u.val.(tree.ChangefeedFilterOption)
973970
}
974971

975972
%}
@@ -1610,7 +1607,7 @@ func (u *sqlSymUnion) changefeedFilterOption() *tree.ChangefeedFilterOption {
16101607
%type <tree.ReturningClause> returning_clause
16111608
%type <tree.TableExprs> opt_using_clause
16121609
%type <tree.RefreshDataOption> opt_clear_data
1613-
%type <*tree.ChangefeedFilterOption> db_level_changefeed_filter_option
1610+
%type <tree.ChangefeedFilterOption> db_level_changefeed_filter_option
16141611

16151612
%type <tree.BatchParam> batch_param
16161613
%type <[]tree.BatchParam> batch_param_list
@@ -6510,15 +6507,15 @@ opt_using_clause:
65106507
db_level_changefeed_filter_option:
65116508
EXCLUDE TABLES table_name_list
65126509
{
6513-
$$.val = &tree.ChangefeedFilterOption{Tables: $3.tableNames(), FilterType: tree.ExcludeFilter}
6510+
$$.val = tree.ChangefeedFilterOption{Tables: $3.tableNames(), FilterType: tree.ExcludeFilter}
65146511
}
65156512
| INCLUDE TABLES table_name_list
65166513
{
6517-
$$.val = &tree.ChangefeedFilterOption{Tables: $3.tableNames(), FilterType: tree.IncludeFilter}
6514+
$$.val = tree.ChangefeedFilterOption{Tables: $3.tableNames(), FilterType: tree.IncludeFilter}
65186515
}
65196516
| /* EMPTY */
65206517
{
6521-
$$.val = nil
6518+
$$.val = tree.ChangefeedFilterOption{}
65226519
}
65236520

65246521

pkg/sql/sem/tree/changefeed.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type CreateChangefeed struct {
4646
TableTargets ChangefeedTableTargets
4747
DatabaseTarget ChangefeedDatabaseTarget
4848
Level ChangefeedLevel
49-
FilterOption *ChangefeedFilterOption
49+
FilterOption ChangefeedFilterOption
5050
SinkURI Expr
5151
Options KVOptions
5252
Select *SelectClause
@@ -73,7 +73,7 @@ func (node *CreateChangefeed) Format(ctx *FmtCtx) {
7373
ctx.FormatNode(&node.TableTargets)
7474
} else {
7575
ctx.FormatNode(&node.DatabaseTarget)
76-
if node.FilterOption != nil {
76+
if len(node.FilterOption.Tables) > 0 {
7777
ctx.WriteString(" ")
7878
ctx.WriteString(node.FilterOption.FilterType.String())
7979
ctx.WriteString(" TABLES ")

0 commit comments

Comments
 (0)