Skip to content

Commit 20aa7db

Browse files
committed
rac2,kvserver: add SendQueueLogger to periodically log per-stream send queue bytes
Informs #157793 Epic: none Release note: None
1 parent a6a0769 commit 20aa7db

File tree

6 files changed

+274
-2
lines changed

6 files changed

+274
-2
lines changed

pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ go_library(
77
"metrics.go",
88
"priority.go",
99
"range_controller.go",
10+
"send_queue_logger.go",
1011
"store_stream.go",
1112
"token_counter.go",
1213
"token_tracker.go",
@@ -49,6 +50,7 @@ go_test(
4950
"log_tracker_test.go",
5051
"priority_test.go",
5152
"range_controller_test.go",
53+
"send_queue_logger_test.go",
5254
"simulation_test.go",
5355
"store_stream_test.go",
5456
"token_counter_test.go",
@@ -86,6 +88,7 @@ go_test(
8688
"//pkg/util/timeutil",
8789
"@com_github_cockroachdb_datadriven//:datadriven",
8890
"@com_github_cockroachdb_errors//:errors",
91+
"@com_github_cockroachdb_redact//:redact",
8992
"@com_github_dustin_go_humanize//:go-humanize",
9093
"@com_github_gogo_protobuf//jsonpb",
9194
"@com_github_guptarohit_asciigraph//:asciigraph",
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package rac2
7+
8+
import (
9+
"cmp"
10+
"context"
11+
"slices"
12+
"time"
13+
14+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
15+
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
16+
"github.com/cockroachdb/cockroach/pkg/util/log"
17+
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
18+
"github.com/cockroachdb/redact"
19+
)
20+
21+
// sendQueueLoggingInterval is the interval handed to log.Every when building
// the SendQueueLogger's limiter, which gates TryStartLog.
const sendQueueLoggingInterval = 30 * time.Second
22+
23+
// SendQueueLogger logs send queue sizes for streams.
24+
//
25+
// Usage:
26+
//
27+
// sql := NewSendQueueLogger(numStreamsToLog)
28+
// // Periodically do:
29+
// if coll, ok := sql.TryStartLog(); ok {
30+
// for each range:
31+
// coll.ObserveRangeStats(rangeStats)
32+
// sql.FinishLog(ctx, coll)
33+
// }
34+
//
35+
// It is thread-safe.
36+
type SendQueueLogger struct {
37+
// numStreamsToLog caps how many streams are listed individually in the
// logged summary; the rest are folded into a single aggregate.
numStreamsToLog int
38+
// limiter gates TryStartLog; it is built from sendQueueLoggingInterval.
limiter log.EveryN
39+
40+
mu struct {
41+
syncutil.Mutex
42+
// scratch holds the collector handed back by FinishLog so that a later
// TryStartLog can reuse its map and slice.
scratch SendQueueLoggerCollector
43+
}
44+
45+
// testingLog, when non-nil, receives the formatted summary in FinishLog
// instead of the summary being written to the log.
testingLog func(context.Context, *redact.StringBuilder)
46+
}
47+
48+
// streamAndBytes pairs a stream with its accumulated send queue bytes. It is
// the element type of the slice that finishLog sorts.
type streamAndBytes struct {
49+
stream kvflowcontrol.Stream
50+
// bytes is the total send queue bytes observed for stream.
bytes int64
51+
}
52+
53+
// SendQueueLoggerCollector accumulates per-stream send queue bytes for one
// logging pass. It is obtained from SendQueueLogger.TryStartLog, fed via
// ObserveRangeStats, and handed back to SendQueueLogger.FinishLog.
type SendQueueLoggerCollector struct {
54+
// m sums the observed send queue bytes per stream.
m map[kvflowcontrol.Stream]int64
55+
// sl is scratch space that finishLog fills from m and sorts.
sl []streamAndBytes
56+
}
57+
58+
// NewSendQueueLogger creates a new SendQueueLogger that will log up to
59+
// numStreamsToLog streams with the highest send queue bytes when logging is
60+
// triggered.
61+
func NewSendQueueLogger(numStreamsToLog int) *SendQueueLogger {
62+
return &SendQueueLogger{
63+
numStreamsToLog: numStreamsToLog,
64+
limiter: log.Every(sendQueueLoggingInterval),
65+
}
66+
}
67+
68+
func (s *SendQueueLogger) TryStartLog() (SendQueueLoggerCollector, bool) {
69+
if !s.limiter.ShouldLog() {
70+
return SendQueueLoggerCollector{}, false
71+
}
72+
s.mu.Lock()
73+
defer s.mu.Unlock()
74+
collector := s.mu.scratch
75+
s.mu.scratch = SendQueueLoggerCollector{}
76+
if collector.m == nil {
77+
collector.m = make(map[kvflowcontrol.Stream]int64)
78+
}
79+
return collector, true
80+
}
81+
82+
func (s *SendQueueLoggerCollector) ObserveRangeStats(stats *RangeSendStreamStats) {
83+
for _, ss := range stats.internal {
84+
if ss.SendQueueBytes > 0 {
85+
s.m[ss.Stream] = s.m[ss.Stream] + ss.SendQueueBytes
86+
}
87+
}
88+
}
89+
90+
// NB: this is defined here so we can access it from tests.
91+
const sendQueueLogFormat = "send queues: %s"
92+
93+
func (s *SendQueueLogger) FinishLog(ctx context.Context, c SendQueueLoggerCollector) {
94+
if len(c.m) == 0 {
95+
return
96+
}
97+
defer func() {
98+
clear(c.m)
99+
c.sl = c.sl[:0]
100+
s.mu.Lock()
101+
s.mu.scratch = c
102+
s.mu.Unlock()
103+
}()
104+
var buf redact.StringBuilder
105+
c.finishLog(s.numStreamsToLog, &buf)
106+
if fn := s.testingLog; fn != nil {
107+
fn(ctx, &buf)
108+
} else {
109+
log.KvDistribution.Infof(ctx, sendQueueLogFormat, &buf)
110+
}
111+
}
112+
113+
func (c *SendQueueLoggerCollector) finishLog(maxNumStreams int, buf *redact.StringBuilder) {
114+
for stream, bytes := range c.m {
115+
c.sl = append(c.sl, streamAndBytes{
116+
stream: stream,
117+
bytes: bytes,
118+
})
119+
}
120+
// Sort by descending send queue bytes.
121+
slices.SortFunc(c.sl, func(a, b streamAndBytes) int {
122+
return -cmp.Compare(a.bytes, b.bytes)
123+
})
124+
n := min(maxNumStreams, len(c.sl))
125+
for i := 0; i < n; i++ {
126+
if i > 0 {
127+
buf.SafeString(", ")
128+
}
129+
buf.Printf("%s: %s", c.sl[i].stream,
130+
humanizeutil.IBytes(c.sl[i].bytes))
131+
}
132+
remainingBytes := int64(0)
133+
for i := n; i < len(c.sl); i++ {
134+
remainingBytes += c.sl[i].bytes
135+
}
136+
if remainingBytes > 0 {
137+
buf.Printf(", + %s more bytes across %d streams",
138+
humanizeutil.IBytes(remainingBytes), len(c.sl)-n)
139+
}
140+
}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package rac2
7+
8+
import (
9+
"context"
10+
"testing"
11+
12+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
13+
"github.com/cockroachdb/cockroach/pkg/roachpb"
14+
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
15+
"github.com/cockroachdb/cockroach/pkg/testutils/echotest"
16+
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
17+
"github.com/cockroachdb/cockroach/pkg/util/log"
18+
"github.com/cockroachdb/redact"
19+
"github.com/stretchr/testify/require"
20+
)
21+
22+
func TestSendQueueLogger(t *testing.T) {
23+
defer leaktest.AfterTest(t)()
24+
s := log.ScopeWithoutShowLogs(t)
25+
defer s.Close(t)
26+
27+
rss11_100 := ReplicaSendStreamStats{
28+
Stream: kvflowcontrol.Stream{
29+
TenantID: roachpb.MustMakeTenantID(1),
30+
StoreID: 1,
31+
},
32+
ReplicaSendQueueStats: ReplicaSendQueueStats{
33+
SendQueueBytes: 100,
34+
},
35+
}
36+
rss22_200 := ReplicaSendStreamStats{
37+
Stream: kvflowcontrol.Stream{
38+
TenantID: roachpb.MustMakeTenantID(2),
39+
StoreID: 2,
40+
},
41+
ReplicaSendQueueStats: ReplicaSendQueueStats{
42+
SendQueueBytes: 200,
43+
},
44+
}
45+
rss33_50 := ReplicaSendStreamStats{
46+
Stream: kvflowcontrol.Stream{
47+
TenantID: roachpb.MustMakeTenantID(3),
48+
StoreID: 3,
49+
},
50+
ReplicaSendQueueStats: ReplicaSendQueueStats{
51+
SendQueueBytes: 50,
52+
},
53+
}
54+
rss44_25 := ReplicaSendStreamStats{
55+
Stream: kvflowcontrol.Stream{
56+
TenantID: roachpb.MustMakeTenantID(4),
57+
StoreID: 4,
58+
},
59+
ReplicaSendQueueStats: ReplicaSendQueueStats{
60+
SendQueueBytes: 25,
61+
},
62+
}
63+
sql := NewSendQueueLogger(2)
64+
var output redact.RedactableString
65+
sql.testingLog = func(ctx context.Context, buf *redact.StringBuilder) {
66+
if len(output) > 0 {
67+
output += "\n"
68+
}
69+
output += redact.Sprintf(sendQueueLogFormat, buf)
70+
}
71+
ctx := context.Background()
72+
coll, ok := sql.TryStartLog()
73+
74+
require.True(t, ok)
75+
// Empty stats are ok.
76+
stats := RangeSendStreamStats{}
77+
coll.ObserveRangeStats(&stats)
78+
stats = RangeSendStreamStats{
79+
internal: []ReplicaSendStreamStats{rss22_200, rss33_50},
80+
}
81+
coll.ObserveRangeStats(&stats)
82+
stats = RangeSendStreamStats{
83+
internal: []ReplicaSendStreamStats{rss11_100, rss22_200},
84+
}
85+
coll.ObserveRangeStats(&stats)
86+
stats = RangeSendStreamStats{
87+
internal: []ReplicaSendStreamStats{rss11_100},
88+
}
89+
coll.ObserveRangeStats(&stats)
90+
stats = RangeSendStreamStats{
91+
internal: []ReplicaSendStreamStats{rss33_50, rss44_25},
92+
}
93+
coll.ObserveRangeStats(&stats)
94+
sql.FinishLog(ctx, coll)
95+
96+
// Cannot log again immediately.
97+
_, ok = sql.TryStartLog()
98+
require.False(t, ok)
99+
// Call FinishLog again to ensure no logging happens.
100+
sql.FinishLog(ctx, SendQueueLoggerCollector{})
101+
102+
echotest.Require(t, string(output), datapathutils.TestDataPath(t, t.Name()+".txt"))
103+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
echo
2+
----
3+
send queues: t2/s2: 400 B, t1/s1: 200 B, + 125 B more bytes across 2 streams

pkg/kv/kvserver/stores.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/cockroachdb/cockroach/pkg/kv"
1515
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1616
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission"
17+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
1718
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed"
1819
"github.com/cockroachdb/cockroach/pkg/roachpb"
1920
"github.com/cockroachdb/cockroach/pkg/storage"
@@ -44,6 +45,7 @@ type Stores struct {
4445
biLatestTS hlc.Timestamp // Timestamp of gossip bootstrap info
4546
latestBI *gossip.BootstrapInfo // Latest cached bootstrap info
4647
}
48+
sendQueueLogger *rac2.SendQueueLogger
4749
}
4850

4951
var _ kv.Sender = &Stores{} // Stores implements the client.Sender interface
@@ -52,9 +54,11 @@ var _ gossip.Storage = &Stores{} // Stores implements the gossip.Storage interfa
5254
// NewStores returns a local-only sender which directly accesses
5355
// a collection of stores.
5456
func NewStores(ambient log.AmbientContext, clock *hlc.Clock) *Stores {
57+
const numStreamsToLog = 20
5558
return &Stores{
56-
AmbientContext: ambient,
57-
clock: clock,
59+
AmbientContext: ambient,
60+
clock: clock,
61+
sendQueueLogger: rac2.NewSendQueueLogger(numStreamsToLog),
5862
}
5963
}
6064

@@ -354,3 +358,21 @@ func (ls *Stores) GetAggregatedStoreStats(
354358
}
355359
return storesCPURate, numStores, nil
356360
}
361+
362+
func (ls *Stores) TryLogFlowControlSendQueues(ctx context.Context) {
363+
coll, ok := ls.sendQueueLogger.TryStartLog()
364+
if !ok {
365+
return
366+
}
367+
rangeSendStats := rac2.RangeSendStreamStats{}
368+
_ = ls.VisitStores(func(s *Store) error {
369+
newStoreReplicaVisitor(s).Visit(func(rep *Replica) bool {
370+
rangeSendStats.Clear()
371+
rep.SendStreamStats(&rangeSendStats)
372+
coll.ObserveRangeStats(&rangeSendStats)
373+
return true
374+
})
375+
return nil
376+
})
377+
ls.sendQueueLogger.FinishLog(ctx, coll)
378+
}

pkg/server/node.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1239,6 +1239,7 @@ func (n *Node) computeMetricsPeriodically(
12391239
})
12401240
n.updateNodeRangeCount()
12411241
n.storeCfg.KVFlowStreamTokenProvider.UpdateMetricGauges()
1242+
n.stores.TryLogFlowControlSendQueues(ctx)
12421243
return err
12431244
}
12441245

0 commit comments

Comments
 (0)