Skip to content

Commit fc9e850

Browse files
committed
rangefeed: remove Config.CheckStreamsInterval
This parameter is unused. Epic: none Release note: None
1 parent d293bb3 commit fc9e850

File tree

2 files changed

+59
-76
lines changed

2 files changed

+59
-76
lines changed

pkg/kv/kvserver/rangefeed/processor.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,6 @@ const (
3434
// defaultPushTxnsAge is the default age at which a Processor will begin to
3535
// consider a transaction old enough to push.
3636
defaultPushTxnsAge = 10 * time.Second
37-
// defaultCheckStreamsInterval is the default interval at which a Processor
38-
// will check all streams to make sure they have not been canceled.
39-
defaultCheckStreamsInterval = 1 * time.Second
4037
)
4138

4239
// newErrBufferCapacityExceeded creates an error that is returned to subscribers
@@ -72,10 +69,6 @@ type Config struct {
7269
// shutting down the Processor. 0 for no timeout.
7370
EventChanTimeout time.Duration
7471

75-
// CheckStreamsInterval specifies interval at which a Processor will check
76-
// all streams to make sure they have not been canceled.
77-
CheckStreamsInterval time.Duration
78-
7972
// Metrics is for production monitoring of RangeFeeds.
8073
Metrics *Metrics
8174

@@ -101,9 +94,6 @@ func (sc *Config) SetDefaults() {
10194
sc.PushTxnsAge = defaultPushTxnsAge
10295
}
10396
}
104-
if sc.CheckStreamsInterval == 0 {
105-
sc.CheckStreamsInterval = defaultCheckStreamsInterval
106-
}
10797
}
10898

10999
// Processor manages a set of rangefeed registrations and handles the routing of

pkg/kv/kvserver/rangefeed/processor_test.go

