Skip to content

Commit d729d9b

Browse files
craig[bot]msbutler
andcommitted
Merge #142840
142840: crosscluster/logical: add table level auth for REPLICATIONDEST r=dt a=msbutler Epic: [CRDB-47102](https://cockroachlabs.atlassian.net/browse/CRDB-47102) Release note (sql change): this patch allows a user to begin LDR on an existing table if the user has a table level REPLICATIONDEST priv. Furthermore, this patch allows a user to begin LDR onto an automatically created table if the user has the parent database level CREATE privilege. Finally, during bidirectional replication, this patch allows the user in the OG source URI, which will begin the reverse stream, to authorize via this table level REPLICATIONDEST priv. Co-authored-by: Michael Butler <[email protected]>
2 parents ff8d2bd + a7984b5 commit d729d9b

File tree

5 files changed

+151
-14
lines changed

5 files changed

+151
-14
lines changed

pkg/crosscluster/logical/create_logical_replication_stmt.go

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -102,14 +102,6 @@ func createLogicalReplicationStreamPlanHook(
102102
return err
103103
}
104104

105-
// TODO(dt): the global priv is a big hammer; should we be checking just on
106-
// table(s) or database being replicated from and into?
107-
if err := p.CheckPrivilege(
108-
ctx, syntheticprivilege.GlobalPrivilegeObject, privilege.REPLICATIONDEST,
109-
); err != nil {
110-
return err
111-
}
112-
113105
if stmt.From.Database != "" {
114106
return errors.UnimplementedErrorf(errors.IssueLink{}, "logical replication streams on databases are unsupported")
115107
}
@@ -158,6 +150,10 @@ func createLogicalReplicationStreamPlanHook(
158150
return err
159151
}
160152

153+
if err := checkReplicationPrivileges(ctx, p, stmt, resolvedDestObjects, options.BidirectionalURI()); err != nil {
154+
return errors.Wrapf(err, "failed privilege check: table or system level REPLICATIONDEST privilege required")
155+
}
156+
161157
if !p.ExtendedEvalContext().TxnIsSingleStmt {
162158
return errors.New("cannot CREATE LOGICAL REPLICATION STREAM in a multi-statement transaction")
163159
}
@@ -361,6 +357,14 @@ func (r *ResolvedDestObjects) TargetDescription() string {
361357
return targetDescription
362358
}
363359

360+
func (r *ResolvedDestObjects) TargetTableNames() []string {
361+
var targetTableNames []string
362+
for i := range r.TableNames {
363+
targetTableNames = append(targetTableNames, r.TableNames[i].Table())
364+
}
365+
return targetTableNames
366+
}
367+
364368
func resolveDestinationObjects(
365369
ctx context.Context,
366370
r resolver.SchemaResolver,
@@ -728,3 +732,36 @@ func (r *resolvedLogicalReplicationOptions) BidirectionalURI() string {
728732
}
729733
return r.bidirectionalURI
730734
}
735+
736+
func checkReplicationPrivileges(
737+
ctx context.Context,
738+
p sql.PlanHookState,
739+
stmt *tree.CreateLogicalReplicationStream,
740+
resolvedDestObjects ResolvedDestObjects,
741+
bidirectionalStream string,
742+
) error {
743+
if err := p.CheckPrivilege(
744+
ctx, syntheticprivilege.GlobalPrivilegeObject, privilege.REPLICATIONDEST,
745+
); err != nil {
746+
if !stmt.CreateTable {
747+
return replicationutils.AuthorizeTableLevelPriv(ctx, p, p.ExtendedEvalContext().SessionAccessor, privilege.REPLICATIONDEST, resolvedDestObjects.TargetTableNames())
748+
} else {
749+
dbDesc, err := p.InternalSQLTxn().Descriptors().ByIDWithLeased(p.InternalSQLTxn().KV()).WithoutNonPublic().Get().Database(ctx, resolvedDestObjects.ParentDatabaseID)
750+
if err != nil {
751+
return err
752+
}
753+
if err := p.CheckPrivilege(ctx, dbDesc, privilege.CREATE); err != nil {
754+
return err
755+
}
756+
if bidirectionalStream != "" {
757+
// TODO(msbutler): how to validate that the user in the reverse stream
758+
// URI has REPLICATIONSOURCE priv on a table that has yet to be created?
759+
// We could assert it is the same user as the current user, then we
760+
// could grant the user the REPLICATIONSOURCE priv on table creation?
761+
// Or, we could make REPLICATIONSOURCE a db level priv, required for
762+
// BIDI??
763+
}
764+
}
765+
}
766+
return nil
767+
}

pkg/crosscluster/logical/logical_replication_job_test.go

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1954,6 +1954,7 @@ func TestShowLogicalReplicationJobs(t *testing.T) {
19541954
func TestUserPrivileges(t *testing.T) {
19551955
defer leaktest.AfterTest(t)()
19561956
skip.UnderDeadlock(t)
1957+
skip.UnderRaceWithIssue(t, 142992)
19571958
defer log.Scope(t).Close(t)
19581959

19591960
ctx := context.Background()
@@ -1978,6 +1979,8 @@ func TestUserPrivileges(t *testing.T) {
19781979
dbA.Exec(t, fmt.Sprintf("GRANT SYSTEM REPLICATIONDEST TO %s", username.TestUser+"2"))
19791980
testuser := sqlutils.MakeSQLRunner(s.SQLConn(t, serverutils.User(username.TestUser), serverutils.DBName("a")))
19801981
testuser2 := sqlutils.MakeSQLRunner(s.SQLConn(t, serverutils.User(username.TestUser+"2"), serverutils.DBName("a")))
1982+
dbB.Exec(t, "CREATE USER testuser3")
1983+
dbBURL2 := replicationtestutils.GetExternalConnectionURI(t, s, s, serverutils.DBName("b"), serverutils.User(username.TestUser+"3"))
19811984

19821985
var jobAID jobspb.JobID
19831986
createStmt := "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab"
@@ -2022,21 +2025,58 @@ func TestUserPrivileges(t *testing.T) {
20222025
})
20232026

20242027
t.Run("replication-dest", func(t *testing.T) {
2025-
testuser.ExpectErr(t, "user testuser does not have REPLICATIONDEST system privilege", createStmt, dbBURL.String())
2028+
testuser.ExpectErr(t, "failed privilege check: table or system level REPLICATIONDEST privilege required: user testuser does not have REPLICATIONDEST privilege on relation tab", createStmt, dbBURL.String())
20262029
dbA.Exec(t, fmt.Sprintf("GRANT SYSTEM REPLICATIONDEST TO %s", username.TestUser))
20272030
testuser.QueryRow(t, createStmt, dbBURL.String()).Scan(&jobAID)
2031+
dbA.Exec(t, fmt.Sprintf("REVOKE SYSTEM REPLICATIONDEST FROM %s", username.TestUser))
20282032
})
20292033
t.Run("replication-src", func(t *testing.T) {
2030-
dbB.Exec(t, "CREATE USER testuser3")
2031-
dbBURL2 := replicationtestutils.GetExternalConnectionURI(t, s, s, serverutils.DBName("b"), serverutils.User(username.TestUser+"3"))
2032-
testuser.ExpectErr(t, "user testuser3 does not have REPLICATIONSOURCE system privilege", createStmt, dbBURL2.String())
2034+
dbA.ExpectErr(t, "user testuser3 does not have REPLICATIONSOURCE system privilege", createStmt, dbBURL2.String())
20332035
sourcePriv := "REPLICATIONSOURCE"
20342036
if rng.Intn(3) == 0 {
20352037
// Test deprecated privilege name.
20362038
sourcePriv = "REPLICATION"
20372039
}
2040+
20382041
dbB.Exec(t, fmt.Sprintf("GRANT SYSTEM %s TO %s", sourcePriv, username.TestUser+"3"))
2039-
testuser.QueryRow(t, createStmt, dbBURL2.String()).Scan(&jobAID)
2042+
dbA.QueryRow(t, createStmt, dbBURL2.String()).Scan(&jobAID)
2043+
dbB.Exec(t, fmt.Sprintf("REVOKE SYSTEM %s FROM %s", sourcePriv, username.TestUser+"3"))
2044+
})
2045+
t.Run("table-level-replication-dest", func(t *testing.T) {
2046+
2047+
dbA.Exec(t, `CREATE TABLE tab2 (x INT PRIMARY KEY)`)
2048+
dbB.Exec(t, `CREATE TABLE tab2 (x INT PRIMARY KEY)`)
2049+
2050+
multiTableStmt := `CREATE LOGICAL REPLICATION STREAM FROM TABLES (tab, tab2) ON $1 INTO TABLES (tab, tab2)`
2051+
2052+
testuser.ExpectErr(t, "failed privilege check: table or system level REPLICATIONDEST privilege required: user testuser does not have REPLICATIONDEST privilege on relation tab", multiTableStmt, dbBURL.String())
2053+
2054+
dbA.Exec(t, fmt.Sprintf(`GRANT REPLICATIONDEST ON TABLE tab TO %s`, username.TestUser))
2055+
testuser.ExpectErr(t, "failed privilege check: table or system level REPLICATIONDEST privilege required: user testuser does not have REPLICATIONDEST privilege on relation tab2", multiTableStmt, dbBURL.String())
2056+
2057+
dbA.Exec(t, fmt.Sprintf(`GRANT REPLICATIONDEST ON TABLE tab2 TO %s`, username.TestUser))
2058+
testuser.Exec(t, multiTableStmt, dbBURL.String())
2059+
dbA.Exec(t, fmt.Sprintf(`REVOKE REPLICATIONDEST ON TABLE tab FROM %s`, username.TestUser))
2060+
dbA.Exec(t, fmt.Sprintf(`REVOKE REPLICATIONDEST ON TABLE tab2 FROM %s`, username.TestUser))
2061+
})
2062+
t.Run("db-create-replication-dest", func(t *testing.T) {
2063+
createStmt := "CREATE LOGICALLY REPLICATED TABLES (tab_clone, tab2_clone) FROM TABLES (tab, tab2) ON $1 WITH UNIDIRECTIONAL"
2064+
// First try without CREATE privilege on destination database - should fail
2065+
testuser.ExpectErr(t, "user testuser does not have CREATE privilege on database a", createStmt, dbBURL.String())
2066+
2067+
// Grant CREATE privilege on destination database - should now succeed
2068+
dbA.Exec(t, `GRANT CREATE ON DATABASE a TO testuser`)
2069+
testuser.Exec(t, createStmt, dbBURL.String())
2070+
2071+
dbAURL := replicationtestutils.GetExternalConnectionURI(t, s, s, serverutils.DBName("a"), serverutils.User(username.TestUser))
2072+
2073+
dbB.Exec(t, fmt.Sprintf("GRANT SYSTEM REPLICATION TO %s", username.TestUser+"3"))
2074+
createStmtBidi := "CREATE LOGICALLY REPLICATED TABLES (tab_clone_2, tab2_clone_2) FROM TABLES (tab, tab2) ON $1 WITH BIDIRECTIONAL ON $2"
2075+
testuser.ExpectErr(t, " uri requires REPLICATIONDEST privilege for bidirectional replication: user testuser3 does not have REPLICATIONDEST privilege on relation tab", createStmtBidi, dbBURL2.String(), dbAURL.String())
2076+
2077+
dbB.Exec(t, fmt.Sprintf("GRANT SYSTEM REPLICATIONDEST TO %s", username.TestUser+"3"))
2078+
testuser.QueryRow(t, createStmtBidi, dbBURL2.String(), dbAURL.String()).Scan(&jobAID)
2079+
20402080
})
20412081
}
20422082

pkg/crosscluster/producer/replication_manager.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,9 @@ func (r *replicationStreamManagerImpl) StartReplicationStreamForTables(
8484
if execConfig.Codec.IsSystem() && !kvserver.RangefeedEnabled.Get(&execConfig.Settings.SV) {
8585
return streampb.ReplicationProducerSpec{}, errors.Errorf("kv.rangefeed.enabled must be enabled on the source cluster for logical replication")
8686
}
87-
87+
if err := maybeAuthorizeReverseStream(ctx, r, req); err != nil {
88+
return streampb.ReplicationProducerSpec{}, errors.Wrap(err, "uri requires REPLICATIONDEST privilege for bidirectional replication")
89+
}
8890
if err := maybeValidateReverseURI(ctx, req.UnvalidatedReverseStreamURI, r.evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig).InternalDB); err != nil {
8991
return streampb.ReplicationProducerSpec{}, errors.Wrap(err, "reverse stream uri failed validation")
9092
}
@@ -157,6 +159,20 @@ func (r *replicationStreamManagerImpl) StartReplicationStreamForTables(
157159
}, nil
158160
}
159161

162+
func maybeAuthorizeReverseStream(
163+
ctx context.Context, r *replicationStreamManagerImpl, req streampb.ReplicationProducerRequest,
164+
) error {
165+
if req.UnvalidatedReverseStreamURI == "" {
166+
return nil
167+
}
168+
if err := r.evalCtx.SessionAccessor.CheckPrivilege(ctx,
169+
syntheticprivilege.GlobalPrivilegeObject,
170+
privilege.REPLICATIONDEST); err != nil {
171+
return replicationutils.AuthorizeTableLevelPriv(ctx, r.resolver, r.evalCtx.SessionAccessor, privilege.REPLICATIONDEST, req.TableNames)
172+
}
173+
return nil
174+
}
175+
160176
func maybeValidateReverseURI(ctx context.Context, reverseURI string, db *sql.InternalDB) error {
161177
if reverseURI == "" {
162178
return nil

pkg/crosscluster/replicationutils/BUILD.bazel

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,17 @@ go_library(
1212
"//pkg/repstream/streampb",
1313
"//pkg/roachpb",
1414
"//pkg/sql",
15+
"//pkg/sql/catalog",
1516
"//pkg/sql/catalog/catpb",
1617
"//pkg/sql/catalog/descpb",
1718
"//pkg/sql/catalog/descs",
19+
"//pkg/sql/catalog/resolver",
1820
"//pkg/sql/catalog/tabledesc",
1921
"//pkg/sql/isql",
22+
"//pkg/sql/parser",
23+
"//pkg/sql/privilege",
24+
"//pkg/sql/sem/eval",
25+
"//pkg/sql/sem/tree",
2026
"//pkg/storage",
2127
"//pkg/storage/mvccencoding",
2228
"//pkg/testutils/fingerprintutils",

pkg/crosscluster/replicationutils/utils.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,17 @@ import (
1818
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
1919
"github.com/cockroachdb/cockroach/pkg/roachpb"
2020
"github.com/cockroachdb/cockroach/pkg/sql"
21+
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
2122
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
2223
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
2324
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
25+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver"
2426
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
2527
"github.com/cockroachdb/cockroach/pkg/sql/isql"
28+
"github.com/cockroachdb/cockroach/pkg/sql/parser"
29+
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
30+
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
31+
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
2632
"github.com/cockroachdb/cockroach/pkg/storage"
2733
"github.com/cockroachdb/cockroach/pkg/storage/mvccencoding"
2834
"github.com/cockroachdb/cockroach/pkg/testutils/fingerprintutils"
@@ -327,3 +333,35 @@ func UnlockLDRTables(
327333
return nil
328334
})
329335
}
336+
337+
func AuthorizeTableLevelPriv(
338+
ctx context.Context,
339+
r resolver.SchemaResolver,
340+
sessionAccessor eval.SessionAccessor,
341+
priv privilege.Kind,
342+
tableNames []string,
343+
) error {
344+
for _, name := range tableNames {
345+
uon, err := parser.ParseTableName(name)
346+
if err != nil {
347+
return err
348+
}
349+
lookupFlags := tree.ObjectLookupFlags{
350+
Required: true,
351+
DesiredObjectKind: tree.TableObject,
352+
DesiredTableDescKind: tree.ResolveRequireTableDesc,
353+
}
354+
d, _, err := resolver.ResolveExistingObject(ctx, r, uon, lookupFlags)
355+
if err != nil {
356+
return err
357+
}
358+
td, ok := d.(catalog.TableDescriptor)
359+
if !ok {
360+
return errors.New("expected table descriptor")
361+
}
362+
if err := sessionAccessor.CheckPrivilege(ctx, td, priv); err != nil {
363+
return err
364+
}
365+
}
366+
return nil
367+
}

0 commit comments

Comments
 (0)