@@ -257,7 +257,7 @@ func (bd *backupDriver) monitorBackups(ctx context.Context) error {
257
257
const (
258
258
WaitingFirstFull = iota
259
259
RunningIncrementals
260
- WaitingCompactions
260
+ WaitingCompletion
261
261
Done
262
262
)
263
263
state := WaitingFirstFull
@@ -267,30 +267,41 @@ func (bd *backupDriver) monitorBackups(ctx context.Context) error {
267
267
if err != nil {
268
268
return err
269
269
}
270
+ _ , backupRunning , backupFailed , err := bd .backupJobStates (sql )
271
+ if err != nil {
272
+ return err
273
+ }
270
274
switch state {
271
275
case WaitingFirstFull :
272
276
var activeScheduleCount int
273
277
scheduleCountQuery := fmt .Sprintf (
274
278
`SELECT count(*) FROM [SHOW SCHEDULES] WHERE label='%s' AND schedule_status='ACTIVE'` , scheduleLabel ,
275
279
)
276
280
sql .QueryRow (bd .t , scheduleCountQuery ).Scan (& activeScheduleCount )
277
- if activeScheduleCount < 2 {
281
+ if len (backupFailed ) > 0 {
282
+ return errors .Newf ("backup jobs failed while waiting first full: %v" , backupFailed )
283
+ } else if activeScheduleCount < 2 {
278
284
bd .t .L ().Printf (`First full backup still running` )
279
285
} else {
280
286
state = RunningIncrementals
281
287
}
282
288
case RunningIncrementals :
283
289
var backupCount int
290
+ // We track completed backups via SHOW BACKUP as opposed to SHOW JOBS in
291
+ // the case that a fixture runs for a long enough time that old backup
292
+ // jobs stop showing up in SHOW JOBS.
284
293
backupCountQuery := fmt .Sprintf (
285
294
`SELECT count(DISTINCT end_time) FROM [SHOW BACKUP FROM LATEST IN '%s']` , fixtureURI .String (),
286
295
)
287
296
sql .QueryRow (bd .t , backupCountQuery ).Scan (& backupCount )
288
297
bd .t .L ().Printf (`%d scheduled backups taken` , backupCount )
289
298
290
- if bd .sp .fixture .CompactionThreshold > 0 {
299
+ if len (backupFailed ) > 0 {
300
+ return errors .Newf ("backup jobs failed while running incrementals: %v" , backupFailed )
301
+ } else if bd .sp .fixture .CompactionThreshold > 0 {
291
302
bd .t .L ().Printf ("%d compaction jobs succeeded, %d running" , len (compSuccess ), len (compRunning ))
292
303
if len (compFailed ) > 0 {
293
- return errors .Newf ("compaction jobs failed: %v" , compFailed )
304
+ return errors .Newf ("compaction jobs failed while running incrementals : %v" , compFailed )
294
305
}
295
306
}
296
307
@@ -299,15 +310,19 @@ func (bd *backupDriver) monitorBackups(ctx context.Context) error {
299
310
`PAUSE SCHEDULES WITH x AS (SHOW SCHEDULES) SELECT id FROM x WHERE label = '%s'` , scheduleLabel ,
300
311
)
301
312
sql .Exec (bd .t , pauseSchedulesQuery )
302
- if len (compRunning ) > 0 {
303
- state = WaitingCompactions
313
+ if len (compRunning ) > 0 || len ( backupRunning ) > 0 {
314
+ state = WaitingCompletion
304
315
} else {
305
316
state = Done
306
317
}
307
318
}
308
- case WaitingCompactions :
309
- if len (compFailed ) > 0 {
310
- return errors .Newf ("compaction jobs failed: %v" , compFailed )
319
+ case WaitingCompletion :
320
+ if len (backupFailed ) > 0 {
321
+ return errors .Newf ("backup jobs failed while waiting completion: %v" , backupFailed )
322
+ } else if len (compFailed ) > 0 {
323
+ return errors .Newf ("compaction jobs failed while waiting completion: %v" , compFailed )
324
+ } else if len (backupRunning ) > 0 {
325
+ bd .t .L ().Printf ("waiting for %d backup jobs to finish" , len (backupRunning ))
311
326
} else if len (compRunning ) > 0 {
312
327
bd .t .L ().Printf ("waiting for %d compaction jobs to finish" , len (compRunning ))
313
328
} else {
@@ -332,20 +347,45 @@ func (bd *backupDriver) compactionJobStates(
332
347
if bd .sp .fixture .CompactionThreshold == 0 {
333
348
return nil , nil , nil , nil
334
349
}
335
- compactionQuery := `SELECT job_id, status, error FROM [SHOW JOBS] WHERE job_type = 'BACKUP' AND
336
- description ILIKE 'COMPACT BACKUPS%'`
337
- rows := sql .Query (bd .t , compactionQuery )
350
+ s , r , f , err := bd .queryJobStates (
351
+ sql , "job_type = 'BACKUP' AND description ILIKE 'COMPACT BACKUPS%'" ,
352
+ )
353
+ return s , r , f , errors .Wrapf (err , "error querying compaction job states" )
354
+ }
355
+
356
+ // backupJobStates queries jobs of type BACKUP whose description matches
357
+ // 'BACKUP %' and partitions them into succeeded, running, and failed jobs.
358
+ func (bd * backupDriver ) backupJobStates (
359
+ sql * sqlutils.SQLRunner ,
360
+ ) ([]jobMeta , []jobMeta , []jobMeta , error ) {
361
+ s , r , f , err := bd .queryJobStates (
362
+ sql , "job_type = 'BACKUP' AND description ILIKE 'BACKUP %'" ,
363
+ )
364
+ return s , r , f , errors .Wrapf (err , "error querying backup job states" )
365
+ }
366
+
367
+ // queryJobStates queries the job table and returns a partition of jobs that
368
+ // succeeded, are running, and failed. The filter is applied to the query to
369
+ // limit the jobs searched. If the filter is empty, all jobs are searched.
370
+ func (bd * backupDriver ) queryJobStates (
371
+ sql * sqlutils.SQLRunner , filter string ,
372
+ ) ([]jobMeta , []jobMeta , []jobMeta , error ) {
373
+ query := "SELECT job_id, status, error FROM [SHOW JOBS]"
374
+ if filter != "" {
375
+ query += fmt .Sprintf (" WHERE %s" , filter )
376
+ }
377
+ rows := sql .Query (bd .t , query )
338
378
defer rows .Close ()
339
- var compJobs []jobMeta
379
+ var jobMetas []jobMeta
340
380
for rows .Next () {
341
381
var job jobMeta
342
382
if err := rows .Scan (& job .jobID , & job .state , & job .error ); err != nil {
343
- return nil , nil , nil , errors .Wrapf (err , "error scanning compaction job" )
383
+ return nil , nil , nil , errors .Wrapf (err , "error scanning job" )
344
384
}
345
- compJobs = append (compJobs , job )
385
+ jobMetas = append (jobMetas , job )
346
386
}
347
387
var successes , running , failures []jobMeta
348
- for _ , job := range compJobs {
388
+ for _ , job := range jobMetas {
349
389
switch job .state {
350
390
case jobs .StateSucceeded :
351
391
successes = append (successes , job )
@@ -354,7 +394,7 @@ func (bd *backupDriver) compactionJobStates(
354
394
case jobs .StateFailed :
355
395
failures = append (failures , job )
356
396
default :
357
- bd .t .L ().Printf (`unexpected compaction job %d in state %s` , job .jobID , job .state )
397
+ bd .t .L ().Printf (`unexpected job %d in state %s` , job .jobID , job .state )
358
398
}
359
399
}
360
400
return successes , running , failures , nil
0 commit comments