Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 47 additions & 36 deletions pkg/kv/kvserver/rangefeed/buffered_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,47 +147,50 @@ func (bs *BufferedSender) sendBuffered(
// registration. This error should be the next event that is sent to
// stream.
//
// NB: The zero-value of streamStatus is the valid state of a newly seen
// stream.
status := bs.queueMu.byStream[ev.StreamID]
switch status.state {
case streamActive:
if bs.queueMu.perStreamCapacity > 0 && status.queueItems == bs.queueMu.perStreamCapacity {
// NB: We don't error if the stream status is not found as this may be an
// event for an already closed stream. Such events are possible while the
// registration publishes the catch up scan buffer.
status, ok := bs.queueMu.byStream[ev.StreamID]
if ok {
switch status.state {
case streamActive:
if bs.queueMu.perStreamCapacity > 0 && status.queueItems == bs.queueMu.perStreamCapacity {
if ev.Error != nil {
// If _this_ event is an error, no use sending another error. This stream
// is going down. Admit this error and mark the stream as overflowed.
status.state = streamOverflowed
} else {
// This stream is at capacity, return an error to the registration that it
// should send back to us after cleaning up.
status.state = streamOverflowing
return newRetryErrBufferCapacityExceeded()
}
}
case streamOverflowing:
// The unbufferedRegistration is the only component that sends non-error
// events to our stream. In response to the error we return when moving to
// streamOverflowing, it should immediately send us an error and mark itself
// as disconnected.
//
// The only unfortunate exception is if we get disconnected while flushing
// the catch-up scan buffer.
if ev.Error != nil {
// If _this_ event is an error, no use sending another error. This stream
// is going down. Admit this error and mark the stream as overflowed.
status.state = streamOverflowed
} else {
// This stream is at capacity, return an error to the registration that it
// should send back to us after cleaning up.
status.state = streamOverflowing
return newRetryErrBufferCapacityExceeded()
}
case streamOverflowed:
// If we are overflowed, we don't expect any further events because the
// registration should have disconnected in response to the error.
//
// TODO(ssd): Consider adding an assertion here.
return nil
default:
panic(fmt.Sprintf("unhandled stream state: %v", status.state))
}
case streamOverflowing:
// The unbufferedRegistration is the only component that sends non-error
// events to our stream. In response to the error we return when moving to
// streamOverflowing, it should immediately send us an error and mark itself
// as disconnected.
//
// The only unfortunate exception is if we get disconnected while flushing
// the catch-up scan buffer.
if ev.Error != nil {
status.state = streamOverflowed
}
case streamOverflowed:
// If we are overflowed, we don't expect any further events because the
// registration should have disconnected in response to the error.
//
// TODO(ssd): Consider adding an assertion here.
return nil
default:
panic(fmt.Sprintf("unhandled stream state: %v", status.state))
}
// We are admitting this event.
status.queueItems++
bs.queueMu.byStream[ev.StreamID] = status

// We are admitting this event.
status.queueItems++
bs.queueMu.byStream[ev.StreamID] = status
}

// TODO(wenyihu6): pass an actual context here
alloc.Use(context.Background())
Expand Down Expand Up @@ -259,6 +262,14 @@ func (bs *BufferedSender) popFront() (e sharedMuxEvent, success bool) {
return event, ok
}

// addStream records streamID in the per-stream bookkeeping map while holding
// queueMu. A stream that is not yet present is inserted with a zero-valued
// streamStatus; a stream that is already present is left as is.
func (bs *BufferedSender) addStream(streamID int64) {
	bs.queueMu.Lock()
	defer bs.queueMu.Unlock()

	// Leave an existing entry untouched so that its current status is not
	// overwritten with the zero value.
	if _, exists := bs.queueMu.byStream[streamID]; exists {
		return
	}
	bs.queueMu.byStream[streamID] = streamStatus{}
}

func (bs *BufferedSender) removeStream(streamID int64) {
bs.queueMu.Lock()
defer bs.queueMu.Unlock()
Expand Down
22 changes: 14 additions & 8 deletions pkg/kv/kvserver/rangefeed/buffered_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,17 @@ func TestBufferedSenderOnOverflow(t *testing.T) {
st := cluster.MakeTestingClusterSettings()

queueCap := int64(24)
streamID := int64(1)

RangefeedSingleBufferedSenderQueueMaxPerReg.Override(ctx, &st.SV, queueCap)
bs := NewBufferedSender(testServerStream, st, NewBufferedSenderMetrics())
bs.addStream(streamID)
require.Equal(t, queueCap, bs.queueMu.perStreamCapacity)

val1 := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 1}}
ev1 := new(kvpb.RangeFeedEvent)
ev1.MustSetValue(&kvpb.RangeFeedValue{Key: keyA, Value: val1})
muxEv := &kvpb.MuxRangeFeedEvent{RangeFeedEvent: *ev1, RangeID: 0, StreamID: 1}
muxEv := &kvpb.MuxRangeFeedEvent{RangeFeedEvent: *ev1, RangeID: 0, StreamID: streamID}

for range queueCap {
require.NoError(t, bs.sendBuffered(muxEv, nil))
Expand Down Expand Up @@ -235,12 +238,13 @@ func TestBufferedSenderOnOverflowMultiStream(t *testing.T) {
p, h, pStopper := newTestProcessor(t, withRangefeedTestType(scheduledProcessorWithBufferedSender))
defer pStopper.Stop(ctx)

streamID := int64(42)
streamID1 := int64(42)
streamID2 := streamID1 + 1

val1 := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 1}}
ev1 := new(kvpb.RangeFeedEvent)
ev1.MustSetValue(&kvpb.RangeFeedValue{Key: keyA, Value: val1})
muxEv := &kvpb.MuxRangeFeedEvent{RangeFeedEvent: *ev1, RangeID: 0, StreamID: streamID}
muxEv := &kvpb.MuxRangeFeedEvent{RangeFeedEvent: *ev1, RangeID: 0, StreamID: streamID1}

// Block the stream so that we can overflow later.
unblock := testServerStream.BlockSend()
Expand All @@ -256,18 +260,20 @@ func TestBufferedSenderOnOverflowMultiStream(t *testing.T) {
}

// Add our stream to the stream manager.
sm.RegisteringStream(streamID1)
registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery,
sm.NewStream(streamID, 1 /*rangeID*/))
sm.NewStream(streamID1, 1 /*rangeID*/))
require.True(t, registered)
sm.AddStream(streamID, d)
sm.AddStream(streamID1, d)

