Skip to content

Commit adfa889

Browse files
committed
changefeedccl: have aggregators send their frontiers regularly
This patch updates the aggregators to send their frontiers every `min_checkpoint_frequency` (further limited by the average time it takes to flush) to support periodic span frontier persistence.

Release note: None
1 parent d8e21ad commit adfa889

File tree

7 files changed

+272
-216
lines changed

7 files changed

+272
-216
lines changed

pkg/ccl/changefeedccl/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ go_test(
203203
"alter_changefeed_test.go",
204204
"changefeed_dist_test.go",
205205
"changefeed_job_info_test.go",
206+
"changefeed_processors_test.go",
206207
"changefeed_progress_test.go",
207208
"changefeed_stmt_test.go",
208209
"changefeed_test.go",

pkg/ccl/changefeedccl/alter_changefeed_test.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1524,13 +1524,7 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
15241524
defer rndMu.Unlock()
15251525

15261526
if r.Span.Equal(fooTableSpan) {
1527-
// Do not emit resolved events for the entire table span.
1528-
// We "simulate" large table by splitting single table span into many parts, so
1529-
// we want to resolve those sub-spans instead of the entire table span.
1530-
// However, we have to emit something -- otherwise the entire changefeed
1531-
// machine would not work.
1532-
r.Span.EndKey = fooTableSpan.Key.Next()
1533-
return false, nil
1527+
return true, nil
15341528
}
15351529
if haveGaps {
15361530
return rndMu.rnd.Intn(10) > 7, nil

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 81 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,11 @@ type changeAggregator struct {
106106
// eventConsumer consumes the event.
107107
eventConsumer eventConsumer
108108

109-
nextHighWaterFlush time.Time // next time high watermark may be flushed.
110-
flushFrequency time.Duration // how often high watermark can be checkpointed.
111-
lastSpanFlush time.Time // last time expensive, span based checkpoint was written.
109+
flushFrequency time.Duration // how often high watermark can be checkpointed.
110+
111+
// frontierFlushLimiter is a rate limiter for flushing the span frontier
112+
// to the coordinator.
113+
frontierFlushLimiter *saveRateLimiter
112114

113115
// frontier keeps track of resolved timestamps for spans along with schema change
114116
// boundary information.
@@ -281,6 +283,22 @@ func newChangeAggregatorProcessor(
281283
ca.flushFrequency = changefeedbase.DefaultMinCheckpointFrequency
282284
}
283285

286+
ca.frontierFlushLimiter, err = newSaveRateLimiter(saveRateConfig{
287+
name: "frontier",
288+
intervalName: func() redact.SafeValue {
289+
return redact.SafeString(changefeedbase.OptMinCheckpointFrequency)
290+
},
291+
interval: func() time.Duration {
292+
return ca.flushFrequency
293+
},
294+
jitter: func() float64 {
295+
return aggregatorFlushJitter.Get(&ca.FlowCtx.Cfg.Settings.SV)
296+
},
297+
}, timeutil.DefaultTimeSource{})
298+
if err != nil {
299+
return nil, err
300+
}
301+
284302
return ca, nil
285303
}
286304

@@ -461,9 +479,6 @@ func (ca *changeAggregator) Start(ctx context.Context) {
461479

462480
// Init heartbeat timer.
463481
ca.lastPush = timeutil.Now()
464-
465-
// Generate expensive checkpoint only after we ran for a while.
466-
ca.lastSpanFlush = timeutil.Now()
467482
}
468483

469484
func (ca *changeAggregator) startKVFeed(
@@ -730,18 +745,6 @@ var aggregatorFlushJitter = settings.RegisterFloatSetting(
730745
settings.WithPublic,
731746
)
732747

733-
func nextFlushWithJitter(s timeutil.TimeSource, d time.Duration, j float64) (time.Time, error) {
734-
if j < 0 || d < 0 {
735-
return s.Now(), errors.AssertionFailedf("invalid jitter value: %f, duration: %s", j, d)
736-
}
737-
maxJitter := int64(j * float64(d))
738-
if maxJitter == 0 {
739-
return s.Now().Add(d), nil
740-
}
741-
nextFlush := d + time.Duration(rand.Int63n(maxJitter))
742-
return s.Now().Add(nextFlush), nil
743-
}
744-
745748
// Next is part of the RowSource interface.
746749
func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
747750
shouldEmitHeartBeat := func() bool {
@@ -897,7 +900,7 @@ func (ca *changeAggregator) flushBufferedEvents(ctx context.Context) error {
897900
// noteResolvedSpan periodically flushes Frontier progress from the current
898901
// changeAggregator node to the changeFrontier node to allow the changeFrontier
899902
// to persist the overall changefeed's progress
900-
func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) (returnErr error) {
903+
func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error {
901904
ctx, sp := tracing.ChildSpan(ca.Ctx(), "changefeed.aggregator.note_resolved_span")
902905
defer sp.Finish()
903906

@@ -935,32 +938,17 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) (retu
935938
// TODO(yevgeniy): Consider doing something similar to how job checkpointing
936939
// works in the frontier where if we missed the window to checkpoint, we will attempt
937940
// the checkpoint at the next opportune moment.
938-
checkpointFrontier := advanced &&
939-
(forceFlush || timeutil.Now().After(ca.nextHighWaterFlush))
941+
checkpointFrontier := (advanced && forceFlush) || ca.frontierFlushLimiter.canSave(ctx)
940942

941943
if checkpointFrontier {
942-
defer func() {
943-
ca.nextHighWaterFlush, err = nextFlushWithJitter(
944-
timeutil.DefaultTimeSource{}, ca.flushFrequency, aggregatorFlushJitter.Get(sv))
945-
if err != nil {
946-
returnErr = errors.CombineErrors(returnErr, err)
947-
}
948-
}()
949-
return ca.flushFrontier(ctx)
944+
now := timeutil.Now()
945+
if err := ca.flushFrontier(ctx); err != nil {
946+
return err
947+
}
948+
ca.frontierFlushLimiter.doneSave(timeutil.Since(now))
950949
}
951950

952-
// At a lower frequency, we checkpoint specific spans in the job progress
953-
// either in backfills or if the highwater mark is excessively lagging behind.
954-
checkpointSpans := (ca.frontier.InBackfill(resolved) || ca.frontier.HasLaggingSpans(sv)) &&
955-
canCheckpointSpans(sv, ca.lastSpanFlush)
956-
957-
if checkpointSpans {
958-
defer func() {
959-
ca.lastSpanFlush = timeutil.Now()
960-
}()
961-
return ca.flushFrontier(ctx)
962-
}
963-
return returnErr
951+
return nil
964952
}
965953

966954
// flushFrontier flushes sink and emits resolved spans to the change frontier.
@@ -1313,8 +1301,18 @@ func newChangeFrontierProcessor(
13131301
cf.freqEmitResolved = emitNoResolved
13141302
}
13151303

1316-
cf.frontierPersistenceLimiter = newSaveRateLimiter(
1317-
"frontier" /* name */, changefeedbase.FrontierPersistenceInterval)
1304+
cf.frontierPersistenceLimiter, err = newSaveRateLimiter(saveRateConfig{
1305+
name: "frontier",
1306+
intervalName: func() redact.SafeValue {
1307+
return changefeedbase.FrontierPersistenceInterval.Name()
1308+
},
1309+
interval: func() time.Duration {
1310+
return changefeedbase.FrontierPersistenceInterval.Get(&cf.FlowCtx.Cfg.Settings.SV)
1311+
},
1312+
}, timeutil.DefaultTimeSource{})
1313+
if err != nil {
1314+
return nil, err
1315+
}
13181316

13191317
encodingOpts, err := opts.GetEncodingOptions()
13201318
if err != nil {
@@ -1919,7 +1917,7 @@ func (cf *changeFrontier) maybePersistFrontier(ctx context.Context) error {
19191917

19201918
if cf.spec.JobID == 0 ||
19211919
!cf.evalCtx.Settings.Version.IsActive(ctx, clusterversion.V25_4) ||
1922-
!cf.frontierPersistenceLimiter.canSave(ctx, &cf.FlowCtx.Cfg.Settings.SV) {
1920+
!cf.frontierPersistenceLimiter.canSave(ctx) {
19231921
return nil
19241922
}
19251923

@@ -2349,41 +2347,62 @@ func shouldCountUsageError(err error) bool {
23492347
status.Code(err) != codes.Canceled
23502348
}
23512349

2352-
// durationSetting is a duration cluster setting.
2353-
type durationSetting interface {
2354-
Name() settings.SettingName
2355-
Get(sv *settings.Values) time.Duration
2350+
// saveRateConfig is the config for a saveRateLimiter.
2351+
type saveRateConfig struct {
2352+
name redact.SafeString
2353+
intervalName func() redact.SafeValue
2354+
interval func() time.Duration
2355+
jitter func() float64 // optional
23562356
}
23572357

23582358
// saveRateLimiter is a rate limiter for saving a piece of progress.
23592359
// It uses a duration setting as the minimum interval between saves.
23602360
// It also limits saving to not be more frequent than the average
23612361
// duration it takes to save progress.
23622362
type saveRateLimiter struct {
2363-
name redact.SafeString
2364-
saveInterval durationSetting
2365-
warnEveryN util.EveryN
2363+
config saveRateConfig
2364+
warnEveryN util.EveryN
2365+
2366+
clock timeutil.TimeSource
23662367

23672368
lastSave time.Time
23682369
avgSaveDuration time.Duration
23692370
}
23702371

23712372
// newSaveRateLimiter returns a new saveRateLimiter.
2372-
func newSaveRateLimiter(name redact.SafeString, saveInterval durationSetting) *saveRateLimiter {
2373-
return &saveRateLimiter{
2374-
name: name,
2375-
saveInterval: saveInterval,
2376-
warnEveryN: util.Every(time.Minute),
2373+
func newSaveRateLimiter(
2374+
config saveRateConfig, clock timeutil.TimeSource,
2375+
) (*saveRateLimiter, error) {
2376+
if len(config.name) == 0 {
2377+
return nil, errors.AssertionFailedf("name is required")
2378+
}
2379+
if config.intervalName == nil {
2380+
return nil, errors.AssertionFailedf("interval name is required")
2381+
}
2382+
if config.interval == nil {
2383+
return nil, errors.AssertionFailedf("interval is required")
23772384
}
2385+
return &saveRateLimiter{
2386+
config: config,
2387+
warnEveryN: util.Every(time.Minute),
2388+
clock: clock,
2389+
}, nil
23782390
}
23792391

23802392
// canSave returns whether enough time has passed to save progress again.
2381-
func (l *saveRateLimiter) canSave(ctx context.Context, sv *settings.Values) bool {
2382-
interval := l.saveInterval.Get(sv)
2383-
if interval == 0 {
2393+
func (l *saveRateLimiter) canSave(ctx context.Context) bool {
2394+
interval := l.config.interval()
2395+
if interval <= 0 {
23842396
return false
23852397
}
2386-
now := timeutil.Now()
2398+
if l.config.jitter != nil {
2399+
if jitter := l.config.jitter(); jitter > 0 {
2400+
if maxJitter := time.Duration(jitter * float64(interval)); maxJitter > 0 {
2401+
interval += time.Duration(rand.Int63n(int64(maxJitter) + 1))
2402+
}
2403+
}
2404+
}
2405+
now := l.clock.Now()
23872406
elapsed := now.Sub(l.lastSave)
23882407
if elapsed < interval {
23892408
return false
@@ -2393,8 +2412,7 @@ func (l *saveRateLimiter) canSave(ctx context.Context, sv *settings.Values) bool
23932412
log.Changefeed.Warningf(ctx, "cannot save %s even though %s has elapsed "+
23942413
"since last save and %s is set to %s because average duration to save was %s "+
23952414
"and further saving is disabled until that much time elapses",
2396-
l.name, elapsed, l.saveInterval.Name(),
2397-
interval, l.avgSaveDuration)
2415+
l.config.name, elapsed, l.config.intervalName(), interval, l.avgSaveDuration)
23982416
}
23992417
return false
24002418
}
@@ -2404,7 +2422,7 @@ func (l *saveRateLimiter) canSave(ctx context.Context, sv *settings.Values) bool
24042422
// doneSave must be called after each save is completed with the duration
24052423
// it took to save progress.
24062424
func (l *saveRateLimiter) doneSave(saveDuration time.Duration) {
2407-
l.lastSave = timeutil.Now()
2425+
l.lastSave = l.clock.Now()
24082426

24092427
// Update the average save duration using an exponential moving average.
24102428
if l.avgSaveDuration == 0 {

0 commit comments

Comments
 (0)