Commit 9db3a0e

craig[bot], erikgrinaker, and tbg committed
3 parents: 8c52fea + 11f8799 + 208a7eb

107525: rangefeed: add `BenchmarkRangefeed` r=erikgrinaker a=erikgrinaker

    Resolves cockroachdb#107440.

    Epic: none
    Release note: None

107533: kvserver: improve TestClosedTimestampFrozenAfterSubsumption r=erikgrinaker a=tbg

    Adopt setupClusterForClosedTSTesting, which wraps setupTestClusterWithDummyRange with goodies such as cockroachdb#107531.

    Fixes cockroachdb#107179[^1]

    [^1]: cockroachdb#107179 (comment)

    Epic: None
    Release note: None

Co-authored-by: Erik Grinaker <[email protected]>
Co-authored-by: Tobias Grieger <[email protected]>
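
For reference, Go microbenchmarks like the new BenchmarkRangefeed are conventionally run with the standard toolchain; the invocation below is a sketch, not from the commit (CockroachDB development typically drives tests through Bazel, so the exact command may differ):

    go test ./pkg/kv/kvserver/rangefeed -run '^$' -bench BenchmarkRangefeed -benchmem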

6 files changed: +232, -73 lines

pkg/kv/kvserver/client_rangefeed_test.go (1 addition, 1 deletion)

@@ -227,7 +227,7 @@ func TestRangefeedIsRoutedToNonVoter(t *testing.T) {
 	clusterArgs.ReplicationMode = base.ReplicationManual
 	// NB: setupClusterForClosedTSTesting sets a low closed timestamp target
 	// duration.
-	tc, _, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, clusterArgs, "cttest", "kv")
+	tc, _, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, clusterArgs, "cttest", "kv")
 	defer tc.Stopper().Stop(ctx)
 	tc.AddNonVotersOrFatal(t, desc.StartKey.AsRawKey(), tc.Target(1))

pkg/kv/kvserver/closed_timestamp_test.go (15 additions, 13 deletions)

@@ -85,7 +85,7 @@ func TestClosedTimestampCanServe(t *testing.T) {
 	// Disable the replicateQueue so that it doesn't interfere with replica
 	// membership ranges.
 	clusterArgs.ReplicationMode = base.ReplicationManual
-	tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, clusterArgs, dbName, tableName)
+	tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, clusterArgs, dbName, tableName)
 	defer tc.Stopper().Stop(ctx)

 	if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
@@ -156,7 +156,7 @@ func TestClosedTimestampCanServeOnVoterIncoming(t *testing.T) {
 	clusterArgs.ReplicationMode = base.ReplicationManual
 	knobs, ltk := makeReplicationTestKnobs()
 	clusterArgs.ServerArgs.Knobs = knobs
-	tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, clusterArgs, dbName, tableName)
+	tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, clusterArgs, dbName, tableName)
 	defer tc.Stopper().Stop(ctx)

 	if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
@@ -192,7 +192,7 @@ func TestClosedTimestampCanServeThroughoutLeaseTransfer(t *testing.T) {
 	skip.UnderRace(t)

 	ctx := context.Background()
-	tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
+	tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
 	defer tc.Stopper().Stop(ctx)
 	repls := replsForRange(ctx, t, tc, desc)

@@ -270,7 +270,7 @@ func TestClosedTimestampCantServeWithConflictingIntent(t *testing.T) {
 	defer txnwait.TestingOverrideTxnLivenessThreshold(time.Hour)()

 	ctx := context.Background()
-	tc, _, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
+	tc, _, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
 	defer tc.Stopper().Stop(ctx)
 	repls := replsForRange(ctx, t, tc, desc)
 	ds := tc.Server(0).DistSenderI().(*kvcoord.DistSender)
@@ -377,7 +377,7 @@ func TestClosedTimestampCanServeAfterSplitAndMerges(t *testing.T) {
 	skip.UnderRace(t)

 	ctx := context.Background()
-	tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
+	tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
 	repls := replsForRange(ctx, t, tc, desc)
 	// Disable the automatic merging.
 	if _, err := db0.Exec("SET CLUSTER SETTING kv.range_merge.queue_enabled = false"); err != nil {
@@ -457,7 +457,7 @@ func TestClosedTimestampCantServeBasedOnUncertaintyLimit(t *testing.T) {
 	ctx := context.Background()
 	// Set up the target duration to be very long and rely on lease transfers to
 	// drive MaxClosed.
-	tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
+	tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
 	defer tc.Stopper().Stop(ctx)
 	repls := replsForRange(ctx, t, tc, desc)

@@ -490,7 +490,7 @@ func TestClosedTimestampCanServeForWritingTransaction(t *testing.T) {
 	skip.UnderRace(t)

 	ctx := context.Background()
-	tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
+	tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
 	defer tc.Stopper().Stop(ctx)
 	repls := replsForRange(ctx, t, tc, desc)

@@ -537,7 +537,7 @@ func TestClosedTimestampCantServeForNonTransactionalReadRequest(t *testing.T) {
 	skip.UnderRace(t)

 	ctx := context.Background()
-	tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
+	tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
 	defer tc.Stopper().Stop(ctx)
 	repls := replsForRange(ctx, t, tc, desc)

@@ -579,7 +579,7 @@ func TestClosedTimestampCantServeForNonTransactionalBatch(t *testing.T) {

 	testutils.RunTrueAndFalse(t, "tsFromServer", func(t *testing.T, tsFromServer bool) {
 		ctx := context.Background()
-		tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
+		tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
 		defer tc.Stopper().Stop(ctx)
 		repls := replsForRange(ctx, t, tc, desc)

@@ -722,8 +722,7 @@ func TestClosedTimestampFrozenAfterSubsumption(t *testing.T) {
 	// Set up the closed timestamp timing such that, when we block a merge and
 	// transfer the RHS lease, the closed timestamp advances over the LHS
 	// lease but not over the RHS lease.
-	const numNodes = 3
-	tc, _ := setupTestClusterWithDummyRange(t, clusterArgs, "cttest" /* dbName */, "kv" /* tableName */, numNodes)
+	tc, _, _ := setupClusterForClosedTSTesting(ctx, t, 5*time.Second, 100*time.Millisecond, clusterArgs, "cttest", "kv")
 	defer tc.Stopper().Stop(ctx)
 	sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))
 	sqlDB.ExecMultiple(t, strings.Split(fmt.Sprintf(`
@@ -1195,19 +1194,22 @@ func aggressiveResolvedTimestampPushKnobs() *kvserver.StoreTestingKnobs {
 func setupClusterForClosedTSTesting(
 	ctx context.Context,
 	t *testing.T,
-	targetDuration time.Duration,
+	targetDuration, sideTransportInterval time.Duration,
 	clusterArgs base.TestClusterArgs,
 	dbName, tableName string,
 ) (tc serverutils.TestClusterInterface, db0 *gosql.DB, kvTableDesc roachpb.RangeDescriptor) {
 	const numNodes = 3
+	if sideTransportInterval == 0 {
+		sideTransportInterval = targetDuration / 4
+	}
 	tc, desc := setupTestClusterWithDummyRange(t, clusterArgs, dbName, tableName, numNodes)
 	sqlRunner := sqlutils.MakeSQLRunner(tc.ServerConn(0))
 	sqlRunner.ExecMultiple(t, strings.Split(fmt.Sprintf(`
 SET CLUSTER SETTING kv.closed_timestamp.target_duration = '%s';
 SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '%s';
 SET CLUSTER SETTING kv.closed_timestamp.follower_reads_enabled = true;
 SET CLUSTER SETTING kv.allocator.load_based_rebalancing = 'off';
-`, targetDuration, targetDuration/4),
+`, targetDuration, sideTransportInterval),
 		";")...)

 	// Disable replicate queues to avoid errant lease transfers.
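
To restate the new helper's contract (a sketch distilled from the hunks above, not code from the commit): a zero sideTransportInterval keeps the old behavior of closing timestamps via the side transport every targetDuration/4, while a nonzero value pins the cadence explicitly, as TestClosedTimestampFrozenAfterSubsumption now does.

	// Default: the side transport interval is derived as targetDuration / 4.
	tc, db0, desc := setupClusterForClosedTSTesting(
		ctx, t, testingTargetDuration, 0 /* sideTransportInterval */, clusterArgs, "cttest", "kv")

	// Explicit: close timestamps every 100ms against a 5s target, so the
	// closed timestamp can advance over the LHS lease but not the RHS lease.
	tc, db0, desc = setupClusterForClosedTSTesting(
		ctx, t, 5*time.Second, 100*time.Millisecond, clusterArgs, "cttest", "kv")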

pkg/kv/kvserver/rangefeed/BUILD.bazel (1 addition)

@@ -45,6 +45,7 @@ go_test(
     name = "rangefeed_test",
     size = "large",
     srcs = [
+        "bench_test.go",
         "budget_test.go",
         "catchup_scan_bench_test.go",
         "catchup_scan_test.go",
pkg/kv/kvserver/rangefeed/bench_test.go (new file, 214 additions)
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rangefeed

import (
	"context"
	"encoding/binary"
	"fmt"
	"math"
	"testing"
	"time"

	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
	"github.com/cockroachdb/cockroach/pkg/util/future"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/cockroachdb/cockroach/pkg/util/stop"
	"github.com/cockroachdb/cockroach/pkg/util/uuid"
	"github.com/stretchr/testify/require"
)

type benchmarkRangefeedOpts struct {
	opType           opType
	numRegistrations int
	budget           int64
}

type opType string

const (
	writeOpType    opType = "write"  // individual 1PC writes
	commitOpType   opType = "commit" // intent + commit writes, 1 per txn
	closedTSOpType opType = "closedts"
)

// BenchmarkRangefeed benchmarks the processor and registrations, by submitting
// a set of events and waiting until they are all emitted.
func BenchmarkRangefeed(b *testing.B) {
	for _, opType := range []opType{writeOpType, commitOpType, closedTSOpType} {
		for _, numRegistrations := range []int{1, 10, 100} {
			name := fmt.Sprintf("opType=%s/numRegs=%d", opType, numRegistrations)
			b.Run(name, func(b *testing.B) {
				runBenchmarkRangefeed(b, benchmarkRangefeedOpts{
					opType:           opType,
					numRegistrations: numRegistrations,
					budget:           math.MaxInt64,
				})
			})
		}
	}
}

// BenchmarkRangefeedBudget benchmarks the effect of enabling/disabling the
// processor budget. It sets up a single processor and registration, and
// processes a set of events.
func BenchmarkRangefeedBudget(b *testing.B) {
	for _, budget := range []bool{false, true} {
		b.Run(fmt.Sprintf("budget=%t", budget), func(b *testing.B) {
			var budgetSize int64
			if budget {
				budgetSize = math.MaxInt64
			}
			runBenchmarkRangefeed(b, benchmarkRangefeedOpts{
				opType:           writeOpType,
				numRegistrations: 1,
				budget:           budgetSize,
			})
		})
	}
}

// runBenchmarkRangefeed runs a rangefeed benchmark, emitting b.N events across
// a rangefeed.
func runBenchmarkRangefeed(b *testing.B, opts benchmarkRangefeedOpts) {
	defer log.Scope(b).Close(b)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	stopper := stop.NewStopper()
	defer stopper.Stop(ctx)

	var budget *FeedBudget
	if opts.budget > 0 {
		budget = newTestBudget(opts.budget)
	}
	span := roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")}

	// Set up processor.
	p := NewProcessor(Config{
		AmbientContext:   log.MakeTestingAmbientContext(nil),
		Clock:            hlc.NewClockForTesting(nil),
		Metrics:          NewMetrics(),
		Span:             span,
		MemBudget:        budget,
		EventChanCap:     b.N,
		EventChanTimeout: time.Hour,
	})
	require.NoError(b, p.Start(stopper, nil))

	// Add registrations.
	streams := make([]*noopStream, opts.numRegistrations)
	futures := make([]*future.ErrorFuture, opts.numRegistrations)
	for i := 0; i < opts.numRegistrations; i++ {
		// withDiff does not matter for these benchmarks, since the previous value
		// is fetched and populated during Raft application.
		const withDiff = false
		streams[i] = &noopStream{ctx: ctx}
		futures[i] = &future.ErrorFuture{}
		ok, _ := p.Register(span, hlc.MinTimestamp, nil, withDiff, streams[i], nil, futures[i])
		require.True(b, ok)
	}

	// Construct b.N events beforehand -- we don't want to measure this cost.
	var (
		logicalOps       []enginepb.MVCCLogicalOp
		closedTimestamps []hlc.Timestamp
		prefix           = roachpb.Key("key-")
		value            = roachpb.MakeValueFromString("a few bytes of data").RawBytes
	)
	switch opts.opType {
	case writeOpType:
		logicalOps = make([]enginepb.MVCCLogicalOp, 0, b.N)
		for i := 0; i < b.N; i++ {
			key := append(prefix, make([]byte, 4)...)
			binary.BigEndian.PutUint32(key[len(prefix):], uint32(i))
			ts := hlc.Timestamp{WallTime: int64(i + 1)}
			logicalOps = append(logicalOps, makeLogicalOp(&enginepb.MVCCWriteValueOp{
				Key:       key,
				Timestamp: ts,
				Value:     value,
			}))
		}

	case commitOpType:
		logicalOps = make([]enginepb.MVCCLogicalOp, 2*b.N)
		// Write all intents first, then all commits. Txns are tracked in an
		// internal heap, and we want to cover some of this cost, even though we
		// write and commit them incrementally.
		for i := 0; i < b.N; i++ {
			var txnID uuid.UUID
			txnID.DeterministicV4(uint64(i), uint64(b.N))
			key := append(prefix, make([]byte, 4)...)
			binary.BigEndian.PutUint32(key[len(prefix):], uint32(i))
			ts := hlc.Timestamp{WallTime: int64(i + 1)}
			logicalOps[i] = writeIntentOpWithKey(txnID, key, isolation.Serializable, ts)
			logicalOps[b.N+i] = commitIntentOpWithKV(txnID, key, ts, value)
		}

	case closedTSOpType:
		closedTimestamps = make([]hlc.Timestamp, 0, b.N)
		for i := 0; i < b.N; i++ {
			closedTimestamps = append(closedTimestamps, hlc.Timestamp{WallTime: int64(i + 1)})
		}

	default:
		b.Fatalf("unknown operation type %q", opts.opType)
	}

	// Wait for catchup scans and flush checkpoint events.
	p.syncEventAndRegistrations()

	// Run the benchmark. We accounted for b.N when constructing events.
	b.ResetTimer()

	for _, logicalOp := range logicalOps {
		if !p.ConsumeLogicalOps(ctx, logicalOp) {
			b.Fatal("failed to submit logical operation")
		}
	}
	for _, closedTS := range closedTimestamps {
		if !p.ForwardClosedTS(ctx, closedTS) {
			b.Fatal("failed to forward closed timestamp")
		}
	}
	p.syncEventAndRegistrations()

	// Check that all registrations ended successfully, and emitted the expected
	// number of events.
	b.StopTimer()
	p.Stop()

	for i, f := range futures {
		regErr, err := future.Wait(ctx, f)
		require.NoError(b, err)
		require.NoError(b, regErr)
		require.Equal(b, b.N, streams[i].events-1) // ignore checkpoint after catchup
	}
}

// noopStream is a stream that does nothing, except count events.
type noopStream struct {
	ctx    context.Context
	events int
}

func (s *noopStream) Context() context.Context {
	return s.ctx
}

func (s *noopStream) Send(*kvpb.RangeFeedEvent) error {
	s.events++
	return nil
}
