Skip to content

Commit 5b0e51e

Browse files
craig[bot]msbutler
andcommitted
Merge #142772
142772: crosscluster/physical: update ALTER REPLICATION JOB auth r=jeffswenson a=msbutler Epic CRDB-47102 Release note (ops change): all ALTER VIRTUAL CLUSTER REPLICATION JOB cmds, except for ALTER VIRTUAL CLUSTER SET REPLICATION SOURCE, will require the REPLICATIONDEST priv, in addition to MANAGEVIRTUALCLUSTER. ALTER VIRTUAL CLUSTER SET REPLICATION SOURCE now requires REPLICATIONSOURCE. Note, if the ingestion job was created before 25.1, the user can still alter the replication job without REPLICATIONDEST priv-- we'd like to leave this undocumented though. Co-authored-by: Michael Butler <[email protected]>
2 parents 8a3a6f6 + f48f2a5 commit 5b0e51e

File tree

3 files changed

+65
-1
lines changed

3 files changed

+65
-1
lines changed

pkg/crosscluster/physical/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ go_library(
2727
"//pkg/cloud",
2828
"//pkg/cloud/externalconn",
2929
"//pkg/cloud/externalconn/connectionpb",
30+
"//pkg/clusterversion",
3031
"//pkg/crosscluster",
3132
"//pkg/crosscluster/producer",
3233
"//pkg/crosscluster/replicationutils",
@@ -63,6 +64,7 @@ go_library(
6364
"//pkg/sql/isql",
6465
"//pkg/sql/pgwire/pgcode",
6566
"//pkg/sql/pgwire/pgerror",
67+
"//pkg/sql/pgwire/pgnotice",
6668
"//pkg/sql/physicalplan",
6769
"//pkg/sql/privilege",
6870
"//pkg/sql/rowenc",
@@ -132,6 +134,7 @@ go_test(
132134
"//pkg/ccl/kvccl/kvtenantccl",
133135
"//pkg/ccl/storageccl",
134136
"//pkg/cloud/impl:cloudimpl",
137+
"//pkg/clusterversion",
135138
"//pkg/crosscluster",
136139
"//pkg/crosscluster/producer",
137140
"//pkg/crosscluster/replicationtestutils",

pkg/crosscluster/physical/alter_replication_job.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"time"
1313

1414
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
15+
"github.com/cockroachdb/cockroach/pkg/clusterversion"
1516
"github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils"
1617
"github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient"
1718
"github.com/cockroachdb/cockroach/pkg/jobs"
@@ -26,9 +27,12 @@ import (
2627
"github.com/cockroachdb/cockroach/pkg/sql/isql"
2728
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
2829
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
30+
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice"
31+
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
2932
"github.com/cockroachdb/cockroach/pkg/sql/sem/asof"
3033
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
3134
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
35+
"github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege"
3236
"github.com/cockroachdb/cockroach/pkg/sql/types"
3337
"github.com/cockroachdb/cockroach/pkg/util/hlc"
3438
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -224,9 +228,22 @@ func alterReplicationJobHook(
224228
}
225229
jobRegistry := p.ExecCfg().JobRegistry
226230
if alterTenantStmt.Producer {
231+
if err := p.CheckPrivilege(
232+
ctx, syntheticprivilege.GlobalPrivilegeObject, privilege.REPLICATIONSOURCE,
233+
); err != nil {
234+
return err
235+
}
227236
return alterTenantSetReplicationSource(ctx, p.InternalSQLTxn(), jobRegistry, options, tenInfo)
228237
}
229-
238+
if err := p.CheckPrivilege(
239+
ctx, syntheticprivilege.GlobalPrivilegeObject, privilege.REPLICATIONDEST,
240+
); err != nil {
241+
if !p.ExecCfg().Settings.Version.IsActive(ctx, clusterversion.V25_2) {
242+
p.BufferClientNotice(ctx, pgnotice.Newf("this command will require the REPLICATIONDEST privilege on a fully upgraded 25.2+ cluster"))
243+
} else {
244+
return err
245+
}
246+
}
230247
// If a source uri is being provided, we're enabling replication into an
231248
// existing virtual cluster. It must be inactive, and we'll verify that it
232249
// was the cluster from which the one it will replicate was replicated, i.e.

pkg/crosscluster/physical/alter_replication_job_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,12 @@ import (
1212
"time"
1313

1414
"github.com/cockroachdb/cockroach/pkg/base"
15+
"github.com/cockroachdb/cockroach/pkg/clusterversion"
1516
"github.com/cockroachdb/cockroach/pkg/crosscluster/replicationtestutils"
1617
"github.com/cockroachdb/cockroach/pkg/jobs"
1718
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
19+
"github.com/cockroachdb/cockroach/pkg/security/username"
20+
"github.com/cockroachdb/cockroach/pkg/server"
1821
"github.com/cockroachdb/cockroach/pkg/sql"
1922
"github.com/cockroachdb/cockroach/pkg/testutils"
2023
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
@@ -544,10 +547,51 @@ func TestAlterReplicationJobErrors(t *testing.T) {
544547
defer srv.Stopper().Stop(ctx)
545548

546549
db := sqlutils.MakeSQLRunner(sqlDB)
550+
db.Exec(t, "CREATE TENANT t1")
551+
552+
db.Exec(t, fmt.Sprintf("CREATE USER %s", username.TestUser))
553+
testuser := sqlutils.MakeSQLRunner(srv.SQLConn(t, serverutils.User(username.TestUser)))
547554

548555
t.Run("alter tenant subqueries", func(t *testing.T) {
549556
// Regression test for #136339
550557
db.ExpectErr(t, "subqueries are not allowed", "ALTER TENANT (select 't2') START REPLICATION OF t1 ON 'foo'")
551558
})
559+
t.Run("alter replication dest privs", func(t *testing.T) {
560+
cmd := "ALTER TENANT t1 SET REPLICATION RETENTION ='100ms'"
561+
testuser.ExpectErr(t, "user testuser does not have MANAGEVIRTUALCLUSTER system privilege", cmd)
562+
db.Exec(t, fmt.Sprintf("GRANT SYSTEM MANAGEVIRTUALCLUSTER TO %s", username.TestUser))
563+
testuser.ExpectErr(t, "user testuser does not have REPLICATIONDEST system privilege", cmd)
564+
db.Exec(t, fmt.Sprintf("GRANT SYSTEM REPLICATIONDEST TO %s", username.TestUser))
565+
// Implies we got past the priv checks.
566+
testuser.ExpectErr(t, `does not have an active replication consumer job`, cmd)
567+
db.Exec(t, fmt.Sprintf("REVOKE SYSTEM MANAGEVIRTUALCLUSTER FROM %s", username.TestUser))
568+
})
569+
t.Run("alter replication source privs", func(t *testing.T) {
570+
cmd := "ALTER TENANT t1 SET REPLICATION SOURCE EXPIRATION WINDOW ='100ms'"
571+
testuser.ExpectErr(t, "user testuser does not have MANAGEVIRTUALCLUSTER system privilege", cmd)
572+
db.Exec(t, fmt.Sprintf("GRANT SYSTEM MANAGEVIRTUALCLUSTER TO %s", username.TestUser))
573+
testuser.ExpectErr(t, "user testuser does not have REPLICATIONSOURCE system privilege", cmd)
574+
db.Exec(t, fmt.Sprintf("GRANT SYSTEM REPLICATIONSOURCE TO %s", username.TestUser))
575+
testuser.Exec(t, cmd)
576+
})
577+
t.Run("alter replication dest priv 24.3", func(t *testing.T) {
578+
params := base.TestServerArgs{
579+
DefaultTestTenant: base.TestControlsTenantsExplicitly,
580+
}
581+
params.Knobs.Server = &server.TestingKnobs{
582+
ClusterVersionOverride: clusterversion.V24_3.Version(),
583+
DisableAutomaticVersionUpgrade: make(chan struct{}),
584+
}
552585

586+
srv, sqlDB, _ := serverutils.StartServer(t, params)
587+
defer srv.Stopper().Stop(ctx)
588+
db := sqlutils.MakeSQLRunner(sqlDB)
589+
db.Exec(t, "CREATE TENANT t1")
590+
db.Exec(t, fmt.Sprintf("CREATE USER %s", username.TestUser))
591+
testuser := sqlutils.MakeSQLRunner(srv.SQLConn(t, serverutils.User(username.TestUser)))
592+
db.Exec(t, fmt.Sprintf("GRANT SYSTEM MANAGEVIRTUALCLUSTER TO %s", username.TestUser))
593+
// Implies we got past the priv checks, without REPLICATIONDEST.
594+
cmd := "ALTER TENANT t1 SET REPLICATION RETENTION ='100ms'"
595+
testuser.ExpectErr(t, `does not have an active replication consumer job`, cmd)
596+
})
553597
}

0 commit comments

Comments
 (0)