Skip to content

Commit 8b03c8d

Browse files
authored
Merge pull request #151345 from asg0451/backport24.3-150004
release-24.3: changefeedccl: protobuf encoder randomized test for all types
2 parents 3cf5e6e + 5a8e27d commit 8b03c8d

File tree

4 files changed

+72
-16
lines changed

4 files changed

+72
-16
lines changed

pkg/ccl/changefeedccl/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,7 @@ go_test(
330330
"//pkg/util/intsets",
331331
"//pkg/util/ioctx",
332332
"//pkg/util/json",
333+
"//pkg/util/keysutil",
333334
"//pkg/util/leaktest",
334335
"//pkg/util/log",
335336
"//pkg/util/log/eventpb",

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 60 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ import (
8080
"github.com/cockroachdb/cockroach/pkg/util/cidr"
8181
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
8282
"github.com/cockroachdb/cockroach/pkg/util/hlc"
83+
"github.com/cockroachdb/cockroach/pkg/util/keysutil"
8384
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
8485
"github.com/cockroachdb/cockroach/pkg/util/log"
8586
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
@@ -7716,22 +7717,66 @@ func TestDistSenderRangeFeedPopulatesVirtualTable(t *testing.T) {
77167717
defer leaktest.AfterTest(t)()
77177718
defer log.Scope(t).Close(t)
77187719

7719-
s, cleanup := makeServer(t)
7720-
defer cleanup()
7720+
scanner := keysutil.MakePrettyScanner(nil, nil)
77217721

7722-
sqlDB := sqlutils.MakeSQLRunner(s.DB)
7723-
sqlDB.Exec(t, `CREATE TABLE tbl (a INT, b STRING);`)
7724-
sqlDB.Exec(t, `INSERT INTO tbl VALUES (1, 'one'), (2, 'two'), (3, 'three');`)
7725-
sqlDB.Exec(t, `CREATE CHANGEFEED FOR tbl INTO 'null://';`)
7726-
7727-
var tableID int
7728-
sqlDB.QueryRow(t, "SELECT table_id FROM crdb_internal.tables WHERE name='tbl'").Scan(&tableID)
7729-
tableKey := s.Codec.TablePrefix(uint32(tableID))
7730-
7731-
numRangesQuery := fmt.Sprintf(
7732-
"SELECT count(*) FROM crdb_internal.active_range_feeds WHERE range_start LIKE '%s/%%'",
7733-
tableKey)
7734-
sqlDB.CheckQueryResultsRetry(t, numRangesQuery, [][]string{{"1"}})
7722+
observeTables := func(sqlDB *sqlutils.SQLRunner, codec keys.SQLCodec) []int {
7723+
rows := sqlDB.Query(t, "SELECT range_start FROM crdb_internal.active_range_feeds")
7724+
defer rows.Close()
7725+
var tableIDs []int
7726+
for rows.Next() {
7727+
var prettyKey string
7728+
require.NoError(t, rows.Scan(&prettyKey))
7729+
key, err := scanner.Scan(prettyKey)
7730+
require.NoError(t, err)
7731+
_, tableID, err := codec.DecodeTablePrefix(key)
7732+
require.NoError(t, err)
7733+
tableIDs = append(tableIDs, int(tableID))
7734+
}
7735+
return tableIDs
7736+
}
7737+
7738+
cases := []struct {
7739+
user string
7740+
shouldSeeTable bool
7741+
}{
7742+
{`feedCreator`, false},
7743+
{`regularUser`, false},
7744+
{`adminUser`, true},
7745+
{`viewClusterMetadataUser`, true},
7746+
}
7747+
7748+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
7749+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
7750+
7751+
// Creates several different tables, users, and roles for us to use.
7752+
ChangefeedJobPermissionsTestSetup(t, s)
7753+
7754+
var tableID int
7755+
sqlDB.QueryRow(t, "SELECT table_id FROM crdb_internal.tables WHERE name = 'table_a'").Scan(&tableID)
7756+
7757+
var cf cdctest.TestFeed
7758+
asUser(t, f, `feedCreator`, func(userDB *sqlutils.SQLRunner) {
7759+
cf = feed(t, f, `CREATE CHANGEFEED FOR table_a;`)
7760+
})
7761+
defer closeFeed(t, cf)
7762+
7763+
for _, c := range cases {
7764+
testutils.SucceedsSoon(t, func() error {
7765+
asUser(t, f, c.user, func(userDB *sqlutils.SQLRunner) {
7766+
tableIDs := observeTables(userDB, s.Codec)
7767+
if c.shouldSeeTable {
7768+
require.Containsf(t, tableIDs, tableID, "user %s should see table %d", c.user, tableID)
7769+
} else {
7770+
require.Emptyf(t, tableIDs, "user %s should not see any tables", c.user)
7771+
}
7772+
})
7773+
return nil
7774+
})
7775+
}
7776+
7777+
}
7778+
7779+
cdcTest(t, testFn, feedTestEnterpriseSinks)
77357780
}
77367781

77377782
func TestChangefeedCaseInsensitiveOpts(t *testing.T) {

pkg/ccl/changefeedccl/helpers_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1407,6 +1407,9 @@ func ChangefeedJobPermissionsTestSetup(t *testing.T, s TestServer) {
14071407
`GRANT CHANGEFEED ON table_a TO userWithSomeGrants`,
14081408

14091409
`CREATE USER regularUser`,
1410+
1411+
`CREATE USER viewClusterMetadataUser`,
1412+
`GRANT SYSTEM VIEWCLUSTERMETADATA TO viewClusterMetadataUser`,
14101413
)
14111414
}
14121415

pkg/sql/crdb_internal.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7370,6 +7370,13 @@ CREATE TABLE crdb_internal.active_range_feeds (
73707370
last_err STRING
73717371
);`,
73727372
populate: func(ctx context.Context, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error {
7373+
privileged, err := p.HasPrivilege(ctx, syntheticprivilege.GlobalPrivilegeObject, privilege.VIEWCLUSTERMETADATA, p.User())
7374+
if err != nil {
7375+
return err
7376+
}
7377+
if !privileged {
7378+
return nil
7379+
}
73737380
return p.execCfg.DistSender.ForEachActiveRangeFeed(
73747381
func(rfCtx kvcoord.RangeFeedContext, rf kvcoord.PartialRangeFeed) error {
73757382
var lastEvent tree.Datum
@@ -9517,7 +9524,7 @@ CREATE TABLE crdb_internal.logical_replication_node_processors (
95179524
state STRING,
95189525
recv_time INTERVAL,
95199526
last_recv_time INTERVAL,
9520-
ingest_time INTERVAL,
9527+
ingest_time INTERVAL,
95219528
flush_time INTERVAL,
95229529
flush_count INT,
95239530
flush_kvs INT,

0 commit comments

Comments
 (0)