
Commit 7a063ff

craig[bot] and erikgrinaker committed
107611: rangefeed: backpressure senders on `EventChanTimeout=0` r=erikgrinaker a=erikgrinaker

**rangefeed: backpressure senders on `EventChanTimeout=0`**

This patch makes rangefeeds backpressure senders instead of disconnecting clients when `EventChanTimeout` is set to 0. This will be useful for benchmarks, where backpressuring the workload will more accurately measure the overhead of rangefeeds instead of randomly introducing disconnects and catchup scans.

**kvserver: add `COCKROACH_RANGEFEED_SEND_TIMEOUT`**

This patch adds `COCKROACH_RANGEFEED_SEND_TIMEOUT` controlling the rangefeed channel send timeout. This is primarily intended for disabling the timeout entirely in benchmarks, instead backpressuring writers.

Touches cockroachdb#107070.

Epic: none

Release note: None

Co-authored-by: Erik Grinaker <[email protected]>
2 parents c3f3951 + ddbefa8 commit 7a063ff
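As a rough illustration of the behavior this commit toggles (a minimal sketch with made-up names, not code from this change): with a nonzero send timeout, a full event channel causes the send to give up and the client to be disconnected; with a timeout of 0, the send blocks until the consumer drains the channel, so the writer is backpressured instead.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// sendEvent is a stand-in for publishing a rangefeed event to a bounded
// channel. With timeout > 0 it gives up after the timeout (the real rangefeed
// would then disconnect the registration, forcing a catch-up scan); with
// timeout == 0 it blocks until the consumer makes room, backpressuring the
// sender instead.
func sendEvent(ch chan<- int, ev int, timeout time.Duration) error {
	if timeout == 0 {
		ch <- ev // blocking send: backpressure
		return nil
	}
	select {
	case ch <- ev:
		return nil
	case <-time.After(timeout):
		return errors.New("send timed out; would disconnect the client")
	}
}

func main() {
	ch := make(chan int, 1) // tiny buffer, like a small EventChanCap
	ch <- 0                 // fill the buffer to simulate a slow consumer

	// Timeout mode: the send gives up quickly.
	fmt.Println(sendEvent(ch, 1, 50*time.Millisecond))

	// Backpressure mode: the send blocks until the consumer drains an event.
	go func() {
		time.Sleep(100 * time.Millisecond)
		<-ch
	}()
	start := time.Now()
	_ = sendEvent(ch, 2, 0)
	fmt.Println("backpressured for", time.Since(start).Round(10*time.Millisecond))
}
```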

5 files changed: +139, -20 lines

pkg/kv/kvserver/rangefeed/processor.go

Lines changed: 5 additions & 5 deletions
```diff
@@ -64,9 +64,9 @@ type Config struct {
 	// EventChanCap specifies the capacity to give to the Processor's input
 	// channel.
 	EventChanCap int
-	// EventChanTimeout specifies the maximum duration that methods will
-	// wait to send on the Processor's input channel before giving up and
-	// shutting down the Processor. 0 for no timeout.
+	// EventChanTimeout specifies the maximum time to wait when sending on the
+	// Processor's input channel before giving up and shutting down the Processor.
+	// 0 disables the timeout, backpressuring writers up through Raft (for tests).
 	EventChanTimeout time.Duration
 
 	// Metrics is for production monitoring of RangeFeeds.
@@ -476,10 +476,10 @@ func (p *Processor) Register(
 	// it should see these events during its catch up scan.
 	p.syncEventC()
 
+	blockWhenFull := p.Config.EventChanTimeout == 0 // for testing
 	r := newRegistration(
 		span.AsRawSpanWithNoLocals(), startTS, catchUpIterConstructor, withDiff,
-		p.Config.EventChanCap, p.Metrics, stream, disconnectFn, done,
-	)
+		p.Config.EventChanCap, blockWhenFull, p.Metrics, stream, disconnectFn, done)
 	select {
 	case p.regC <- r:
 		// Wait for response.
```

pkg/kv/kvserver/rangefeed/processor_test.go

Lines changed: 83 additions & 0 deletions
```diff
@@ -142,6 +142,7 @@ func rangeFeedCheckpoint(span roachpb.Span, ts hlc.Timestamp) *kvpb.RangeFeedEvent
 }
 
 const testProcessorEventCCap = 16
+const testProcessorEventCTimeout = 10 * time.Millisecond
 
 func newTestProcessorWithTxnPusher(
 	t *testing.T, rtsIter storage.SimpleMVCCIterator, txnPusher TxnPusher,
@@ -161,6 +162,7 @@ func newTestProcessorWithTxnPusher(
 		TxnPusher:        txnPusher,
 		PushTxnsInterval: pushTxnInterval,
 		PushTxnsAge:      pushTxnAge,
+		EventChanTimeout: testProcessorEventCTimeout,
 		EventChanCap:     testProcessorEventCCap,
 		Metrics:          NewMetrics(),
 	})
@@ -1128,6 +1130,7 @@ func TestBudgetReleaseOnProcessorStop(t *testing.T) {
 		PushTxnsInterval: pushTxnInterval,
 		PushTxnsAge:      pushTxnAge,
 		EventChanCap:     channelCapacity,
+		EventChanTimeout: time.Millisecond,
 		MemBudget:        fb,
 		Metrics:          NewMetrics(),
 	})
@@ -1218,6 +1221,7 @@ func TestBudgetReleaseOnLastStreamError(t *testing.T) {
 		PushTxnsInterval: pushTxnInterval,
 		PushTxnsAge:      pushTxnAge,
 		EventChanCap:     channelCapacity,
+		EventChanTimeout: time.Millisecond,
 		MemBudget:        fb,
 		Metrics:          NewMetrics(),
 	})
@@ -1297,6 +1301,7 @@ func TestBudgetReleaseOnOneStreamError(t *testing.T) {
 		PushTxnsInterval: pushTxnInterval,
 		PushTxnsAge:      pushTxnAge,
 		EventChanCap:     channelCapacity,
+		EventChanTimeout: time.Millisecond,
 		MemBudget:        fb,
 		Metrics:          NewMetrics(),
 	})
@@ -1458,3 +1463,81 @@ func TestSizeOfEvent(t *testing.T) {
 	size := int(unsafe.Sizeof(e))
 	require.Equal(t, 72, size)
 }
+
+// TestProcessorBackpressure tests that a processor with EventChanTimeout set to
+// 0 will backpressure senders when a consumer isn't keeping up.
+func TestProcessorBackpressure(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	stopper := stop.NewStopper()
+	defer stopper.Stop(ctx)
+
+	span := roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")}
+
+	// Set up processor.
+	p := NewProcessor(Config{
+		AmbientContext:   log.MakeTestingAmbientContext(nil),
+		Clock:            hlc.NewClockForTesting(nil),
+		Metrics:          NewMetrics(),
+		Span:             span,
+		MemBudget:        newTestBudget(math.MaxInt64),
+		EventChanCap:     1,
+		EventChanTimeout: 0,
+	})
+	require.NoError(t, p.Start(stopper, nil))
+	defer p.Stop()
+
+	// Add a registration.
+	stream := newTestStream()
+	done := &future.ErrorFuture{}
+	ok, _ := p.Register(span, hlc.MinTimestamp, nil, false, stream, nil, done)
+	require.True(t, ok)
+
+	// Wait for the initial checkpoint.
+	p.syncEventAndRegistrations()
+	require.Len(t, stream.Events(), 1)
+
+	// Block the registration consumer, and spawn a goroutine to post events to
+	// the stream, which should block. The rangefeed pipeline buffers a few
+	// additional events in intermediate goroutines between channels, so post 10
+	// events to be sure.
+	unblock := stream.BlockSend()
+	defer unblock()
+
+	const numEvents = 10
+	doneC := make(chan struct{})
+	go func() {
+		for i := 0; i < numEvents; i++ {
+			assert.True(t, p.ForwardClosedTS(ctx, hlc.Timestamp{WallTime: int64(i + 1)}))
+		}
+		close(doneC)
+	}()
+
+	// The sender should be blocked for at least 3 seconds.
+	select {
+	case <-doneC:
+		t.Fatal("send unexpectely succeeded")
+	case <-time.After(3 * time.Second):
+	case <-ctx.Done():
+	}
+
+	// Unblock the sender, and wait for it to complete.
+	unblock()
+	select {
+	case <-doneC:
+	case <-time.After(time.Second):
+		t.Fatal("sender did not complete")
+	}
+
+	// Wait for the final checkpoint event.
+	p.syncEventAndRegistrations()
+	events := stream.Events()
+	require.Equal(t, &kvpb.RangeFeedEvent{
+		Checkpoint: &kvpb.RangeFeedCheckpoint{
+			Span:       span.AsRawSpanWithNoLocals(),
+			ResolvedTS: hlc.Timestamp{WallTime: numEvents},
+		},
+	}, events[len(events)-1])
+}
```

pkg/kv/kvserver/rangefeed/registry.go

Lines changed: 22 additions & 3 deletions
```diff
@@ -95,9 +95,10 @@ type registration struct {
 	done  *future.ErrorFuture
 	unreg func()
 	// Internal.
-	id   int64
-	keys interval.Range
-	buf  chan *sharedEvent
+	id            int64
+	keys          interval.Range
+	buf           chan *sharedEvent
+	blockWhenFull bool // if true, block when buf is full (for tests)
 
 	mu struct {
 		sync.Locker
@@ -125,6 +126,7 @@ func newRegistration(
 	catchUpIterConstructor CatchUpIteratorConstructor,
 	withDiff bool,
 	bufferSz int,
+	blockWhenFull bool,
 	metrics *Metrics,
 	stream Stream,
 	unregisterFn func(),
@@ -140,6 +142,7 @@ func newRegistration(
 		done:          done,
 		unreg:         unregisterFn,
 		buf:           make(chan *sharedEvent, bufferSz),
+		blockWhenFull: blockWhenFull,
 	}
 	r.mu.Locker = &syncutil.Mutex{}
 	r.mu.caughtUp = true
@@ -167,6 +170,22 @@ func (r *registration) publish(
 	case r.buf <- e:
 		r.mu.caughtUp = false
 	default:
+		// If we're asked to block (in tests), do a blocking send after releasing
+		// the mutex -- otherwise, the output loop won't be able to consume from the
+		// channel. We optimistically attempt the non-blocking send above first,
+		// since we're already holding the mutex.
+		if r.blockWhenFull {
+			r.mu.Unlock()
+			select {
+			case r.buf <- e:
+				r.mu.Lock()
+				r.mu.caughtUp = false
+			case <-ctx.Done():
+				r.mu.Lock()
+				alloc.Release(ctx)
+			}
+			return
+		}
 		// Buffer exceeded and we are dropping this event. Registration will need
 		// a catch-up scan.
 		r.mu.overflowed = true
```
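The blocking path added above releases the registration mutex before doing the blocking send, because the consumer of `buf` also needs that mutex; blocking while holding it would deadlock. A small self-contained sketch of the pattern, with hypothetical names (not the actual registry code):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// queue is a bounded buffer whose consumer must acquire mu before draining,
// loosely mirroring the registration's output loop.
type queue struct {
	mu  sync.Mutex
	buf chan int
}

// publish first tries a non-blocking send while holding mu (the fast path).
// If the buffer is full, it releases mu before blocking, so the consumer can
// still take the lock and drain the channel; blocking with mu held would
// deadlock.
func (q *queue) publish(ctx context.Context, v int) error {
	q.mu.Lock()
	select {
	case q.buf <- v:
		q.mu.Unlock()
		return nil
	default:
	}
	q.mu.Unlock() // let the consumer in while we wait
	select {
	case q.buf <- v:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// consume needs mu before it can drain the buffer.
func (q *queue) consume() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return <-q.buf
}

func main() {
	q := &queue{buf: make(chan int, 1)}
	ctx := context.Background()
	_ = q.publish(ctx, 1) // fills the one-slot buffer

	done := make(chan struct{})
	go func() {
		defer close(done)
		time.Sleep(50 * time.Millisecond)
		fmt.Println("consumed", q.consume())
	}()

	// Blocks until the consumer above drains the buffer; no deadlock, since mu
	// is released before the blocking send.
	if err := q.publish(ctx, 2); err == nil {
		fmt.Println("published 2")
	}
	<-done
	fmt.Println("consumed", q.consume())
}
```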

pkg/kv/kvserver/rangefeed/registry_test.go

Lines changed: 6 additions & 1 deletion
```diff
@@ -13,6 +13,7 @@ package rangefeed
 import (
 	"context"
 	"fmt"
+	"sync"
 	"testing"
 
 	_ "github.com/cockroachdb/cockroach/pkg/keys" // hook up pretty printer
@@ -88,7 +89,10 @@ func (s *testStream) Events() []*kvpb.RangeFeedEvent {
 
 func (s *testStream) BlockSend() func() {
 	s.mu.Lock()
-	return s.mu.Unlock
+	var once sync.Once
+	return func() {
+		once.Do(s.mu.Unlock) // safe to call multiple times, e.g. defer and explicit
+	}
 }
 
 type testRegistration struct {
@@ -119,6 +123,7 @@ func newTestRegistration(
 		makeCatchUpIteratorConstructor(catchup),
 		withDiff,
 		5,
+		false, /* blockWhenFull */
 		NewMetrics(),
 		s,
 		func() {},
```
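The `sync.Once` in `BlockSend` above makes the returned unblock function idempotent, so a test can both `defer` it and call it explicitly without a double-unlock panic. A tiny standalone sketch of the same idiom (a hypothetical helper, not the test code itself):

```go
package main

import (
	"fmt"
	"sync"
)

// block locks mu and returns a release function that is safe to call more
// than once; only the first call actually unlocks.
func block(mu *sync.Mutex) func() {
	mu.Lock()
	var once sync.Once
	return func() {
		once.Do(mu.Unlock)
	}
}

func main() {
	var mu sync.Mutex
	unblock := block(&mu)
	defer unblock() // runs after the explicit call below, but is a no-op

	unblock() // releases the lock
	mu.Lock() // the mutex is free again, so this does not block
	mu.Unlock()
	fmt.Println("ok")
}
```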

pkg/kv/kvserver/replica_rangefeed.go

Lines changed: 23 additions & 11 deletions
```diff
@@ -28,6 +28,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/util/admission"
+	"github.com/cockroachdb/cockroach/pkg/util/envutil"
 	"github.com/cockroachdb/cockroach/pkg/util/future"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -73,6 +74,27 @@ var RangeFeedSmearInterval = settings.RegisterDurationSetting(
 	settings.NonNegativeDuration,
 )
 
+// defaultEventChanCap is the channel capacity of the rangefeed processor and
+// each registration.
+//
+// The size of an event is 72 bytes, so this will result in an allocation on the
+// order of ~300KB per RangeFeed. That's probably ok given the number of ranges
+// on a node that we'd like to support with active rangefeeds, but it's
+// certainly on the upper end of the range.
+//
+// TODO(dan): Everyone seems to agree that this memory limit would be better set
+// at a store-wide level, but there doesn't seem to be an easy way to accomplish
+// that.
+const defaultEventChanCap = 4096
+
+// defaultEventChanTimeout is the send timeout for events published to a
+// rangefeed processor or rangefeed client channels. When exceeded, the
+// rangefeed or client is disconnected to prevent blocking foreground traffic
+// for longer than this timeout. When set to 0, clients are never disconnected,
+// and slow consumers will backpressure writers up through Raft.
+var defaultEventChanTimeout = envutil.EnvOrDefaultDuration(
+	"COCKROACH_RANGEFEED_SEND_TIMEOUT", 50*time.Millisecond)
+
 // lockedRangefeedStream is an implementation of rangefeed.Stream which provides
 // support for concurrent calls to Send. Note that the default implementation of
 // grpc.Stream is not safe for concurrent calls to Send.
@@ -293,16 +315,6 @@ func (r *Replica) updateRangefeedFilterLocked() bool {
 	return false
 }
 
-// The size of an event is 72 bytes, so this will result in an allocation on
-// the order of ~300KB per RangeFeed. That's probably ok given the number of
-// ranges on a node that we'd like to support with active rangefeeds, but it's
-// certainly on the upper end of the range.
-//
-// TODO(dan): Everyone seems to agree that this memory limit would be better set
-// at a store-wide level, but there doesn't seem to be an easy way to accomplish
-// that.
-const defaultEventChanCap = 4096
-
 // Rangefeed registration takes place under the raftMu, so log if we ever hold
 // the mutex for too long, as this could affect foreground traffic.
 //
@@ -372,7 +384,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
 		PushTxnsInterval: r.store.TestingKnobs().RangeFeedPushTxnsInterval,
 		PushTxnsAge:      r.store.TestingKnobs().RangeFeedPushTxnsAge,
 		EventChanCap:     defaultEventChanCap,
-		EventChanTimeout: 50 * time.Millisecond,
+		EventChanTimeout: defaultEventChanTimeout,
 		Metrics:          r.store.metrics.RangeFeedMetrics,
 		MemBudget:        feedBudget,
 	}
```
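As an illustrative usage note (not part of this diff): since the timeout is read from the environment, a rangefeed benchmark can export `COCKROACH_RANGEFEED_SEND_TIMEOUT=0` before starting the node to disable the timeout, so slow rangefeed consumers backpressure the workload rather than being disconnected, or set another duration such as `100ms` to adjust the disconnect threshold.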