Lines changed: 59 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -155,15 +155,14 @@ func newTestProcessorWithTxnPusher(
155155
pushTxnAge = 50 * time.Millisecond
156156
}
157157
p := NewProcessor(Config{
158-
AmbientContext: log.MakeTestingAmbientCtxWithNewTracer(),
159-
Clock: hlc.NewClockForTesting(nil),
160-
Span: roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")},
161-
TxnPusher: txnPusher,
162-
PushTxnsInterval: pushTxnInterval,
163-
PushTxnsAge: pushTxnAge,
164-
EventChanCap: testProcessorEventCCap,
165-
CheckStreamsInterval: 10 * time.Millisecond,
166-
Metrics: NewMetrics(),
158+
AmbientContext: log.MakeTestingAmbientCtxWithNewTracer(),
159+
Clock: hlc.NewClockForTesting(nil),
160+
Span: roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")},
161+
TxnPusher: txnPusher,
162+
PushTxnsInterval: pushTxnInterval,
163+
PushTxnsAge: pushTxnAge,
164+
EventChanCap: testProcessorEventCCap,
165+
Metrics: NewMetrics(),
167166
})
168167
require.NoError(t, p.Start(stopper, makeIntentScannerConstructor(rtsIter)))
169168
return p, stopper
@@ -566,16 +565,15 @@ func TestProcessorMemoryBudgetExceeded(t *testing.T) {
566565
stopper := stop.NewStopper()
567566
var pushTxnInterval, pushTxnAge time.Duration = 0, 0 // disable
568567
p := NewProcessor(Config{
569-
AmbientContext: log.MakeTestingAmbientCtxWithNewTracer(),
570-
Clock: hlc.NewClockForTesting(nil),
571-
Span: roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")},
572-
PushTxnsInterval: pushTxnInterval,
573-
PushTxnsAge: pushTxnAge,
574-
EventChanCap: testProcessorEventCCap,
575-
CheckStreamsInterval: 10 * time.Millisecond,
576-
Metrics: NewMetrics(),
577-
MemBudget: fb,
578-
EventChanTimeout: time.Millisecond,
568+
AmbientContext: log.MakeTestingAmbientCtxWithNewTracer(),
569+
Clock: hlc.NewClockForTesting(nil),
570+
Span: roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")},
571+
PushTxnsInterval: pushTxnInterval,
572+
PushTxnsAge: pushTxnAge,
573+
EventChanCap: testProcessorEventCCap,
574+
Metrics: NewMetrics(),
575+
MemBudget: fb,
576+
EventChanTimeout: time.Millisecond,
579577
})
580578
require.NoError(t, p.Start(stopper, nil))
581579
ctx := context.Background()
@@ -635,16 +633,15 @@ func TestProcessorMemoryBudgetReleased(t *testing.T) {
635633
stopper := stop.NewStopper()
636634
var pushTxnInterval, pushTxnAge time.Duration = 0, 0 // disable
637635
p := NewProcessor(Config{
638-
AmbientContext: log.MakeTestingAmbientCtxWithNewTracer(),
639-
Clock: hlc.NewClockForTesting(nil),
640-
Span: roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")},
641-
PushTxnsInterval: pushTxnInterval,
642-
PushTxnsAge: pushTxnAge,
643-
EventChanCap: testProcessorEventCCap,
644-
CheckStreamsInterval: 10 * time.Millisecond,
645-
Metrics: NewMetrics(),
646-
MemBudget: fb,
647-
EventChanTimeout: 15 * time.Minute, // Enable timeout to allow consumer to process
636+
AmbientContext: log.MakeTestingAmbientCtxWithNewTracer(),
637+
Clock: hlc.NewClockForTesting(nil),
638+
Span: roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")},
639+
PushTxnsInterval: pushTxnInterval,
640+
PushTxnsAge: pushTxnAge,
641+
EventChanCap: testProcessorEventCCap,
642+
Metrics: NewMetrics(),
643+
MemBudget: fb,
644+
EventChanTimeout: 15 * time.Minute, // Enable timeout to allow consumer to process
648645
// events even if we reach memory budget capacity.
649646
})
650647
require.NoError(t, p.Start(stopper, nil))
@@ -1125,15 +1122,14 @@ func TestBudgetReleaseOnProcessorStop(t *testing.T) {
11251122
stopper := stop.NewStopper()
11261123
var pushTxnInterval, pushTxnAge time.Duration = 0, 0 // disable
11271124
p := NewProcessor(Config{
1128-
AmbientContext: log.MakeTestingAmbientCtxWithNewTracer(),
1129-
Clock: hlc.NewClockForTesting(nil),
1130-
Span: roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")},
1131-
PushTxnsInterval: pushTxnInterval,
1132-
PushTxnsAge: pushTxnAge,
1133-
EventChanCap: channelCapacity,
1134-
CheckStreamsInterval: 10 * time.Millisecond,
1135-
MemBudget: fb,
1136-
Metrics: NewMetrics(),
1125+
AmbientContext: log.MakeTestingAmbientCtxWithNewTracer(),
1126+
Clock: hlc.NewClockForTesting(nil),
1127+
Span: roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")},
1128+
PushTxnsInterval: pushTxnInterval,
1129+
PushTxnsAge: pushTxnAge,
1130+
EventChanCap: channelCapacity,
1131+
MemBudget: fb,
1132+
Metrics: NewMetrics(),
11371133
})
11381134
require.NoError(t, p.Start(stopper, nil))
11391135
ctx := context.Background()
@@ -1216,15 +1212,14 @@ func TestBudgetReleaseOnLastStreamError(t *testing.T) {
12161212
stopper := stop.NewStopper()
12171213
var pushTxnInterval, pushTxnAge time.Duration = 0, 0 // disable
12181214
p := NewProcessor(Config{
1219-
AmbientContext: log.MakeTestingAmbientCtxWithNewTracer(),
1220-
Clock: hlc.NewClockForTesting(nil),
1221-
Span: roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")},
1222-
PushTxnsInterval: pushTxnInterval,
1223-
PushTxnsAge: pushTxnAge,
1224-
EventChanCap: channelCapacity,
1225-
CheckStreamsInterval: 10 * time.Millisecond,
1226-
MemBudget: fb,
1227-
Metrics: NewMetrics(),
1215+
AmbientContext: log.MakeTestingAmbientCtxWithNewTracer(),
1216+
Clock: hlc.NewClockForTesting(nil),
1217+
Span: roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")},
1218+
PushTxnsInterval: pushTxnInterval,
1219+
PushTxnsAge: pushTxnAge,
1220+
EventChanCap: channelCapacity,
1221+
MemBudget: fb,
1222+
Metrics: NewMetrics(),
12281223
})
12291224
require.NoError(t, p.Start(stopper, nil))
12301225
ctx := context.Background()
@@ -1296,15 +1291,14 @@ func TestBudgetReleaseOnOneStreamError(t *testing.T) {
12961291
stopper := stop.NewStopper()
12971292
var pushTxnInterval, pushTxnAge time.Duration = 0, 0 // disable
12981293
p := NewProcessor(Config{
1299-
AmbientContext: log.MakeTestingAmbientCtxWithNewTracer(),
1300-
Clock: hlc.NewClockForTesting(nil),
1301-
Span: roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")},
1302-
PushTxnsInterval: pushTxnInterval,
1303-
PushTxnsAge: pushTxnAge,
1304-
EventChanCap: channelCapacity,
1305-
CheckStreamsInterval: 10 * time.Millisecond,
1306-
MemBudget: fb,
1307-
Metrics: NewMetrics(),
1294+
AmbientContext: log.MakeTestingAmbientCtxWithNewTracer(),
1295+
Clock: hlc.NewClockForTesting(nil),
1296+
Span: roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")},
1297+
PushTxnsInterval: pushTxnInterval,
1298+
PushTxnsAge: pushTxnAge,
1299+
EventChanCap: channelCapacity,
1300+
MemBudget: fb,
1301+
Metrics: NewMetrics(),
13081302
})
13091303
require.NoError(t, p.Start(stopper, nil))
13101304
ctx := context.Background()
@@ -1468,16 +1462,15 @@ func BenchmarkProcessorWithBudget(b *testing.B) {
14681462
stopper := stop.NewStopper()
14691463
var pushTxnInterval, pushTxnAge time.Duration = 0, 0 // disable
14701464
p := NewProcessor(Config{
1471-
AmbientContext: log.MakeTestingAmbientCtxWithNewTracer(),
1472-
Clock: hlc.NewClockForTesting(nil),
1473-
Span: roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")},
1474-
PushTxnsInterval: pushTxnInterval,
1475-
PushTxnsAge: pushTxnAge,
1476-
EventChanCap: benchmarkEvents * b.N,
1477-
CheckStreamsInterval: 10 * time.Millisecond,
1478-
Metrics: NewMetrics(),
1479-
MemBudget: budget,
1480-
EventChanTimeout: time.Minute,
1465+
AmbientContext: log.MakeTestingAmbientCtxWithNewTracer(),
1466+
Clock: hlc.NewClockForTesting(nil),
1467+
Span: roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")},
1468+
PushTxnsInterval: pushTxnInterval,
1469+
PushTxnsAge: pushTxnAge,
1470+
EventChanCap: benchmarkEvents * b.N,
1471+
Metrics: NewMetrics(),
1472+
MemBudget: budget,
1473+
EventChanTimeout: time.Minute,
14811474
})
14821475
require.NoError(b, p.Start(stopper, nil))
14831476
ctx := context.Background()

0 commit comments

Comments
 (0)