Skip to content

Commit 215b5de

Browse files
committed
crosscluster/logical: auth PlanLogicalReplication with job id
This patch simplifies the implementation of source side table level auth checks. Epic: none Release note: none
1 parent e233359 commit 215b5de

File tree

3 files changed

+10
-1
lines changed

3 files changed

+10
-1
lines changed

pkg/crosscluster/logical/logical_replication_job.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,7 @@ func (p *logicalReplicationPlanner) generatePlanImpl(
488488
// During an offline initial scan, we need to replicate the whole table, not
489489
// just the primary keys.
490490
UseTableSpan: payload.CreateTable && progress.ReplicatedTime.IsEmpty(),
491+
StreamID: streampb.StreamID(payload.StreamID),
491492
}
492493
for _, pair := range payload.ReplicationPairs {
493494
req.TableIDs = append(req.TableIDs, pair.SrcDescriptorID)

pkg/repstream/streampb/stream.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ message LogicalReplicationPlanRequest {
170170
// PlanAsOf is the time as of which the plan should be produced.
171171
util.hlc.Timestamp plan_as_of = 2 [(gogoproto.nullable) = false];
172172
bool use_table_span = 3;
173+
int64 stream_id = 4 [(gogoproto.customname) = "StreamID", (gogoproto.casttype) = "StreamID"];
173174
}
174175

175176
// SourcePartition contains per partition information for a replication plan.

pkg/sql/sem/builtins/replication_builtins.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,14 @@ var replicationBuiltins = map[string]builtinDefinition{
511511
if err := protoutil.Unmarshal(reqBytes, &req); err != nil {
512512
return nil, err
513513
}
514-
if err := mgr.AuthorizeViaReplicationPriv(ctx); err != nil {
514+
if req.StreamID != 0 {
515+
if err := mgr.AuthorizeViaJob(ctx, req.StreamID); err != nil {
516+
return nil, err
517+
}
518+
// Auth via replication priv exists to ensure a user that planned their
519+
// job pre 25.2, which will not send a stream id, can still plan their
520+
// distsql flow.
521+
} else if err := mgr.AuthorizeViaReplicationPriv(ctx); err != nil {
515522
return nil, err
516523
}
517524

0 commit comments

Comments
 (0)