@@ -226,17 +226,35 @@ func backup(
226
226
}
227
227
}
228
228
229
- // Create a channel that is large enough that it does not block .
230
- perNodeProgressCh := make (chan map [execinfrapb.ComponentID ]float32 , numTotalSpans )
229
+ // Create a channel with a little buffering, but plan on dropping if blocked .
230
+ perNodeProgressCh := make (chan map [execinfrapb.ComponentID ]float32 , len ( backupSpecs ) )
231
231
storePerNodeProgressLoop := func (ctx context.Context ) error {
232
+ // track the last progress per component, periodically flushing those that
233
+ // have changed to info rows.
234
+ current , persisted := make (map [execinfrapb.ComponentID ]float32 ), make (map [execinfrapb.ComponentID ]float32 )
235
+ lastSaved := timeutil .Now ()
236
+
232
237
for {
233
238
select {
234
239
case prog , ok := <- perNodeProgressCh :
235
240
if ! ok {
236
241
return nil
237
242
}
238
- jobsprofiler .StorePerNodeProcessorProgressFraction (
239
- ctx , execCtx .ExecCfg ().InternalDB , job .ID (), prog )
243
+ for k , v := range prog {
244
+ current [k ] = v
245
+ }
246
+ if timeutil .Since (lastSaved ) > time .Second * 15 {
247
+ lastSaved = timeutil .Now ()
248
+ updates := make (map [execinfrapb.ComponentID ]float32 )
249
+ for k := range current {
250
+ if current [k ] != persisted [k ] {
251
+ persisted [k ] = current [k ]
252
+ updates [k ] = current [k ]
253
+ }
254
+ }
255
+ jobsprofiler .StorePerNodeProcessorProgressFraction (
256
+ ctx , execCtx .ExecCfg ().InternalDB , job .ID (), updates )
257
+ }
240
258
case <- ctx .Done ():
241
259
return ctx .Err ()
242
260
}
@@ -285,11 +303,8 @@ func backup(
285
303
perComponentProgress [component ] = fraction
286
304
}
287
305
select {
288
- // This send to a buffered channel should never block but incase it does
289
- // we will fallthrough to the default case.
290
306
case perNodeProgressCh <- perComponentProgress :
291
- default :
292
- log .Warningf (ctx , "skipping persisting per component progress as buffered channel was full" )
307
+ default : // discard the update if the flusher is backed up.
293
308
}
294
309
295
310
// Check if we should persist a checkpoint backup manifest.
0 commit comments