Skip to content

Commit 35d0006

Browse files
committed
changefeedccl: support EXCLUDE TABLES in DB-level feeds
Support the ability to exclude tables from being watched in a database-level feed. An example usage is: CREATE DATABASE FOR CHANGEFEED defaultdb EXCLUDE TABLES foo,fizz.buzz,defaultdb.bar.tab,notdefaultdb.bar.tab; The table names above in the EXCLUDE TABLES list will be resolved as such: Non-qualified table name foo: - defaultdb.public.foo Partially-qualified table name fizz.buzz: - defaultdb.fizz.buzz Fully-qualified table name defaultdb.bar.tab: - defaultdb.bar.tab Fully-qualified table name notdefaultdb.bar.tab: - Statement error; changefeed job will not be created Note that partially qualified table names in the form <database>.<table> are not supported. Partially qualfied table names are always assumed to be <schema>.<table>. Resolves: #147424 Release note: none
1 parent 8f413be commit 35d0006

File tree

8 files changed

+166
-29
lines changed

8 files changed

+166
-29
lines changed

pkg/ccl/changefeedccl/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ go_library(
120120
"//pkg/sql/rowenc",
121121
"//pkg/sql/rowexec",
122122
"//pkg/sql/sem/asof",
123+
"//pkg/sql/sem/catconstants",
123124
"//pkg/sql/sem/catid",
124125
"//pkg/sql/sem/eval",
125126
"//pkg/sql/sem/tree",

pkg/ccl/changefeedccl/changefeed.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package changefeedccl
88
import (
99
"context"
1010
"encoding/json"
11+
"fmt"
1112

1213
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
1314
"github.com/cockroachdb/cockroach/pkg/cloud"
@@ -113,10 +114,12 @@ func getTargetsFromDatabaseSpec(
113114
if err != nil {
114115
return err
115116
}
117+
tableDescToSchemaName := make(map[catalog.TableDescriptor]string)
116118
tables, err := descs.GetAllTablesInDatabase(ctx, txn.KV(), databaseDescriptor)
117119
if err != nil {
118120
return err
119121
}
122+
dbName := databaseDescriptor.GetName()
120123
for _, desc := range tables.OrderedDescriptors() {
121124
tableDesc, ok := desc.(catalog.TableDescriptor)
122125
if !ok {
@@ -126,6 +129,20 @@ func getTargetsFromDatabaseSpec(
126129
if !tableDesc.IsPhysicalTable() {
127130
continue
128131
}
132+
if ts.FilterList != nil && ts.FilterList.FilterType == jobspb.FilterList_EXCLUDE_TABLES {
133+
if _, ok := tableDescToSchemaName[tableDesc]; !ok {
134+
schemaID := tableDesc.GetParentSchemaID()
135+
schema, err := descs.ByIDWithLeased(txn.KV()).Get().Schema(ctx, schemaID)
136+
if err != nil {
137+
return err
138+
}
139+
tableDescToSchemaName[tableDesc] = schema.GetName()
140+
}
141+
fullyQualifiedTableName := fmt.Sprintf("%s.%s.%s", dbName, tableDescToSchemaName[tableDesc], tableDesc.GetName())
142+
if _, ok := ts.FilterList.Tables[fullyQualifiedTableName]; ok {
143+
continue
144+
}
145+
}
129146
var tableType jobspb.ChangefeedTargetSpecification_TargetType
130147
if len(tableDesc.GetFamilies()) == 1 {
131148
tableType = jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY

pkg/ccl/changefeedccl/changefeed_stmt.go

Lines changed: 59 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
4747
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice"
4848
"github.com/cockroachdb/cockroach/pkg/sql/sem/asof"
49+
"github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
4950
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
5051
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
5152
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
@@ -65,6 +66,7 @@ import (
6566
"github.com/cockroachdb/cockroach/pkg/util/uuid"
6667
"github.com/cockroachdb/errors"
6768
"github.com/cockroachdb/redact"
69+
pbtypes "github.com/gogo/protobuf/types"
6870
)
6971

7072
// featureChangefeedEnabled is used to enable and disable the CHANGEFEED feature.
@@ -631,11 +633,22 @@ func createChangefeedJobRecord(
631633
if len(targetDatabaseDescs) == 0 || len(targetDatabaseDescs) > 1 {
632634
return nil, changefeedbase.Targets{}, errors.Errorf("changefeed only supports one database target")
633635
}
634-
if targetDatabaseDescs[0].GetID() == keys.SystemDatabaseID {
636+
targetDatabaseDesc := targetDatabaseDescs[0]
637+
if targetDatabaseDesc.GetID() == keys.SystemDatabaseID {
635638
return nil, changefeedbase.Targets{}, errors.Errorf("changefeed cannot target the system database")
636639
}
640+
if changefeedStmt.FilterOption != nil {
641+
fqTableNames, err := getFullyQualifiedTableNames(
642+
targetDatabaseDesc.GetName(), changefeedStmt.FilterOption.Tables,
643+
)
644+
if err != nil {
645+
return nil, changefeedbase.Targets{}, err
646+
}
647+
changefeedStmt.FilterOption.Tables = fqTableNames
648+
}
649+
targetSpec := getDatabaseTargetSpec(targetDatabaseDesc, changefeedStmt.FilterOption)
637650
details = jobspb.ChangefeedDetails{
638-
TargetSpecifications: getDatabaseTargets(targetDatabaseDescs),
651+
TargetSpecifications: []jobspb.ChangefeedTargetSpecification{targetSpec},
639652
SinkURI: sinkURI,
640653
StatementTime: statementTime,
641654
EndTime: endTime,
@@ -1144,19 +1157,53 @@ func getTargetsAndTables(
11441157
return targets, tables, nil
11451158
}
11461159

1147-
func getDatabaseTargets(
1148-
targetDatabaseDescs []catalog.DatabaseDescriptor,
1149-
) []jobspb.ChangefeedTargetSpecification {
1150-
targets := make([]jobspb.ChangefeedTargetSpecification, len(targetDatabaseDescs))
1160+
func getDatabaseTargetSpec(
1161+
targetDatabaseDesc catalog.DatabaseDescriptor, filterOpt *tree.ChangefeedFilterOption,
1162+
) jobspb.ChangefeedTargetSpecification {
1163+
target := jobspb.ChangefeedTargetSpecification{
1164+
DescID: targetDatabaseDesc.GetID(),
1165+
Type: jobspb.ChangefeedTargetSpecification_DATABASE,
1166+
StatementTimeName: targetDatabaseDesc.GetName(),
1167+
}
1168+
if filterOpt != nil {
1169+
filterTables := make(map[string]pbtypes.Empty)
1170+
for _, table := range filterOpt.Tables {
1171+
filterTables[table.FQString()] = pbtypes.Empty{}
1172+
}
1173+
target.FilterList = &jobspb.FilterList{
1174+
FilterType: jobspb.FilterList_FilterType(filterOpt.FilterType),
1175+
Tables: filterTables,
1176+
}
1177+
}
1178+
return target
1179+
}
11511180

1152-
for i, desc := range targetDatabaseDescs {
1153-
targets[i] = jobspb.ChangefeedTargetSpecification{
1154-
DescID: desc.GetID(),
1155-
Type: jobspb.ChangefeedTargetSpecification_DATABASE,
1156-
StatementTimeName: desc.GetName(),
1181+
func getFullyQualifiedTableNames(
1182+
targetDatabase string, tableNames tree.TableNames,
1183+
) (tree.TableNames, error) {
1184+
var fqTableNames tree.TableNames
1185+
1186+
for _, tableName := range tableNames {
1187+
if tableName.SchemaName == "" {
1188+
// The table name is non-qualified e.g. foo. This will resolve to <targetDatabase>.public.foo.
1189+
tableName.SchemaName = catconstants.PublicSchemaName
1190+
tableName.CatalogName = tree.Name(targetDatabase)
1191+
} else if tableName.CatalogName == "" {
1192+
// The table name is partially qualified e.g. foo.bar. This will resolve to
1193+
// <targetDatabase>.foo.bar.
1194+
tableName.CatalogName = tree.Name(targetDatabase)
1195+
} else {
1196+
// Table name is fully qualfied e.g. foo.bar.fizz. This will resolve to
1197+
// foo.bar.fizz unless foo != <targetDatabase>, in which case it would fail.
1198+
if tableName.CatalogName != tree.Name(targetDatabase) {
1199+
return nil, errors.AssertionFailedf(
1200+
"table %q must be in target database %q", tableName.FQString(), targetDatabase,
1201+
)
1202+
}
11571203
}
1204+
fqTableNames = append(fqTableNames, tableName)
11581205
}
1159-
return targets
1206+
return fqTableNames, nil
11601207
}
11611208

11621209
func validateSink(

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 76 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ func TestDatabaseLevelChangefeedWithIncludeFilter(t *testing.T) {
245245

246246
expectSuccess(`CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo`)
247247
expectSuccess(`CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo,foo2`)
248-
expectSuccess(`CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo.bar.fizz, foo.foo2, foo`)
248+
expectSuccess(`CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES d.bar.fizz, foo.foo2, foo`)
249249
expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo.*`,
250250
`at or near "*": syntax error`)
251251
// TODO(#147421): Assert payload once the filter works
@@ -258,26 +258,85 @@ func TestDatabaseLevelChangefeedWithExcludeFilter(t *testing.T) {
258258
defer log.Scope(t).Close(t)
259259

260260
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
261-
expectSuccess := func(stmt string) {
262-
successfulFeed := feed(t, f, stmt)
263-
defer closeFeed(t, successfulFeed)
264-
_, err := successfulFeed.Next()
265-
require.NoError(t, err)
266-
}
267261
sqlDB := sqlutils.MakeSQLRunner(s.DB)
268-
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
269-
sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`)
270-
sqlDB.Exec(t, `UPSERT INTO foo VALUES (0, 'updated')`)
271-
sqlDB.Exec(t, `CREATE TABLE foo2 (a INT PRIMARY KEY, b STRING)`)
272-
sqlDB.Exec(t, `INSERT INTO foo2 VALUES (0, 'initial')`)
273-
sqlDB.Exec(t, `UPSERT INTO foo2 VALUES (0, 'updated')`)
262+
for i := range 4 {
263+
name := fmt.Sprintf("foo%d", i+1)
264+
sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE %s (a INT PRIMARY KEY, b STRING)`, name))
265+
sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO %s VALUES (0, 'initial')`, name))
266+
sqlDB.Exec(t, fmt.Sprintf(`UPSERT INTO %s VALUES (0, 'updated')`, name))
267+
}
268+
269+
sqlDB.Exec(t, `CREATE SCHEMA private`)
270+
sqlDB.Exec(t, `CREATE TABLE private.foo1 (a INT PRIMARY KEY, b STRING)`)
271+
sqlDB.Exec(t, `INSERT INTO private.foo1 VALUES (0, 'initial')`)
272+
// Test that if there are multiple tables with the same name the correct
273+
// one will still be found using the default schema of public.
274+
feed1 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo1`)
275+
defer closeFeed(t, feed1)
276+
assertPayloads(t, feed1, []string{
277+
`foo1: [0]->{"after": {"a": 0, "b": "initial"}}`,
278+
`foo2: [0]->{"after": {"a": 0, "b": "updated"}}`,
279+
`foo3: [0]->{"after": {"a": 0, "b": "updated"}}`,
280+
`foo4: [0]->{"after": {"a": 0, "b": "updated"}}`,
281+
})
282+
283+
feed2 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo2`)
284+
defer closeFeed(t, feed2)
285+
assertPayloads(t, feed2, []string{
286+
`foo1: [0]->{"after": {"a": 0, "b": "initial"}}`,
287+
`foo1: [0]->{"after": {"a": 0, "b": "updated"}}`,
288+
`foo3: [0]->{"after": {"a": 0, "b": "updated"}}`,
289+
`foo4: [0]->{"after": {"a": 0, "b": "updated"}}`,
290+
})
291+
292+
// Test that we can exclude multiple tables.
293+
feed3 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo2,foo3`)
294+
defer closeFeed(t, feed3)
295+
assertPayloads(t, feed3, []string{
296+
`foo1: [0]->{"after": {"a": 0, "b": "initial"}}`,
297+
`foo1: [0]->{"after": {"a": 0, "b": "updated"}}`,
298+
`foo4: [0]->{"after": {"a": 0, "b": "updated"}}`,
299+
})
300+
301+
// Test that we can handle fully qualfied, partially qualified, and unqualified table names.
302+
feed4 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES d.public.foo2, public.foo3, foo4`)
303+
defer closeFeed(t, feed4)
304+
assertPayloads(t, feed4, []string{
305+
`foo1: [0]->{"after": {"a": 0, "b": "initial"}}`,
306+
`foo1: [0]->{"after": {"a": 0, "b": "updated"}}`,
307+
})
308+
309+
// Test that we can handle tables that don't exist.
310+
feed5 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES bob`)
311+
defer closeFeed(t, feed5)
312+
assertPayloads(t, feed5, []string{
313+
`foo1: [0]->{"after": {"a": 0, "b": "initial"}}`,
314+
`foo1: [0]->{"after": {"a": 0, "b": "updated"}}`,
315+
`foo2: [0]->{"after": {"a": 0, "b": "updated"}}`,
316+
`foo3: [0]->{"after": {"a": 0, "b": "updated"}}`,
317+
`foo4: [0]->{"after": {"a": 0, "b": "updated"}}`,
318+
})
274319

275-
expectSuccess(`CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo`)
276-
expectSuccess(`CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo,foo2`)
277-
expectSuccess(`CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo.bar.fizz, foo.foo2, foo`)
320+
// Test that fully qualified table names outside of the target database will
321+
// cause an error.
322+
expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES fizz.buzz.foo`,
323+
`table "fizz.buzz.foo" must be in target database "d"`)
324+
325+
// Table patterns are not supported.
278326
expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo.*`,
279327
`at or near "*": syntax error`)
280-
// TODO(#147421): Assert payload once the filter works
328+
329+
// Test that name resolution is not dependent on search_path() or current DB
330+
sqlDB.Exec(t, `CREATE DATABASE notd`)
331+
sqlDB.Exec(t, `USE notd`)
332+
sqlDB.Exec(t, `SET search_path TO notpublic`)
333+
feed6 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo1, private.foo1`)
334+
defer closeFeed(t, feed6)
335+
assertPayloads(t, feed6, []string{
336+
`foo2: [0]->{"after": {"a": 0, "b": "updated"}}`,
337+
`foo3: [0]->{"after": {"a": 0, "b": "updated"}}`,
338+
`foo4: [0]->{"after": {"a": 0, "b": "updated"}}`,
339+
})
281340
}
282341
cdcTest(t, testFn, feedTestEnterpriseSinks)
283342
}

pkg/cmd/protoc-gen-gogoroach/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ go_proto_compiler(
3636
"Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types",
3737
"Mgoogle/protobuf/descriptor.proto=github.com/gogo/protobuf/protoc-gen-gogo/descriptor",
3838
"Mgoogle/protobuf/duration.proto=github.com/gogo/protobuf/types",
39+
"Mgoogle/protobuf/empty.proto=github.com/gogo/protobuf/types",
3940
],
4041
plugin = ":protoc-gen-gogoroach",
4142
valid_archive = False,
@@ -58,6 +59,7 @@ go_proto_compiler(
5859
"Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types",
5960
"Mgoogle/protobuf/descriptor.proto=github.com/gogo/protobuf/protoc-gen-gogo/descriptor",
6061
"Mgoogle/protobuf/duration.proto=github.com/gogo/protobuf/types",
62+
"Mgoogle/protobuf/empty.proto=github.com/gogo/protobuf/types",
6163
],
6264
plugin = ":protoc-gen-gogoroach",
6365
valid_archive = False,

pkg/jobs/jobspb/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ proto_library(
5050
"@com_github_cockroachdb_errors//errorspb:errorspb_proto",
5151
"@com_github_gogo_protobuf//gogoproto:gogo_proto",
5252
"@com_google_protobuf//:any_proto",
53+
"@com_google_protobuf//:empty_proto",
5354
"@com_google_protobuf//:timestamp_proto",
5455
],
5556
)

pkg/jobs/jobspb/jobs.proto

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import "clusterversion/cluster_version.proto";
2222
import "google/protobuf/timestamp.proto";
2323
import "util/tracing/tracingpb/recorded_span.proto";
2424
import "sql/catalog/externalcatalog/externalpb/external.proto";
25+
import "google/protobuf/empty.proto";
2526

2627
enum EncryptionMode {
2728
Passphrase = 0;
@@ -1107,7 +1108,15 @@ message ChangefeedTargetSpecification {
11071108
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID"];
11081109
string family_name = 3;
11091110
string statement_time_name = 4;
1111+
FilterList filter_list = 5;
1112+
}
11101113

1114+
message FilterList {
1115+
enum FilterType {
1116+
EXCLUDE_TABLES = 0;
1117+
}
1118+
FilterType filter_type = 1;
1119+
map<string, google.protobuf.Empty> tables = 2 [(gogoproto.nullable) = false];
11111120
}
11121121

11131122
message ChangefeedDetails {

pkg/protos.bzl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ SERVER_PROTOS = [
5858
"@com_google_protobuf//:any_proto",
5959
"@com_google_protobuf//:descriptor_proto",
6060
"@com_google_protobuf//:duration_proto",
61+
"@com_google_protobuf//:empty_proto",
6162
"@com_google_protobuf//:timestamp_proto",
6263
"@go_googleapis//google/api:annotations_proto",
6364
"@go_googleapis//google/api:http_proto",

0 commit comments

Comments
 (0)