Skip to content

Commit 2067865

Browse files
craig[bot]tbgwenyihu6
committed
Merge #149582
149582: asim: port first batch of testing framework changes r=tbg a=tbg This ports a few improvements from #149531 to master: - ability to combine multiple load generators - improved range creation (placement_type option, min/max key, ...) - ability to disable split/lease/replicate queue - plots show initial and final values I also cleaned up the args handling to make sure that we don't silently skip args not supported by the test harness. This was helpful to find what still needed porting. A first datadriven test file was moved over; the CPU/write tracking is still unsupported, so I commented it out with TODOs to re-enable once possible. Unfortunately, introducing this will be a bigger yak shave, since this data is communicated through the `roachpb` protos, and we may not want to change them without also first adding the code that populates these fields correctly in production builds. Epic: CRDB-25222 Co-authored-by: Tobias Grieger <[email protected]> Co-authored-by: wenyihu6 <[email protected]>
2 parents 7a762aa + fda001d commit 2067865

33 files changed

+876
-384
lines changed

pkg/kv/kvserver/asim/config/settings.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,12 @@ type SimulationSettings struct {
110110
// rebalancer would care to reconcile (via lease or replica rebalancing) between
111111
// any two stores.
112112
LBMinRequiredQPSDiff float64
113+
// ReplicateQueueEnabled controls whether the replicate queue is enabled.
114+
ReplicateQueueEnabled bool
115+
// LeaseQueueEnabled controls whether the lease queue is enabled.
116+
LeaseQueueEnabled bool
117+
// SplitQueueEnabled controls whether the split queue is enabled.
118+
SplitQueueEnabled bool
113119
}
114120

115121
// DefaultSimulationSettings returns a set of default settings for simulation.
@@ -136,6 +142,9 @@ func DefaultSimulationSettings() *SimulationSettings {
136142
LBRebalancingInterval: defaultLBRebalancingInterval,
137143
LBRebalanceQPSThreshold: defaultLBRebalanceQPSThreshold,
138144
LBMinRequiredQPSDiff: defaultLBMinRequiredQPSDiff,
145+
ReplicateQueueEnabled: true,
146+
LeaseQueueEnabled: true,
147+
SplitQueueEnabled: true,
139148
}
140149
}
141150

pkg/kv/kvserver/asim/gen/generator.go

Lines changed: 98 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,28 @@ func (ss StaticSettings) Generate(seed int64) config.SimulationSettings {
9393
return ret
9494
}
9595

