
Commit c9eda2b

craig[bot] and dodeca12 committed
Merge #156829
156829: storeliveness: refactor channel-based `sendQueue` with slices of `slpb.Message` r=iskettaneh,miraradeva a=dodeca12

Previously, outgoing messages in the transport layer were queued using a buffered channel (`messages chan slpb.Message`). Messages were processed by pulling them one-by-one from the channel and batching them using a timer-based approach.

This patch refactors the queue implementation to use a mutex-protected slice (`msgs []slpb.Message`) instead of a channel. This simplifies the batching logic by allowing all queued messages to be drained atomically in a single operation, rather than pulled individually from a channel. With this refactor, we also increase the queue capacity to 100,000 messages: the per-store receive queue size is 10,000 messages, and the send queue is per-node and serves multiple stores.

The refactor also allows the batching mechanism to use a "sleep-then-drain" approach instead of the existing timer-based approach. The timer-based approach had a subtle issue: `processQueue` would block in a select statement waiting on `q.messages` while batching, and when a new message was enqueued (which signals `q.messages`), it would immediately wake up the blocked goroutine, causing spikes in runnable goroutines.

The new `sendQueue` struct provides `Append()` to add messages, `Drain()` to atomically retrieve all messages, and `Size()` to track the total byte size of queued messages. The `processQueue` method now sleeps for the batch duration and then drains all messages at once, rather than using the previous timer-based batching approach. By sleeping first and then draining all messages atomically, we avoid the aforementioned wake-up spikes and achieve better pacing behaviour.

Additionally, there's a minor renaming of the `SendAsync` function to `EnqueueMessage` to match the semantics of the implementation of the function (especially in later commits).

Part of: #148210

Release note: None

Co-authored-by: Swapneeth Gorantla <[email protected]>
2 parents 82eca8e + 0e6be32 commit c9eda2b
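
To make the sleep-then-drain pattern concrete, here is a minimal, self-contained Go sketch of the queue described in the commit message. It is an illustration only, not the CockroachDB implementation: the names (`queue`, `enqueue`, `drain`, `signal`) and the constants are stand-ins for `sendQueue`, its `append`/`drain` methods, and `directSend`, and the string payload replaces `slpb.Message`.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

const (
	maxQueueSize  = 100_000
	batchDuration = 10 * time.Millisecond
)

type queue struct {
	mu struct {
		sync.Mutex
		items []string
	}
	// signal has capacity 1: enqueuers never block on it, and at most one
	// wake-up can be pending for the consumer regardless of how many items
	// arrive during a batch window.
	signal chan struct{}
}

func newQueue() *queue {
	return &queue{signal: make(chan struct{}, 1)}
}

// enqueue appends an item and posts a non-blocking wake-up signal.
func (q *queue) enqueue(item string) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.mu.items) >= maxQueueSize {
		return false // queue full: drop the item
	}
	q.mu.items = append(q.mu.items, item)
	select {
	case q.signal <- struct{}{}:
	default: // a wake-up is already pending
	}
	return true
}

// drain atomically takes ownership of everything queued so far.
func (q *queue) drain() []string {
	q.mu.Lock()
	defer q.mu.Unlock()
	items := q.mu.items
	q.mu.items = nil
	return items
}

func main() {
	q := newQueue()
	done := make(chan struct{})

	// Consumer: wake up once per batch window, not once per item.
	go func() {
		defer close(done)
		for range q.signal {
			time.Sleep(batchDuration) // let a batch accumulate
			// Absorb any wake-up posted during the sleep; the drain below
			// already picks up the items behind it.
			select {
			case <-q.signal:
			default:
			}
			batch := q.drain()
			fmt.Printf("sent batch of %d items\n", len(batch))
			if len(batch) > 0 && batch[len(batch)-1] == "stop" {
				return
			}
		}
	}()

	for i := 0; i < 5; i++ {
		q.enqueue(fmt.Sprintf("msg-%d", i))
	}
	q.enqueue("stop")
	<-done
}
```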

File tree

4 files changed: +128, -66 lines changed


pkg/kv/kvserver/storeliveness/support_manager.go

Lines changed: 4 additions & 4 deletions
@@ -34,7 +34,7 @@ var Enabled = settings.RegisterBoolSetting(
 // MessageSender is the interface that defines how Store Liveness messages are
 // sent. Transport is the production implementation of MessageSender.
 type MessageSender interface {
-	SendAsync(ctx context.Context, msg slpb.Message) (sent bool)
+	EnqueueMessage(ctx context.Context, msg slpb.Message) (sent bool)
 }
 
 // SupportManager orchestrates requesting and providing Store Liveness support.
@@ -323,7 +323,7 @@ func (sm *SupportManager) sendHeartbeats(ctx context.Context) {
 	// Send heartbeats to each remote store.
 	successes := 0
 	for _, msg := range heartbeats {
-		if sent := sm.sender.SendAsync(ctx, msg); sent {
+		if sent := sm.sender.EnqueueMessage(ctx, msg); sent {
 			successes++
 		} else {
 			log.KvExec.Warningf(ctx, "failed to send heartbeat to store %+v", msg.To)
@@ -424,15 +424,15 @@ func (sm *SupportManager) handleMessages(ctx context.Context, msgs []*slpb.Messa
 	sm.metrics.SupportForStores.Update(int64(sm.supporterStateHandler.getNumSupportFor()))
 
 	for _, response := range responses {
-		_ = sm.sender.SendAsync(ctx, response)
+		_ = sm.sender.EnqueueMessage(ctx, response)
 	}
 	log.KvExec.VInfof(ctx, 2, "sent %d heartbeat responses", len(responses))
 }
 
 // maxReceiveQueueSize is the maximum number of messages the receive queue can
 // store. If message consumption is slow (e.g. due to a disk stall) and the
 // queue reaches maxReceiveQueueSize, incoming messages will be dropped.
-const maxReceiveQueueSize = 10000
+const maxReceiveQueueSize = 10_000
 
 var receiveQueueSizeLimitReachedErr = errors.Errorf("store liveness receive queue is full")

pkg/kv/kvserver/storeliveness/testutils.go

Lines changed: 1 addition & 1 deletion
@@ -94,7 +94,7 @@ type testMessageSender struct {
 	messages []slpb.Message
 }
 
-func (tms *testMessageSender) SendAsync(_ context.Context, msg slpb.Message) (sent bool) {
+func (tms *testMessageSender) EnqueueMessage(_ context.Context, msg slpb.Message) (sent bool) {
 	tms.mu.Lock()
 	defer tms.mu.Unlock()
 	tms.messages = append(tms.messages, msg)

pkg/kv/kvserver/storeliveness/transport.go

Lines changed: 103 additions & 42 deletions
@@ -25,8 +25,12 @@ import (
 )
 
 const (
-	// Outgoing messages are queued per-node on a channel of this size.
-	sendBufferSize = 1000
+	// Outgoing messages are queued per-node on a slice of this size.
+	// This is set to 100_000 because the receiver side has
+	// maxReceiveQueueSize = 10_000 per store.
+	// Since this is a per-node queue that may handle messages for multiple
+	// stores, it should be at least 10x the per-store queue size.
+	maxSendQueueSize = 100_000
 
 	// When no message has been queued for this duration, the corresponding
 	// instance of processQueue will shut down.
@@ -52,9 +56,51 @@ type MessageHandler interface {
 	HandleMessage(msg *slpb.Message) error
 }
 
+var sendQueueSizeLimitReachedErr = errors.Errorf("store liveness send queue is full")
+
 // sendQueue is a queue of outgoing Messages.
 type sendQueue struct {
-	messages chan slpb.Message
+	// msgs is a slice of messages that are queued to be sent.
+	mu struct {
+		syncutil.Mutex
+		msgs []slpb.Message
+		// size is the total size in bytes of the messages in the queue.
+		size int64
+	}
+	// directSend is the channel used to signal the processQueue goroutine.
+	directSend chan struct{}
+}
+
+func newSendQueue() sendQueue {
+	return sendQueue{
+		directSend: make(chan struct{}, 1),
+	}
+}
+
+func (q *sendQueue) append(msg slpb.Message) error {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+	// Drop messages if maxSendQueueSize is reached.
+	if len(q.mu.msgs) >= maxSendQueueSize {
+		return sendQueueSizeLimitReachedErr
+	}
+	q.mu.msgs = append(q.mu.msgs, msg)
+	q.mu.size += int64(msg.Size())
+	select {
+	case q.directSend <- struct{}{}:
+	default:
+	}
+	return nil
+}
+
+func (q *sendQueue) drain() ([]slpb.Message, int64) {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+	msgs := q.mu.msgs
+	q.mu.msgs = nil
+	size := q.mu.size
+	q.mu.size = 0
+	return msgs, size
 }
 
 // Transport handles the RPC messages for Store Liveness.
@@ -213,14 +259,14 @@ func (t *Transport) handleMessage(ctx context.Context, msg *slpb.Message) {
 	t.metrics.MessagesReceived.Inc(1)
 }
 
-// SendAsync implements the MessageSender interface. It sends a message to the
+// EnqueueMessage implements the MessageSender interface. It sends a message to the
 // recipient specified in the request, and returns false if the outgoing queue
 // is full or the node dialer's circuit breaker has tripped.
 //
 // The returned bool may be a false positive but will never be a false negative;
 // if sent is true the message may or may not actually be sent but if it's false
 // the message definitely was not sent.
-func (t *Transport) SendAsync(ctx context.Context, msg slpb.Message) (enqueued bool) {
+func (t *Transport) EnqueueMessage(ctx context.Context, msg slpb.Message) (enqueued bool) {
 	toNodeID := msg.To.NodeID
 	fromNodeID := msg.From.NodeID
 	// If this is a message from one local store to another local store, do not
@@ -248,12 +294,8 @@ func (t *Transport) SendAsync(ctx context.Context, msg slpb.Message) (enqueued b
 		}
 	}
 
-	select {
-	case q.messages <- msg:
-		t.metrics.SendQueueSize.Inc(1)
-		t.metrics.SendQueueBytes.Inc(int64(msg.Size()))
-		return true
-	default:
+	msgSize := int64(msg.Size())
+	if err := q.append(msg); err != nil {
 		if logQueueFullEvery.ShouldLog() {
 			log.KvExec.Warningf(
 				t.AnnotateCtx(context.Background()),
@@ -263,14 +305,18 @@ func (t *Transport) SendAsync(ctx context.Context, msg slpb.Message) (enqueued b
 		t.metrics.MessagesSendDropped.Inc(1)
 		return false
 	}
+
+	t.metrics.SendQueueSize.Inc(1)
+	t.metrics.SendQueueBytes.Inc(msgSize)
+	return true
 }
 
 // getQueue returns the queue for the specified node ID and a boolean
 // indicating whether the queue already exists (true) or was created (false).
 func (t *Transport) getQueue(nodeID roachpb.NodeID) (*sendQueue, bool) {
 	queue, ok := t.queues.Load(nodeID)
 	if !ok {
-		q := sendQueue{messages: make(chan slpb.Message, sendBufferSize)}
+		q := newSendQueue()
 		queue, ok = t.queues.LoadOrStore(nodeID, &q)
 	}
 	return queue, ok
@@ -287,21 +333,18 @@ func (t *Transport) startProcessNewQueue(
 ) (started bool) {
 	cleanup := func() {
 		q, ok := t.getQueue(toNodeID)
+		if !ok {
+			return
+		}
 		t.queues.Delete(toNodeID)
-		// Account for all remaining messages in the queue. SendAsync may be
+		// Account for all remaining messages in the queue. EnqueueMessage may be
 		// writing to the queue concurrently, so it's possible that we won't
 		// account for a few messages below.
-		if ok {
-			for {
-				select {
-				case m := <-q.messages:
-					t.metrics.MessagesSendDropped.Inc(1)
-					t.metrics.SendQueueSize.Dec(1)
-					t.metrics.SendQueueBytes.Dec(int64(m.Size()))
-				default:
-					return
-				}
-			}
+		msgs, msgsSize := q.drain()
+		if len(msgs) > 0 {
+			t.metrics.MessagesSendDropped.Inc(int64(len(msgs)))
+			t.metrics.SendQueueSize.Dec(int64(len(msgs)))
+			t.metrics.SendQueueBytes.Dec(msgsSize)
 		}
 	}
 	worker := func(ctx context.Context) {
@@ -362,10 +405,17 @@ func (t *Transport) processQueue(
 	}
 	var idleTimer timeutil.Timer
 	defer idleTimer.Stop()
-	var batchTimer timeutil.Timer
-	defer batchTimer.Stop()
 	batch := &slpb.MessageBatch{}
 	for {
+		// Drain the timer channel before resetting to avoid receiving stale
+		// timeout signals. This can happen if the timer fires while we're
+		// processing messages (e.g., during the batchDuration sleep).
+		if !idleTimer.Stop() {
+			select {
+			case <-idleTimer.C:
+			default:
+			}
+		}
 		idleTimer.Reset(getIdleTimeout())
 		select {
 		case <-t.stopper.ShouldQuiesce():
@@ -375,23 +425,34 @@ func (t *Transport) processQueue(
 			t.metrics.SendQueueIdle.Inc(1)
 			return nil
 
-		case msg := <-q.messages:
-			batch.Messages = append(batch.Messages, msg)
-			t.metrics.SendQueueSize.Dec(1)
-			t.metrics.SendQueueBytes.Dec(int64(msg.Size()))
-
-			// Pull off as many queued requests as possible within batchDuration.
-			batchTimer.Reset(batchDuration)
-			for done := false; !done; {
-				select {
-				case msg = <-q.messages:
-					batch.Messages = append(batch.Messages, msg)
-					t.metrics.SendQueueSize.Dec(1)
-					t.metrics.SendQueueBytes.Dec(int64(msg.Size()))
-				case <-batchTimer.C:
-					done = true
-				}
+		case <-q.directSend:
+			// Sleep for batchDuration to batch messages, then drain all accumulated
+			// messages at once. We use sleep-then-drain instead of a timer-based
+			// batching mechanism (e.g., select between timer and message channel)
+			// to avoid frequent goroutine wake-ups.
+			//
+			// With timer-based batching, the goroutine would wake up on each message
+			// arrival during the batching window.
+			// This creates a pattern where the goroutine wakes up repeatedly
+			// during batching, causing spikes in runnable goroutines and
+			// suboptimal scheduling behaviour.
+			//
+			// By sleeping first and then draining all messages in a single operation,
+			// we ensure the goroutine wakes up only once per batch period, after
+			// the batching window has elapsed. This keeps the runnable goroutine
+			// count stable and reduces scheduling overhead.
+			time.Sleep(batchDuration)
+			select {
+			case <-q.directSend:
+			default:
+			}
+			var batchMessagesSize int64
+			batch.Messages, batchMessagesSize = q.drain()
+			if len(batch.Messages) == 0 {
+				continue
 			}
+			t.metrics.SendQueueSize.Dec(int64(len(batch.Messages)))
+			t.metrics.SendQueueBytes.Dec(batchMessagesSize)
 
 			batch.Now = t.clock.NowAsClockTimestamp()
 			if err = stream.Send(batch); err != nil {
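
The capacity-1 `directSend` channel is what bounds the wake-ups: any number of `append` calls within a single batch window collapse into at most one pending signal for `processQueue`. The following stand-alone snippet is not part of the patch; it is purely an illustration of that channel semantics.

```go
package main

import "fmt"

func main() {
	// A capacity-1 signal channel, like sendQueue.directSend above.
	signal := make(chan struct{}, 1)

	// Simulate 1,000 enqueues arriving within one batch window: each posts a
	// non-blocking signal, but at most one can ever be pending.
	for i := 0; i < 1000; i++ {
		select {
		case signal <- struct{}{}:
		default: // a wake-up is already pending; the extra signal is dropped
		}
	}

	// Count the wake-ups the consumer would observe.
	pending := 0
	for {
		select {
		case <-signal:
			pending++
		default:
			fmt.Println("pending wake-ups:", pending) // prints: pending wake-ups: 1
			return
		}
	}
}
```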

0 commit comments
