@@ -28,6 +28,7 @@ import (
2828 "github.com/cockroachdb/cockroach/pkg/storage"
2929 "github.com/cockroachdb/cockroach/pkg/storage/enginepb"
3030 "github.com/cockroachdb/cockroach/pkg/util/admission"
31+ "github.com/cockroachdb/cockroach/pkg/util/envutil"
3132 "github.com/cockroachdb/cockroach/pkg/util/future"
3233 "github.com/cockroachdb/cockroach/pkg/util/hlc"
3334 "github.com/cockroachdb/cockroach/pkg/util/log"
@@ -73,6 +74,27 @@ var RangeFeedSmearInterval = settings.RegisterDurationSetting(
7374 settings .NonNegativeDuration ,
7475)
7576
77+ // defaultEventChanCap is the channel capacity of the rangefeed processor and
78+ // each registration.
79+ //
80+ // The size of an event is 72 bytes, so this will result in an allocation on the
81+ // order of ~300KB per RangeFeed. That's probably ok given the number of ranges
82+ // on a node that we'd like to support with active rangefeeds, but it's
83+ // certainly on the upper end of the range.
84+ //
85+ // TODO(dan): Everyone seems to agree that this memory limit would be better set
86+ // at a store-wide level, but there doesn't seem to be an easy way to accomplish
87+ // that.
88+ const defaultEventChanCap = 4096
89+
90+ // defaultEventChanTimeout is the send timeout for events published to a
91+ // rangefeed processor or rangefeed client channels. When exceeded, the
92+ // rangefeed or client is disconnected to prevent blocking foreground traffic
93+ // for longer than this timeout. When set to 0, clients are never disconnected,
94+ // and slow consumers will backpressure writers up through Raft.
95+ var defaultEventChanTimeout = envutil .EnvOrDefaultDuration (
96+ "COCKROACH_RANGEFEED_SEND_TIMEOUT" , 50 * time .Millisecond )
97+
7698// lockedRangefeedStream is an implementation of rangefeed.Stream which provides
7799// support for concurrent calls to Send. Note that the default implementation of
78100// grpc.Stream is not safe for concurrent calls to Send.
@@ -293,16 +315,6 @@ func (r *Replica) updateRangefeedFilterLocked() bool {
293315 return false
294316}
295317
296- // The size of an event is 72 bytes, so this will result in an allocation on
297- // the order of ~300KB per RangeFeed. That's probably ok given the number of
298- // ranges on a node that we'd like to support with active rangefeeds, but it's
299- // certainly on the upper end of the range.
300- //
301- // TODO(dan): Everyone seems to agree that this memory limit would be better set
302- // at a store-wide level, but there doesn't seem to be an easy way to accomplish
303- // that.
304- const defaultEventChanCap = 4096
305-
306318// Rangefeed registration takes place under the raftMu, so log if we ever hold
307319// the mutex for too long, as this could affect foreground traffic.
308320//
@@ -372,7 +384,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
372384 PushTxnsInterval : r .store .TestingKnobs ().RangeFeedPushTxnsInterval ,
373385 PushTxnsAge : r .store .TestingKnobs ().RangeFeedPushTxnsAge ,
374386 EventChanCap : defaultEventChanCap ,
375- EventChanTimeout : 50 * time . Millisecond ,
387+ EventChanTimeout : defaultEventChanTimeout ,
376388 Metrics : r .store .metrics .RangeFeedMetrics ,
377389 MemBudget : feedBudget ,
378390 }
0 commit comments