Skip to content

Commit a4b47dc

Browse files
craig[bot] and andyyang890 committed
Merge #152364
152364: changefeedccl: add missing error return during change frontier startup r=rharding6373 a=andyyang890

Before this change, when the change frontier encountered an error while restoring a changefeed's span-level checkpoint, it would ignore the error instead of moving to draining like change aggregators would.

Fixes #152104

Release note: None

Co-authored-by: Andy Yang <[email protected]>
2 parents afdad22 + cb4d0c1 commit a4b47dc

File tree

2 files changed

+29
-66
lines changed

2 files changed

+29
-66
lines changed

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 27 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -345,19 +345,15 @@ func (ca *changeAggregator) Start(ctx context.Context) {
345345

346346
spans, err := ca.setupSpansAndFrontier()
347347
if err != nil {
348-
if log.V(2) {
349-
log.Dev.Infof(ca.Ctx(), "change aggregator moving to draining due to error setting up spans and frontier: %v", err)
350-
}
348+
log.Dev.Warningf(ca.Ctx(), "moving to draining due to error setting up spans and frontier: %v", err)
351349
ca.MoveToDraining(err)
352350
ca.cancel()
353351
return
354352
}
355353

356354
feed, err := makeChangefeedConfigFromJobDetails(ctx, ca.spec.Feed, ca.FlowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig))
357355
if err != nil {
358-
if log.V(2) {
359-
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error making changefeed config: %v", err)
360-
}
356+
log.Dev.Warningf(ca.Ctx(), "moving to draining due to error making changefeed config: %v", err)
361357
ca.MoveToDraining(err)
362358
ca.cancel()
363359
return
@@ -380,19 +376,15 @@ func (ca *changeAggregator) Start(ctx context.Context) {
380376
scope, _ := opts.GetMetricScope()
381377
ca.sliMetrics, err = ca.metrics.getSLIMetrics(scope)
382378
if err != nil {
383-
if log.V(2) {
384-
log.Dev.Infof(ca.Ctx(), "change aggregator moving to draining due to error getting sli metrics: %v", err)
385-
}
379+
log.Dev.Warningf(ca.Ctx(), "moving to draining due to error getting sli metrics: %v", err)
386380
ca.MoveToDraining(err)
387381
ca.cancel()
388382
return
389383
}
390384
ca.sliMetricsID = ca.sliMetrics.claimId()
391385
ca.targets, err = AllTargets(ctx, ca.spec.Feed, ca.FlowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig))
392386
if err != nil {
393-
if log.V(2) {
394-
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error getting targets: %v", err)
395-
}
387+
log.Dev.Warningf(ca.Ctx(), "moving to draining due to error getting targets: %v", err)
396388
ca.MoveToDraining(err)
397389
ca.cancel()
398390
return
@@ -402,9 +394,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
402394
recorder, err = ca.wrapMetricsRecorderWithTelemetry(ctx, recorder, ca.targets)
403395

404396
if err != nil {
405-
if log.V(2) {
406-
log.Dev.Infof(ca.Ctx(), "change aggregator moving to draining due to error wrapping metrics controller: %v", err)
407-
}
397+
log.Dev.Warningf(ca.Ctx(), "moving to draining due to error wrapping metrics controller: %v", err)
408398
ca.MoveToDraining(err)
409399
ca.cancel()
410400
}
@@ -413,9 +403,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
413403
ca.spec.User(), ca.spec.JobID, recorder, ca.targets)
414404
if err != nil {
415405
err = changefeedbase.MarkRetryableError(err)
416-
if log.V(2) {
417-
log.Dev.Infof(ca.Ctx(), "change aggregator moving to draining due to error getting sink: %v", err)
418-
}
406+
log.Dev.Warningf(ca.Ctx(), "moving to draining due to error getting sink: %v", err)
419407
ca.MoveToDraining(err)
420408
ca.cancel()
421409
return
@@ -447,9 +435,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
447435
limit := changefeedbase.PerChangefeedMemLimit.Get(&ca.FlowCtx.Cfg.Settings.SV)
448436
ca.eventProducer, ca.kvFeedDoneCh, ca.errCh, err = ca.startKVFeed(ctx, spans, kvFeedHighWater, needsInitialScan, feed, pool, limit, opts)
449437
if err != nil {
450-
if log.V(2) {
451-
log.Dev.Infof(ca.Ctx(), "change aggregator moving to draining due to error starting kv feed: %v", err)
452-
}
438+
log.Dev.Warningf(ca.Ctx(), "moving to draining due to error starting kv feed: %v", err)
453439
ca.MoveToDraining(err)
454440
ca.cancel()
455441
return
@@ -459,9 +445,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
459445
ctx, ca.FlowCtx.Cfg, ca.spec, feed, ca.frontier, kvFeedHighWater,
460446
ca.sink, ca.metrics, ca.sliMetrics, ca.knobs)
461447
if err != nil {
462-
if log.V(2) {
463-
log.Dev.Infof(ca.Ctx(), "change aggregator moving to draining due to error creating event consumer: %v", err)
464-
}
448+
log.Dev.Warningf(ca.Ctx(), "moving to draining due to error creating event consumer: %v", err)
465449
ca.MoveToDraining(err)
466450
ca.cancel()
467451
return
@@ -661,7 +645,7 @@ func (ca *changeAggregator) setupSpansAndFrontier() (spans []roachpb.Span, err e
661645
// Checkpointed spans are spans that were above the highwater mark, and we
662646
// must preserve that information in the frontier for future checkpointing.
663647
if err := checkpoint.Restore(ca.frontier, ca.spec.SpanLevelCheckpoint); err != nil {
664-
return nil, err
648+
return nil, errors.Wrapf(err, "failed to restore span-level checkpoint")
665649
}
666650

667651
return spans, nil
@@ -777,9 +761,7 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
777761
// NB: we do not invoke ca.cancel here -- just merely moving
778762
// to drain state so that the trailing metadata callback
779763
// has a chance to produce shutdown checkpoint.
780-
if log.V(2) {
781-
log.Dev.Infof(ca.Ctx(), "change aggregator moving to draining due to error while checking for node drain: %v", err)
782-
}
764+
log.Dev.Warningf(ca.Ctx(), "moving to draining due to error while checking for node drain: %v", err)
783765
ca.MoveToDraining(err)
784766
break
785767
}
@@ -810,9 +792,7 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
810792
}
811793
// Shut down the poller if it wasn't already.
812794
ca.cancel()
813-
if log.V(2) {
814-
log.Dev.Infof(ca.Ctx(), "change aggregator moving to draining due to error from tick: %v", err)
815-
}
795+
log.Dev.Warningf(ca.Ctx(), "moving to draining due to error from tick: %v", err)
816796
ca.MoveToDraining(err)
817797
break
818798
}
@@ -1378,9 +1358,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {
13781358
scope := cf.spec.Feed.Opts[changefeedbase.OptMetricsScope]
13791359
sli, err := cf.metrics.getSLIMetrics(scope)
13801360
if err != nil {
1381-
if log.V(2) {
1382-
log.Dev.Infof(cf.Ctx(), "change frontier moving to draining due to error getting sli metrics: %v", err)
1383-
}
1361+
log.Dev.Warningf(cf.Ctx(), "moving to draining due to error getting sli metrics: %v", err)
13841362
cf.MoveToDraining(err)
13851363
return
13861364
}
@@ -1389,9 +1367,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {
13891367
cf.spec.User(), cf.spec.JobID, sli, cf.targets)
13901368
if err != nil {
13911369
err = changefeedbase.MarkRetryableError(err)
1392-
if log.V(2) {
1393-
log.Dev.Infof(cf.Ctx(), "change frontier moving to draining due to error getting sink: %v", err)
1394-
}
1370+
log.Dev.Warningf(cf.Ctx(), "moving to draining due to error getting sink: %v", err)
13951371
cf.MoveToDraining(err)
13961372
return
13971373
}
@@ -1404,9 +1380,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {
14041380

14051381
cf.highWaterAtStart = cf.spec.Feed.StatementTime
14061382
if cf.evalCtx.ChangefeedState == nil {
1407-
if log.V(2) {
1408-
log.Dev.Infof(cf.Ctx(), "change frontier moving to draining due to missing changefeed state")
1409-
}
1383+
log.Dev.Warningf(cf.Ctx(), "moving to draining due to missing changefeed state")
14101384
cf.MoveToDraining(errors.AssertionFailedf("expected initialized local state"))
14111385
return
14121386
}
@@ -1418,9 +1392,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {
14181392
if cf.spec.JobID != 0 {
14191393
job, err := cf.FlowCtx.Cfg.JobRegistry.LoadClaimedJob(ctx, cf.spec.JobID)
14201394
if err != nil {
1421-
if log.V(2) {
1422-
log.Dev.Infof(cf.Ctx(), "change frontier moving to draining due to error loading claimed job: %v", err)
1423-
}
1395+
log.Dev.Warningf(cf.Ctx(), "moving to draining due to error loading claimed job: %v", err)
14241396
cf.MoveToDraining(err)
14251397
return
14261398
}
@@ -1472,15 +1444,16 @@ func (cf *changeFrontier) Start(ctx context.Context) {
14721444
perTableTracking,
14731445
cf.spec.TrackedSpans...)
14741446
if err != nil {
1475-
log.Dev.Infof(cf.Ctx(), "change frontier moving to draining due to error setting up frontier: %v", err)
1447+
log.Dev.Warningf(cf.Ctx(), "moving to draining due to error setting up frontier: %v", err)
14761448
cf.MoveToDraining(err)
14771449
return
14781450
}
14791451

14801452
if err := checkpoint.Restore(cf.frontier, cf.spec.SpanLevelCheckpoint); err != nil {
1481-
if log.V(2) {
1482-
log.Dev.Infof(cf.Ctx(), "change frontier encountered error on checkpoint restore: %v", err)
1483-
}
1453+
log.Dev.Warningf(cf.Ctx(),
1454+
"moving to draining due to error restoring span-level checkpoint: %v", err)
1455+
cf.MoveToDraining(err)
1456+
return
14841457
}
14851458

14861459
if cf.knobs.AfterCoordinatorFrontierRestore != nil {
@@ -1637,39 +1610,31 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
16371610
}
16381611
}
16391612

1640-
if log.V(2) {
1641-
log.Dev.Infof(cf.Ctx(),
1642-
"change frontier moving to draining after reaching resolved span boundary (%s): %v",
1643-
boundaryType, err)
1644-
}
1613+
log.Dev.Warningf(cf.Ctx(),
1614+
"moving to draining after reaching resolved span boundary (%s): %v",
1615+
boundaryType, err)
16451616
cf.MoveToDraining(err)
16461617
break
16471618
}
16481619

16491620
row, meta := cf.input.Next()
16501621
if meta != nil {
16511622
if meta.Err != nil {
1652-
if log.V(2) {
1653-
log.Dev.Infof(cf.Ctx(), "change frontier moving to draining after getting error from aggregator: %v", meta.Err)
1654-
}
1623+
log.Dev.Warningf(cf.Ctx(), "moving to draining after getting error from aggregator: %v", meta.Err)
16551624
cf.MoveToDraining(nil /* err */)
16561625
}
16571626
if meta.Changefeed != nil && meta.Changefeed.DrainInfo != nil {
16581627
// Seeing changefeed drain info metadata from the aggregator means
16591628
// that the aggregator exited due to node shutdown. Transition to
16601629
// draining so that the remaining aggregators will shut down and
16611630
// transmit their up-to-date frontier.
1662-
if log.V(2) {
1663-
log.Dev.Infof(cf.Ctx(), "change frontier moving to draining due to aggregator shutdown: %s", meta.Changefeed)
1664-
}
1631+
log.Dev.Warningf(cf.Ctx(), "moving to draining due to aggregator shutdown: %s", meta.Changefeed)
16651632
cf.MoveToDraining(changefeedbase.ErrNodeDraining)
16661633
}
16671634
return nil, meta
16681635
}
16691636
if row == nil {
1670-
if log.V(2) {
1671-
log.Dev.Infof(cf.Ctx(), "change frontier moving to draining after getting nil row from aggregator")
1672-
}
1637+
log.Dev.Warningf(cf.Ctx(), "moving to draining after getting nil row from aggregator")
16731638
cf.MoveToDraining(nil /* err */)
16741639
break
16751640
}
@@ -1684,9 +1649,7 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
16841649
}
16851650

16861651
if err := cf.noteAggregatorProgress(cf.Ctx(), row[0]); err != nil {
1687-
if log.V(2) {
1688-
log.Dev.Infof(cf.Ctx(), "change frontier moving to draining after error while processing aggregator progress: %v", err)
1689-
}
1652+
log.Dev.Warningf(cf.Ctx(), "moving to draining after error while processing aggregator progress: %v", err)
16901653
cf.MoveToDraining(err)
16911654
break
16921655
}

pkg/ccl/changefeedccl/checkpoint/checkpoint.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,11 +78,11 @@ type SpanForwarder interface {
7878
func Restore(sf SpanForwarder, checkpoint *jobspb.TimestampSpansMap) error {
7979
for ts, spans := range checkpoint.All() {
8080
if ts.IsEmpty() {
81-
return errors.New("checkpoint timestamp is empty")
81+
return errors.AssertionFailedf("checkpoint timestamp is empty")
8282
}
8383
for _, sp := range spans {
8484
if _, err := sf.Forward(sp, ts); err != nil {
85-
return err
85+
return errors.Wrapf(err, "forwarding span %v to %v", sp, ts)
8686
}
8787
}
8888
}

0 commit comments

Comments
 (0)