storeliveness: refactor channel-based sendQueue with slices of slpb.Message
Previously, outgoing messages in the transport layer were queued using a
buffered channel (`messages chan slpb.Message`). Messages were processed
by pulling them one-by-one from the channel and batching them using a
timer-based approach.
This patch refactors the queue implementation to use a mutex-protected
slice (`msgs []slpb.Message`) instead of a channel. This change
simplifies the batching logic by allowing all queued messages to be
drained atomically in a single operation, rather than pulling them
individually from a channel. With this refactor, we also increase the
queue capacity to 100,000 messages, up from the per-store receive queue
size of 10,000 messages, since the send queue is per-node and serves
multiple stores. The refactor also switches the batching mechanism to a
"sleep-then-drain" approach in place of the existing timer-based one.
The timer-based approach had a subtle issue
where `processQueue` would block in a select statement waiting
on `q.messages` while batching, and when a new message was enqueued
(which signals `q.messages`), it would immediately wake up the
blocked goroutine, causing spikes in runnable goroutines.
The new `sendQueue` struct provides `Append()` to add messages, `Drain()`
to atomically retrieve all messages, and `Size()` to track the total
byte size of queued messages. The `processQueue` method now drains all
messages at once and sleeps for the batch duration, rather than using
the previous timer-based batching approach. By sleeping
first and then draining all messages atomically, we avoid the
aforementioned wake-up spikes and achieve better pacing behaviour.
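The shape of the new queue and its drain loop can be sketched as follows. This is a minimal reconstruction assuming a plain `sync.Mutex` and a byte counter; the method names (`Append`, `Drain`, `Size`) follow the commit, but the bodies, the `maxQueueLen` constant name, and the `processQueue` signature are assumptions.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// message stands in for slpb.Message; size is a stand-in for the
// message's encoded byte size.
type message struct{ size int64 }

// maxQueueLen mirrors the per-node capacity of 100,000 messages
// described above (the actual constant name may differ).
const maxQueueLen = 100000

// sendQueue is a mutex-protected slice queue, replacing the old
// buffered channel.
type sendQueue struct {
	mu    sync.Mutex
	msgs  []message
	bytes int64
}

// Append adds a message, reporting false if the queue is full.
func (q *sendQueue) Append(m message) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.msgs) >= maxQueueLen {
		return false
	}
	q.msgs = append(q.msgs, m)
	q.bytes += m.size
	return true
}

// Drain atomically takes all queued messages in a single operation.
func (q *sendQueue) Drain() []message {
	q.mu.Lock()
	defer q.mu.Unlock()
	msgs := q.msgs
	q.msgs = nil
	q.bytes = 0
	return msgs
}

// Size returns the total byte size of queued messages.
func (q *sendQueue) Size() int64 {
	q.mu.Lock()
	defer q.mu.Unlock()
	return q.bytes
}

// processQueue sketches the sleep-then-drain loop: sleep for the batch
// duration, then drain and send everything queued in the meantime.
// Enqueuing no longer wakes this goroutine, avoiding runnable spikes.
func processQueue(q *sendQueue, batchDur time.Duration,
	send func([]message), stop <-chan struct{}) {
	for {
		select {
		case <-stop:
			return
		case <-time.After(batchDur):
		}
		if batch := q.Drain(); len(batch) > 0 {
			send(batch)
		}
	}
}

func main() {
	q := &sendQueue{}
	q.Append(message{size: 8})
	q.Append(message{size: 8})
	fmt.Println(q.Size())       // total bytes while queued
	fmt.Println(len(q.Drain())) // all messages taken at once
	fmt.Println(q.Size())       // queue empty after drain
}
```

Note that `Drain` hands the whole backing slice to the sender and starts a fresh one, so the caller owns the batch without any further locking.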
Part of: #148210
Release note: None