Skip to content

Commit 90eb30f

Browse files
committed
backup: only flush per-node progress every 15s
Previously this could queue up a large number of small changes to flush, particularly if flushing was slower than updates. Now all the updates that arrive over a 15s window are rolled up before being saved. If the channel becomes full while saving, additional messages may be dropped. Release note: none. Epic: none.
1 parent d5e3063 commit 90eb30f

File tree

1 file changed

+23
-8
lines changed

1 file changed

+23
-8
lines changed

pkg/backup/backup_job.go

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -226,17 +226,35 @@ func backup(
226226
}
227227
}
228228

229-
// Create a channel that is large enough that it does not block.
230-
perNodeProgressCh := make(chan map[execinfrapb.ComponentID]float32, numTotalSpans)
229+
// Create a channel with a little buffering, but plan on dropping if blocked.
230+
perNodeProgressCh := make(chan map[execinfrapb.ComponentID]float32, len(backupSpecs))
231231
storePerNodeProgressLoop := func(ctx context.Context) error {
232+
// track the last progress per component, periodically flushing those that
233+
// have changed to info rows.
234+
current, persisted := make(map[execinfrapb.ComponentID]float32), make(map[execinfrapb.ComponentID]float32)
235+
lastSaved := timeutil.Now()
236+
232237
for {
233238
select {
234239
case prog, ok := <-perNodeProgressCh:
235240
if !ok {
236241
return nil
237242
}
238-
jobsprofiler.StorePerNodeProcessorProgressFraction(
239-
ctx, execCtx.ExecCfg().InternalDB, job.ID(), prog)
243+
for k, v := range prog {
244+
current[k] = v
245+
}
246+
if timeutil.Since(lastSaved) > time.Second*15 {
247+
lastSaved = timeutil.Now()
248+
updates := make(map[execinfrapb.ComponentID]float32)
249+
for k := range current {
250+
if current[k] != persisted[k] {
251+
persisted[k] = current[k]
252+
updates[k] = current[k]
253+
}
254+
}
255+
jobsprofiler.StorePerNodeProcessorProgressFraction(
256+
ctx, execCtx.ExecCfg().InternalDB, job.ID(), updates)
257+
}
240258
case <-ctx.Done():
241259
return ctx.Err()
242260
}
@@ -285,11 +303,8 @@ func backup(
285303
perComponentProgress[component] = fraction
286304
}
287305
select {
288-
// This send to a buffered channel should never block but incase it does
289-
// we will fallthrough to the default case.
290306
case perNodeProgressCh <- perComponentProgress:
291-
default:
292-
log.Warningf(ctx, "skipping persisting per component progress as buffered channel was full")
307+
default: // discard the update if the flusher is backed up.
293308
}
294309

295310
// Check if we should persist a checkpoint backup manifest.

0 commit comments

Comments
 (0)