
Commit 55edba2

craig[bot] and stevendanna committed
2 parents a97e76d + 81d192e

Merge #153870

153870: rangefeed: pop multiple events from queue in buffered sender r=tbg a=stevendanna

This is a small optimization that is perhaps a step toward something like #152359.

Epic: none

Release note: None

Co-authored-by: Steven Danna <[email protected]>

File tree

5 files changed (+176, -69 lines)
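The core of the change: instead of taking the queue mutex once per event, the sender's run loop now pops up to `sendBufSize` events per lock acquisition and sends them outside the critical section. A minimal sketch of that pattern — simplified types and a plain slice queue, not the actual CockroachDB implementation:

```go
// Sketch of the batching pattern introduced by this commit: pop up to
// sendBufSize events per mutex acquisition, then send them outside the
// critical section. Hypothetical types; NOT the real BufferedSender.
package main

import (
	"fmt"
	"sync"
)

type event struct{ id int64 }

type batchQueue struct {
	mu    sync.Mutex
	items []event
}

// popInto moves up to max events into dest under a single lock acquisition.
func (q *batchQueue) popInto(dest []event, max int) []event {
	q.mu.Lock()
	defer q.mu.Unlock()
	n := len(q.items)
	if n > max {
		n = max
	}
	dest = append(dest, q.items[:n]...)
	q.items = q.items[n:]
	return dest
}

func main() {
	q := &batchQueue{items: []event{{1}, {2}, {3}}}
	buf := make([]event, 0, 64) // reused across iterations, like eventsBuf below
	for {
		buf = q.popInto(buf[:0], 64)
		if len(buf) == 0 {
			break
		}
		for _, e := range buf {
			fmt.Println("send", e.id) // stands in for sender.Send(...)
		}
	}
}
```

Batching amortizes the locking cost across up to 64 events per acquisition while the send path stays outside the lock.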

pkg/kv/kvserver/rangefeed/buffered_sender.go

34 additions, 18 deletions

@@ -74,6 +74,9 @@ type BufferedSender struct {
 	// Note that lockedMuxStream wraps the underlying grpc server stream, ensuring
 	// thread safety.
 	sender ServerStreamSender
+	// sendBufSize is the number of items we pop from the queue at a time. Exposed for
+	// testing.
+	sendBufSize int

 	// queueMu protects the buffer queue.
 	queueMu struct {
@@ -97,6 +100,8 @@ type BufferedSender struct {
 	metrics *BufferedSenderMetrics
 }

+const defaultSendBufSize = 64
+
 type streamState int64

 const (
@@ -148,8 +153,9 @@ func NewBufferedSender(
 	sender ServerStreamSender, settings *cluster.Settings, bsMetrics *BufferedSenderMetrics,
 ) *BufferedSender {
 	bs := &BufferedSender{
-		sender:  sender,
-		metrics: bsMetrics,
+		sendBufSize: defaultSendBufSize,
+		sender:      sender,
+		metrics:     bsMetrics,
 	}
 	bs.notifyDataC = make(chan struct{}, 1)
 	bs.queueMu.buffer = newEventQueue()
@@ -264,6 +270,8 @@ func (bs *BufferedSender) sendUnbuffered(ev *kvpb.MuxRangeFeedEvent) error {
 func (bs *BufferedSender) run(
 	ctx context.Context, stopper *stop.Stopper, onError func(streamID int64),
 ) error {
+	eventsBuf := make([]sharedMuxEvent, 0, bs.sendBufSize)
+
 	for {
 		select {
 		case <-ctx.Done():
@@ -276,39 +284,47 @@ func (bs *BufferedSender) run(
 			return nil
 		case <-bs.notifyDataC:
 			for {
-				e, success := bs.popFront()
-				if !success {
+				eventsBuf = bs.popEvents(eventsBuf[:0], bs.sendBufSize)
+				if len(eventsBuf) == 0 {
 					break
 				}

-				bs.metrics.BufferedSenderQueueSize.Dec(1)
-				err := bs.sender.Send(e.ev)
-				e.alloc.Release(ctx)
-				if e.ev.Error != nil {
-					onError(e.ev.StreamID)
-				}
-				if err != nil {
-					return err
+				bs.metrics.BufferedSenderQueueSize.Dec(int64(len(eventsBuf)))
+				for _, evt := range eventsBuf {
+					// TODO(ssd): This might be another location where we could transform
+					// multiple events into BulkEvents. We can't just throw them all in a
+					// bulk event though since we are processing events for different
+					// streams here.
+					err := bs.sender.Send(evt.ev)
+					evt.alloc.Release(ctx)
+					if evt.ev.Error != nil {
+						onError(evt.ev.StreamID)
+					}
+					if err != nil {
+						return err
+					}
 				}
+				clear(eventsBuf) // Clear so referenced MuxRangeFeedEvents can be GC'd.
 			}
 		}
 	}
 }

-// popFront pops the front event from the buffer queue. It returns the event and
-// a boolean indicating if the event was successfully popped.
-func (bs *BufferedSender) popFront() (e sharedMuxEvent, success bool) {
+// popEvents appends up to eventsToPop events into dest, returning the appended slice.
+func (bs *BufferedSender) popEvents(dest []sharedMuxEvent, eventsToPop int) []sharedMuxEvent {
 	bs.queueMu.Lock()
 	defer bs.queueMu.Unlock()
-	event, ok := bs.queueMu.buffer.popFront()
-	if ok {
+	dest = bs.queueMu.buffer.popFrontInto(dest, eventsToPop)
+
+	// Update accounting for everything we popped.
+	for _, event := range dest {
 		state, streamFound := bs.queueMu.byStream[event.ev.StreamID]
 		if streamFound {
 			state.queueItems--
 			bs.queueMu.byStream[event.ev.StreamID] = state
 		}
 	}
-	return event, ok
+	return dest
 }

 // addStream initializes the per-stream tracking for the given streamID.
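A detail worth noting in run() above is the buffer-reuse idiom: eventsBuf is allocated once, resliced to `eventsBuf[:0]` each iteration so the backing array is reused without reallocation, and cleared after sending so the popped events can be garbage-collected. A standalone illustration of that idiom, with a hypothetical item type (requires Go 1.21 for `clear` on slices):

```go
// Buffer-reuse sketch: reslice to [:0] to keep the backing array, then
// clear() after processing so pointers held in the array do not keep the
// processed data alive. Hypothetical types; not the rangefeed code.
package main

import "fmt"

type item struct{ payload *string }

func main() {
	buf := make([]item, 0, 64) // allocated once, like eventsBuf
	for batch := 0; batch < 2; batch++ {
		buf = buf[:0] // length back to zero; capacity and backing array retained
		s := fmt.Sprintf("batch-%d", batch)
		buf = append(buf, item{payload: &s})
		fmt.Println(*buf[0].payload) // ... process the batch ...
		clear(buf)                   // zero the slots so &s can be GC'd
	}
}
```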

pkg/kv/kvserver/rangefeed/buffered_sender_test.go

5 additions, 2 deletions

@@ -78,12 +78,14 @@ func TestBufferedSenderOnOverflow(t *testing.T) {
 		require.NoError(t, bs.sendBuffered(muxEv, nil))
 	}
 	require.Equal(t, queueCap, int64(bs.len()))
-	e, success := bs.popFront()
+	dest := bs.popEvents(nil, 1)
+	require.Equal(t, 1, len(dest))
+	e := dest[0]
 	require.Equal(t, sharedMuxEvent{
 		ev:    muxEv,
 		alloc: nil,
 	}, e)
-	require.True(t, success)
+
 	require.Equal(t, queueCap-1, int64(bs.len()))
 	require.NoError(t, bs.sendBuffered(muxEv, nil))
 	require.Equal(t, queueCap, int64(bs.len()))
@@ -212,6 +214,7 @@ func TestBufferedSenderOnOverflowMultiStream(t *testing.T) {
 	queueCap := int64(24)
 	RangefeedSingleBufferedSenderQueueMaxPerReg.Override(ctx, &st.SV, queueCap)
 	bs := NewBufferedSender(testServerStream, st, NewBufferedSenderMetrics())
+	bs.sendBufSize = 1 // Test requires knowing exactly when the queue will fill.
 	require.Equal(t, queueCap, bs.queueMu.perStreamCapacity)

 	sm := NewStreamManager(bs, smMetrics)

pkg/kv/kvserver/rangefeed/event_queue.go

25 additions, 8 deletions

@@ -79,21 +79,38 @@ func (q *eventQueue) pushBack(e sharedMuxEvent) {
 	q.size++
 }

-func (q *eventQueue) popFront() (sharedMuxEvent, bool) {
-	if q.size == 0 {
-		return sharedMuxEvent{}, false
+// popFrontInto appends up to eventsToPop events into dest.
+func (q *eventQueue) popFrontInto(dest []sharedMuxEvent, eventsToPop int) []sharedMuxEvent {
+	if eventsToPop == 0 || q.size == 0 {
+		return dest
 	}
+
 	if q.read == eventQueueChunkSize {
+		assertTrue(q.first.nextChunk != nil, "nextChunk should be non-nil")
 		removed := q.first
 		q.first = q.first.nextChunk
 		putPooledQueueChunk(removed)
 		q.read = 0
 	}
-	res := q.first.data[q.read]
-	q.first.data[q.read] = sharedMuxEvent{}
-	q.read++
-	q.size--
-	return res, true
+
+	// We only read out of the current chunk. We could loop until we've reached
+	// eventsToPop.
+	availableInChunk := eventQueueChunkSize - q.read
+	if q.first == q.last {
+		// Last chunk - only up to write position.
+		availableInChunk = q.write - q.read
+	}
+
+	if eventsToPop > availableInChunk {
+		eventsToPop = availableInChunk
+	}
+
+	dest = append(dest, q.first.data[q.read:q.read+eventsToPop]...)
+	clear(q.first.data[q.read : q.read+eventsToPop])
+
+	q.read += eventsToPop
+	q.size -= eventsToPop
+	return dest
 }

 // free drops references held by the queue.
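One subtlety of popFrontInto, flagged by its own comment: it reads only from the current chunk, so a single call can return fewer than eventsToPop even when more events remain queued, and the caller in BufferedSender.run loops until the returned slice is empty. A self-contained toy version of the chunked queue showing that chunk-boundary behavior — a hypothetical chunkSize of 4 stands in for eventQueueChunkSize, and ints stand in for sharedMuxEvent:

```go
// Toy chunked queue demonstrating that a single popFrontInto-style call
// stops at the chunk boundary; callers must loop to drain the queue.
package main

import "fmt"

const chunkSize = 4

type chunk struct {
	data [chunkSize]int
	next *chunk
}

type queue struct {
	first, last *chunk
	read, write int // read offset in first, write offset in last
	size        int
}

func (q *queue) push(v int) {
	if q.last == nil || q.write == chunkSize {
		c := &chunk{}
		if q.last == nil {
			q.first = c
		} else {
			q.last.next = c
		}
		q.last = c
		q.write = 0
	}
	q.last.data[q.write] = v
	q.write++
	q.size++
}

// popFrontInto drains at most max items, and only from the current chunk.
func (q *queue) popFrontInto(dest []int, max int) []int {
	if max == 0 || q.size == 0 {
		return dest
	}
	if q.read == chunkSize { // current chunk exhausted; advance
		q.first = q.first.next
		q.read = 0
	}
	avail := chunkSize - q.read
	if q.first == q.last { // last chunk: only up to the write position
		avail = q.write - q.read
	}
	if max > avail {
		max = avail
	}
	dest = append(dest, q.first.data[q.read:q.read+max]...)
	q.read += max
	q.size -= max
	return dest
}

func main() {
	q := &queue{}
	for i := 0; i < 6; i++ {
		q.push(i)
	}
	// Asking for 6 yields only the 4 items in the first chunk; the
	// caller's loop picks up the remaining 2 on the next call.
	fmt.Println(q.popFrontInto(nil, 6)) // [0 1 2 3]
	fmt.Println(q.popFrontInto(nil, 6)) // [4 5]
}
```

Stopping at the chunk boundary keeps each pop a single contiguous append, and the sender's outer loop makes up for any short reads.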
