@@ -99,6 +99,28 @@ func maybeStartCompactionJob(
99
99
user ,
100
100
)
101
101
102
+ // A race condition can occur where a compaction job ends after we fetch the
103
+ // backup chain but before we open a transaction to write the record. As a
104
+ // result, the written record is based on a chain that did not include the
105
+ // newly completed compaction job. In this scenario, it is possible that the
106
+ // chosen times for this compaction job actually no longer exist in the chain
107
+ // because it was compacted away. To avoid this, we need to check for the lock
108
+ // before fetching the backup chain.
109
+ //
110
+ // Note: _Technically_, this isn't entirely alleviated as a compaction job
111
+ // could start and finish in between the time we grab the backup chain and
112
+ // before we write the job record. However, this would require the schedule to
113
+ // have `on_previous_running=start` and realistically speaking, no compaction
114
+ // job would complete that quickly.
115
+ if err := execCfg .InternalDB .Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
116
+ _ , err := getScheduledExecutionArgsAndCheckCompactionLock (
117
+ ctx , txn , env , triggerJob .ScheduleID ,
118
+ )
119
+ return err
120
+ }); err != nil {
121
+ return 0 , err
122
+ }
123
+
102
124
chain , _ , _ , _ , err := getBackupChain (
103
125
ctx , execCfg , user , triggerJob .Destination , triggerJob .EncryptionOptions ,
104
126
triggerJob .EndTime , & kmsEnv ,
@@ -115,23 +137,14 @@ func maybeStartCompactionJob(
115
137
return 0 , err
116
138
}
117
139
startTS , endTS := chain [start ].StartTime , chain [end - 1 ].EndTime
118
- log .Infof (ctx , "compacting backups from %s to %s" , startTS , endTS )
119
140
120
141
var jobID jobspb.JobID
121
142
err = execCfg .InternalDB .Txn (ctx , func (ctx context.Context , txn isql.Txn ) error {
122
- _ , args , err := getScheduledBackupExecutionArgsFromSchedule (
123
- ctx , env , jobs . ScheduledJobTxn ( txn ) , triggerJob .ScheduleID ,
143
+ args , err := getScheduledExecutionArgsAndCheckCompactionLock (
144
+ ctx , txn , env , triggerJob .ScheduleID ,
124
145
)
125
146
if err != nil {
126
- return errors .Wrapf (
127
- err , "failed to get scheduled backup execution args for schedule %d" , triggerJob .ScheduleID ,
128
- )
129
- }
130
- if args .CompactionJobID != 0 {
131
- return errors .Newf (
132
- "compaction job %d already running for schedule %d" ,
133
- args .CompactionJobID , triggerJob .ScheduleID ,
134
- )
147
+ return err
135
148
}
136
149
datums , err := txn .QueryRowEx (
137
150
ctx ,
@@ -171,6 +184,9 @@ func maybeStartCompactionJob(
171
184
)
172
185
return scheduledJob .Update (ctx , backupSchedule )
173
186
})
187
+ if err == nil {
188
+ log .Infof (ctx , "compacting backups from %s to %s" , startTS , endTS )
189
+ }
174
190
return jobID , err
175
191
}
176
192
@@ -931,6 +947,33 @@ func maybeWriteBackupCheckpoint(
931
947
return true , nil
932
948
}
933
949
950
+ // getScheduledExecutionArgsAndCheckCompactionLock retrieves the scheduled
951
+ // backup execution args and also checks if a compaction job is already
952
+ // running. If we fail to fetch the args or if a compaction job is already
953
+ // running for this schedule, an error is returned.
954
+ func getScheduledExecutionArgsAndCheckCompactionLock (
955
+ ctx context.Context ,
956
+ txn isql.Txn ,
957
+ env scheduledjobs.JobSchedulerEnv ,
958
+ scheduleID jobspb.ScheduleID ,
959
+ ) (* backuppb.ScheduledBackupExecutionArgs , error ) {
960
+ _ , args , err := getScheduledBackupExecutionArgsFromSchedule (
961
+ ctx , env , jobs .ScheduledJobTxn (txn ), scheduleID ,
962
+ )
963
+ if err != nil {
964
+ return nil , errors .Wrapf (
965
+ err , "failed to get scheduled backup execution args for schedule %d" , scheduleID ,
966
+ )
967
+ }
968
+ if args .CompactionJobID != 0 {
969
+ return nil , errors .Newf (
970
+ "compaction job %d already running for schedule %d" ,
971
+ args .CompactionJobID , scheduleID ,
972
+ )
973
+ }
974
+ return args , nil
975
+ }
976
+
934
977
func init () {
935
978
builtins .StartCompactionJob = StartCompactionJob
936
979
}
0 commit comments