Skip to content

Commit 81e65c9

Browse files
committed
kvserver/rangefeed: add capacity to node level buffered sender
This patch adds a capacity to the node-level buffered sender; all registrations are shut down if the node-level buffer overflows. Part of: #129813. Release note: none
1 parent f7554ce commit 81e65c9

File tree

6 files changed

+44
-13
lines changed

6 files changed

+44
-13
lines changed

pkg/kv/kvserver/rangefeed/buffered_sender.go

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"time"
1111

1212
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
13+
"github.com/cockroachdb/cockroach/pkg/util/log"
1314
"github.com/cockroachdb/cockroach/pkg/util/retry"
1415
"github.com/cockroachdb/cockroach/pkg/util/stop"
1516
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -56,8 +57,10 @@ type BufferedSender struct {
5657
// queueMu protects the buffer queue.
5758
queueMu struct {
5859
syncutil.Mutex
59-
stopped bool
60-
buffer *eventQueue
60+
stopped bool
61+
buffer *eventQueue
62+
capacity int64
63+
overflow bool
6164
}
6265

6366
// notifyDataC is used to notify the BufferedSender.run goroutine that there
@@ -72,14 +75,16 @@ type BufferedSender struct {
7275
}
7376

7477
func NewBufferedSender(
75-
sender ServerStreamSender, bsMetrics *BufferedSenderMetrics,
78+
sender ServerStreamSender, bsMetrics *BufferedSenderMetrics, maxQueueSize int64,
7679
) *BufferedSender {
7780
bs := &BufferedSender{
7881
sender: sender,
7982
metrics: bsMetrics,
8083
}
8184
bs.queueMu.buffer = newEventQueue()
8285
bs.notifyDataC = make(chan struct{}, 1)
86+
bs.queueMu.buffer = newEventQueue()
87+
bs.queueMu.capacity = maxQueueSize
8388
return bs
8489
}
8590

@@ -95,6 +100,15 @@ func (bs *BufferedSender) sendBuffered(
95100
if bs.queueMu.stopped {
96101
return errors.New("stream sender is stopped")
97102
}
103+
if bs.queueMu.overflow {
104+
// TODO: is this too spammy?
105+
log.Dev.Error(context.Background(), "buffer capacity exceeded")
106+
return newRetryErrBufferCapacityExceeded()
107+
}
108+
if bs.queueMu.buffer.len() >= bs.queueMu.capacity {
109+
bs.queueMu.overflow = true
110+
return newRetryErrBufferCapacityExceeded()
111+
}
98112
// TODO(wenyihu6): pass an actual context here
99113
alloc.Use(context.Background())
100114
bs.queueMu.buffer.pushBack(sharedMuxEvent{ev, alloc})
@@ -130,7 +144,7 @@ func (bs *BufferedSender) run(
130144
return nil
131145
case <-bs.notifyDataC:
132146
for {
133-
e, success := bs.popFront()
147+
e, success, overflowed, remains := bs.popFront()
134148
if !success {
135149
break
136150
}
@@ -143,18 +157,26 @@ func (bs *BufferedSender) run(
143157
if err != nil {
144158
return err
145159
}
160+
if overflowed && remains == int64(0) {
161+
return newRetryErrBufferCapacityExceeded()
162+
}
146163
}
147164
}
148165
}
149166
}
150167

151168
// popFront pops the front event from the buffer queue. It returns the event and
152169
// a boolean indicating if the event was successfully popped, along with the
// overflow flag and the number of events remaining in the buffer.
153-
func (bs *BufferedSender) popFront() (e sharedMuxEvent, success bool) {
170+
func (bs *BufferedSender) popFront() (
171+
e sharedMuxEvent,
172+
success bool,
173+
overflowed bool,
174+
remains int64,
175+
) {
154176
bs.queueMu.Lock()
155177
defer bs.queueMu.Unlock()
156178
event, ok := bs.queueMu.buffer.popFront()
157-
return event, ok
179+
return event, ok, bs.queueMu.overflow, bs.queueMu.buffer.len()
158180
}
159181

160182
// cleanup is called when the sender is stopped. It is expected to free up

pkg/kv/kvserver/rangefeed/buffered_sender_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func TestBufferedSenderDisconnectStream(t *testing.T) {
3434
defer stopper.Stop(ctx)
3535
testServerStream := newTestServerStream()
3636
smMetrics := NewStreamManagerMetrics()
37-
bs := NewBufferedSender(testServerStream, NewBufferedSenderMetrics())
37+
bs := NewBufferedSender(testServerStream, NewBufferedSenderMetrics(), 1000)
3838
sm := NewStreamManager(bs, smMetrics)
3939
require.NoError(t, sm.Start(ctx, stopper))
4040
defer sm.Stop(ctx)
@@ -87,7 +87,7 @@ func TestBufferedSenderChaosWithStop(t *testing.T) {
8787
testServerStream := newTestServerStream()
8888

8989
smMetrics := NewStreamManagerMetrics()
90-
bs := NewBufferedSender(testServerStream, NewBufferedSenderMetrics())
90+
bs := NewBufferedSender(testServerStream, NewBufferedSenderMetrics(), 1000)
9191
sm := NewStreamManager(bs, smMetrics)
9292
require.NoError(t, sm.Start(ctx, stopper))
9393

pkg/kv/kvserver/rangefeed/stream_manager_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ func TestStreamManagerErrorHandling(t *testing.T) {
180180
case scheduledProcessorWithUnbufferedSender:
181181
s = NewUnbufferedSender(testServerStream)
182182
case scheduledProcessorWithBufferedSender:
183-
s = NewBufferedSender(testServerStream, NewBufferedSenderMetrics())
183+
s = NewBufferedSender(testServerStream, NewBufferedSenderMetrics(), 1000)
184184
default:
185185
t.Fatalf("unknown rangefeed test type %v", rt)
186186
}

pkg/kv/kvserver/rangefeed/unbuffered_registration_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func TestUnbufferedRegWithStreamManager(t *testing.T) {
3434
defer stopper.Stop(ctx)
3535
testServerStream := newTestServerStream()
3636
smMetrics := NewStreamManagerMetrics()
37-
bs := NewBufferedSender(testServerStream, NewBufferedSenderMetrics())
37+
bs := NewBufferedSender(testServerStream, NewBufferedSenderMetrics(), 1000)
3838
sm := NewStreamManager(bs, smMetrics)
3939
require.NoError(t, sm.Start(ctx, stopper))
4040

@@ -106,7 +106,7 @@ func TestUnbufferedRegCorrectnessOnDisconnect(t *testing.T) {
106106
defer stopper.Stop(ctx)
107107
testServerStream := newTestServerStream()
108108
smMetrics := NewStreamManagerMetrics()
109-
bs := NewBufferedSender(testServerStream, NewBufferedSenderMetrics())
109+
bs := NewBufferedSender(testServerStream, NewBufferedSenderMetrics(), 1000)
110110
sm := NewStreamManager(bs, smMetrics)
111111
require.NoError(t, sm.Start(ctx, stopper))
112112
defer sm.Stop(ctx)

pkg/kv/kvserver/replica_rangefeed.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,15 @@ var RangefeedUseBufferedSender = settings.RegisterBoolSetting(
9393
metamorphic.ConstantWithTestBool("kv.rangefeed.buffered_sender.enabled", true),
9494
)
9595

96+
var RangefeedSingleBufferedSenderQueueMaxSize = settings.RegisterIntSetting(
97+
settings.SystemOnly,
98+
"kv.rangefeed.buffered_sender_queue.max_size",
99+
"max size of the buffered sender queue",
100+
// TODO: what should the default be? There is a single buffered sender for one
101+
// Rangefeed call.
102+
2048,
103+
)
104+
96105
func init() {
97106
// Inject into kvserverbase to allow usage from kvcoord.
98107
kvserverbase.RangeFeedRefreshInterval = RangeFeedRefreshInterval

pkg/server/node.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2224,8 +2224,8 @@ func (n *Node) muxRangeFeed(muxStream kvpb.RPCInternal_MuxRangeFeedStream) error
22242224

22252225
sm := &rangefeed.StreamManager{}
22262226
if kvserver.RangefeedUseBufferedSender.Get(&n.storeCfg.Settings.SV) {
2227-
sm = rangefeed.NewStreamManager(rangefeed.NewBufferedSender(lockedMuxStream, n.metrics.BufferedSenderMetrics),
2228-
n.metrics.StreamManagerMetrics)
2227+
sm = rangefeed.NewStreamManager(rangefeed.NewBufferedSender(lockedMuxStream, n.metrics.BufferedSenderMetrics,
2228+
kvserver.RangefeedSingleBufferedSenderQueueMaxSize.Get(&n.storeCfg.Settings.SV)), n.metrics.StreamManagerMetrics)
22292229
} else {
22302230
sm = rangefeed.NewStreamManager(rangefeed.NewUnbufferedSender(lockedMuxStream), n.metrics.StreamManagerMetrics)
22312231
}

0 commit comments

Comments
 (0)