@@ -484,13 +484,14 @@ func evalCursor(
484
484
485
485
func getTargetList (changefeedStmt * annotatedChangefeedStatement ) (* tree.BackupTargetList , error ) {
486
486
targetList := tree.BackupTargetList {}
487
- if changefeedStmt .Level == tree .ChangefeedLevelTable {
487
+ switch changefeedStmt .Level {
488
+ case tree .ChangefeedLevelTable :
488
489
for _ , t := range changefeedStmt .TableTargets {
489
490
targetList .Tables .TablePatterns = append (targetList .Tables .TablePatterns , t .TableName )
490
491
}
491
- } else if changefeedStmt . Level == tree .ChangefeedLevelDatabase {
492
+ case tree .ChangefeedLevelDatabase :
492
493
targetList .Databases = tree.NameList {tree .Name (changefeedStmt .DatabaseTarget )}
493
- } else {
494
+ default :
494
495
return nil , errors .AssertionFailedf ("unknown changefeed level: %s" , changefeedStmt .Level .String ())
495
496
}
496
497
return & targetList , nil
@@ -570,11 +571,23 @@ func createChangefeedJobRecord(
570
571
571
572
var details jobspb.ChangefeedDetails
572
573
573
- tableNameToDescriptor , targetDatabaseDescs , err := getTargetDescriptors (ctx , p , targetList , statementTime , initialHighWater )
574
+ tableNameToDescriptor , targetDatabaseDescs , tableAndParentDescs , err := getTargetDescriptors (
575
+ ctx ,
576
+ p ,
577
+ targetList ,
578
+ statementTime ,
579
+ initialHighWater ,
580
+ )
574
581
575
582
if err != nil {
576
583
return nil , changefeedbase.Targets {}, err
577
584
}
585
+ if len (tableAndParentDescs ) == 0 {
586
+ return nil , changefeedbase.Targets {}, errors .AssertionFailedf ("expected at least one descriptor" )
587
+ }
588
+ if len (targetDatabaseDescs ) > 1 {
589
+ return nil , changefeedbase.Targets {}, errors .AssertionFailedf ("expected at most one database descriptor, got %d" , len (targetDatabaseDescs ))
590
+ }
578
591
579
592
for _ , t := range tableNameToDescriptor {
580
593
if tbl , ok := t .(catalog.TableDescriptor ); ok && tbl .ExternalRowData () != nil {
@@ -640,25 +653,65 @@ func createChangefeedJobRecord(
640
653
hasSelectPrivOnAllTables := true
641
654
hasChangefeedPrivOnAllTables := true
642
655
tolerances := opts .GetCanHandle ()
643
- for _ , desc := range tableNameToDescriptor {
644
- if table , isTable := desc .(catalog.TableDescriptor ); isTable {
645
- if err := changefeedvalidators .ValidateTable (targets , table , tolerances ); err != nil {
646
- return nil , changefeedbase.Targets {}, err
647
- }
648
- for _ , warning := range changefeedvalidators .WarningsForTable (table , tolerances ) {
649
- p .BufferClientNotice (ctx , pgnotice .Newf ("%s" , warning ))
650
- }
656
+ // Core changefeed:
657
+ // - Table-level changefeeds require the user to have SELECT privileges
658
+ // on all target tables
659
+ // - DB-level feeds are not supported
660
+ // Enterprise changefeed:
661
+ // - Table-level feeds require the CHANGEFEED privilege on all target tables
662
+ // - DB-level feeds require the CHANGEFEED privilege on the target database
663
+ switch changefeedStmt .Level {
664
+ case tree .ChangefeedLevelTable :
665
+ _ , tableToDatabaseLookup := buildTableToDatabaseAndSchemaLookup (tableAndParentDescs )
666
+ for _ , desc := range tableNameToDescriptor {
667
+ if table , isTable := desc .(catalog.TableDescriptor ); isTable {
668
+ if err := changefeedvalidators .ValidateTable (targets , table , tolerances ); err != nil {
669
+ return nil , changefeedbase.Targets {}, err
670
+ }
671
+ for _ , warning := range changefeedvalidators .WarningsForTable (table , tolerances ) {
672
+ p .BufferClientNotice (ctx , pgnotice .Newf ("%s" , warning ))
673
+ }
651
674
652
- hasSelect , hasChangefeed , err := checkPrivilegesForDescriptor (ctx , p , desc )
653
- if err != nil {
654
- return nil , changefeedbase.Targets {}, err
675
+ hasSelect , hasChangefeed , err := checkPrivilegesForDescriptor (ctx , p , desc )
676
+ if err != nil {
677
+ return nil , changefeedbase.Targets {}, err
678
+ }
679
+
680
+ databaseDesc , ok := tableToDatabaseLookup [table .GetID ()]
681
+ if ! ok {
682
+ return nil , changefeedbase.Targets {}, errors .AssertionFailedf ("expected to find a database descriptor for table %s" , table .GetName ())
683
+ }
684
+ if ! hasChangefeed {
685
+ _ , hasChangefeed , err = checkPrivilegesForDescriptor (ctx , p , databaseDesc )
686
+ if err != nil {
687
+ return nil , changefeedbase.Targets {}, err
688
+ }
689
+ }
690
+
691
+ hasSelectPrivOnAllTables = hasSelectPrivOnAllTables && hasSelect
692
+ hasChangefeedPrivOnAllTables = hasChangefeedPrivOnAllTables && hasChangefeed
655
693
}
656
- hasSelectPrivOnAllTables = hasSelectPrivOnAllTables && hasSelect
657
- hasChangefeedPrivOnAllTables = hasChangefeedPrivOnAllTables && hasChangefeed
658
694
}
695
+ case tree .ChangefeedLevelDatabase :
696
+ _ , hasDatabaseChangefeedPriv , err := checkPrivilegesForDescriptor (ctx , p , targetDatabaseDescs [0 ])
697
+ if err != nil {
698
+ return nil , changefeedbase.Targets {}, err
699
+ }
700
+ hasChangefeedPrivOnAllTables = hasDatabaseChangefeedPriv
701
+ default :
702
+ return nil , changefeedbase.Targets {}, errors .AssertionFailedf ("unknown changefeed level: %s" , changefeedStmt .Level )
659
703
}
704
+
660
705
if checkPrivs {
661
- if err := authorizeUserToCreateChangefeed (ctx , p , sinkURI , hasSelectPrivOnAllTables , hasChangefeedPrivOnAllTables , opts .GetConfluentSchemaRegistry ()); err != nil {
706
+ if err := authorizeUserToCreateChangefeed (
707
+ ctx ,
708
+ p ,
709
+ sinkURI ,
710
+ hasSelectPrivOnAllTables ,
711
+ hasChangefeedPrivOnAllTables ,
712
+ changefeedStmt .Level ,
713
+ opts .GetConfluentSchemaRegistry (),
714
+ ); err != nil {
662
715
return nil , changefeedbase.Targets {}, err
663
716
}
664
717
}
@@ -984,22 +1037,24 @@ func getTargetDescriptors(
984
1037
) (
985
1038
tableNameToDescriptor map [tree.TablePattern ]catalog.Descriptor ,
986
1039
databaseDescs []catalog.DatabaseDescriptor ,
1040
+ tableAndParentDescs []catalog.Descriptor ,
987
1041
err error ,
988
1042
) {
989
1043
if len (targets .Databases ) > 0 && len (targets .Tables .TablePatterns ) > 0 {
990
- return nil , nil , errors .Errorf (`CHANGEFEED cannot target both databases and tables` )
1044
+ return nil , nil , nil , errors .Errorf (`CHANGEFEED cannot target both databases and tables` )
991
1045
}
1046
+
992
1047
for _ , t := range targets .Tables .TablePatterns {
993
1048
p , err := t .NormalizeTablePattern ()
994
1049
if err != nil {
995
- return nil , nil , err
1050
+ return nil , nil , nil , err
996
1051
}
997
1052
if _ , ok := p .(* tree.TableName ); ! ok {
998
- return nil , nil , errors .Errorf (`CHANGEFEED cannot target %s` , tree .AsString (t ))
1053
+ return nil , nil , nil , errors .Errorf (`CHANGEFEED cannot target %s` , tree .AsString (t ))
999
1054
}
1000
1055
}
1001
-
1002
- _ , _ , targetDatabaseDescs , targetTableDescs , err := backupresolver .ResolveTargetsToDescriptors (ctx , p , statementTime , targets )
1056
+ // targetTableDescs is empty if the targets are not tables, targetDatabaseDescs is empty if the targets are not databases
1057
+ targetAndParentDescs , _ , targetDatabaseDescs , targetTableDescs , err := backupresolver .ResolveTargetsToDescriptors (ctx , p , statementTime , targets )
1003
1058
if err != nil {
1004
1059
var m * backupresolver.MissingTableErr
1005
1060
if errors .As (err , & m ) {
@@ -1013,7 +1068,7 @@ func getTargetDescriptors(
1013
1068
"do the targets exist at the specified cursor time %s?" , initialHighWater )
1014
1069
}
1015
1070
}
1016
- return targetTableDescs , targetDatabaseDescs , err
1071
+ return targetTableDescs , targetDatabaseDescs , targetAndParentDescs , err
1017
1072
}
1018
1073
1019
1074
func getTargetsAndTables (
@@ -1217,8 +1272,13 @@ func makeChangefeedDescription(
1217
1272
opts changefeedbase.StatementOptions ,
1218
1273
) (string , error ) {
1219
1274
c := & tree.CreateChangefeed {
1220
- TableTargets : changefeed .TableTargets ,
1221
- Select : changefeed .Select ,
1275
+ Select : changefeed .Select ,
1276
+ Level : changefeed .Level ,
1277
+ }
1278
+ if changefeed .Level == tree .ChangefeedLevelDatabase {
1279
+ c .DatabaseTarget = changefeed .DatabaseTarget
1280
+ } else {
1281
+ c .TableTargets = changefeed .TableTargets
1222
1282
}
1223
1283
1224
1284
if sinkURI != "" {
@@ -2124,3 +2184,34 @@ func getChangefeedEventMigrator(migrateEvent bool) log.StructuredEventMigrator {
2124
2184
channel .CHANGEFEED ,
2125
2185
)
2126
2186
}
2187
+
2188
+ func buildTableToDatabaseAndSchemaLookup (
2189
+ targetAndParentDescs []catalog.Descriptor ,
2190
+ ) (
2191
+ tableToSchema map [descpb.ID ]catalog.SchemaDescriptor ,
2192
+ tableToDatabase map [descpb.ID ]catalog.DatabaseDescriptor ,
2193
+ ) {
2194
+ // getSchema maps schema IDs to its schema descriptor.
2195
+ getSchema := make (map [descpb.ID ]catalog.SchemaDescriptor )
2196
+ // getDatabase maps database IDs to its database descriptor.
2197
+ getDatabase := make (map [descpb.ID ]catalog.DatabaseDescriptor )
2198
+ for _ , desc := range targetAndParentDescs {
2199
+ switch desc .DescriptorType () {
2200
+ case catalog .Schema :
2201
+ getSchema [desc .GetID ()] = desc .(catalog.SchemaDescriptor )
2202
+ case catalog .Database :
2203
+ getDatabase [desc .GetID ()] = desc .(catalog.DatabaseDescriptor )
2204
+ }
2205
+ }
2206
+ // tableToSchema maps table IDs to its parent schema descriptor.
2207
+ tableToSchema = make (map [descpb.ID ]catalog.SchemaDescriptor )
2208
+ // tableToDatabase maps table IDs to its parent database descriptor.
2209
+ tableToDatabase = make (map [descpb.ID ]catalog.DatabaseDescriptor )
2210
+ for _ , desc := range targetAndParentDescs {
2211
+ if desc .DescriptorType () == catalog .Table {
2212
+ tableToDatabase [desc .GetID ()] = getDatabase [desc .GetParentID ()]
2213
+ tableToSchema [desc .GetID ()] = getSchema [desc .GetParentSchemaID ()]
2214
+ }
2215
+ }
2216
+ return tableToSchema , tableToDatabase
2217
+ }
0 commit comments