Skip to content

Commit 3fa36dc

Browse files
craig[bot]KeithCh
andcommitted
Merge #153050
153050: changefeedccl: migrate CDC logs from DEV to CHANGEFEED logging channel r=dhartunian,andyyang890 a=KeithCh Replace all instances of log.Dev with log.Changefeed in the changefeedccl directory. Resolves: #153046 Release note: None Co-authored-by: Keith Chow <[email protected]>
2 parents 803a15e + 721e1c3 commit 3fa36dc

31 files changed

+188
-188
lines changed

pkg/ccl/changefeedccl/cdceval/expr_eval.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ func (e *familyEvaluator) planAndRun(ctx context.Context) (err error) {
258258
if log.V(1) {
259259
start := timeutil.Now()
260260
defer func() {
261-
log.Dev.Infof(ctx, "Planning for CDC expression %s (v=%d) took %s (err=%v)",
261+
log.Changefeed.Infof(ctx, "Planning for CDC expression %s (v=%d) took %s (err=%v)",
262262
tree.AsString(e.norm), e.norm.desc.Version, timeutil.Since(start), err)
263263
}()
264264
}

pkg/ccl/changefeedccl/cdcevent/event.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -612,7 +612,7 @@ func (d *eventDecoder) DecodeKV(
612612
err = changefeedbase.WithTerminalError(errors.Wrapf(err,
613613
"error decoding key %s@%s (hex_kv: %x)",
614614
keys.PrettyPrint(nil, kv.Key), kv.Value.Timestamp, kvBytes))
615-
log.Dev.Errorf(ctx, "terminal error decoding KV: %v", err)
615+
log.Changefeed.Errorf(ctx, "terminal error decoding KV: %v", err)
616616
return Row{}, err
617617
}
618618

@@ -844,7 +844,7 @@ func MakeRowFromTuple(ctx context.Context, evalCtx *eval.Context, t *tree.DTuple
844844
r.AddValueColumn(name, d.ResolvedType())
845845
if err := r.SetValueDatumAt(i, d); err != nil {
846846
if build.IsRelease() {
847-
log.Dev.Warningf(ctx, "failed to set row value from tuple due to error %v", err)
847+
log.Changefeed.Warningf(ctx, "failed to set row value from tuple due to error %v", err)
848848
_ = r.SetValueDatumAt(i, tree.DNull)
849849
} else {
850850
panic(err)

pkg/ccl/changefeedccl/cdcevent/event_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -784,10 +784,10 @@ func TestEventColumnOrderingWithSchemaChanges(t *testing.T) {
784784
require.NoError(t, err)
785785

786786
expectedEvents := len(tc.expectMainFamily) + len(tc.expectECFamily)
787-
log.Dev.Infof(ctx, "expectedEvents: %d\n", expectedEvents)
787+
log.Changefeed.Infof(ctx, "expectedEvents: %d\n", expectedEvents)
788788
for i := 0; i < expectedEvents; i++ {
789789
v, deleteRange := popRow(t)
790-
log.Dev.Infof(ctx, "event[%d]: v=%+v, deleteRange=%+v", i, v, deleteRange)
790+
log.Changefeed.Infof(ctx, "event[%d]: v=%+v, deleteRange=%+v", i, v, deleteRange)
791791

792792
if deleteRange != nil {
793793
// Should not see a RangeFeedValue and a RangeFeedDeleteRange

pkg/ccl/changefeedccl/cdctest/nemeses.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -423,16 +423,16 @@ func RunNemesis(
423423
time := timeutil.Now()
424424
for i := 0; i < numInserts; i++ {
425425
query := queryGen.Generate()
426-
log.Dev.Infof(ctx, "Executing query: %s", query)
426+
log.Changefeed.Infof(ctx, "Executing query: %s", query)
427427
err := timeutil.RunWithTimeout(ctx, "nemeses populate table",
428428
insertTimeout, func(ctx context.Context) error {
429429
_, err := db.ExecContext(ctx, query)
430430
return err
431431
})
432-
log.Dev.Infof(ctx, "Time taken to execute last query: %s", timeutil.Since(time))
432+
log.Changefeed.Infof(ctx, "Time taken to execute last query: %s", timeutil.Since(time))
433433
time = timeutil.Now()
434434
if err != nil {
435-
log.Dev.Infof(ctx, "Skipping query %s because error %s", query, err)
435+
log.Changefeed.Infof(ctx, "Skipping query %s because error %s", query, err)
436436
continue
437437
}
438438
}
@@ -454,18 +454,18 @@ func RunNemesis(
454454
var id int
455455
var ts string
456456
if err := rows.Scan(&id, &ts); err != nil {
457-
log.Dev.Infof(ctx, "# skipping row because error: %s", err)
457+
log.Changefeed.Infof(ctx, "# skipping row because error: %s", err)
458458
continue
459459
}
460-
log.Dev.Infof(ctx, "INSERT INTO foo (id,ts) VALUES (%d, %s);", id, ts)
460+
log.Changefeed.Infof(ctx, "INSERT INTO foo (id,ts) VALUES (%d, %s);", id, ts)
461461
}
462462

463463
cfo := newChangefeedOption(testName)
464464
changefeedStatement := fmt.Sprintf(
465465
`CREATE CHANGEFEED FOR foo %s`,
466466
cfo.OptionString(),
467467
)
468-
log.Dev.Infof(ctx, "Using changefeed options: %s", changefeedStatement)
468+
log.Changefeed.Infof(ctx, "Using changefeed options: %s", changefeedStatement)
469469
foo, err := f.Feed(changefeedStatement)
470470
if err != nil {
471471
return nil, err
@@ -922,7 +922,7 @@ var compiledStateTransitions = fsm.Compile(stateTransitions)
922922

923923
func logEvent(fn func(fsm.Args) error) func(fsm.Args) error {
924924
return func(a fsm.Args) error {
925-
log.Dev.Infof(a.Ctx, "Event: %#v, Payload: %#v\n", a.Event, a.Payload)
925+
log.Changefeed.Infof(a.Ctx, "Event: %#v, Payload: %#v\n", a.Event, a.Payload)
926926
return fn(a)
927927
}
928928
}
@@ -1084,7 +1084,7 @@ func noteFeedMessage(a fsm.Args) error {
10841084
if err != nil {
10851085
return err
10861086
}
1087-
log.Dev.Infof(a.Ctx, "%v", string(m.Resolved))
1087+
log.Changefeed.Infof(a.Ctx, "%v", string(m.Resolved))
10881088
err = ns.v.NoteResolved(m.Partition, ts)
10891089
if err != nil {
10901090
return err
@@ -1096,7 +1096,7 @@ func noteFeedMessage(a fsm.Args) error {
10961096
return err
10971097
}
10981098
ns.availableRows--
1099-
log.Dev.Infof(a.Ctx, "%s->%s", m.Key, m.Value)
1099+
log.Changefeed.Infof(a.Ctx, "%s->%s", m.Key, m.Value)
11001100
return ns.v.NoteRow(m.Partition, string(m.Key), string(m.Value), ts, m.Topic)
11011101
}
11021102
}

pkg/ccl/changefeedccl/cdctest/row.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,10 +95,10 @@ func MakeRangeFeedValueReaderExtended(
9595
case r := <-rows:
9696
rowKey := r.Key.String() + r.Value.String()
9797
if _, isDup := dups[rowKey]; isDup {
98-
log.Dev.Infof(context.Background(), "Skip duplicate %s", roachpb.PrettyPrintKey(nil, r.Key))
98+
log.Changefeed.Infof(context.Background(), "Skip duplicate %s", roachpb.PrettyPrintKey(nil, r.Key))
9999
continue
100100
}
101-
log.Dev.Infof(context.Background(), "Read row %s", roachpb.PrettyPrintKey(nil, r.Key))
101+
log.Changefeed.Infof(context.Background(), "Read row %s", roachpb.PrettyPrintKey(nil, r.Key))
102102
dups[rowKey] = struct{}{}
103103
return r, nil
104104
case d := <-deleteRangeC:

pkg/ccl/changefeedccl/cdcutils/throttle.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ func NodeLevelThrottler(sv *settings.Values, metrics *Metrics) *Throttler {
113113
configStr := changefeedbase.NodeSinkThrottleConfig.Get(sv)
114114
if configStr != "" {
115115
if err := json.Unmarshal([]byte(configStr), &config); err != nil {
116-
log.Dev.Errorf(context.Background(),
116+
log.Changefeed.Errorf(context.Background(),
117117
"failed to parse node throttle config %q: err=%v; throttling disabled", configStr, err)
118118
}
119119
}

pkg/ccl/changefeedccl/changefeed.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ func emitResolvedTimestamp(
164164
return err
165165
}
166166
if log.V(2) {
167-
log.Dev.Infof(ctx, `resolved %s`, resolved)
167+
log.Changefeed.Infof(ctx, `resolved %s`, resolved)
168168
}
169169
return nil
170170
}

pkg/ccl/changefeedccl/changefeed_dist.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ func startDistChangefeed(
252252
return err
253253
}
254254
if log.ExpensiveLogEnabled(ctx, 2) {
255-
log.Dev.Infof(ctx, "tracked spans: %s", trackedSpans)
255+
log.Changefeed.Infof(ctx, "tracked spans: %s", trackedSpans)
256256
}
257257
localState.trackedSpans = trackedSpans
258258

@@ -265,7 +265,7 @@ func startDistChangefeed(
265265
if progress := localState.progress.GetChangefeed(); progress != nil && progress.SpanLevelCheckpoint != nil {
266266
spanLevelCheckpoint = progress.SpanLevelCheckpoint
267267
if log.V(2) {
268-
log.Dev.Infof(ctx, "span-level checkpoint: %s", spanLevelCheckpoint)
268+
log.Changefeed.Infof(ctx, "span-level checkpoint: %s", spanLevelCheckpoint)
269269
}
270270
}
271271
p, planCtx, err := makePlan(execCtx, jobID, details, description, initialHighWater,
@@ -413,7 +413,7 @@ func makePlan(
413413
evalCtx := execCtx.ExtendedEvalContext()
414414
oracle := replicaoracle.NewOracle(replicaOracleChoice, dsp.ReplicaOracleConfig(locFilter))
415415
if useBulkOracle.Get(&evalCtx.Settings.SV) {
416-
log.Dev.Infof(ctx, "using bulk oracle for DistSQL planning")
416+
log.Changefeed.Infof(ctx, "using bulk oracle for DistSQL planning")
417417
oracle = kvfollowerreadsccl.NewBulkOracle(dsp.ReplicaOracleConfig(evalCtx.Locality), locFilter, kvfollowerreadsccl.StreakConfig{})
418418
}
419419
planCtx := dsp.NewPlanningCtxWithOracle(ctx, execCtx.ExtendedEvalContext(), nil, /* planner */
@@ -423,12 +423,12 @@ func makePlan(
423423
return nil, nil, err
424424
}
425425
if log.ExpensiveLogEnabled(ctx, 2) {
426-
log.Dev.Infof(ctx, "spans returned by DistSQL: %v", spanPartitions)
426+
log.Changefeed.Infof(ctx, "spans returned by DistSQL: %v", spanPartitions)
427427
}
428428
switch {
429429
case distMode == sql.LocalDistribution || rangeDistribution == defaultDistribution:
430430
case rangeDistribution == balancedSimpleDistribution:
431-
log.Dev.Infof(ctx, "rebalancing ranges using balanced simple distribution")
431+
log.Changefeed.Infof(ctx, "rebalancing ranges using balanced simple distribution")
432432
sender := execCtx.ExecCfg().DB.NonTransactionalSender()
433433
distSender := sender.(*kv.CrossRangeTxnWrapperSender).Wrapped().(*kvcoord.DistSender)
434434
ri := kvcoord.MakeRangeIterator(distSender)
@@ -438,7 +438,7 @@ func makePlan(
438438
return nil, nil, err
439439
}
440440
if log.ExpensiveLogEnabled(ctx, 2) {
441-
log.Dev.Infof(ctx, "spans after balanced simple distribution rebalancing: %v", spanPartitions)
441+
log.Changefeed.Infof(ctx, "spans after balanced simple distribution rebalancing: %v", spanPartitions)
442442
}
443443
default:
444444
return nil, nil, errors.AssertionFailedf("unsupported dist strategy %d and dist mode %d",
@@ -473,7 +473,7 @@ func makePlan(
473473
aggregatorSpecs := make([]*execinfrapb.ChangeAggregatorSpec, len(spanPartitions))
474474
for i, sp := range spanPartitions {
475475
if log.ExpensiveLogEnabled(ctx, 2) {
476-
log.Dev.Infof(ctx, "watched spans for node %d: %v", sp.SQLInstanceID, sp)
476+
log.Changefeed.Infof(ctx, "watched spans for node %d: %v", sp.SQLInstanceID, sp)
477477
}
478478

479479
watches := make([]execinfrapb.ChangeAggregatorSpec_Watch, len(sp.Spans))
@@ -543,11 +543,11 @@ func makePlan(
543543
flowSpecs,
544544
execinfrapb.DiagramFlags{},
545545
); err != nil {
546-
log.Dev.Warningf(ctx, "failed to generate changefeed plan diagram: %s", err)
546+
log.Changefeed.Warningf(ctx, "failed to generate changefeed plan diagram: %s", err)
547547
} else if diagURL := diagURL.String(); len(diagURL) > maxLenDiagURL {
548-
log.Dev.Warningf(ctx, "changefeed plan diagram length is too large to be logged: %d", len(diagURL))
548+
log.Changefeed.Warningf(ctx, "changefeed plan diagram length is too large to be logged: %d", len(diagURL))
549549
} else {
550-
log.Dev.Infof(ctx, "changefeed plan diagram: %s", diagURL)
550+
log.Changefeed.Infof(ctx, "changefeed plan diagram: %s", diagURL)
551551
}
552552

553553
return p, planCtx, nil
@@ -656,7 +656,7 @@ func rebalanceSpanPartitions(
656656
nRanges, ok := p.NumRanges()
657657
// We cannot rebalance if we're missing range information.
658658
if !ok {
659-
log.Dev.Warning(ctx, "skipping rebalance due to missing range info")
659+
log.Changefeed.Warning(ctx, "skipping rebalance due to missing range info")
660660
return partitions, nil
661661
}
662662
builders[i].numRanges = nRanges

0 commit comments

Comments
 (0)