
Commit fafb5e5

craig[bot] and tbg committed
Merge #146526
146526: kvserver: improve tenant_id CPU observability r=tbg a=tbg

Motivated by https://github.com/cockroachlabs/support/issues/3297.

This commit

- adds a new `replicas.cpunanospersecond` metric, which aggregates the Replica ReqCPUNanosPerSecond at the tenant level.
- adds a tenant_id tag to CPU profiles.

This should simplify investigations related to tenant-induced overload: the new metric should often help pinpoint the set of hot tenants, and CPU profiles can help dig into the specific code paths those tenants are exercising. This can then be rounded out with the existing metrics for request counts by tenant (both received by KV and sent by the tenant Pod) for a comprehensive picture.

Release note: the `replicas.cpunanospersecond` metric was added. Notably, when child labels are enabled, it exposes evaluation-related Replica CPU usage by tenant.

Epic: none

Co-authored-by: Tobias Grieger <[email protected]>
2 parents add581c + 58c9012 commit fafb5e5
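Background for readers: the "child labels" mentioned in the release note come from CockroachDB's aggmetric package, in which a parent metric reports the aggregate value and optional per-tenant children are keyed by a tenant_id label. Below is a minimal sketch of that pattern, using only calls that appear in this diff (MakeBuilder, CounterFloat64, AddChild, Inc, Unlink); the tenant ID and increment values are invented:

    package main

    import (
    	"github.com/cockroachdb/cockroach/pkg/multitenant"
    	"github.com/cockroachdb/cockroach/pkg/util/metric"
    	"github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric"
    )

    func main() {
    	// Parent counter labeled by tenant_id; the parent reports the sum
    	// across all children, and each child is exported individually when
    	// child metrics are enabled.
    	b := aggmetric.MakeBuilder(multitenant.TenantIDLabel)
    	reqCPUNanos := b.CounterFloat64(metric.Metadata{
    		Name:        "replicas.cpunanospersecond",
    		Help:        "Nanoseconds of CPU time in Replica request processing",
    		Measurement: "Nanoseconds",
    		Unit:        metric.Unit_NANOSECONDS,
    	})

    	child := reqCPUNanos.AddChild("5") // child for tenant_id="5"
    	child.Inc(2.5e6)                   // attribute 2.5ms of CPU to tenant 5
    	child.Unlink()                     // detach when the tenant's metrics are released
    }

Unlink detaches the child so the tenant's label value stops being exported once its last reference goes away, which is exactly what releaseTenant does in the metrics.go diff below.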

File tree

8 files changed: +110 -89 lines changed

docs/generated/metrics/metrics.yaml

Lines changed: 8 additions & 0 deletions
@@ -15153,6 +15153,14 @@ layers:
     unit: COUNT
     aggregation: AVG
     derivative: NONE
+  - name: replicas.cpunanospersecond
+    exported_name: replicas_cpunanospersecond
+    description: Nanoseconds of CPU time in Replica request processing including evaluation but not replication
+    y_axis_label: Nanoseconds
+    type: COUNTER
+    unit: NANOSECONDS
+    aggregation: AVG
+    derivative: NON_NEGATIVE_DERIVATIVE
   - name: replicas.leaders
     exported_name: replicas_leaders
     description: Number of raft leaders
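Since the metric is a cumulative nanoseconds counter and the dashboard applies a NON_NEGATIVE_DERIVATIVE, its per-second rate divided by 1e9 approximates the number of CPU cores a tenant is consuming. A small self-contained sketch of that arithmetic (sample values are invented):

    package main

    import (
    	"fmt"
    	"time"
    )

    // cpuCores estimates average CPU cores consumed between two samples of a
    // cumulative nanoseconds counter such as replicas_cpunanospersecond.
    // Nanoseconds of CPU per nanosecond of wall time is a core count; the
    // clamp mirrors NON_NEGATIVE_DERIVATIVE behavior on counter resets.
    func cpuCores(prevNanos, curNanos float64, elapsed time.Duration) float64 {
    	delta := curNanos - prevNanos
    	if delta < 0 {
    		return 0
    	}
    	return delta / float64(elapsed.Nanoseconds())
    }

    func main() {
    	// Invented samples taken 10s apart: the tenant burned 25s of CPU in
    	// that window, i.e. 2.5 cores on average.
    	fmt.Printf("%.1f cores\n", cpuCores(1e9, 26e9, 10*time.Second))
    }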

pkg/kv/kvserver/metrics.go

Lines changed: 58 additions & 65 deletions
@@ -249,6 +249,13 @@ var (
 		Unit:        metric.Unit_COUNT,
 	}
 
+	metaReqCPUNanos = metric.Metadata{
+		Name:        "replicas.cpunanospersecond",
+		Help:        "Nanoseconds of CPU time in Replica request processing including evaluation but not replication",
+		Measurement: "Nanoseconds",
+		Unit:        metric.Unit_NANOSECONDS,
+	}
+
 	// Storage metrics.
 	metaLiveBytes = metric.Metadata{
 		Name: "livebytes",

@@ -3207,29 +3214,6 @@ type StoreMetrics struct {
 	DiskWriteMaxBytesPerSecond *metric.Gauge
 }
 
-type tenantMetricsRef struct {
-	// All fields are internal. Don't access them.
-
-	_tenantID roachpb.TenantID
-	_state    int32 // atomic; 0=usable 1=poisoned
-
-	// _stack helps diagnose use-after-release when it occurs.
-	// This field is populated in releaseTenant and printed
-	// in assertions on failure.
-	_stack struct {
-		syncutil.Mutex
-		debugutil.SafeStack
-	}
-}
-
-func (ref *tenantMetricsRef) assert(ctx context.Context) {
-	if atomic.LoadInt32(&ref._state) != 0 {
-		ref._stack.Lock()
-		defer ref._stack.Unlock()
-		log.FatalfDepth(ctx, 1, "tenantMetricsRef already finalized in:\n%s", ref._stack.SafeStack)
-	}
-}
-
 // TenantsStorageMetrics are metrics which are aggregated over all tenants
 // present on the server. The struct maintains child metrics used by each
 // tenant to track their individual values. The struct expects that children

@@ -3238,6 +3222,7 @@ func (ref *tenantMetricsRef) assert(ctx context.Context) {
 type TenantsStorageMetrics struct {
 	// NB: If adding more metrics to this struct, be sure to
 	// also update tenantsStorageMetricsSet().
+	ReqCPUNanos *aggmetric.AggCounterFloat64
 	LiveBytes   *aggmetric.AggGauge
 	KeyBytes    *aggmetric.AggGauge
 	ValBytes    *aggmetric.AggGauge

@@ -3274,6 +3259,7 @@ type TenantsStorageMetrics struct {
 // see kvbase.TenantsStorageMetricsSet for public access. Assigned in init().
 func tenantsStorageMetricsSet() map[string]struct{} {
 	return map[string]struct{}{
+		metaReqCPUNanos.Name: {},
 		metaLiveBytes.Name:   {},
 		metaKeyBytes.Name:    {},
 		metaValBytes.Name:    {},

@@ -3304,7 +3290,7 @@ func (sm *TenantsStorageMetrics) MetricStruct() {}
 // method are reference counted with decrements occurring in the corresponding
 // releaseTenant call. This method must be called prior to adding or subtracting
 // MVCC stats.
-func (sm *TenantsStorageMetrics) acquireTenant(tenantID roachpb.TenantID) *tenantMetricsRef {
+func (sm *TenantsStorageMetrics) acquireTenant(tenantID roachpb.TenantID) *tenantStorageMetrics {
 	// incRef increments the reference count if it is not already zero indicating
 	// that the struct has already been destroyed.
 	incRef := func(m *tenantStorageMetrics) (alreadyDestroyed bool) {

@@ -3319,15 +3305,13 @@ func (sm *TenantsStorageMetrics) acquireTenant(tenantID roachpb.TenantID) *tenan
 	for {
 		if m, ok := sm.tenants.Load(tenantID); ok {
 			if alreadyDestroyed := incRef(m); !alreadyDestroyed {
-				return &tenantMetricsRef{
-					_tenantID: tenantID,
-				}
+				return m
 			}
 			// Somebody else concurrently took the reference count to zero, go back
 			// around. Because of the locking in releaseTenant, we know that we'll
 			// find a different value or no value at all on the next iteration.
 		} else {
-			m := &tenantStorageMetrics{}
+			m := &tenantStorageMetrics{tenantID: tenantID}
 			m.mu.Lock()
 			_, loaded := sm.tenants.LoadOrStore(tenantID, m)
 			if loaded {

@@ -3338,6 +3322,7 @@ func (sm *TenantsStorageMetrics) acquireTenant(tenantID roachpb.TenantID) *tenan
 			// Successfully stored a new instance, initialize it and then unlock it.
 			tenantIDStr := tenantID.String()
 			m.mu.refCount++
+			m.ReqCPUNanos = sm.ReqCPUNanos.AddChild(tenantIDStr)
 			m.LiveBytes = sm.LiveBytes.AddChild(tenantIDStr)
 			m.KeyBytes = sm.KeyBytes.AddChild(tenantIDStr)
 			m.ValBytes = sm.ValBytes.AddChild(tenantIDStr)

@@ -3359,37 +3344,35 @@ func (sm *TenantsStorageMetrics) acquireTenant(tenantID roachpb.TenantID) *tenan
 			m.SysCount = sm.SysCount.AddChild(tenantIDStr)
 			m.AbortSpanBytes = sm.AbortSpanBytes.AddChild(tenantIDStr)
 			m.mu.Unlock()
-			return &tenantMetricsRef{
-				_tenantID: tenantID,
-			}
+			return m
 		}
 	}
 }
 
 // releaseTenant releases the reference to the metrics for this tenant which was
 // acquired with acquireTenant. It will fatally log if no entry exists for this
 // tenant.
-func (sm *TenantsStorageMetrics) releaseTenant(ctx context.Context, ref *tenantMetricsRef) {
-	m := sm.getTenant(ctx, ref) // NB: asserts against use-after-release
-	if atomic.SwapInt32(&ref._state, 1) != 0 {
-		ref.assert(ctx) // this will fatal
-		return          // unreachable
-	}
-	ref._stack.Lock()
-	ref._stack.SafeStack = debugutil.Stack()
-	ref._stack.Unlock()
+func (sm *TenantsStorageMetrics) releaseTenant(ctx context.Context, m *tenantStorageMetrics) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
+	if m.mu.released.Load() {
+		log.FatalfDepth(ctx, 1, "tenant metrics already released in:\n%s", m.mu.stack)
+	}
 	m.mu.refCount--
-	if m.mu.refCount < 0 {
-		log.Fatalf(ctx, "invalid refCount on metrics for tenant %v: %d", ref._tenantID, m.mu.refCount)
-	} else if m.mu.refCount > 0 {
+	if n := m.mu.refCount; n < 0 {
+		log.Fatalf(ctx, "invalid refCount on metrics for tenant %v: %d", m.tenantID, n)
+	} else if n > 0 {
 		return
 	}
 
+	m.mu.released.Store(true)
+	m.mu.stack = debugutil.Stack()
+
 	// The refCount is zero, delete this instance after destroying its metrics.
 	// Note that concurrent attempts to create an instance will detect the zero
 	// refCount value and construct a new instance.
+	m.ReqCPUNanos.Unlink() // counter
+	m.ReqCPUNanos = nil
 	for _, gptr := range []**aggmetric.Gauge{
 		&m.LiveBytes,
 		&m.KeyBytes,

@@ -3417,28 +3400,30 @@ func (sm *TenantsStorageMetrics) releaseTenant(ctx context.Context, ref *tenantM
 		(*gptr).Unlink()
 		*gptr = nil
 	}
-	sm.tenants.Delete(ref._tenantID)
-}
-
-// getTenant is a helper method used to retrieve the metrics for a tenant. The
-// call will log fatally if no such tenant has been previously acquired.
-func (sm *TenantsStorageMetrics) getTenant(
-	ctx context.Context, ref *tenantMetricsRef,
-) *tenantStorageMetrics {
-	ref.assert(ctx)
-	m, ok := sm.tenants.Load(ref._tenantID)
-	if !ok {
-		log.Fatalf(ctx, "no metrics exist for tenant %v", ref._tenantID)
-	}
-	return m
+	sm.tenants.Delete(m.tenantID)
 }
 
+// tenantStorageMetrics is a struct that holds the metrics for all replicas
+// (within a Store) of a given tenant.
+//
+// Whenever it is guaranteed that the replica is not destroyed, the metrics
+// fields can be accessed directly (for example, when holding raftMu and having
+// previously checked the replica's destroyStatus).
+//
+// Whenever this is *not* guaranteed, use `TenantsStorageMetrics.acquireTenant`
+// (followed by releaseTenant after completion of usage) instead to avoid
+// racing with a potential concurrent attempt to release the metrics.
 type tenantStorageMetrics struct {
-	mu struct {
+	tenantID roachpb.TenantID
+	mu       struct {
 		syncutil.Mutex
 		refCount int
+		released atomic.Bool // allowed to read without holding mu
+		stack    debugutil.SafeStack
 	}
 
+	ReqCPUNanos *aggmetric.CounterFloat64
+
 	LiveBytes *aggmetric.Gauge
 	KeyBytes  *aggmetric.Gauge
 	ValBytes  *aggmetric.Gauge

@@ -3461,9 +3446,18 @@ type tenantStorageMetrics struct {
 	AbortSpanBytes *aggmetric.Gauge
 }
 
+func (tm *tenantStorageMetrics) assert(ctx context.Context) {
+	if tm.mu.released.Load() {
+		tm.mu.Lock()
+		defer tm.mu.Unlock()
+		log.Fatalf(ctx, "tenant metrics already released in:\n%s", tm.mu.stack)
+	}
+}
+
 func newTenantsStorageMetrics() *TenantsStorageMetrics {
 	b := aggmetric.MakeBuilder(multitenant.TenantIDLabel)
 	sm := &TenantsStorageMetrics{
+		ReqCPUNanos: b.CounterFloat64(metaReqCPUNanos),
 		LiveBytes:   b.Gauge(metaLiveBytes),
 		KeyBytes:    b.Gauge(metaKeyBytes),
 		ValBytes:    b.Gauge(metaValBytes),

@@ -4020,10 +4014,9 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
 // single snapshot of these gauges in the registry might mix the values of two
 // subsequent updates.
 func (sm *TenantsStorageMetrics) incMVCCGauges(
-	ctx context.Context, ref *tenantMetricsRef, delta enginepb.MVCCStats,
+	ctx context.Context, tm *tenantStorageMetrics, delta enginepb.MVCCStats,
 ) {
-	ref.assert(ctx)
-	tm := sm.getTenant(ctx, ref)
+	tm.assert(ctx)
 	tm.LiveBytes.Inc(delta.LiveBytes)
 	tm.KeyBytes.Inc(delta.KeyBytes)
 	tm.ValBytes.Inc(delta.ValBytes)

@@ -4047,17 +4040,17 @@ func (sm *TenantsStorageMetrics) incMVCCGauges(
 }
 
 func (sm *TenantsStorageMetrics) addMVCCStats(
-	ctx context.Context, ref *tenantMetricsRef, delta enginepb.MVCCStats,
+	ctx context.Context, tm *tenantStorageMetrics, delta enginepb.MVCCStats,
 ) {
-	sm.incMVCCGauges(ctx, ref, delta)
+	sm.incMVCCGauges(ctx, tm, delta)
 }
 
 func (sm *TenantsStorageMetrics) subtractMVCCStats(
-	ctx context.Context, ref *tenantMetricsRef, delta enginepb.MVCCStats,
+	ctx context.Context, tm *tenantStorageMetrics, delta enginepb.MVCCStats,
 ) {
 	var neg enginepb.MVCCStats
 	neg.Subtract(delta)
-	sm.incMVCCGauges(ctx, ref, neg)
+	sm.incMVCCGauges(ctx, tm, neg)
 }
 
 func (sm *StoreMetrics) updateEngineMetrics(m storage.Metrics) {
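The acquire/release protocol above is easiest to see in miniature. The following toy model is a sketch only: it uses a plain map behind a single registry lock instead of the lock-free sync.Map retry loop and per-slot mutex the real code uses, but it shows why the last release is the point where the per-tenant children get unlinked and the map entry deleted, and how a release-after-release is caught:

    package main

    import (
    	"fmt"
    	"sync"
    )

    // tenantMetrics is a toy stand-in for tenantStorageMetrics: a refcounted
    // per-tenant slot that is destroyed when the last holder releases it.
    type tenantMetrics struct {
    	tenantID    string
    	refCount    int
    	released    bool
    	reqCPUNanos float64 // stand-in for the per-tenant child counter
    }

    // registry is a toy stand-in for TenantsStorageMetrics.
    type registry struct {
    	mu      sync.Mutex
    	tenants map[string]*tenantMetrics
    }

    // acquire returns a live slot for tenantID, creating it on first use.
    func (r *registry) acquire(tenantID string) *tenantMetrics {
    	r.mu.Lock()
    	defer r.mu.Unlock()
    	m, ok := r.tenants[tenantID]
    	if !ok {
    		m = &tenantMetrics{tenantID: tenantID}
    		r.tenants[tenantID] = m
    	}
    	m.refCount++
    	return m
    }

    // release drops a reference. The last release marks the slot released and
    // removes it; this is where the real code unlinks the per-tenant child
    // metrics so the tenant's label value stops being exported.
    func (r *registry) release(m *tenantMetrics) {
    	r.mu.Lock()
    	defer r.mu.Unlock()
    	if m.released {
    		panic("use after release") // real code logs fatally with a stack trace
    	}
    	m.refCount--
    	if m.refCount > 0 {
    		return
    	}
    	m.released = true
    	delete(r.tenants, m.tenantID)
    }

    func main() {
    	r := &registry{tenants: map[string]*tenantMetrics{}}
    	a := r.acquire("5")
    	b := r.acquire("5") // same slot; refCount is now 2
    	a.reqCPUNanos += 2.5e6
    	r.release(b)                // slot survives: a still holds a reference
    	r.release(a)                // last release: slot removed
    	fmt.Println(len(r.tenants)) // 0
    }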

pkg/kv/kvserver/metrics_test.go

Lines changed: 3 additions & 4 deletions
@@ -27,15 +27,14 @@ import (
 func TestTenantsStorageMetricsRelease(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	m := newTenantsStorageMetrics()
-	var refs []*tenantMetricsRef
+	var refs []*tenantStorageMetrics
 	const tenants = 7
 	for i := 0; i < tenants; i++ {
 		id := roachpb.MustMakeTenantID(roachpb.MinTenantID.InternalValue + uint64(i))
-		ref := m.acquireTenant(id)
-		tm := m.getTenant(context.Background(), ref)
+		tm := m.acquireTenant(id)
 		tm.SysBytes.Update(1023)
 		tm.KeyCount.Inc(123)
-		refs = append(refs, ref)
+		refs = append(refs, tm)
 	}
 	for i, ref := range refs {
 		require.Equal(t, int64(1023*(tenants-i)), m.SysBytes.Value(), i)

pkg/kv/kvserver/mvcc_gc_queue.go

Lines changed: 1 addition & 1 deletion
@@ -664,7 +664,7 @@ func (mgcq *mvccGCQueue) process(
 ) (processed bool, err error) {
 	// Record the CPU time processing the request for this replica. This is
 	// recorded regardless of errors that are encountered.
-	defer repl.MeasureReqCPUNanos(grunning.Time())
+	defer repl.MeasureReqCPUNanos(ctx, grunning.Time())
 
 	// Lookup the descriptor and GC policy for the zone containing this key range.
 	desc, conf := repl.DescAndSpanConfig()

pkg/kv/kvserver/replica.go

Lines changed: 20 additions & 8 deletions
@@ -548,13 +548,11 @@ type Replica struct {
 	// [^1]: TODO(pavelkalinnikov): we can but it'd be a larger refactor.
 	tenantLimiter tenantrate.Limiter
 
-	// tenantMetricsRef is a metrics reference indicating the tenant under
-	// which to track the range's contributions. This is determined by the
-	// start key of the Replica, once initialized.
-	// Its purpose is to help track down missing/extraneous release operations
-	// that would not be apparent or easy to resolve when refcounting at the store
-	// level only.
-	tenantMetricsRef *tenantMetricsRef
+	// tenantMetricsRef is a struct for per-tenant metrics contributed by this
+	// replica. It is determined by the start key of the Replica, once
+	// initialized. See tenantStorageMetrics for precautions that must be taken
+	// when accessing this.
+	tenantMetricsRef *tenantStorageMetrics
 
 	// sideTransportClosedTimestamp encapsulates state related to the closed
 	// timestamp's information about the range. Note that the

@@ -2770,9 +2768,23 @@ func init() {
 
 // MeasureReqCPUNanos measures the cpu time spent on this replica processing
 // requests.
-func (r *Replica) MeasureReqCPUNanos(start time.Duration) {
+func (r *Replica) MeasureReqCPUNanos(ctx context.Context, start time.Duration) {
 	r.measureNanosRunning(start, func(dur float64) {
 		r.loadStats.RecordReqCPUNanos(dur)
+		// NB: the caller also has a tenant ID, but we use the replica's here for
+		// simplicity. There is no established pattern for short-lived references
+		// to a specific tenant's metrics.
+		if r.tenantMetricsRef != nil {
+			// We can *not* use the tenant metrics directly because nothing in this
+			// current code path prevents the surrounding replica from getting
+			// destroyed, which could zero the refcount and release the metrics
+			// object. Instead, we go through acquireTenant, which gives us an object
+			// that is and remains valid. This is not an expensive operation in
+			// the common case (the replica still exists).
+			tm := r.store.metrics.acquireTenant(r.tenantMetricsRef.tenantID)
+			tm.ReqCPUNanos.Inc(dur)
+			r.store.metrics.releaseTenant(ctx, tm)
+		}
 	})
 }
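The measurement pattern around this method is worth spelling out: capture the goroutine's running time up front, then report the delta when the request finishes. A sketch under the sole assumption (visible in this diff) that grunning.Time() returns the calling goroutine's cumulative running time as a time.Duration; measureReqCPU and its record callback are hypothetical names:

    package main

    import (
    	"fmt"
    	"time"

    	"github.com/cockroachdb/cockroach/pkg/util/grunning"
    )

    // measureReqCPU captures the goroutine's running time at call time and
    // returns a closure that, when deferred, reports the delta. This mirrors
    // the startCPU := grunning.Time(); defer MeasureReqCPUNanos(ctx, startCPU)
    // pattern in the diff.
    func measureReqCPU(record func(nanos float64)) func() {
    	start := grunning.Time()
    	return func() {
    		record(float64(grunning.Time() - start))
    	}
    }

    func main() {
    	done := measureReqCPU(func(n float64) {
    		fmt.Printf("request burned %.0fns of CPU\n", n)
    	})
    	// ... request evaluation would run here ...
    	time.Sleep(10 * time.Millisecond) // sleeping accrues ~no running time
    	done()
    }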

pkg/kv/kvserver/replica_rate_limit.go

Lines changed: 4 additions & 3 deletions
@@ -18,12 +18,13 @@ import (
 // maybeRateLimitBatch may block the batch waiting to be rate-limited. Note that
 // the replica must be initialized and thus there is no synchronization issue
 // on the tenantRateLimiter.
-func (r *Replica) maybeRateLimitBatch(ctx context.Context, ba *kvpb.BatchRequest) error {
+func (r *Replica) maybeRateLimitBatch(
+	ctx context.Context, ba *kvpb.BatchRequest, tenantIDOrZero roachpb.TenantID,
+) error {
 	if r.tenantLimiter == nil {
 		return nil
 	}
-	tenantID, ok := roachpb.ClientTenantFromContext(ctx)
-	if !ok || tenantID == roachpb.SystemTenantID {
+	if !tenantIDOrZero.IsSet() || tenantIDOrZero.IsSystem() {
 		return nil
 	}
 
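The signature change hoists the tenant lookup to the caller, so SendWithWriteBytes extracts the client tenant from the context once and shares it between rate limiting and profile labeling. A small sketch of the gating logic, using only the TenantID accessors that appear in this diff (IsSet, IsSystem, MustMakeTenantID); shouldRateLimit is a hypothetical helper:

    package main

    import (
    	"fmt"

    	"github.com/cockroachdb/cockroach/pkg/roachpb"
    )

    // shouldRateLimit mirrors the gating in maybeRateLimitBatch: only requests
    // from a set, non-system tenant are subject to the tenant rate limiter.
    func shouldRateLimit(tenantIDOrZero roachpb.TenantID) bool {
    	return tenantIDOrZero.IsSet() && !tenantIDOrZero.IsSystem()
    }

    func main() {
    	fmt.Println(shouldRateLimit(roachpb.TenantID{}))          // false: no client tenant
    	fmt.Println(shouldRateLimit(roachpb.SystemTenantID))      // false: system tenant
    	fmt.Println(shouldRateLimit(roachpb.MustMakeTenantID(5))) // true
    }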

pkg/kv/kvserver/replica_send.go

Lines changed: 15 additions & 7 deletions
@@ -123,10 +123,23 @@ func (r *Replica) Send(
 func (r *Replica) SendWithWriteBytes(
 	ctx context.Context, ba *kvpb.BatchRequest,
 ) (*kvpb.BatchResponse, *kvadmission.StoreWriteBytes, *kvpb.Error) {
+	tenantIDOrZero, _ := roachpb.ClientTenantFromContext(ctx)
+
+	// Record the CPU time processing the request for this replica. This is
+	// recorded regardless of errors that are encountered.
+	startCPU := grunning.Time()
+	defer r.MeasureReqCPUNanos(ctx, startCPU)
+
 	if r.store.cfg.Settings.CPUProfileType() == cluster.CPUProfileWithLabels {
 		defer pprof.SetGoroutineLabels(ctx)
 		// Note: the defer statement captured the previous context.
-		ctx = pprof.WithLabels(ctx, pprof.Labels("range_str", r.rangeStr.ID()))
+		var lbls pprof.LabelSet
+		if tenantIDOrZero.IsSet() {
+			lbls = pprof.Labels("range_str", r.rangeStr.ID(), "tenant_id", tenantIDOrZero.String())
+		} else {
+			lbls = pprof.Labels("range_str", r.rangeStr.ID())
+		}
+		ctx = pprof.WithLabels(ctx, lbls)
 		pprof.SetGoroutineLabels(ctx)
 	}
 	if trace.IsEnabled() {

@@ -135,11 +148,6 @@ func (r *Replica) SendWithWriteBytes(
 	// Add the range log tag.
 	ctx = r.AnnotateCtx(ctx)
 
-	// Record the CPU time processing the request for this replica. This is
-	// recorded regardless of errors that are encountered.
-	startCPU := grunning.Time()
-	defer r.MeasureReqCPUNanos(startCPU)
-
 	isReadOnly := ba.IsReadOnly()
 	if err := r.checkBatchRequest(ba, isReadOnly); err != nil {
 		return nil, nil, kvpb.NewError(err)

@@ -148,7 +156,7 @@ func (r *Replica) SendWithWriteBytes(
 	if err := r.maybeBackpressureBatch(ctx, ba); err != nil {
 		return nil, nil, kvpb.NewError(err)
 	}
-	if err := r.maybeRateLimitBatch(ctx, ba); err != nil {
+	if err := r.maybeRateLimitBatch(ctx, ba, tenantIDOrZero); err != nil {
 		return nil, nil, kvpb.NewError(err)
 	}
 	if err := r.maybeCommitWaitBeforeCommitTrigger(ctx, ba); err != nil {
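For readers unfamiliar with Go's profiler labels: CPU samples taken while a labeled function runs carry those labels as tags in the resulting profile, which is the mechanism this commit uses to attribute samples to a tenant. A minimal standalone demo using only the standard runtime/pprof API; the label values are made up:

    package main

    import (
    	"context"
    	"os"
    	"runtime/pprof"
    )

    func busy() {
    	x := 0
    	for i := 0; i < 1e8; i++ {
    		x += i
    	}
    	_ = x
    }

    func main() {
    	f, err := os.Create("cpu.pprof")
    	if err != nil {
    		panic(err)
    	}
    	defer f.Close()
    	if err := pprof.StartCPUProfile(f); err != nil {
    		panic(err)
    	}
    	defer pprof.StopCPUProfile()

    	lbls := pprof.Labels("range_str", "42", "tenant_id", "5")
    	pprof.Do(context.Background(), lbls, func(ctx context.Context) {
    		busy() // CPU samples here are tagged with the labels above
    	})
    }

Inspecting the output with `go tool pprof -tags cpu.pprof` should then list tenant_id among the tag values, so samples can be grouped or filtered by tenant.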
