Skip to content

Commit 6c9feca

Browse files
committed
kvserver/rangefeed: move from log.KvDistribution to log.KvExec
1 parent a806cde commit 6c9feca

13 files changed

+57
-57
lines changed

pkg/kv/kvserver/rangefeed/budget.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ type SharedBudgetAllocation struct {
248248
func (a *SharedBudgetAllocation) Use(ctx context.Context) {
249249
if a != nil {
250250
if atomic.AddInt32(&a.refCount, 1) == 1 {
251-
log.KvDistribution.Fatalf(ctx, "unexpected shared memory allocation usage increase after free")
251+
log.KvExec.Fatalf(ctx, "unexpected shared memory allocation usage increase after free")
252252
}
253253
}
254254
}

pkg/kv/kvserver/rangefeed/buffered_registration.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ func (br *bufferedRegistration) outputLoop(ctx context.Context) error {
196196
// If the registration has a catch-up scan, run it.
197197
if err := br.maybeRunCatchUpScan(ctx); err != nil {
198198
err = errors.Wrap(err, "catch-up scan failed")
199-
log.KvDistribution.Errorf(ctx, "%v", err)
199+
log.KvExec.Errorf(ctx, "%v", err)
200200
return err
201201
}
202202

@@ -229,7 +229,7 @@ func (br *bufferedRegistration) outputLoop(ctx context.Context) error {
229229

230230
if overflowed {
231231
if wasOverflowedOnFirstIteration && br.shouldLogOverflow(oneCheckpointWithTimestampSent) {
232-
log.KvDistribution.Warningf(ctx, "rangefeed %s overflowed during catch up scan from %s (useful checkpoint sent: %v)",
232+
log.KvExec.Warningf(ctx, "rangefeed %s overflowed during catch up scan from %s (useful checkpoint sent: %v)",
233233
br.span, br.catchUpTimestamp, oneCheckpointWithTimestampSent)
234234
}
235235

pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -255,13 +255,13 @@ func setupData(
255255
absPath = loc
256256
}
257257
if exists {
258-
log.KvDistribution.Infof(ctx, "using existing refresh range benchmark data: %s", absPath)
258+
log.KvExec.Infof(ctx, "using existing refresh range benchmark data: %s", absPath)
259259
testutils.ReadAllFiles(filepath.Join(loc, "*"))
260260
return emk(b, loc, opts.lBaseMaxBytes, opts.rwMode), loc
261261
}
262262

263263
eng := emk(b, loc, opts.lBaseMaxBytes, fs.ReadWrite)
264-
log.KvDistribution.Infof(ctx, "creating rangefeed benchmark data: %s", absPath)
264+
log.KvExec.Infof(ctx, "creating rangefeed benchmark data: %s", absPath)
265265

266266
// Generate the same data every time.
267267
rng := rand.New(rand.NewSource(1449168817))
@@ -322,7 +322,7 @@ func setupData(
322322
// optimizations which change the data size result in the same number of
323323
// sstables.
324324
if scaled := len(order) / 20; i > 0 && (i%scaled) == 0 {
325-
log.KvDistribution.Infof(ctx, "committing (%d/~%d) (%d/%d)", i/scaled, 20, i, len(order))
325+
log.KvExec.Infof(ctx, "committing (%d/~%d) (%d/%d)", i/scaled, 20, i, len(order))
326326
if err := batch.Commit(false /* sync */); err != nil {
327327
b.Fatal(err)
328328
}

pkg/kv/kvserver/rangefeed/event_size.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ func opsCurrMemUsage(ops opsEvent) int64 {
205205
case *enginepb.MVCCAbortTxnOp:
206206
currMemUsage += abortTxnOpMemUsage(t.TxnID)
207207
default:
208-
log.KvDistribution.Fatalf(context.Background(), "unknown logical op %T", t)
208+
log.KvExec.Fatalf(context.Background(), "unknown logical op %T", t)
209209
}
210210
}
211211
// For each op, a checkpoint may or may not be published depending on whether
@@ -276,7 +276,7 @@ func MemUsage(e event) int64 {
276276
// For sync event, no rangefeed events will be published.
277277
return eventOverhead + syncEventOverhead
278278
default:
279-
log.KvDistribution.Fatalf(context.Background(), "missing event variant: %+v", e)
279+
log.KvExec.Fatalf(context.Background(), "missing event variant: %+v", e)
280280
}
281281
// For empty event, only eventOverhead is accounted.
282282
return eventOverhead

pkg/kv/kvserver/rangefeed/registry.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -191,37 +191,37 @@ func (r *baseRegistration) assertEvent(ctx context.Context, event *kvpb.RangeFee
191191
switch t := event.GetValue().(type) {
192192
case *kvpb.RangeFeedValue:
193193
if t.Key == nil {
194-
log.KvDistribution.Fatalf(ctx, "unexpected empty RangeFeedValue.Key: %v", t)
194+
log.KvExec.Fatalf(ctx, "unexpected empty RangeFeedValue.Key: %v", t)
195195
}
196196
if t.Value.RawBytes == nil {
197-
log.KvDistribution.Fatalf(ctx, "unexpected empty RangeFeedValue.Value.RawBytes: %v", t)
197+
log.KvExec.Fatalf(ctx, "unexpected empty RangeFeedValue.Value.RawBytes: %v", t)
198198
}
199199
if t.Value.Timestamp.IsEmpty() {
200-
log.KvDistribution.Fatalf(ctx, "unexpected empty RangeFeedValue.Value.Timestamp: %v", t)
200+
log.KvExec.Fatalf(ctx, "unexpected empty RangeFeedValue.Value.Timestamp: %v", t)
201201
}
202202
case *kvpb.RangeFeedCheckpoint:
203203
if t.Span.Key == nil {
204-
log.KvDistribution.Fatalf(ctx, "unexpected empty RangeFeedCheckpoint.Span.Key: %v", t)
204+
log.KvExec.Fatalf(ctx, "unexpected empty RangeFeedCheckpoint.Span.Key: %v", t)
205205
}
206206
case *kvpb.RangeFeedSSTable:
207207
if len(t.Data) == 0 {
208-
log.KvDistribution.Fatalf(ctx, "unexpected empty RangeFeedSSTable.Data: %v", t)
208+
log.KvExec.Fatalf(ctx, "unexpected empty RangeFeedSSTable.Data: %v", t)
209209
}
210210
if len(t.Span.Key) == 0 {
211-
log.KvDistribution.Fatalf(ctx, "unexpected empty RangeFeedSSTable.Span: %v", t)
211+
log.KvExec.Fatalf(ctx, "unexpected empty RangeFeedSSTable.Span: %v", t)
212212
}
213213
if t.WriteTS.IsEmpty() {
214-
log.KvDistribution.Fatalf(ctx, "unexpected empty RangeFeedSSTable.Timestamp: %v", t)
214+
log.KvExec.Fatalf(ctx, "unexpected empty RangeFeedSSTable.Timestamp: %v", t)
215215
}
216216
case *kvpb.RangeFeedDeleteRange:
217217
if len(t.Span.Key) == 0 || len(t.Span.EndKey) == 0 {
218-
log.KvDistribution.Fatalf(ctx, "unexpected empty key in RangeFeedDeleteRange.Span: %v", t)
218+
log.KvExec.Fatalf(ctx, "unexpected empty key in RangeFeedDeleteRange.Span: %v", t)
219219
}
220220
if t.Timestamp.IsEmpty() {
221-
log.KvDistribution.Fatalf(ctx, "unexpected empty RangeFeedDeleteRange.Timestamp: %v", t)
221+
log.KvExec.Fatalf(ctx, "unexpected empty RangeFeedDeleteRange.Timestamp: %v", t)
222222
}
223223
default:
224-
log.KvDistribution.Fatalf(ctx, "unexpected RangeFeedEvent variant: %v", t)
224+
log.KvExec.Fatalf(ctx, "unexpected RangeFeedEvent variant: %v", t)
225225
}
226226
}
227227

@@ -263,7 +263,7 @@ func (r *baseRegistration) maybeStripEvent(
263263
// observed all values up to the checkpoint timestamp over a given
264264
// key span if any updates to that span have been filtered out.
265265
if !t.Span.Contains(r.span) {
266-
log.KvDistribution.Fatalf(ctx, "registration span %v larger than checkpoint span %v", r.span, t.Span)
266+
log.KvExec.Fatalf(ctx, "registration span %v larger than checkpoint span %v", r.span, t.Span)
267267
}
268268
t = copyOnWrite().(*kvpb.RangeFeedCheckpoint)
269269
t.Span = r.span
@@ -278,7 +278,7 @@ func (r *baseRegistration) maybeStripEvent(
278278
// SSTs are always sent in their entirety, it is up to the caller to
279279
// filter out irrelevant entries.
280280
default:
281-
log.KvDistribution.Fatalf(ctx, "unexpected RangeFeedEvent variant: %v", t)
281+
log.KvExec.Fatalf(ctx, "unexpected RangeFeedEvent variant: %v", t)
282282
}
283283
return ret
284284
}
@@ -364,7 +364,7 @@ func (reg *registry) Register(ctx context.Context, r registration) {
364364
r.setID(reg.nextID())
365365
if err := reg.tree.Insert(r, false /* fast */); err != nil {
366366
// TODO(erikgrinaker): these errors should arguably be returned.
367-
log.KvDistribution.Fatalf(ctx, "%v", err)
367+
log.KvExec.Fatalf(ctx, "%v", err)
368368
}
369369
}
370370

@@ -400,7 +400,7 @@ func (reg *registry) PublishToOverlapping(
400400
// surprising. Revisit this once RangeFeed has more users.
401401
minTS = hlc.MaxTimestamp
402402
default:
403-
log.KvDistribution.Fatalf(ctx, "unexpected RangeFeedEvent variant: %v", t)
403+
log.KvExec.Fatalf(ctx, "unexpected RangeFeedEvent variant: %v", t)
404404
}
405405

406406
reg.forOverlappingRegs(ctx, span, func(r registration) (bool, *kvpb.Error) {
@@ -468,13 +468,13 @@ func (reg *registry) remove(ctx context.Context, toDelete []interval.Interface)
468468
} else if len(toDelete) == 1 {
469469
reg.updateMetricsOnUnregistration(toDelete[0].(registration))
470470
if err := reg.tree.Delete(toDelete[0], false /* fast */); err != nil {
471-
log.KvDistribution.Fatalf(ctx, "%v", err)
471+
log.KvExec.Fatalf(ctx, "%v", err)
472472
}
473473
} else if len(toDelete) > 1 {
474474
for _, i := range toDelete {
475475
reg.updateMetricsOnUnregistration(i.(registration))
476476
if err := reg.tree.Delete(i, true /* fast */); err != nil {
477-
log.KvDistribution.Fatalf(ctx, "%v", err)
477+
log.KvExec.Fatalf(ctx, "%v", err)
478478
}
479479
}
480480
reg.tree.AdjustRanges()

pkg/kv/kvserver/rangefeed/resolved_timestamp.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ func (rts *resolvedTimestamp) consumeLogicalOp(
224224
return rts.intentQ.Del(t.TxnID)
225225

226226
default:
227-
log.KvDistribution.Fatalf(ctx, "unknown logical op %T", t)
227+
log.KvExec.Fatalf(ctx, "unknown logical op %T", t)
228228
return false
229229
}
230230
}
@@ -237,7 +237,7 @@ func (rts *resolvedTimestamp) recompute(ctx context.Context) bool {
237237
return false
238238
}
239239
if rts.closedTS.Less(rts.resolvedTS) {
240-
log.KvDistribution.Fatalf(ctx, "closed timestamp below resolved timestamp: %s < %s",
240+
log.KvExec.Fatalf(ctx, "closed timestamp below resolved timestamp: %s < %s",
241241
rts.closedTS, rts.resolvedTS)
242242
}
243243
newTS := rts.closedTS
@@ -246,7 +246,7 @@ func (rts *resolvedTimestamp) recompute(ctx context.Context) bool {
246246
// timestamps cannot be resolved yet.
247247
if txn := rts.intentQ.Oldest(); txn != nil {
248248
if txn.timestamp.LessEq(rts.resolvedTS) {
249-
log.KvDistribution.Fatalf(ctx, "unresolved txn equal to or below resolved timestamp: %s <= %s",
249+
log.KvExec.Fatalf(ctx, "unresolved txn equal to or below resolved timestamp: %s <= %s",
250250
txn.timestamp, rts.resolvedTS)
251251
}
252252
// txn.timestamp cannot be resolved, so the resolved timestamp must be Prev.
@@ -258,7 +258,7 @@ func (rts *resolvedTimestamp) recompute(ctx context.Context) bool {
258258
newTS.Logical = 0
259259

260260
if newTS.Less(rts.resolvedTS) {
261-
log.KvDistribution.Fatalf(ctx, "resolved timestamp regression, was %s, recomputed as %s",
261+
log.KvExec.Fatalf(ctx, "resolved timestamp regression, was %s, recomputed as %s",
262262
rts.resolvedTS, newTS)
263263
}
264264
return rts.resolvedTS.Forward(newTS)
@@ -271,7 +271,7 @@ func (rts *resolvedTimestamp) assertNoChange(ctx context.Context) {
271271
before := rts.resolvedTS
272272
changed := rts.recompute(ctx)
273273
if changed || before != rts.resolvedTS {
274-
log.KvDistribution.Fatalf(ctx, "unexpected resolved timestamp change on recomputation, "+
274+
log.KvExec.Fatalf(ctx, "unexpected resolved timestamp change on recomputation, "+
275275
"was %s, recomputed as %s", before, rts.resolvedTS)
276276
}
277277
}
@@ -289,9 +289,9 @@ func (rts *resolvedTimestamp) assertOpAboveRTS(
289289
err := errors.AssertionFailedf(
290290
"resolved timestamp %s equal to or above timestamp of operation %v", rts.resolvedTS, &op)
291291
if fatal {
292-
log.KvDistribution.Fatalf(ctx, "%v", err)
292+
log.KvExec.Fatalf(ctx, "%v", err)
293293
} else {
294-
log.KvDistribution.Errorf(ctx, "%v", err)
294+
log.KvExec.Errorf(ctx, "%v", err)
295295
}
296296
}
297297
}

pkg/kv/kvserver/rangefeed/scheduled_processor.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ func (p *ScheduledProcessor) Register(
372372
return nil
373373
}
374374
if !p.Span.AsRawSpanWithNoLocals().Contains(r.Span()) {
375-
log.KvDistribution.Fatalf(ctx, "registration %s not in Processor's key range %v", r, p.Span)
375+
log.KvExec.Fatalf(ctx, "registration %s not in Processor's key range %v", r, p.Span)
376376
}
377377

378378
// Add the new registration to the registry.
@@ -638,7 +638,7 @@ func runRequest[T interface{}](
638638
if buildutil.CrdbTestBuild {
639639
select {
640640
case <-p.stoppedC:
641-
log.KvDistribution.Fatalf(ctx, "processing request on stopped processor")
641+
log.KvExec.Fatalf(ctx, "processing request on stopped processor")
642642
default:
643643
}
644644
}
@@ -702,7 +702,7 @@ func (p *ScheduledProcessor) consumeEvent(ctx context.Context, e *event) {
702702
case e.sync != nil:
703703
if e.sync.testRegCatchupSpan != nil {
704704
if err := p.reg.waitForCaughtUp(ctx, *e.sync.testRegCatchupSpan); err != nil {
705-
log.KvDistribution.Errorf(
705+
log.KvExec.Errorf(
706706
ctx,
707707
"error waiting for registries to catch up during test, results might be impacted: %s",
708708
err,
@@ -711,7 +711,7 @@ func (p *ScheduledProcessor) consumeEvent(ctx context.Context, e *event) {
711711
}
712712
close(e.sync.c)
713713
default:
714-
log.KvDistribution.Fatalf(ctx, "missing event variant: %+v", e)
714+
log.KvExec.Fatalf(ctx, "missing event variant: %+v", e)
715715
}
716716
}
717717

@@ -749,7 +749,7 @@ func (p *ScheduledProcessor) consumeLogicalOps(
749749
// No updates to publish.
750750

751751
default:
752-
log.KvDistribution.Fatalf(ctx, "unknown logical op %T", t)
752+
log.KvExec.Fatalf(ctx, "unknown logical op %T", t)
753753
}
754754

755755
// Determine whether the operation caused the resolved timestamp to
@@ -793,7 +793,7 @@ func (p *ScheduledProcessor) publishValue(
793793
alloc *SharedBudgetAllocation,
794794
) {
795795
if !p.Span.ContainsKey(roachpb.RKey(key)) {
796-
log.KvDistribution.Fatalf(ctx, "key %v not in Processor's key range %v", key, p.Span)
796+
log.KvExec.Fatalf(ctx, "key %v not in Processor's key range %v", key, p.Span)
797797
}
798798

799799
var prevVal roachpb.Value
@@ -820,7 +820,7 @@ func (p *ScheduledProcessor) publishDeleteRange(
820820
) {
821821
span := roachpb.Span{Key: startKey, EndKey: endKey}
822822
if !p.Span.ContainsKeyRange(roachpb.RKey(startKey), roachpb.RKey(endKey)) {
823-
log.KvDistribution.Fatalf(ctx, "span %s not in Processor's key range %v", span, p.Span)
823+
log.KvExec.Fatalf(ctx, "span %s not in Processor's key range %v", span, p.Span)
824824
}
825825

826826
var event kvpb.RangeFeedEvent
@@ -839,10 +839,10 @@ func (p *ScheduledProcessor) publishSSTable(
839839
alloc *SharedBudgetAllocation,
840840
) {
841841
if sstSpan.Equal(roachpb.Span{}) {
842-
log.KvDistribution.Fatalf(ctx, "received SSTable without span")
842+
log.KvExec.Fatalf(ctx, "received SSTable without span")
843843
}
844844
if sstWTS.IsEmpty() {
845-
log.KvDistribution.Fatalf(ctx, "received SSTable without write timestamp")
845+
log.KvExec.Fatalf(ctx, "received SSTable without write timestamp")
846846
}
847847
p.reg.PublishToOverlapping(ctx, sstSpan, &kvpb.RangeFeedEvent{
848848
SST: &kvpb.RangeFeedSSTable{

pkg/kv/kvserver/rangefeed/scheduler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -493,15 +493,15 @@ func (ss *schedulerShard) processEvents(ctx context.Context) {
493493

494494
if remaining != 0 && buildutil.CrdbTestBuild {
495495
if (remaining^procEventType)&remaining != 0 {
496-
log.KvDistribution.Fatalf(ctx,
496+
log.KvExec.Fatalf(ctx,
497497
"rangefeed processor attempted to reschedule event type %s that was not present in original event set %s",
498498
procEventType, remaining)
499499
}
500500
}
501501

502502
if e&Stopped != 0 {
503503
if remaining != 0 {
504-
log.KvDistribution.VWarningf(ctx, 5,
504+
log.KvExec.VWarningf(ctx, 5,
505505
"rangefeed processor %d didn't process all events on close", entry.id)
506506
}
507507
// We'll keep Stopped state to avoid calling stopped processor again

pkg/kv/kvserver/rangefeed/stream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func (s *PerRangeEventSink) SendError(err *kvpb.Error) {
8686
Error: *transformRangefeedErrToClientError(err),
8787
})
8888
if ev.Error == nil {
89-
log.KvDistribution.Fatalf(context.Background(),
89+
log.KvExec.Fatalf(context.Background(),
9090
"unexpected: SendWithoutBlocking called with non-error event")
9191
}
9292
// Silence the error: expected to happen when the buffered sender is closed or

pkg/kv/kvserver/rangefeed/stream_manager.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func (sm *StreamManager) NewStream(streamID int64, rangeID roachpb.RangeID) (sin
108108
case *UnbufferedSender:
109109
return NewPerRangeEventSink(rangeID, streamID, sender)
110110
default:
111-
log.KvDistribution.Fatalf(context.Background(), "unexpected sender type %T", sm)
111+
log.KvExec.Fatalf(context.Background(), "unexpected sender type %T", sm)
112112
return nil
113113
}
114114
}
@@ -133,7 +133,7 @@ func (sm *StreamManager) OnError(streamID int64) {
133133
// DisconnectStream disconnects the stream with the given streamID.
134134
func (sm *StreamManager) DisconnectStream(streamID int64, err *kvpb.Error) {
135135
if err == nil {
136-
log.KvDistribution.Fatalf(context.Background(),
136+
log.KvExec.Fatalf(context.Background(),
137137
"unexpected: DisconnectStream called with nil error")
138138
return
139139
}
@@ -166,7 +166,7 @@ func (sm *StreamManager) AddStream(streamID int64, d Disconnector) {
166166
return
167167
}
168168
if _, ok := sm.streams.m[streamID]; ok {
169-
log.KvDistribution.Fatalf(context.Background(), "stream %d already exists", streamID)
169+
log.KvExec.Fatalf(context.Background(), "stream %d already exists", streamID)
170170
}
171171
sm.streams.m[streamID] = d
172172
sm.metrics.ActiveMuxRangeFeed.Inc(1)
@@ -209,7 +209,7 @@ func (sm *StreamManager) Stop(ctx context.Context) {
209209
sm.sender.cleanup(ctx)
210210
sm.streams.Lock()
211211
defer sm.streams.Unlock()
212-
log.KvDistribution.VInfof(ctx, 2, "stopping stream manager: disconnecting %d streams", len(sm.streams.m))
212+
log.KvExec.VInfof(ctx, 2, "stopping stream manager: disconnecting %d streams", len(sm.streams.m))
213213
rangefeedClosedErr := kvpb.NewError(
214214
kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED))
215215
sm.metrics.ActiveMuxRangeFeed.Dec(int64(len(sm.streams.m)))
@@ -228,7 +228,7 @@ func (sm *StreamManager) Stop(ctx context.Context) {
228228
// sender.run may also finish without sending anything to the channel.
229229
func (sm *StreamManager) Error() <-chan error {
230230
if sm.errCh == nil {
231-
log.KvDistribution.Fatalf(context.Background(), "StreamManager.Error called before StreamManager.Start")
231+
log.KvExec.Fatalf(context.Background(), "StreamManager.Error called before StreamManager.Start")
232232
}
233233
return sm.errCh
234234
}

0 commit comments

Comments
 (0)