Skip to content

Commit c6fb9cf

Browse files
committed
changefeedccl: avoid heap allocation in timers
Unfortunately, the closure from Start() seemed to always be heap allocated. While minor, it was enough to spot when reading rangefeed-related memory profiles. ``` BenchmarkTimerStart/closure-based 8992018 128.1 ns/op 24 B/op 1 allocs/op BenchmarkTimerStart/handle-based 12124416 98.81 ns/op 0 B/op 0 allocs/op ``` Epic: none Release note: None
1 parent 45cc319 commit c6fb9cf

13 files changed

+89
-27
lines changed

pkg/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ ALL_TESTS = [
3939
"//pkg/ccl/changefeedccl/resolvedspan:resolvedspan_test",
4040
"//pkg/ccl/changefeedccl/schemafeed:schemafeed_test",
4141
"//pkg/ccl/changefeedccl/tableset:tableset_test",
42+
"//pkg/ccl/changefeedccl/timers:timers_test",
4243
"//pkg/ccl/changefeedccl:changefeedccl_test",
4344
"//pkg/ccl/cliccl:cliccl_test",
4445
"//pkg/ccl/cloudccl/amazon:amazon_test",
@@ -958,6 +959,7 @@ GO_TARGETS = [
958959
"//pkg/ccl/changefeedccl/tableset:tableset",
959960
"//pkg/ccl/changefeedccl/tableset:tableset_test",
960961
"//pkg/ccl/changefeedccl/timers:timers",
962+
"//pkg/ccl/changefeedccl/timers:timers_test",
961963
"//pkg/ccl/changefeedccl:changefeedccl",
962964
"//pkg/ccl/changefeedccl:changefeedccl_test",
963965
"//pkg/ccl/cliccl:cliccl",

pkg/ccl/changefeedccl/batching_sink.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
359359
batch, _ := req.(*sinkBatch)
360360
defer s.metrics.recordSinkIOInflightChange(int64(-batch.numMessages))
361361
s.metrics.recordSinkIOInflightChange(int64(batch.numMessages))
362-
defer s.metrics.timers().DownstreamClientSend.Start()()
362+
defer s.metrics.timers().DownstreamClientSend.Start().End()
363363

364364
return s.client.Flush(ctx, batch.payload)
365365
}

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1843,7 +1843,7 @@ func (cf *changeFrontier) checkpointJobProgress(
18431843
) error {
18441844
ctx, sp := tracing.ChildSpan(ctx, "changefeed.frontier.checkpoint_job_progress")
18451845
defer sp.Finish()
1846-
defer cf.sliMetrics.Timers.CheckpointJobProgress.Start()()
1846+
defer cf.sliMetrics.Timers.CheckpointJobProgress.Start().End()
18471847

18481848
if cf.knobs.RaiseRetryableError != nil {
18491849
if err := cf.knobs.RaiseRetryableError(); err != nil {
@@ -1924,7 +1924,7 @@ func (cf *changeFrontier) maybePersistFrontier(ctx context.Context) error {
19241924
}); err != nil {
19251925
return err
19261926
}
1927-
persistDuration := timer()
1927+
persistDuration := timer.End()
19281928
cf.frontierPersistenceLimiter.doneSave(persistDuration)
19291929
return nil
19301930
}
@@ -1950,11 +1950,11 @@ func (cf *changeFrontier) manageProtectedTimestamps(
19501950
recordPTSMetricsErrorTime := cf.sliMetrics.Timers.PTSManageError.Start()
19511951
defer func() {
19521952
if err != nil {
1953-
recordPTSMetricsErrorTime()
1953+
recordPTSMetricsErrorTime.End()
19541954
return
19551955
}
19561956
if updated {
1957-
recordPTSMetricsTime()
1957+
recordPTSMetricsTime.End()
19581958
}
19591959
}()
19601960

pkg/ccl/changefeedccl/changefeed_stmt.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ func changefeedPlanHook(
446446
}
447447
return err
448448
}
449-
recordPTSMetricsTime()
449+
recordPTSMetricsTime.End()
450450
}
451451

452452
// Start the job.

pkg/ccl/changefeedccl/event_processing.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -442,7 +442,7 @@ func (c *kvEventToRowConsumer) encodeAndEmit(
442442
}
443443
}
444444

