Skip to content

Commit 5ced8ed

Browse files
craig[bot] and kev-cao
committed
Merge #143597
143597: backup: explicitly require full path for backup compaction builtin r=msbutler a=kev-cao For the backup compaction builtin, the full backup path must be explicitly stated. If LATEST is used, a race condition can occur where a full backup executes before a compaction job can resolve its destination from its triggering incremental. The full backup will overwrite the LATEST file and compaction will run on the wrong chain and fail. Fixes: #143543 Release note: None Co-authored-by: Kevin Cao <[email protected]>
2 parents ba23afd + a44ebf9 commit 5ced8ed

File tree

6 files changed

+143
-65
lines changed

6 files changed

+143
-65
lines changed

pkg/backup/backup_compaction.go

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ var (
6666
// maybeStartCompactionJob will initiate a compaction job off of a triggering
6767
// incremental job if the backup chain length exceeds the threshold.
6868
// backupStmt should be the original backup statement that triggered the job.
69+
// It is the responsibility of the caller to ensure that the backup details'
70+
// destination contains a resolved subdir.
6971
func maybeStartCompactionJob(
7072
ctx context.Context,
7173
execCfg *sql.ExecutorConfig,
@@ -82,6 +84,19 @@ func maybeStartCompactionJob(
8284
if knobs != nil && knobs.JobSchedulerEnv != nil {
8385
env = knobs.JobSchedulerEnv
8486
}
87+
kmsEnv := backupencryption.MakeBackupKMSEnv(
88+
execCfg.Settings,
89+
&execCfg.ExternalIODirConfig,
90+
execCfg.InternalDB,
91+
user,
92+
)
93+
chain, _, _, _, err := getBackupChain(ctx, execCfg, user, triggerJob, &kmsEnv)
94+
if err != nil {
95+
return 0, errors.Wrap(err, "failed to get backup chain")
96+
}
97+
if int64(len(chain)) < threshold {
98+
return 0, nil
99+
}
85100
var backupStmt string
86101
if err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
87102
_, args, err := getScheduledBackupExecutionArgsFromSchedule(
@@ -97,19 +112,6 @@ func maybeStartCompactionJob(
97112
}); err != nil {
98113
return 0, err
99114
}
100-
kmsEnv := backupencryption.MakeBackupKMSEnv(
101-
execCfg.Settings,
102-
&execCfg.ExternalIODirConfig,
103-
execCfg.InternalDB,
104-
user,
105-
)
106-
chain, _, _, _, err := getBackupChain(ctx, execCfg, user, triggerJob, &kmsEnv)
107-
if err != nil {
108-
return 0, errors.Wrap(err, "failed to get backup chain")
109-
}
110-
if int64(len(chain)) < threshold {
111-
return 0, nil
112-
}
113115
start, end, err := minSizeDeltaHeuristic(ctx, execCfg, chain)
114116
if err != nil {
115117
return 0, err
@@ -124,8 +126,9 @@ func maybeStartCompactionJob(
124126
"start-compaction-job",
125127
txn.KV(),
126128
sessiondata.NoSessionDataOverride,
127-
`SELECT crdb_internal.backup_compaction($1, $2::DECIMAL, $3::DECIMAL)`,
129+
`SELECT crdb_internal.backup_compaction($1, $2, $3::DECIMAL, $4::DECIMAL)`,
128130
backupStmt,
131+
triggerJob.Destination.Subdir,
129132
startTS.AsOfSystemTime(),
130133
endTS.AsOfSystemTime(),
131134
)
@@ -219,6 +222,12 @@ func (b *backupResumer) ResumeCompaction(
219222
var backupManifest *backuppb.BackupManifest
220223
updatedDetails := initialDetails
221224
if initialDetails.URI == "" {
225+
testingKnobs := execCtx.ExecCfg().BackupRestoreTestingKnobs
226+
if testingKnobs != nil && testingKnobs.RunBeforeResolvingCompactionDest != nil {
227+
if err := testingKnobs.RunBeforeResolvingCompactionDest(); err != nil {
228+
return err
229+
}
230+
}
222231
// Resolve the backup destination. If we have already resolved and persisted
223232
// the destination during a previous resumption of this job, we can re-use
224233
// the previous resolution.

pkg/backup/backup_compaction_test.go

Lines changed: 83 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package backup
88
import (
99
"context"
1010
"fmt"
11+
"math/rand"
1112
"net/url"
1213
"strconv"
1314
"strings"
@@ -20,6 +21,7 @@ import (
2021
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
2122
"github.com/cockroachdb/cockroach/pkg/roachpb"
2223
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
24+
"github.com/cockroachdb/cockroach/pkg/sql"
2325
"github.com/cockroachdb/cockroach/pkg/testutils"
2426
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
2527
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
@@ -50,11 +52,11 @@ func TestBackupCompaction(t *testing.T) {
5052
defer cleanupDB()
5153

5254
// Expects start/end to be nanosecond epoch.
53-
startCompaction := func(bucket int, start, end int64) jobspb.JobID {
55+
startCompaction := func(bucket int, subdir string, start, end int64) jobspb.JobID {
5456
compactionBuiltin := `SELECT crdb_internal.backup_compaction(
55-
ARRAY['nodelocal://1/backup/%d'], 'LATEST', ''::BYTES, %d::DECIMAL, %d::DECIMAL
57+
ARRAY['nodelocal://1/backup/%d'], '%s', ''::BYTES, %d::DECIMAL, %d::DECIMAL
5658
)`
57-
row := db.QueryRow(t, fmt.Sprintf(compactionBuiltin, bucket, start, end))
59+
row := db.QueryRow(t, fmt.Sprintf(compactionBuiltin, bucket, subdir, start, end))
5860
var jobID jobspb.JobID
5961
row.Scan(&jobID)
6062
return jobID
@@ -88,7 +90,7 @@ ARRAY['nodelocal://1/backup/%d'], 'LATEST', ''::BYTES, %d::DECIMAL, %d::DECIMAL
8890
t,
8991
fmt.Sprintf(incBackupAostCmd, 1, end),
9092
)
91-
waitForSuccessfulJob(t, tc, startCompaction(1, start, end))
93+
waitForSuccessfulJob(t, tc, startCompaction(1, backupPath, start, end))
9294
validateCompactedBackupForTables(t, db, []string{"foo"}, "'nodelocal://1/backup/1'", start, end)
9395
start = end
9496
}
@@ -127,7 +129,9 @@ ARRAY['nodelocal://1/backup/%d'], 'LATEST', ''::BYTES, %d::DECIMAL, %d::DECIMAL
127129
t,
128130
fmt.Sprintf(incBackupAostCmd, 2, end),
129131
)
130-
waitForSuccessfulJob(t, tc, startCompaction(2, start, end))
132+
var backupPath string
133+
db.QueryRow(t, "SHOW BACKUPS IN 'nodelocal://1/backup/2'").Scan(&backupPath)
134+
waitForSuccessfulJob(t, tc, startCompaction(2, backupPath, start, end))
131135
validateCompactedBackupForTables(
132136
t, db,
133137
[]string{"foo", "bar", "baz"},
@@ -141,7 +145,7 @@ ARRAY['nodelocal://1/backup/%d'], 'LATEST', ''::BYTES, %d::DECIMAL, %d::DECIMAL
141145
t,
142146
fmt.Sprintf(incBackupAostCmd, 2, end),
143147
)
144-
waitForSuccessfulJob(t, tc, startCompaction(2, start, end))
148+
waitForSuccessfulJob(t, tc, startCompaction(2, backupPath, start, end))
145149

146150
db.Exec(t, "DROP TABLE foo, baz")
147151
db.Exec(t, "RESTORE FROM LATEST IN 'nodelocal://1/backup/2'")
@@ -169,7 +173,9 @@ ARRAY['nodelocal://1/backup/%d'], 'LATEST', ''::BYTES, %d::DECIMAL, %d::DECIMAL
169173
t,
170174
fmt.Sprintf(incBackupAostCmd, 3, end),
171175
)
172-
waitForSuccessfulJob(t, tc, startCompaction(3, start, end))
176+
var backupPath string
177+
db.QueryRow(t, "SHOW BACKUPS IN 'nodelocal://1/backup/3'").Scan(&backupPath)
178+
waitForSuccessfulJob(t, tc, startCompaction(3, backupPath, start, end))
173179

174180
var numIndexes, restoredNumIndexes int
175181
db.QueryRow(t, "SELECT count(*) FROM [SHOW INDEXES FROM foo]").Scan(&numIndexes)
@@ -210,7 +216,9 @@ ARRAY['nodelocal://1/backup/%d'], 'LATEST', ''::BYTES, %d::DECIMAL, %d::DECIMAL
210216
db.Exec(t, "INSERT INTO foo VALUES (6, 6)")
211217
db.Exec(t, fmt.Sprintf(incBackupCmd, 4))
212218

213-
waitForSuccessfulJob(t, tc, startCompaction(4, start, end))
219+
var backupPath string
220+
db.QueryRow(t, "SHOW BACKUPS IN 'nodelocal://1/backup/4'").Scan(&backupPath)
221+
waitForSuccessfulJob(t, tc, startCompaction(4, backupPath, start, end))
214222
validateCompactedBackupForTables(t, db, []string{"foo"}, "'nodelocal://1/backup/4'", start, end)
215223
})
216224

@@ -237,7 +245,9 @@ ARRAY['nodelocal://1/backup/%d'], 'LATEST', ''::BYTES, %d::DECIMAL, %d::DECIMAL
237245
),
238246
)
239247

240-
waitForSuccessfulJob(t, tc, startCompaction(5, start, end))
248+
var backupPath string
249+
db.QueryRow(t, "SHOW BACKUPS IN 'nodelocal://1/backup/5'").Scan(&backupPath)
250+
waitForSuccessfulJob(t, tc, startCompaction(5, backupPath, start, end))
241251
validateCompactedBackupForTables(t, db, []string{"foo"}, "'nodelocal://1/backup/5'", start, end)
242252
})
243253

@@ -263,18 +273,20 @@ ARRAY['nodelocal://1/backup/%d'], 'LATEST', ''::BYTES, %d::DECIMAL, %d::DECIMAL
263273
t,
264274
fmt.Sprintf(incBackupAostCmd+" WITH %s", 6, end, opts),
265275
)
276+
var backupPath string
277+
db.QueryRow(t, "SHOW BACKUPS IN 'nodelocal://1/backup/6'").Scan(&backupPath)
266278
var jobID jobspb.JobID
267279
db.QueryRow(
268280
t,
269281
fmt.Sprintf(
270282
`SELECT crdb_internal.backup_compaction(
271283
ARRAY['nodelocal://1/backup/6'],
272-
'LATEST',
284+
'%s',
273285
crdb_internal.json_to_pb(
274286
'cockroach.sql.jobs.jobspb.BackupEncryptionOptions',
275287
'{"mode": 0, "raw_passphrase": "correct-horse-battery-staple"}'
276288
), '%d', '%d')`,
277-
start, end,
289+
backupPath, start, end,
278290
),
279291
).Scan(&jobID)
280292
waitForSuccessfulJob(t, tc, jobID)
@@ -299,7 +311,9 @@ crdb_internal.json_to_pb(
299311
db.Exec(t, fmt.Sprintf(incBackupAostCmd, 7, end))
300312
db.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.after.details_has_checkpoint'")
301313

302-
jobID := startCompaction(7, start, end)
314+
var backupPath string
315+
db.QueryRow(t, "SHOW BACKUPS IN 'nodelocal://1/backup/7'").Scan(&backupPath)
316+
jobID := startCompaction(7, backupPath, start, end)
303317
jobutils.WaitForJobToPause(t, db, jobID)
304318
db.Exec(t, "RESUME JOB $1", jobID)
305319
waitForSuccessfulJob(t, tc, jobID)
@@ -310,7 +324,7 @@ crdb_internal.json_to_pb(
310324
end = getTime(t)
311325
db.Exec(t, fmt.Sprintf(incBackupAostCmd, 7, end))
312326
db.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.after.details_has_checkpoint'")
313-
jobID = startCompaction(7, start, end)
327+
jobID = startCompaction(7, backupPath, start, end)
314328
jobutils.WaitForJobToPause(t, db, jobID)
315329
db.Exec(t, "CANCEL JOB $1", jobID)
316330
jobutils.WaitForJobToCancel(t, db, jobID)
@@ -330,12 +344,14 @@ crdb_internal.json_to_pb(
330344
db.Exec(t, "INSERT INTO foo VALUES (3, 3)")
331345
end := getTime(t)
332346
db.Exec(t, fmt.Sprintf(incBackupAostCmd+" WITH %s", 9, end, opts))
347+
var backupPath string
348+
db.QueryRow(t, "SHOW BACKUPS IN 'nodelocal://1/backup/9'").Scan(&backupPath)
333349
var jobID jobspb.JobID
334350
db.QueryRow(t, fmt.Sprintf(
335351
`SELECT crdb_internal.backup_compaction(
336352
'BACKUP INTO LATEST IN ''nodelocal://1/backup/9'' WITH encryption_passphrase = ''correct-horse-battery-staple''',
337-
%d::DECIMAL, %d::DECIMAL
338-
)`, start, end,
353+
'%s', %d::DECIMAL, %d::DECIMAL
354+
)`, backupPath, start, end,
339355
)).Scan(&jobID)
340356
waitForSuccessfulJob(t, tc, jobID)
341357
validateCompactedBackupForTablesWithOpts(
@@ -368,10 +384,12 @@ crdb_internal.json_to_pb(
368384
end := getTime(t)
369385
db.Exec(t, fmt.Sprintf(incBackupAostCmd, 11, end))
370386

371-
c1JobID := startCompaction(11, mid, end)
387+
var backupPath string
388+
db.QueryRow(t, "SHOW BACKUPS IN 'nodelocal://1/backup/11'").Scan(&backupPath)
389+
c1JobID := startCompaction(11, backupPath, mid, end)
372390
waitForSuccessfulJob(t, tc, c1JobID)
373391

374-
c2JobID := startCompaction(11, start, end)
392+
c2JobID := startCompaction(11, backupPath, start, end)
375393
waitForSuccessfulJob(t, tc, c2JobID)
376394
ensureBackupExists(t, db, "'nodelocal://1/backup/11'", mid, end, "")
377395
ensureBackupExists(t, db, "'nodelocal://1/backup/11'", start, end, "")
@@ -450,8 +468,11 @@ func TestBackupCompactionLocalityAware(t *testing.T) {
450468
t,
451469
fmt.Sprintf("BACKUP INTO LATEST IN (%s) AS OF SYSTEM TIME '%d'", collectionURIs, end),
452470
)
453-
compactionBuiltin := "SELECT crdb_internal.backup_compaction(ARRAY[%s], 'LATEST', '', %d::DECIMAL, %d::DECIMAL)"
454-
row := db.QueryRow(t, fmt.Sprintf(compactionBuiltin, collectionURIs, start, end))
471+
compactionBuiltin := "SELECT crdb_internal.backup_compaction(ARRAY[%s], '%s', '', %d::DECIMAL, %d::DECIMAL)"
472+
473+
var backupPath string
474+
db.QueryRow(t, "SHOW BACKUPS IN 'nodelocal://1/backup'").Scan(&backupPath)
475+
row := db.QueryRow(t, fmt.Sprintf(compactionBuiltin, collectionURIs, backupPath, start, end))
455476
var jobID jobspb.JobID
456477
row.Scan(&jobID)
457478
waitForSuccessfulJob(t, tc, jobID)
@@ -462,13 +483,34 @@ func TestScheduledBackupCompaction(t *testing.T) {
462483
defer leaktest.AfterTest(t)()
463484
defer log.Scope(t).Close(t)
464485

465-
skip.WithIssue(t, 143543, "flaky test")
466-
467486
ctx := context.Background()
468-
th, cleanup := newTestHelper(t)
487+
var blockCompaction chan struct{}
488+
var testKnobs []func(*base.TestingKnobs)
489+
if rand.Intn(2) == 0 {
490+
// Artificially force a scheduled full backup to also execute before the
491+
// compaction job resolves its destination, which will cause the LATEST
492+
// file to be updated. This is to ensure that the compaction job correctly
493+
// identifies the full backup it belongs to and does not attempt to chain
494+
// off of the second full backup.
495+
blockCompaction = make(chan struct{})
496+
defer close(blockCompaction)
497+
testKnobs = append(testKnobs, func(testKnobs *base.TestingKnobs) {
498+
testKnobs.BackupRestore = &sql.BackupRestoreTestingKnobs{
499+
RunBeforeResolvingCompactionDest: func() error {
500+
<-blockCompaction
501+
return nil
502+
},
503+
}
504+
})
505+
}
506+
th, cleanup := newTestHelper(t, testKnobs...)
469507
defer cleanup()
470508

471509
th.setOverrideAsOfClauseKnob(t)
510+
// Time is set to a time such that no full backup will unexpectedly run as we
511+
// artificially time travel. This ensures deterministic behavior that is not
512+
// impacted by when the test runs.
513+
th.env.SetTime(time.Date(2025, 3, 27, 1, 0, 0, 0, time.UTC))
472514

473515
th.sqlDB.Exec(t, "SET CLUSTER SETTING backup.compaction.threshold = 3")
474516
th.sqlDB.Exec(t, "SET CLUSTER SETTING backup.compaction.window_size = 2")
@@ -486,6 +528,8 @@ func TestScheduledBackupCompaction(t *testing.T) {
486528
th.env.SetTime(full.NextRun().Add(time.Second))
487529
require.NoError(t, th.executeSchedules())
488530
th.waitForSuccessfulScheduledJob(t, full.ScheduleID())
531+
var backupPath string
532+
th.sqlDB.QueryRow(t, "SHOW BACKUPS IN 'nodelocal://1/backup'").Scan(&backupPath)
489533

490534
inc, err = jobs.ScheduledJobDB(th.internalDB()).
491535
Load(context.Background(), th.env, inc.ScheduleID())
@@ -503,6 +547,21 @@ func TestScheduledBackupCompaction(t *testing.T) {
503547
require.NoError(t, th.executeSchedules())
504548
th.waitForSuccessfulScheduledJob(t, inc.ScheduleID())
505549

550+
if blockCompaction != nil {
551+
t.Log("executing second full backup before compaction job resolves destination")
552+
// Instead of fast forwarding to the full backup's next run, which can result in additional
553+
// incrementeals being triggered by `executeSchedules`, we update the full's next run to the
554+
// current time.
555+
th.sqlDB.QueryStr(t, fmt.Sprintf("ALTER BACKUP SCHEDULE %d EXECUTE FULL IMMEDIATELY", full.ScheduleID()))
556+
full, err = jobs.ScheduledJobDB(th.internalDB()).
557+
Load(context.Background(), th.env, full.ScheduleID())
558+
require.NoError(t, err)
559+
th.env.SetTime(full.NextRun().Add(time.Second))
560+
require.NoError(t, th.executeSchedules())
561+
th.waitForSuccessfulScheduledJob(t, full.ScheduleID())
562+
blockCompaction <- struct{}{}
563+
}
564+
506565
var jobID jobspb.JobID
507566
// The scheduler is notified of the backup job completion and then the
508567
// compaction job is created in a separate transaction. As such, we need to
@@ -527,8 +586,8 @@ func TestScheduledBackupCompaction(t *testing.T) {
527586
var numBackups int
528587
th.sqlDB.QueryRow(
529588
t,
530-
"SELECT count(DISTINCT (start_time, end_time)) FROM "+
531-
"[SHOW BACKUP FROM LATEST IN 'nodelocal://1/backup']",
589+
fmt.Sprintf("SELECT count(DISTINCT (start_time, end_time)) FROM "+
590+
"[SHOW BACKUP FROM '%s' IN 'nodelocal://1/backup']", backupPath),
532591
).Scan(&numBackups)
533592
require.Equal(t, 4, numBackups)
534593
}

pkg/backup/create_scheduled_backup_test.go

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func (th *testHelper) protectedTimestamps() protectedts.Manager {
6767

6868
// newTestHelper creates and initializes appropriate state for a test,
6969
// returning testHelper as well as a cleanup function.
70-
func newTestHelper(t *testing.T) (*testHelper, func()) {
70+
func newTestHelper(t *testing.T, testKnobs ...func(*base.TestingKnobs)) (*testHelper, func()) {
7171
dir, dirCleanupFn := testutils.TempDir(t)
7272

7373
th := &testHelper{
@@ -76,18 +76,23 @@ func newTestHelper(t *testing.T) (*testHelper, func()) {
7676
iodir: dir,
7777
}
7878

79-
knobs := &jobs.TestingKnobs{
80-
JobSchedulerEnv: th.env,
81-
TakeOverJobsScheduling: func(fn execSchedulesFn) {
82-
th.executeSchedules = func() error {
83-
defer th.server.JobRegistry().(*jobs.Registry).TestingNudgeAdoptionQueue()
84-
return fn(context.Background(), allSchedules)
85-
}
86-
},
87-
CaptureJobExecutionConfig: func(config *scheduledjobs.JobExecutionConfig) {
88-
th.cfg = config
79+
knobs := base.TestingKnobs{
80+
JobsTestingKnobs: &jobs.TestingKnobs{
81+
JobSchedulerEnv: th.env,
82+
TakeOverJobsScheduling: func(fn execSchedulesFn) {
83+
th.executeSchedules = func() error {
84+
defer th.server.JobRegistry().(*jobs.Registry).TestingNudgeAdoptionQueue()
85+
return fn(context.Background(), allSchedules)
86+
}
87+
},
88+
CaptureJobExecutionConfig: func(config *scheduledjobs.JobExecutionConfig) {
89+
th.cfg = config
90+
},
91+
IntervalOverrides: jobs.NewTestingKnobsWithShortIntervals().IntervalOverrides,
8992
},
90-
IntervalOverrides: jobs.NewTestingKnobsWithShortIntervals().IntervalOverrides,
93+
}
94+
for _, testKnob := range testKnobs {
95+
testKnob(&knobs)
9196
}
9297

9398
args := base.TestServerArgs{
@@ -97,9 +102,7 @@ func newTestHelper(t *testing.T) (*testHelper, func()) {
97102
// Some scheduled backup tests fail when run within a tenant. More
98103
// investigation is required. Tracked with #76378.
99104
DefaultTestTenant: base.TODOTestTenantDisabled,
100-
Knobs: base.TestingKnobs{
101-
JobsTestingKnobs: knobs,
102-
},
105+
Knobs: knobs,
103106
}
104107
jobs.PollJobsMetricsInterval.Override(context.Background(), &args.Settings.SV, 250*time.Millisecond)
105108
s, db, _ := serverutils.StartServer(t, args)

pkg/sql/exec_util.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1910,6 +1910,8 @@ type BackupRestoreTestingKnobs struct {
19101910
RunAfterRetryIteration func(err error) error
19111911

19121912
RunAfterRestoreProcDrains func()
1913+
1914+
RunBeforeResolvingCompactionDest func() error
19131915
}
19141916

19151917
var _ base.ModuleTestingKnobs = &BackupRestoreTestingKnobs{}

0 commit comments

Comments
 (0)