Skip to content

Commit 084ce67

Browse files
committed
kv/rangefeed: filter and test internal rangefeed registry
Filter and test internal rangefeed registry. Epic: None Release note: None
1 parent a99e71a commit 084ce67

File tree

4 files changed

+74
-18
lines changed

4 files changed

+74
-18
lines changed

pkg/ccl/changefeedccl/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,7 @@ go_test(
324324
"//pkg/util/intsets",
325325
"//pkg/util/ioctx",
326326
"//pkg/util/json",
327+
"//pkg/util/keysutil",
327328
"//pkg/util/leaktest",
328329
"//pkg/util/log",
329330
"//pkg/util/log/eventpb",

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 60 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ import (
9191
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
9292
"github.com/cockroachdb/cockroach/pkg/util/hlc"
9393
"github.com/cockroachdb/cockroach/pkg/util/json"
94+
"github.com/cockroachdb/cockroach/pkg/util/keysutil"
9495
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
9596
"github.com/cockroachdb/cockroach/pkg/util/log"
9697
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
@@ -9128,22 +9129,66 @@ func TestDistSenderRangeFeedPopulatesVirtualTable(t *testing.T) {
91289129
defer leaktest.AfterTest(t)()
91299130
defer log.Scope(t).Close(t)
91309131

9131-
s, cleanup := makeServer(t)
9132-
defer cleanup()
9132+
scanner := keysutil.MakePrettyScanner(nil, nil)
91339133

9134-
sqlDB := sqlutils.MakeSQLRunner(s.DB)
9135-
sqlDB.Exec(t, `CREATE TABLE tbl (a INT, b STRING);`)
9136-
sqlDB.Exec(t, `INSERT INTO tbl VALUES (1, 'one'), (2, 'two'), (3, 'three');`)
9137-
sqlDB.Exec(t, `CREATE CHANGEFEED FOR tbl INTO 'null://';`)
9138-
9139-
var tableID int
9140-
sqlDB.QueryRow(t, "SELECT table_id FROM crdb_internal.tables WHERE name='tbl'").Scan(&tableID)
9141-
tableKey := s.Codec.TablePrefix(uint32(tableID))
9142-
9143-
numRangesQuery := fmt.Sprintf(
9144-
"SELECT count(*) FROM crdb_internal.active_range_feeds WHERE range_start LIKE '%s/%%'",
9145-
tableKey)
9146-
sqlDB.CheckQueryResultsRetry(t, numRangesQuery, [][]string{{"1"}})
9134+
observeTables := func(sqlDB *sqlutils.SQLRunner, codec keys.SQLCodec) []int {
9135+
rows := sqlDB.Query(t, "SELECT range_start FROM crdb_internal.active_range_feeds")
9136+
defer rows.Close()
9137+
var tableIDs []int
9138+
for rows.Next() {
9139+
var prettyKey string
9140+
require.NoError(t, rows.Scan(&prettyKey))
9141+
key, err := scanner.Scan(prettyKey)
9142+
require.NoError(t, err)
9143+
_, tableID, err := codec.DecodeTablePrefix(key)
9144+
require.NoError(t, err)
9145+
tableIDs = append(tableIDs, int(tableID))
9146+
}
9147+
return tableIDs
9148+
}
9149+
9150+
cases := []struct {
9151+
user string
9152+
shouldSeeTable bool
9153+
}{
9154+
{`feedCreator`, false},
9155+
{`regularUser`, false},
9156+
{`adminUser`, true},
9157+
{`viewClusterMetadataUser`, true},
9158+
}
9159+
9160+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
9161+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
9162+
9163+
// Creates several different tables, users, and roles for us to use.
9164+
ChangefeedJobPermissionsTestSetup(t, s)
9165+
9166+
var tableID int
9167+
sqlDB.QueryRow(t, "SELECT table_id FROM crdb_internal.tables WHERE name = 'table_a'").Scan(&tableID)
9168+
9169+
var cf cdctest.TestFeed
9170+
asUser(t, f, `feedCreator`, func(userDB *sqlutils.SQLRunner) {
9171+
cf = feed(t, f, `CREATE CHANGEFEED FOR table_a;`)
9172+
})
9173+
defer closeFeed(t, cf)
9174+
9175+
for _, c := range cases {
9176+
testutils.SucceedsSoon(t, func() error {
9177+
asUser(t, f, c.user, func(userDB *sqlutils.SQLRunner) {
9178+
tableIDs := observeTables(userDB, s.Codec)
9179+
if c.shouldSeeTable {
9180+
require.Containsf(t, tableIDs, tableID, "user %s should see table %d", c.user, tableID)
9181+
} else {
9182+
require.Emptyf(t, tableIDs, "user %s should not see any tables", c.user)
9183+
}
9184+
})
9185+
return nil
9186+
})
9187+
}
9188+
9189+
}
9190+
9191+
cdcTest(t, testFn, feedTestEnterpriseSinks)
91479192
}
91489193

91499194
func TestChangefeedCaseInsensitiveOpts(t *testing.T) {

pkg/ccl/changefeedccl/helpers_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1577,6 +1577,9 @@ func ChangefeedJobPermissionsTestSetup(t *testing.T, s TestServer) {
15771577
`GRANT CHANGEFEED ON table_a TO userWithSomeGrants`,
15781578

15791579
`CREATE USER regularUser`,
1580+
1581+
`CREATE USER viewClusterMetadataUser`,
1582+
`GRANT SYSTEM VIEWCLUSTERMETADATA TO viewClusterMetadataUser`,
15801583
)
15811584
}
15821585

pkg/sql/crdb_internal.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1780,12 +1780,12 @@ var crdbInternalSessionBasedLeases = virtualSchemaView{
17801780
schema: `
17811781
CREATE VIEW crdb_internal.kv_session_based_leases (
17821782
desc_id,
1783-
version,
1783+
version,
17841784
sql_instance_id,
1785-
session_id,
1785+
session_id,
17861786
crdb_region
17871787
) AS (
1788-
SELECT desc_id, version, sql_instance_id, session_id, crdb_region
1788+
SELECT desc_id, version, sql_instance_id, session_id, crdb_region
17891789
FROM system.lease
17901790
);
17911791
`,
@@ -7592,6 +7592,13 @@ CREATE TABLE crdb_internal.active_range_feeds (
75927592
last_err STRING
75937593
);`,
75947594
populate: func(ctx context.Context, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error {
7595+
privileged, err := p.HasPrivilege(ctx, syntheticprivilege.GlobalPrivilegeObject, privilege.VIEWCLUSTERMETADATA, p.User())
7596+
if err != nil {
7597+
return err
7598+
}
7599+
if !privileged {
7600+
return nil
7601+
}
75957602
return p.execCfg.DistSender.ForEachActiveRangeFeed(
75967603
func(rfCtx kvcoord.RangeFeedContext, rf kvcoord.PartialRangeFeed) error {
75977604
now := p.EvalContext().GetStmtTimestamp()

0 commit comments

Comments
 (0)