Commit e9963de
rangefeed: explicitly initialize stream status
Since we do have a window after an error where the unbufferedRegistration
may send events, here we move to explicit initialization of the stream
status to avoid leaking any streamStatus structs.

Epic: none
Release note: None
1 parent ae0c26a commit e9963de
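To make the leak concrete, here is a minimal, self-contained sketch (illustrative types only, not the actual CockroachDB code) of the difference between lazily materializing a map entry on send and requiring explicit initialization. Under the lazy scheme, a late event for an already-removed stream silently resurrects its status entry; under the explicit scheme it is dropped.

package main

import "fmt"

type streamStatus struct {
	queueItems int64
}

type sender struct {
	byStream map[int64]streamStatus
}

// sendLazy models the old behavior: indexing the map yields a zero-value
// status for unknown streams, and the write-back below re-creates the
// entry for a stream that was already removed, leaking it.
func (s *sender) sendLazy(streamID int64) {
	status := s.byStream[streamID]
	status.queueItems++
	s.byStream[streamID] = status
}

// sendExplicit models the new behavior: events for streams that were never
// added (or were already removed) are dropped rather than tracked.
func (s *sender) sendExplicit(streamID int64) {
	status, ok := s.byStream[streamID]
	if !ok {
		return // late event for a closed stream; nothing to track
	}
	status.queueItems++
	s.byStream[streamID] = status
}

func main() {
	s := &sender{byStream: map[int64]streamStatus{1: {}}}
	delete(s.byStream, 1) // the stream is removed, then a late event arrives

	s.sendLazy(1)
	fmt.Println("entries after lazy send:", len(s.byStream)) // 1: leaked

	delete(s.byStream, 1)
	s.sendExplicit(1)
	fmt.Println("entries after explicit send:", len(s.byStream)) // 0: no leak
}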

File tree

7 files changed: +84 -49 lines changed

pkg/kv/kvserver/rangefeed/buffered_sender.go

Lines changed: 47 additions & 36 deletions
@@ -147,47 +147,50 @@ func (bs *BufferedSender) sendBuffered(
 	// registration. This error should be the next event that is sent to
 	// stream.
 	//
-	// NB: The zero-value of streamStatus is the valid state of a newly seen
-	// stream.
-	status := bs.queueMu.byStream[ev.StreamID]
-	switch status.state {
-	case streamActive:
-		if bs.queueMu.perStreamCapacity > 0 && status.queueItems == bs.queueMu.perStreamCapacity {
+	// NB: We don't error if the stream status is not found as this may be an
+	// event for an already closed stream. Such events are possible while the
+	// registration publishes the catch up scan buffer.
+	status, ok := bs.queueMu.byStream[ev.StreamID]
+	if ok {
+		switch status.state {
+		case streamActive:
+			if bs.queueMu.perStreamCapacity > 0 && status.queueItems == bs.queueMu.perStreamCapacity {
+				if ev.Error != nil {
+					// If _this_ event is an error, no use sending another error. This stream
+					// is going down. Admit this error and mark the stream as overflowed.
+					status.state = streamOverflowed
+				} else {
+					// This stream is at capacity, return an error to the registration that it
+					// should send back to us after cleaning up.
+					status.state = streamOverflowing
+					return newRetryErrBufferCapacityExceeded()
+				}
+			}
+		case streamOverflowing:
+			// The unbufferedRegistration is the only component that sends non-error
+			// events to our stream. In response to the error we return when moving to
+			// stateOverflowing, it should immediately send us an error and mark itself
+			// as disconnected.
+			//
+			// The only unfortunate exception is if we get disconnected while flushing
+			// the catch-up scan buffer.
 			if ev.Error != nil {
-				// If _this_ event is an error, no use sending another error. This stream
-				// is going down. Admit this error and mark the stream as overflowed.
 				status.state = streamOverflowed
-			} else {
-				// This stream is at capacity, return an error to the registration that it
-				// should send back to us after cleaning up.
-				status.state = streamOverflowing
-				return newRetryErrBufferCapacityExceeded()
 			}
+		case streamOverflowed:
+			// If we are overflowed, we don't expect any further events because the
+			// registration should have disconnected in response to the error.
+			//
+			// TODO(ssd): Consider adding an assertion here.
+			return nil
+		default:
+			panic(fmt.Sprintf("unhandled stream state: %v", status.state))
 		}
-	case streamOverflowing:
-		// The unbufferedRegistration is the only component that sends non-error
-		// events to our stream. In response to the error we return when moving to
-		// stateOverflowing, it should immediately send us an error and mark itself
-		// as disconnected.
-		//
-		// The only unfortunate exception is if we get disconnected while flushing
-		// the catch-up scan buffer.
-		if ev.Error != nil {
-			status.state = streamOverflowed
-		}
-	case streamOverflowed:
-		// If we are overflowed, we don't expect any further events because the
-		// registration should have disconnected in response to the error.
-		//
-		// TODO(ssd): Consider adding an assertion here.
-		return nil
-	default:
-		panic(fmt.Sprintf("unhandled stream state: %v", status.state))
-	}
+		// We are admitting this event.
+		status.queueItems++
+		bs.queueMu.byStream[ev.StreamID] = status
 
-	// We are admitting this event.
-	status.queueItems++
-	bs.queueMu.byStream[ev.StreamID] = status
+	}
 
 	// TODO(wenyihu6): pass an actual context here
 	alloc.Use(context.Background())
@@ -259,6 +262,14 @@ func (bs *BufferedSender) popFront() (e sharedMuxEvent, success bool) {
 	return event, ok
 }
 
+func (bs *BufferedSender) addStream(streamID int64) {
+	bs.queueMu.Lock()
+	defer bs.queueMu.Unlock()
+	if _, ok := bs.queueMu.byStream[streamID]; !ok {
+		bs.queueMu.byStream[streamID] = streamStatus{}
+	}
+}
+
 func (bs *BufferedSender) removeStream(streamID int64) {
 	bs.queueMu.Lock()
 	defer bs.queueMu.Unlock()
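The admission logic in sendBuffered reduces to a small three-state machine per stream. The following standalone sketch (illustrative names; it mirrors, but is not, the BufferedSender code above) shows the transitions: an active stream that hits capacity rejects non-error events with a retry error and moves to overflowing, an error event is always admitted and finalizes the state as overflowed, and an overflowed stream drops everything.

package main

import (
	"errors"
	"fmt"
)

type streamState int

const (
	streamActive streamState = iota
	streamOverflowing
	streamOverflowed
)

var errCapacityExceeded = errors.New("buffer capacity exceeded")

type streamStatus struct {
	state      streamState
	queueItems int64
}

// admit mirrors the switch in sendBuffered: error events (isErr) are always
// admitted so the terminal error still reaches the client, while non-error
// events at capacity trip the overflowing state and are rejected.
func admit(s *streamStatus, capacity int64, isErr bool) error {
	switch s.state {
	case streamActive:
		if capacity > 0 && s.queueItems == capacity {
			if isErr {
				s.state = streamOverflowed
			} else {
				s.state = streamOverflowing
				return errCapacityExceeded
			}
		}
	case streamOverflowing:
		if isErr {
			s.state = streamOverflowed
		}
	case streamOverflowed:
		// Nothing further is expected once overflowed; drop silently.
		return nil
	}
	s.queueItems++ // the event was admitted
	return nil
}

func main() {
	s := &streamStatus{}
	fmt.Println(admit(s, 2, false)) // <nil>
	fmt.Println(admit(s, 2, false)) // <nil>
	fmt.Println(admit(s, 2, false)) // buffer capacity exceeded
	fmt.Println(admit(s, 2, true))  // <nil>: the error event is admitted
	fmt.Println(s.state)            // 2 (streamOverflowed; no String method)
}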

pkg/kv/kvserver/rangefeed/buffered_sender_test.go

Lines changed: 14 additions & 8 deletions
@@ -182,14 +182,17 @@ func TestBufferedSenderOnOverflow(t *testing.T) {
 	st := cluster.MakeTestingClusterSettings()
 
 	queueCap := int64(24)
+	streamID := int64(1)
+
 	RangefeedSingleBufferedSenderQueueMaxPerReg.Override(ctx, &st.SV, queueCap)
 	bs := NewBufferedSender(testServerStream, st, NewBufferedSenderMetrics())
+	bs.addStream(streamID)
 	require.Equal(t, queueCap, bs.queueMu.perStreamCapacity)
 
 	val1 := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 1}}
 	ev1 := new(kvpb.RangeFeedEvent)
 	ev1.MustSetValue(&kvpb.RangeFeedValue{Key: keyA, Value: val1})
-	muxEv := &kvpb.MuxRangeFeedEvent{RangeFeedEvent: *ev1, RangeID: 0, StreamID: 1}
+	muxEv := &kvpb.MuxRangeFeedEvent{RangeFeedEvent: *ev1, RangeID: 0, StreamID: streamID}
 
 	for range queueCap {
 		require.NoError(t, bs.sendBuffered(muxEv, nil))
@@ -235,12 +238,13 @@ func TestBufferedSenderOnOverflowMultiStream(t *testing.T) {
 	p, h, pStopper := newTestProcessor(t, withRangefeedTestType(scheduledProcessorWithBufferedSender))
 	defer pStopper.Stop(ctx)
 
-	streamID := int64(42)
+	streamID1 := int64(42)
+	streamID2 := streamID1 + 1
 
 	val1 := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 1}}
 	ev1 := new(kvpb.RangeFeedEvent)
 	ev1.MustSetValue(&kvpb.RangeFeedValue{Key: keyA, Value: val1})
-	muxEv := &kvpb.MuxRangeFeedEvent{RangeFeedEvent: *ev1, RangeID: 0, StreamID: streamID}
+	muxEv := &kvpb.MuxRangeFeedEvent{RangeFeedEvent: *ev1, RangeID: 0, StreamID: streamID1}
 
 	// Block the stream so that we can overflow later.
 	unblock := testServerStream.BlockSend()
@@ -256,18 +260,20 @@ func TestBufferedSenderOnOverflowMultiStream(t *testing.T) {
 	}
 
 	// Add our stream to the stream manager.
+	sm.RegisteringStream(streamID1)
 	registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
 		false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery,
-		sm.NewStream(streamID, 1 /*rangeID*/))
+		sm.NewStream(streamID1, 1 /*rangeID*/))
 	require.True(t, registered)
-	sm.AddStream(streamID, d)
+	sm.AddStream(streamID1, d)
 
 	// Add a second stream to the stream manager.
+	sm.RegisteringStream(streamID2)
 	registered, d, _ = p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
 		false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery,
-		sm.NewStream(streamID+1, 1 /*rangeID*/))
+		sm.NewStream(streamID2, 1 /*rangeID*/))
 	require.True(t, registered)
-	sm.AddStream(streamID+1, d)
+	sm.AddStream(streamID2, d)
 
 	// At this point we actually have sent 2 events, one for each checkpoint sent
 	// by the registrations. One of these should get pulled off the queue and block.
@@ -283,7 +289,7 @@ func TestBufferedSenderOnOverflowMultiStream(t *testing.T) {
 	require.EqualError(t, err, capExceededErrStr)
 
 	// A write to a different stream should be fine
-	muxEv.StreamID = streamID + 2
+	muxEv.StreamID = streamID2
 	err = sm.sender.sendBuffered(muxEv, nil)
 	require.NoError(t, err)

pkg/kv/kvserver/rangefeed/stream_manager.go

Lines changed: 9 additions & 1 deletion
@@ -76,7 +76,9 @@ type sender interface {
 	// all streams in StreamManager.
 	run(ctx context.Context, stopper *stop.Stopper, onError func(int64)) error
 
-	// Remove stream is called when an individual stream is being removed.
+	// addStream is called when an individual stream is being added.
+	addStream(streamID int64)
+	// removeStream is called when an individual stream is being removed.
 	removeStream(streamID int64)
 
 	// cleanup is called when the sender is stopped. It is expected to clean up
@@ -139,6 +141,12 @@ func (sm *StreamManager) DisconnectStream(streamID int64, err *kvpb.Error) {
 	}
 }
 
+// RegisteringStream is called when a stream is about to be registered. After
+// this point, the stream may start to see events.
+func (sm *StreamManager) RegisteringStream(streamID int64) {
+	sm.sender.addStream(streamID)
+}
+
 // AddStream adds a streamID with its disconnector to the StreamManager.
 // StreamManager can use the disconnector to shut down the rangefeed stream
 // later on.
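Note the ordering contract this establishes: RegisteringStream (and hence sender.addStream) must run before the stream is handed out via NewStream/Register and before any event, including a SendError on a failed registration, reaches the sender. That way the BufferedSender already holds a streamStatus entry when the first event arrives, and removeStream can later retire it without a late event re-creating it.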

pkg/kv/kvserver/rangefeed/stream_manager_test.go

Lines changed: 8 additions & 4 deletions
@@ -211,10 +211,11 @@ func TestStreamManagerErrorHandling(t *testing.T) {
 	t.Run("Fail to register rangefeed with the processor", func(t *testing.T) {
 		p, _, stopper := newTestProcessor(t, withRangefeedTestType(rt))
 		defer stopper.Stop(ctx)
-		sm.NewStream(sID, rID)
-		// We mock failed registration by not calling p.Register.
-		// node.MuxRangefeed would call sendBuffered with error event.
-		require.NoError(t, sm.sender.sendBuffered(makeMuxRangefeedErrorEvent(sID, rID, disconnectErr), nil))
+		streamSink := sm.NewStream(sID, rID)
+		// We mock failed registration by not calling p.Register and calling
+		// SendError just as (*Node).muxRangeFeed does.
+		sm.RegisteringStream(sID)
+		streamSink.SendError(disconnectErr)
 		expectErrorHandlingInvariance(p)
 		testServerStream.reset()
 	})
@@ -223,6 +224,7 @@ func TestStreamManagerErrorHandling(t *testing.T) {
 		p, h, stopper := newTestProcessor(t, withRangefeedTestType(rt))
 		defer stopper.Stop(ctx)
 		stream := sm.NewStream(sID, rID)
+		sm.RegisteringStream(sID)
 		registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
 			false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery,
 			stream)
@@ -237,6 +239,7 @@ func TestStreamManagerErrorHandling(t *testing.T) {
 		stream := sm.NewStream(sID, rID)
 		p, h, stopper := newTestProcessor(t, withRangefeedTestType(rt))
 		defer stopper.Stop(ctx)
+		sm.RegisteringStream(sID)
 		registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
 			false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery,
 			stream)
@@ -252,6 +255,7 @@ func TestStreamManagerErrorHandling(t *testing.T) {
 		stream := sm.NewStream(sID, rID)
 		p, h, stopper := newTestProcessor(t, withRangefeedTestType(rt))
 		defer stopper.Stop(ctx)
+		sm.RegisteringStream(sID)
 		registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
 			false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery,
 			stream)

pkg/kv/kvserver/rangefeed/unbuffered_registration_test.go

Lines changed: 2 additions & 0 deletions
@@ -48,6 +48,7 @@ func TestUnbufferedRegWithStreamManager(t *testing.T) {
 	})
 	t.Run("register 50 streams", func(t *testing.T) {
 		for id := int64(0); id < 50; id++ {
+			sm.RegisteringStream(id)
 			registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
 				false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery,
 				sm.NewStream(id, r1))
@@ -142,6 +143,7 @@ func TestUnbufferedRegCorrectnessOnDisconnect(t *testing.T) {
 	evErr.MustSetValue(&kvpb.RangeFeedError{Error: *discErr})
 
 	// Register one stream.
+	sm.RegisteringStream(s1)
 	registered, d, _ := p.Register(ctx, h.span, startTs,
 		makeCatchUpIterator(catchUpIter, span, startTs), /* catchUpIter */
 		true /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery,

pkg/kv/kvserver/rangefeed/unbuffered_sender.go

Lines changed: 3 additions & 0 deletions
@@ -132,6 +132,9 @@ func (ubs *UnbufferedSender) sendUnbuffered(event *kvpb.MuxRangeFeedEvent) error
 	return ubs.sender.Send(event)
 }
 
+// addStream implements sender.
+func (ubs *UnbufferedSender) addStream(int64) {}
+
 // removeStream implements sender.
 func (ubs *UnbufferedSender) removeStream(int64) {}
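The UnbufferedSender keeps no per-stream queue state, so addStream, like the existing removeStream, is a no-op provided only to satisfy the sender interface.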

pkg/server/node.go

Lines changed: 1 addition & 0 deletions
@@ -2307,6 +2307,7 @@ func (n *Node) muxRangeFeed(muxStream kvpb.RPCInternal_MuxRangeFeedStream) error
 			// Disconnector returned can be used to shut down rangefeed from the
 			// stream manager. If rangefeed disconnects with an error after being
 			// successfully registered, it calls streamSink.SendError.
+			sm.RegisteringStream(req.StreamID)
 			if disconnector, err := n.stores.RangeFeed(streamCtx, req, streamSink, limiter); err != nil {
 				streamSink.SendError(kvpb.NewError(err))
 			} else {
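This call site is the crux of the commit: RegisteringStream runs before n.stores.RangeFeed, so even when registration fails and the error path calls streamSink.SendError, the buffered sender is already tracking the stream and the error event is accounted against an explicitly created streamStatus entry rather than one materialized on the fly. This is the flow the updated "Fail to register rangefeed with the processor" test above exercises.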
