@@ -13,6 +13,7 @@ import (
13
13
14
14
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
15
15
"github.com/cockroachdb/cockroach/pkg/roachpb"
16
+ "github.com/cockroachdb/cockroach/pkg/testutils"
16
17
"github.com/cockroachdb/cockroach/pkg/util/hlc"
17
18
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
18
19
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -34,7 +35,7 @@ func TestBufferedSenderDisconnectStream(t *testing.T) {
34
35
defer stopper .Stop (ctx )
35
36
testServerStream := newTestServerStream ()
36
37
smMetrics := NewStreamManagerMetrics ()
37
- bs := NewBufferedSender (testServerStream , NewBufferedSenderMetrics (), 1000 )
38
+ bs := NewBufferedSender (testServerStream , NewBufferedSenderMetrics ())
38
39
sm := NewStreamManager (bs , smMetrics )
39
40
require .NoError (t , sm .Start (ctx , stopper ))
40
41
defer sm .Stop (ctx )
@@ -87,7 +88,7 @@ func TestBufferedSenderChaosWithStop(t *testing.T) {
87
88
testServerStream := newTestServerStream ()
88
89
89
90
smMetrics := NewStreamManagerMetrics ()
90
- bs := NewBufferedSender (testServerStream , NewBufferedSenderMetrics (), 1000 )
91
+ bs := NewBufferedSender (testServerStream , NewBufferedSenderMetrics ())
91
92
sm := NewStreamManager (bs , smMetrics )
92
93
require .NoError (t , sm .Start (ctx , stopper ))
93
94
@@ -164,3 +165,153 @@ func TestBufferedSenderChaosWithStop(t *testing.T) {
164
165
require .Equal (t , 0 , bs .len ())
165
166
})
166
167
}
168
+
169
+ // TestBufferedSenderOnOverflow tests that BufferedSender handles overflow
170
+ // properly. It does not test the shutdown flow with stream manager.
171
+ func TestBufferedSenderOnOverflow (t * testing.T ) {
172
+ defer leaktest .AfterTest (t )()
173
+ defer log .Scope (t ).Close (t )
174
+ ctx := context .Background ()
175
+
176
+ stopper := stop .NewStopper ()
177
+ defer stopper .Stop (ctx )
178
+ testServerStream := newTestServerStream ()
179
+ bs := NewBufferedSender (testServerStream , NewBufferedSenderMetrics ())
180
+ require .Equal (t , minBufferedSenderQueueCapacity , bs .queueMu .capacity )
181
+
182
+ val1 := roachpb.Value {RawBytes : []byte ("val" ), Timestamp : hlc.Timestamp {WallTime : 1 }}
183
+ ev1 := new (kvpb.RangeFeedEvent )
184
+ ev1 .MustSetValue (& kvpb.RangeFeedValue {Key : keyA , Value : val1 })
185
+ muxEv := & kvpb.MuxRangeFeedEvent {RangeFeedEvent : * ev1 , RangeID : 0 , StreamID : 1 }
186
+
187
+ for i := int64 (0 ); i < minBufferedSenderQueueCapacity ; i ++ {
188
+ require .NoError (t , bs .sendBuffered (muxEv , nil ))
189
+ }
190
+ require .Equal (t , minBufferedSenderQueueCapacity , int64 (bs .len ()))
191
+ e , success , overflowed , remains := bs .popFront ()
192
+ require .Equal (t , sharedMuxEvent {
193
+ ev : muxEv ,
194
+ alloc : nil ,
195
+ }, e )
196
+ require .True (t , success )
197
+ require .False (t , overflowed )
198
+ require .Equal (t , minBufferedSenderQueueCapacity - 1 , remains )
199
+ require .Equal (t , minBufferedSenderQueueCapacity - 1 , int64 (bs .len ()))
200
+ require .NoError (t , bs .sendBuffered (muxEv , nil ))
201
+ require .Equal (t , minBufferedSenderQueueCapacity , int64 (bs .len ()))
202
+
203
+ // Overflow now.
204
+ require .Equal (t , bs .sendBuffered (muxEv , nil ).Error (),
205
+ newRetryErrBufferCapacityExceeded ().Error ())
206
+ }
207
+
208
+ // TestBufferedSenderOnStreamShutdown tests that BufferedSender and
209
+ // StreamManager handle overflow and shutdown properly.
210
+
211
+ func TestBufferedSenderOnStreamShutdown (t * testing.T ) {
212
+ defer leaktest .AfterTest (t )()
213
+ defer log .Scope (t ).Close (t )
214
+ ctx := context .Background ()
215
+
216
+ stopper := stop .NewStopper ()
217
+ defer stopper .Stop (ctx )
218
+ testServerStream := newTestServerStream ()
219
+ smMetrics := NewStreamManagerMetrics ()
220
+ bs := NewBufferedSender (testServerStream , NewBufferedSenderMetrics ())
221
+ require .Equal (t , minBufferedSenderQueueCapacity , bs .queueMu .capacity )
222
+ sm := NewStreamManager (bs , smMetrics )
223
+ require .NoError (t , sm .Start (ctx , stopper ))
224
+ defer sm .Stop (ctx )
225
+
226
+ p , h , pStopper := newTestProcessor (t , withRangefeedTestType (scheduledProcessorWithBufferedSender ))
227
+ defer pStopper .Stop (ctx )
228
+
229
+ val1 := roachpb.Value {RawBytes : []byte ("val" ), Timestamp : hlc.Timestamp {WallTime : 1 }}
230
+ ev1 := new (kvpb.RangeFeedEvent )
231
+ ev1 .MustSetValue (& kvpb.RangeFeedValue {Key : keyA , Value : val1 })
232
+ muxEv := & kvpb.MuxRangeFeedEvent {RangeFeedEvent : * ev1 , RangeID : 0 , StreamID : 1 }
233
+
234
+ // Add 21 streams and overflow the buffer.
235
+ t .Run ("add 21 streams" , func (t * testing.T ) {
236
+ numStreams := int64 (21 )
237
+ expectedCapacity := perUnbufferedRegCapacity * numStreams
238
+ // Block the stream to help the queue to overflow.
239
+ unblock := testServerStream .BlockSend ()
240
+ for id := int64 (0 ); id < numStreams ; id ++ {
241
+ registered , d , _ := p .Register (ctx , h .span , hlc.Timestamp {}, nil , /* catchUpIter */
242
+ false /* withDiff */ , false /* withFiltering */ , false /* withOmitRemote */ , noBulkDelivery ,
243
+ sm .NewStream (id , 1 /*rangeID*/ ))
244
+ require .True (t , registered )
245
+ sm .AddStream (id , d )
246
+ }
247
+ require .Equal (t , expectedCapacity , bs .queueMu .capacity )
248
+ for int64 (bs .len ()) != expectedCapacity {
249
+ require .NoError (t , sm .sender .sendBuffered (muxEv , nil ))
250
+ }
251
+ require .Equal (t , expectedCapacity , int64 (bs .len ()))
252
+ require .Equal (t , bs .sendBuffered (muxEv , nil ).Error (),
253
+ newRetryErrBufferCapacityExceeded ().Error ())
254
+ require .True (t , bs .queueMu .overflowed )
255
+ unblock ()
256
+ })
257
+
258
+ t .Run ("overflow clean up" , func (t * testing.T ) {
259
+ // All events buffered should still be sent to the stream.
260
+ testutils .SucceedsSoon (t , func () error {
261
+ if bs .len () == 0 {
262
+ return nil
263
+ }
264
+ return errors .Newf ("expected 0 registrations, found %d" , bs .len ())
265
+ })
266
+ // Overflow cleanup.
267
+ err := <- sm .Error ()
268
+ require .Equal (t , newRetryErrBufferCapacityExceeded ().Error (), err .Error ())
269
+ // Note that we expect the stream manager to shut down here, but no actual
270
+ // error events would be sent during the shutdown.
271
+ require .Equal (t , bs .sendBuffered (muxEv , nil ).Error (), newRetryErrBufferCapacityExceeded ().Error ())
272
+ })
273
+ }
274
+
275
+ // TestBufferedSenderOnStreamDisconnect tests that BufferedSender dynamically
276
+ // adjusts its capacity when streams are connected or disconnected.
277
+ func TestBufferedSenderOnStreamDisconnect (t * testing.T ) {
278
+ defer leaktest .AfterTest (t )()
279
+ defer log .Scope (t ).Close (t )
280
+ ctx := context .Background ()
281
+
282
+ stopper := stop .NewStopper ()
283
+ defer stopper .Stop (ctx )
284
+ testServerStream := newTestServerStream ()
285
+ smMetrics := NewStreamManagerMetrics ()
286
+ bs := NewBufferedSender (testServerStream , NewBufferedSenderMetrics ())
287
+ require .Equal (t , minBufferedSenderQueueCapacity , bs .queueMu .capacity )
288
+
289
+ sm := NewStreamManager (bs , smMetrics )
290
+ require .NoError (t , sm .Start (ctx , stopper ))
291
+
292
+ p , h , pStopper := newTestProcessor (t , withRangefeedTestType (scheduledProcessorWithBufferedSender ))
293
+ defer pStopper .Stop (ctx )
294
+ defer sm .Stop (ctx )
295
+
296
+ numStreams := int64 (21 )
297
+ expectedCapacity := perUnbufferedRegCapacity * numStreams
298
+ for id := int64 (0 ); id < numStreams ; id ++ {
299
+ registered , d , _ := p .Register (ctx , h .span , hlc.Timestamp {}, nil , /* catchUpIter */
300
+ false /* withDiff */ , false /* withFiltering */ , false /* withOmitRemote */ , noBulkDelivery ,
301
+ sm .NewStream (id , 1 /*rangeID*/ ))
302
+ require .True (t , registered )
303
+ sm .AddStream (id , d )
304
+ }
305
+ require .Equal (t , expectedCapacity , bs .queueMu .capacity )
306
+ sm .DisconnectStream (int64 (0 ), newErrBufferCapacityExceeded ())
307
+ testServerStream .waitForEvent (t , makeMuxRangefeedErrorEvent (0 , 1 , newErrBufferCapacityExceeded ()))
308
+ testutils .SucceedsSoon (t , func () error {
309
+ bs .queueMu .Lock ()
310
+ defer bs .queueMu .Unlock ()
311
+ if bs .queueMu .capacity == expectedCapacity - perUnbufferedRegCapacity {
312
+ return nil
313
+ }
314
+ return errors .Newf ("expected %d cap to be %d" , bs .queueMu .capacity ,
315
+ expectedCapacity - perUnbufferedRegCapacity )
316
+ })
317
+ }
0 commit comments