Skip to content

Commit 87ba01d

Browse files
committed
backup: checkpoint after online restore link phase
This patch prevents online restore from rerunning the link flow if it has already completed. This is useful for restore with experimental copy, which runs the download phase in the same job as the link phase. At some point in the future, we could add finer-grained checkpointing to the link phase, but that would require more work and thought. Epic: none Release note: none
1 parent e1a9fa7 commit 87ba01d

File tree

3 files changed

+80
-10
lines changed

3 files changed

+80
-10
lines changed

pkg/backup/restore_job.go

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,11 +297,27 @@ func restore(
297297
return emptyRowCount, nil
298298
}
299299

300+
job := resumer.job
301+
details := job.Details().(jobspb.RestoreDetails)
302+
303+
if details.OnlineImpl() {
304+
var linkPhaseComplete bool
305+
if err := execCtx.ExecCfg().InternalDB.Txn(restoreCtx, func(ctx context.Context, txn isql.Txn) error {
306+
jobInfo := jobs.InfoStorageForJob(txn, resumer.job.ID())
307+
_, ok, err := jobInfo.Get(ctx, "get-link-complete-key", linkCompleteKey)
308+
linkPhaseComplete = ok
309+
return err
310+
}); err != nil {
311+
log.Warningf(restoreCtx, "failed to get checkpoint for link phase %v", err)
312+
}
313+
if linkPhaseComplete {
314+
return emptyRowCount, nil
315+
}
316+
}
317+
300318
// If we've already migrated some of the system tables we're about to
301319
// restore, this implies that a previous attempt restored all of this data.
302320
// We want to avoid restoring again since we'll be shadowing migrated keys.
303-
job := resumer.job
304-
details := job.Details().(jobspb.RestoreDetails)
305321
if alreadyMigrated := checkForMigratedData(details, dataToRestore); alreadyMigrated {
306322
return emptyRowCount, nil
307323
}
@@ -1982,6 +1998,16 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
19821998
if err := insertStats(ctx, r.job, p.ExecCfg(), remappedStats); err != nil {
19831999
return errors.Wrap(err, "inserting table statistics")
19842000
}
2001+
2002+
if details.OnlineImpl() {
2003+
if err := p.ExecCfg().InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
2004+
jobInfo := jobs.InfoStorageForJob(txn, r.job.ID())
2005+
return jobInfo.Write(ctx, linkCompleteKey, []byte{})
2006+
}); err != nil {
2007+
log.Warningf(ctx, "failed to checkpoint link flow %v", err)
2008+
}
2009+
}
2010+
19852011
if details.ExperimentalCopy {
19862012
downloadSpans, err := getDownloadSpans(p.ExecCfg().Codec, preData, mainData)
19872013
if err != nil {

pkg/backup/restore_online.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ var onlineRestoreLayerLimit = settings.RegisterIntSetting(
6161
settings.WithVisibility(settings.Reserved),
6262
)
6363

64+
const linkCompleteKey = "link_complete"
65+
6466
// splitAndScatter runs through all entries produced by genSpans splitting and
6567
// scattering the key-space designated by the passed rewriter such that if all
6668
// files in the entries in those spans were ingested the amount ingested between

pkg/backup/restore_online_test.go

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package backup
88
import (
99
"context"
1010
"fmt"
11+
"math/rand"
1112
"reflect"
1213
"strings"
1314
"testing"
@@ -26,11 +27,27 @@ import (
2627
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
2728
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
2829
"github.com/cockroachdb/cockroach/pkg/util/log"
30+
"github.com/cockroachdb/cockroach/pkg/util/randutil"
2931
"github.com/cockroachdb/cockroach/pkg/util/stop"
3032
"github.com/kr/pretty"
3133
"github.com/stretchr/testify/require"
3234
)
3335

36+
var orParams = base.TestClusterArgs{
37+
// Online restore is not supported in a secondary tenant yet.
38+
ServerArgs: base.TestServerArgs{
39+
DefaultTestTenant: base.TestIsSpecificToStorageLayerAndNeedsASystemTenant,
40+
},
41+
}
42+
43+
func onlineImpl(rng *rand.Rand) string {
44+
opt := "EXPERIMENTAL DEFERRED COPY"
45+
if rng.Intn(2) == 0 {
46+
opt = "EXPERIMENTAL COPY"
47+
}
48+
return opt
49+
}
50+
3451
func TestOnlineRestoreBasic(t *testing.T) {
3552
defer leaktest.AfterTest(t)()
3653
defer log.Scope(t).Close(t)
@@ -40,16 +57,11 @@ func TestOnlineRestoreBasic(t *testing.T) {
4057
ctx := context.Background()
4158

4259
const numAccounts = 1000
43-
params := base.TestClusterArgs{
44-
// Online restore is not supported in a secondary tenant yet.
45-
ServerArgs: base.TestServerArgs{
46-
DefaultTestTenant: base.TestIsSpecificToStorageLayerAndNeedsASystemTenant,
47-
},
48-
}
49-
tc, sqlDB, dir, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitManualReplication, params)
60+
61+
tc, sqlDB, dir, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitManualReplication, orParams)
5062
defer cleanupFn()
5163

52-
rtc, rSQLDB, cleanupFnRestored := backupRestoreTestSetupEmpty(t, 1, dir, InitManualReplication, params)
64+
rtc, rSQLDB, cleanupFnRestored := backupRestoreTestSetupEmpty(t, 1, dir, InitManualReplication, orParams)
5365
defer cleanupFnRestored()
5466

5567
externalStorage := "nodelocal://1/backup"
@@ -118,6 +130,36 @@ func TestOnlineRestorePartitioned(t *testing.T) {
118130
sqlDB.Exec(t, fmt.Sprintf(`SHOW JOB WHEN COMPLETE %s`, j[0][4]))
119131
}
120132

133+
func TestOnlineRestoreLinkCheckpoint(t *testing.T) {
134+
defer leaktest.AfterTest(t)()
135+
defer log.Scope(t).Close(t)
136+
defer nodelocal.ReplaceNodeLocalForTesting(t.TempDir())()
137+
138+
rng, _ := randutil.NewTestRand()
139+
140+
const numAccounts = 10
141+
_, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(
142+
t,
143+
singleNode,
144+
numAccounts,
145+
InitManualReplication,
146+
orParams,
147+
)
148+
defer cleanupFn()
149+
sqlDB.Exec(t, "BACKUP DATABASE data INTO $1", localFoo)
150+
sqlDB.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = 'restore.before_publishing_descriptors'")
151+
var jobID jobspb.JobID
152+
stmt := fmt.Sprintf("RESTORE DATABASE data FROM LATEST IN $1 WITH OPTIONS (new_db_name='data2', %s, detached)", onlineImpl(rng))
153+
sqlDB.QueryRow(t, stmt, localFoo).Scan(&jobID)
154+
jobutils.WaitForJobToPause(t, sqlDB, jobID)
155+
156+
// Set a pausepoint during the link phase which should not get hit because of
157+
// checkpointing.
158+
sqlDB.Exec(t, "SET CLUSTER SETTING jobs.debug.pausepoints = 'restore.before_link'")
159+
sqlDB.Exec(t, "RESUME JOB $1", jobID)
160+
jobutils.WaitForJobToSucceed(t, sqlDB, jobID)
161+
}
162+
121163
func TestOnlineRestoreStatementResult(t *testing.T) {
122164
defer leaktest.AfterTest(t)()
123165
defer log.Scope(t).Close(t)

0 commit comments

Comments
 (0)