445-
stop := c.metrics.Timers.Encode.Start()
445+
timer := c.metrics.Timers.Encode.Start()
446446
if c.encodingOpts.Format == changefeedbase.OptFormatParquet {
447447
return c.encodeForParquet(
448448
ctx, updatedRow, prevRow, topic, schemaTS, updatedRow.MvccTimestamp,
@@ -466,7 +466,7 @@ func (c *kvEventToRowConsumer) encodeAndEmit(
466466
// Since we're done processing/converting this event, and will not use much more
467467
// than len(key)+len(bytes) worth of resources, adjust allocation to match.
468468
alloc.AdjustBytesToTarget(ctx, int64(len(keyCopy)+len(valueCopy)))
469-
stop()
469+
timer.End()
470470

471471
headers, err := c.makeRowHeaders(ctx, updatedRow)
472472
if err != nil {

pkg/ccl/changefeedccl/kvfeed/kv_feed.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -732,7 +732,7 @@ func copyFromSourceToDestUntilTableEvent(
732732
// from rangefeed) and checks if a table event was encountered at or before
733733
// said timestamp. If so, it replaces the copy boundary with the table event.
734734
checkForTableEvent = func(ts hlc.Timestamp) error {
735-
defer st.KVFeedWaitForTableEvent.Start()()
735+
defer st.KVFeedWaitForTableEvent.Start().End()
736736
// There's no need to check for table events again if we already found one
737737
// since that should already be the earliest one.
738738
if _, ok := boundary.(*errTableEventReached); ok {
@@ -829,7 +829,7 @@ func copyFromSourceToDestUntilTableEvent(
829829

830830
// writeToDest writes an event to the dest.
831831
writeToDest = func(e kvevent.Event) error {
832-
defer st.KVFeedBuffer.Start()()
832+
defer st.KVFeedBuffer.Start().End()
833833

834834
switch e.Type() {
835835
case kvevent.TypeKV, kvevent.TypeFlush:

pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -144,13 +144,13 @@ func (p *rangefeed) handleRangefeedEvent(ctx context.Context, e *kvpb.RangeFeedE
144144
return err
145145
}
146146
}
147-
stop := p.st.RangefeedBufferValue.Start()
147+
timer := p.st.RangefeedBufferValue.Start()
148148
if err := p.memBuf.Add(
149149
ctx, kvevent.MakeKVEvent(e),
150150
); err != nil {
151151
return err
152152
}
153-
stop()
153+
timer.End()
154154
case *kvpb.RangeFeedCheckpoint:
155155
ev := e.ShallowCopy()
156156
ev.Checkpoint.ResolvedTS = quantizeTS(ev.Checkpoint.ResolvedTS, p.cfg.WithFrontierQuantize)
@@ -163,13 +163,13 @@ func (p *rangefeed) handleRangefeedEvent(ctx context.Context, e *kvpb.RangeFeedE
163163
if p.knobs.ShouldSkipCheckpoint != nil && p.knobs.ShouldSkipCheckpoint(t) {
164164
return nil
165165
}
166-
stop := p.st.RangefeedBufferCheckpoint.Start()
166+
timer := p.st.RangefeedBufferCheckpoint.Start()
167167
if err := p.memBuf.Add(
168168
ctx, kvevent.MakeResolvedEvent(ev, jobspb.ResolvedSpan_NONE),
169169
); err != nil {
170170
return err
171171
}
172-
stop()
172+
timer.End()
173173
case *kvpb.RangeFeedSSTable:
174174
// For now, we just error on SST ingestion, since we currently don't
175175
// expect SST ingestion into spans with active changefeeds.

pkg/ccl/changefeedccl/sink_cloudstorage.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -892,7 +892,7 @@ func (f *cloudStorageSinkFile) flushToStorage(
892892
ctx context.Context, es cloud.ExternalStorage, dest string, m metricsRecorder,
893893
) error {
894894
defer f.releaseAlloc(ctx)
895-
defer m.timers().DownstreamClientSend.Start()()
895+
defer m.timers().DownstreamClientSend.Start().End()
896896

897897
if f.rawSize == 0 {
898898
// This method shouldn't be called with an empty file, but be defensive

pkg/ccl/changefeedccl/sink_kafka.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -382,10 +382,10 @@ func (s *kafkaSink) EmitRow(
382382
// inside sarama, set the DownstreamClientSend timer to the same value as
383383
// BatchHistNanos.
384384
recordOneMessageCb := s.metrics.recordOneMessage()
385-
downstreamClientSendCb := s.metrics.timers().DownstreamClientSend.Start()
385+
downstreamClientSend := s.metrics.timers().DownstreamClientSend.Start()
386386
updateMetrics := func(mvcc hlc.Timestamp, bytes int, compressedBytes int) {
387387
recordOneMessageCb(mvcc, bytes, compressedBytes)
388-
downstreamClientSendCb()
388+
downstreamClientSend.End()
389389
}
390390

391391
recordHeaders := make([]sarama.RecordHeader, 0, len(headers))

pkg/ccl/changefeedccl/sink_pulsar.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ func (p *pulsarSink) msgCallback(
262262
oneMsgCb := p.metrics.recordOneMessage()
263263

264264
return func(id pulsar.MessageID, message *pulsar.ProducerMessage, err error) {
265-
sendCb()
265+
sendCb.End()
266266
if err == nil {
267267
oneMsgCb(mvcc, len(message.Payload), len(message.Payload))
268268
} else {

0 commit comments

Comments
 (0)