Skip to content

Commit 39234ec

Browse files
committed
changefeedccl: modify changefeed job payload to support db-level changefeeds
Update and populate changefeed job payload to support db-level changefeeds. Epic: CRDB-1421 Resolves: #147370 Release note: None
1 parent 894535e commit 39234ec

File tree

4 files changed

+106
-31
lines changed

4 files changed

+106
-31
lines changed

pkg/ccl/changefeedccl/changefeed_stmt.go

Lines changed: 86 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,20 @@ func evalCursor(
475475
return asOf.Timestamp, nil
476476
}
477477

478+
func getTargetList(changefeedStmt *annotatedChangefeedStatement) (*tree.BackupTargetList, error) {
479+
targetList := tree.BackupTargetList{}
480+
if changefeedStmt.Level == tree.ChangefeedLevelTable {
481+
for _, t := range changefeedStmt.TableTargets {
482+
targetList.Tables.TablePatterns = append(targetList.Tables.TablePatterns, t.TableName)
483+
}
484+
} else if changefeedStmt.Level == tree.ChangefeedLevelDatabase {
485+
targetList.Databases = tree.NameList{tree.Name(changefeedStmt.DatabaseTarget)}
486+
} else {
487+
return nil, errors.AssertionFailedf("unknown changefeed level: %s", changefeedStmt.Level.String())
488+
}
489+
return &targetList, nil
490+
}
491+
478492
func createChangefeedJobRecord(
479493
ctx context.Context,
480494
p sql.PlanHookState,
@@ -542,18 +556,20 @@ func createChangefeedJobRecord(
542556
}
543557
}
544558

