Skip to content

Commit e404533

Browse files
committed
kvserver/closedts: move from log.KvDistribution to log.KvExec
1 parent 6c9feca commit e404533

File tree

7 files changed

+19
-19
lines changed

7 files changed

+19
-19
lines changed

pkg/kv/kvserver/closedts/policyrefresher/policy_refresher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func NewPolicyRefresher(
6969
knobs *TestingKnobs,
7070
) *PolicyRefresher {
7171
if getLeaseholderReplicas == nil || getNodeLatencies == nil {
72-
log.KvDistribution.Fatalf(context.Background(), "getLeaseholderReplicas and getNodeLatencies must be non-nil")
72+
log.KvExec.Fatalf(context.Background(), "getLeaseholderReplicas and getNodeLatencies must be non-nil")
7373
return nil
7474
}
7575
refresher := &PolicyRefresher{

pkg/kv/kvserver/closedts/sidetransport/receiver.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ func (s *Receiver) onRecvErr(ctx context.Context, nodeID roachpb.NodeID, err err
147147
defer s.mu.Unlock()
148148

149149
if err != io.EOF {
150-
log.KvDistribution.Warningf(ctx, "closed timestamps side-transport connection dropped from node: %d (%s)", nodeID, err)
150+
log.KvExec.Warningf(ctx, "closed timestamps side-transport connection dropped from node: %d (%s)", nodeID, err)
151151
} else {
152152
log.VEventf(ctx, 2, "closed timestamps side-transport connection dropped from node: %d (%s)", nodeID, err)
153153
}
@@ -236,11 +236,11 @@ func (r *incomingStream) processUpdate(ctx context.Context, msg *ctpb.Update) {
236236
log.VEventf(ctx, 4, "received side-transport update: %v", msg)
237237

238238
if msg.NodeID == 0 {
239-
log.KvDistribution.Fatalf(ctx, "missing NodeID in message: %s", msg)
239+
log.KvExec.Fatalf(ctx, "missing NodeID in message: %s", msg)
240240
}
241241

242242
if msg.NodeID != r.nodeID {
243-
log.KvDistribution.Fatalf(ctx, "wrong NodeID; expected %d, got %d", r.nodeID, msg.NodeID)
243+
log.KvExec.Fatalf(ctx, "wrong NodeID; expected %d, got %d", r.nodeID, msg.NodeID)
244244
}
245245

246246
// Handle the removed ranges. In order to not lose closed ts info, before we
@@ -259,11 +259,11 @@ func (r *incomingStream) processUpdate(ctx context.Context, msg *ctpb.Update) {
259259
for _, rangeID := range msg.Removed {
260260
info, ok := r.mu.tracked[rangeID]
261261
if !ok {
262-
log.KvDistribution.Fatalf(ctx, "attempting to unregister a missing range: r%d", rangeID)
262+
log.KvExec.Fatalf(ctx, "attempting to unregister a missing range: r%d", rangeID)
263263
}
264264
ts, ok := r.mu.lastClosed[info.policy]
265265
if !ok {
266-
log.KvDistribution.Fatalf(ctx, "missing closed timestamp policy %v for range r%d", info.policy, rangeID)
266+
log.KvExec.Fatalf(ctx, "missing closed timestamp policy %v for range r%d", info.policy, rangeID)
267267
}
268268
r.stores.ForwardSideTransportClosedTimestampForRange(ctx, rangeID, ts, info.lai)
269269
}
@@ -279,7 +279,7 @@ func (r *incomingStream) processUpdate(ctx context.Context, msg *ctpb.Update) {
279279
r.mu.lastClosed = make(map[ctpb.RangeClosedTimestampPolicy]hlc.Timestamp, len(r.mu.lastClosed))
280280
r.mu.tracked = make(map[roachpb.RangeID]trackedRange, len(r.mu.tracked))
281281
} else if msg.SeqNum != r.mu.lastSeqNum+1 {
282-
log.KvDistribution.Fatalf(ctx, "expected closed timestamp side-transport message with sequence number "+
282+
log.KvExec.Fatalf(ctx, "expected closed timestamp side-transport message with sequence number "+
283283
"%d, got %d", r.mu.lastSeqNum+1, msg.SeqNum)
284284
}
285285
r.mu.lastSeqNum = msg.SeqNum
@@ -328,13 +328,13 @@ func (r *incomingStream) Run(
328328
r.nodeID = msg.NodeID
329329

330330
if err := r.server.onFirstMsg(ctx, r, r.nodeID); err != nil {
331-
log.KvDistribution.Warningf(ctx, "%s", err.Error())
331+
log.KvExec.Warningf(ctx, "%s", err.Error())
332332
return
333333
} else if ch := r.testingKnobs.onFirstMsg; ch != nil {
334334
ch <- struct{}{}
335335
}
336336
if !msg.Snapshot {
337-
log.KvDistribution.Fatal(ctx, "expected the first message to be a snapshot")
337+
log.KvExec.Fatal(ctx, "expected the first message to be a snapshot")
338338
}
339339
r.AddLogTag("remote", r.nodeID)
340340
ctx = r.AnnotateCtx(ctx)

pkg/kv/kvserver/closedts/sidetransport/sender.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -653,7 +653,7 @@ func (b *updatesBuf) Push(ctx context.Context, update *ctpb.Update) {
653653
if b.sizeLocked() != 0 {
654654
lastIdx := b.lastIdxLocked()
655655
if prevSeq := b.mu.data[lastIdx].SeqNum; prevSeq != update.SeqNum-1 {
656-
log.KvDistribution.Fatalf(ctx, "bad sequence number; expected %d, got %d", prevSeq+1, update.SeqNum)
656+
log.KvExec.Fatalf(ctx, "bad sequence number; expected %d, got %d", prevSeq+1, update.SeqNum)
657657
}
658658
}
659659

@@ -762,7 +762,7 @@ func (b *updatesBuf) GetBySeq(ctx context.Context, seqNum ctpb.SeqNum) (*ctpb.Up
762762
continue
763763
}
764764
if seqNum > lastSeq+1 {
765-
log.KvDistribution.Fatalf(ctx, "skipping sequence numbers; requested: %d, last: %d", seqNum, lastSeq)
765+
log.KvExec.Fatalf(ctx, "skipping sequence numbers; requested: %d, last: %d", seqNum, lastSeq)
766766
}
767767
idx := (b.mu.head + (int)(seqNum-firstSeq)) % len(b.mu.data)
768768
return b.mu.data[idx], true
@@ -963,7 +963,7 @@ func (r *rpcConn) run(ctx context.Context, stopper *stop.Stopper) {
963963
}
964964
if err := r.maybeConnect(ctx, stopper); err != nil {
965965
if !errors.HasType(err, (*netutil.InitialHeartbeatFailedError)(nil)) && everyN.ShouldLog() {
966-
log.KvDistribution.Infof(ctx, "side-transport failed to connect to n%d: %s", r.nodeID, err)
966+
log.KvExec.Infof(ctx, "side-transport failed to connect to n%d: %s", r.nodeID, err)
967967
}
968968
time.Sleep(errSleepTime)
969969
continue
@@ -993,7 +993,7 @@ func (r *rpcConn) run(ctx context.Context, stopper *stop.Stopper) {
993993
}
994994
if err := r.stream.Send(msg); err != nil {
995995
if err != io.EOF && everyN.ShouldLog() {
996-
log.KvDistribution.Warningf(ctx, "failed to send closed timestamp message %d to n%d: %s",
996+
log.KvExec.Warningf(ctx, "failed to send closed timestamp message %d to n%d: %s",
997997
r.lastSent, r.nodeID, err)
998998
}
999999
// Keep track of the fact that we need a new connection.

pkg/kv/kvserver/closedts/tracker/heap_tracker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func (h *heapTracker) Track(ctx context.Context, ts hlc.Timestamp) RemovalToken
103103
func (h *heapTracker) Untrack(ctx context.Context, tok RemovalToken) {
104104
idx := tok.(heapToken).index
105105
if idx == -1 {
106-
log.KvDistribution.Fatalf(ctx, "attempting to untrack already-untracked item")
106+
log.KvExec.Fatalf(ctx, "attempting to untrack already-untracked item")
107107
}
108108
h.mu.Lock()
109109
defer h.mu.Unlock()

pkg/kv/kvserver/closedts/tracker/lockfree_tracker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ func (t *lockfreeTracker) Untrack(ctx context.Context, tok RemovalToken) {
168168
// types.
169169
refcnt := b.refcnt.Add(-1)
170170
if refcnt < 0 {
171-
log.KvDistribution.Fatalf(ctx, "negative bucket refcount: %d", refcnt)
171+
log.KvExec.Fatalf(ctx, "negative bucket refcount: %d", refcnt)
172172
}
173173
if refcnt == 0 {
174174
// Reset the bucket, so that future Track() calls can create a new one.

pkg/kv/kvserver/closedts/tracker/tracker_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ func TestLockfreeTrackerRandomStress(t *testing.T) {
167167
// hours of stressrace on GCE worker (and it failed once on CI stress too).
168168
// Figure out why.
169169
maxToleratedErrorMillis := 3 * c.maxEvaluationTime.Milliseconds()
170-
log.KvDistribution.Infof(ctx, "maximum lower bound error: %dms. maximum request evaluation time: %s",
170+
log.KvExec.Infof(ctx, "maximum lower bound error: %dms. maximum request evaluation time: %s",
171171
maxOvershotMillis, c.maxEvaluationTime)
172172
require.Lessf(t, maxOvershotMillis, maxToleratedErrorMillis,
173173
"maximum tracker lowerbound error was %dms, above maximum tolerated %dms",
@@ -446,7 +446,7 @@ func (c *trackerChecker) run(ctx context.Context) error {
446446
if c.maxOvershotNanos < overshotNanos {
447447
c.maxOvershotNanos = overshotNanos
448448
}
449-
log.KvDistribution.VInfof(ctx, 1, "lower bound error: %dms", overshotNanos/1000000)
449+
log.KvExec.VInfof(ctx, 1, "lower bound error: %dms", overshotNanos/1000000)
450450
}
451451
}
452452

@@ -517,7 +517,7 @@ func benchmarkTracker(ctx context.Context, b *testing.B, t Tracker) {
517517
toks[i] = toks[i][:0]
518518
}
519519
mu.Unlock()
520-
log.KvDistribution.VInfof(ctx, 1, "cleared %d reqs", n)
520+
log.KvExec.VInfof(ctx, 1, "cleared %d reqs", n)
521521
}
522522
}()
523523

pkg/kv/kvserver/replica_closedts.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ func (st *sidetransportAccess) forward(
293293

294294
func (st *sidetransportAccess) assertNoRegression(ctx context.Context, cur, up closedTimestamp) {
295295
if cur.regression(up) {
296-
log.KvDistribution.Fatalf(ctx, "side-transport update saw closed timestamp regression on r%d: "+
296+
log.KvExec.Fatalf(ctx, "side-transport update saw closed timestamp regression on r%d: "+
297297
"(lai=%d, ts=%s) -> (lai=%d, ts=%s)", st.rangeID, cur.lai, cur.ts, up.lai, up.ts)
298298
}
299299
}

0 commit comments

Comments
 (0)