96+
type MultiLoad []BasicLoad
97+
98+
// MultiLoad implements the LoadGen interface. It is a collection of
99+
// BasicLoad.
100+
var _ LoadGen = MultiLoad{}
101+
102+
func (ml MultiLoad) String() string {
103+
var str string
104+
for _, load := range ml {
105+
str += fmt.Sprintf("%s\n", load.String())
106+
}
107+
return str
108+
}
109+
110+
func (ml MultiLoad) Generate(seed int64, settings *config.SimulationSettings) []workload.Generator {
111+
var generators []workload.Generator
112+
for _, load := range ml {
113+
generators = append(generators, load.Generate(seed, settings)...)
114+
}
115+
return generators
116+
}
117+
96118
// BasicLoad implements the LoadGen interface.
97119
type BasicLoad struct {
98120
RWRatio float64
@@ -103,6 +125,8 @@ type BasicLoad struct {
103125
MinKey, MaxKey int64
104126
}
105127

128+
var _ LoadGen = BasicLoad{}
129+
106130
func (bl BasicLoad) String() string {
107131
return fmt.Sprintf(
108132
"basic load with rw_ratio=%0.2f, rate=%0.2f, skewed_access=%t, min_block_size=%d, max_block_size=%d, "+
@@ -212,6 +236,8 @@ const (
212236
Skewed
213237
Random
214238
WeightedRandom
239+
Weighted
240+
ReplicaPlacement
215241
)
216242

217243
func (p PlacementType) String() string {
@@ -224,6 +250,10 @@ func (p PlacementType) String() string {
224250
return "random"
225251
case WeightedRandom:
226252
return "weighted_rand"
253+
case Weighted:
254+
return "weighted"
255+
case ReplicaPlacement:
256+
return "replica_placement"
227257
default:
228258
panic("unknown placement type")
229259
}
@@ -239,6 +269,10 @@ func GetRangePlacementType(s string) PlacementType {
239269
return Random
240270
case "weighted_rand":
241271
return WeightedRandom
272+
case "weighted":
273+
return Weighted
274+
case "replica_placement":
275+
return ReplicaPlacement
242276
default:
243277
panic(fmt.Sprintf("unknown placement type %s", s))
244278
}
@@ -251,13 +285,14 @@ func GetRangePlacementType(s string) PlacementType {
251285
// WeightedRandomizedBasicRanges.
252286
type BaseRanges struct {
253287
Ranges int
254-
KeySpace int
288+
MinKey, MaxKey int64
255289
ReplicationFactor int
256290
Bytes int64
291+
ReplicaPlacement state.ReplicaPlacement
257292
}
258293

259294
func (b BaseRanges) String() string {
260-
return fmt.Sprintf("ranges=%d, key_space=%d, replication_factor=%d, bytes=%d", b.Ranges, b.KeySpace, b.ReplicationFactor, b.Bytes)
295+
return fmt.Sprintf("ranges=%d, min_key=%d, max_key=%d, replication_factor=%d, bytes=%d", b.Ranges, b.MinKey, b.MaxKey, b.ReplicationFactor, b.Bytes)
261296
}
262297

263298
// GetRangesInfo generates and distributes ranges across stores based on
@@ -267,13 +302,25 @@ func (b BaseRanges) GetRangesInfo(
267302
) state.RangesInfo {
268303
switch pType {
269304
case Even:
270-
return state.RangesInfoEvenDistribution(numOfStores, b.Ranges, b.KeySpace, b.ReplicationFactor, b.Bytes)
305+
return state.RangesInfoEvenDistribution(numOfStores, b.Ranges, b.MinKey, b.MaxKey, b.ReplicationFactor, b.Bytes)
271306
case Skewed:
272-
return state.RangesInfoSkewedDistribution(numOfStores, b.Ranges, b.KeySpace, b.ReplicationFactor, b.Bytes)
307+
return state.RangesInfoSkewedDistribution(numOfStores, b.Ranges, b.MinKey, b.MaxKey, b.ReplicationFactor, b.Bytes)
273308
case Random:
274-
return state.RangesInfoRandDistribution(randSource, numOfStores, b.Ranges, b.KeySpace, b.ReplicationFactor, b.Bytes)
309+
return state.RangesInfoRandDistribution(randSource, numOfStores, b.Ranges, b.MinKey, b.MaxKey, b.ReplicationFactor, b.Bytes)
275310
case WeightedRandom:
276-
return state.RangesInfoWeightedRandDistribution(randSource, weightedRandom, b.Ranges, b.KeySpace, b.ReplicationFactor, b.Bytes)
311+
return state.RangesInfoWeightedRandDistribution(
312+
randSource, weightedRandom, b.Ranges, b.MinKey, b.MaxKey, b.ReplicationFactor, b.Bytes)
313+
case ReplicaPlacement:
314+
// TODO(tbg): port this over from the prototype.
315+
/*
316+
return state.RangesInfoWithReplicaPlacement(
317+
b.ReplicaPlacement,
318+
b.Ranges,
319+
state.DefaultSpanConfigWithRF(b.ReplicationFactor),
320+
b.MinKey, b.MaxKey, b.Bytes,
321+
)
322+
*/
323+
panic("unimplemented")
277324
default:
278325
panic(fmt.Sprintf("unexpected range placement type %v", pType))
279326
}
@@ -292,6 +339,9 @@ func (b BaseRanges) LoadRangeInfo(s state.State, rangesInfo state.RangesInfo) {
292339
type BasicRanges struct {
293340
BaseRanges
294341
PlacementType PlacementType
342+
// ReplicaWeights and LeaseWeights are only non-nil when the placement type
343+
// is Weighted.
344+
ReplicaWeights, LeaseWeights []float64
295345
}
296346

297347
func (br BasicRanges) String() string {
@@ -311,3 +361,45 @@ func (br BasicRanges) Generate(
311361
br.LoadRangeInfo(s, rangesInfo)
312362
return s
313363
}
364+
365+
// MultiRanges implements the RangeGen interface, supporting multiple
366+
// BasicRanges generation.
367+
type MultiRanges []BasicRanges
368+
369+
var _ RangeGen = MultiRanges{}
370+
371+
func (mr MultiRanges) String() string {
372+
var str string
373+
for _, ranges := range mr {
374+
str += fmt.Sprintf("%s\n", ranges.String())
375+
}
376+
return str
377+
}
378+
379+
func (mr MultiRanges) Generate(
380+
seed int64, settings *config.SimulationSettings, s state.State,
381+
) state.State {
382+
var rangeInfos []state.RangeInfo
383+
for _, ranges := range mr {
384+
var nextInfos state.RangesInfo
385+
if ranges.PlacementType == Weighted {
386+
// TODO(tbg): instead refactoring GetRangesInfo to be more general.
387+
var storeIDs []state.StoreID
388+
for _, store := range s.Stores() {
389+
storeIDs = append(storeIDs, store.StoreID())
390+
}
391+
nextInfos = state.RangesInfoWithDistribution(storeIDs,
392+
ranges.ReplicaWeights, ranges.LeaseWeights,
393+
ranges.Ranges, state.DefaultSpanConfigWithRF(ranges.ReplicationFactor),
394+
ranges.MinKey, ranges.MaxKey, ranges.Bytes)
395+
} else {
396+
if ranges.LeaseWeights != nil || ranges.ReplicaWeights != nil {
397+
panic("leaseWeights and replicaWeights should be nil for non-weighted placement types")
398+
}
399+
nextInfos = ranges.GetRangesInfo(ranges.PlacementType, len(s.Stores()), nil, []float64{})
400+
}
401+
rangeInfos = append(rangeInfos, nextInfos...)
402+
}
403+
state.LoadRangeInfo(s, rangeInfos...)
404+
return s
405+
}

pkg/kv/kvserver/asim/history/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,6 @@ go_library(
88
deps = [
99
"//pkg/kv/kvserver/asim/metrics",
1010
"//pkg/kv/kvserver/asim/state",
11+
"@com_github_montanaflynn_stats//:stats",
1112
],
1213
)

pkg/kv/kvserver/asim/history/history.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,22 @@ package history
77

88
import (
99
"context"
10+
"fmt"
11+
"strings"
1012

1113
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/metrics"
1214
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state"
15+
"github.com/montanaflynn/stats"
1316
)
1417

1518
// History contains recorded information that summarizes a simulation run.
1619
// Currently it only contains the store metrics of the run.
1720
// TODO(kvoli): Add a range log like structure to the history.
1821
type History struct {
22+
// Recorded contains per-store metrics snapshots at each tick. The outer slice
23+
// grows over time at each tick, while each inner slice has one StoreMetrics per
24+
// store. E.g. Recorded[0] is the first tick, Recorded[0][0] is the first store's
25+
// metrics at the first tick.
1926
Recorded [][]metrics.StoreMetrics
2027
S state.State
2128
}
@@ -24,3 +31,33 @@ type History struct {
2431
func (h *History) Listen(ctx context.Context, sms []metrics.StoreMetrics) {
2532
h.Recorded = append(h.Recorded, sms)
2633
}
34+
35+
func (h *History) ShowRecordedValueAt(idx int, stat string) string {
36+
var buf strings.Builder
37+
38+
storeMetricsAtTick := h.Recorded[idx]
39+
values := make([]float64, 0, len(storeMetricsAtTick))
40+
41+
_, _ = fmt.Fprintf(&buf, "[")
42+
43+
// Extract values for each store. Note that h.Recorded[idx] is already sorted
44+
// by store ID when appending to h.Recorded.
45+
for i, sm := range storeMetricsAtTick {
46+
if i > 0 {
47+
_, _ = fmt.Fprintf(&buf, ", ")
48+
}
49+
value := sm.GetMetricValue(stat)
50+
if stat == "disk_fraction_used" {
51+
_, _ = fmt.Fprintf(&buf, "s%v=%.2f", sm.StoreID, value)
52+
} else {
53+
_, _ = fmt.Fprintf(&buf, "s%v=%.0f", sm.StoreID, value)
54+
}
55+
values = append(values, value)
56+
}
57+
_, _ = fmt.Fprintf(&buf, "]")
58+
stddev, _ := stats.StandardDeviation(values)
59+
mean, _ := stats.Mean(values)
60+
sum, _ := stats.Sum(values)
61+
_, _ = fmt.Fprintf(&buf, " (stddev=%.2f, mean=%.2f, sum=%.0f)", stddev, mean, sum)
62+
return buf.String()
63+
}

pkg/kv/kvserver/asim/metrics/tracker.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,44 @@ type StoreMetrics struct {
4040
DiskFractionUsed float64
4141
}
4242

43+
// GetMetricValue extracts the requested metric value from StoreMetrics.
44+
func (sm *StoreMetrics) GetMetricValue(stat string) float64 {
45+
switch stat {
46+
case "qps":
47+
return float64(sm.QPS)
48+
// case "cpu":
49+
// value = float64(sm.CPU)
50+
// case "write_bytes_per_second":
51+
// value = float64(sm.WriteBytesPerSecond)
52+
case "write":
53+
return float64(sm.WriteKeys)
54+
case "write_b":
55+
return float64(sm.WriteBytes)
56+
case "read":
57+
return float64(sm.ReadKeys)
58+
case "read_b":
59+
return float64(sm.ReadBytes)
60+
case "replicas":
61+
return float64(sm.Replicas)
62+
case "leases":
63+
return float64(sm.Leases)
64+
case "lease_moves":
65+
return float64(sm.LeaseTransfers)
66+
case "replica_moves":
67+
return float64(sm.Rebalances)
68+
case "replica_b_rcvd":
69+
return float64(sm.RebalanceRcvdBytes)
70+
case "replica_b_sent":
71+
return float64(sm.RebalanceSentBytes)
72+
case "range_splits":
73+
return float64(sm.RangeSplits)
74+
case "disk_fraction_used":
75+
return sm.DiskFractionUsed
76+
default:
77+
return 0
78+
}
79+
}
80+
4381
// the MetricsTracker to report new store metrics for a tick.
4482
type StoreMetricsListener interface {
4583
Listen(context.Context, []StoreMetrics)

pkg/kv/kvserver/asim/queue/lease_queue.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ func NewLeaseQueue(
5959
// meets the criteria it is enqueued. The criteria is currently if the
6060
// allocator returns a lease transfer.
6161
func (lq *leaseQueue) MaybeAdd(ctx context.Context, replica state.Replica, s state.State) bool {
62+
if !lq.settings.LeaseQueueEnabled {
63+
// Nothing to do, disabled.
64+
return false
65+
}
66+
6267
repl := NewSimulatorReplica(replica, s)
6368
lq.AddLogTag("r", repl.repl.Descriptor())
6469
lq.AnnotateCtx(ctx)

pkg/kv/kvserver/asim/queue/replicate_queue.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ func NewReplicateQueue(
5858
// meets the criteria it is enqueued. The criteria is currently if the
5959
// allocator returns a non-noop, then the replica is added.
6060
func (rq *replicateQueue) MaybeAdd(ctx context.Context, replica state.Replica, s state.State) bool {
61+
if !rq.settings.ReplicateQueueEnabled {
62+
// Nothing to do, disabled.
63+
return false
64+
}
65+
6166
repl := NewSimulatorReplica(replica, s)
6267
rq.AddLogTag("r", repl.repl.Descriptor())
6368
rq.AnnotateCtx(ctx)

pkg/kv/kvserver/asim/queue/split_queue.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ func NewSplitQueue(
4242
// MaybeAdd proposes a range for being split. If it meets the criteria it is
4343
// enqueued.
4444
func (sq *splitQueue) MaybeAdd(ctx context.Context, replica state.Replica, state state.State) bool {
45+
if !sq.settings.SplitQueueEnabled {
46+
// Nothing to do, disabled.
47+
return false
48+
}
49+
4550
priority := sq.shouldSplit(sq.lastTick, replica.Range(), state)
4651
if priority < 1 {
4752
return false

pkg/kv/kvserver/asim/state/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ go_library(
1010
"liveness.go",
1111
"load.go",
1212
"new_state.go",
13+
"new_state_test_helper.go",
14+
"parser_replica_placement.go",
1315
"split_decider.go",
1416
"state.go",
1517
"state_listener.go",

0 commit comments

Comments
 (0)