
Commit af51942

craig[bot], sumeerbhola, mgartner, and stevendanna committed
153364: admission,storage: increase overload score of store with disk slowness r=tbg,wenyihu6 a=sumeerbhola

So that the allocator accounts for it in shedding leases etc. The unhealthy duration is also exported as a metric.

Informs #153280

Epic: none

Release note (ops change): The cluster setting storage.unhealthy_write_duration (defaults to 20s) is used to indicate to the allocator that a store's disk is unhealthy. The cluster setting kv.allocator.disk_unhealthy_io_overload_score controls the overload score assigned to a store with an unhealthy disk; a higher score prevents lease or replica transfers to the store and causes the store to shed its leases. The default value of that setting is 0, so allocator behavior is unaffected by default.

153810: opt: track distinct object references in metadata r=mgartner a=mgartner

Prior to this commit, all referenced object names in a query, including tables, UDTs, and UDFs, were appended to slices in a memo's metadata. These names are later re-resolved to determine whether a memo is stale. Because this list could contain duplicate entries, the same name could be re-resolved multiple times during the staleness check. Now the distinct set of referenced object names is maintained, eliminating duplicate object resolution. The `tree.UnresolvedObjectNameSet` type has been added to facilitate this.

Fixes #153800

Release note: None

154322: changefeedccl: avoid heap allocation in timers r=andyyang890 a=stevendanna

Unfortunately, the closure returned from Start() seemed to always be heap-allocated. While minor, it was enough to spot when reading rangefeed-related memory profiles.

```
BenchmarkTimerStart/closure-based     8992018    128.1 ns/op    24 B/op    1 allocs/op
BenchmarkTimerStart/handle-based     12124416    98.81 ns/op     0 B/op    0 allocs/op
```

Epic: none

Release note: None

Co-authored-by: sumeerbhola <[email protected]>
Co-authored-by: Marcus Gartner <[email protected]>
Co-authored-by: Steven Danna <[email protected]>
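The #153810 change hinges on keeping a distinct set of names so each one is re-resolved at most once during the staleness check. As a rough illustration of the idea (a minimal sketch with invented names and key format; this is not the actual `tree.UnresolvedObjectNameSet` implementation), a set keyed on the name's rendered form is enough:

```go
package main

import "fmt"

// objectName stands in for a possibly qualified reference to a
// table, UDT, or UDF.
type objectName struct {
	Catalog, Schema, Object string
}

// key renders the name into a unique map key.
func (n objectName) key() string {
	return n.Catalog + "." + n.Schema + "." + n.Object
}

// nameSet stores each referenced name once, so a staleness check can
// iterate over it without resolving duplicates.
type nameSet struct {
	m map[string]objectName
}

// Add inserts a name; re-adding an existing name is a no-op.
func (s *nameSet) Add(n objectName) {
	if s.m == nil {
		s.m = make(map[string]objectName)
	}
	s.m[n.key()] = n
}

// ForEach visits each distinct name exactly once.
func (s *nameSet) ForEach(fn func(objectName)) {
	for _, n := range s.m {
		fn(n)
	}
}

func main() {
	var s nameSet
	// A table referenced twice in a query is stored (and would be
	// re-resolved) only once.
	s.Add(objectName{"db", "public", "t"})
	s.Add(objectName{"db", "public", "t"})
	s.ForEach(func(n objectName) { fmt.Println(n.key()) })
}
```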
4 parents: 4b9c7ab + 3834ff7 + d37c5d2 + c6fb9cf

35 files changed: +708 −75 lines

docs/generated/metrics/metrics.yaml

Lines changed: 8 additions & 0 deletions
```diff
@@ -16395,6 +16395,14 @@ layers:
       unit: COUNT
       aggregation: AVG
       derivative: NON_NEGATIVE_DERIVATIVE
+  - name: storage.disk-unhealthy.duration
+    exported_name: storage_disk_unhealthy_duration
+    description: Total disk unhealthy duration in nanos
+    y_axis_label: Nanoseconds
+    type: COUNTER
+    unit: NANOSECONDS
+    aggregation: AVG
+    derivative: NON_NEGATIVE_DERIVATIVE
   - name: storage.disk.io.time
     exported_name: storage_disk_io_time
     description: Time spent reading from or writing to the store's disk since this process started (as reported by the OS)
```

docs/generated/settings/settings.html

Lines changed: 1 addition & 0 deletions
```diff
@@ -367,6 +367,7 @@
 <tr><td><div id="setting-storage-sstable-compression-algorithm" class="anchored"><code>storage.sstable.compression_algorithm</code></div></td><td>enumeration</td><td><code>fastest</code></td><td>determines the compression algorithm to use when compressing sstable data blocks for use in a Pebble store (balanced,good are experimental); [snappy = 1, zstd = 2, none = 3, minlz = 4, fastest = 5, balanced = 6, good = 7]</td><td>Advanced/Self-hosted (read-write); Basic/Standard (read-only)</td></tr>
 <tr><td><div id="setting-storage-sstable-compression-algorithm-backup-storage" class="anchored"><code>storage.sstable.compression_algorithm_backup_storage</code></div></td><td>enumeration</td><td><code>fastest</code></td><td>determines the compression algorithm to use when compressing sstable data blocks for backup row data storage (fast,balanced,good are experimental); [snappy = 1, zstd = 2, none = 3, minlz = 4, fastest = 5, fast = 6, balanced = 7, good = 8]</td><td>Advanced/Self-hosted (read-write); Basic/Standard (read-only)</td></tr>
 <tr><td><div id="setting-storage-sstable-compression-algorithm-backup-transport" class="anchored"><code>storage.sstable.compression_algorithm_backup_transport</code></div></td><td>enumeration</td><td><code>fastest</code></td><td>determines the compression algorithm to use when compressing sstable data blocks for backup transport (fast,balanced,good are experimental); [snappy = 1, zstd = 2, none = 3, minlz = 4, fastest = 5, fast = 6, balanced = 7, good = 8]</td><td>Advanced/Self-hosted (read-write); Basic/Standard (read-only)</td></tr>
+<tr><td><div id="setting-storage-unhealthy-write-duration" class="anchored"><code>storage.unhealthy_write_duration</code></div></td><td>duration</td><td><code>20s</code></td><td>duration for disk write operations, beyond which the disk will be reported as unhealthy for higher layer actions</td><td>Advanced/Self-Hosted</td></tr>
 <tr><td><div id="setting-storage-wal-failover-unhealthy-op-threshold" class="anchored"><code>storage.wal_failover.unhealthy_op_threshold</code></div></td><td>duration</td><td><code>100ms</code></td><td>the latency of a WAL write considered unhealthy and triggers a failover to a secondary WAL location</td><td>Advanced/Self-Hosted</td></tr>
 <tr><td><div id="setting-timeseries-storage-enabled" class="anchored"><code>timeseries.storage.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, periodic timeseries data is stored within the cluster; disabling is not recommended unless you are storing the data elsewhere</td><td>Advanced/Self-Hosted</td></tr>
 <tr><td><div id="setting-timeseries-storage-resolution-10s-ttl" class="anchored"><code>timeseries.storage.resolution_10s.ttl</code></div></td><td>duration</td><td><code>240h0m0s</code></td><td>the maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion.</td><td>Advanced/Self-hosted (read-write); Basic/Standard (read-only)</td></tr>
```

pkg/BUILD.bazel

Lines changed: 2 additions & 0 deletions
```diff
@@ -39,6 +39,7 @@ ALL_TESTS = [
     "//pkg/ccl/changefeedccl/resolvedspan:resolvedspan_test",
     "//pkg/ccl/changefeedccl/schemafeed:schemafeed_test",
     "//pkg/ccl/changefeedccl/tableset:tableset_test",
+    "//pkg/ccl/changefeedccl/timers:timers_test",
     "//pkg/ccl/changefeedccl:changefeedccl_test",
     "//pkg/ccl/cliccl:cliccl_test",
     "//pkg/ccl/cloudccl/amazon:amazon_test",
@@ -958,6 +959,7 @@ GO_TARGETS = [
     "//pkg/ccl/changefeedccl/tableset:tableset",
     "//pkg/ccl/changefeedccl/tableset:tableset_test",
     "//pkg/ccl/changefeedccl/timers:timers",
+    "//pkg/ccl/changefeedccl/timers:timers_test",
     "//pkg/ccl/changefeedccl:changefeedccl",
     "//pkg/ccl/changefeedccl:changefeedccl_test",
     "//pkg/ccl/cliccl:cliccl",
```

pkg/ccl/changefeedccl/batching_sink.go

Lines changed: 1 addition & 1 deletion
```diff
@@ -359,7 +359,7 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
     batch, _ := req.(*sinkBatch)
     defer s.metrics.recordSinkIOInflightChange(int64(-batch.numMessages))
     s.metrics.recordSinkIOInflightChange(int64(batch.numMessages))
-    defer s.metrics.timers().DownstreamClientSend.Start()()
+    defer s.metrics.timers().DownstreamClientSend.Start().End()

     return s.client.Flush(ctx, batch.payload)
 }
```
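The `Start()()` → `Start().End()` change above is the allocation fix from #154322: a timer whose Start() returns a stop closure forces a heap allocation per call (the closure escapes by being returned), while a small handle returned by value does not. A minimal sketch of both patterns (hypothetical `Timer` and `Handle` types, not the actual changefeedccl timers package):

```go
package main

import (
	"fmt"
	"time"
)

// Timer accumulates durations for a named operation.
type Timer struct {
	total time.Duration
}

// StartClosure returns a closure that records the elapsed time when
// called. Because the closure is returned, it escapes to the heap:
// typically one allocation per call.
func (t *Timer) StartClosure() func() {
	start := time.Now()
	return func() { t.total += time.Since(start) }
}

// Handle is a small value type tying a start time to its Timer.
type Handle struct {
	t     *Timer
	start time.Time
}

// Start returns a Handle by value, avoiding the closure allocation.
func (t *Timer) Start() Handle {
	return Handle{t: t, start: time.Now()}
}

// End records the elapsed time since Start.
func (h Handle) End() { h.t.total += time.Since(h.start) }

func main() {
	var t Timer

	// Closure-based: allocates the returned closure.
	stop := t.StartClosure()
	stop()

	// Handle-based: defer evaluates t.Start() immediately and runs
	// End() on return, preserving the original defer semantics.
	func() {
		defer t.Start().End()
	}()

	fmt.Println(t.total)
}
```

With `defer t.Start().End()`, Go evaluates the receiver at defer time and defers only the `End` call, which lines up with the 0 allocs/op shown in the commit's handle-based benchmark.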

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 4 additions & 4 deletions
```diff
@@ -1843,7 +1843,7 @@ func (cf *changeFrontier) checkpointJobProgress(
 ) error {
     ctx, sp := tracing.ChildSpan(ctx, "changefeed.frontier.checkpoint_job_progress")
     defer sp.Finish()
-    defer cf.sliMetrics.Timers.CheckpointJobProgress.Start()()
+    defer cf.sliMetrics.Timers.CheckpointJobProgress.Start().End()

     if cf.knobs.RaiseRetryableError != nil {
         if err := cf.knobs.RaiseRetryableError(); err != nil {
@@ -1924,7 +1924,7 @@ func (cf *changeFrontier) maybePersistFrontier(ctx context.Context) error {
     }); err != nil {
         return err
     }
-    persistDuration := timer()
+    persistDuration := timer.End()
     cf.frontierPersistenceLimiter.doneSave(persistDuration)
     return nil
 }
@@ -1950,11 +1950,11 @@ func (cf *changeFrontier) manageProtectedTimestamps(
     recordPTSMetricsErrorTime := cf.sliMetrics.Timers.PTSManageError.Start()
     defer func() {
         if err != nil {
-            recordPTSMetricsErrorTime()
+            recordPTSMetricsErrorTime.End()
             return
         }
         if updated {
-            recordPTSMetricsTime()
+            recordPTSMetricsTime.End()
         }
     }()
```

pkg/ccl/changefeedccl/changefeed_stmt.go

Lines changed: 1 addition & 1 deletion
```diff
@@ -446,7 +446,7 @@ func changefeedPlanHook(
         }
         return err
     }
-    recordPTSMetricsTime()
+    recordPTSMetricsTime.End()
 }

 // Start the job.
```

pkg/ccl/changefeedccl/event_processing.go

Lines changed: 2 additions & 2 deletions
```diff
@@ -442,7 +442,7 @@ func (c *kvEventToRowConsumer) encodeAndEmit(
         }
     }

-    stop := c.metrics.Timers.Encode.Start()
+    timer := c.metrics.Timers.Encode.Start()
     if c.encodingOpts.Format == changefeedbase.OptFormatParquet {
         return c.encodeForParquet(
             ctx, updatedRow, prevRow, topic, schemaTS, updatedRow.MvccTimestamp,
@@ -466,7 +466,7 @@ func (c *kvEventToRowConsumer) encodeAndEmit(
     // Since we're done processing/converting this event, and will not use much more
     // than len(key)+len(bytes) worth of resources, adjust allocation to match.
     alloc.AdjustBytesToTarget(ctx, int64(len(keyCopy)+len(valueCopy)))
-    stop()
+    timer.End()

     headers, err := c.makeRowHeaders(ctx, updatedRow)
     if err != nil {
```

pkg/ccl/changefeedccl/kvfeed/kv_feed.go

Lines changed: 2 additions & 2 deletions
```diff
@@ -732,7 +732,7 @@ func copyFromSourceToDestUntilTableEvent(
     // from rangefeed) and checks if a table event was encountered at or before
     // said timestamp. If so, it replaces the copy boundary with the table event.
     checkForTableEvent = func(ts hlc.Timestamp) error {
-        defer st.KVFeedWaitForTableEvent.Start()()
+        defer st.KVFeedWaitForTableEvent.Start().End()
         // There's no need to check for table events again if we already found one
         // since that should already be the earliest one.
         if _, ok := boundary.(*errTableEventReached); ok {
@@ -829,7 +829,7 @@

     // writeToDest writes an event to the dest.
     writeToDest = func(e kvevent.Event) error {
-        defer st.KVFeedBuffer.Start()()
+        defer st.KVFeedBuffer.Start().End()

         switch e.Type() {
         case kvevent.TypeKV, kvevent.TypeFlush:
```

pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go

Lines changed: 4 additions & 4 deletions
```diff
@@ -144,13 +144,13 @@ func (p *rangefeed) handleRangefeedEvent(ctx context.Context, e *kvpb.RangeFeedE
             return err
         }
     }
-    stop := p.st.RangefeedBufferValue.Start()
+    timer := p.st.RangefeedBufferValue.Start()
     if err := p.memBuf.Add(
         ctx, kvevent.MakeKVEvent(e),
     ); err != nil {
         return err
     }
-    stop()
+    timer.End()
 case *kvpb.RangeFeedCheckpoint:
     ev := e.ShallowCopy()
     ev.Checkpoint.ResolvedTS = quantizeTS(ev.Checkpoint.ResolvedTS, p.cfg.WithFrontierQuantize)
@@ -163,13 +163,13 @@ func (p *rangefeed) handleRangefeedEvent(ctx context.Context, e *kvpb.RangeFeedE
     if p.knobs.ShouldSkipCheckpoint != nil && p.knobs.ShouldSkipCheckpoint(t) {
         return nil
     }
-    stop := p.st.RangefeedBufferCheckpoint.Start()
+    timer := p.st.RangefeedBufferCheckpoint.Start()
     if err := p.memBuf.Add(
         ctx, kvevent.MakeResolvedEvent(ev, jobspb.ResolvedSpan_NONE),
     ); err != nil {
         return err
     }
-    stop()
+    timer.End()
 case *kvpb.RangeFeedSSTable:
     // For now, we just error on SST ingestion, since we currently don't
     // expect SST ingestion into spans with active changefeeds.
```

pkg/ccl/changefeedccl/sink_cloudstorage.go

Lines changed: 1 addition & 1 deletion
```diff
@@ -892,7 +892,7 @@ func (f *cloudStorageSinkFile) flushToStorage(
     ctx context.Context, es cloud.ExternalStorage, dest string, m metricsRecorder,
 ) error {
     defer f.releaseAlloc(ctx)
-    defer m.timers().DownstreamClientSend.Start()()
+    defer m.timers().DownstreamClientSend.Start().End()

     if f.rawSize == 0 {
         // This method shouldn't be called with an empty file, but be defensive
```
