Skip to content

Commit 52dc3a3

Browse files
committed
backup: fix race condition in starting compaction job
In #145930, scheduled compactions are blocked from running if another compaction job is running for the schedule. However, a race condition is currently possible that results in a compaction job being unable to find an incremental backup. Take the following circumstance: 1. Compaction job A starts. 2. A scheduled backup B completes and begins considering whether a compaction job should run. It fetches the current chain up to its end time and finds that it should run a compaction. 3. Compaction job A completes. 4. B starts a transaction to create the compaction job. Because A has completed, it does not block the job from being created. 5. B creates a compaction job C whose start time is now skipped due to A's completion. 6. When C is picked up by the job system, it resolves the backup chain again; the chain no longer contains C's start time, so the job fails. This is resolved by opening a transaction before fetching the backup chain to check for an already running compaction job. Fixes: #149867, #147264 Release note: None
1 parent d7ce141 commit 52dc3a3

File tree

1 file changed

+55
-12
lines changed

1 file changed

+55
-12
lines changed

pkg/backup/compaction_job.go

Lines changed: 55 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,28 @@ func maybeStartCompactionJob(
9999
user,
100100
)
101101

102+
// A race condition can occur where a compaction job ends after we fetch the
103+
// backup chain but before we open a transaction to write the record. As a
104+
// result, the written record is based on a chain that did not include the
105+
// newly completed compaction job. In this scenario, it is possible that the
106+
// chosen times for this compaction job actually no longer exist in the chain
107+
// because it was compacted away. To avoid this, we need to check for the lock
108+
// before fetching the backup chain.
109+
//
110+
// Note: _Technically_, this isn't entirely alleviated as a compaction job
111+
// could start and finish in between the time we grab the backup chain and
112+
// before we write the job record. However, this would require the schedule to
113+
// have `on_previous_running=start` and realistically speaking, no compaction
114+
// job would complete that quickly.
115+
if err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
116+
_, err := getScheduledExecutionArgsAndCheckCompactionLock(
117+
ctx, txn, env, triggerJob.ScheduleID,
118+
)
119+
return err
120+
}); err != nil {
121+
return 0, err
122+
}
123+
102124
chain, _, _, _, err := getBackupChain(
103125
ctx, execCfg, user, triggerJob.Destination, triggerJob.EncryptionOptions,
104126
triggerJob.EndTime, &kmsEnv,
@@ -115,23 +137,14 @@ func maybeStartCompactionJob(
115137
return 0, err
116138
}
117139
startTS, endTS := chain[start].StartTime, chain[end-1].EndTime
118-
log.Infof(ctx, "compacting backups from %s to %s", startTS, endTS)
119140

120141
var jobID jobspb.JobID
121142
err = execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
122-
_, args, err := getScheduledBackupExecutionArgsFromSchedule(
123-
ctx, env, jobs.ScheduledJobTxn(txn), triggerJob.ScheduleID,
143+
args, err := getScheduledExecutionArgsAndCheckCompactionLock(
144+
ctx, txn, env, triggerJob.ScheduleID,
124145
)
125146
if err != nil {
126-
return errors.Wrapf(
127-
err, "failed to get scheduled backup execution args for schedule %d", triggerJob.ScheduleID,
128-
)
129-
}
130-
if args.CompactionJobID != 0 {
131-
return errors.Newf(
132-
"compaction job %d already running for schedule %d",
133-
args.CompactionJobID, triggerJob.ScheduleID,
134-
)
147+
return err
135148
}
136149
datums, err := txn.QueryRowEx(
137150
ctx,
@@ -171,6 +184,9 @@ func maybeStartCompactionJob(
171184
)
172185
return scheduledJob.Update(ctx, backupSchedule)
173186
})
187+
if err == nil {
188+
log.Infof(ctx, "compacting backups from %s to %s", startTS, endTS)
189+
}
174190
return jobID, err
175191
}
176192

@@ -931,6 +947,33 @@ func maybeWriteBackupCheckpoint(
931947
return true, nil
932948
}
933949

950+
// getScheduledExecutionArgsAndCheckCompactionLock retrieves the scheduled
951+
// backup execution args and also checks if a compaction job is already
952+
// running. If we fail to fetch the args or if a compaction job is already
953+
// running for this schedule, an error is returned.
954+
func getScheduledExecutionArgsAndCheckCompactionLock(
955+
ctx context.Context,
956+
txn isql.Txn,
957+
env scheduledjobs.JobSchedulerEnv,
958+
scheduleID jobspb.ScheduleID,
959+
) (*backuppb.ScheduledBackupExecutionArgs, error) {
960+
_, args, err := getScheduledBackupExecutionArgsFromSchedule(
961+
ctx, env, jobs.ScheduledJobTxn(txn), scheduleID,
962+
)
963+
if err != nil {
964+
return nil, errors.Wrapf(
965+
err, "failed to get scheduled backup execution args for schedule %d", scheduleID,
966+
)
967+
}
968+
if args.CompactionJobID != 0 {
969+
return nil, errors.Newf(
970+
"compaction job %d already running for schedule %d",
971+
args.CompactionJobID, scheduleID,
972+
)
973+
}
974+
return args, nil
975+
}
976+
934977
func init() {
935978
builtins.StartCompactionJob = StartCompactionJob
936979
}

0 commit comments

Comments
 (0)