545-
tableOnlyTargetList := tree.BackupTargetList{}
546-
for _, t := range changefeedStmt.TableTargets {
547-
tableOnlyTargetList.Tables.TablePatterns = append(tableOnlyTargetList.Tables.TablePatterns, t.TableName)
559+
targetList, err := getTargetList(changefeedStmt)
560+
if err != nil {
561+
return nil, err
548562
}
549563

550-
// This grabs table descriptors once to get their ids.
551-
targetDescs, err := getTableDescriptors(ctx, p, &tableOnlyTargetList, statementTime, initialHighWater)
564+
var details jobspb.ChangefeedDetails
565+
566+
tableNameToDescriptor, targetDatabaseDescs, err := getTargetDescriptors(ctx, p, targetList, statementTime, initialHighWater)
567+
552568
if err != nil {
553569
return nil, err
554570
}
555571

556-
for _, t := range targetDescs {
572+
for _, t := range tableNameToDescriptor {
557573
if tbl, ok := t.(catalog.TableDescriptor); ok && tbl.ExternalRowData() != nil {
558574
if tbl.ExternalRowData().TenantID.IsSet() {
559575
return nil, errors.UnimplementedError(errors.IssueLink{}, "changefeeds on a replication target are not supported")
@@ -562,7 +578,8 @@ func createChangefeedJobRecord(
562578
}
563579
}
564580

565-
targets, tables, err := getTargetsAndTables(ctx, p, targetDescs, changefeedStmt.TableTargets,
581+
// This grabs table descriptors once to get their ids.
582+
targets, tables, err := getTargetsAndTables(ctx, p, tableNameToDescriptor, changefeedStmt.TableTargets,
566583
changefeedStmt.originalSpecs, opts.ShouldUseFullStatementTimeName())
567584

568585
if err != nil {
@@ -572,19 +589,44 @@ func createChangefeedJobRecord(
572589
sd := p.SessionData().Clone()
573590
// Add non-local session data state (localization, etc).
574591
sessiondata.MarshalNonLocal(p.SessionData(), &sd.SessionData)
575-
details := jobspb.ChangefeedDetails{
576-
Tables: tables,
577-
SinkURI: sinkURI,
578-
StatementTime: statementTime,
579-
EndTime: endTime,
580-
TargetSpecifications: targets,
581-
SessionData: &sd.SessionData,
592+
if changefeedStmt.Level == tree.ChangefeedLevelTable {
593+
for _, t := range tableNameToDescriptor {
594+
if tbl, ok := t.(catalog.TableDescriptor); ok && tbl.ExternalRowData() != nil {
595+
if tbl.ExternalRowData().TenantID.IsSet() {
596+
return nil, errors.UnimplementedError(errors.IssueLink{}, "changefeeds on a replication target are not supported")
597+
}
598+
return nil, errors.UnimplementedError(errors.IssueLink{}, "changefeeds on external tables are not supported")
599+
}
600+
}
601+
602+
details = jobspb.ChangefeedDetails{
603+
Tables: tables,
604+
SinkURI: sinkURI,
605+
StatementTime: statementTime,
606+
EndTime: endTime,
607+
TargetSpecifications: targets,
608+
SessionData: &sd.SessionData,
609+
}
610+
} else if changefeedStmt.Level == tree.ChangefeedLevelDatabase {
611+
if len(targetDatabaseDescs) == 0 || len(targetDatabaseDescs) > 1 {
612+
return nil, errors.Errorf("only one database target is supported")
613+
}
614+
targets = getDatabaseTargets(targetDatabaseDescs)
615+
details = jobspb.ChangefeedDetails{
616+
TargetSpecifications: targets,
617+
SinkURI: sinkURI,
618+
StatementTime: statementTime,
619+
EndTime: endTime,
620+
SessionData: &sd.SessionData,
621+
}
622+
} else {
623+
return nil, errors.AssertionFailedf("unknown changefeed level: %s", changefeedStmt.Level)
582624
}
583625

584626
specs := AllTargets(details)
585627
hasSelectPrivOnAllTables := true
586628
hasChangefeedPrivOnAllTables := true
587-
for _, desc := range targetDescs {
629+
for _, desc := range tableNameToDescriptor {
588630
if table, isTable := desc.(catalog.TableDescriptor); isTable {
589631
if err := changefeedvalidators.ValidateTable(specs, table, tolerances); err != nil {
590632
return nil, err
@@ -610,7 +652,7 @@ func createChangefeedJobRecord(
610652
if changefeedStmt.Select != nil {
611653
// Serialize changefeed expression.
612654
normalized, withDiff, err := validateAndNormalizeChangefeedExpression(
613-
ctx, p, opts, changefeedStmt.Select, targetDescs, targets, statementTime,
655+
ctx, p, opts, changefeedStmt.Select, tableNameToDescriptor, targets, statementTime,
614656
)
615657
if err != nil {
616658
return nil, err
@@ -866,7 +908,7 @@ Few hours to a few days range are appropriate values for this option.`
866908
Description: description,
867909
Username: p.User(),
868910
DescriptorIDs: func() (sqlDescIDs []descpb.ID) {
869-
for _, desc := range targetDescs {
911+
for _, desc := range tableNameToDescriptor {
870912
sqlDescIDs = append(sqlDescIDs, desc.GetID())
871913
}
872914
return sqlDescIDs
@@ -899,31 +941,31 @@ func validateSettings(ctx context.Context, needsRangeFeed bool, execCfg *sql.Exe
899941
return nil
900942
}
901943

902-
func getTableDescriptors(
944+
func getTargetDescriptors(
903945
ctx context.Context,
904946
p sql.PlanHookState,
905947
targets *tree.BackupTargetList,
906948
statementTime hlc.Timestamp,
907949
initialHighWater hlc.Timestamp,
908-
) (map[tree.TablePattern]catalog.Descriptor, error) {
909-
// For now, disallow targeting a database or wildcard table selection.
910-
// Getting it right as tables enter and leave the set over time is
911-
// tricky.
912-
if len(targets.Databases) > 0 {
913-
return nil, errors.Errorf(`CHANGEFEED cannot target %s`,
914-
tree.AsString(targets))
950+
) (
951+
tableNameToDescriptor map[tree.TablePattern]catalog.Descriptor,
952+
databaseDescs []catalog.DatabaseDescriptor,
953+
err error,
954+
) {
955+
if len(targets.Databases) > 0 && len(targets.Tables.TablePatterns) > 0 {
956+
return nil, nil, errors.Errorf(`CHANGEFEED cannot target both databases and tables`)
915957
}
916958
for _, t := range targets.Tables.TablePatterns {
917959
p, err := t.NormalizeTablePattern()
918960
if err != nil {
919-
return nil, err
961+
return nil, nil, err
920962
}
921963
if _, ok := p.(*tree.TableName); !ok {
922-
return nil, errors.Errorf(`CHANGEFEED cannot target %s`, tree.AsString(t))
964+
return nil, nil, errors.Errorf(`CHANGEFEED cannot target %s`, tree.AsString(t))
923965
}
924966
}
925967

926-
_, _, _, targetDescs, err := backupresolver.ResolveTargetsToDescriptors(ctx, p, statementTime, targets)
968+
_, _, targetDatabaseDescs, targetTableDescs, err := backupresolver.ResolveTargetsToDescriptors(ctx, p, statementTime, targets)
927969
if err != nil {
928970
var m *backupresolver.MissingTableErr
929971
if errors.As(err, &m) {
@@ -937,7 +979,7 @@ func getTableDescriptors(
937979
"do the targets exist at the specified cursor time %s?", initialHighWater)
938980
}
939981
}
940-
return targetDescs, err
982+
return targetTableDescs, targetDatabaseDescs, err
941983
}
942984

943985
func getTargetsAndTables(
@@ -1013,6 +1055,21 @@ func getTargetsAndTables(
10131055
return targets, tables, nil
10141056
}
10151057

1058+
func getDatabaseTargets(
1059+
targetDatabaseDescs []catalog.DatabaseDescriptor,
1060+
) []jobspb.ChangefeedTargetSpecification {
1061+
targets := make([]jobspb.ChangefeedTargetSpecification, len(targetDatabaseDescs))
1062+
1063+
for i, desc := range targetDatabaseDescs {
1064+
targets[i] = jobspb.ChangefeedTargetSpecification{
1065+
DescID: desc.GetID(),
1066+
Type: jobspb.ChangefeedTargetSpecification_DATABASE,
1067+
StatementTimeName: desc.GetName(),
1068+
}
1069+
}
1070+
return targets
1071+
}
1072+
10161073
func validateSink(
10171074
ctx context.Context,
10181075
p sql.PlanHookState,

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11820,3 +11820,17 @@ func TestCloudstorageParallelCompression(t *testing.T) {
1182011820
}
1182111821
})
1182211822
}
11823+
11824+
func TestDatabaseLevelChangefeed(t *testing.T) {
11825+
defer leaktest.AfterTest(t)()
11826+
defer log.Scope(t).Close(t)
11827+
11828+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
11829+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
11830+
sqlDB.Exec(t, `CREATE DATABASE foo`)
11831+
sqlDB.Exec(t, `CREATE TABLE foo.bar(id int primary key, s string)`)
11832+
sqlDB.Exec(t, `INSERT INTO foo.bar(id, s) VALUES (0, 'hello'), (1, null)`)
11833+
expectErrCreatingFeed(t, f, `CREATE CHANGEFEED for DATABASE foo`, "database-level changefeed is not implemented")
11834+
}
11835+
cdcTest(t, testFn)
11836+
}

pkg/jobs/jobspb/jobs.proto

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1098,8 +1098,8 @@ message ChangefeedTargetSpecification {
10981098
// Column family family_name of table table_id.
10991099
COLUMN_FAMILY = 2;
11001100

1101-
// Add TargetTypes for database, secondary index, etc. when implemented
1102-
1101+
DATABASE = 3;
1102+
// Add TargetTypes for schema, secondary index, etc. when implemented
11031103
}
11041104

11051105
TargetType type = 1;

pkg/sql/sem/tree/changefeed.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ const (
2121
ChangefeedLevelDatabase
2222
)
2323

24+
func (l ChangefeedLevel) String() string {
25+
return []string{"TABLE", "DATABASE"}[l]
26+
}
27+
2428
// CreateChangefeed represents a CREATE CHANGEFEED statement.
2529
type CreateChangefeed struct {
2630
TableTargets ChangefeedTableTargets

0 commit comments

Comments
 (0)