Skip to content

Commit bbd09cf

Browse files
committed
rangefeed: explicitly initialize stream status
Since we do have a window after an error where the unbufferedRegistration may send events, here we move to explicit initialization of the stream status to avoid leaking any streamStatus structs. Epic: none Release note: None
1 parent 3135118 commit bbd09cf

File tree

3 files changed

+53
-37
lines changed

3 files changed

+53
-37
lines changed

pkg/kv/kvserver/rangefeed/buffered_sender.go

Lines changed: 46 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -148,46 +148,48 @@ func (bs *BufferedSender) sendBuffered(
148148
// registration. This error should be the next event that is sent to
149149
// stream.
150150
//
151-
// NB: The zero-value of streamStatus is the valid state of a newly seen
152-
// stream.
153-
status := bs.queueMu.byStream[ev.StreamID]
154-
switch status.state {
155-
case streamActive:
156-
if bs.queueMu.perStreamCapacity > 0 && status.queueItems == bs.queueMu.perStreamCapacity {
157-
if ev.Error != nil {
158-
// If _this_ event is an error, no use sending another error. This stream
159-
// is going down. Admit this error and mark the stream as overflowed.
160-
status.state = streamOverflowed
161-
} else {
162-
// This stream is at capacity, return an error to the registration that it
163-
// should send back to us after cleaning up.
164-
status.state = streamOverflowing
165-
return newRetryErrBufferCapacityExceeded()
151+
// NB: We don't error if the stream status is not found as this may be an
152+
// event for an already closed stream. Such events are possible while the
153+
// registration publishes the catch up scan buffer.
154+
status, ok := bs.queueMu.byStream[ev.StreamID]
155+
if ok {
156+
switch status.state {
157+
case streamActive:
158+
if bs.queueMu.perStreamCapacity > 0 && status.queueItems == bs.queueMu.perStreamCapacity {
159+
if ev.Error != nil {
160+
// If _this_ event is an error, no use sending another error. This stream
161+
// is going down. Admit this error and mark the stream as overflowed.
162+
status.state = streamOverflowed
163+
} else {
164+
// This stream is at capacity, return an error to the registration that it
165+
// should send back to us after cleaning up.
166+
status.state = streamOverflowing
167+
return newRetryErrBufferCapacityExceeded()
168+
}
166169
}
170+
case streamOverflowing:
171+
// The unbufferedRegistration is the only component that sends non-error
172+
// events to our stream. In response to the error we return when moving to
173+
// stateOverflowing, it should immediately send us an error and mark itself
174+
// as disconnected. As a result, no non-error events are expected.
175+
if ev.Error == nil {
176+
panic("only error events expected after stream has exceeded capacity")
177+
}
178+
status.state = streamOverflowed
179+
case streamOverflowed:
180+
// If we are overflowed, we don't expect any further events because the
181+
// registration should have disconnected in response to the error.
182+
//
183+
// TODO(ssd): Consider adding an assertion here.
184+
return nil
185+
default:
186+
panic(fmt.Sprintf("unhandled stream state: %v", status.state))
167187
}
168-
case streamOverflowing:
169-
// The unbufferedRegistration is the only component that sends non-error
170-
// events to our stream. In response to the error we return when moving to
171-
// stateOverflowing, it should immediately send us an error and mark itself
172-
// as disconnected. As a result, no non-error events are expected.
173-
if ev.Error == nil {
174-
panic("only error events expected after stream has exceeded capacity")
175-
}
176-
status.state = streamOverflowed
177-
case streamOverflowed:
178-
// If we are overflowed, we don't expect any further events because the
179-
// registration should have disconnected in response to the error.
180-
//
181-
// TODO(ssd): Consider adding an assertion here.
182-
return nil
183-
default:
184-
panic(fmt.Sprintf("unhandled stream state: %v", status.state))
188+
// We are admitting this event.
189+
status.queueItems++
190+
bs.queueMu.byStream[ev.StreamID] = status
185191
}
186192

187-
// We are admitting this event.
188-
status.queueItems++
189-
bs.queueMu.byStream[ev.StreamID] = status
190-
191193
// TODO(wenyihu6): pass an actual context here
192194
alloc.Use(context.Background())
193195
bs.queueMu.buffer.pushBack(sharedMuxEvent{ev, alloc})
@@ -260,6 +262,14 @@ func (bs *BufferedSender) popFront() (e sharedMuxEvent, success bool) {
260262
return event, ok
261263
}
262264

265+
func (bs *BufferedSender) addStream(streamID int64) {
266+
bs.queueMu.Lock()
267+
defer bs.queueMu.Unlock()
268+
if _, ok := bs.queueMu.byStream[streamID]; !ok {
269+
bs.queueMu.byStream[streamID] = streamStatus{}
270+
}
271+
}
272+
263273
func (bs *BufferedSender) removeStream(streamID int64) {
264274
bs.queueMu.Lock()
265275
defer bs.queueMu.Unlock()

pkg/kv/kvserver/rangefeed/stream_manager.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,9 @@ type sender interface {
7676
// all streams in StreamManager.
7777
run(ctx context.Context, stopper *stop.Stopper, onError func(int64)) error
7878

79-
// Remove stream is called when an individual stream is being removed.
79+
// addStream is called when an individual stream is being added.
80+
addStream(streamID int64)
81+
// removeStream is called when an individual stream is being removed.
8082
removeStream(streamID int64)
8183

8284
// cleanup is called when the sender is stopped. It is expected to clean up
@@ -156,6 +158,7 @@ func (sm *StreamManager) AddStream(streamID int64, d Disconnector) {
156158
if _, ok := sm.streams.m[streamID]; ok {
157159
log.KvDistribution.Fatalf(context.Background(), "stream %d already exists", streamID)
158160
}
161+
sm.sender.addStream(streamID)
159162
sm.streams.m[streamID] = d
160163
sm.metrics.ActiveMuxRangeFeed.Inc(1)
161164
sm.metrics.NumMuxRangeFeed.Inc(1)

pkg/kv/kvserver/rangefeed/unbuffered_sender.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,9 @@ func (ubs *UnbufferedSender) sendUnbuffered(event *kvpb.MuxRangeFeedEvent) error
132132
return ubs.sender.Send(event)
133133
}
134134

135+
// addStream implements sender.
136+
func (ubs *UnbufferedSender) addStream(int64) {}
137+
135138
// removeStream implements sender.
136139
func (ubs *UnbufferedSender) removeStream(int64) {}
137140

0 commit comments

Comments
 (0)