
Commit e2c5563

craig[bot], elizaMkraule, mw5h, wenyihu6, and rafiss committed
148753: changefeedccl: gate Kafka v2 message too large error detail behind cluster setting r=asg0451 a=elizaMkraule

A recent change added detailed logging for Kafka v2 changefeed messages that exceed the broker's size limit. These logs now include the message key, size, and MVCC timestamp to aid in debugging.

To make this safe for backporting, the behavior is now gated behind the cluster setting changefeed.kafka_v2_error_details.enabled. On the main branch, this setting defaults to true to preserve the enhanced observability. In release-branch backports, it will default to false.

When enabled, the log will include:
- the key of the offending message
- the combined key + value size
- the MVCC timestamp

When disabled, the log reverts to the previous, minimal format.

Related to Jira issue [CRDB-49646](https://cockroachlabs.atlassian.net/browse/CRDB-49646). See also #144994.

Release note (general change): Kafka v2 changefeed sinks now support a cluster setting that enables detailed error logging for messages that exceed the Kafka message size limit.

149538: sql: don't throw errors for skipped auto stats jobs r=mw5h a=mw5h

Previously, auto stats jobs would throw errors and increment failed-job counters if they attempted to start while a stats collection was already in progress on the table. For large clusters with sql.stats.automatic_job_check_before_creating_job.enabled set to true, this could create quite a few failed jobs. These failed jobs don't appear to cause any performance issues, but they clutter logs, potentially obscuring real problems and alarming customers, who then file support tickets to figure out why their jobs are failing.

This patch:
* refactors the auto stats checks to reduce code duplication.
* swallows the error for concurrent auto stats creation, logging at INFO level instead.
* changes the create stats jobs test so that it no longer expects these job creations to fail and instead expects the stats to not be collected.
* fixes a bug in the create stats jobs test that would cause it to hang instead of exiting on error.
* adds a cluster setting, sql.stats.error_on_concurrent_create_stats.enabled, which controls this new behavior. By default the old behavior is maintained.

Fixes: #148413

Release note (ops change): CockroachDB now has a cluster setting, sql.stats.error_on_concurrent_create_stats.enabled, which modifies how it reacts to concurrent auto stats jobs. The default, true, maintains the previous behavior. Setting it to false causes concurrent auto stats jobs to be skipped with just a log entry and no incremented error counters.

149699: asim: update range usage info and store capacity r=tbg a=wenyihu6

**asim: remove unused range usage info**

This commit removes the unused field RangeUsageInfo from TransferLeaseOp.

Epic: none
Release note: none

---

**asim: support request_cpu_per_access and raft_cpu_per_write**

This commit adds support for the request_cpu_per_access and raft_cpu_per_write options in the gen_load command. It only adds the options to the data-driven framework and workload generator; no changes have been made to LoadEvent yet, so they currently have no effect on range usage or applied load. Future commits will implement the actual impact.

Epic: none
Release note: none

---

**asim: add impact from request_cpu_per_access and raft_cpu_per_write**

Previously, request_cpu_per_access and raft_cpu_per_write were added to the workload generator in data-driven tests, but they had no actual effect on the cluster yet. This commit makes them take effect by applying the impact from LoadEvent, including CPUPerSecond in store capacity, and recording range load stats.

Epic: none
Release note: none

---

**asim: add store capacity cpu stats to storemetrics**

Previously, store capacity CPU was populated but not surfaced in metrics. This commit adds the corresponding stats to StoreMetrics.

Epic: none
Release note: none

---

**asim: remove redundant size assignment**

This commit removes a redundant size assignment for rangeInfo in LoadRangeInfo, since the caller already populates rangesInfo.

Epic: none
Release note: none

---

**asim: add a comment for RangeUsageInfo.WritesPerSecond**

This commit adds a comment clarifying nuances with WritesPerSecond in RangeUsageInfo: it is actually the sum of writes rather than a rate. It is currently unused outside of two unit tests, TestWorkloadApply and TestCapacityOverride, which both abuse this field by verifying that the writes reach the replicas and that the sum of writes is as expected. Since it is tricky to assert on an exact per-second rate, we leave it as is for now, but we should fix this later.

Epic: none
Release note: none

---

**asim: account for follower replica load**

Previously, asim only accounted for load on the leaseholder, ignoring non-leaseholder replicas. This commit updates it to consider all replicas for RangeUsageInfo and store capacity aggregation. RangeUsageInfo handles leaseholder checks and clears request CPU and QPS stats for non-leaseholder replicas.

Epic: none
Release note: none

---

**asim: add range info String method**

This commit adds a RangeInfo String method.

Epic: none
Release note: none

---

**asim: add write bytes per sec to capacity & range usage**

Previously, we added WriteBytesPerSecond to roachpb.StoreCapacity. This commit plumbs it through store capacity aggregation, range load usage, and StoreMetrics. MMA will later use it to track write bandwidth usage across stores.

Epic: none
Release note: none

---

**asim: support node_cpu_rate_capacity with gen_cluster**

Previously, we added roachpb.NodeCapacity for MMA to compute resource utilization across stores. This commit integrates it into the asim setup, enabling gen_cluster to use the node_cpu_rate_capacity option. Note that no functions currently access node capacity; future MMA integration commits will utilize it.

Epic: none
Release note: none

---

**asim: add comments for datadriven**

This commit updates comments for a few options we added recently.

Epic: none
Release note: none

149717: logictest: add back assertion that was rewritten accidentally r=rafiss a=rafiss

ee263e2 rewrote this test so that it expects no spanconfig. This was likely a mistake caused by rewriting before retrying for long enough. This patch adds back the assertion, and adds another one that should prevent accidental rewrites.

Fixes #148603

Release note: None

Co-authored-by: Eliza Kraule <[email protected]>
Co-authored-by: Matt White <[email protected]>
Co-authored-by: wenyihu6 <[email protected]>
Co-authored-by: Rafi Shamim <[email protected]>
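To make the sql.stats.error_on_concurrent_create_stats.enabled behavior from 149538 concrete, here is a minimal, self-contained Go sketch of the decision it controls. The names (errConcurrentStats, maybeStartAutoStats, collect) are illustrative stand-ins, not the actual CockroachDB code paths.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// errConcurrentStats stands in for the "a stats collection is already in
// progress on this table" condition described above.
var errConcurrentStats = errors.New("another CREATE STATISTICS job is already running on this table")

// maybeStartAutoStats sketches the behavior controlled by
// sql.stats.error_on_concurrent_create_stats.enabled: when true (the default),
// a concurrent auto stats job still fails and bumps the failed-jobs counters;
// when false, the job is skipped with an INFO-level log line instead.
func maybeStartAutoStats(concurrent, errorOnConcurrent bool, collect func() error) error {
	if concurrent {
		if errorOnConcurrent {
			return errConcurrentStats // old behavior: surfaced as a failed job
		}
		log.Printf("INFO: skipping auto stats job: %v", errConcurrentStats) // new behavior
		return nil
	}
	return collect()
}

func main() {
	collect := func() error { fmt.Println("collecting table statistics"); return nil }
	fmt.Println(maybeStartAutoStats(true, true, collect))  // default: returns the error
	fmt.Println(maybeStartAutoStats(true, false, collect)) // opt-in: skipped, returns nil
}
```

Both new settings can be toggled at runtime, e.g. `SET CLUSTER SETTING sql.stats.error_on_concurrent_create_stats.enabled = false;` or `SET CLUSTER SETTING changefeed.kafka_v2_error_details.enabled = false;`.
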
5 parents: 8fefbf9 + a7dbc81 + d1dfc80 + 7d7bfda + 6318681; commit e2c5563

34 files changed: +714 -372 lines changed

docs/generated/settings/settings-for-tenants.txt

Lines changed: 2 additions & 0 deletions
@@ -18,6 +18,7 @@ changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consu
 changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled application
 changefeed.fast_gzip.enabled boolean true use fast gzip implementation application
 changefeed.span_checkpoint.lag_threshold (alias: changefeed.frontier_highwater_lag_checkpoint_threshold) duration 10m0s the amount of time a changefeed's lagging (slowest) spans must lag behind its leading (fastest) spans before a span-level checkpoint to save leading span progress is written; if 0, span-level checkpoints due to lagging spans is disabled application
+changefeed.kafka_v2_error_details.enabled boolean true if enabled, Kafka v2 sinks will include the message key, size, and MVCC timestamp in message too large errors application
 changefeed.memory.per_changefeed_limit byte size 512 MiB controls amount of data that can be buffered per changefeed application
 changefeed.resolved_timestamp.min_update_interval (alias: changefeed.min_highwater_advance) duration 0s minimum amount of time that must have elapsed since the last time a changefeed's resolved timestamp was updated before it is eligible to be updated again; default of 0 means no minimum interval is enforced but updating will still be limited by the average time it takes to checkpoint progress application
 changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds application
@@ -355,6 +356,7 @@ sql.stats.automatic_partial_collection.fraction_stale_rows float 0.05 target fra
 sql.stats.automatic_partial_collection.min_stale_rows integer 100 target minimum number of stale rows per table that will trigger a partial statistics refresh application
 sql.stats.cleanup.recurrence string @hourly cron-tab recurrence for SQL Stats cleanup job application
 sql.stats.detailed_latency_metrics.enabled boolean false label latency metrics with the statement fingerprint. Workloads with tens of thousands of distinct query fingerprints should leave this setting false. (experimental, affects performance for workloads with high fingerprint cardinality) application
+sql.stats.error_on_concurrent_create_stats.enabled boolean true set to true to error on concurrent CREATE STATISTICS jobs, instead of skipping them application
 sql.stats.flush.enabled boolean true if set, SQL execution statistics are periodically flushed to disk application
 sql.stats.flush.interval duration 10m0s the interval at which SQL execution statistics are flushed to disk, this value must be less than or equal to 1 hour application
 sql.stats.forecasts.enabled boolean true when true, enables generation of statistics forecasts by default for all tables application

docs/generated/settings/settings.html

Lines changed: 2 additions & 0 deletions
@@ -23,6 +23,7 @@
 <tr><td><div id="setting-changefeed-event-consumer-workers" class="anchored"><code>changefeed.event_consumer_workers</code></div></td><td>integer</td><td><code>0</code></td><td>the number of workers to use when processing events: &lt;0 disables, 0 assigns a reasonable default, &gt;0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-fast-gzip-enabled" class="anchored"><code>changefeed.fast_gzip.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>use fast gzip implementation</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-frontier-highwater-lag-checkpoint-threshold" class="anchored"><code>changefeed.span_checkpoint.lag_threshold<br />(alias: changefeed.frontier_highwater_lag_checkpoint_threshold)</code></div></td><td>duration</td><td><code>10m0s</code></td><td>the amount of time a changefeed&#39;s lagging (slowest) spans must lag behind its leading (fastest) spans before a span-level checkpoint to save leading span progress is written; if 0, span-level checkpoints due to lagging spans is disabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
+<tr><td><div id="setting-changefeed-kafka-v2-error-details-enabled" class="anchored"><code>changefeed.kafka_v2_error_details.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if enabled, Kafka v2 sinks will include the message key, size, and MVCC timestamp in message too large errors</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-memory-per-changefeed-limit" class="anchored"><code>changefeed.memory.per_changefeed_limit</code></div></td><td>byte size</td><td><code>512 MiB</code></td><td>controls amount of data that can be buffered per changefeed</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-min-highwater-advance" class="anchored"><code>changefeed.resolved_timestamp.min_update_interval<br />(alias: changefeed.min_highwater_advance)</code></div></td><td>duration</td><td><code>0s</code></td><td>minimum amount of time that must have elapsed since the last time a changefeed&#39;s resolved timestamp was updated before it is eligible to be updated again; default of 0 means no minimum interval is enforced but updating will still be limited by the average time it takes to checkpoint progress</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-node-throttle-config" class="anchored"><code>changefeed.node_throttle_config</code></div></td><td>string</td><td><code></code></td><td>specifies node level throttling configuration for all changefeeeds</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
@@ -310,6 +311,7 @@
 <tr><td><div id="setting-sql-stats-automatic-partial-collection-min-stale-rows" class="anchored"><code>sql.stats.automatic_partial_collection.min_stale_rows</code></div></td><td>integer</td><td><code>100</code></td><td>target minimum number of stale rows per table that will trigger a partial statistics refresh</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-sql-stats-cleanup-recurrence" class="anchored"><code>sql.stats.cleanup.recurrence</code></div></td><td>string</td><td><code>@hourly</code></td><td>cron-tab recurrence for SQL Stats cleanup job</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-sql-stats-detailed-latency-metrics-enabled" class="anchored"><code>sql.stats.detailed_latency_metrics.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>label latency metrics with the statement fingerprint. Workloads with tens of thousands of distinct query fingerprints should leave this setting false. (experimental, affects performance for workloads with high fingerprint cardinality)</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
+<tr><td><div id="setting-sql-stats-error-on-concurrent-create-stats-enabled" class="anchored"><code>sql.stats.error_on_concurrent_create_stats.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>set to true to error on concurrent CREATE STATISTICS jobs, instead of skipping them</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-sql-stats-flush-enabled" class="anchored"><code>sql.stats.flush.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, SQL execution statistics are periodically flushed to disk</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-sql-stats-flush-interval" class="anchored"><code>sql.stats.flush.interval</code></div></td><td>duration</td><td><code>10m0s</code></td><td>the interval at which SQL execution statistics are flushed to disk, this value must be less than or equal to 1 hour</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-sql-stats-forecasts-enabled" class="anchored"><code>sql.stats.forecasts.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>when true, enables generation of statistics forecasts by default for all tables</td><td>Serverless/Dedicated/Self-Hosted</td></tr>

pkg/ccl/changefeedccl/changefeedbase/settings.go

Lines changed: 11 additions & 0 deletions
@@ -369,3 +369,14 @@ var RetryBackoffReset = settings.RegisterDurationSettingWithExplicitUnit(
 	10*time.Minute, /* defaultValue */
 	settings.DurationInRange(1*time.Second, 1*time.Hour),
 )
+
+// KafkaV2ErrorDetailsEnabled enables detailed error messages for Kafka v2 sinks
+// when message_too_large errors occur. This includes the message key, size,
+// and MVCC timestamp in the error.
+var KafkaV2ErrorDetailsEnabled = settings.RegisterBoolSetting(
+	settings.ApplicationLevel,
+	"changefeed.kafka_v2_error_details.enabled",
+	"if enabled, Kafka v2 sinks will include the message key, size, and MVCC timestamp in message too large errors",
+	true,
+	settings.WithPublic,
+)

pkg/ccl/changefeedccl/sink_kafka_v2.go

Lines changed: 13 additions & 7 deletions
@@ -41,9 +41,10 @@ type kafkaSinkClientV2 struct {
 	client      KafkaClientV2
 	adminClient KafkaAdminClientV2
 
-	knobs          kafkaSinkV2Knobs
-	canTryResizing bool
-	recordResize   func(numRecords int64)
+	knobs               kafkaSinkV2Knobs
+	canTryResizing      bool
+	includeErrorDetails bool
+	recordResize        func(numRecords int64)
 
 	topicsForConnectionCheck []string
 
@@ -124,6 +125,7 @@ func newKafkaSinkClientV2(
 		knobs:                    knobs,
 		batchCfg:                 batchCfg,
 		canTryResizing:           changefeedbase.BatchReductionRetryEnabled.Get(&settings.SV),
+		includeErrorDetails:      changefeedbase.KafkaV2ErrorDetailsEnabled.Get(&settings.SV),
 		recordResize:             recordResize,
 		topicsForConnectionCheck: topicsForConnectionCheck,
 	}
@@ -165,7 +167,7 @@ func (k *kafkaSinkClientV2) Flush(ctx context.Context, payload SinkPayload) (ret
 			}
 			return nil
 		} else {
-			if len(msgs) == 1 && errors.Is(err, kerr.MessageTooLarge) {
+			if len(msgs) == 1 && errors.Is(err, kerr.MessageTooLarge) && k.includeErrorDetails {
 				msg := msgs[0]
 				mvccVal := msg.Context.Value(mvccTSKey{})
 				var ts hlc.Timestamp
@@ -273,7 +275,7 @@ func (k *kafkaSinkClientV2) maybeUpdateTopicPartitions(
 
 // MakeBatchBuffer implements SinkClient.
 func (k *kafkaSinkClientV2) MakeBatchBuffer(topic string) BatchBuffer {
-	return &kafkaBuffer{topic: topic, batchCfg: k.batchCfg}
+	return &kafkaBuffer{topic: topic, batchCfg: k.batchCfg, includeErrorDetails: k.includeErrorDetails}
 }
 
 func (k *kafkaSinkClientV2) shouldTryResizing(err error, msgs []*kgo.Record) bool {
@@ -311,7 +313,8 @@ type kafkaBuffer struct {
 	messages  []*kgo.Record
 	byteCount int
 
-	batchCfg sinkBatchConfig
+	batchCfg            sinkBatchConfig
+	includeErrorDetails bool
 }
 
 type mvccTSKey struct{}
@@ -328,7 +331,10 @@ func (b *kafkaBuffer) Append(ctx context.Context, key []byte, value []byte, attr
 		headers = append(headers, kgo.RecordHeader{Key: k, Value: v})
 	}
 
-	rctx := context.WithValue(ctx, mvccTSKey{}, attrs.mvcc)
+	var rctx context.Context
+	if b.includeErrorDetails {
+		rctx = context.WithValue(ctx, mvccTSKey{}, attrs.mvcc)
+	}
 
 	b.messages = append(b.messages, &kgo.Record{Key: key, Value: value, Topic: b.topic, Headers: headers, Context: rctx})
 	b.byteCount += len(value)

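Taken together, the hunks above implement a simple gate: the buffer only attaches the MVCC timestamp to the record context when the setting is on, and Flush only decorates MESSAGE_TOO_LARGE errors when that same flag is set. Below is a condensed, self-contained sketch of that flow; the record, buffer, and wrapTooLarge names are illustrative stand-ins, not the sink's real kafkaBuffer or kgo.Record types.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// mvccKey mirrors the sink's unexported mvccTSKey{} context key.
type mvccKey struct{}

// errMessageTooLarge is a stand-in for kerr.MessageTooLarge.
var errMessageTooLarge = errors.New("MESSAGE_TOO_LARGE")

// record and buffer are illustrative stand-ins for kgo.Record and kafkaBuffer.
type record struct {
	key, value []byte
	ctx        context.Context
}

type buffer struct {
	includeErrorDetails bool // read from changefeed.kafka_v2_error_details.enabled
}

// append attaches the MVCC timestamp to the record context only when detailed
// errors are enabled, matching the gated context.WithValue call above.
func (b *buffer) append(ctx context.Context, key, value []byte, mvcc string) record {
	rctx := ctx
	if b.includeErrorDetails {
		rctx = context.WithValue(ctx, mvccKey{}, mvcc)
	}
	return record{key: key, value: value, ctx: rctx}
}

// wrapTooLarge decorates a MESSAGE_TOO_LARGE error with the key, combined
// key+value size, and MVCC timestamp only when the setting is enabled;
// otherwise the original, minimal error is returned.
func wrapTooLarge(err error, r record, enabled bool) error {
	if enabled && errors.Is(err, errMessageTooLarge) {
		ts, _ := r.ctx.Value(mvccKey{}).(string)
		return fmt.Errorf("%w: key=%q size=%d mvcc=%s", err, r.key, len(r.key)+len(r.value), ts)
	}
	return err
}

func main() {
	b := &buffer{includeErrorDetails: true}
	r := b.append(context.Background(), []byte("k1"), []byte("oversized-value"), "1700000000.000000001,0")
	fmt.Println(wrapTooLarge(errMessageTooLarge, r, b.includeErrorDetails))
}
```
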
pkg/kv/kvserver/asim/gen/generator.go

Lines changed: 23 additions & 18 deletions
@@ -118,21 +118,24 @@ func (ml MultiLoad) Generate(seed int64, settings *config.SimulationSettings) []
 
 // BasicLoad implements the LoadGen interface.
 type BasicLoad struct {
-	RWRatio        float64
-	Rate           float64
-	SkewedAccess   bool
-	MinBlockSize   int
-	MaxBlockSize   int
-	MinKey, MaxKey int64
+	RWRatio             float64
+	Rate                float64
+	SkewedAccess        bool
+	MinBlockSize        int
+	MaxBlockSize        int
+	MinKey, MaxKey      int64
+	RequestCPUPerAccess int64
+	RaftCPUPerWrite     int64
 }
 
 var _ LoadGen = BasicLoad{}
 
 func (bl BasicLoad) String() string {
 	return fmt.Sprintf(
 		"basic load with rw_ratio=%0.2f, rate=%0.2f, skewed_access=%t, min_block_size=%d, max_block_size=%d, "+
-			"min_key=%d, max_key=%d",
-		bl.RWRatio, bl.Rate, bl.SkewedAccess, bl.MinBlockSize, bl.MaxBlockSize, bl.MinKey, bl.MaxKey)
+			"min_key=%d, max_key=%d, request_cpu_per_access=%d, raft_cpu_per_write=%d",
+		bl.RWRatio, bl.Rate, bl.SkewedAccess, bl.MinBlockSize, bl.MaxBlockSize,
+		bl.MinKey, bl.MaxKey, bl.RequestCPUPerAccess, bl.RaftCPUPerWrite)
 }
 
 // Generate returns a new list of workload generators where the generator
@@ -162,6 +165,8 @@ func (bl BasicLoad) Generate(seed int64, settings *config.SimulationSettings) []
 			bl.RWRatio,
 			bl.MaxBlockSize,
 			bl.MinBlockSize,
+			bl.RaftCPUPerWrite,
+			bl.RequestCPUPerAccess,
 		),
 	}
 }
@@ -188,17 +193,19 @@ func (lc LoadedCluster) Regions() []state.Region {
 
 // BasicCluster implements the ClusterGen interace.
 type BasicCluster struct {
-	Nodes             int
-	StoresPerNode     int
-	StoreByteCapacity int64
-	Region            []string
-	NodesPerRegion    []int
+	Nodes               int
+	StoresPerNode       int
+	StoreByteCapacity   int64
+	Region              []string
+	NodesPerRegion      []int
+	NodeCPURateCapacity int64
 }
 
 func (bc BasicCluster) String() string {
 	var b strings.Builder
-	_, _ = fmt.Fprintf(&b, "basic cluster with nodes=%d, stores_per_node=%d, store_byte_capacity=%d",
-		bc.Nodes, bc.StoresPerNode, bc.StoreByteCapacity)
+	_, _ = fmt.Fprintf(&b,
+		"basic cluster with nodes=%d, stores_per_node=%d, store_byte_capacity=%d, node_cpu_rate_capacity=%d",
+		bc.Nodes, bc.StoresPerNode, bc.StoreByteCapacity, bc.NodeCPURateCapacity)
 	if len(bc.Region) != 0 {
 		_, _ = fmt.Fprintf(&b, ", region=%v, nodes_per_region=%v", bc.Region, bc.NodesPerRegion)
 	}
@@ -212,6 +219,7 @@ func (bc BasicCluster) String() string {
 func (bc BasicCluster) Generate(seed int64, settings *config.SimulationSettings) state.State {
 	info := bc.info()
 	info.StoreDiskCapacityBytes = bc.StoreByteCapacity
+	info.NodeCPURateCapacityNanos = bc.NodeCPURateCapacity
 	return state.LoadClusterInfo(info, settings)
 }
 
@@ -346,9 +354,6 @@ func (b BaseRanges) GetRangesInfo(
 
 // LoadRangeInfo loads the given state with the specified rangesInfo.
 func (b BaseRanges) LoadRangeInfo(s state.State, rangesInfo state.RangesInfo) {
-	for _, rangeInfo := range rangesInfo {
-		rangeInfo.Size = b.Bytes
-	}
 	state.LoadRangeInfo(s, rangesInfo...)
 }

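The asim commit messages above describe the intended accounting: request_cpu_per_access is charged for every access on the leaseholder, raft_cpu_per_write is charged for every write on each replica, and the totals feed each store's CPUPerSecond capacity. The following is an illustrative-only model of that arithmetic; the loadEvent struct and storeCPUNanos function are simplifications, not asim's LoadEvent or capacity-aggregation code.

```go
package main

import "fmt"

// loadEvent is a simplified stand-in for asim's LoadEvent, carrying the two
// new gen_load knobs.
type loadEvent struct {
	reads, writes       int64
	requestCPUPerAccess int64 // CPU nanos charged per access (read or write) on the leaseholder
	raftCPUPerWrite     int64 // CPU nanos charged per write on every replica
}

// storeCPUNanos models the accounting described above: followers only see raft
// write CPU, while the leaseholder additionally sees request CPU for all accesses.
func storeCPUNanos(ev loadEvent, isLeaseholder bool) int64 {
	cpu := ev.writes * ev.raftCPUPerWrite
	if isLeaseholder {
		cpu += (ev.reads + ev.writes) * ev.requestCPUPerAccess
	}
	return cpu
}

func main() {
	ev := loadEvent{reads: 900, writes: 100, requestCPUPerAccess: 500, raftCPUPerWrite: 200}
	fmt.Println("leaseholder cpu nanos:", storeCPUNanos(ev, true)) // 520000
	fmt.Println("follower cpu nanos:", storeCPUNanos(ev, false))   // 20000
}
```
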
pkg/kv/kvserver/asim/metrics/series.go

Lines changed: 4 additions & 0 deletions
@@ -21,8 +21,10 @@ func MakeTS(metrics [][]StoreMetrics) map[string][][]float64 {
 	// custom scraper or provide definitions for each metric available. These
 	// are partially duplicated with the cluster tracker.
 	ret["qps"] = make([][]float64, stores)
+	ret["cpu"] = make([][]float64, stores)
 	ret["write"] = make([][]float64, stores)
 	ret["write_b"] = make([][]float64, stores)
+	ret["write_bytes_per_second"] = make([][]float64, stores)
 	ret["read"] = make([][]float64, stores)
 	ret["read_b"] = make([][]float64, stores)
 	ret["replicas"] = make([][]float64, stores)
@@ -37,8 +39,10 @@ func MakeTS(metrics [][]StoreMetrics) map[string][][]float64 {
 	for _, sms := range metrics {
 		for i, sm := range sms {
 			ret["qps"][i] = append(ret["qps"][i], float64(sm.QPS))
+			ret["cpu"][i] = append(ret["cpu"][i], float64(sm.CPU))
 			ret["write"][i] = append(ret["write"][i], float64(sm.WriteKeys))
 			ret["write_b"][i] = append(ret["write_b"][i], float64(sm.WriteBytes))
+			ret["write_bytes_per_second"][i] = append(ret["write_bytes_per_second"][i], float64(sm.WriteBytesPerSecond))
 			ret["read"][i] = append(ret["read"][i], float64(sm.ReadKeys))
 			ret["read_b"][i] = append(ret["read_b"][i], float64(sm.ReadBytes))
 			ret["replicas"][i] = append(ret["replicas"][i], float64(sm.Replicas))

pkg/kv/kvserver/asim/metrics/tracker.go

Lines changed: 16 additions & 13 deletions
@@ -18,15 +18,17 @@ import (
 // StoreMetrics tracks metrics per-store in a simulation run. Each metrics
 // struct is associated with a tick.
 type StoreMetrics struct {
-	Tick       time.Time
-	StoreID    int64
-	QPS        int64
-	WriteKeys  int64
-	WriteBytes int64
-	ReadKeys   int64
-	ReadBytes  int64
-	Replicas   int64
-	Leases     int64
+	Tick                time.Time
+	StoreID             int64
+	QPS                 int64
+	CPU                 int64
+	WriteKeys           int64
+	WriteBytes          int64
+	WriteBytesPerSecond int64
+	ReadKeys            int64
+	ReadBytes           int64
+	Replicas            int64
+	Leases              int64
 	// LeaseTransfers tracks the number of lease transfer that this store has
 	// authored. Only the leaseholder store authors transfers.
 	LeaseTransfers int64
@@ -45,14 +47,14 @@ func (sm *StoreMetrics) GetMetricValue(stat string) float64 {
 	switch stat {
 	case "qps":
 		return float64(sm.QPS)
-	// case "cpu":
-	// 	value = float64(sm.CPU)
-	// case "write_bytes_per_second":
-	// 	value = float64(sm.WriteBytesPerSecond)
+	case "cpu":
+		return float64(sm.CPU)
 	case "write":
 		return float64(sm.WriteKeys)
 	case "write_b":
 		return float64(sm.WriteBytes)
+	case "write_bytes_per_second":
+		return float64(sm.WriteBytesPerSecond)
 	case "read":
 		return float64(sm.ReadKeys)
 	case "read_b":
@@ -142,6 +144,7 @@ func (mt *Tracker) Tick(ctx context.Context, tick time.Time, s state.State) {
 			Tick:       tick,
 			StoreID:    int64(storeID),
 			QPS:        int64(desc.Capacity.QueriesPerSecond),
+			CPU:        int64(desc.Capacity.CPUPerSecond),
 			WriteKeys:  u.WriteKeys,
 			WriteBytes: u.WriteBytes,
 			ReadKeys:   u.ReadKeys,

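A small usage sketch for the newly exposed stats, assuming the asim metrics package is imported as shown; the field values are made up.

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/metrics"
)

func main() {
	// Made-up values; the point is that "cpu" and "write_bytes_per_second" are
	// now reachable through GetMetricValue (and through MakeTS in series.go above).
	sm := metrics.StoreMetrics{CPU: 1_500_000, WriteBytesPerSecond: 1 << 20}
	fmt.Println(sm.GetMetricValue("cpu"))                    // 1.5e+06
	fmt.Println(sm.GetMetricValue("write_bytes_per_second")) // 1.048576e+06
}
```
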
pkg/kv/kvserver/asim/op/BUILD.bazel

Lines changed: 0 additions & 2 deletions
@@ -15,7 +15,6 @@ go_library(
     deps = [
         "//pkg/kv/kvpb",
         "//pkg/kv/kvserver",
-        "//pkg/kv/kvserver/allocator",
         "//pkg/kv/kvserver/allocator/allocatorimpl",
         "//pkg/kv/kvserver/allocator/storepool",
         "//pkg/kv/kvserver/asim/config",
@@ -30,7 +29,6 @@ go_test(
     srcs = ["controller_test.go"],
     embed = [":op"],
     deps = [
-        "//pkg/kv/kvserver/allocator",
         "//pkg/kv/kvserver/allocator/allocatorimpl",
         "//pkg/kv/kvserver/asim/config",
         "//pkg/kv/kvserver/asim/gossip",

pkg/kv/kvserver/asim/op/controller_test.go

Lines changed: 0 additions & 2 deletions
@@ -11,7 +11,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/gossip"
@@ -106,7 +105,6 @@ func TestLeaseTransferOp(t *testing.T) {
 				roachpb.RangeID(rangeID),
 				0,
 				roachpb.StoreID(target),
-				allocator.RangeUsageInfo{},
 			)
 			ticket := controller.Dispatch(ctx, state.OffsetTick(start, tick), s, op)
 			pending = append(pending, ticket)

pkg/kv/kvserver/asim/op/transfer_lease.go

Lines changed: 1 addition & 7 deletions
@@ -8,7 +8,6 @@ package op
 import (
 	"time"
 
-	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/errors"
@@ -19,22 +18,17 @@ type TransferLeaseOp struct {
 	baseOp
 	source, target state.StoreID
 	rangeID        state.RangeID
-	usage          allocator.RangeUsageInfo
 }
 
 // NewTransferLeaseOp returns a new TransferLeaseOp.
 func NewTransferLeaseOp(
-	tick time.Time,
-	rangeID roachpb.RangeID,
-	source, target roachpb.StoreID,
-	usage allocator.RangeUsageInfo,
+	tick time.Time, rangeID roachpb.RangeID, source, target roachpb.StoreID,
 ) *TransferLeaseOp {
 	return &TransferLeaseOp{
 		baseOp:  newBaseOp(tick),
 		source:  state.StoreID(source),
 		target:  state.StoreID(target),
 		rangeID: state.RangeID(rangeID),
-		usage:   usage,
 	}
 }

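For callers, the migration is just dropping the allocator.RangeUsageInfo argument. A sketch of the updated call, based on the new signature above; the tick and store IDs are arbitrary for illustration.

```go
package main

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/op"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

func main() {
	// The usage parameter is gone; only tick, range, source, and target remain.
	transferOp := op.NewTransferLeaseOp(
		time.Now(), roachpb.RangeID(1), roachpb.StoreID(1), roachpb.StoreID(2),
	)
	_ = transferOp
}
```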