Skip to content

Commit ef0a1b5

Browse files
committed
kvserver: assign some Dev logs to KvExec
``` sed -i '' 's/log\.Dev/log.KvExec/g' app_batch.go metric_rules.go metrics.go raft.go raft_log_truncator.go raft_transport.go range_log.go replica_app_batch.go replica_application_cmd.go replica_application_decoder.go replica_application_result.go replica_application_state_machine.go replica_backpressure.go replica_batch_updates.go replica_circuit_breaker.go replica_corruption.go replica_eval_context.go replica_evaluate.go replica_proposal.go replica_proposal_buf.go replica_raft.go replica_raft_quiesce.go replica_raftlog.go replica_raftstorage.go replica_range_lease.go replica_read.go replica_send.go replica_tscache.go replica_write.go scheduler.go split_trigger_helper.go store_create_replica.go store_raft.go store_send.go store_split.go stores_server.go ```
1 parent 3476899 commit ef0a1b5

36 files changed

+260
-260
lines changed

pkg/kv/kvserver/app_batch.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func (b *appBatch) assertAndCheckCommand(
8484
ctx context.Context, cmd *raftlog.ReplicatedCmd, state *kvserverpb.ReplicaState, isLocal bool,
8585
) (kvserverbase.ForcedErrResult, error) {
8686
if log.V(4) {
87-
log.Dev.Infof(ctx, "processing command %x: raftIndex=%d maxLeaseIndex=%d closedts=%s",
87+
log.KvExec.Infof(ctx, "processing command %x: raftIndex=%d maxLeaseIndex=%d closedts=%s",
8888
cmd.ID, cmd.Index(), cmd.Cmd.MaxLeaseIndex, cmd.Cmd.ClosedTimestamp)
8989
}
9090

@@ -138,7 +138,7 @@ func (b *appBatch) addWriteBatch(
138138
return nil
139139
}
140140
if mutations, err := storage.BatchCount(wb.Data); err != nil {
141-
log.Dev.Errorf(ctx, "unable to read header of committed WriteBatch: %+v", err)
141+
log.KvExec.Errorf(ctx, "unable to read header of committed WriteBatch: %+v", err)
142142
} else {
143143
b.numMutations += mutations
144144
}

pkg/kv/kvserver/metric_rules.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -291,10 +291,10 @@ func MaybeAddRuleToRegistry(
291291
ctx context.Context, err error, name string, rule metric.Rule, ruleRegistry *metric.RuleRegistry,
292292
) {
293293
if err != nil {
294-
log.Dev.Warningf(ctx, "unable to create kv rule %s: %s", name, err.Error())
294+
log.KvExec.Warningf(ctx, "unable to create kv rule %s: %s", name, err.Error())
295295
}
296296
if ruleRegistry == nil {
297-
log.Dev.Warningf(ctx, "unable to add kv rule %s: rule registry uninitialized", name)
297+
log.KvExec.Warningf(ctx, "unable to add kv rule %s: rule registry uninitialized", name)
298298
}
299299
ruleRegistry.AddRule(rule)
300300
}

pkg/kv/kvserver/metrics.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3515,11 +3515,11 @@ func (sm *TenantsStorageMetrics) releaseTenant(ctx context.Context, m *tenantSto
35153515
m.mu.Lock()
35163516
defer m.mu.Unlock()
35173517
if m.mu.released.Load() {
3518-
log.Dev.FatalfDepth(ctx, 1, "tenant metrics already released in:\n%s", m.mu.stack)
3518+
log.KvExec.FatalfDepth(ctx, 1, "tenant metrics already released in:\n%s", m.mu.stack)
35193519
}
35203520
m.mu.refCount--
35213521
if n := m.mu.refCount; n < 0 {
3522-
log.Dev.Fatalf(ctx, "invalid refCount on metrics for tenant %v: %d", m.tenantID, n)
3522+
log.KvExec.Fatalf(ctx, "invalid refCount on metrics for tenant %v: %d", m.tenantID, n)
35233523
} else if n > 0 {
35243524
return
35253525
}
@@ -3609,7 +3609,7 @@ func (tm *tenantStorageMetrics) assert(ctx context.Context) {
36093609
if tm.mu.released.Load() {
36103610
tm.mu.Lock()
36113611
defer tm.mu.Unlock()
3612-
log.Dev.Fatalf(ctx, "tenant metrics already released in:\n%s", tm.mu.stack)
3612+
log.KvExec.Fatalf(ctx, "tenant metrics already released in:\n%s", tm.mu.stack)
36133613
}
36143614
}
36153615

@@ -4461,7 +4461,7 @@ func (sm *StoreMetrics) updateDiskStats(
44614461
sm.DiskIopsInProgress.Update(int64(cumulativeStats.InProgressCount))
44624462
} else {
44634463
// Don't update cumulative stats to the useless zero value.
4464-
log.Dev.Errorf(ctx, "not updating cumulative stats due to %s", cumulativeStatsErr)
4464+
log.KvExec.Errorf(ctx, "not updating cumulative stats due to %s", cumulativeStatsErr)
44654465
}
44664466
maxRollingStats := rollingStats.Max()
44674467
// maxRollingStats is computed as the change in stats every 100ms
@@ -4503,7 +4503,7 @@ func (sm *StoreMetrics) handleMetricsResult(ctx context.Context, metric result.M
45034503
metric.SplitEstimatedTotalBytesDiff = 0
45044504

45054505
if metric != (result.Metrics{}) {
4506-
log.Dev.Fatalf(ctx, "unhandled fields in metrics result: %+v", metric)
4506+
log.KvExec.Fatalf(ctx, "unhandled fields in metrics result: %+v", metric)
45074507
}
45084508
}
45094509

pkg/kv/kvserver/raft.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -56,62 +56,62 @@ type raftLogger struct {
5656

5757
func (r *raftLogger) Debug(v ...interface{}) {
5858
if log.V(3) {
59-
log.Dev.InfofDepth(r.ctx, 1, "", v...)
59+
log.KvExec.InfofDepth(r.ctx, 1, "", v...)
6060
}
6161
}
6262

6363
func (r *raftLogger) Debugf(format string, v ...interface{}) {
6464
if log.V(3) {
65-
log.Dev.InfofDepth(r.ctx, 1, format, v...)
65+
log.KvExec.InfofDepth(r.ctx, 1, format, v...)
6666
}
6767
}
6868

6969
func (r *raftLogger) Info(v ...interface{}) {
7070
if log.V(2) {
71-
log.Dev.InfofDepth(r.ctx, 1, "", v...)
71+
log.KvExec.InfofDepth(r.ctx, 1, "", v...)
7272
}
7373
}
7474

7575
func (r *raftLogger) Infof(format string, v ...interface{}) {
7676
if log.V(2) {
77-
log.Dev.InfofDepth(r.ctx, 1, format, v...)
77+
log.KvExec.InfofDepth(r.ctx, 1, format, v...)
7878
}
7979
}
8080

8181
func (r *raftLogger) Warning(v ...interface{}) {
82-
log.Dev.WarningfDepth(r.ctx, 1, "", v...)
82+
log.KvExec.WarningfDepth(r.ctx, 1, "", v...)
8383
}
8484

8585
func (r *raftLogger) Warningf(format string, v ...interface{}) {
86-
log.Dev.WarningfDepth(r.ctx, 1, format, v...)
86+
log.KvExec.WarningfDepth(r.ctx, 1, format, v...)
8787
}
8888

8989
func (r *raftLogger) Error(v ...interface{}) {
90-
log.Dev.ErrorfDepth(r.ctx, 1, "", v...)
90+
log.KvExec.ErrorfDepth(r.ctx, 1, "", v...)
9191
}
9292

9393
func (r *raftLogger) Errorf(format string, v ...interface{}) {
94-
log.Dev.ErrorfDepth(r.ctx, 1, format, v...)
94+
log.KvExec.ErrorfDepth(r.ctx, 1, format, v...)
9595
}
9696

9797
func (r *raftLogger) Fatal(v ...interface{}) {
9898
wrapNumbersAsSafe(v)
99-
log.Dev.FatalfDepth(r.ctx, 1, "", v...)
99+
log.KvExec.FatalfDepth(r.ctx, 1, "", v...)
100100
}
101101

102102
func (r *raftLogger) Fatalf(format string, v ...interface{}) {
103103
wrapNumbersAsSafe(v)
104-
log.Dev.FatalfDepth(r.ctx, 1, format, v...)
104+
log.KvExec.FatalfDepth(r.ctx, 1, format, v...)
105105
}
106106

107107
func (r *raftLogger) Panic(v ...interface{}) {
108108
wrapNumbersAsSafe(v)
109-
log.Dev.FatalfDepth(r.ctx, 1, "", v...)
109+
log.KvExec.FatalfDepth(r.ctx, 1, "", v...)
110110
}
111111

112112
func (r *raftLogger) Panicf(format string, v ...interface{}) {
113113
wrapNumbersAsSafe(v)
114-
log.Dev.FatalfDepth(r.ctx, 1, format, v...)
114+
log.KvExec.FatalfDepth(r.ctx, 1, format, v...)
115115
}
116116

117117
func wrapNumbersAsSafe(v ...interface{}) {
@@ -183,7 +183,7 @@ func logRaftReady(ctx context.Context, ready raft.Ready) {
183183
fmt.Fprintf(&buf, " Outgoing Message[%d]: %.200s\n",
184184
i, raft.DescribeMessage(m, raftEntryFormatter))
185185
}
186-
log.Dev.Infof(ctx, "raft ready\n%s", buf.String())
186+
log.KvExec.Infof(ctx, "raft ready\n%s", buf.String())
187187
}
188188

189189
func raftEntryFormatter(data []byte) string {

pkg/kv/kvserver/raft_log_truncator.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ func (t *raftLogTruncator) addPendingTruncation(
324324
After: alreadyTruncIndex, Last: pendingTrunc.Index,
325325
}); err != nil {
326326
// Log a loud error since we need to continue enqueuing the truncation.
327-
log.Dev.Errorf(ctx, "while computing size of sideloaded files to truncate: %+v", err)
327+
log.KvExec.Errorf(ctx, "while computing size of sideloaded files to truncate: %+v", err)
328328
pendingTrunc.isDeltaTrusted = false
329329
} else if entries != 0 {
330330
pendingTrunc.logDeltaBytes -= size
@@ -523,7 +523,7 @@ func (t *raftLogTruncator) tryEnactTruncations(
523523
stateLoader := r.getStateLoader()
524524
as, err := stateLoader.LoadRangeAppliedState(ctx, reader)
525525
if err != nil {
526-
log.Dev.Errorf(ctx, "error loading RangeAppliedState, dropping all pending log truncations: %s",
526+
log.KvExec.Errorf(ctx, "error loading RangeAppliedState, dropping all pending log truncations: %s",
527527
err)
528528
pendingTruncs.reset()
529529
return
@@ -551,7 +551,7 @@ func (t *raftLogTruncator) tryEnactTruncations(
551551
pendingTruncs.mu.truncs[enactIndex].RaftTruncatedState,
552552
stateLoader.StateLoader, batch,
553553
); err != nil {
554-
log.Dev.Errorf(ctx, "while attempting to truncate raft log: %+v", err)
554+
log.KvExec.Errorf(ctx, "while attempting to truncate raft log: %+v", err)
555555
pendingTruncs.reset()
556556
return
557557
}
@@ -570,7 +570,7 @@ func (t *raftLogTruncator) tryEnactTruncations(
570570
// so that the subsequent removals from the sideloaded storage are safe.
571571
sync := pendingTruncs.mu.truncs[enactIndex].hasSideloaded
572572
if err := batch.Commit(sync); err != nil {
573-
log.Dev.Fatalf(ctx, "while committing batch to truncate raft log: %+v", err)
573+
log.KvExec.Fatalf(ctx, "while committing batch to truncate raft log: %+v", err)
574574
return
575575
}
576576
r.finalizeTruncation(ctx)

pkg/kv/kvserver/raft_transport.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,7 @@ func (t *RaftTransport) handleRaftRequest(
347347
incomingMessageHandler, ok := t.getIncomingRaftMessageHandler(req.ToReplica.StoreID)
348348
if !ok {
349349
if isV1 {
350-
log.Dev.Warningf(ctx, "unable to accept Raft message from %+v: no handler registered for %+v",
350+
log.KvExec.Warningf(ctx, "unable to accept Raft message from %+v: no handler registered for %+v",
351351
req.FromReplica, req.ToReplica)
352352
}
353353
// We don't return an error to the client. If this node restarted with fewer
@@ -516,7 +516,7 @@ func (t *RaftTransport) InternalDelegateRaftSnapshot(
516516
// Get the handler of the sender store.
517517
incomingMessageHandler, ok := t.getIncomingRaftMessageHandler(req.DelegatedSender.StoreID)
518518
if !ok {
519-
log.Dev.Warningf(
519+
log.KvExec.Warningf(
520520
ctx,
521521
"unable to accept Raft message: %+v: no handler registered for"+
522522
" the sender store"+" %+v",
@@ -559,7 +559,7 @@ func (t *RaftTransport) raftSnapshot(stream RPCMultiRaft_RaftSnapshotStream) err
559559
rmr := req.Header.RaftMessageRequest
560560
incomingMessageHandler, ok := t.getIncomingRaftMessageHandler(rmr.ToReplica.StoreID)
561561
if !ok {
562-
log.Dev.Warningf(ctx, "unable to accept Raft message from %+v: no handler registered for %+v",
562+
log.KvExec.Warningf(ctx, "unable to accept Raft message from %+v: no handler registered for %+v",
563563
rmr.FromReplica, rmr.ToReplica)
564564
return kvpb.NewStoreNotFoundError(rmr.ToReplica.StoreID)
565565
}
@@ -633,7 +633,7 @@ func (t *RaftTransport) processQueue(
633633
t.metrics.ReverseRcvd.Inc(1)
634634
incomingMessageHandler, ok := t.getIncomingRaftMessageHandler(resp.ToReplica.StoreID)
635635
if !ok {
636-
log.Dev.Warningf(ctx, "no handler found for store %s in response %s",
636+
log.KvExec.Warningf(ctx, "no handler found for store %s in response %s",
637637
resp.ToReplica.StoreID, resp)
638638
continue
639639
}
@@ -857,7 +857,7 @@ func (t *RaftTransport) SendAsync(
857857
return true
858858
default:
859859
if logRaftSendQueueFullEvery.ShouldLog() {
860-
log.Dev.Warningf(t.AnnotateCtx(context.Background()), "raft send queue to n%d is full", toNodeID)
860+
log.KvExec.Warningf(t.AnnotateCtx(context.Background()), "raft send queue to n%d is full", toNodeID)
861861
}
862862
return false
863863
}
@@ -896,7 +896,7 @@ func (t *RaftTransport) startProcessNewQueue(
896896
worker := func(ctx context.Context) {
897897
q, existingQueue := t.getQueue(toNodeID, class)
898898
if !existingQueue {
899-
log.Dev.Fatalf(ctx, "queue for n%d does not exist", toNodeID)
899+
log.KvExec.Fatalf(ctx, "queue for n%d does not exist", toNodeID)
900900
}
901901
defer func() {
902902
if fn := t.knobs.OnWorkerTeardown; fn != nil {
@@ -916,7 +916,7 @@ func (t *RaftTransport) startProcessNewQueue(
916916
return
917917
}
918918
if err := t.processQueue(ctx, q, client, class); err != nil {
919-
log.Dev.Warningf(ctx, "while processing outgoing Raft queue to node %d: %s:", toNodeID, err)
919+
log.KvExec.Warningf(ctx, "while processing outgoing Raft queue to node %d: %s:", toNodeID, err)
920920
}
921921
}
922922
ctx, hdl, err := t.stopper.GetHandle(ctx, stop.TaskOpts{
@@ -1050,7 +1050,7 @@ func (t *RaftTransport) SendSnapshot(
10501050

10511051
defer func() {
10521052
if err := stream.CloseSend(); err != nil {
1053-
log.Dev.Warningf(ctx, "failed to close snapshot stream: %+v", err)
1053+
log.KvExec.Warningf(ctx, "failed to close snapshot stream: %+v", err)
10541054
}
10551055
}()
10561056
return sendSnapshot(ctx, clusterID, t.st, t.Tracer, stream, storePool, header, snap, newWriteBatch, sent, recordBytesSent)
@@ -1074,7 +1074,7 @@ func (t *RaftTransport) DelegateSnapshot(
10741074
}
10751075
defer func() {
10761076
if err := stream.CloseSend(); err != nil {
1077-
log.Dev.Warningf(ctx, "failed to close delegate snapshot stream: %+v", err)
1077+
log.KvExec.Warningf(ctx, "failed to close delegate snapshot stream: %+v", err)
10781078
}
10791079
}()
10801080

@@ -1094,7 +1094,7 @@ func (t *RaftTransport) DelegateSnapshot(
10941094
if len(resp.CollectedSpans) != 0 {
10951095
span := tracing.SpanFromContext(ctx)
10961096
if span == nil {
1097-
log.Dev.Warningf(ctx, "trying to ingest remote spans but there is no recording span set up")
1097+
log.KvExec.Warningf(ctx, "trying to ingest remote spans but there is no recording span set up")
10981098
} else {
10991099
span.ImportRemoteRecording(resp.CollectedSpans)
11001100
}

pkg/kv/kvserver/range_log.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func maybeLogRangeLogEvent(ctx context.Context, event kvserverpb.RangeLogEvent)
8282
if event.Info != nil {
8383
info = event.Info.String()
8484
}
85-
log.Dev.Infof(ctx, "Range Event: %q, range: %d, info: %s",
85+
log.KvExec.Infof(ctx, "Range Event: %q, range: %d, info: %s",
8686
event.EventType, event.RangeID, info)
8787
}
8888

@@ -246,13 +246,13 @@ func writeToRangeLogTable(
246246
if err := timeutil.RunWithTimeout(ctx, "rangelog-timeout", perAttemptTimeout, func(ctx context.Context) error {
247247
return s.cfg.RangeLogWriter.WriteRangeLogEvent(ctx, txn.DB(), logEvent)
248248
}); err != nil {
249-
log.Dev.Warningf(ctx, "error logging to system.rangelog: %v", err)
249+
log.KvExec.Warningf(ctx, "error logging to system.rangelog: %v", err)
250250
continue
251251
}
252252
break
253253
}
254254
}); err != nil {
255-
log.Dev.Warningf(asyncCtx, "async task error while logging to system.rangelog: %v", err)
255+
log.KvExec.Warningf(asyncCtx, "async task error while logging to system.rangelog: %v", err)
256256
stopCancel()
257257
}
258258
}

pkg/kv/kvserver/replica_app_batch.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -474,7 +474,7 @@ func (b *replicaAppBatch) runPostAddTriggersReplicaOnly(
474474
if ops := cmd.Cmd.LogicalOpLog; cmd.Cmd.WriteBatch != nil {
475475
b.r.handleLogicalOpLogRaftMuLocked(ctx, ops, b.batch)
476476
} else if ops != nil {
477-
log.Dev.Fatalf(ctx, "non-nil logical op log with nil write batch: %v", cmd.Cmd)
477+
log.KvExec.Fatalf(ctx, "non-nil logical op log with nil write batch: %v", cmd.Cmd)
478478
}
479479

480480
return nil
@@ -598,7 +598,7 @@ func (b *replicaAppBatch) stageTrivialReplicatedEvalResult(
598598
// application.
599599
func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error {
600600
if log.V(4) {
601-
log.Dev.Infof(ctx, "flushing batch %v of %d entries", b.state, b.ab.numEntriesProcessed)
601+
log.KvExec.Infof(ctx, "flushing batch %v of %d entries", b.state, b.ab.numEntriesProcessed)
602602
}
603603

604604
// Add the replica applied state key to the write batch if this change

pkg/kv/kvserver/replica_application_cmd.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ func (c *replicatedCmd) AckOutcomeAndFinish(ctx context.Context) error {
158158
// command's proposal if it is local, it asserts that the proposal is not local.
159159
func (c *replicatedCmd) FinishNonLocal(ctx context.Context) {
160160
if c.IsLocal() {
161-
log.Dev.Fatalf(ctx, "proposal unexpectedly local: %v", c.ReplicatedResult())
161+
log.KvExec.Fatalf(ctx, "proposal unexpectedly local: %v", c.ReplicatedResult())
162162
}
163163
c.finishTracingSpan()
164164
}

pkg/kv/kvserver/replica_application_decoder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ func (d *replicaDecoder) createTracingSpans(ctx context.Context) {
158158
Map: cmd.Cmd.TraceData,
159159
})
160160
if err != nil {
161-
log.Dev.Errorf(ctx, "unable to extract trace data from raft command: %s", err)
161+
log.KvExec.Errorf(ctx, "unable to extract trace data from raft command: %s", err)
162162
} else {
163163
cmd.ctx, cmd.sp = d.r.AmbientContext.Tracer.StartSpanCtx(
164164
ctx,

0 commit comments

Comments
 (0)