Commit 3135118

craig[bot] and stevendanna committed
Merge #154439
154439: rangefeed: move to per-registration limit in buffered sender r=wenyihu6 a=stevendanna

This moves the overall queue limit to a per-registration limit, more similar to the behaviour we had in the old unbuffered sender world. One potential advantage here is that it doesn't punish all connected registrations because of one high-traffic registration, reducing the rate of catch-up scans needed.

Epic: none

Release note: None

Co-authored-by: Steven Danna <[email protected]>
2 parents: 220c440 + 14d273e, commit 3135118
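
The gist of the change, before the full diff: rather than one shared cap on the buffered sender's event queue, each registration (keyed by streamID) gets its own budget, so a single high-traffic registration can no longer push every other stream into a capacity-exceeded retry. Below is a minimal, hypothetical Go sketch of that bookkeeping; the names (perStreamQueue, perStreamCap, queued) are illustrative only and not the CockroachDB types, and the real code in buffered_sender.go additionally guards this state with a mutex and tracks an overflow state machine.

package main

import (
	"errors"
	"fmt"
)

var errCapacityExceeded = errors.New("per-stream capacity exceeded")

// perStreamQueue tracks how many events each stream currently has queued.
// A missing map entry means a stream has nothing queued yet.
type perStreamQueue struct {
	perStreamCap int64           // 0 disables the limit
	queued       map[int64]int64 // streamID -> events currently in the queue
}

// enqueue admits an event for streamID unless that stream alone has used up
// its budget; other streams are unaffected by a hot neighbour.
func (q *perStreamQueue) enqueue(streamID int64) error {
	if q.perStreamCap > 0 && q.queued[streamID] >= q.perStreamCap {
		return errCapacityExceeded
	}
	q.queued[streamID]++
	return nil
}

// dequeue returns one unit of budget to streamID once its event is sent.
func (q *perStreamQueue) dequeue(streamID int64) {
	if q.queued[streamID] > 0 {
		q.queued[streamID]--
	}
}

func main() {
	q := &perStreamQueue{perStreamCap: 2, queued: map[int64]int64{}}
	fmt.Println(q.enqueue(1), q.enqueue(1)) // <nil> <nil>
	fmt.Println(q.enqueue(1))               // per-stream capacity exceeded
	fmt.Println(q.enqueue(2))               // <nil>: stream 2 still has budget
	q.dequeue(1)
	fmt.Println(q.enqueue(1)) // <nil>: budget freed after a send
}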

File tree

5 files changed: +150 −91 lines changed

pkg/kv/kvserver/rangefeed/buffered_sender.go

Lines changed: 105 additions & 52 deletions
@@ -7,12 +7,14 @@ package rangefeed
 
 import (
 	"context"
+	"fmt"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
 	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/util/buildutil"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -48,38 +50,20 @@ import (
 // BufferedPerRangeEventSink.Send BufferedPerRangeEventSink.SendError
 //
 
-// RangefeedSingleBufferedSenderQueueMaxSize is the maximum number of events
-// that the buffered sender will buffer before it starts returning capacity
-// exceeded errors. Updates to this setting are only applied to new
-// MuxRangefeedCalls. Existing streams will use the previous value until
+// RangefeedSingleBufferedSenderQueueMaxPerReg is the maximum number of events
+// that the buffered sender will buffer for a single registration (identified by
+// streamID). Existing MuxRangefeeds will use the previous value until
 // restarted.
 //
-// The default here has been arbitrarily chosen. Ideally,
-//
-// - We want to avoid capacity exceeded errors that wouldn't have occurred
-//   when the buffered registrations were in use.
-//
-// - We don't want to drastically increase the amount of queueing allowed for a
-//   single registration.
-//
-// A small buffer may be justified given that:
-//
-// - One buffered sender is feeding a single gRPC client, so scaling based on
-//   registrations doesn't necessarily make sense. If the consumer is behind, it
-//   is behind.
-//
-// - Events emitted during catchup scans have their own per-registration buffer
-//   still.
-//
 // TODO(ssd): This is a bit of a stop-gap so that we have a knob to turn if we
 // need to. We probably want each buffered sender (or each consumerID) to be
 // able to hold up to some fraction of the total rangefeed budget. But we are
 // starting here for now.
-var RangefeedSingleBufferedSenderQueueMaxSize = settings.RegisterIntSetting(
+var RangefeedSingleBufferedSenderQueueMaxPerReg = settings.RegisterIntSetting(
 	settings.SystemOnly,
-	"kv.rangefeed.buffered_sender.queue_max_size",
-	"max size of a buffered senders event queue (0 for no max)",
-	kvserverbase.DefaultRangefeedEventCap*8,
+	"kv.rangefeed.buffered_sender.per_registration_max_queue_size",
+	"maximum number of events a single registration can have queued in the event queue (0 for no max)",
+	kvserverbase.DefaultRangefeedEventCap*2,
 )
 
 // BufferedSender is embedded in every rangefeed.BufferedPerRangeEventSink,
@@ -95,9 +79,10 @@ type BufferedSender struct {
 		syncutil.Mutex
 		stopped bool
 		buffer *eventQueue
-		// capacity is the maximum number of events that can be buffered.
-		capacity int64
-		overflowed bool
+		// perStreamCapacity is the maximum number buffered events allowed per
+		// stream.
+		perStreamCapacity int64
+		byStream map[int64]streamStatus
 	}
 
 	// notifyDataC is used to notify the BufferedSender.run goroutine that there
@@ -111,6 +96,25 @@ type BufferedSender struct {
 	metrics *BufferedSenderMetrics
 }
 
+type streamState int64
+
+const (
+	// streamActive is the default state of the stream.
+	streamActive streamState = iota
+	// streamOverflowing is the state we are in when the stream has reached its
+	// limit and is waiting to deliver an error.
+	streamOverflowing streamState = iota
+	// streamOverflowed means the stream has overflowed and the error has been
+	// placed in the queue.
+	streamOverflowed streamState = iota
+)
+
+type streamStatus struct {
+	// queueItems is the number of items for a given stream in the event queue.
+	queueItems int64
+	state streamState
+}
+
 func NewBufferedSender(
 	sender ServerStreamSender, settings *cluster.Settings, bsMetrics *BufferedSenderMetrics,
 ) *BufferedSender {
@@ -121,7 +125,8 @@ func NewBufferedSender(
 	bs.queueMu.buffer = newEventQueue()
 	bs.notifyDataC = make(chan struct{}, 1)
 	bs.queueMu.buffer = newEventQueue()
-	bs.queueMu.capacity = RangefeedSingleBufferedSenderQueueMaxSize.Get(&settings.SV)
+	bs.queueMu.perStreamCapacity = RangefeedSingleBufferedSenderQueueMaxPerReg.Get(&settings.SV)
+	bs.queueMu.byStream = make(map[int64]streamStatus)
 	return bs
 }
 
@@ -137,13 +142,52 @@ func (bs *BufferedSender) sendBuffered(
 	if bs.queueMu.stopped {
 		return errors.New("stream sender is stopped")
 	}
-	if bs.queueMu.overflowed {
-		return newRetryErrBufferCapacityExceeded()
-	}
-	if bs.queueMu.capacity > 0 && bs.queueMu.buffer.len() >= bs.queueMu.capacity {
-		bs.queueMu.overflowed = true
-		return newRetryErrBufferCapacityExceeded()
+
+	// Per-stream capacity limits. If the stream is already overflowed we drop the
+	// request. If the stream has hit its limit, we return an error to the
+	// registration. This error should be the next event that is sent to
+	// stream.
+	//
+	// NB: The zero-value of streamStatus is the valid state of a newly seen
+	// stream.
+	status := bs.queueMu.byStream[ev.StreamID]
+	switch status.state {
+	case streamActive:
+		if bs.queueMu.perStreamCapacity > 0 && status.queueItems == bs.queueMu.perStreamCapacity {
+			if ev.Error != nil {
+				// If _this_ event is an error, no use sending another error. This stream
+				// is going down. Admit this error and mark the stream as overflowed.
+				status.state = streamOverflowed
+			} else {
+				// This stream is at capacity, return an error to the registration that it
+				// should send back to us after cleaning up.
+				status.state = streamOverflowing
+				return newRetryErrBufferCapacityExceeded()
+			}
+		}
+	case streamOverflowing:
+		// The unbufferedRegistration is the only component that sends non-error
+		// events to our stream. In response to the error we return when moving to
+		// stateOverflowing, it should immediately send us an error and mark itself
+		// as disconnected. As a result, no non-error events are expected.
+		if ev.Error == nil {
+			panic("only error events expected after stream has exceeded capacity")
+		}
+		status.state = streamOverflowed
+	case streamOverflowed:
+		// If we are overflowed, we don't expect any further events because the
+		// registration should have disconnected in response to the error.
+		//
+		// TODO(ssd): Consider adding an assertion here.
+		return nil
+	default:
+		panic(fmt.Sprintf("unhandled stream state: %v", status.state))
 	}
+
+	// We are admitting this event.
+	status.queueItems++
+	bs.queueMu.byStream[ev.StreamID] = status
+
 	// TODO(wenyihu6): pass an actual context here
 	alloc.Use(context.Background())
 	bs.queueMu.buffer.pushBack(sharedMuxEvent{ev, alloc})
@@ -179,10 +223,11 @@ func (bs *BufferedSender) run(
 			return nil
 		case <-bs.notifyDataC:
 			for {
-				e, success, overflowed, remains := bs.popFront()
+				e, success := bs.popFront()
 				if !success {
 					break
 				}
+
 				bs.metrics.BufferedSenderQueueSize.Dec(1)
 				err := bs.sender.Send(e.ev)
 				e.alloc.Release(ctx)
@@ -192,26 +237,33 @@
 				if err != nil {
 					return err
 				}
-				if overflowed && remains == int64(0) {
-					return newRetryErrBufferCapacityExceeded()
-				}
 			}
 		}
 	}
 }
 
 // popFront pops the front event from the buffer queue. It returns the event and
 // a boolean indicating if the event was successfully popped.
-func (bs *BufferedSender) popFront() (
-	e sharedMuxEvent,
-	success bool,
-	overflowed bool,
-	remains int64,
-) {
+func (bs *BufferedSender) popFront() (e sharedMuxEvent, success bool) {
 	bs.queueMu.Lock()
 	defer bs.queueMu.Unlock()
 	event, ok := bs.queueMu.buffer.popFront()
-	return event, ok, bs.queueMu.overflowed, bs.queueMu.buffer.len()
+	if ok {
+		state, streamFound := bs.queueMu.byStream[event.ev.StreamID]
+		if streamFound {
+			state.queueItems -= 1
+			bs.queueMu.byStream[event.ev.StreamID] = state
+		} else {
+			assumedUnreachable("event found in queue with no state in byStream")
+		}
+	}
+	return event, ok
+}
+
+func (bs *BufferedSender) removeStream(streamID int64) {
+	bs.queueMu.Lock()
+	defer bs.queueMu.Unlock()
+	delete(bs.queueMu.byStream, streamID)
 }
 
 // cleanup is called when the sender is stopped. It is expected to free up
@@ -222,6 +274,7 @@ func (bs *BufferedSender) cleanup(ctx context.Context) {
 	bs.queueMu.stopped = true
 	remaining := bs.queueMu.buffer.len()
 	bs.queueMu.buffer.drain(ctx)
+	bs.queueMu.byStream = nil
 	bs.metrics.BufferedSenderQueueSize.Dec(remaining)
 }
 
@@ -231,12 +284,6 @@ func (bs *BufferedSender) len() int {
 	return int(bs.queueMu.buffer.len())
 }
 
-func (bs *BufferedSender) overflowed() bool {
-	bs.queueMu.Lock()
-	defer bs.queueMu.Unlock()
-	return bs.queueMu.overflowed
-}
-
 // Used for testing only.
 func (bs *BufferedSender) waitForEmptyBuffer(ctx context.Context) error {
 	opts := retry.Options{
@@ -258,3 +305,9 @@ func (bs *BufferedSender) waitForEmptyBuffer(ctx context.Context) error {
 	}
 	return errors.New("buffered sender failed to send in time")
 }
+
+func assumedUnreachable(msg string) {
+	if buildutil.CrdbTestBuild {
+		panic(fmt.Sprintf("assumed unreachable code reached: %v", msg))
+	}
+}
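
To make the new sendBuffered control flow above easier to follow, here is a hedged, self-contained sketch of the per-stream overflow state machine it implements: an active stream that hits its cap gets a capacity-exceeded error and moves to an overflowing state, the registration is then expected to respond with exactly one error event (admitted, moving the stream to overflowed), and anything after that is dropped. Names and signatures below are illustrative only, not the BufferedSender API.

package main

import "fmt"

type streamState int

const (
	active streamState = iota
	overflowing
	overflowed
)

// admit is a simplified stand-in for the switch in sendBuffered: given the
// stream's state, how many events it has queued, its per-stream limit, and
// whether the incoming event is an error, it returns the next state and what
// happens to the event.
func admit(s streamState, queued, limit int64, isErr bool) (streamState, string) {
	switch s {
	case active:
		if limit > 0 && queued == limit {
			if isErr {
				// The stream is going down anyway; admit its error directly.
				return overflowed, "admit error"
			}
			// Tell the registration to clean up and send an error back to us.
			return overflowing, "reject: capacity exceeded"
		}
		return active, "admit"
	case overflowing:
		if !isErr {
			// The real code treats this as a bug: only an error event is
			// expected once the stream has been told it is over capacity.
			return overflowing, "unexpected non-error event"
		}
		return overflowed, "admit error"
	default: // overflowed
		// The registration should already be disconnecting; drop the event.
		return overflowed, "drop"
	}
}

func main() {
	s := active
	var verdict string
	s, verdict = admit(s, 2, 2, false) // at the limit, regular event
	fmt.Println(verdict)               // reject: capacity exceeded
	s, verdict = admit(s, 2, 2, true)  // registration sends its error back
	fmt.Println(verdict)               // admit error
	_, verdict = admit(s, 2, 2, false) // anything after that
	fmt.Println(verdict)               // drop
}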

pkg/kv/kvserver/rangefeed/buffered_sender_test.go

Lines changed: 25 additions & 25 deletions
@@ -182,9 +182,9 @@ func TestBufferedSenderOnOverflow(t *testing.T) {
 	st := cluster.MakeTestingClusterSettings()
 
 	queueCap := int64(24)
-	RangefeedSingleBufferedSenderQueueMaxSize.Override(ctx, &st.SV, queueCap)
+	RangefeedSingleBufferedSenderQueueMaxPerReg.Override(ctx, &st.SV, queueCap)
 	bs := NewBufferedSender(testServerStream, st, NewBufferedSenderMetrics())
-	require.Equal(t, queueCap, bs.queueMu.capacity)
+	require.Equal(t, queueCap, bs.queueMu.perStreamCapacity)
 
 	val1 := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 1}}
 	ev1 := new(kvpb.RangeFeedEvent)
@@ -195,26 +195,24 @@ func TestBufferedSenderOnOverflow(t *testing.T) {
 		require.NoError(t, bs.sendBuffered(muxEv, nil))
 	}
 	require.Equal(t, queueCap, int64(bs.len()))
-	e, success, overflowed, remains := bs.popFront()
+	e, success := bs.popFront()
 	require.Equal(t, sharedMuxEvent{
 		ev: muxEv,
 		alloc: nil,
 	}, e)
 	require.True(t, success)
-	require.False(t, overflowed)
-	require.Equal(t, queueCap-1, remains)
 	require.Equal(t, queueCap-1, int64(bs.len()))
 	require.NoError(t, bs.sendBuffered(muxEv, nil))
 	require.Equal(t, queueCap, int64(bs.len()))
 
 	// Overflow now.
-	require.Equal(t, bs.sendBuffered(muxEv, nil).Error(),
-		newRetryErrBufferCapacityExceeded().Error())
+	err := bs.sendBuffered(muxEv, nil)
+	require.EqualError(t, err, newRetryErrBufferCapacityExceeded().Error())
 }
 
-// TestBufferedSenderOnStreamShutdown tests that BufferedSender and
-// StreamManager handle overflow and shutdown properly.
-func TestBufferedSenderOnStreamShutdown(t *testing.T) {
+// TestBufferedSenderOnOverflowMultiStream tests that BufferedSender and
+// StreamManager handle overflow and stream removal.
+func TestBufferedSenderOnOverflowMultiStream(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 	ctx := context.Background()
@@ -226,9 +224,9 @@ func TestBufferedSenderOnStreamShutdown(t *testing.T) {
 	st := cluster.MakeTestingClusterSettings()
 
 	queueCap := int64(24)
-	RangefeedSingleBufferedSenderQueueMaxSize.Override(ctx, &st.SV, queueCap)
+	RangefeedSingleBufferedSenderQueueMaxPerReg.Override(ctx, &st.SV, queueCap)
 	bs := NewBufferedSender(testServerStream, st, NewBufferedSenderMetrics())
-	require.Equal(t, queueCap, bs.queueMu.capacity)
+	require.Equal(t, queueCap, bs.queueMu.perStreamCapacity)
 
 	sm := NewStreamManager(bs, smMetrics)
 	require.NoError(t, sm.Start(ctx, stopper))
@@ -264,29 +262,31 @@ func TestBufferedSenderOnStreamShutdown(t *testing.T) {
 	require.True(t, registered)
 	sm.AddStream(streamID, d)
 
-	require.NoError(t, sm.sender.sendBuffered(muxEv, nil))
-	// At this point we actually have sent 2 events. 1 checkpoint event sent by
-	// register and 1 event sent on the line above. We wait for 1 of these events
-	// to be pulled off the queue and block in the sender, leaving 1 in the queue.
+	// Add a second stream to the stream manager.
+	registered, d, _ = p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
+		false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery,
+		sm.NewStream(streamID+1, 1 /*rangeID*/))
+	require.True(t, registered)
+	sm.AddStream(streamID+1, d)
+
+	// At this point we actually have sent 2 events, one for each checkpoint sent
+	// by the registrations. One of these should get pulled off the queue and block.
 	waitForQueueLen(1)
 	// Now fill the rest of the queue.
-	for range queueCap - 1 {
+	for range queueCap {
 		require.NoError(t, sm.sender.sendBuffered(muxEv, nil))
 	}
 
 	// The next write should overflow.
 	capExceededErrStr := newRetryErrBufferCapacityExceeded().Error()
 	err := sm.sender.sendBuffered(muxEv, nil)
 	require.EqualError(t, err, capExceededErrStr)
-	require.True(t, bs.overflowed())
+
+	// A write to a different stream should be fine
+	muxEv.StreamID = streamID + 2
+	err = sm.sender.sendBuffered(muxEv, nil)
+	require.NoError(t, err)
 
 	unblock()
 	waitForQueueLen(0)
-	// Overflow cleanup.
-	err = <-sm.Error()
-	require.EqualError(t, err, capExceededErrStr)
-	// Note that we expect the stream manager to shut down here, but no actual
-	// error events would be sent during the shutdown.
-	err = sm.sender.sendBuffered(muxEv, nil)
-	require.EqualError(t, err, capExceededErrStr)
 }

pkg/kv/kvserver/rangefeed/stream_manager.go

Lines changed: 14 additions & 6 deletions
@@ -75,6 +75,10 @@ type sender interface {
 	// background until a node level error is encountered which would shut down
 	// all streams in StreamManager.
 	run(ctx context.Context, stopper *stop.Stopper, onError func(int64)) error
+
+	// Remove stream is called when an individual stream is being removed.
+	removeStream(streamID int64)
+
 	// cleanup is called when the sender is stopped. It is expected to clean up
 	// any resources used by the sender.
 	cleanup(ctx context.Context)
@@ -108,12 +112,16 @@ func (sm *StreamManager) NewStream(streamID int64, rangeID roachpb.RangeID) (sin
 // streamID to avoid metrics inaccuracy when the error is sent before the stream
 // is added to the StreamManager.
 func (sm *StreamManager) OnError(streamID int64) {
-	sm.streams.Lock()
-	defer sm.streams.Unlock()
-	if _, ok := sm.streams.m[streamID]; ok {
-		delete(sm.streams.m, streamID)
-		sm.metrics.ActiveMuxRangeFeed.Dec(1)
-	}
+	func() {
+		sm.streams.Lock()
+		defer sm.streams.Unlock()
+		if _, ok := sm.streams.m[streamID]; ok {
+			// TODO(ssd): We should be able to assert we are disconnected here.
+			delete(sm.streams.m, streamID)
+			sm.metrics.ActiveMuxRangeFeed.Dec(1)
+		}
+	}()
+	sm.sender.removeStream(streamID)
 }
 
 // DisconnectStream disconnects the stream with the given streamID.
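
A small design note on the OnError change above: the pre-existing map cleanup is now wrapped in an immediately invoked function literal, which appears to be so that the deferred sm.streams.Unlock() runs before sm.sender.removeStream(streamID) takes the sender's own queueMu, keeping the two critical sections from nesting. A generic sketch of that scoping pattern follows, with placeholder types rather than the StreamManager ones.

package main

import (
	"fmt"
	"sync"
)

type registry struct {
	mu sync.Mutex
	m  map[int64]struct{}
}

type sink struct {
	mu    sync.Mutex
	perID map[int64]int64
}

// remove scopes the deferred Unlock to a func literal so the registry lock is
// released before the sink lock is taken; without the literal, the defer
// would hold the first lock for the whole function.
func remove(r *registry, s *sink, id int64) {
	func() {
		r.mu.Lock()
		defer r.mu.Unlock() // released when this literal returns
		delete(r.m, id)
	}()
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.perID, id)
}

func main() {
	r := &registry{m: map[int64]struct{}{7: {}}}
	s := &sink{perID: map[int64]int64{7: 3}}
	remove(r, s, 7)
	fmt.Println(len(r.m), len(s.perID)) // 0 0
}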

0 commit comments
