Skip to content

Commit a6a0769

Browse files
committed
rac2: add kvflowcontrol.Stream to ReplicaSendStreamStats
Informs #157793 Epic: none Release note: None
1 parent 86eb0d0 commit a6a0769

File tree

3 files changed

+29
-16
lines changed

3 files changed

+29
-16
lines changed

pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,8 @@ func (q *RangeSendQueueStats) Clear() {
320320
// ReplicaSendStreamStats contains the stats for a replica send stream that may
321321
// be used to inform placement decisions pertaining to the replica.
322322
type ReplicaSendStreamStats struct {
323+
// Stream is the flow control stream for the replica.
324+
Stream kvflowcontrol.Stream
323325
// IsStateReplicate is true iff the replica is being sent entries.
324326
IsStateReplicate bool
325327
// HasSendQueue is true when a replica has a non-zero amount of queued
@@ -1721,6 +1723,7 @@ func (rc *rangeController) SendStreamStats(statsToSet *RangeSendStreamStats) {
17211723
// end up overwriting the same state at most twice, not a big issue.
17221724
for _, vs := range vss {
17231725
stats := ReplicaSendStreamStats{
1726+
Stream: vs.stateForWaiters.evalTokenCounter.stream,
17241727
IsStateReplicate: vs.isStateReplicate,
17251728
HasSendQueue: vs.hasSendQ,
17261729
}
@@ -1731,6 +1734,7 @@ func (rc *rangeController) SendStreamStats(statsToSet *RangeSendStreamStats) {
17311734
// Now handle the non-voters.
17321735
for _, nv := range rc.mu.nonVoterSet {
17331736
stats := ReplicaSendStreamStats{
1737+
Stream: nv.evalTokenCounter.stream,
17341738
IsStateReplicate: nv.isStateReplicate,
17351739
HasSendQueue: nv.hasSendQ,
17361740
}

pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1580,8 +1580,9 @@ func TestRangeController(t *testing.T) {
15801580
for _, repl := range sortReplicas(r) {
15811581
replStats, ok := stats.ReplicaSendStreamStats(repl.ReplicaID)
15821582
require.True(t, ok)
1583-
buf.WriteString(fmt.Sprintf("%v: is_state_replicate=%-5v has_send_queue=%-5v send_queue_size=%v / %v entries\n",
1583+
buf.WriteString(fmt.Sprintf("%v: stream=%v is_state_replicate=%-5v has_send_queue=%-5v send_queue_size=%v / %v entries\n",
15841584
repl,
1585+
replStats.Stream,
15851586
replStats.IsStateReplicate,
15861587
replStats.HasSendQueue,
15871588
// Cast for formatting.
@@ -2476,6 +2477,10 @@ func TestRangeSendStreamStatsString(t *testing.T) {
24762477
stats := RangeSendStreamStats{
24772478
internal: []ReplicaSendStreamStats{
24782479
{
2480+
Stream: kvflowcontrol.Stream{
2481+
TenantID: roachpb.MustMakeTenantID(1),
2482+
StoreID: roachpb.StoreID(1),
2483+
},
24792484
IsStateReplicate: false,
24802485
HasSendQueue: true,
24812486
ReplicaSendQueueStats: ReplicaSendQueueStats{
@@ -2485,6 +2490,10 @@ func TestRangeSendStreamStatsString(t *testing.T) {
24852490
},
24862491
},
24872492
{
2493+
Stream: kvflowcontrol.Stream{
2494+
TenantID: roachpb.MustMakeTenantID(2),
2495+
StoreID: roachpb.StoreID(2),
2496+
},
24882497
IsStateReplicate: true,
24892498
HasSendQueue: false,
24902499
ReplicaSendQueueStats: ReplicaSendQueueStats{

pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/send_stream_stats

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -62,15 +62,15 @@ NormalPri:
6262
# queue size for s2.
6363
send_stream_stats range_id=1 refresh=false
6464
----
65-
(n1,s1):1: is_state_replicate=true has_send_queue=false send_queue_size=+0 B / 0 entries
66-
(n2,s2):2: is_state_replicate=false has_send_queue=true send_queue_size=+0 B / 0 entries
67-
(n3,s3):3: is_state_replicate=true has_send_queue=true send_queue_size=+0 B / 0 entries
65+
(n1,s1):1: stream=t1/s1 is_state_replicate=true has_send_queue=false send_queue_size=+0 B / 0 entries
66+
(n2,s2):2: stream=t1/s2 is_state_replicate=false has_send_queue=true send_queue_size=+0 B / 0 entries
67+
(n3,s3):3: stream=t1/s3 is_state_replicate=true has_send_queue=true send_queue_size=+0 B / 0 entries
6868

6969
send_stream_stats range_id=1 refresh=true
7070
----
71-
(n1,s1):1: is_state_replicate=true has_send_queue=false send_queue_size=+0 B / 0 entries
72-
(n2,s2):2: is_state_replicate=false has_send_queue=true send_queue_size=+0 B / 0 entries
73-
(n3,s3):3: is_state_replicate=true has_send_queue=true send_queue_size=+2.0 MiB / 2 entries
71+
(n1,s1):1: stream=t1/s1 is_state_replicate=true has_send_queue=false send_queue_size=+0 B / 0 entries
72+
(n2,s2):2: stream=t1/s2 is_state_replicate=false has_send_queue=true send_queue_size=+0 B / 0 entries
73+
(n3,s3):3: stream=t1/s3 is_state_replicate=true has_send_queue=true send_queue_size=+2.0 MiB / 2 entries
7474

7575
# Next, add another entry, which will also be queued. We want to see the stats
7676
# tick over and update by themselves (refresh=false) by ticking the clock the
@@ -94,9 +94,9 @@ t1/s3: eval reg=+1.0 MiB/+16 MiB ela=-7.0 MiB/+8.0 MiB
9494
# The stats shouldn't have changed, as we haven't refreshed them via ticking.
9595
send_stream_stats range_id=1 refresh=false
9696
----
97-
(n1,s1):1: is_state_replicate=true has_send_queue=false send_queue_size=+0 B / 0 entries
98-
(n2,s2):2: is_state_replicate=false has_send_queue=true send_queue_size=+0 B / 0 entries
99-
(n3,s3):3: is_state_replicate=true has_send_queue=true send_queue_size=+2.0 MiB / 2 entries
97+
(n1,s1):1: stream=t1/s1 is_state_replicate=true has_send_queue=false send_queue_size=+0 B / 0 entries
98+
(n2,s2):2: stream=t1/s2 is_state_replicate=false has_send_queue=true send_queue_size=+0 B / 0 entries
99+
(n3,s3):3: stream=t1/s3 is_state_replicate=true has_send_queue=true send_queue_size=+2.0 MiB / 2 entries
100100

101101
tick duration=7s
102102
----
@@ -118,16 +118,16 @@ t1/s3: eval reg=+1.0 MiB/+16 MiB ela=-7.0 MiB/+8.0 MiB
118118
# The stats now should have been populated.
119119
send_stream_stats range_id=1 refresh=false
120120
----
121-
(n1,s1):1: is_state_replicate=true has_send_queue=false send_queue_size=+0 B / 0 entries
122-
(n2,s2):2: is_state_replicate=false has_send_queue=true send_queue_size=+0 B / 0 entries
123-
(n3,s3):3: is_state_replicate=true has_send_queue=true send_queue_size=+14 MiB / 3 entries
121+
(n1,s1):1: stream=t1/s1 is_state_replicate=true has_send_queue=false send_queue_size=+0 B / 0 entries
122+
(n2,s2):2: stream=t1/s2 is_state_replicate=false has_send_queue=true send_queue_size=+0 B / 0 entries
123+
(n3,s3):3: stream=t1/s3 is_state_replicate=true has_send_queue=true send_queue_size=+14 MiB / 3 entries
124124

125125
# Sanity check they are the same when refreshing.
126126
send_stream_stats range_id=1 refresh=true
127127
----
128-
(n1,s1):1: is_state_replicate=true has_send_queue=false send_queue_size=+0 B / 0 entries
129-
(n2,s2):2: is_state_replicate=false has_send_queue=true send_queue_size=+0 B / 0 entries
130-
(n3,s3):3: is_state_replicate=true has_send_queue=true send_queue_size=+14 MiB / 3 entries
128+
(n1,s1):1: stream=t1/s1 is_state_replicate=true has_send_queue=false send_queue_size=+0 B / 0 entries
129+
(n2,s2):2: stream=t1/s2 is_state_replicate=false has_send_queue=true send_queue_size=+0 B / 0 entries
130+
(n3,s3):3: stream=t1/s3 is_state_replicate=true has_send_queue=true send_queue_size=+14 MiB / 3 entries
131131

132132
close_rcs
133133
----

0 commit comments

Comments
 (0)