@@ -13,6 +13,7 @@ import (
13
13
14
14
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
15
15
"github.com/cockroachdb/cockroach/pkg/roachpb"
16
+ "github.com/cockroachdb/cockroach/pkg/settings/cluster"
16
17
"github.com/cockroachdb/cockroach/pkg/testutils"
17
18
"github.com/cockroachdb/cockroach/pkg/util/hlc"
18
19
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -35,7 +36,8 @@ func TestBufferedSenderDisconnectStream(t *testing.T) {
35
36
defer stopper .Stop (ctx )
36
37
testServerStream := newTestServerStream ()
37
38
smMetrics := NewStreamManagerMetrics ()
38
- bs := NewBufferedSender (testServerStream , NewBufferedSenderMetrics ())
39
+ st := cluster .MakeTestingClusterSettings ()
40
+ bs := NewBufferedSender (testServerStream , st , NewBufferedSenderMetrics ())
39
41
sm := NewStreamManager (bs , smMetrics )
40
42
require .NoError (t , sm .Start (ctx , stopper ))
41
43
defer sm .Stop (ctx )
@@ -88,7 +90,8 @@ func TestBufferedSenderChaosWithStop(t *testing.T) {
88
90
testServerStream := newTestServerStream ()
89
91
90
92
smMetrics := NewStreamManagerMetrics ()
91
- bs := NewBufferedSender (testServerStream , NewBufferedSenderMetrics ())
93
+ st := cluster .MakeTestingClusterSettings ()
94
+ bs := NewBufferedSender (testServerStream , st , NewBufferedSenderMetrics ())
92
95
sm := NewStreamManager (bs , smMetrics )
93
96
require .NoError (t , sm .Start (ctx , stopper ))
94
97
@@ -176,29 +179,33 @@ func TestBufferedSenderOnOverflow(t *testing.T) {
176
179
stopper := stop .NewStopper ()
177
180
defer stopper .Stop (ctx )
178
181
testServerStream := newTestServerStream ()
179
- bs := NewBufferedSender (testServerStream , NewBufferedSenderMetrics ())
180
- require .Equal (t , minBufferedSenderQueueCapacity , bs .queueMu .capacity )
182
+ st := cluster .MakeTestingClusterSettings ()
183
+
184
+ queueCap := int64 (24 )
185
+ RangefeedSingleBufferedSenderQueueMaxSize .Override (ctx , & st .SV , queueCap )
186
+ bs := NewBufferedSender (testServerStream , st , NewBufferedSenderMetrics ())
187
+ require .Equal (t , queueCap , bs .queueMu .capacity )
181
188
182
189
val1 := roachpb.Value {RawBytes : []byte ("val" ), Timestamp : hlc.Timestamp {WallTime : 1 }}
183
190
ev1 := new (kvpb.RangeFeedEvent )
184
191
ev1 .MustSetValue (& kvpb.RangeFeedValue {Key : keyA , Value : val1 })
185
192
muxEv := & kvpb.MuxRangeFeedEvent {RangeFeedEvent : * ev1 , RangeID : 0 , StreamID : 1 }
186
193
187
- for i := int64 ( 0 ); i < minBufferedSenderQueueCapacity ; i ++ {
194
+ for range queueCap {
188
195
require .NoError (t , bs .sendBuffered (muxEv , nil ))
189
196
}
190
- require .Equal (t , minBufferedSenderQueueCapacity , int64 (bs .len ()))
197
+ require .Equal (t , queueCap , int64 (bs .len ()))
191
198
e , success , overflowed , remains := bs .popFront ()
192
199
require .Equal (t , sharedMuxEvent {
193
200
ev : muxEv ,
194
201
alloc : nil ,
195
202
}, e )
196
203
require .True (t , success )
197
204
require .False (t , overflowed )
198
- require .Equal (t , minBufferedSenderQueueCapacity - 1 , remains )
199
- require .Equal (t , minBufferedSenderQueueCapacity - 1 , int64 (bs .len ()))
205
+ require .Equal (t , queueCap - 1 , remains )
206
+ require .Equal (t , queueCap - 1 , int64 (bs .len ()))
200
207
require .NoError (t , bs .sendBuffered (muxEv , nil ))
201
- require .Equal (t , minBufferedSenderQueueCapacity , int64 (bs .len ()))
208
+ require .Equal (t , queueCap , int64 (bs .len ()))
202
209
203
210
// Overflow now.
204
211
require .Equal (t , bs .sendBuffered (muxEv , nil ).Error (),
@@ -207,7 +214,6 @@ func TestBufferedSenderOnOverflow(t *testing.T) {
207
214
208
215
// TestBufferedSenderOnStreamShutdown tests that BufferedSender and
209
216
// StreamManager handle overflow and shutdown properly.
210
-
211
217
func TestBufferedSenderOnStreamShutdown (t * testing.T ) {
212
218
defer leaktest .AfterTest (t )()
213
219
defer log .Scope (t ).Close (t )
@@ -217,101 +223,70 @@ func TestBufferedSenderOnStreamShutdown(t *testing.T) {
217
223
defer stopper .Stop (ctx )
218
224
testServerStream := newTestServerStream ()
219
225
smMetrics := NewStreamManagerMetrics ()
220
- bs := NewBufferedSender (testServerStream , NewBufferedSenderMetrics ())
221
- require .Equal (t , minBufferedSenderQueueCapacity , bs .queueMu .capacity )
226
+ st := cluster .MakeTestingClusterSettings ()
227
+
228
+ queueCap := int64 (24 )
229
+ RangefeedSingleBufferedSenderQueueMaxSize .Override (ctx , & st .SV , queueCap )
230
+ bs := NewBufferedSender (testServerStream , st , NewBufferedSenderMetrics ())
231
+ require .Equal (t , queueCap , bs .queueMu .capacity )
232
+
222
233
sm := NewStreamManager (bs , smMetrics )
223
234
require .NoError (t , sm .Start (ctx , stopper ))
224
235
defer sm .Stop (ctx )
225
236
226
237
p , h , pStopper := newTestProcessor (t , withRangefeedTestType (scheduledProcessorWithBufferedSender ))
227
238
defer pStopper .Stop (ctx )
228
239
240
+ streamID := int64 (42 )
241
+
229
242
val1 := roachpb.Value {RawBytes : []byte ("val" ), Timestamp : hlc.Timestamp {WallTime : 1 }}
230
243
ev1 := new (kvpb.RangeFeedEvent )
231
244
ev1 .MustSetValue (& kvpb.RangeFeedValue {Key : keyA , Value : val1 })
232
- muxEv := & kvpb.MuxRangeFeedEvent {RangeFeedEvent : * ev1 , RangeID : 0 , StreamID : 1 }
245
+ muxEv := & kvpb.MuxRangeFeedEvent {RangeFeedEvent : * ev1 , RangeID : 0 , StreamID : streamID }
233
246
234
- // Add 21 streams and overflow the buffer.
235
- t .Run ("add 21 streams" , func (t * testing.T ) {
236
- numStreams := int64 (21 )
237
- expectedCapacity := perUnbufferedRegCapacity * numStreams
238
- // Block the stream to help the queue to overflow.
239
- unblock := testServerStream .BlockSend ()
240
- for id := int64 (0 ); id < numStreams ; id ++ {
241
- registered , d , _ := p .Register (ctx , h .span , hlc.Timestamp {}, nil , /* catchUpIter */
242
- false /* withDiff */ , false /* withFiltering */ , false /* withOmitRemote */ , noBulkDelivery ,
243
- sm .NewStream (id , 1 /*rangeID*/ ))
244
- require .True (t , registered )
245
- sm .AddStream (id , d )
246
- }
247
- require .Equal (t , expectedCapacity , bs .queueMu .capacity )
248
- for int64 (bs .len ()) != expectedCapacity {
249
- require .NoError (t , sm .sender .sendBuffered (muxEv , nil ))
250
- }
251
- require .Equal (t , expectedCapacity , int64 (bs .len ()))
252
- require .Equal (t , bs .sendBuffered (muxEv , nil ).Error (),
253
- newRetryErrBufferCapacityExceeded ().Error ())
254
- require .True (t , bs .queueMu .overflowed )
255
- unblock ()
256
- })
247
+ // Block the stream so that we can overflow later.
248
+ unblock := testServerStream .BlockSend ()
249
+ defer unblock ()
257
250
258
- t .Run ("overflow clean up" , func (t * testing.T ) {
259
- // All events buffered should still be sent to the stream.
251
+ waitForQueueLen := func (len int ) {
260
252
testutils .SucceedsSoon (t , func () error {
261
- if bs .len () == 0 {
253
+ if bs .len () == len {
262
254
return nil
263
255
}
264
- return errors .Newf ("expected 0 registrations , found %d" , bs .len ())
256
+ return errors .Newf ("expected %d events , found %d" , len , bs .len ())
265
257
})
266
- // Overflow cleanup.
267
- err := <- sm .Error ()
268
- require .Equal (t , newRetryErrBufferCapacityExceeded ().Error (), err .Error ())
269
- // Note that we expect the stream manager to shut down here, but no actual
270
- // error events would be sent during the shutdown.
271
- require .Equal (t , bs .sendBuffered (muxEv , nil ).Error (), newRetryErrBufferCapacityExceeded ().Error ())
272
- })
273
- }
274
-
275
- // TestBufferedSenderOnStreamDisconnect tests that BufferedSender dynamically
276
- // adjusts its capacity when streams are connected or disconnected.
277
- func TestBufferedSenderOnStreamDisconnect (t * testing.T ) {
278
- defer leaktest .AfterTest (t )()
279
- defer log .Scope (t ).Close (t )
280
- ctx := context .Background ()
281
-
282
- stopper := stop .NewStopper ()
283
- defer stopper .Stop (ctx )
284
- testServerStream := newTestServerStream ()
285
- smMetrics := NewStreamManagerMetrics ()
286
- bs := NewBufferedSender (testServerStream , NewBufferedSenderMetrics ())
287
- require .Equal (t , minBufferedSenderQueueCapacity , bs .queueMu .capacity )
288
-
289
- sm := NewStreamManager (bs , smMetrics )
290
- require .NoError (t , sm .Start (ctx , stopper ))
291
-
292
- p , h , pStopper := newTestProcessor (t , withRangefeedTestType (scheduledProcessorWithBufferedSender ))
293
- defer pStopper .Stop (ctx )
294
- defer sm .Stop (ctx )
258
+ }
295
259
296
- numStreams := int64 (21 )
297
- expectedCapacity := perUnbufferedRegCapacity * numStreams
298
- for id := int64 (0 ); id < numStreams ; id ++ {
299
- registered , d , _ := p .Register (ctx , h .span , hlc.Timestamp {}, nil , /* catchUpIter */
300
- false /* withDiff */ , false /* withFiltering */ , false /* withOmitRemote */ , noBulkDelivery ,
301
- sm .NewStream (id , 1 /*rangeID*/ ))
302
- require .True (t , registered )
303
- sm .AddStream (id , d )
260
+ // Add our stream to the stream manager.
261
+ registered , d , _ := p .Register (ctx , h .span , hlc.Timestamp {}, nil , /* catchUpIter */
262
+ false /* withDiff */ , false /* withFiltering */ , false /* withOmitRemote */ , noBulkDelivery ,
263
+ sm .NewStream (streamID , 1 /*rangeID*/ ))
264
+ require .True (t , registered )
265
+ sm .AddStream (streamID , d )
266
+
267
+ require .NoError (t , sm .sender .sendBuffered (muxEv , nil ))
268
+ // At this point we actually have sent 2 events. 1 checkpoint event sent by
269
+ // register and 1 event sent on the line above. We wait for 1 of these events
270
+ // to be pulled off the queue and block in the sender, leaving 1 in the queue.
271
+ waitForQueueLen (1 )
272
+ // Now fill the rest of the queue.
273
+ for range queueCap - 1 {
274
+ require .NoError (t , sm .sender .sendBuffered (muxEv , nil ))
304
275
}
305
- require .Equal (t , expectedCapacity , bs .queueMu .capacity )
306
- sm .DisconnectStream (int64 (0 ), newErrBufferCapacityExceeded ())
307
- testServerStream .waitForEvent (t , makeMuxRangefeedErrorEvent (0 , 1 , newErrBufferCapacityExceeded ()))
308
- testutils .SucceedsSoon (t , func () error {
309
- bs .queueMu .Lock ()
310
- defer bs .queueMu .Unlock ()
311
- if bs .queueMu .capacity == expectedCapacity - perUnbufferedRegCapacity {
312
- return nil
313
- }
314
- return errors .Newf ("expected %d cap to be %d" , bs .queueMu .capacity ,
315
- expectedCapacity - perUnbufferedRegCapacity )
316
- })
276
+
277
+ // The next write should overflow.
278
+ capExceededErrStr := newRetryErrBufferCapacityExceeded ().Error ()
279
+ err := sm .sender .sendBuffered (muxEv , nil )
280
+ require .EqualError (t , err , capExceededErrStr )
281
+ require .True (t , bs .overflowed ())
282
+
283
+ unblock ()
284
+ waitForQueueLen (0 )
285
+ // Overflow cleanup.
286
+ err = <- sm .Error ()
287
+ require .EqualError (t , err , capExceededErrStr )
288
+ // Note that we expect the stream manager to shut down here, but no actual
289
+ // error events would be sent during the shutdown.
290
+ err = sm .sender .sendBuffered (muxEv , nil )
291
+ require .EqualError (t , err , capExceededErrStr )
317
292
}
0 commit comments