Skip to content

Commit 834f678

Browse files
craig[bot], jasonlmfong, and pav-kv
committed
156854: api: allow admin user to view databases metadata r=jasonlmfong a=jasonlmfong the admin user currently has no view into all the databases due to its non-existence in the role_members table, this change adds a third condition to the where clause to allow the view Epic: None Release: None ----- api: refactor query logic for get databases metadata pull the left join condition on role_members out into the where clause Epic: None Release: None 158011: *: clarify log/state engine use r=arulajmani a=pav-kv This PR is one in the series of PRs clarifying the use of `storage.Engine`: whether it is the raft/log or state machine engine. Here we solidify that: - The sideloaded entry files are stored in the state engine `FS`. These are to be ingested into the state engine when the corresponding entry is applied. - Store liveness uses `LogEngine` because it needs a timely sync. More generally (in upcoming PRs), all `Store`-local keys are in the log engine. - Node liveness compactions in `startPeriodicLivenessCompaction` correctly use the `StateEngine` (works in the MVCC span). Just a drive-by style fixup. Epic: CRDB-55220 Co-authored-by: Jason Fong <[email protected]> Co-authored-by: Pavel Kalinnikov <[email protected]>
3 parents eb2dbf6 + 71a58cf + 5220bdd commit 834f678

File tree

8 files changed

+119
-60
lines changed

8 files changed

+119
-60
lines changed

