Skip to content

Commit df504c6

Browse files
committed
kv/rangefeed: filter and test internal rangefeed registry
Filter and test internal rangefeed registry. Epic: None Release note: None
1 parent f4a0482 commit df504c6

File tree

4 files changed

+77
-21
lines changed

4 files changed

+77
-21
lines changed

pkg/ccl/changefeedccl/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,7 @@ go_test(
331331
"//pkg/util/intsets",
332332
"//pkg/util/ioctx",
333333
"//pkg/util/json",
334+
"//pkg/util/keysutil",
334335
"//pkg/util/leaktest",
335336
"//pkg/util/log",
336337
"//pkg/util/log/eventpb",

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 63 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ import (
9191
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
9292
"github.com/cockroachdb/cockroach/pkg/util/hlc"
9393
"github.com/cockroachdb/cockroach/pkg/util/json"
94+
"github.com/cockroachdb/cockroach/pkg/util/keysutil"
9495
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
9596
"github.com/cockroachdb/cockroach/pkg/util/log"
9697
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
@@ -823,24 +824,24 @@ func TestChangefeedQuotedIdentifiersTopicName(t *testing.T) {
823824
sqlDB := sqlutils.MakeSQLRunner(s.DB)
824825

825826
sqlDB.Exec(t, `CREATE TABLE mytable (
826-
id INT PRIMARY KEY,
827+
id INT PRIMARY KEY,
827828
"SomeField" JSONB,
828829
"AnotherField" JSONB
829830
)`)
830831

831832
sqlDB.Exec(t, `INSERT INTO mytable VALUES (
832-
1,
833+
1,
833834
'{"PropA": "value1", "prop_b": "value2"}'::jsonb,
834835
'{"PropC": "value3", "prop_d": "value4"}'::jsonb
835836
)`)
836837

837838
sqlDB.Exec(t, `INSERT INTO mytable VALUES (
838-
2,
839+
2,
839840
'{"PropA": "value5", "prop_b": "value6"}'::jsonb,
840841
'{"PropC": "value7", "prop_d": "value8"}'::jsonb
841842
)`)
842843

843-
foo := feed(t, f, `CREATE CHANGEFEED WITH diff, full_table_name, on_error=pause, envelope=wrapped AS SELECT
844+
foo := feed(t, f, `CREATE CHANGEFEED WITH diff, full_table_name, on_error=pause, envelope=wrapped AS SELECT
844845
id,
845846
"SomeField"->>'PropA' AS "PropA",
846847
"SomeField"->>'prop_b' AS "PropB",
@@ -9390,22 +9391,66 @@ func TestDistSenderRangeFeedPopulatesVirtualTable(t *testing.T) {
93909391
defer leaktest.AfterTest(t)()
93919392
defer log.Scope(t).Close(t)
93929393

9393-
s, cleanup := makeServer(t)
9394-
defer cleanup()
9394+
scanner := keysutil.MakePrettyScanner(nil, nil)
93959395

9396-
sqlDB := sqlutils.MakeSQLRunner(s.DB)
9397-
sqlDB.Exec(t, `CREATE TABLE tbl (a INT, b STRING);`)
9398-
sqlDB.Exec(t, `INSERT INTO tbl VALUES (1, 'one'), (2, 'two'), (3, 'three');`)
9399-
sqlDB.Exec(t, `CREATE CHANGEFEED FOR tbl INTO 'null://';`)
9396+
observeTables := func(sqlDB *sqlutils.SQLRunner, codec keys.SQLCodec) []int {
9397+
rows := sqlDB.Query(t, "SELECT range_start FROM crdb_internal.active_range_feeds")
9398+
defer rows.Close()
9399+
var tableIDs []int
9400+
for rows.Next() {
9401+
var prettyKey string
9402+
require.NoError(t, rows.Scan(&prettyKey))
9403+
key, err := scanner.Scan(prettyKey)
9404+
require.NoError(t, err)
9405+
_, tableID, err := codec.DecodeTablePrefix(key)
9406+
require.NoError(t, err)
9407+
tableIDs = append(tableIDs, int(tableID))
9408+
}
9409+
return tableIDs
9410+
}
9411+
9412+
cases := []struct {
9413+
user string
9414+
shouldSeeTable bool
9415+
}{
9416+
{`feedCreator`, false},
9417+
{`regularUser`, false},
9418+
{`adminUser`, true},
9419+
{`viewClusterMetadataUser`, true},
9420+
}
9421+
9422+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
9423+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
9424+
9425+
// Creates several different tables, users, and roles for us to use.
9426+
ChangefeedJobPermissionsTestSetup(t, s)
94009427

9401-
var tableID int
9402-
sqlDB.QueryRow(t, "SELECT table_id FROM crdb_internal.tables WHERE name='tbl'").Scan(&tableID)
9403-
tableKey := s.Codec.TablePrefix(uint32(tableID))
9428+
var tableID int
9429+
sqlDB.QueryRow(t, "SELECT table_id FROM crdb_internal.tables WHERE name = 'table_a'").Scan(&tableID)
94049430

9405-
numRangesQuery := fmt.Sprintf(
9406-
"SELECT count(*) FROM crdb_internal.active_range_feeds WHERE range_start LIKE '%s/%%'",
9407-
tableKey)
9408-
sqlDB.CheckQueryResultsRetry(t, numRangesQuery, [][]string{{"1"}})
9431+
var cf cdctest.TestFeed
9432+
asUser(t, f, `feedCreator`, func(userDB *sqlutils.SQLRunner) {
9433+
cf = feed(t, f, `CREATE CHANGEFEED FOR table_a;`)
9434+
})
9435+
defer closeFeed(t, cf)
9436+
9437+
for _, c := range cases {
9438+
testutils.SucceedsSoon(t, func() error {
9439+
asUser(t, f, c.user, func(userDB *sqlutils.SQLRunner) {
9440+
tableIDs := observeTables(userDB, s.Codec)
9441+
if c.shouldSeeTable {
9442+
require.Containsf(t, tableIDs, tableID, "user %s should see table %d", c.user, tableID)
9443+
} else {
9444+
require.Emptyf(t, tableIDs, "user %s should not see any tables", c.user)
9445+
}
9446+
})
9447+
return nil
9448+
})
9449+
}
9450+
9451+
}
9452+
9453+
cdcTest(t, testFn, feedTestEnterpriseSinks)
94099454
}
94109455

94119456
func TestChangefeedCaseInsensitiveOpts(t *testing.T) {
@@ -12157,7 +12202,7 @@ func TestChangefeedProtobuf(t *testing.T) {
1215712202
)`)
1215812203
sqlDB.Exec(t, `
1215912204
INSERT INTO pricing VALUES
12160-
(1, 'Chair', 15.75, 2.500, ARRAY['Brown', 'Black']),
12205+
(1, 'Chair', 15.75, 2.500, ARRAY['Brown', 'Black']),
1216112206
(2, 'Table', 20.00, 1.23456789, ARRAY['Brown', 'Black'])`)
1216212207

1216312208
var opts []string

pkg/ccl/changefeedccl/helpers_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1815,6 +1815,9 @@ func ChangefeedJobPermissionsTestSetup(t *testing.T, s TestServer) {
18151815
`GRANT CHANGEFEED ON table_a TO userWithSomeGrants`,
18161816

18171817
`CREATE USER regularUser`,
1818+
1819+
`CREATE USER viewClusterMetadataUser`,
1820+
`GRANT SYSTEM VIEWCLUSTERMETADATA TO viewClusterMetadataUser`,
18181821
)
18191822
}
18201823

pkg/sql/crdb_internal.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1264,12 +1264,12 @@ var crdbInternalSessionBasedLeases = virtualSchemaView{
12641264
schema: `
12651265
CREATE VIEW crdb_internal.kv_session_based_leases (
12661266
desc_id,
1267-
version,
1267+
version,
12681268
sql_instance_id,
1269-
session_id,
1269+
session_id,
12701270
crdb_region
12711271
) AS (
1272-
SELECT desc_id, version, sql_instance_id, session_id, crdb_region
1272+
SELECT desc_id, version, sql_instance_id, session_id, crdb_region
12731273
FROM system.lease
12741274
);
12751275
`,
@@ -7264,6 +7264,13 @@ CREATE TABLE crdb_internal.active_range_feeds (
72647264
last_err STRING
72657265
);`,
72667266
populate: func(ctx context.Context, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error {
7267+
privileged, err := p.HasPrivilege(ctx, syntheticprivilege.GlobalPrivilegeObject, privilege.VIEWCLUSTERMETADATA, p.User())
7268+
if err != nil {
7269+
return err
7270+
}
7271+
if !privileged {
7272+
return nil
7273+
}
72677274
return p.execCfg.DistSender.ForEachActiveRangeFeed(
72687275
func(rfCtx kvcoord.RangeFeedContext, rf kvcoord.PartialRangeFeed) error {
72697276
now := p.EvalContext().GetStmtTimestamp()

0 commit comments

Comments
 (0)