Skip to content

Commit 71bcba2

Browse files
craig[bot] and kev-cao committed
Merge #149378
149378: roachtest: teach OR roachtests to validate fingerprints r=jeffswenson a=kev-cao. #148967 added fingerprint validation to classic restore; this is a follow-up commit that updates our online restore roachtests to also perform fingerprinting. Epic: CRDB-51394. Fixes: #148966. Release note: None. Co-authored-by: Kevin Cao <[email protected]>
2 parents 9ca435e + 246e948 commit 71bcba2

File tree

1 file changed

+136
-79
lines changed

1 file changed

+136
-79
lines changed

pkg/cmd/roachtest/tests/online_restore.go

Lines changed: 136 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
1919
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
2020
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
21+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task"
2122
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
2223
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
2324
"github.com/cockroachdb/cockroach/pkg/jobs"
@@ -123,9 +124,10 @@ func registerOnlineRestorePerf(r registry.Registry) {
123124
cloud: spec.GCE,
124125
fixture: SmallFixture,
125126
},
126-
fullBackupOnly: true,
127-
timeout: 1 * time.Hour,
128-
suites: registry.Suites(registry.Nightly),
127+
fullBackupOnly: true,
128+
skipFingerprint: true,
129+
timeout: 1 * time.Hour,
130+
suites: registry.Suites(registry.Nightly),
129131
},
130132
workload: tpccRestore{
131133
opts: tpccRunOpts{waitFraction: 0, workers: 100, maxRate: 300},
@@ -162,9 +164,10 @@ func registerOnlineRestorePerf(r registry.Registry) {
162164
cloud: spec.GCE,
163165
fixture: MediumFixture,
164166
},
165-
timeout: 3 * time.Hour,
166-
suites: registry.Suites(registry.Nightly),
167-
fullBackupOnly: true,
167+
timeout: 3 * time.Hour,
168+
suites: registry.Suites(registry.Nightly),
169+
fullBackupOnly: true,
170+
skipFingerprint: true,
168171
},
169172
workload: tpccRestore{
170173
opts: tpccRunOpts{waitFraction: 0, workers: 100, maxRate: 1000},
@@ -181,9 +184,10 @@ func registerOnlineRestorePerf(r registry.Registry) {
181184
cloud: spec.GCE,
182185
fixture: MediumFixture,
183186
},
184-
timeout: 3 * time.Hour,
185-
suites: registry.Suites(registry.Nightly),
186-
fullBackupOnly: true,
187+
timeout: 3 * time.Hour,
188+
suites: registry.Suites(registry.Nightly),
189+
fullBackupOnly: true,
190+
skipFingerprint: true,
187191
},
188192
workload: tpccRestore{
189193
opts: tpccRunOpts{waitFraction: 0, workers: 100, maxRate: 1000},
@@ -201,9 +205,10 @@ func registerOnlineRestorePerf(r registry.Registry) {
201205
cloud: spec.AWS,
202206
fixture: MediumFixture,
203207
},
204-
timeout: 3 * time.Hour,
205-
suites: registry.Suites(registry.Nightly),
206-
fullBackupOnly: true,
208+
timeout: 3 * time.Hour,
209+
suites: registry.Suites(registry.Nightly),
210+
fullBackupOnly: true,
211+
skipFingerprint: true,
207212
},
208213
workload: tpccRestore{
209214
opts: tpccRunOpts{waitFraction: 0, workers: 100, maxRate: 1000},
@@ -216,10 +221,6 @@ func registerOnlineRestorePerf(r registry.Registry) {
216221
for _, runOnline := range []bool{true, false} {
217222
for _, useWorkarounds := range []bool{true, false} {
218223
for _, runWorkload := range []bool{true, false} {
219-
sp := sp
220-
runOnline := runOnline
221-
runWorkload := runWorkload
222-
useWorkarounds := useWorkarounds
223224
clusterSettings := []string{
224225
// TODO(dt): what's the right value for this? How do we tune this
225226
// on the fly automatically during the restore instead of by-hand?
@@ -247,7 +248,7 @@ func registerOnlineRestorePerf(r registry.Registry) {
247248
sp.namePrefix = "offline/"
248249
sp.skip = "used for ad hoc experiments"
249250
}
250-
if !runWorkload {
251+
if !runWorkload && sp.skipFingerprint {
251252
sp.skip = "used for ad hoc experiments"
252253
}
253254

@@ -275,6 +276,11 @@ func registerOnlineRestorePerf(r registry.Registry) {
275276
sp.skip = "online restore is only tested on development branch"
276277
}
277278

279+
// For the sake of simplicity, we only fingerprint when no active
280+
// workload is running during the download phase so that we do not
281+
// need to account for changes to the database after the restore.
282+
doFingerprint := !sp.skipFingerprint && runOnline && !runWorkload
283+
278284
sp.initTestName()
279285
r.Add(registry.TestSpec{
280286
Name: sp.testName,
@@ -298,6 +304,9 @@ func registerOnlineRestorePerf(r registry.Registry) {
298304
restoreStats := runRestore(
299305
ctx, t, c, sp, rd, runOnline, runWorkload, clusterSettings...,
300306
)
307+
if doFingerprint {
308+
rd.maybeValidateFingerprint(ctx)
309+
}
301310
if runOnline {
302311
require.NoError(t, postRestoreValidation(
303312
ctx,
@@ -677,96 +686,144 @@ func runRestore(
677686
statsCollector, err := createStatCollector(ctx, rd)
678687
require.NoError(t, err)
679688

680-
m := c.NewDeprecatedMonitor(ctx, sp.hardware.getCRDBNodes())
681-
var restoreStartTime, restoreEndTime time.Time
682-
m.Go(func(ctx context.Context) error {
683-
db, err := rd.c.ConnE(ctx, rd.t.L(), rd.c.Node(1)[0])
684-
if err != nil {
685-
return err
686-
}
687-
defer db.Close()
688-
for _, setting := range clusterSettings {
689-
if _, err := db.Exec(fmt.Sprintf("SET CLUSTER SETTING %s", setting)); err != nil {
690-
return errors.Wrapf(err, "failed to set cluster setting %s", setting)
691-
}
692-
}
693-
opts := "WITH UNSAFE_RESTORE_INCOMPATIBLE_VERSION"
694-
if runOnline {
695-
opts = "WITH EXPERIMENTAL DEFERRED COPY, UNSAFE_RESTORE_INCOMPATIBLE_VERSION"
696-
}
697-
if err := maybeAddSomeEmptyTables(ctx, rd); err != nil {
698-
return errors.Wrapf(err, "failed to add some empty tables")
699-
}
700-
restoreStartTime = timeutil.Now()
701-
restoreCmd := rd.restoreCmd(ctx, fmt.Sprintf("DATABASE %s", sp.backup.fixture.DatabaseName()), opts)
702-
t.L().Printf("Running %s", restoreCmd)
703-
if _, err = db.ExecContext(ctx, restoreCmd); err != nil {
704-
return err
705-
}
706-
if runOnline && sp.linkPhaseTimeout != 0 && sp.linkPhaseTimeout < timeutil.Since(restoreStartTime) {
707-
return errors.Newf("link phase took too long: %s greater than timeout %s", timeutil.Since(restoreStartTime), sp.linkPhaseTimeout)
689+
restoreStartTime, restoreEndTime, err := executeTestRestorePhase(
690+
ctx, t, c, sp, rd, runOnline, clusterSettings...,
691+
)
692+
require.NoError(t, err, "failed to execute restore phase")
693+
694+
downloadEndTimeLowerBound, workloadStartTime, workloadEndTime, err := executeTestDownloadPhase(
695+
ctx, t, c, sp, rd, runOnline, runWorkload, testStartTime,
696+
)
697+
require.NoError(t, err, "failed to execute download phase")
698+
699+
return restoreStats{
700+
collector: statsCollector,
701+
restoreStartTime: restoreStartTime,
702+
restoreEndTime: restoreEndTime,
703+
workloadStartTime: workloadStartTime,
704+
workloadEndTime: workloadEndTime,
705+
downloadEndTimeLowerBound: downloadEndTimeLowerBound,
706+
}
707+
}
708+
709+
// executeTestRestorePhase executes the restore phase of the online restore
710+
// roachtests. If `runOnline` is not set, a conventional restore is run instead.
711+
// The start time and end time of the online restore link phase are returned (or
712+
// in the case of conventional restore, the start and end time of the entire
713+
// restore job).
714+
func executeTestRestorePhase(
715+
ctx context.Context,
716+
t test.Test,
717+
c cluster.Cluster,
718+
sp onlineRestoreSpecs,
719+
rd restoreDriver,
720+
runOnline bool,
721+
clusterSettings ...string,
722+
) (time.Time, time.Time, error) {
723+
db, err := rd.c.ConnE(ctx, t.L(), rd.c.Node(1)[0])
724+
if err != nil {
725+
return time.Time{}, time.Time{}, err
726+
}
727+
defer db.Close()
728+
for _, setting := range clusterSettings {
729+
if _, err := db.Exec(fmt.Sprintf("SET CLUSTER SETTING %s", setting)); err != nil {
730+
return time.Time{}, time.Time{}, errors.Wrapf(err, "failed to set cluster setting %s", setting)
708731
}
709-
return nil
710-
})
711-
m.Wait()
712-
restoreEndTime = timeutil.Now()
732+
}
733+
opts := "WITH UNSAFE_RESTORE_INCOMPATIBLE_VERSION"
734+
if runOnline {
735+
opts = "WITH EXPERIMENTAL DEFERRED COPY, UNSAFE_RESTORE_INCOMPATIBLE_VERSION"
736+
}
737+
if err := maybeAddSomeEmptyTables(ctx, rd); err != nil {
738+
return time.Time{}, time.Time{}, errors.Wrapf(err, "failed to add some empty tables")
739+
}
740+
restoreStartTime := timeutil.Now()
741+
restoreCmd := rd.restoreCmd(ctx, fmt.Sprintf("DATABASE %s", sp.backup.fixture.DatabaseName()), opts)
742+
if _, err = db.ExecContext(ctx, restoreCmd); err != nil {
743+
return time.Time{}, time.Time{}, err
744+
}
745+
restoreEndTime := timeutil.Now()
746+
if runOnline && sp.linkPhaseTimeout != 0 && sp.linkPhaseTimeout < restoreEndTime.Sub(restoreStartTime) {
747+
return restoreStartTime, restoreEndTime, errors.Newf(
748+
"link phase took too long: %s greater than timeout %s",
749+
timeutil.Since(restoreStartTime), sp.linkPhaseTimeout,
750+
)
751+
}
752+
return restoreStartTime, restoreEndTime, err
753+
}
713754

714-
workloadCtx, workloadCancel := context.WithCancel(ctx)
715-
mDownload := c.NewDeprecatedMonitor(workloadCtx, sp.hardware.getCRDBNodes())
755+
// executeTestDownloadPhase executes the download phase of the online restore
756+
// roachtest. `runWorkload` indicates whether a workload should be running
757+
// during the download phase. If `runOnline` is not set, no wait for the
758+
// download phase is performed, but the workload is still run for 5 minutes (or
759+
// the remaining time in the test, whichever is shorter).
760+
// The lower bound of the download job end time is returned, along with the
761+
// start and end time of the workload, if it was run.
762+
func executeTestDownloadPhase(
763+
ctx context.Context,
764+
t test.Test,
765+
c cluster.Cluster,
766+
sp onlineRestoreSpecs,
767+
rd restoreDriver,
768+
runOnline bool,
769+
runWorkload bool,
770+
testStartTime time.Time,
771+
) (time.Time, time.Time, time.Time, error) {
772+
mon := t.NewErrorGroup(task.Logger(t.L()))
716773

717774
var workloadStartTime, workloadEndTime time.Time
718-
mDownload.Go(func(ctx context.Context) error {
775+
workloadCancel := mon.GoWithCancel(func(ctx context.Context, logger *logger.Logger) error {
719776
if !runWorkload {
720-
fmt.Printf("roachtest configured to skip running the foreground workload")
777+
logger.Printf("roachtest configured to skip running the foreground workload")
721778
return nil
722779
}
723780
workloadStartTime = timeutil.Now()
724-
err := sp.workload.Run(ctx, t, c, sp.hardware)
725781
// We expect the workload to return a context cancelled error because
726782
// the roachtest driver cancels the monitor's context after the download job completes
727-
if err != nil && ctx.Err() == nil {
783+
if err := sp.workload.Run(ctx, t, c, sp.hardware); err != nil && ctx.Err() == nil {
728784
// Implies the workload context was not cancelled and the workload cmd returned a
729785
// different error.
730786
return errors.Wrapf(err, `Workload context was not cancelled. Error returned by workload cmd`)
731787
}
732-
rd.t.L().Printf("workload successfully finished")
788+
logger.Printf("workload successfully finished")
733789
return nil
734790
})
791+
735792
var downloadEndTimeLowerBound time.Time
736-
mDownload.Go(func(ctx context.Context) error {
793+
downloadStartTime := timeutil.Now()
794+
mon.Go(func(ctx context.Context, logger *logger.Logger) error {
737795
defer workloadCancel()
738796
if runOnline {
739-
downloadEndTimeLowerBound, err = waitForDownloadJob(ctx, c, t.L())
740-
if err != nil {
797+
var err error
798+
if downloadEndTimeLowerBound, err = waitForDownloadJob(ctx, c, logger); err != nil {
741799
return err
742800
}
743-
if sp.downloadPhaseTimeout != 0 && sp.downloadPhaseTimeout < timeutil.Since(restoreEndTime) {
744-
return errors.Newf("download phase took too long: %s greater than timeout %s", timeutil.Since(restoreEndTime), sp.downloadPhaseTimeout)
801+
downloadTime := downloadEndTimeLowerBound.Sub(downloadStartTime)
802+
if sp.downloadPhaseTimeout != 0 && sp.downloadPhaseTimeout < downloadTime {
803+
return errors.Newf(
804+
"download phase took too long: %s greater than timeout %s",
805+
downloadTime, sp.downloadPhaseTimeout,
806+
)
745807
}
746808
}
809+
747810
if runWorkload {
748-
// Run the workload for at most 5 minutes.
749-
testRuntime := timeutil.Since(testStartTime)
750-
workloadDuration := sp.timeout - (testRuntime + time.Minute)
811+
// Remaining workload duration is capped by the test timeout
812+
testRunTime := timeutil.Since(testStartTime)
813+
testTimeoutRemaining := sp.timeout - (testRunTime + time.Minute)
814+
815+
// Run the workload for at most 5 more minutes.
751816
maxWorkloadDuration := time.Minute * 5
752-
if workloadDuration > maxWorkloadDuration {
753-
workloadDuration = maxWorkloadDuration
754-
}
755-
t.L().Printf("let workload run for another %.2f minutes", workloadDuration.Minutes())
817+
818+
workloadDuration := min(testTimeoutRemaining, maxWorkloadDuration)
819+
logger.Printf("let workload run for another %.2f minutes", workloadDuration.Minutes())
756820
time.Sleep(workloadDuration)
757821
}
758822
return nil
759823
})
760-
mDownload.Wait()
761-
if runWorkload {
762-
workloadEndTime = timeutil.Now()
763-
}
764-
return restoreStats{
765-
collector: statsCollector,
766-
restoreStartTime: restoreStartTime,
767-
restoreEndTime: restoreEndTime,
768-
workloadStartTime: workloadStartTime,
769-
workloadEndTime: workloadEndTime,
770-
downloadEndTimeLowerBound: downloadEndTimeLowerBound,
824+
825+
if err := mon.WaitE(); err != nil {
826+
return time.Time{}, time.Time{}, time.Time{}, err
771827
}
828+
return downloadEndTimeLowerBound, workloadStartTime, workloadEndTime, nil
772829
}

0 commit comments

Comments (0)