pkg/kv/kvserver/logstore/sideload_disk.go

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import (
1717
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
1818
"github.com/cockroachdb/cockroach/pkg/roachpb"
1919
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
20-
"github.com/cockroachdb/cockroach/pkg/storage"
2120
"github.com/cockroachdb/cockroach/pkg/storage/fs"
2221
"github.com/cockroachdb/cockroach/pkg/util/log"
2322
"github.com/cockroachdb/errors"
@@ -36,7 +35,7 @@ type DiskSideloadStorage struct {
3635
st *cluster.Settings
3736
limiter *rate.Limiter
3837
dir string
39-
eng storage.Engine
38+
fs *fs.Env
4039
}
4140

4241
func sideloadedPath(baseDir string, rangeID roachpb.RangeID) string {
@@ -57,15 +56,11 @@ func sideloadedPath(baseDir string, rangeID roachpb.RangeID) string {
5756
// NewDiskSideloadStorage creates a SideloadStorage for a given replica, stored
5857
// in the specified engine.
5958
func NewDiskSideloadStorage(
60-
st *cluster.Settings,
61-
rangeID roachpb.RangeID,
62-
baseDir string,
63-
limiter *rate.Limiter,
64-
eng storage.Engine,
59+
st *cluster.Settings, rangeID roachpb.RangeID, baseDir string, limiter *rate.Limiter, fs *fs.Env,
6560
) *DiskSideloadStorage {
6661
return &DiskSideloadStorage{
6762
dir: sideloadedPath(baseDir, rangeID),
68-
eng: eng,
63+
fs: fs,
6964
st: st,
7065
limiter: limiter,
7166
}
@@ -86,14 +81,14 @@ func (ss *DiskSideloadStorage) Put(
8681
for {
8782
// Use 0644 since that's what RocksDB uses:
8883
// https://github.com/facebook/rocksdb/blob/56656e12d67d8a63f1e4c4214da9feeec2bd442b/env/env_posix.cc#L171
89-
if err := kvserverbase.WriteFileSyncing(ctx, filename, contents, ss.eng.Env(), 0644, ss.st, ss.limiter, fs.PebbleIngestionWriteCategory); err == nil {
84+
if err := kvserverbase.WriteFileSyncing(ctx, filename, contents, ss.fs, 0644, ss.st, ss.limiter, fs.PebbleIngestionWriteCategory); err == nil {
9085
return nil
9186
} else if !oserror.IsNotExist(err) {
9287
return err
9388
}
9489
// Ensure that ss.dir exists. The filename() is placed directly in ss.dir,
9590
// so the next loop iteration should succeed.
96-
if err := mkdirAllAndSyncParents(ss.eng.Env(), ss.dir, os.ModePerm); err != nil {
91+
if err := mkdirAllAndSyncParents(ss.fs, ss.dir, os.ModePerm); err != nil {
9792
return err
9893
}
9994
continue
@@ -102,7 +97,7 @@ func (ss *DiskSideloadStorage) Put(
10297

10398
// Sync implements SideloadStorage.
10499
func (ss *DiskSideloadStorage) Sync() error {
105-
dir, err := ss.eng.Env().OpenDir(ss.dir)
100+
dir, err := ss.fs.OpenDir(ss.dir)
106101
// The directory can be missing because we did not Put() any entry to it yet,
107102
// or it has been removed by TruncateTo() or Clear().
108103
//
@@ -127,7 +122,7 @@ func (ss *DiskSideloadStorage) Get(
127122
ctx context.Context, index kvpb.RaftIndex, term kvpb.RaftTerm,
128123
) ([]byte, error) {
129124
filename := ss.filename(ctx, index, term)
130-
b, err := fs.ReadFile(ss.eng.Env(), filename)
125+
b, err := fs.ReadFile(ss.fs, filename)
131126
if oserror.IsNotExist(err) {
132127
return nil, errSideloadedFileNotFound
133128
}
@@ -155,7 +150,7 @@ func (ss *DiskSideloadStorage) Purge(
155150
}
156151

157152
func (ss *DiskSideloadStorage) fileSize(filename string) (int64, error) {
158-
info, err := ss.eng.Env().Stat(filename)
153+
info, err := ss.fs.Stat(filename)
159154
if err != nil {
160155
if oserror.IsNotExist(err) {
161156
return 0, errSideloadedFileNotFound
@@ -170,7 +165,7 @@ func (ss *DiskSideloadStorage) purgeFile(ctx context.Context, filename string) (
170165
if err != nil {
171166
return 0, err
172167
}
173-
if err := ss.eng.Env().Remove(filename); err != nil {
168+
if err := ss.fs.Remove(filename); err != nil {
174169
if oserror.IsNotExist(err) {
175170
return 0, errSideloadedFileNotFound
176171
}
@@ -181,7 +176,7 @@ func (ss *DiskSideloadStorage) purgeFile(ctx context.Context, filename string) (
181176

182177
// Clear implements SideloadStorage.
183178
func (ss *DiskSideloadStorage) Clear(_ context.Context) error {
184-
return ss.eng.Env().RemoveAll(ss.dir)
179+
return ss.fs.RemoveAll(ss.dir)
185180
}
186181

187182
// TruncateTo implements SideloadStorage.
@@ -207,7 +202,7 @@ func (ss *DiskSideloadStorage) TruncateTo(ctx context.Context, lastIndex kvpb.Ra
207202
if deletedAll {
208203
// The directory may not exist, or it may exist and have been empty.
209204
// Not worth trying to figure out which one, just try to delete.
210-
err := ss.eng.Env().Remove(ss.dir)
205+
err := ss.fs.Remove(ss.dir)
211206
if err != nil && !oserror.IsNotExist(err) {
212207
// TODO(pavelkalinnikov): this is possible because deletedAll can be left
213208
// true despite existence of files with index < from which are skipped.
@@ -246,7 +241,7 @@ func (ss *DiskSideloadStorage) forEach(
246241
ctx context.Context, visit func(index kvpb.RaftIndex, filename string) (bool, error),
247242
) error {
248243
// TODO(pavelkalinnikov): consider making the List method iterative.
249-
matches, err := ss.eng.Env().List(ss.dir)
244+
matches, err := ss.fs.List(ss.dir)
250245
if oserror.IsNotExist(err) {
251246
return nil // nothing to do
252247
} else if err != nil {

pkg/kv/kvserver/logstore/sideload_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func newTestingSideloadStorage(eng storage.Engine) *DiskSideloadStorage {
8686
return NewDiskSideloadStorage(
8787
cluster.MakeTestingClusterSettings(), 1,
8888
filepath.Join(eng.GetAuxiliaryDir(), "fake", "testing", "dir"),
89-
rate.NewLimiter(rate.Inf, math.MaxInt64), eng)
89+
rate.NewLimiter(rate.Inf, math.MaxInt64), eng.Env())
9090
}
9191

9292
// TODO(pavelkalinnikov): give these tests a good refactor.
@@ -96,7 +96,7 @@ func testSideloadingSideloadedStorage(t *testing.T, eng storage.Engine) {
9696

9797
assertExists := func(exists bool) {
9898
t.Helper()
99-
_, err := ss.eng.Env().Stat(ss.dir)
99+
_, err := ss.fs.Stat(ss.dir)
100100
if !exists {
101101
require.True(t, oserror.IsNotExist(err), err)
102102
} else {
@@ -537,7 +537,7 @@ func TestRaftSSTableSideloadingSideload(t *testing.T) {
537537
if test.size != stats.SideloadedBytes {
538538
t.Fatalf("expected %d sideloadedSize, but found %d", test.size, stats.SideloadedBytes)
539539
}
540-
actKeys, err := sideloaded.eng.Env().List(sideloaded.Dir())
540+
actKeys, err := sideloaded.fs.List(sideloaded.Dir())
541541
if oserror.IsNotExist(err) {
542542
t.Log("swallowing IsNotExist")
543543
err = nil

pkg/kv/kvserver/replica_init.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ func newUninitializedReplicaWithoutRaftGroup(store *Store, id roachpb.FullReplic
218218
// can be ingested to the state machine locally, when being applied.
219219
store.StateEngine().GetAuxiliaryDir(),
220220
store.limiters.BulkIOWriteRate,
221-
store.StateEngine(),
221+
store.StateEngine().Env(),
222222
)
223223
r.logStorage = &replicaLogStorage{
224224
ctx: r.raftCtx,

pkg/kv/kvserver/replica_raftlog_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func newReplicaLogStorageTest(t *testing.T) *replicaLogStorageTest {
102102
st := cluster.MakeTestingClusterSettings()
103103
eng := storage.NewDefaultInMemForTesting()
104104
sideloaded := logstore.NewDiskSideloadStorage(st, rangeID,
105-
eng.GetAuxiliaryDir(), nil /* limiter: unused */, eng)
105+
eng.GetAuxiliaryDir(), nil /* limiter: unused */, eng.Env())
106106

107107
rt.ls = &replicaLogStorage{
108108
ctx: context.Background(),

pkg/kv/kvserver/store.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2253,7 +2253,7 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
22532253

22542254
// Create the Store Liveness SupportManager.
22552255
sm := storeliveness.NewSupportManager(
2256-
slpb.StoreIdent{NodeID: s.nodeDesc.NodeID, StoreID: s.StoreID()}, s.StateEngine(),
2256+
slpb.StoreIdent{NodeID: s.nodeDesc.NodeID, StoreID: s.StoreID()}, s.LogEngine(),
22572257
s.cfg.StoreLiveness.Options, s.cfg.Settings, s.stopper, s.cfg.Clock,
22582258
s.cfg.StoreLiveness.HeartbeatTicker, s.cfg.StoreLiveness.Transport,
22592259
s.cfg.StoreLiveness.SupportManagerKnobs(),

pkg/server/api_v2_databases_metadata.go

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -465,15 +465,23 @@ func getTableMetadataBaseQuery(userName string) *safesql.Query {
465465
FROM system.table_metadata tbm,
466466
(SELECT "sql.stats.automatic_collection.enabled" as auto_stats_enabled
467467
FROM [SHOW CLUSTER SETTING sql.stats.automatic_collection.enabled]) csc
468-
LEFT JOIN system.role_members rm ON rm.role = 'admin' AND member = $
469-
WHERE (rm.role = 'admin' OR tbm.db_name IN (
468+
WHERE (
469+
$ = 'admin'
470+
OR EXISTS (
471+
SELECT 1
472+
FROM system.role_members rm
473+
WHERE rm.member = $
474+
AND rm.role = 'admin'
475+
)
476+
OR tbm.db_name IN (
470477
SELECT cdp.database_name
471478
FROM "".crdb_internal.cluster_database_privileges cdp
472479
WHERE (grantee = $ OR grantee = 'public')
473480
AND privilege_type = 'CONNECT'
474-
))
481+
)
482+
)
475483
AND tbm.table_type = 'TABLE'
476-
`, userName, userName)
484+
`, userName, userName, userName)
477485

478486
return query
479487
}
@@ -859,22 +867,30 @@ func getDatabaseMetadataBaseQuery(userName string) *safesql.Query {
859867
COALESCE(s.store_ids, ARRAY[]) as store_ids,
860868
count(*) OVER() as total_row_count
861869
FROM system.namespace n
862-
LEFT JOIN system.table_metadata tbm ON n.id = tbm.db_id
863-
LEFT JOIN system.role_members rm ON rm.role = 'admin' AND member = $
870+
LEFT JOIN system.table_metadata tbm ON n.id = tbm.db_id
864871
LEFT JOIN (
865872
SELECT db_id, array_agg(DISTINCT unnested_ids) as store_ids
866873
FROM system.table_metadata, unnest(store_ids) as unnested_ids
867874
GROUP BY db_id
868875
) s ON s.db_id = tbm.db_id
869-
WHERE (rm.role = 'admin' OR n.name IN (
870-
SELECT cdp.database_name
871-
FROM "".crdb_internal.cluster_database_privileges cdp
872-
WHERE (grantee = $ OR grantee = 'public')
873-
AND privilege_type = 'CONNECT'
874-
))
876+
WHERE (
877+
$ = 'admin'
878+
OR EXISTS (
879+
SELECT 1
880+
FROM system.role_members rm
881+
WHERE rm.member = $
882+
AND rm.role = 'admin'
883+
)
884+
OR n.name IN (
885+
SELECT cdp.database_name
886+
FROM "".crdb_internal.cluster_database_privileges AS cdp
887+
WHERE (cdp.grantee = $ OR cdp.grantee = 'public')
888+
AND cdp.privilege_type = 'CONNECT'
889+
)
890+
)
875891
AND n."parentID" = 0
876892
AND n."parentSchemaID" = 0
877-
`, userName, userName)
893+
`, userName, userName, userName)
878894

879895
return query
880896
}

pkg/server/api_v2_databases_metadata_test.go

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,23 @@ func TestGetTableMetadataWithDetails(t *testing.T) {
388388
t, userClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet)
389389
require.NotEmpty(t, resp.Metadata)
390390
require.Contains(t, resp.CreateStatement, table.tableName)
391+
392+
// Test with dedicated admin user (user named 'admin').
393+
adminUsername := username.AdminRoleName()
394+
adminClient, _, err := ts.GetAuthenticatedHTTPClientAndCookie(adminUsername, false, 1)
395+
require.NoError(t, err)
396+
397+
// Admin user should be able to access the table even without explicit CONNECT grants.
398+
resp = makeApiRequest[tableMetadataWithDetailsResponse](
399+
t, adminClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet)
400+
require.NotEmpty(t, resp.Metadata)
401+
require.Contains(t, resp.CreateStatement, table.tableName)
402+
403+
// Admin user should still have access even after public access was revoked above.
404+
resp = makeApiRequest[tableMetadataWithDetailsResponse](
405+
t, adminClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet)
406+
require.NotEmpty(t, resp.Metadata)
407+
require.Contains(t, resp.CreateStatement, table.tableName)
391408
})
392409

393410
t.Run("non GET method 405 error", func(t *testing.T) {
@@ -508,14 +525,14 @@ func TestGetDbMetadata(t *testing.T) {
508525

509526
// All databases grant CONNECT to public by default, so the user should see all databases.
510527
// There should be 4: defaultdb, postgres, new_test_db_1, and new_test_db_2.
511-
// The system db should not be included, since it doe snot have CONNECT granted to public.
528+
// The system db should not be included, since it does not have CONNECT granted to public.
512529
uri := "/api/v2/database_metadata/?sortBy=name"
513530
mdResp := makeApiRequest[PaginatedResponse[[]dbMetadata]](t, userClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet)
514531
verifyDatabases([]string{"defaultdb", "new_test_db_1", "new_test_db_2", "postgres"}, mdResp.Results)
515532

516533
// Revoke connect access for public from db1.
517534
conn.Exec(t, fmt.Sprintf("REVOKE CONNECT ON DATABASE %s FROM %s", db1Name, "public"))
518-
// Asser that user no longer sees db1.
535+
// Assert that user no longer sees db1.
519536
mdResp = makeApiRequest[PaginatedResponse[[]dbMetadata]](t, userClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet)
520537
verifyDatabases([]string{"defaultdb", "new_test_db_2", "postgres"}, mdResp.Results)
521538

@@ -535,6 +552,23 @@ func TestGetDbMetadata(t *testing.T) {
535552
conn.Exec(t, fmt.Sprintf("GRANT admin TO %s", sessionUsername.Normalized()))
536553
mdResp = makeApiRequest[PaginatedResponse[[]dbMetadata]](t, userClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet)
537554
verifyDatabases([]string{"defaultdb", "new_test_db_1", "new_test_db_2", "postgres", "system"}, mdResp.Results)
555+
556+
// Test with dedicated admin user (user named 'admin').
557+
adminUsername := username.AdminRoleName()
558+
adminClient, _, err := ts.GetAuthenticatedHTTPClientAndCookie(adminUsername, false, 1)
559+
require.NoError(t, err)
560+
561+
// The admin user should see all databases even without explicit CONNECT grants.
562+
// There should be 5: defaultdb, postgres, new_test_db_1, new_test_db_2, and system.
563+
mdResp = makeApiRequest[PaginatedResponse[[]dbMetadata]](t, adminClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet)
564+
verifyDatabases([]string{"defaultdb", "new_test_db_1", "new_test_db_2", "postgres", "system"}, mdResp.Results)
565+
566+
// Revoke CONNECT on public from both test databases to ensure the admin user
567+
// can still see them.
568+
conn.Exec(t, fmt.Sprintf("REVOKE CONNECT ON DATABASE %s FROM %s", db1Name, "public"))
569+
conn.Exec(t, fmt.Sprintf("REVOKE CONNECT ON DATABASE %s FROM %s", db2Name, "public"))
570+
mdResp = makeApiRequest[PaginatedResponse[[]dbMetadata]](t, adminClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet)
571+
verifyDatabases([]string{"defaultdb", "new_test_db_1", "new_test_db_2", "postgres", "system"}, mdResp.Results)
538572
})
539573

540574
t.Run("pagination", func(t *testing.T) {
@@ -700,6 +734,26 @@ func TestGetDbMetadataWithDetails(t *testing.T) {
700734
// Assert that user can see system db.
701735
resp = makeApiRequest[dbMetadataWithDetailsResponse](t, userClient, ts.AdminURL().WithPath(systemUri).String(), http.MethodGet)
702736
require.Equal(t, int64(1), resp.Metadata.DbId)
737+
738+
// Test with dedicated admin user (user named 'admin').
739+
adminUsername := username.AdminRoleName()
740+
adminClient, _, err := ts.GetAuthenticatedHTTPClientAndCookie(adminUsername, false, 1)
741+
require.NoError(t, err)
742+
743+
// Admin user should be able to access the database even without explicit CONNECT grants.
744+
resp = makeApiRequest[dbMetadataWithDetailsResponse](
745+
t, adminClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet)
746+
require.Equal(t, int64(db1Id), resp.Metadata.DbId)
747+
748+
// Admin user should also see the system database.
749+
resp = makeApiRequest[dbMetadataWithDetailsResponse](
750+
t, adminClient, ts.AdminURL().WithPath(systemUri).String(), http.MethodGet)
751+
require.Equal(t, int64(1), resp.Metadata.DbId)
752+
753+
// Admin user should still have access even after public access was already revoked above.
754+
resp = makeApiRequest[dbMetadataWithDetailsResponse](
755+
t, adminClient, ts.AdminURL().WithPath(uri).String(), http.MethodGet)
756+
require.Equal(t, int64(db1Id), resp.Metadata.DbId)
703757
})
704758

705759
t.Run("non GET method 405 error", func(t *testing.T) {

pkg/server/node.go

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1158,29 +1158,23 @@ func (n *Node) startPeriodicLivenessCompaction(
11581158
_ = n.stores.VisitStores(func(store *kvserver.Store) error {
11591159
store.VisitReplicas(func(repl *kvserver.Replica) bool {
11601160
span := repl.Desc().KeySpan().AsRawSpanWithNoLocals()
1161-
if keys.NodeLivenessSpan.Overlaps(span) {
1162-
1163-
// The CompactRange() method expects the start and end keys to be
1164-
// encoded.
1165-
startEngineKey :=
1166-
storage.EngineKey{
1167-
Key: span.Key,
1168-
}.Encode()
1169-
1170-
endEngineKey :=
1171-
storage.EngineKey{
1172-
Key: span.EndKey,
1173-
}.Encode()
1174-
1175-
timeBeforeCompaction := timeutil.Now()
1176-
if err := store.StateEngine().CompactRange(
1177-
context.Background(), startEngineKey, endEngineKey); err != nil {
1178-
log.Dev.Errorf(ctx, "failed compacting liveness replica: %+v with error: %s", repl, err)
1179-
}
1180-
1181-
log.Dev.Infof(ctx, "finished compacting liveness replica: %+v and it took: %+v",
1182-
repl, timeutil.Since(timeBeforeCompaction))
1161+
if !keys.NodeLivenessSpan.Overlaps(span) {
1162+
return true
11831163
}
1164+
1165+
// CompactRange() expects the start and end keys to be encoded.
1166+
startEngineKey := storage.EngineKey{Key: span.Key}.Encode()
1167+
endEngineKey := storage.EngineKey{Key: span.EndKey}.Encode()
1168+
1169+
timeBeforeCompaction := timeutil.Now()
1170+
if err := store.StateEngine().CompactRange(
1171+
context.Background(), startEngineKey, endEngineKey,
1172+
); err != nil {
1173+
log.Dev.Errorf(ctx, "failed compacting liveness replica: %+v with error: %s", repl, err)
1174+
}
1175+
1176+
log.Dev.Infof(ctx, "finished compacting liveness replica: %+v and it took: %+v",
1177+
repl, timeutil.Since(timeBeforeCompaction))
11841178
return true
11851179
})
11861180
return nil

0 commit comments

Comments
 (0)