Skip to content

Commit e5f821c

Browse files
craig[bot]stevendannapav-kv
committed
154318: more timeutil.Now -> crtime.NowMono conversions r=tbg,aerfrei a=stevendanna See individual commits. Just a few I noticed when looking at rangefeed profiles. 154371: kvserver: deflake TestLogGrowthWhenRefreshingPendingCommands r=arulajmani a=pav-kv Fixes #154329 Co-authored-by: Steven Danna <[email protected]> Co-authored-by: Pavel Kalinnikov <[email protected]>
3 parents e3e757c + 90510e5 + 1381de1 commit e5f821c

32 files changed

+104
-87
lines changed

pkg/ccl/changefeedccl/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ go_library(
169169
"//pkg/util/uuid",
170170
"@com_github_apache_pulsar_client_go//pulsar",
171171
"@com_github_cockroachdb_changefeedpb//:go_default_library",
172+
"@com_github_cockroachdb_crlib//crtime",
172173
"@com_github_cockroachdb_errors//:errors",
173174
"@com_github_cockroachdb_logtags//:logtags",
174175
"@com_github_cockroachdb_redact//:redact",

pkg/ccl/changefeedccl/batching_sink.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/cockroachdb/cockroach/pkg/util/retry"
2323
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
2424
"github.com/cockroachdb/cockroach/pkg/util/tracing"
25+
"github.com/cockroachdb/crlib/crtime"
2526
"github.com/cockroachdb/errors"
2627
)
2728

