Skip to content

Commit ad1e2b9

Browse files
committed
roachtest: refactor online restore roachtest into helper functions
This commit cleans up the `runRestore` function for the online restore roachtests by moving the restore and download driver logic into separate helper functions (`executeTestRestorePhase` and `executeTestDownloadPhase`). It also replaces the deprecated monitor (`c.NewDeprecatedMonitor`) with the new task-based API on `test.Test` (`t.NewErrorGroup`).
1 parent 77e13d8 commit ad1e2b9

File tree

1 file changed

+112
-62
lines changed

1 file changed

+112
-62
lines changed

pkg/cmd/roachtest/tests/online_restore.go

Lines changed: 112 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
1919
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
2020
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
21+
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/task"
2122
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
2223
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
2324
"github.com/cockroachdb/cockroach/pkg/jobs"
@@ -677,96 +678,145 @@ func runRestore(
677678
statsCollector, err := createStatCollector(ctx, rd)
678679
require.NoError(t, err)
679680

680-
m := c.NewDeprecatedMonitor(ctx, sp.hardware.getCRDBNodes())
681-
var restoreStartTime, restoreEndTime time.Time
682-
m.Go(func(ctx context.Context) error {
683-
db, err := rd.c.ConnE(ctx, rd.t.L(), rd.c.Node(1)[0])
684-
if err != nil {
685-
return err
686-
}
687-
defer db.Close()
688-
for _, setting := range clusterSettings {
689-
if _, err := db.Exec(fmt.Sprintf("SET CLUSTER SETTING %s", setting)); err != nil {
690-
return errors.Wrapf(err, "failed to set cluster setting %s", setting)
691-
}
692-
}
693-
opts := "WITH UNSAFE_RESTORE_INCOMPATIBLE_VERSION"
694-
if runOnline {
695-
opts = "WITH EXPERIMENTAL DEFERRED COPY, UNSAFE_RESTORE_INCOMPATIBLE_VERSION"
696-
}
697-
if err := maybeAddSomeEmptyTables(ctx, rd); err != nil {
698-
return errors.Wrapf(err, "failed to add some empty tables")
699-
}
700-
restoreStartTime = timeutil.Now()
701-
restoreCmd := rd.restoreCmd(ctx, fmt.Sprintf("DATABASE %s", sp.backup.fixture.DatabaseName()), opts)
702-
t.L().Printf("Running %s", restoreCmd)
703-
if _, err = db.ExecContext(ctx, restoreCmd); err != nil {
704-
return err
705-
}
706-
if runOnline && sp.linkPhaseTimeout != 0 && sp.linkPhaseTimeout < timeutil.Since(restoreStartTime) {
707-
return errors.Newf("link phase took too long: %s greater than timeout %s", timeutil.Since(restoreStartTime), sp.linkPhaseTimeout)
681+
restoreStartTime, restoreEndTime, err := executeTestRestorePhase(
682+
ctx, t, c, sp, rd, runOnline, clusterSettings...,
683+
)
684+
require.NoError(t, err, "failed to execute restore phase")
685+
686+
downloadEndTimeLowerBound, workloadStartTime, workloadEndTime, err := executeTestDownloadPhase(
687+
ctx, t, c, sp, rd, runOnline, runWorkload, testStartTime,
688+
)
689+
require.NoError(t, err, "failed to execute download phase")
690+
691+
return restoreStats{
692+
collector: statsCollector,
693+
restoreStartTime: restoreStartTime,
694+
restoreEndTime: restoreEndTime,
695+
workloadStartTime: workloadStartTime,
696+
workloadEndTime: workloadEndTime,
697+
downloadEndTimeLowerBound: downloadEndTimeLowerBound,
698+
}
699+
}
700+
701+
// executeTestRestorePhase executes the restore phase of the online restore
702+
// roachtests. If `runOnline` is not set, a conventional restore is run instead.
703+
// The start time and end time of the online restore link phase are returned (or
704+
// in the case of conventional restore, the start and end time of the entire
705+
// restore job).
706+
func executeTestRestorePhase(
707+
ctx context.Context,
708+
t test.Test,
709+
c cluster.Cluster,
710+
sp onlineRestoreSpecs,
711+
rd restoreDriver,
712+
runOnline bool,
713+
clusterSettings ...string,
714+
) (time.Time, time.Time, error) {
715+
db, err := rd.c.ConnE(ctx, t.L(), rd.c.Node(1)[0])
716+
if err != nil {
717+
return time.Time{}, time.Time{}, err
718+
}
719+
defer db.Close()
720+
for _, setting := range clusterSettings {
721+
if _, err := db.Exec(fmt.Sprintf("SET CLUSTER SETTING %s", setting)); err != nil {
722+
return time.Time{}, time.Time{}, errors.Wrapf(err, "failed to set cluster setting %s", setting)
708723
}
709-
return nil
710-
})
711-
m.Wait()
712-
restoreEndTime = timeutil.Now()
724+
}
725+
opts := "WITH UNSAFE_RESTORE_INCOMPATIBLE_VERSION"
726+
if runOnline {
727+
opts = "WITH EXPERIMENTAL DEFERRED COPY, UNSAFE_RESTORE_INCOMPATIBLE_VERSION"
728+
}
729+
if err := maybeAddSomeEmptyTables(ctx, rd); err != nil {
730+
return time.Time{}, time.Time{}, errors.Wrapf(err, "failed to add some empty tables")
731+
}
732+
restoreStartTime := timeutil.Now()
733+
restoreCmd := rd.restoreCmd(ctx, fmt.Sprintf("DATABASE %s", sp.backup.fixture.DatabaseName()), opts)
734+
t.L().Printf("Running %s", restoreCmd)
735+
if _, err = db.ExecContext(ctx, restoreCmd); err != nil {
736+
return time.Time{}, time.Time{}, err
737+
}
738+
restoreEndTime := time.Now()
739+
if runOnline && sp.linkPhaseTimeout != 0 && sp.linkPhaseTimeout < restoreEndTime.Sub(restoreStartTime) {
740+
return restoreStartTime, restoreEndTime, errors.Newf(
741+
"link phase took too long: %s greater than timeout %s",
742+
timeutil.Since(restoreStartTime), sp.linkPhaseTimeout,
743+
)
744+
}
745+
return restoreStartTime, restoreEndTime, err
746+
}
713747

714-
workloadCtx, workloadCancel := context.WithCancel(ctx)
715-
mDownload := c.NewDeprecatedMonitor(workloadCtx, sp.hardware.getCRDBNodes())
748+
// executeTestDownloadPhase executes the download phase of the online restore
749+
// roachtest. `runWorkload` indicates whether a workload should be running
750+
// during the download phase. If `runOnline` is not set, the download job is
751+
// not awaited, but the workload (if enabled) still runs for at most 5 more
752+
// minutes (or the remaining time in the test, whichever is shorter).
753+
// The lower bound of the download job end time is returned, along with the
754+
// start and end time of the workload, if it was run.
755+
func executeTestDownloadPhase(
756+
ctx context.Context,
757+
t test.Test,
758+
c cluster.Cluster,
759+
sp onlineRestoreSpecs,
760+
rd restoreDriver,
761+
runOnline bool,
762+
runWorkload bool,
763+
testStartTime time.Time,
764+
) (time.Time, time.Time, time.Time, error) {
765+
mon := t.NewErrorGroup(task.Logger(t.L()))
716766

717767
var workloadStartTime, workloadEndTime time.Time
718-
mDownload.Go(func(ctx context.Context) error {
768+
workloadCancel := mon.GoWithCancel(func(ctx context.Context, logger *logger.Logger) error {
719769
if !runWorkload {
720-
fmt.Printf("roachtest configured to skip running the foreground workload")
770+
logger.Printf("roachtest configured to skip running the foreground workload")
721771
return nil
722772
}
723773
workloadStartTime = timeutil.Now()
724-
err := sp.workload.Run(ctx, t, c, sp.hardware)
725774
// We expect the workload to return a context cancelled error because
726775
// the roachtest driver cancels the workload task's context after the download job completes.
727-
if err != nil && ctx.Err() == nil {
776+
if err := sp.workload.Run(ctx, t, c, sp.hardware); err != nil && ctx.Err() == nil {
728777
// Implies the workload context was not cancelled and the workload cmd returned a
729778
// different error.
730779
return errors.Wrapf(err, `Workload context was not cancelled. Error returned by workload cmd`)
731780
}
732-
rd.t.L().Printf("workload successfully finished")
781+
logger.Printf("workload successfully finished")
733782
return nil
734783
})
784+
735785
var downloadEndTimeLowerBound time.Time
736-
mDownload.Go(func(ctx context.Context) error {
786+
downloadStartTime := timeutil.Now()
787+
mon.Go(func(ctx context.Context, logger *logger.Logger) error {
737788
defer workloadCancel()
738789
if runOnline {
739-
downloadEndTimeLowerBound, err = waitForDownloadJob(ctx, c, t.L())
740-
if err != nil {
790+
var err error
791+
if downloadEndTimeLowerBound, err = waitForDownloadJob(ctx, c, logger); err != nil {
741792
return err
742793
}
743-
if sp.downloadPhaseTimeout != 0 && sp.downloadPhaseTimeout < timeutil.Since(restoreEndTime) {
744-
return errors.Newf("download phase took too long: %s greater than timeout %s", timeutil.Since(restoreEndTime), sp.downloadPhaseTimeout)
794+
downloadTime := downloadEndTimeLowerBound.Sub(downloadStartTime)
795+
if sp.downloadPhaseTimeout != 0 && sp.downloadPhaseTimeout < downloadTime {
796+
return errors.Newf(
797+
"download phase took too long: %s greater than timeout %s",
798+
downloadTime, sp.downloadPhaseTimeout,
799+
)
745800
}
746801
}
802+
747803
if runWorkload {
748-
// Run the workload for at most 5 minutes.
749-
testRuntime := timeutil.Since(testStartTime)
750-
workloadDuration := sp.timeout - (testRuntime + time.Minute)
804+
// The remaining workload duration is capped by the time left before the test timeout, less a one-minute buffer.
805+
testRunTime := timeutil.Since(testStartTime)
806+
testTimeoutRemaining := sp.timeout - (testRunTime + time.Minute)
807+
808+
// Run the workload for at most 5 more minutes.
751809
maxWorkloadDuration := time.Minute * 5
752-
if workloadDuration > maxWorkloadDuration {
753-
workloadDuration = maxWorkloadDuration
754-
}
755-
t.L().Printf("let workload run for another %.2f minutes", workloadDuration.Minutes())
810+
811+
workloadDuration := min(testTimeoutRemaining, maxWorkloadDuration)
812+
logger.Printf("let workload run for another %.2f minutes", workloadDuration.Minutes())
756813
time.Sleep(workloadDuration)
757814
}
758815
return nil
759816
})
760-
mDownload.Wait()
761-
if runWorkload {
762-
workloadEndTime = timeutil.Now()
763-
}
764-
return restoreStats{
765-
collector: statsCollector,
766-
restoreStartTime: restoreStartTime,
767-
restoreEndTime: restoreEndTime,
768-
workloadStartTime: workloadStartTime,
769-
workloadEndTime: workloadEndTime,
770-
downloadEndTimeLowerBound: downloadEndTimeLowerBound,
817+
818+
if err := mon.WaitE(); err != nil {
819+
return time.Time{}, time.Time{}, time.Time{}, err
771820
}
821+
return downloadEndTimeLowerBound, workloadStartTime, workloadEndTime, nil
772822
}

0 commit comments

Comments
 (0)