Commit 2872602

kvserver: assign some Dev logs to KvDistribution
```sh
sed -i '' 's/log\.Dev/log.KvDistribution/g' \
  consistency_queue.go flow_control_stores.go kv_snapshot_strategy.go \
  merge_queue.go mma_store_rebalancer.go mvcc_gc_queue.go queue.go \
  queue_helpers_testutil.go rebalance_objective.go replica_closedts.go \
  replica_destroy.go replica_gc_queue.go replica_gossip.go \
  replica_proposal_quota.go replica_rangefeed.go replicate_queue.go \
  scanner.go snapshot_apply_prepare.go store_gossip.go store_init.go \
  store_merge.go store_remove_replica.go store_snapshot.go
```
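The substitution is purely mechanical: every call keeps its severity and arguments and only changes which log channel it goes through. This matters because CockroachDB's log configuration can route, filter, and redact each channel independently, so distribution-related messages no longer have to share the catch-all DEV channel. A minimal before/after sketch, assuming the shared `util/log` package used throughout `pkg/kv/kvserver` (the wrapper function here is hypothetical; the two logger calls are the shape the sed above rewrites):

```go
package kvserver

import (
	"context"

	// Channel loggers (log.Dev, log.KvDistribution, ...) come from the
	// shared logging package; this import path is assumed here.
	"github.com/cockroachdb/cockroach/pkg/util/log"
)

// exampleLogging is a hypothetical call site illustrating the rewrite.
func exampleLogging(ctx context.Context, err error) {
	// Before: the message is tagged with the catch-all DEV channel.
	log.Dev.Errorf(ctx, "unexpected error: %s", err)

	// After: same severity and format string, but the message is tagged
	// KV_DISTRIBUTION, so log config can route or filter it independently.
	log.KvDistribution.Errorf(ctx, "unexpected error: %s", err)
}
```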
1 parent: ef0a1b5

16 files changed (+63 −63 lines). The sed command names 23 files, but sed leaves files with no match byte-for-byte unchanged, so presumably only the 16 that actually contained `log.Dev` calls appear in the commit.

pkg/kv/kvserver/flow_control_stores.go

Lines changed: 4 additions & 4 deletions

```diff
@@ -45,7 +45,7 @@ func (sh *storesForFlowControl) LookupReplicationAdmissionHandle(
 		return nil
 	}); err != nil {
 		ctx := ls.AnnotateCtx(context.Background())
-		log.Dev.Errorf(ctx, "unexpected error: %s", err)
+		log.KvDistribution.Errorf(ctx, "unexpected error: %s", err)
 		return nil, false
 	}
 	return handle, found
@@ -151,7 +151,7 @@ func (ss *storesForRACv2) ScheduleAdmittedResponseForRangeRACv2(
 	for _, m := range msgs {
 		s, err := ls.GetStore(m.ToStoreID)
 		if err != nil {
-			log.Dev.Errorf(ctx, "store %s not found", m.ToStoreID)
+			log.KvDistribution.Errorf(ctx, "store %s not found", m.ToStoreID)
 			continue
 		}
 		repl := s.GetReplicaIfExists(m.RangeID)
@@ -179,7 +179,7 @@ func (ss *storesForRACv2) LookupInspect(
 		}
 		return nil
 	}); err != nil {
-		log.Dev.Errorf(ls.AnnotateCtx(context.Background()),
+		log.KvDistribution.Errorf(ls.AnnotateCtx(context.Background()),
 			"unexpected error iterating stores: %s", err)
 	}
 	return handle, found
@@ -196,7 +196,7 @@ func (ss *storesForRACv2) Inspect() []roachpb.RangeID {
 		})
 		return nil
 	}); err != nil {
-		log.Dev.Errorf(ls.AnnotateCtx(context.Background()),
+		log.KvDistribution.Errorf(ls.AnnotateCtx(context.Background()),
 			"unexpected error iterating stores: %s", err)
 	}
 	return rangeIDs
```

pkg/kv/kvserver/kv_snapshot_strategy.go

Lines changed: 2 additions & 2 deletions

```diff
@@ -136,7 +136,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
 
 	snapshotQ := s.cfg.KVAdmissionController.GetSnapshotQueue(s.StoreID())
 	if snapshotQ == nil {
-		log.Dev.Errorf(ctx, "unable to find snapshot queue for store: %s", s.StoreID())
+		log.KvDistribution.Errorf(ctx, "unable to find snapshot queue for store: %s", s.StoreID())
 	}
 	// Using a nil pacer is effectively a noop if snapshot control is disabled.
 	var pacer *admission.SnapshotPacer = nil
@@ -602,7 +602,7 @@ func (kvSS *kvBatchSnapshotStrategy) Close(ctx context.Context) {
 		// disk space (which is reclaimed on node restart). It is unexpected
 		// though, so log a warning.
 		if err := kvSS.scratch.Close(); err != nil {
-			log.Dev.Warningf(ctx, "error closing kvBatchSnapshotStrategy: %v", err)
+			log.KvDistribution.Warningf(ctx, "error closing kvBatchSnapshotStrategy: %v", err)
 		}
 	}
 }
```

pkg/kv/kvserver/mma_store_rebalancer.go

Lines changed: 3 additions & 3 deletions

```diff
@@ -69,7 +69,7 @@ func newMMAStoreRebalancer(
 func (m *mmaStoreRebalancer) run(ctx context.Context, stopper *stop.Stopper) {
 	timer := time.NewTicker(jitteredInterval(allocator.LoadBasedRebalanceInterval.Get(&m.st.SV)))
 	defer timer.Stop()
-	log.Dev.Infof(ctx, "starting multi-metric store rebalancer with mode=%v", kvserverbase.LoadBasedRebalancingMode.Get(&m.st.SV))
+	log.KvDistribution.Infof(ctx, "starting multi-metric store rebalancer with mode=%v", kvserverbase.LoadBasedRebalancingMode.Get(&m.st.SV))
 
 	for {
 		select {
@@ -129,7 +129,7 @@ func (m *mmaStoreRebalancer) rebalance(ctx context.Context) bool {
 	knownStoresByMMA := m.mma.KnownStores()
 	storeLeaseholderMsg, numIgnoredRanges := m.store.MakeStoreLeaseholderMsg(ctx, knownStoresByMMA)
 	if numIgnoredRanges > 0 {
-		log.Dev.Infof(ctx, "mma rebalancer: ignored %d ranges since the allocator does not know all stores",
+		log.KvDistribution.Infof(ctx, "mma rebalancer: ignored %d ranges since the allocator does not know all stores",
 			numIgnoredRanges)
 	}
 
@@ -140,7 +140,7 @@ func (m *mmaStoreRebalancer) rebalance(ctx context.Context) bool {
 	// TODO(wenyihu6): add allocator sync and post apply here
 	for _, change := range changes {
 		if err := m.applyChange(ctx, change); err != nil {
-			log.Dev.VInfof(ctx, 1, "failed to apply change for range %d: %v", change.RangeID, err)
+			log.KvDistribution.VInfof(ctx, 1, "failed to apply change for range %d: %v", change.RangeID, err)
 		}
 	}
 
```
pkg/kv/kvserver/rebalance_objective.go

Lines changed: 3 additions & 3 deletions

```diff
@@ -243,7 +243,7 @@ func (rom *RebalanceObjectiveManager) maybeUpdateRebalanceObjective(ctx context.
 		return
 	}
 
-	log.Dev.Infof(ctx, "Updating the rebalance objective from %s to %s",
+	log.KvDistribution.Infof(ctx, "Updating the rebalance objective from %s to %s",
 		prev, next)
 
 	rom.mu.obj = next
@@ -264,7 +264,7 @@ func ResolveLBRebalancingObjective(
 	// When the cpu timekeeping utility is unsupported on this aarch, the cpu
 	// usage cannot be gathered. Fall back to QPS balancing.
 	if !grunning.Supported {
-		log.Dev.Infof(ctx, "cpu timekeeping unavailable on host, reverting to qps balance objective")
+		log.KvDistribution.Infof(ctx, "cpu timekeeping unavailable on host, reverting to qps balance objective")
 		return LBRebalancingQueries
 	}
 
@@ -275,7 +275,7 @@ func ResolveLBRebalancingObjective(
 	// disallows any other store using the cpu balancing objective.
 	for _, desc := range descs {
 		if desc.Capacity.CPUPerSecond == -1 {
-			log.Dev.Warningf(ctx,
+			log.KvDistribution.Warningf(ctx,
 				"cpu timekeeping unavailable on node %d but available locally, reverting to qps balance objective",
 				desc.Node.NodeID)
 			return LBRebalancingQueries
```

pkg/kv/kvserver/replica_closedts.go

Lines changed: 1 addition & 1 deletion

```diff
@@ -292,7 +292,7 @@ func (st *sidetransportAccess) forward(
 
 func (st *sidetransportAccess) assertNoRegression(ctx context.Context, cur, up closedTimestamp) {
 	if cur.regression(up) {
-		log.Dev.Fatalf(ctx, "side-transport update saw closed timestamp regression on r%d: "+
+		log.KvDistribution.Fatalf(ctx, "side-transport update saw closed timestamp regression on r%d: "+
 			"(lai=%d, ts=%s) -> (lai=%d, ts=%s)", st.rangeID, cur.lai, cur.ts, up.lai, up.ts)
 	}
 }
```

pkg/kv/kvserver/replica_destroy.go

Lines changed: 3 additions & 3 deletions

```diff
@@ -130,13 +130,13 @@ func (r *Replica) destroyRaftMuLocked(ctx context.Context, nextReplicaID roachpb
 		return err
 	}
 	if r.IsInitialized() {
-		log.Dev.Infof(ctx, "removed %d (%d+%d) keys in %0.0fms [clear=%0.0fms commit=%0.0fms]",
+		log.KvDistribution.Infof(ctx, "removed %d (%d+%d) keys in %0.0fms [clear=%0.0fms commit=%0.0fms]",
 			ms.KeyCount+ms.SysCount, ms.KeyCount, ms.SysCount,
 			commitTime.Sub(startTime).Seconds()*1000,
 			preTime.Sub(startTime).Seconds()*1000,
 			commitTime.Sub(preTime).Seconds()*1000)
 	} else {
-		log.Dev.Infof(ctx, "removed uninitialized range in %0.0fms [clear=%0.0fms commit=%0.0fms]",
+		log.KvDistribution.Infof(ctx, "removed uninitialized range in %0.0fms [clear=%0.0fms commit=%0.0fms]",
 			commitTime.Sub(startTime).Seconds()*1000,
 			preTime.Sub(startTime).Seconds()*1000,
 			commitTime.Sub(preTime).Seconds()*1000)
@@ -168,7 +168,7 @@ func (r *Replica) disconnectReplicationRaftMuLocked(ctx context.Context) {
 	}
 
 	if !r.shMu.destroyStatus.Removed() {
-		log.Dev.Fatalf(ctx, "removing raft group before destroying replica %s", r)
+		log.KvDistribution.Fatalf(ctx, "removing raft group before destroying replica %s", r)
 	}
 	r.mu.internalRaftGroup = nil
 	r.mu.raftTracer.Close()
```

pkg/kv/kvserver/replica_gossip.go

Lines changed: 8 additions & 8 deletions

```diff
@@ -35,20 +35,20 @@ func (r *Replica) gossipFirstRangeLocked(ctx context.Context) {
 	}
 	log.Event(ctx, "gossiping sentinel and first range")
 	if log.V(1) {
-		log.Dev.Infof(ctx, "gossiping sentinel from store %d, r%d", r.store.StoreID(), r.RangeID)
+		log.KvDistribution.Infof(ctx, "gossiping sentinel from store %d, r%d", r.store.StoreID(), r.RangeID)
 	}
 	if err := r.store.Gossip().AddInfo(
 		gossip.KeySentinel, r.store.ClusterID().GetBytes(),
 		r.store.cfg.SentinelGossipTTL()); err != nil {
-		log.Dev.Errorf(ctx, "failed to gossip sentinel: %+v", err)
+		log.KvDistribution.Errorf(ctx, "failed to gossip sentinel: %+v", err)
 	}
 	if log.V(1) {
-		log.Dev.Infof(ctx, "gossiping first range from store %d, r%d: %s",
+		log.KvDistribution.Infof(ctx, "gossiping first range from store %d, r%d: %s",
 			r.store.StoreID(), r.RangeID, r.shMu.state.Desc.Replicas())
 	}
 	if err := r.store.Gossip().AddInfoProto(
 		gossip.KeyFirstRangeDescriptor, r.shMu.state.Desc, configGossipTTL); err != nil {
-		log.Dev.Errorf(ctx, "failed to gossip first range metadata: %+v", err)
+		log.KvDistribution.Errorf(ctx, "failed to gossip first range metadata: %+v", err)
 	}
 }
 
@@ -176,7 +176,7 @@ func (r *Replica) getLeaseForGossip(ctx context.Context) (bool, *kvpb.Error) {
 			}
 		default:
 			// Any other error is worth being logged visibly.
-			log.Dev.Warningf(ctx, "could not acquire lease for range gossip: %s", pErr)
+			log.KvDistribution.Warningf(ctx, "could not acquire lease for range gossip: %s", pErr)
 		}
 	}
 	}); err != nil {
@@ -198,7 +198,7 @@ func (r *Replica) maybeGossipFirstRange(ctx context.Context) *kvpb.Error {
 	// so we error out below.
 	if gossipClusterID, err := r.store.Gossip().GetClusterID(); err == nil {
 		if gossipClusterID != r.store.ClusterID() {
-			log.Dev.Fatalf(
+			log.KvDistribution.Fatalf(
 				ctx, "store %d belongs to cluster %s, but attempted to join cluster %s via gossip",
 				r.store.StoreID(), r.store.ClusterID(), gossipClusterID)
 		}
@@ -207,11 +207,11 @@ func (r *Replica) maybeGossipFirstRange(ctx context.Context) *kvpb.Error {
 	// Gossip the cluster ID from all replicas of the first range; there
 	// is no expiration on the cluster ID.
 	if log.V(1) {
-		log.Dev.Infof(ctx, "gossiping cluster ID %q from store %d, r%d", r.store.ClusterID(),
+		log.KvDistribution.Infof(ctx, "gossiping cluster ID %q from store %d, r%d", r.store.ClusterID(),
 			r.store.StoreID(), r.RangeID)
 	}
 	if err := r.store.Gossip().AddClusterID(r.store.ClusterID()); err != nil {
-		log.Dev.Errorf(ctx, "failed to gossip cluster ID: %+v", err)
+		log.KvDistribution.Errorf(ctx, "failed to gossip cluster ID: %+v", err)
 	}
 
 	hasLease, pErr := r.getLeaseForGossip(ctx)
```

pkg/kv/kvserver/replica_proposal_quota.go

Lines changed: 3 additions & 3 deletions

```diff
@@ -213,10 +213,10 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(
 	// and the quotaReleaseQueue together track status.Applied exactly.
 	r.mu.proposalQuotaBaseIndex = kvpb.RaftIndex(status.Applied)
 	if r.mu.proposalQuota != nil {
-		log.Dev.Fatal(ctx, "proposalQuota was not nil before becoming the leader")
+		log.KvDistribution.Fatal(ctx, "proposalQuota was not nil before becoming the leader")
 	}
 	if releaseQueueLen := len(r.mu.quotaReleaseQueue); releaseQueueLen != 0 {
-		log.Dev.Fatalf(ctx, "len(r.mu.quotaReleaseQueue) = %d, expected 0", releaseQueueLen)
+		log.KvDistribution.Fatalf(ctx, "len(r.mu.quotaReleaseQueue) = %d, expected 0", releaseQueueLen)
 	}
 	r.mu.proposalQuota = quotapool.NewIntPool(
 		"raft proposal",
@@ -324,7 +324,7 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(
 	// index.
 	releasableIndex := r.mu.proposalQuotaBaseIndex + kvpb.RaftIndex(len(r.mu.quotaReleaseQueue))
 	if releasableIndex != kvpb.RaftIndex(status.Applied) {
-		log.Dev.Fatalf(ctx, "proposalQuotaBaseIndex (%d) + quotaReleaseQueueLen (%d) = %d"+
+		log.KvDistribution.Fatalf(ctx, "proposalQuotaBaseIndex (%d) + quotaReleaseQueueLen (%d) = %d"+
 			" must equal the applied index (%d)",
 			r.mu.proposalQuotaBaseIndex, len(r.mu.quotaReleaseQueue), releasableIndex,
 			status.Applied)
```

pkg/kv/kvserver/replica_rangefeed.go

Lines changed: 5 additions & 5 deletions

```diff
@@ -431,7 +431,7 @@ func logSlowRangefeedRegistration(ctx context.Context) func() {
 	return func() {
 		elapsed := timeutil.Since(start)
 		if elapsed >= slowRaftMuWarnThreshold {
-			log.Dev.Warningf(ctx, "rangefeed registration took %s", elapsed)
+			log.KvDistribution.Warningf(ctx, "rangefeed registration took %s", elapsed)
 		}
 	}
 }
@@ -889,9 +889,9 @@ func (r *Replica) handleClosedTimestampUpdateRaftMuLocked(
 	expensiveLog := m.RangeFeedSlowClosedTimestampLogN.ShouldLog()
 	if expensiveLog {
 		if closedTS.IsEmpty() {
-			log.Dev.Infof(ctx, "RangeFeed closed timestamp is empty")
+			log.KvDistribution.Infof(ctx, "RangeFeed closed timestamp is empty")
 		} else {
-			log.Dev.Infof(ctx, "RangeFeed closed timestamp %s is behind by %s (%v)",
+			log.KvDistribution.Infof(ctx, "RangeFeed closed timestamp %s is behind by %s (%v)",
 				closedTS, signal.lag, signal)
 		}
 	}
@@ -919,7 +919,7 @@ func (r *Replica) handleClosedTimestampUpdateRaftMuLocked(
 		}
 		defer func() { <-m.RangeFeedSlowClosedTimestampNudgeSem }()
 		if err := r.ensureClosedTimestampStarted(ctx); err != nil {
-			log.Dev.Infof(ctx, `RangeFeed failed to nudge: %s`, err)
+			log.KvDistribution.Infof(ctx, `RangeFeed failed to nudge: %s`, err)
 		} else if signal.exceedsCancelLagThreshold {
 			// We have successfully nudged the leaseholder to make progress on
 			// the closed timestamp. If the lag was already persistently too
@@ -933,7 +933,7 @@ func (r *Replica) handleClosedTimestampUpdateRaftMuLocked(
 			// prohibit us from cancelling the rangefeed in the current version,
 			// due to mixed version compatibility.
 			if expensiveLog {
-				log.Dev.Infof(ctx,
+				log.KvDistribution.Infof(ctx,
 					`RangeFeed is too far behind, cancelling for replanning [%v]`, signal)
 			}
 			r.disconnectRangefeedWithReason(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED)
```

pkg/kv/kvserver/scanner.go

Lines changed: 5 additions & 5 deletions

```diff
@@ -213,20 +213,20 @@ func (rs *replicaScanner) waitAndProcess(ctx context.Context, start time.Time, r
 	waitInterval := rs.paceInterval(start, timeutil.Now())
 	rs.waitTimer.Reset(waitInterval)
 	if log.V(6) {
-		log.Dev.Infof(ctx, "wait timer interval set to %s", waitInterval)
+		log.KvDistribution.Infof(ctx, "wait timer interval set to %s", waitInterval)
 	}
 	for {
 		select {
 		case <-rs.waitTimer.C:
 			if log.V(6) {
-				log.Dev.Infof(ctx, "wait timer fired")
+				log.KvDistribution.Infof(ctx, "wait timer fired")
 			}
 			if repl == nil {
 				return false
 			}
 
 			if log.V(2) {
-				log.Dev.Infof(ctx, "replica scanner processing %s", repl)
+				log.KvDistribution.Infof(ctx, "replica scanner processing %s", repl)
 			}
 			for _, q := range rs.queues {
 				q.MaybeAddAsync(ctx, repl, rs.clock.NowAsClockTimestamp())
@@ -251,7 +251,7 @@ func (rs *replicaScanner) removeReplica(repl *Replica) {
 	}
 	if log.V(6) {
 		ctx := rs.AnnotateCtx(context.TODO())
-		log.Dev.Infof(ctx, "removed replica %s", repl)
+		log.KvDistribution.Infof(ctx, "removed replica %s", repl)
 	}
 }
 
@@ -299,7 +299,7 @@ func (rs *replicaScanner) scanLoop() {
 	rs.mu.total += timeutil.Since(start)
 	}()
 	if log.V(6) {
-		log.Dev.Infof(ctx, "reset replica scan iteration")
+		log.KvDistribution.Infof(ctx, "reset replica scan iteration")
 	}
 
 	// Reset iteration and start time.
```