Skip to content

Commit 08dbd7e

Browse files
craig[bot]Rahul Aggarwal
authored andcommitted
sql: Add new generator crdb_internal.scan_storage_internal_keys()
This new builtin is used to gather specific pebble metrics for a node and store id (within an given keyspan). The builtin returns information about the different types of keys (including snapshot pinned keys) as well as bytes. Informs: cockroachdb#94659 Release note: None
1 parent a7f0434 commit 08dbd7e

File tree

14 files changed

+278
-10
lines changed

14 files changed

+278
-10
lines changed

pkg/kv/kvserver/api.proto

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,18 @@ message GetTableMetricsResponse{
8989
repeated storage.enginepb.SSTableMetricsInfo table_metrics = 1 [(gogoproto.nullable) = false, (gogoproto.customname) = "TableMetrics"];
9090
}
9191

92+
// ScanStorageInternalKeysRequest retrieves metrics about keys within a range belonging
93+
// to a particular node and store.
94+
message ScanStorageInternalKeysRequest {
95+
StoreRequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
96+
roachpb.Span span = 2 [(gogoproto.nullable) = false];
97+
int64 MegabytesPerSecond = 3;
98+
}
99+
100+
message ScanStorageInternalKeysResponse {
101+
repeated storage.enginepb.StorageInternalKeysMetrics advanced_metrics = 1 [(gogoproto.nullable) = false, (gogoproto.customname) = "AdvancedPebbleMetrics"];
102+
}
103+
92104
message CompactEngineSpanResponse {
93105
}
94106

pkg/kv/kvserver/storage_engine_client.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,33 @@ func (c *StorageEngineClient) GetTableMetrics(
7676
return resp.TableMetrics, nil
7777
}
7878

