
Commit 019bb62

craig[bot] and sumeerbhola committed
Merge #157865
157865: rac2,kvserver: add SendQueueLogger to periodically log per-stream send queue bytes r=tbg a=sumeerbhola

Informs #157793

Epic: none

Release note: None

Co-authored-by: sumeerbhola <[email protected]>
2 parents 680a632 + 20aa7db commit 019bb62

File tree

9 files changed (+303, −18 lines changed)


pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel

Lines changed: 3 additions & 0 deletions
@@ -7,6 +7,7 @@ go_library(
         "metrics.go",
         "priority.go",
         "range_controller.go",
+        "send_queue_logger.go",
         "store_stream.go",
         "token_counter.go",
         "token_tracker.go",
@@ -49,6 +50,7 @@ go_test(
         "log_tracker_test.go",
         "priority_test.go",
         "range_controller_test.go",
+        "send_queue_logger_test.go",
         "simulation_test.go",
         "store_stream_test.go",
         "token_counter_test.go",
@@ -86,6 +88,7 @@ go_test(
         "//pkg/util/timeutil",
         "@com_github_cockroachdb_datadriven//:datadriven",
         "@com_github_cockroachdb_errors//:errors",
+        "@com_github_cockroachdb_redact//:redact",
         "@com_github_dustin_go_humanize//:go-humanize",
         "@com_github_gogo_protobuf//jsonpb",
         "@com_github_guptarohit_asciigraph//:asciigraph",

pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go

Lines changed: 4 additions & 0 deletions
@@ -320,6 +320,8 @@ func (q *RangeSendQueueStats) Clear() {
 // ReplicaSendStreamStats contains the stats for a replica send stream that may
 // be used to inform placement decisions pertaining to the replica.
 type ReplicaSendStreamStats struct {
+    // Stream is the flow control stream for the replica.
+    Stream kvflowcontrol.Stream
     // IsStateReplicate is true iff the replica is being sent entries.
     IsStateReplicate bool
     // HasSendQueue is true when a replica has a non-zero amount of queued
@@ -1721,6 +1723,7 @@ func (rc *rangeController) SendStreamStats(statsToSet *RangeSendStreamStats) {
     // end up overwriting the same state at most twice, not a big issue.
     for _, vs := range vss {
         stats := ReplicaSendStreamStats{
+            Stream:           vs.stateForWaiters.evalTokenCounter.stream,
             IsStateReplicate: vs.isStateReplicate,
             HasSendQueue:     vs.hasSendQ,
         }
@@ -1731,6 +1734,7 @@ func (rc *rangeController) SendStreamStats(statsToSet *RangeSendStreamStats) {
     // Now handle the non-voters.
     for _, nv := range rc.mu.nonVoterSet {
         stats := ReplicaSendStreamStats{
+            Stream:           nv.evalTokenCounter.stream,
             IsStateReplicate: nv.isStateReplicate,
             HasSendQueue:     nv.hasSendQ,
         }

pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go

Lines changed: 10 additions & 1 deletion
@@ -1580,8 +1580,9 @@ func TestRangeController(t *testing.T) {
     for _, repl := range sortReplicas(r) {
         replStats, ok := stats.ReplicaSendStreamStats(repl.ReplicaID)
         require.True(t, ok)
-        buf.WriteString(fmt.Sprintf("%v: is_state_replicate=%-5v has_send_queue=%-5v send_queue_size=%v / %v entries\n",
+        buf.WriteString(fmt.Sprintf("%v: stream=%v is_state_replicate=%-5v has_send_queue=%-5v send_queue_size=%v / %v entries\n",
             repl,
+            replStats.Stream,
             replStats.IsStateReplicate,
             replStats.HasSendQueue,
             // Cast for formatting.
@@ -2476,6 +2477,10 @@ func TestRangeSendStreamStatsString(t *testing.T) {
     stats := RangeSendStreamStats{
         internal: []ReplicaSendStreamStats{
             {
+                Stream: kvflowcontrol.Stream{
+                    TenantID: roachpb.MustMakeTenantID(1),
+                    StoreID:  roachpb.StoreID(1),
+                },
                 IsStateReplicate: false,
                 HasSendQueue:     true,
                 ReplicaSendQueueStats: ReplicaSendQueueStats{
@@ -2485,6 +2490,10 @@
                 },
             },
             {
+                Stream: kvflowcontrol.Stream{
+                    TenantID: roachpb.MustMakeTenantID(2),
+                    StoreID:  roachpb.StoreID(2),
+                },
                 IsStateReplicate: true,
                 HasSendQueue:     false,
                 ReplicaSendQueueStats: ReplicaSendQueueStats{
pkg/kv/kvserver/kvflowcontrol/rac2/send_queue_logger.go

Lines changed: 140 additions & 0 deletions
@@ -0,0 +1,140 @@
+// Copyright 2025 The Cockroach Authors.
+//
+// Use of this software is governed by the CockroachDB Software License
+// included in the /LICENSE file.
+
+package rac2
+
+import (
+    "cmp"
+    "context"
+    "slices"
+    "time"
+
+    "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
+    "github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
+    "github.com/cockroachdb/cockroach/pkg/util/log"
+    "github.com/cockroachdb/cockroach/pkg/util/syncutil"
+    "github.com/cockroachdb/redact"
+)
+
+const sendQueueLoggingInterval = 30 * time.Second
+
+// SendQueueLogger logs send queue sizes for streams.
+//
+// Usage:
+//
+//	sql := NewSendQueueLogger(numStreamsToLog)
+//	// Periodically do:
+//	if coll, ok := sql.TryStartLog(); ok {
+//	  for each range:
+//	    coll.ObserveRangeStats(rangeStats)
+//	  sql.FinishLog(ctx, coll)
+//	}
+//
+// It is thread-safe.
+type SendQueueLogger struct {
+    numStreamsToLog int
+    limiter         log.EveryN
+
+    mu struct {
+        syncutil.Mutex
+        scratch SendQueueLoggerCollector
+    }
+
+    testingLog func(context.Context, *redact.StringBuilder)
+}
+
+type streamAndBytes struct {
+    stream kvflowcontrol.Stream
+    bytes  int64
+}
+
+type SendQueueLoggerCollector struct {
+    m  map[kvflowcontrol.Stream]int64
+    sl []streamAndBytes
+}
+
+// NewSendQueueLogger creates a new SendQueueLogger that will log up to
+// numStreamsToLog streams with the highest send queue bytes when logging is
+// triggered.
+func NewSendQueueLogger(numStreamsToLog int) *SendQueueLogger {
+    return &SendQueueLogger{
+        numStreamsToLog: numStreamsToLog,
+        limiter:         log.Every(sendQueueLoggingInterval),
+    }
+}
+
+func (s *SendQueueLogger) TryStartLog() (SendQueueLoggerCollector, bool) {
+    if !s.limiter.ShouldLog() {
+        return SendQueueLoggerCollector{}, false
+    }
+    s.mu.Lock()
+    defer s.mu.Unlock()
+    collector := s.mu.scratch
+    s.mu.scratch = SendQueueLoggerCollector{}
+    if collector.m == nil {
+        collector.m = make(map[kvflowcontrol.Stream]int64)
+    }
+    return collector, true
+}
+
+func (s *SendQueueLoggerCollector) ObserveRangeStats(stats *RangeSendStreamStats) {
+    for _, ss := range stats.internal {
+        if ss.SendQueueBytes > 0 {
+            s.m[ss.Stream] = s.m[ss.Stream] + ss.SendQueueBytes
+        }
+    }
+}
+
+// NB: this is defined here so we can access it from tests.
+const sendQueueLogFormat = "send queues: %s"
+
+func (s *SendQueueLogger) FinishLog(ctx context.Context, c SendQueueLoggerCollector) {
+    if len(c.m) == 0 {
+        return
+    }
+    defer func() {
+        clear(c.m)
+        c.sl = c.sl[:0]
+        s.mu.Lock()
+        s.mu.scratch = c
+        s.mu.Unlock()
+    }()
+    var buf redact.StringBuilder
+    c.finishLog(s.numStreamsToLog, &buf)
+    if fn := s.testingLog; fn != nil {
+        fn(ctx, &buf)
+    } else {
+        log.KvDistribution.Infof(ctx, sendQueueLogFormat, &buf)
+    }
+}
+
+func (c *SendQueueLoggerCollector) finishLog(maxNumStreams int, buf *redact.StringBuilder) {
+    for stream, bytes := range c.m {
+        c.sl = append(c.sl, streamAndBytes{
+            stream: stream,
+            bytes:  bytes,
+        })
+    }
+    // Sort by descending send queue bytes.
+    slices.SortFunc(c.sl, func(a, b streamAndBytes) int {
+        return -cmp.Compare(a.bytes, b.bytes)
+    })
+    n := min(maxNumStreams, len(c.sl))
+    for i := 0; i < n; i++ {
+        if i > 0 {
+            buf.SafeString(", ")
+        }
+        buf.Printf("%s: %s", c.sl[i].stream,
+            humanizeutil.IBytes(c.sl[i].bytes))
+    }
+    remainingBytes := int64(0)
+    for i := n; i < len(c.sl); i++ {
+        remainingBytes += c.sl[i].bytes
+    }
+    if remainingBytes > 0 {
+        buf.Printf(", + %s more bytes across %d streams",
+            humanizeutil.IBytes(remainingBytes), len(c.sl)-n)
+    }
+}
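For context, the usage pattern described in the file's doc comment could be driven from a periodic loop roughly as follows. This is a minimal sketch, not code from this commit: the package name, the maybeLogSendQueues helper, and the visitStats callback are illustrative assumptions; only the SendQueueLogger methods shown (TryStartLog, ObserveRangeStats, FinishLog) come from the diff above.

// Hypothetical caller of SendQueueLogger; a sketch only. The package name,
// function name, and visitStats callback are assumptions, not part of this
// commit.
package kvserverexample

import (
    "context"

    "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
)

// maybeLogSendQueues aggregates per-stream send queue bytes across ranges and
// emits at most one rate-limited log line (the logger enforces its 30s
// interval internally via TryStartLog).
func maybeLogSendQueues(
    ctx context.Context,
    logger *rac2.SendQueueLogger,
    visitStats func(visit func(stats *rac2.RangeSendStreamStats)),
) {
    coll, ok := logger.TryStartLog()
    if !ok {
        // Rate limited: skip the per-range walk entirely.
        return
    }
    visitStats(func(stats *rac2.RangeSendStreamStats) {
        // Accumulates SendQueueBytes into the collector, keyed by stream.
        coll.ObserveRangeStats(stats)
    })
    // Sorts streams by queued bytes, logs the top numStreamsToLog streams,
    // and hands the scratch collector back to the logger for reuse.
    logger.FinishLog(ctx, coll)
}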
pkg/kv/kvserver/kvflowcontrol/rac2/send_queue_logger_test.go

Lines changed: 103 additions & 0 deletions
@@ -0,0 +1,103 @@
+// Copyright 2025 The Cockroach Authors.
+//
+// Use of this software is governed by the CockroachDB Software License
+// included in the /LICENSE file.
+
+package rac2
+
+import (
+    "context"
+    "testing"
+
+    "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
+    "github.com/cockroachdb/cockroach/pkg/roachpb"
+    "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
+    "github.com/cockroachdb/cockroach/pkg/testutils/echotest"
+    "github.com/cockroachdb/cockroach/pkg/util/leaktest"
+    "github.com/cockroachdb/cockroach/pkg/util/log"
+    "github.com/cockroachdb/redact"
+    "github.com/stretchr/testify/require"
+)
+
+func TestSendQueueLogger(t *testing.T) {
+    defer leaktest.AfterTest(t)()
+    s := log.ScopeWithoutShowLogs(t)
+    defer s.Close(t)
+
+    rss11_100 := ReplicaSendStreamStats{
+        Stream: kvflowcontrol.Stream{
+            TenantID: roachpb.MustMakeTenantID(1),
+            StoreID:  1,
+        },
+        ReplicaSendQueueStats: ReplicaSendQueueStats{
+            SendQueueBytes: 100,
+        },
+    }
+    rss22_200 := ReplicaSendStreamStats{
+        Stream: kvflowcontrol.Stream{
+            TenantID: roachpb.MustMakeTenantID(2),
+            StoreID:  2,
+        },
+        ReplicaSendQueueStats: ReplicaSendQueueStats{
+            SendQueueBytes: 200,
+        },
+    }
+    rss33_50 := ReplicaSendStreamStats{
+        Stream: kvflowcontrol.Stream{
+            TenantID: roachpb.MustMakeTenantID(3),
+            StoreID:  3,
+        },
+        ReplicaSendQueueStats: ReplicaSendQueueStats{
+            SendQueueBytes: 50,
+        },
+    }
+    rss44_25 := ReplicaSendStreamStats{
+        Stream: kvflowcontrol.Stream{
+            TenantID: roachpb.MustMakeTenantID(4),
+            StoreID:  4,
+        },
+        ReplicaSendQueueStats: ReplicaSendQueueStats{
+            SendQueueBytes: 25,
+        },
+    }
+    sql := NewSendQueueLogger(2)
+    var output redact.RedactableString
+    sql.testingLog = func(ctx context.Context, buf *redact.StringBuilder) {
+        if len(output) > 0 {
+            output += "\n"
+        }
+        output += redact.Sprintf(sendQueueLogFormat, buf)
+    }
+    ctx := context.Background()
+    coll, ok := sql.TryStartLog()
+
+    require.True(t, ok)
+    // Empty stats are ok.
+    stats := RangeSendStreamStats{}
+    coll.ObserveRangeStats(&stats)
+    stats = RangeSendStreamStats{
+        internal: []ReplicaSendStreamStats{rss22_200, rss33_50},
+    }
+    coll.ObserveRangeStats(&stats)
+    stats = RangeSendStreamStats{
+        internal: []ReplicaSendStreamStats{rss11_100, rss22_200},
+    }
+    coll.ObserveRangeStats(&stats)
+    stats = RangeSendStreamStats{
+        internal: []ReplicaSendStreamStats{rss11_100},
+    }
+    coll.ObserveRangeStats(&stats)
+    stats = RangeSendStreamStats{
+        internal: []ReplicaSendStreamStats{rss33_50, rss44_25},
+    }
+    coll.ObserveRangeStats(&stats)
+    sql.FinishLog(ctx, coll)
+
+    // Cannot log again immediately.
+    _, ok = sql.TryStartLog()
+    require.False(t, ok)
+    // Call FinishLog again to ensure no logging happens.
+    sql.FinishLog(ctx, SendQueueLoggerCollector{})
+
+    echotest.Require(t, string(output), datapathutils.TestDataPath(t, t.Name()+".txt"))
+}
pkg/kv/kvserver/kvflowcontrol/rac2/testdata/TestSendQueueLogger.txt

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+echo
+----
+send queues: t2/s2: 400 B, t1/s1: 200 B, + 125 B more bytes across 2 streams
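For reference, these numbers follow directly from the observations in TestSendQueueLogger above: t2/s2 is observed twice at 200 B (400 B total) and t1/s1 twice at 100 B (200 B total), and with numStreamsToLog=2 only those two streams are printed explicitly; the remaining t3/s3 (50 B + 50 B) and t4/s4 (25 B) are folded into the trailing "+ 125 B more bytes across 2 streams".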

pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/send_stream_stats

Lines changed: 15 additions & 15 deletions
@@ -62,15 +62,15 @@ NormalPri:
 # queue size for s2.
 send_stream_stats range_id=1 refresh=false
 ----
-(n1,s1):1: is_state_replicate=true has_send_queue=false send_queue_size=+0 B / 0 entries
-(n2,s2):2: is_state_replicate=false has_send_queue=true send_queue_size=+0 B / 0 entries
-(n3,s3):3: is_state_replicate=true has_send_queue=true send_queue_size=+0 B / 0 entries
+(n1,s1):1: stream=t1/s1 is_state_replicate=true has_send_queue=false send_queue_size=+0 B / 0 entries
+(n2,s2):2: stream=t1/s2 is_state_replicate=false has_send_queue=true send_queue_size=+0 B / 0 entries
+(n3,s3):3: stream=t1/s3 is_state_replicate=true has_send_queue=true send_queue_size=+0 B / 0 entries

 send_stream_stats range_id=1 refresh=true
 ----
-(n1,s1):1: is_state_replicate=true has_send_queue=false send_queue_size=+0 B / 0 entries
-(n2,s2):2: is_state_replicate=false has_send_queue=true send_queue_size=+0 B / 0 entries
-(n3,s3):3: is_state_replicate=true has_send_queue=true send_queue_size=+2.0 MiB / 2 entries
+(n1,s1):1: stream=t1/s1 is_state_replicate=true has_send_queue=false send_queue_size=+0 B / 0 entries
+(n2,s2):2: stream=t1/s2 is_state_replicate=false has_send_queue=true send_queue_size=+0 B / 0 entries
+(n3,s3):3: stream=t1/s3 is_state_replicate=true has_send_queue=true send_queue_size=+2.0 MiB / 2 entries

 # Next, add another entry, which will also be queued. We want to see the stats
 # tick over and update by themselves (refresh=false) by ticking the clock the
@@ -94,9 +94,9 @@ t1/s3: eval reg=+1.0 MiB/+16 MiB ela=-7.0 MiB/+8.0 MiB
 # The stats shouldn't have changed, as we haven't refreshed them via ticking.
 send_stream_stats range_id=1 refresh=false
 ----
-(n1,s1):1: is_state_replicate=true has_send_queue=false send_queue_size=+0 B / 0 entries
-(n2,s2):2: is_state_replicate=false has_send_queue=true send_queue_size=+0 B / 0 entries
-(n3,s3):3: is_state_replicate=true has_send_queue=true send_queue_size=+2.0 MiB / 2 entries
+(n1,s1):1: stream=t1/s1 is_state_replicate=true has_send_queue=false send_queue_size=+0 B / 0 entries
+(n2,s2):2: stream=t1/s2 is_state_replicate=false has_send_queue=true send_queue_size=+0 B / 0 entries
+(n3,s3):3: stream=t1/s3 is_state_replicate=true has_send_queue=true send_queue_size=+2.0 MiB / 2 entries

 tick duration=7s
 ----
@@ -118,16 +118,16 @@ t1/s3: eval reg=+1.0 MiB/+16 MiB ela=-7.0 MiB/+8.0 MiB
 # The stats now should have been populated.
 send_stream_stats range_id=1 refresh=false
 ----
-(n1,s1):1: is_state_replicate=true has_send_queue=false send_queue_size=+0 B / 0 entries
-(n2,s2):2: is_state_replicate=false has_send_queue=true send_queue_size=+0 B / 0 entries
-(n3,s3):3: is_state_replicate=true has_send_queue=true send_queue_size=+14 MiB / 3 entries
+(n1,s1):1: stream=t1/s1 is_state_replicate=true has_send_queue=false send_queue_size=+0 B / 0 entries
+(n2,s2):2: stream=t1/s2 is_state_replicate=false has_send_queue=true send_queue_size=+0 B / 0 entries
+(n3,s3):3: stream=t1/s3 is_state_replicate=true has_send_queue=true send_queue_size=+14 MiB / 3 entries

 # Sanity check they are the same when refreshing.
 send_stream_stats range_id=1 refresh=true
 ----
-(n1,s1):1: is_state_replicate=true has_send_queue=false send_queue_size=+0 B / 0 entries
-(n2,s2):2: is_state_replicate=false has_send_queue=true send_queue_size=+0 B / 0 entries
-(n3,s3):3: is_state_replicate=true has_send_queue=true send_queue_size=+14 MiB / 3 entries
+(n1,s1):1: stream=t1/s1 is_state_replicate=true has_send_queue=false send_queue_size=+0 B / 0 entries
+(n2,s2):2: stream=t1/s2 is_state_replicate=false has_send_queue=true send_queue_size=+0 B / 0 entries
+(n3,s3):3: stream=t1/s3 is_state_replicate=true has_send_queue=true send_queue_size=+14 MiB / 3 entries

 close_rcs
 ----

0 commit comments
