Skip to content

Commit 81d192e

Browse files
committed
rangefeed: pop multiple events from queue in buffered sender
This is a small optimization that is perhaps a step toward something like #152359.

Epic: none

Release note: None
1 parent 9bcc3fd commit 81d192e

File tree

5 files changed

+176
-69
lines changed

5 files changed

+176
-69
lines changed

pkg/kv/kvserver/rangefeed/buffered_sender.go

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ type BufferedSender struct {
7373
// Note that lockedMuxStream wraps the underlying grpc server stream, ensuring
7474
// thread safety.
7575
sender ServerStreamSender
76+
// sendBufSize is the number of items we pop from the queue at a time. Exposed for
77+
// testing.
78+
sendBufSize int
7679

7780
// queueMu protects the buffer queue.
7881
queueMu struct {
@@ -96,6 +99,8 @@ type BufferedSender struct {
9699
metrics *BufferedSenderMetrics
97100
}
98101

102+
const defaultSendBufSize = 64
103+
99104
type streamState int64
100105

101106
const (
@@ -119,8 +124,9 @@ func NewBufferedSender(
119124
sender ServerStreamSender, settings *cluster.Settings, bsMetrics *BufferedSenderMetrics,
120125
) *BufferedSender {
121126
bs := &BufferedSender{
122-
sender: sender,
123-
metrics: bsMetrics,
127+
sendBufSize: defaultSendBufSize,
128+
sender: sender,
129+
metrics: bsMetrics,
124130
}
125131
bs.notifyDataC = make(chan struct{}, 1)
126132
bs.queueMu.buffer = newEventQueue()
@@ -219,6 +225,8 @@ func (bs *BufferedSender) sendUnbuffered(ev *kvpb.MuxRangeFeedEvent) error {
219225
func (bs *BufferedSender) run(
220226
ctx context.Context, stopper *stop.Stopper, onError func(streamID int64),
221227
) error {
228+
eventsBuf := make([]sharedMuxEvent, 0, bs.sendBufSize)
229+
222230
for {
223231
select {
224232
case <-ctx.Done():
@@ -231,39 +239,47 @@ func (bs *BufferedSender) run(
231239
return nil
232240
case <-bs.notifyDataC:
233241
for {
234-
e, success := bs.popFront()
235-
if !success {
242+
eventsBuf = bs.popEvents(eventsBuf[:0], bs.sendBufSize)
243+
if len(eventsBuf) == 0 {
236244
break
237245
}
238246

239-
bs.metrics.BufferedSenderQueueSize.Dec(1)
240-
err := bs.sender.Send(e.ev)
241-
e.alloc.Release(ctx)
242-
if e.ev.Error != nil {
243-
onError(e.ev.StreamID)
244-
}
245-
if err != nil {
246-
return err
247+
bs.metrics.BufferedSenderQueueSize.Dec(int64(len(eventsBuf)))
248+
for _, evt := range eventsBuf {
249+
// TODO(ssd): This might be another location where we could transform
250+
// multiple events into BulkEvents. We can't just throw them all in a
251+
// bulk event though since we are processing events for different
252+
// streams here.
253+
err := bs.sender.Send(evt.ev)
254+
evt.alloc.Release(ctx)
255+
if evt.ev.Error != nil {
256+
onError(evt.ev.StreamID)
257+
}
258+
if err != nil {
259+
return err
260+
}
247261
}
262+
clear(eventsBuf) // Clear so referenced MuxRangeFeedEvents can be GC'd.
248263
}
249264
}
250265
}
251266
}
252267

253-
// popFront pops the front event from the buffer queue. It returns the event and
254-
// a boolean indicating if the event was successfully popped.
255-
func (bs *BufferedSender) popFront() (e sharedMuxEvent, success bool) {
268+
// popEvents appends up to eventsToPop events into dest, returning the appended slice.
269+
func (bs *BufferedSender) popEvents(dest []sharedMuxEvent, eventsToPop int) []sharedMuxEvent {
256270
bs.queueMu.Lock()
257271
defer bs.queueMu.Unlock()
258-
event, ok := bs.queueMu.buffer.popFront()
259-
if ok {
272+
dest = bs.queueMu.buffer.popFrontInto(dest, eventsToPop)
273+
274+
// Update accounting for everything we popped.
275+
for _, event := range dest {
260276
state, streamFound := bs.queueMu.byStream[event.ev.StreamID]
261277
if streamFound {
262278
state.queueItems--
263279
bs.queueMu.byStream[event.ev.StreamID] = state
264280
}
265281
}
266-
return event, ok
282+
return dest
267283
}
268284

269285
// addStream initializes the per-stream tracking for the given streamID.

pkg/kv/kvserver/rangefeed/buffered_sender_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,12 +198,14 @@ func TestBufferedSenderOnOverflow(t *testing.T) {
198198
require.NoError(t, bs.sendBuffered(muxEv, nil))
199199
}
200200
require.Equal(t, queueCap, int64(bs.len()))
201-
e, success := bs.popFront()
201+
dest := bs.popEvents(nil, 1)
202+
require.Equal(t, 1, len(dest))
203+
e := dest[0]
202204
require.Equal(t, sharedMuxEvent{
203205
ev: muxEv,
204206
alloc: nil,
205207
}, e)
206-
require.True(t, success)
208+
207209
require.Equal(t, queueCap-1, int64(bs.len()))
208210
require.NoError(t, bs.sendBuffered(muxEv, nil))
209211
require.Equal(t, queueCap, int64(bs.len()))
@@ -229,6 +231,7 @@ func TestBufferedSenderOnOverflowMultiStream(t *testing.T) {
229231
queueCap := int64(24)
230232
RangefeedSingleBufferedSenderQueueMaxPerReg.Override(ctx, &st.SV, queueCap)
231233
bs := NewBufferedSender(testServerStream, st, NewBufferedSenderMetrics())
234+
bs.sendBufSize = 1 // Test requires knowing exactly when the queue will fill.
232235
require.Equal(t, queueCap, bs.queueMu.perStreamCapacity)
233236

234237
sm := NewStreamManager(bs, smMetrics)

pkg/kv/kvserver/rangefeed/event_queue.go

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,21 +79,38 @@ func (q *eventQueue) pushBack(e sharedMuxEvent) {
7979
q.size++
8080
}
8181

82-
func (q *eventQueue) popFront() (sharedMuxEvent, bool) {
83-
if q.size == 0 {
84-
return sharedMuxEvent{}, false
82+
// popFrontInto appends up to eventsToPop events into dest.
83+
func (q *eventQueue) popFrontInto(dest []sharedMuxEvent, eventsToPop int) []sharedMuxEvent {
84+
if eventsToPop == 0 || q.size == 0 {
85+
return dest
8586
}
87+
8688
if q.read == eventQueueChunkSize {
89+
assertTrue(q.first.nextChunk != nil, "nextChunk should be non-nil")
8790
removed := q.first
8891
q.first = q.first.nextChunk
8992
putPooledQueueChunk(removed)
9093
q.read = 0
9194
}
92-
res := q.first.data[q.read]
93-
q.first.data[q.read] = sharedMuxEvent{}
94-
q.read++
95-
q.size--
96-
return res, true
95+
96+
// We only read out of the current chunk per call. A possible future
97+
// improvement is to loop across chunks until eventsToPop events have been popped.
98+
availableInChunk := eventQueueChunkSize - q.read
99+
if q.first == q.last {
100+
// Last chunk - only up to write position.
101+
availableInChunk = q.write - q.read
102+
}
103+
104+
if eventsToPop > availableInChunk {
105+
eventsToPop = availableInChunk
106+
}
107+
108+
dest = append(dest, q.first.data[q.read:q.read+eventsToPop]...)
109+
clear(q.first.data[q.read : q.read+eventsToPop])
110+
111+
q.read += eventsToPop
112+
q.size -= eventsToPop
113+
return dest
97114
}
98115

99116
// free drops references held by the queue.

0 commit comments

Comments (0)