79+
// ScanStorageInternalKeys is a tree.ScanStorageInternalKeys
80+
func (c *StorageEngineClient) ScanStorageInternalKeys(
81+
ctx context.Context, nodeID, storeID int32, startKey, endKey []byte, megabytesPerSecond int64,
82+
) ([]enginepb.StorageInternalKeysMetrics, error) {
83+
conn, err := c.nd.Dial(ctx, roachpb.NodeID(nodeID), rpc.DefaultClass)
84+
if err != nil {
85+
return []enginepb.StorageInternalKeysMetrics{}, errors.Wrapf(err, "could not dial node ID %d", nodeID)
86+
}
87+
88+
client := NewPerStoreClient(conn)
89+
req := &ScanStorageInternalKeysRequest{
90+
StoreRequestHeader: StoreRequestHeader{
91+
NodeID: roachpb.NodeID(nodeID),
92+
StoreID: roachpb.StoreID(storeID),
93+
},
94+
Span: roachpb.Span{Key: roachpb.Key(startKey), EndKey: roachpb.Key(endKey)},
95+
MegabytesPerSecond: megabytesPerSecond,
96+
}
97+
98+
resp, err := client.ScanStorageInternalKeys(ctx, req)
99+
100+
if err != nil {
101+
return []enginepb.StorageInternalKeysMetrics{}, err
102+
}
103+
return resp.AdvancedPebbleMetrics, nil
104+
}
105+
79106
// SetCompactionConcurrency is a tree.CompactionConcurrencyFunc.
80107
func (c *StorageEngineClient) SetCompactionConcurrency(
81108
ctx context.Context, nodeID, storeID int32, compactionConcurrency uint64,

pkg/kv/kvserver/storage_services.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,5 +44,6 @@ service PerReplica {
4444
service PerStore {
4545
rpc CompactEngineSpan(cockroach.kv.kvserver.CompactEngineSpanRequest) returns (cockroach.kv.kvserver.CompactEngineSpanResponse) {}
4646
rpc GetTableMetrics(cockroach.kv.kvserver.GetTableMetricsRequest) returns (cockroach.kv.kvserver.GetTableMetricsResponse) {}
47+
rpc ScanStorageInternalKeys(cockroach.kv.kvserver.ScanStorageInternalKeysRequest) returns (cockroach.kv.kvserver.ScanStorageInternalKeysResponse) {}
4748
rpc SetCompactionConcurrency(cockroach.kv.kvserver.CompactionConcurrencyRequest) returns (cockroach.kv.kvserver.CompactionConcurrencyResponse) {}
4849
}

pkg/kv/kvserver/stores_server.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,24 @@ func (is Server) GetTableMetrics(
173173
return resp, err
174174
}
175175

176+
func (is Server) ScanStorageInternalKeys(
177+
ctx context.Context, req *ScanStorageInternalKeysRequest,
178+
) (*ScanStorageInternalKeysResponse, error) {
179+
resp := &ScanStorageInternalKeysResponse{}
180+
err := is.execStoreCommand(ctx, req.StoreRequestHeader,
181+
func(ctx context.Context, s *Store) error {
182+
metrics, err := s.TODOEngine().ScanStorageInternalKeys(req.Span.Key, req.Span.EndKey, req.MegabytesPerSecond)
183+
184+
if err != nil {
185+
return err
186+
}
187+
188+
resp.AdvancedPebbleMetrics = metrics
189+
return nil
190+
})
191+
return resp, err
192+
}
193+
176194
// SetCompactionConcurrency implements PerStoreServer. It changes the compaction
177195
// concurrency of a store. While SetCompactionConcurrency is safe for concurrent
178196
// use, it adds uncertainty about the compaction concurrency actually set on

pkg/server/server_sql.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -961,16 +961,17 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
961961
ClientCertExpirationCache: security.NewClientCertExpirationCache(
962962
ctx, cfg.Settings, cfg.stopper, &timeutil.DefaultTimeSource{}, rootSQLMemoryMonitor,
963963
),
964-
RootMemoryMonitor: rootSQLMemoryMonitor,
965-
TestingKnobs: sqlExecutorTestingKnobs,
966-
CompactEngineSpanFunc: storageEngineClient.CompactEngineSpan,
967-
CompactionConcurrencyFunc: storageEngineClient.SetCompactionConcurrency,
968-
GetTableMetricsFunc: storageEngineClient.GetTableMetrics,
969-
TraceCollector: traceCollector,
970-
TenantUsageServer: cfg.tenantUsageServer,
971-
KVStoresIterator: cfg.kvStoresIterator,
972-
InspectzServer: cfg.inspectzServer,
973-
RangeDescIteratorFactory: cfg.rangeDescIteratorFactory,
964+
RootMemoryMonitor: rootSQLMemoryMonitor,
965+
TestingKnobs: sqlExecutorTestingKnobs,
966+
CompactEngineSpanFunc: storageEngineClient.CompactEngineSpan,
967+
CompactionConcurrencyFunc: storageEngineClient.SetCompactionConcurrency,
968+
GetTableMetricsFunc: storageEngineClient.GetTableMetrics,
969+
ScanStorageInternalKeysFunc: storageEngineClient.ScanStorageInternalKeys,
970+
TraceCollector: traceCollector,
971+
TenantUsageServer: cfg.tenantUsageServer,
972+
KVStoresIterator: cfg.kvStoresIterator,
973+
InspectzServer: cfg.inspectzServer,
974+
RangeDescIteratorFactory: cfg.rangeDescIteratorFactory,
974975
SyntheticPrivilegeCache: syntheticprivilegecache.New(
975976
cfg.Settings, cfg.stopper, cfg.db,
976977
serverCacheMemoryMonitor.MakeBoundAccount(),

pkg/sql/exec_util.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1383,6 +1383,10 @@ type ExecutorConfig struct {
13831383
// overlap with a key range for a specified node and store.
13841384
GetTableMetricsFunc eval.GetTableMetricsFunc
13851385

1386+
// ScanStorageInternalKeys is used to gather information about the types of
1387+
// keys (including snapshot pinned keys) at each level of a node store.
1388+
ScanStorageInternalKeysFunc eval.ScanStorageInternalKeysFunc
1389+
13861390
// TraceCollector is used to contact all live nodes in the cluster, and
13871391
// collect trace spans from their inflight node registries.
13881392
TraceCollector *collector.TraceCollector

pkg/sql/planner.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ func (evalCtx *extendedEvalContext) copyFromExecCfg(execCfg *ExecutorConfig) {
127127
evalCtx.CompactEngineSpan = execCfg.CompactEngineSpanFunc
128128
evalCtx.SetCompactionConcurrency = execCfg.CompactionConcurrencyFunc
129129
evalCtx.GetTableMetrics = execCfg.GetTableMetricsFunc
130+
evalCtx.ScanStorageInternalKeys = execCfg.ScanStorageInternalKeysFunc
130131
evalCtx.TestingKnobs = execCfg.EvalContextTestingKnobs
131132
evalCtx.ClusterID = execCfg.NodeInfo.LogicalClusterID()
132133
evalCtx.ClusterName = execCfg.RPCContext.ClusterName()

pkg/sql/sem/builtins/fixed_oids.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2440,6 +2440,8 @@ var builtinOidsArray = []string{
24402440
2467: `crdb_internal.request_statement_bundle(stmtFingerprint: string, planGist: string, samplingProbability: float, minExecutionLatency: interval, expiresAfter: interval) -> bool`,
24412441
2468: `crdb_internal.request_statement_bundle(stmtFingerprint: string, planGist: string, antiPlanGist: bool, samplingProbability: float, minExecutionLatency: interval, expiresAfter: interval) -> bool`,
24422442
2469: `crdb_internal.is_system_table_key(raw_key: bytes) -> bool`,
2443+
2470: `crdb_internal.scan_storage_internal_keys(node_id: int, store_id: int, start_key: bytes, end_key: bytes) -> tuple{int AS level, int AS node_id, int AS store_id, int AS snapshot_pinned_keys, int AS snapshot_pinned_keys_bytes, int AS point_key_delete_count, int AS point_key_set_count, int AS range_delete_count, int AS range_key_set_count, int AS range_key_delete_count}`,
2444+
2471: `crdb_internal.scan_storage_internal_keys(node_id: int, store_id: int, start_key: bytes, end_key: bytes, mb_per_second: int4) -> tuple{int AS level, int AS node_id, int AS store_id, int AS snapshot_pinned_keys, int AS snapshot_pinned_keys_bytes, int AS point_key_delete_count, int AS point_key_set_count, int AS range_delete_count, int AS range_key_set_count, int AS range_key_delete_count}`,
24432445
}
24442446

24452447
var builtinOidsBySignature map[string]oid.Oid

pkg/sql/sem/builtins/generator_builtins.go

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -684,6 +684,36 @@ The last argument is a JSONB object containing the following optional fields:
684684
volatility.Stable,
685685
),
686686
),
687+
"crdb_internal.scan_storage_internal_keys": makeBuiltin(
688+
tree.FunctionProperties{
689+
Category: builtinconstants.CategorySystemInfo,
690+
},
691+
makeGeneratorOverload(
692+
tree.ParamTypes{
693+
{Name: "node_id", Typ: types.Int},
694+
{Name: "store_id", Typ: types.Int},
695+
{Name: "start_key", Typ: types.Bytes},
696+
{Name: "end_key", Typ: types.Bytes},
697+
},
698+
storageInternalKeysGeneratorType,
699+
makeStorageInternalKeysGenerator,
700+
"Scans a store's storage engine, computing statistics describing the internal keys within the span [start_key, end_key). This function is rate limited to 10 megabytes per second.",
701+
volatility.Volatile,
702+
),
703+
makeGeneratorOverload(
704+
tree.ParamTypes{
705+
{Name: "node_id", Typ: types.Int},
706+
{Name: "store_id", Typ: types.Int},
707+
{Name: "start_key", Typ: types.Bytes},
708+
{Name: "end_key", Typ: types.Bytes},
709+
{Name: "mb_per_second", Typ: types.Int4},
710+
},
711+
storageInternalKeysGeneratorType,
712+
makeStorageInternalKeysGenerator,
713+
"Scans a store's storage engine, computing statistics describing the internal keys within the span [start_key, end_key).",
714+
volatility.Volatile,
715+
),
716+
),
687717
}
688718

689719
var decodePlanGistGeneratorType = types.String
@@ -3272,6 +3302,116 @@ func makeTableMetricsGenerator(
32723302
return newTableMetricsIterator(evalCtx, nodeID, storeID, start, end), nil
32733303
}
32743304

3305+
type storageInternalKeysIterator struct {
3306+
metrics []enginepb.StorageInternalKeysMetrics
3307+
evalCtx *eval.Context
3308+
3309+
iterIdx int
3310+
nodeID int32
3311+
storeID int32
3312+
megabytesPerSecond int64
3313+
start []byte
3314+
end []byte
3315+
}
3316+
3317+
var storageInternalKeysGeneratorType = types.MakeLabeledTuple(
3318+
[]*types.T{types.Int, types.Int, types.Int, types.Int, types.Int, types.Int, types.Int, types.Int,
3319+
types.Int, types.Int},
3320+
[]string{
3321+
"level",
3322+
"node_id",
3323+
"store_id",
3324+
"snapshot_pinned_keys",
3325+
"snapshot_pinned_keys_bytes",
3326+
"point_key_delete_count",
3327+
"point_key_set_count",
3328+
"range_delete_count",
3329+
"range_key_set_count",
3330+
"range_key_delete_count",
3331+
},
3332+
)
3333+
3334+
var _ eval.ValueGenerator = (*storageInternalKeysIterator)(nil)
3335+
3336+
func newStorageInternalKeysGenerator(
3337+
evalCtx *eval.Context, nodeID, storeID int32, start, end []byte, megaBytesPerSecond int64,
3338+
) *storageInternalKeysIterator {
3339+
return &storageInternalKeysIterator{evalCtx: evalCtx, nodeID: nodeID, storeID: storeID, start: start, end: end, megabytesPerSecond: megaBytesPerSecond}
3340+
}
3341+
3342+
// Start implements the tree.ValueGenerator interface.
3343+
func (s *storageInternalKeysIterator) Start(ctx context.Context, _ *kv.Txn) error {
3344+
var err error
3345+
s.metrics, err = s.evalCtx.ScanStorageInternalKeys(ctx, s.nodeID, s.storeID, s.start, s.end, s.megabytesPerSecond)
3346+
if err != nil {
3347+
err = errors.Wrapf(err, "getting table metrics for node %d store %d", s.nodeID, s.storeID)
3348+
}
3349+
3350+
return err
3351+
}
3352+
3353+
// Next implements the tree.ValueGenerator interface.
3354+
func (s *storageInternalKeysIterator) Next(_ context.Context) (bool, error) {
3355+
s.iterIdx++
3356+
return s.iterIdx <= len(s.metrics), nil
3357+
}
3358+
3359+
// Values implements the tree.ValueGenerator interface.
3360+
func (s *storageInternalKeysIterator) Values() (tree.Datums, error) {
3361+
metricsInfo := s.metrics[s.iterIdx-1]
3362+
levelDatum := tree.DNull
3363+
3364+
if metricsInfo.Level != -1 {
3365+
levelDatum = tree.NewDInt(tree.DInt(metricsInfo.Level))
3366+
}
3367+
3368+
return tree.Datums{
3369+
levelDatum,
3370+
tree.NewDInt(tree.DInt(s.nodeID)),
3371+
tree.NewDInt(tree.DInt(s.storeID)),
3372+
tree.NewDInt(tree.DInt(metricsInfo.SnapshotPinnedKeys)),
3373+
tree.NewDInt(tree.DInt(metricsInfo.SnapshotPinnedKeysBytes)),
3374+
tree.NewDInt(tree.DInt(metricsInfo.PointKeyDeleteCount)),
3375+
tree.NewDInt(tree.DInt(metricsInfo.PointKeySetCount)),
3376+
tree.NewDInt(tree.DInt(metricsInfo.RangeDeleteCount)),
3377+
tree.NewDInt(tree.DInt(metricsInfo.RangeKeySetCount)),
3378+
tree.NewDInt(tree.DInt(metricsInfo.RangeKeyDeleteCount)),
3379+
}, nil
3380+
}
3381+
3382+
// Close implements the tree.ValueGenerator interface.
3383+
func (tmi *storageInternalKeysIterator) Close(_ context.Context) {}
3384+
3385+
// ResolvedType implements the tree.ValueGenerator interface.
3386+
func (tmi *storageInternalKeysIterator) ResolvedType() *types.T {
3387+
return storageInternalKeysGeneratorType
3388+
}
3389+
3390+
func makeStorageInternalKeysGenerator(
3391+
ctx context.Context, evalCtx *eval.Context, args tree.Datums,
3392+
) (eval.ValueGenerator, error) {
3393+
isAdmin, err := evalCtx.SessionAccessor.HasAdminRole(ctx)
3394+
if err != nil {
3395+
return nil, err
3396+
}
3397+
if !isAdmin {
3398+
return nil, errInsufficientPriv
3399+
}
3400+
nodeID := int32(tree.MustBeDInt(args[0]))
3401+
storeID := int32(tree.MustBeDInt(args[1]))
3402+
start := []byte(tree.MustBeDBytes(args[2]))
3403+
end := []byte(tree.MustBeDBytes(args[3]))
3404+
3405+
var megabytesPerSecond int64
3406+
if len(args) > 4 {
3407+
megabytesPerSecond = int64(tree.MustBeDInt(args[4]))
3408+
} else {
3409+
megabytesPerSecond = int64(10)
3410+
}
3411+
3412+
return newStorageInternalKeysGenerator(evalCtx, nodeID, storeID, start, end, megabytesPerSecond), nil
3413+
}
3414+
32753415
var tableSpanStatsGeneratorType = types.MakeLabeledTuple(
32763416
[]*types.T{types.Int, types.Int, types.Int, types.Int, types.Int, types.Int, types.Float},
32773417
[]string{"database_id", "table_id", "range_count", "approximate_disk_bytes", "live_bytes", "total_bytes", "live_percentage"},

pkg/sql/sem/eval/context.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,9 @@ type Context struct {
227227
// GetTableMetrics is used in crdb_internal.sstable_metrics.
228228
GetTableMetrics GetTableMetricsFunc
229229

230+
// ScanStorageInternalKeys is used in crdb_internal.scan_storage_internal_keys.
231+
ScanStorageInternalKeys ScanStorageInternalKeysFunc
232+
230233
// SetCompactionConcurrency is used to change the compaction concurrency of
231234
// a store.
232235
SetCompactionConcurrency SetCompactionConcurrencyFunc

0 commit comments

Comments
 (0)