Skip to content

Commit d72e004

Browse files
authored
Merge pull request #151348 from asg0451/backport25.3-150004
release-25.3: changefeedccl: protobuf encoder randomized test for all types
2 parents 37d9e8b + ed751f2 commit d72e004

File tree

4 files changed

+74
-18
lines changed

4 files changed

+74
-18
lines changed

pkg/ccl/changefeedccl/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,7 @@ go_test(
325325
"//pkg/util/intsets",
326326
"//pkg/util/ioctx",
327327
"//pkg/util/json",
328+
"//pkg/util/keysutil",
328329
"//pkg/util/leaktest",
329330
"//pkg/util/log",
330331
"//pkg/util/log/eventpb",

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 60 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ import (
9191
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
9292
"github.com/cockroachdb/cockroach/pkg/util/hlc"
9393
"github.com/cockroachdb/cockroach/pkg/util/json"
94+
"github.com/cockroachdb/cockroach/pkg/util/keysutil"
9495
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
9596
"github.com/cockroachdb/cockroach/pkg/util/log"
9697
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
@@ -9192,22 +9193,66 @@ func TestDistSenderRangeFeedPopulatesVirtualTable(t *testing.T) {
91929193
defer leaktest.AfterTest(t)()
91939194
defer log.Scope(t).Close(t)
91949195

9195-
s, cleanup := makeServer(t)
9196-
defer cleanup()
9196+
scanner := keysutil.MakePrettyScanner(nil, nil)
91979197

9198-
sqlDB := sqlutils.MakeSQLRunner(s.DB)
9199-
sqlDB.Exec(t, `CREATE TABLE tbl (a INT, b STRING);`)
9200-
sqlDB.Exec(t, `INSERT INTO tbl VALUES (1, 'one'), (2, 'two'), (3, 'three');`)
9201-
sqlDB.Exec(t, `CREATE CHANGEFEED FOR tbl INTO 'null://';`)
9202-
9203-
var tableID int
9204-
sqlDB.QueryRow(t, "SELECT table_id FROM crdb_internal.tables WHERE name='tbl'").Scan(&tableID)
9205-
tableKey := s.Codec.TablePrefix(uint32(tableID))
9206-
9207-
numRangesQuery := fmt.Sprintf(
9208-
"SELECT count(*) FROM crdb_internal.active_range_feeds WHERE range_start LIKE '%s/%%'",
9209-
tableKey)
9210-
sqlDB.CheckQueryResultsRetry(t, numRangesQuery, [][]string{{"1"}})
9198+
observeTables := func(sqlDB *sqlutils.SQLRunner, codec keys.SQLCodec) []int {
9199+
rows := sqlDB.Query(t, "SELECT range_start FROM crdb_internal.active_range_feeds")
9200+
defer rows.Close()
9201+
var tableIDs []int
9202+
for rows.Next() {
9203+
var prettyKey string
9204+
require.NoError(t, rows.Scan(&prettyKey))
9205+
key, err := scanner.Scan(prettyKey)
9206+
require.NoError(t, err)
9207+
_, tableID, err := codec.DecodeTablePrefix(key)
9208+
require.NoError(t, err)
9209+
tableIDs = append(tableIDs, int(tableID))
9210+
}
9211+
return tableIDs
9212+
}
9213+
9214+
cases := []struct {
9215+
user string
9216+
shouldSeeTable bool
9217+
}{
9218+
{`feedCreator`, false},
9219+
{`regularUser`, false},
9220+
{`adminUser`, true},
9221+
{`viewClusterMetadataUser`, true},
9222+
}
9223+
9224+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
9225+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
9226+
9227+
// Creates several different tables, users, and roles for us to use.
9228+
ChangefeedJobPermissionsTestSetup(t, s)
9229+
9230+
var tableID int
9231+
sqlDB.QueryRow(t, "SELECT table_id FROM crdb_internal.tables WHERE name = 'table_a'").Scan(&tableID)
9232+
9233+
var cf cdctest.TestFeed
9234+
asUser(t, f, `feedCreator`, func(userDB *sqlutils.SQLRunner) {
9235+
cf = feed(t, f, `CREATE CHANGEFEED FOR table_a;`)
9236+
})
9237+
defer closeFeed(t, cf)
9238+
9239+
for _, c := range cases {
9240+
testutils.SucceedsSoon(t, func() error {
9241+
asUser(t, f, c.user, func(userDB *sqlutils.SQLRunner) {
9242+
tableIDs := observeTables(userDB, s.Codec)
9243+
if c.shouldSeeTable {
9244+
require.Containsf(t, tableIDs, tableID, "user %s should see table %d", c.user, tableID)
9245+
} else {
9246+
require.Emptyf(t, tableIDs, "user %s should not see any tables", c.user)
9247+
}
9248+
})
9249+
return nil
9250+
})
9251+
}
9252+
9253+
}
9254+
9255+
cdcTest(t, testFn, feedTestEnterpriseSinks)
92119256
}
92129257

92139258
func TestChangefeedCaseInsensitiveOpts(t *testing.T) {

pkg/ccl/changefeedccl/helpers_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1759,6 +1759,9 @@ func ChangefeedJobPermissionsTestSetup(t *testing.T, s TestServer) {
17591759
`GRANT CHANGEFEED ON table_a TO userWithSomeGrants`,
17601760

17611761
`CREATE USER regularUser`,
1762+
1763+
`CREATE USER viewClusterMetadataUser`,
1764+
`GRANT SYSTEM VIEWCLUSTERMETADATA TO viewClusterMetadataUser`,
17621765
)
17631766
}
17641767

pkg/sql/crdb_internal.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1400,12 +1400,12 @@ var crdbInternalSessionBasedLeases = virtualSchemaView{
14001400
schema: `
14011401
CREATE VIEW crdb_internal.kv_session_based_leases (
14021402
desc_id,
1403-
version,
1403+
version,
14041404
sql_instance_id,
1405-
session_id,
1405+
session_id,
14061406
crdb_region
14071407
) AS (
1408-
SELECT desc_id, version, sql_instance_id, session_id, crdb_region
1408+
SELECT desc_id, version, sql_instance_id, session_id, crdb_region
14091409
FROM system.lease
14101410
);
14111411
`,
@@ -7391,6 +7391,13 @@ CREATE TABLE crdb_internal.active_range_feeds (
73917391
last_err STRING
73927392
);`,
73937393
populate: func(ctx context.Context, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error {
7394+
privileged, err := p.HasPrivilege(ctx, syntheticprivilege.GlobalPrivilegeObject, privilege.VIEWCLUSTERMETADATA, p.User())
7395+
if err != nil {
7396+
return err
7397+
}
7398+
if !privileged {
7399+
return nil
7400+
}
73947401
return p.execCfg.DistSender.ForEachActiveRangeFeed(
73957402
func(rfCtx kvcoord.RangeFeedContext, rf kvcoord.PartialRangeFeed) error {
73967403
now := p.EvalContext().GetStmtTimestamp()

0 commit comments

Comments
 (0)