@@ -142,6 +142,7 @@ func rangeFeedCheckpoint(span roachpb.Span, ts hlc.Timestamp) *kvpb.RangeFeedEve
142142}
143143
144144const testProcessorEventCCap = 16
145+ const testProcessorEventCTimeout = 10 * time .Millisecond
145146
146147func newTestProcessorWithTxnPusher (
147148 t * testing.T , rtsIter storage.SimpleMVCCIterator , txnPusher TxnPusher ,
@@ -161,6 +162,7 @@ func newTestProcessorWithTxnPusher(
161162 TxnPusher : txnPusher ,
162163 PushTxnsInterval : pushTxnInterval ,
163164 PushTxnsAge : pushTxnAge ,
165+ EventChanTimeout : testProcessorEventCTimeout ,
164166 EventChanCap : testProcessorEventCCap ,
165167 Metrics : NewMetrics (),
166168 })
@@ -1128,6 +1130,7 @@ func TestBudgetReleaseOnProcessorStop(t *testing.T) {
11281130 PushTxnsInterval : pushTxnInterval ,
11291131 PushTxnsAge : pushTxnAge ,
11301132 EventChanCap : channelCapacity ,
1133+ EventChanTimeout : time .Millisecond ,
11311134 MemBudget : fb ,
11321135 Metrics : NewMetrics (),
11331136 })
@@ -1218,6 +1221,7 @@ func TestBudgetReleaseOnLastStreamError(t *testing.T) {
12181221 PushTxnsInterval : pushTxnInterval ,
12191222 PushTxnsAge : pushTxnAge ,
12201223 EventChanCap : channelCapacity ,
1224+ EventChanTimeout : time .Millisecond ,
12211225 MemBudget : fb ,
12221226 Metrics : NewMetrics (),
12231227 })
@@ -1297,6 +1301,7 @@ func TestBudgetReleaseOnOneStreamError(t *testing.T) {
12971301 PushTxnsInterval : pushTxnInterval ,
12981302 PushTxnsAge : pushTxnAge ,
12991303 EventChanCap : channelCapacity ,
1304+ EventChanTimeout : time .Millisecond ,
13001305 MemBudget : fb ,
13011306 Metrics : NewMetrics (),
13021307 })
@@ -1458,3 +1463,81 @@ func TestSizeOfEvent(t *testing.T) {
14581463 size := int (unsafe .Sizeof (e ))
14591464 require .Equal (t , 72 , size )
14601465}
1466+
1467+ // TestProcessorBackpressure tests that a processor with EventChanTimeout set to
1468+ // 0 will backpressure senders when a consumer isn't keeping up.
1469+ func TestProcessorBackpressure (t * testing.T ) {
1470+ defer leaktest .AfterTest (t )()
1471+
1472+ ctx , cancel := context .WithCancel (context .Background ())
1473+ defer cancel ()
1474+ stopper := stop .NewStopper ()
1475+ defer stopper .Stop (ctx )
1476+
1477+ span := roachpb.RSpan {Key : roachpb .RKey ("a" ), EndKey : roachpb .RKey ("z" )}
1478+
1479+ // Set up processor.
1480+ p := NewProcessor (Config {
1481+ AmbientContext : log .MakeTestingAmbientContext (nil ),
1482+ Clock : hlc .NewClockForTesting (nil ),
1483+ Metrics : NewMetrics (),
1484+ Span : span ,
1485+ MemBudget : newTestBudget (math .MaxInt64 ),
1486+ EventChanCap : 1 ,
1487+ EventChanTimeout : 0 ,
1488+ })
1489+ require .NoError (t , p .Start (stopper , nil ))
1490+ defer p .Stop ()
1491+
1492+ // Add a registration.
1493+ stream := newTestStream ()
1494+ done := & future.ErrorFuture {}
1495+ ok , _ := p .Register (span , hlc .MinTimestamp , nil , false , stream , nil , done )
1496+ require .True (t , ok )
1497+
1498+ // Wait for the initial checkpoint.
1499+ p .syncEventAndRegistrations ()
1500+ require .Len (t , stream .Events (), 1 )
1501+
1502+ // Block the registration consumer, and spawn a goroutine to post events to
1503+ // the stream, which should block. The rangefeed pipeline buffers a few
1504+ // additional events in intermediate goroutines between channels, so post 10
1505+ // events to be sure.
1506+ unblock := stream .BlockSend ()
1507+ defer unblock ()
1508+
1509+ const numEvents = 10
1510+ doneC := make (chan struct {})
1511+ go func () {
1512+ for i := 0 ; i < numEvents ; i ++ {
1513+ assert .True (t , p .ForwardClosedTS (ctx , hlc.Timestamp {WallTime : int64 (i + 1 )}))
1514+ }
1515+ close (doneC )
1516+ }()
1517+
1518+ // The sender should be blocked for at least 3 seconds.
1519+ select {
1520+ case <- doneC :
1521+ t .Fatal ("send unexpectely succeeded" )
1522+ case <- time .After (3 * time .Second ):
1523+ case <- ctx .Done ():
1524+ }
1525+
1526+ // Unblock the sender, and wait for it to complete.
1527+ unblock ()
1528+ select {
1529+ case <- doneC :
1530+ case <- time .After (time .Second ):
1531+ t .Fatal ("sender did not complete" )
1532+ }
1533+
1534+ // Wait for the final checkpoint event.
1535+ p .syncEventAndRegistrations ()
1536+ events := stream .Events ()
1537+ require .Equal (t , & kvpb.RangeFeedEvent {
1538+ Checkpoint : & kvpb.RangeFeedCheckpoint {
1539+ Span : span .AsRawSpanWithNoLocals (),
1540+ ResolvedTS : hlc.Timestamp {WallTime : numEvents },
1541+ },
1542+ }, events [len (events )- 1 ])
1543+ }
0 commit comments