@@ -122,18 +122,16 @@ func NewBufferedSender(
122122 sender : sender ,
123123 metrics : bsMetrics ,
124124 }
125- bs .queueMu .buffer = newEventQueue ()
126125 bs .notifyDataC = make (chan struct {}, 1 )
127126 bs .queueMu .buffer = newEventQueue ()
128127 bs .queueMu .perStreamCapacity = RangefeedSingleBufferedSenderQueueMaxPerReg .Get (& settings .SV )
129128 bs .queueMu .byStream = make (map [int64 ]streamStatus )
130129 return bs
131130}
132131
133- // sendBuffered buffers the event before sending it to the underlying
134- // gRPC stream. It does not block. sendBuffered will take the
135- // ownership of the alloc and release it if the returned error is
136- // non-nil. It only errors in the case of an already stopped stream.
132+ // sendBuffered buffers the event before sending it to the underlying gRPC
133+ // stream. It does not block. It errors in the case of a stopped sender of if
134+ // the registration has exceeded its capacity.
137135func (bs * BufferedSender ) sendBuffered (
138136 ev * kvpb.MuxRangeFeedEvent , alloc * SharedBudgetAllocation ,
139137) error {
@@ -174,7 +172,12 @@ func (bs *BufferedSender) sendBuffered(
174172 // as disconnected.
175173 //
176174 // The only unfortunate exception is if we get disconnected while flushing
177- // the catch-up scan buffer.
175+ // the catch-up scan buffer. In this case we admit the event and stay in
176+ // state overflowing until we actually receive the error.
177+ //
178+ // TODO(ssd): Given the above exception, we should perhaps just move
179+ // directly to streamOverflowed. But, I think instead we want to remove
180+ // that exception if possible.
178181 if ev .Error != nil {
179182 status .state = streamOverflowed
180183 }
@@ -256,13 +259,14 @@ func (bs *BufferedSender) popFront() (e sharedMuxEvent, success bool) {
256259 if ok {
257260 state , streamFound := bs .queueMu .byStream [event .ev .StreamID ]
258261 if streamFound {
259- state .queueItems -= 1
262+ state .queueItems --
260263 bs .queueMu .byStream [event .ev .StreamID ] = state
261264 }
262265 }
263266 return event , ok
264267}
265268
269+ // addStream initializes the per-stream tracking for the given streamID.
266270func (bs * BufferedSender ) addStream (streamID int64 ) {
267271 bs .queueMu .Lock ()
268272 defer bs .queueMu .Unlock ()
@@ -275,6 +279,12 @@ func (bs *BufferedSender) addStream(streamID int64) {
275279 }
276280}
277281
282+ // removeStream removes the per-stream state tracking from the sender.
283+ //
284+ // TODO(ssd): There may be items still in the queue when removeStream is called.
285+ // We'd like to solve this by removing this as a possibility. But this is OK
286+ // since we will eventually process the events and the client knows to ignore
287+ // them.
278288func (bs * BufferedSender ) removeStream (streamID int64 ) {
279289 bs .queueMu .Lock ()
280290 defer bs .queueMu .Unlock ()
@@ -308,10 +318,7 @@ func (bs *BufferedSender) waitForEmptyBuffer(ctx context.Context) error {
308318 MaxRetries : 50 ,
309319 }
310320 for re := retry .StartWithCtx (ctx , opts ); re .Next (); {
311- bs .queueMu .Lock ()
312- caughtUp := bs .queueMu .buffer .len () == 0 // nolint:deferunlockcheck
313- bs .queueMu .Unlock ()
314- if caughtUp {
321+ if bs .len () == 0 {
315322 return nil
316323 }
317324 }
0 commit comments