@@ -265,7 +266,7 @@ type sinkBatch struct {
265266
numMessages int
266267
numKVBytes int // the total amount of uncompressed kv data in the batch
267268
keys intsets.Fast // the set of keys within the batch to provide to parallelIO
268-
bufferTime time.Time // the earliest time a message was inserted into the batch
269+
bufferTime crtime.Mono // the earliest time a message was inserted into the batch
269270
mvcc hlc.Timestamp
270271

271272
alloc kvevent.Alloc
@@ -306,7 +307,7 @@ func hashToInt(h hash.Hash32, buf []byte) int {
306307
// Append adds the contents of a kvEvent to the batch, merging its alloc pool.
307308
func (sb *sinkBatch) Append(ctx context.Context, e *rowEvent) {
308309
if sb.isEmpty() {
309-
sb.bufferTime = timeutil.Now()
310+
sb.bufferTime = crtime.NowMono()
310311
}
311312

312313
sb.buffer.Append(ctx, e.key, e.val, attributes{
@@ -409,7 +410,7 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
409410

410411
req, send, err := ioEmitter.AdmitRequest(ctx, batchBuffer)
411412
if errors.Is(err, ErrNotEnoughQuota) {
412-
waitStart := timeutil.Now()
413+
waitStart := crtime.NowMono()
413414

414415
// Quota can only be freed by consuming a result.
415416
select {
@@ -431,7 +432,7 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
431432
return err
432433
}
433434

434-
s.metrics.recordSinkBackpressure(timeutil.Since(waitStart))
435+
s.metrics.recordSinkBackpressure(waitStart.Elapsed())
435436
} else if err != nil {
436437
return err
437438
} else {

pkg/ccl/changefeedccl/cdceval/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ go_library(
4343
"//pkg/util/ctxgroup",
4444
"//pkg/util/hlc",
4545
"//pkg/util/log",
46-
"//pkg/util/timeutil",
46+
"@com_github_cockroachdb_crlib//crtime",
4747
"@com_github_cockroachdb_errors//:errors",
4848
"@com_github_lib_pq//oid",
4949
],

pkg/ccl/changefeedccl/cdceval/expr_eval.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
2626
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2727
"github.com/cockroachdb/cockroach/pkg/util/log"
28-
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
28+
"github.com/cockroachdb/crlib/crtime"
2929
"github.com/cockroachdb/errors"
3030
)
3131

@@ -256,10 +256,10 @@ func sameVersion(currentVersion, newVersion *cdcevent.EventDescriptor) bool {
256256
// planAndRun plans CDC expression and starts execution pipeline.
257257
func (e *familyEvaluator) planAndRun(ctx context.Context) (err error) {
258258
if log.V(1) {
259-
start := timeutil.Now()
259+
start := crtime.NowMono()
260260
defer func() {
261261
log.Changefeed.Infof(ctx, "Planning for CDC expression %s (v=%d) took %s (err=%v)",
262-
tree.AsString(e.norm), e.norm.desc.Version, timeutil.Since(start), err)
262+
tree.AsString(e.norm), e.norm.desc.Version, start.Elapsed(), err)
263263
}()
264264
}
265265

pkg/ccl/changefeedccl/cdcutils/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ go_library(
1111
"//pkg/util/log",
1212
"//pkg/util/metric",
1313
"//pkg/util/quotapool",
14-
"//pkg/util/timeutil",
1514
"//pkg/util/tracing",
15+
"@com_github_cockroachdb_crlib//crtime",
1616
],
1717
)
1818

pkg/ccl/changefeedccl/cdcutils/throttle.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ import (
1717
"github.com/cockroachdb/cockroach/pkg/util/log"
1818
"github.com/cockroachdb/cockroach/pkg/util/metric"
1919
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
20-
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
2120
"github.com/cockroachdb/cockroach/pkg/util/tracing"
21+
"github.com/cockroachdb/crlib/crtime"
2222
)
2323

2424
// Throttler is a changefeed IO throttler.
@@ -168,9 +168,9 @@ func (m Metrics) MetricStruct() {}
168168
func waitQuota(
169169
ctx context.Context, n int64, limit *quotapool.RateLimiter, c *metric.Counter,
170170
) error {
171-
start := timeutil.Now()
171+
start := crtime.NowMono()
172172
defer func() {
173-
c.Inc(int64(timeutil.Since(start)))
173+
c.Inc(start.Elapsed().Nanoseconds())
174174
}()
175175
return limit.WaitN(ctx, n)
176176
}

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ import (
5555
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
5656
"github.com/cockroachdb/cockroach/pkg/util/tracing"
5757
"github.com/cockroachdb/cockroach/pkg/util/uuid"
58+
"github.com/cockroachdb/crlib/crtime"
5859
"github.com/cockroachdb/errors"
5960
"github.com/cockroachdb/logtags"
6061
"github.com/cockroachdb/redact"
@@ -856,7 +857,7 @@ func (ca *changeAggregator) tick() error {
856857
return err
857858
}
858859

859-
queuedNanos := timeutil.Since(event.BufferAddTimestamp()).Nanoseconds()
860+
queuedNanos := event.BufferAddTimestamp().Elapsed().Nanoseconds()
860861
ca.metrics.QueueTimeNanos.Inc(queuedNanos)
861862

862863
switch event.Type() {
@@ -935,11 +936,11 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error
935936
checkpointFrontier := (advanced && forceFlush) || ca.frontierFlushLimiter.canSave(ctx)
936937

937938
if checkpointFrontier {
938-
now := timeutil.Now()
939+
now := crtime.NowMono()
939940
if err := ca.flushFrontier(ctx); err != nil {
940941
return err
941942
}
942-
ca.frontierFlushLimiter.doneSave(timeutil.Since(now))
943+
ca.frontierFlushLimiter.doneSave(now.Elapsed())
943944
}
944945

945946
return nil

pkg/ccl/changefeedccl/checkpoint/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ go_library(
1515
"//pkg/util/hlc",
1616
"//pkg/util/metric",
1717
"//pkg/util/metric/aggmetric",
18-
"//pkg/util/timeutil",
18+
"@com_github_cockroachdb_crlib//crtime",
1919
"@com_github_cockroachdb_errors//:errors",
2020
],
2121
)

pkg/ccl/changefeedccl/checkpoint/checkpoint.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1414
"github.com/cockroachdb/cockroach/pkg/roachpb"
1515
"github.com/cockroachdb/cockroach/pkg/util/hlc"
16-
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
16+
"github.com/cockroachdb/crlib/crtime"
1717
"github.com/cockroachdb/errors"
1818
)
1919

@@ -27,7 +27,7 @@ func Make(
2727
maxBytes int64,
2828
metrics *Metrics,
2929
) *jobspb.TimestampSpansMap {
30-
start := timeutil.Now()
30+
start := crtime.NowMono()
3131

3232
spanGroupMap := make(map[hlc.Timestamp]*roachpb.SpanGroup)
3333
for s, ts := range spans {
@@ -60,7 +60,7 @@ func Make(
6060
}
6161

6262
if metrics != nil {
63-
metrics.CreateNanos.RecordValue(int64(timeutil.Since(start)))
63+
metrics.CreateNanos.RecordValue(start.Elapsed().Nanoseconds())
6464
metrics.TotalBytes.RecordValue(int64(cp.Size()))
6565
metrics.TimestampCount.RecordValue(int64(cp.TimestampCount()))
6666
metrics.SpanCount.RecordValue(int64(cp.SpanCount()))

pkg/ccl/changefeedccl/event_processing.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
3535
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
3636
"github.com/cockroachdb/cockroach/pkg/util/tracing"
37+
"github.com/cockroachdb/crlib/crtime"
3738
"github.com/cockroachdb/errors"
3839
"github.com/cockroachdb/redact"
3940
)
@@ -631,10 +632,9 @@ type parallelEventConsumer struct {
631632
var _ eventConsumer = (*parallelEventConsumer)(nil)
632633

633634
func (c *parallelEventConsumer) ConsumeEvent(ctx context.Context, ev kvevent.Event) error {
634-
startTime := timeutil.Now().UnixNano()
635+
start := crtime.NowMono()
635636
defer func() {
636-
time := timeutil.Now().UnixNano()
637-
c.metrics.ParallelConsumerConsumeNanos.RecordValue(time - startTime)
637+
c.metrics.ParallelConsumerConsumeNanos.RecordValue(start.Elapsed().Nanoseconds())
638638
}()
639639

640640
bucket := c.getBucketForEvent(ev)
@@ -760,10 +760,9 @@ func (c *parallelEventConsumer) setWorkerError(err error) error {
760760
// Flush flushes the consumer by blocking until all events are consumed,
761761
// or until there is an error.
762762
func (c *parallelEventConsumer) Flush(ctx context.Context) error {
763-
startTime := timeutil.Now().UnixNano()
763+
start := crtime.NowMono()
764764
defer func() {
765-
time := timeutil.Now().UnixNano()
766-
c.metrics.ParallelConsumerFlushNanos.RecordValue(time - startTime)
765+
c.metrics.ParallelConsumerFlushNanos.RecordValue(start.Elapsed().Nanoseconds())
767766
}()
768767

769768
needFlush := func() bool {

0 commit comments

Comments
 (0)