// Add a second stream to the stream manager.
sm.RegisteringStream(streamID2)
registered, d, _ = p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery,
sm.NewStream(streamID+1, 1 /*rangeID*/))
sm.NewStream(streamID2, 1 /*rangeID*/))
require.True(t, registered)
sm.AddStream(streamID+1, d)
sm.AddStream(streamID2, d)

// At this point we actually have sent 2 events, one for each checkpoint sent
// by the registrations. One of these should get pulled off the queue and block.
Expand All @@ -283,7 +289,7 @@ func TestBufferedSenderOnOverflowMultiStream(t *testing.T) {
require.EqualError(t, err, capExceededErrStr)

// A write to a different stream should be fine
muxEv.StreamID = streamID + 2
muxEv.StreamID = streamID2
err = sm.sender.sendBuffered(muxEv, nil)
require.NoError(t, err)

Expand Down
10 changes: 9 additions & 1 deletion pkg/kv/kvserver/rangefeed/stream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ type sender interface {
// all streams in StreamManager.
run(ctx context.Context, stopper *stop.Stopper, onError func(int64)) error

// Remove stream is called when an individual stream is being removed.
// addStream is called when an individual stream is being added.
addStream(streamID int64)
// removeStream is called when an individual stream is being removed.
removeStream(streamID int64)

// cleanup is called when the sender is stopped. It is expected to clean up
Expand Down Expand Up @@ -139,6 +141,12 @@ func (sm *StreamManager) DisconnectStream(streamID int64, err *kvpb.Error) {
}
}

// RegisteringStream is called when a stream is about to be registered. After
// this point, the stream may start to see events. It forwards streamID to the
// sender's addStream.
func (sm *StreamManager) RegisteringStream(streamID int64) {
sm.sender.addStream(streamID)
}

// AddStream adds a streamID with its disconnector to the StreamManager.
// StreamManager can use the disconnector to shut down the rangefeed stream
// later on.
Expand Down
12 changes: 8 additions & 4 deletions pkg/kv/kvserver/rangefeed/stream_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,11 @@ func TestStreamManagerErrorHandling(t *testing.T) {
t.Run("Fail to register rangefeed with the processor", func(t *testing.T) {
p, _, stopper := newTestProcessor(t, withRangefeedTestType(rt))
defer stopper.Stop(ctx)
sm.NewStream(sID, rID)
// We mock failed registration by not calling p.Register.
// node.MuxRangefeed would call sendBuffered with error event.
require.NoError(t, sm.sender.sendBuffered(makeMuxRangefeedErrorEvent(sID, rID, disconnectErr), nil))
streamSink := sm.NewStream(sID, rID)
// We mock failed registration by not calling p.Register and calling
// SendError just as (*Node).muxRangeFeed does.
sm.RegisteringStream(sID)
streamSink.SendError(disconnectErr)
expectErrorHandlingInvariance(p)
testServerStream.reset()
})
Expand All @@ -223,6 +224,7 @@ func TestStreamManagerErrorHandling(t *testing.T) {
p, h, stopper := newTestProcessor(t, withRangefeedTestType(rt))
defer stopper.Stop(ctx)
stream := sm.NewStream(sID, rID)
sm.RegisteringStream(sID)
registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery,
stream)
Expand All @@ -237,6 +239,7 @@ func TestStreamManagerErrorHandling(t *testing.T) {
stream := sm.NewStream(sID, rID)
p, h, stopper := newTestProcessor(t, withRangefeedTestType(rt))
defer stopper.Stop(ctx)
sm.RegisteringStream(sID)
registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery,
stream)
Expand All @@ -252,6 +255,7 @@ func TestStreamManagerErrorHandling(t *testing.T) {
stream := sm.NewStream(sID, rID)
p, h, stopper := newTestProcessor(t, withRangefeedTestType(rt))
defer stopper.Stop(ctx)
sm.RegisteringStream(sID)
registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery,
stream)
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/rangefeed/unbuffered_registration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func TestUnbufferedRegWithStreamManager(t *testing.T) {
})
t.Run("register 50 streams", func(t *testing.T) {
for id := int64(0); id < 50; id++ {
sm.RegisteringStream(id)
registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery,
sm.NewStream(id, r1))
Expand Down Expand Up @@ -142,6 +143,7 @@ func TestUnbufferedRegCorrectnessOnDisconnect(t *testing.T) {
evErr.MustSetValue(&kvpb.RangeFeedError{Error: *discErr})

// Register one stream.
sm.RegisteringStream(s1)
registered, d, _ := p.Register(ctx, h.span, startTs,
makeCatchUpIterator(catchUpIter, span, startTs), /* catchUpIter */
true /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery,
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/rangefeed/unbuffered_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ func (ubs *UnbufferedSender) sendUnbuffered(event *kvpb.MuxRangeFeedEvent) error
return ubs.sender.Send(event)
}

// addStream implements sender. It is a no-op: the stream ID argument is
// ignored and no state is recorded.
func (ubs *UnbufferedSender) addStream(int64) {}

// removeStream implements sender. It is a no-op: the stream ID argument is
// ignored and no state is modified.
func (ubs *UnbufferedSender) removeStream(int64) {}

Expand Down
1 change: 1 addition & 0 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2307,6 +2307,7 @@ func (n *Node) muxRangeFeed(muxStream kvpb.RPCInternal_MuxRangeFeedStream) error
// Disconnector returned can be used to shut down rangefeed from the
// stream manager. If rangefeed disconnects with an error after being
// successfully registered, it calls streamSink.SendError.
sm.RegisteringStream(req.StreamID)
if disconnector, err := n.stores.RangeFeed(streamCtx, req, streamSink, limiter); err != nil {
streamSink.SendError(kvpb.NewError(err))
} else {
Expand Down