
Commit 0eccabf

craig[bot], wenyihu6, stevendanna, spilchen, and jasonlmfong committed
153740: kvserver/rangefeed: add capacity to node level buffered sender r=stevendanna a=wenyihu6

Resolves: #152505
Release note: none

---

**kvserver/rangefeed: add capacity to node level buffered sender**

This patch adds a capacity to the node level buffered sender, which will shut down all registrations if the node level buffer has overflowed.

Part of: #129813
Release note: none

---

**kvserver/rangefeed: rename overflow to overflowed**

This commit renames overflow to overflowed in BufferedSender for clarity.

---

**kvserver/rangefeed: dynamically adjust capacity per buffered sender**

Previously, buffered sender capacity was configured via a cluster setting. This commit removes that setting and makes capacity scale dynamically with the number of active registrations. The minimum (and default) is 4096 * 20, and capacity never drops below that. Above the minimum, it grows as 4096 * <active registrations>. Events already buffered will not be dropped when capacity decreases, and capacity is not adjusted during shutdown (when the active registration count is 0).

---

**rangefeed: move back to static (but configurable) send queue limit**

The interactions here probably require more thought before doing anything more complicated than a simple static limit.

Epic: none
Release note: None

---

**rangefeed: fix incorrect comment**

Epic: none
Release note: None

---

**rangefeed: deflake TestUnbufferedRegWithStreamManager**

Under stress, this test would fail because attempting to publish 20 events in a tight loop would overwhelm the processor's event buffer (which has a capacity of 16).

Epic: none
Release note: None

153965: sql/inspect: support checking multiple indexes in one INSPECT job r=spilchen a=spilchen

Previously, an assertion ensured that an INSPECT job was built with only one check. This removes that assert so that a job can carry multiple checks. When you run INSPECT against many indexes on a table or against a database, we will have multiple checks, so this was necessary for that case.

Closes #148300
Release note: none
Epic: none

154027: activity: add fallback isolation level for statements not in transaction r=jasonlmfong a=jasonlmfong

The change is achieved by retrieving the session default isolation level.

Closes: #153931
Epic: None
Release note: None

Co-authored-by: wenyihu6 <[email protected]>
Co-authored-by: Steven Danna <[email protected]>
Co-authored-by: Matt Spilchen <[email protected]>
Co-authored-by: Jason Fong <[email protected]>
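The dynamic-capacity commit above (superseded later in this same merge by the static, but configurable, limit) describes a concrete scaling rule: max(4096 * 20, 4096 * active registrations). A minimal sketch of that rule in Go; the names eventCap, minRegistrations, and bufferedSenderCapacity are illustrative, not identifiers from the commit:

package main

import "fmt"

const (
	eventCap         = 4096 // per-registration event capacity from the commit message
	minRegistrations = 20   // floor: capacity never drops below eventCap * 20
)

// bufferedSenderCapacity computes the capacity described in the commit
// message: max(4096*20, 4096*activeRegistrations). Per the message, callers
// would skip the adjustment during shutdown (zero registrations) and never
// drop events that are already buffered.
func bufferedSenderCapacity(activeRegistrations int64) int64 {
	if c := eventCap * activeRegistrations; c > eventCap*minRegistrations {
		return c
	}
	return eventCap * minRegistrations
}

func main() {
	fmt.Println(bufferedSenderCapacity(5))   // 81920 (floor applies)
	fmt.Println(bufferedSenderCapacity(100)) // 409600
}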
4 parents: 24dc88d + 86b6124 + d433b29 + 42b091f · commit 0eccabf

15 files changed (+348, -53 lines)

pkg/kv/kvserver/kvserverbase/base.go

Lines changed: 13 additions & 0 deletions
@@ -383,3 +383,16 @@ var MaxCommandSize = settings.RegisterByteSizeSetting(
 	MaxCommandSizeDefault,
 	settings.ByteSizeWithMinimum(MaxCommandSizeFloor),
 )
+
+// DefaultRangefeedEventCap is the channel capacity of the rangefeed processor
+// and each registration. It is also used to calculate the default capacity
+// limit for the buffered sender.
+//
+// The size of an event is 72 bytes, so this will result in an allocation on the
+// order of ~300KB per RangeFeed. That's probably ok given the number of ranges
+// on a node that we'd like to support with active rangefeeds, but it's
+// certainly on the upper end of the range.
+//
+// Note that processors also must reserve memory from one of two memory monitors
+// for each event.
+const DefaultRangefeedEventCap = 4096
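A quick check of the comment's arithmetic (a back-of-the-envelope sketch, not code from this commit): 4096 events at 72 bytes each comes to 294,912 bytes, which matches the ~300KB figure.

package main

import "fmt"

func main() {
	const eventSizeBytes = 72 // per the comment above
	const eventCap = 4096     // DefaultRangefeedEventCap
	total := eventSizeBytes * eventCap
	fmt.Printf("%d bytes ≈ %.0f KB\n", total, float64(total)/1000.0) // 294912 bytes ≈ 295 KB
}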

pkg/kv/kvserver/rangefeed/BUILD.bazel

Lines changed: 1 addition & 0 deletions
@@ -30,6 +30,7 @@ go_library(
 	"//pkg/kv/kvpb",
 	"//pkg/kv/kvserver/concurrency/isolation",
 	"//pkg/kv/kvserver/concurrency/lock",
+	"//pkg/kv/kvserver/kvserverbase",
 	"//pkg/roachpb",
 	"//pkg/settings",
 	"//pkg/settings/cluster",

pkg/kv/kvserver/rangefeed/buffered_sender.go

Lines changed: 65 additions & 5 deletions
@@ -10,6 +10,9 @@ import (
 	"time"

 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
+	"github.com/cockroachdb/cockroach/pkg/settings"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -45,6 +48,38 @@ import (
 // BufferedPerRangeEventSink.Send BufferedPerRangeEventSink.SendError
 //

+// RangefeedSingleBufferedSenderQueueMaxSize is the maximum number of events
+// that the buffered sender will buffer before it starts returning capacity
+// exceeded errors. Updates to this setting are only applied to new
+// MuxRangefeedCalls; existing streams will use the previous value until
+// restarted.
+//
+// # The main goal of this limit is to provide a backstop against the
+//
+// The default here has been somewhat arbitrarily chosen considering that:
+//
+// - We want to avoid capacity exceeded errors that wouldn't have occurred
+//   when the buffered registrations were in use.
+//
+// - We don't want to drastically increase the amount of queueing allowed for a
+//   single registration.
+//
+// - One buffered sender is feeding a single gRPC client.
+//
+// - Events emitted during catchup scans have their own per-registration buffer
+//   still.
+//
+// TODO(ssd): This is a bit of a stop-gap so that we have a knob to turn if we
+// need to. We probably want each buffered sender (or each consumerID) to be
+// able to hold up to some fraction of the total rangefeed budget. But we are
+// starting here for now.
+var RangefeedSingleBufferedSenderQueueMaxSize = settings.RegisterIntSetting(
+	settings.SystemOnly,
+	"kv.rangefeed.buffered_sender.queue_max_size",
+	"max size of a buffered sender's event queue (0 for no max)",
+	kvserverbase.DefaultRangefeedEventCap*8,
+)
+
 // BufferedSender is embedded in every rangefeed.BufferedPerRangeEventSink,
 // serving as a helper which buffers events before forwarding events to the
 // underlying gRPC stream.
@@ -58,6 +93,9 @@ type BufferedSender struct {
 		syncutil.Mutex
 		stopped bool
 		buffer  *eventQueue
+		// capacity is the maximum number of events that can be buffered.
+		capacity   int64
+		overflowed bool
 	}

 	// notifyDataC is used to notify the BufferedSender.run goroutine that there
@@ -72,14 +110,16 @@ type BufferedSender struct {
 }

 func NewBufferedSender(
-	sender ServerStreamSender, bsMetrics *BufferedSenderMetrics,
+	sender ServerStreamSender, settings *cluster.Settings, bsMetrics *BufferedSenderMetrics,
 ) *BufferedSender {
 	bs := &BufferedSender{
 		sender:  sender,
 		metrics: bsMetrics,
 	}
 	bs.queueMu.buffer = newEventQueue()
 	bs.notifyDataC = make(chan struct{}, 1)
+	bs.queueMu.buffer = newEventQueue()
+	bs.queueMu.capacity = RangefeedSingleBufferedSenderQueueMaxSize.Get(&settings.SV)
 	return bs
 }
@@ -95,6 +135,13 @@ func (bs *BufferedSender) sendBuffered(
 	if bs.queueMu.stopped {
 		return errors.New("stream sender is stopped")
 	}
+	if bs.queueMu.overflowed {
+		return newRetryErrBufferCapacityExceeded()
+	}
+	if bs.queueMu.capacity > 0 && bs.queueMu.buffer.len() >= bs.queueMu.capacity {
+		bs.queueMu.overflowed = true
+		return newRetryErrBufferCapacityExceeded()
+	}
 	// TODO(wenyihu6): pass an actual context here
 	alloc.Use(context.Background())
 	bs.queueMu.buffer.pushBack(sharedMuxEvent{ev, alloc})
@@ -130,7 +177,7 @@ func (bs *BufferedSender) run(
 			return nil
 		case <-bs.notifyDataC:
 			for {
-				e, success := bs.popFront()
+				e, success, overflowed, remains := bs.popFront()
 				if !success {
 					break
 				}
@@ -143,18 +190,26 @@ func (bs *BufferedSender) run(
 				if err != nil {
 					return err
 				}
+				if overflowed && remains == int64(0) {
+					return newRetryErrBufferCapacityExceeded()
+				}
 			}
 		}
 	}
 }

 // popFront pops the front event from the buffer queue. It returns the event and
 // a boolean indicating if the event was successfully popped.
-func (bs *BufferedSender) popFront() (e sharedMuxEvent, success bool) {
+func (bs *BufferedSender) popFront() (
+	e sharedMuxEvent,
+	success bool,
+	overflowed bool,
+	remains int64,
+) {
 	bs.queueMu.Lock()
 	defer bs.queueMu.Unlock()
 	event, ok := bs.queueMu.buffer.popFront()
-	return event, ok
+	return event, ok, bs.queueMu.overflowed, bs.queueMu.buffer.len()
 }

 // cleanup is called when the sender is stopped. It is expected to free up
@@ -168,13 +223,18 @@ func (bs *BufferedSender) cleanup(ctx context.Context) {
 	bs.metrics.BufferedSenderQueueSize.Dec(remaining)
 }

-// Used for testing only.
 func (bs *BufferedSender) len() int {
 	bs.queueMu.Lock()
 	defer bs.queueMu.Unlock()
 	return int(bs.queueMu.buffer.len())
 }

+func (bs *BufferedSender) overflowed() bool {
+	bs.queueMu.Lock()
+	defer bs.queueMu.Unlock()
+	return bs.queueMu.overflowed
+}
+
 // Used for testing only.
 func (bs *BufferedSender) waitForEmptyBuffer(ctx context.Context) error {
 	opts := retry.Options{
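The guard added to sendBuffered above is a sticky-overflow bounded queue: once the capacity is hit, the overflowed flag stays set so every later send fails fast, while run drains what was already buffered and only then surfaces the capacity error. A self-contained sketch of that pattern, using hypothetical names rather than the CockroachDB types:

package main

import (
	"errors"
	"fmt"
	"sync"
)

var errCapacityExceeded = errors.New("buffer capacity exceeded")

// boundedQueue mirrors the shape of the guard above: a capacity of 0 means
// unlimited, and overflow is sticky so that every send after the first
// failure is rejected until the stream is torn down and restarted.
type boundedQueue struct {
	mu         sync.Mutex
	buf        []int
	capacity   int
	overflowed bool
}

func (q *boundedQueue) send(ev int) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.overflowed {
		return errCapacityExceeded
	}
	if q.capacity > 0 && len(q.buf) >= q.capacity {
		q.overflowed = true // sticky: later sends fail fast
		return errCapacityExceeded
	}
	q.buf = append(q.buf, ev)
	return nil
}

// popFront reports, alongside the event, whether the queue has overflowed
// and how many events remain, so the consumer can drain fully and only then
// surface the overflow error, as the run loop above does.
func (q *boundedQueue) popFront() (ev int, ok, overflowed bool, remains int) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.buf) == 0 {
		return 0, false, q.overflowed, 0
	}
	ev, q.buf = q.buf[0], q.buf[1:]
	return ev, true, q.overflowed, len(q.buf)
}

func main() {
	q := &boundedQueue{capacity: 2}
	fmt.Println(q.send(1), q.send(2), q.send(3)) // <nil> <nil> buffer capacity exceeded
	for {
		ev, ok, overflowed, remains := q.popFront()
		if !ok {
			break
		}
		fmt.Println("drained", ev)
		if overflowed && remains == 0 {
			fmt.Println("fully drained; surface:", errCapacityExceeded)
		}
	}
}

The design choice worth noting is that popFront reports the overflow state and remaining length together under a single lock acquisition, so the consumer can tell "drained after overflow" apart from "merely empty" without racing against concurrent sends.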

pkg/kv/kvserver/rangefeed/buffered_sender_test.go

Lines changed: 128 additions & 2 deletions
@@ -13,6 +13,8 @@ import (

 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -34,7 +36,8 @@ func TestBufferedSenderDisconnectStream(t *testing.T) {
 	defer stopper.Stop(ctx)
 	testServerStream := newTestServerStream()
 	smMetrics := NewStreamManagerMetrics()
-	bs := NewBufferedSender(testServerStream, NewBufferedSenderMetrics())
+	st := cluster.MakeTestingClusterSettings()
+	bs := NewBufferedSender(testServerStream, st, NewBufferedSenderMetrics())
 	sm := NewStreamManager(bs, smMetrics)
 	require.NoError(t, sm.Start(ctx, stopper))
 	defer sm.Stop(ctx)
@@ -87,7 +90,8 @@ func TestBufferedSenderChaosWithStop(t *testing.T) {
 	testServerStream := newTestServerStream()

 	smMetrics := NewStreamManagerMetrics()
-	bs := NewBufferedSender(testServerStream, NewBufferedSenderMetrics())
+	st := cluster.MakeTestingClusterSettings()
+	bs := NewBufferedSender(testServerStream, st, NewBufferedSenderMetrics())
 	sm := NewStreamManager(bs, smMetrics)
 	require.NoError(t, sm.Start(ctx, stopper))

@@ -164,3 +168,125 @@ func TestBufferedSenderChaosWithStop(t *testing.T) {
 		require.Equal(t, 0, bs.len())
 	})
 }
+
+// TestBufferedSenderOnOverflow tests that BufferedSender handles overflow
+// properly. It does not test the shutdown flow with stream manager.
+func TestBufferedSenderOnOverflow(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	ctx := context.Background()
+
+	stopper := stop.NewStopper()
+	defer stopper.Stop(ctx)
+	testServerStream := newTestServerStream()
+	st := cluster.MakeTestingClusterSettings()
+
+	queueCap := int64(24)
+	RangefeedSingleBufferedSenderQueueMaxSize.Override(ctx, &st.SV, queueCap)
+	bs := NewBufferedSender(testServerStream, st, NewBufferedSenderMetrics())
+	require.Equal(t, queueCap, bs.queueMu.capacity)
+
+	val1 := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 1}}
+	ev1 := new(kvpb.RangeFeedEvent)
+	ev1.MustSetValue(&kvpb.RangeFeedValue{Key: keyA, Value: val1})
+	muxEv := &kvpb.MuxRangeFeedEvent{RangeFeedEvent: *ev1, RangeID: 0, StreamID: 1}
+
+	for range queueCap {
+		require.NoError(t, bs.sendBuffered(muxEv, nil))
+	}
+	require.Equal(t, queueCap, int64(bs.len()))
+	e, success, overflowed, remains := bs.popFront()
+	require.Equal(t, sharedMuxEvent{
+		ev:    muxEv,
+		alloc: nil,
+	}, e)
+	require.True(t, success)
+	require.False(t, overflowed)
+	require.Equal(t, queueCap-1, remains)
+	require.Equal(t, queueCap-1, int64(bs.len()))
+	require.NoError(t, bs.sendBuffered(muxEv, nil))
+	require.Equal(t, queueCap, int64(bs.len()))
+
+	// Overflow now.
+	require.Equal(t, bs.sendBuffered(muxEv, nil).Error(),
+		newRetryErrBufferCapacityExceeded().Error())
+}
+
+// TestBufferedSenderOnStreamShutdown tests that BufferedSender and
+// StreamManager handle overflow and shutdown properly.
+func TestBufferedSenderOnStreamShutdown(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	ctx := context.Background()
+
+	stopper := stop.NewStopper()
+	defer stopper.Stop(ctx)
+	testServerStream := newTestServerStream()
+	smMetrics := NewStreamManagerMetrics()
+	st := cluster.MakeTestingClusterSettings()
+
+	queueCap := int64(24)
+	RangefeedSingleBufferedSenderQueueMaxSize.Override(ctx, &st.SV, queueCap)
+	bs := NewBufferedSender(testServerStream, st, NewBufferedSenderMetrics())
+	require.Equal(t, queueCap, bs.queueMu.capacity)
+
+	sm := NewStreamManager(bs, smMetrics)
+	require.NoError(t, sm.Start(ctx, stopper))
+	defer sm.Stop(ctx)
+
+	p, h, pStopper := newTestProcessor(t, withRangefeedTestType(scheduledProcessorWithBufferedSender))
+	defer pStopper.Stop(ctx)
+
+	streamID := int64(42)
+
+	val1 := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 1}}
+	ev1 := new(kvpb.RangeFeedEvent)
+	ev1.MustSetValue(&kvpb.RangeFeedValue{Key: keyA, Value: val1})
+	muxEv := &kvpb.MuxRangeFeedEvent{RangeFeedEvent: *ev1, RangeID: 0, StreamID: streamID}
+
+	// Block the stream so that we can overflow later.
+	unblock := testServerStream.BlockSend()
+	defer unblock()
+
+	waitForQueueLen := func(len int) {
+		testutils.SucceedsSoon(t, func() error {
+			if bs.len() == len {
+				return nil
+			}
+			return errors.Newf("expected %d events, found %d", len, bs.len())
+		})
+	}
+
+	// Add our stream to the stream manager.
+	registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
+		false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery,
+		sm.NewStream(streamID, 1 /* rangeID */))
+	require.True(t, registered)
+	sm.AddStream(streamID, d)
+
+	require.NoError(t, sm.sender.sendBuffered(muxEv, nil))
+	// At this point we actually have sent 2 events: 1 checkpoint event sent by
+	// register and 1 event sent on the line above. We wait for 1 of these events
+	// to be pulled off the queue and block in the sender, leaving 1 in the queue.
+	waitForQueueLen(1)
+	// Now fill the rest of the queue.
+	for range queueCap - 1 {
+		require.NoError(t, sm.sender.sendBuffered(muxEv, nil))
+	}
+
+	// The next write should overflow.
+	capExceededErrStr := newRetryErrBufferCapacityExceeded().Error()
+	err := sm.sender.sendBuffered(muxEv, nil)
+	require.EqualError(t, err, capExceededErrStr)
+	require.True(t, bs.overflowed())
+
+	unblock()
+	waitForQueueLen(0)
+	// Overflow cleanup.
+	err = <-sm.Error()
+	require.EqualError(t, err, capExceededErrStr)
+	// Note that we expect the stream manager to shut down here, but no actual
+	// error events would be sent during the shutdown.
+	err = sm.sender.sendBuffered(muxEv, nil)
+	require.EqualError(t, err, capExceededErrStr)
+}

pkg/kv/kvserver/rangefeed/sender_helper_test.go

Lines changed: 6 additions & 0 deletions
@@ -35,6 +35,9 @@ type testServerStream struct {
 	eventsSent int
 	// streamEvents is a map of streamID to a list of events sent to that stream.
 	streamEvents map[int64][]*kvpb.MuxRangeFeedEvent
+
+	// t can optionally be set for additional logging.
+	t *testing.T
 }

 var _ ServerStreamSender = &testServerStream{}
@@ -121,6 +124,9 @@ func (s *testServerStream) SendIsThreadSafe() {}
 // Send mocks grpc.ServerStream Send method. It only counts events and stores
 // events by streamID in streamEvents.
 func (s *testServerStream) Send(e *kvpb.MuxRangeFeedEvent) error {
+	if s.t != nil {
+		s.t.Logf("Sending event for StreamID %d: %v", e.StreamID, e)
+	}
 	s.Lock()
 	defer s.Unlock()
 	s.eventsSent++

pkg/kv/kvserver/rangefeed/stream_manager_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"testing"
1414

1515
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
16+
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1617
"github.com/cockroachdb/cockroach/pkg/testutils"
1718
"github.com/cockroachdb/cockroach/pkg/util/hlc"
1819
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -175,12 +176,13 @@ func TestStreamManagerErrorHandling(t *testing.T) {
175176
testutils.RunValues(t, "feed type", testTypes, func(t *testing.T, rt rangefeedTestType) {
176177
testServerStream := newTestServerStream()
177178
smMetrics := NewStreamManagerMetrics()
179+
st := cluster.MakeTestingClusterSettings()
178180
var s sender
179181
switch rt {
180182
case scheduledProcessorWithUnbufferedSender:
181183
s = NewUnbufferedSender(testServerStream)
182184
case scheduledProcessorWithBufferedSender:
183-
s = NewBufferedSender(testServerStream, NewBufferedSenderMetrics())
185+
s = NewBufferedSender(testServerStream, st, NewBufferedSenderMetrics())
184186
default:
185187
t.Fatalf("unknown rangefeed test type %v", rt)
186188
}
