Skip to content

Commit 7dc9b6f

Browse files
craig[bot]Vidit Bhat
andcommitted
Merge #144365
144365: drt: generalize pause job to support backup, restore and LDR r=shailendra-patel a=vidit-bhat Previously, the `pause-job` operation was hardcoded to target only `LOGICAL REPLICATION` jobs. This was inadequate because it prevented the reuse of logic for other long-running jobs like BACKUP and RESTORE. This patch generalizes the `pause-job` operation into a parameterized function supporting `LOGICAL REPLICATION`, `BACKUP`, and `RESTORE`. Epic: none Fixes: #138503 #138504 #138507 #138508 Release note: None Co-authored-by: Vidit Bhat <[email protected]>
2 parents 6380073 + 4bec6b3 commit 7dc9b6f

File tree

4 files changed

+131
-40
lines changed

4 files changed

+131
-40
lines changed

pkg/cmd/roachtest/operations/pause_job.go

Lines changed: 82 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -18,66 +18,109 @@ import (
1818
"github.com/cockroachdb/cockroach/pkg/util/randutil"
1919
)
2020

21-
type resumePausedJob struct {
21+
// resumeJob implements OperationCleanup and resumes a paused job by job ID.
22+
type resumeJob struct {
2223
jobId string
2324
}
2425

25-
func (r *resumePausedJob) Cleanup(ctx context.Context, o operation.Operation, c cluster.Cluster) {
26+
func (r *resumeJob) Cleanup(ctx context.Context, o operation.Operation, c cluster.Cluster) {
2627
conn := c.Conn(ctx, o.L(), 1, option.VirtualClusterName(roachtestflags.VirtualCluster))
2728
defer conn.Close()
2829

30+
o.Status(fmt.Sprintf("resuming job %s", r.jobId))
2931
resumeJobStmt := fmt.Sprintf("RESUME JOB %s", r.jobId)
3032
_, err := conn.ExecContext(ctx, resumeJobStmt)
3133
if err != nil {
3234
o.Fatal(err)
3335
}
3436
}
3537

36-
func pauseLDRJob(
37-
ctx context.Context, o operation.Operation, c cluster.Cluster,
38-
) registry.OperationCleanup {
39-
conn := c.Conn(ctx, o.L(), 1, option.VirtualClusterName(roachtestflags.VirtualCluster))
40-
defer conn.Close()
38+
// runPauseJob returns a registry.OperationCleanup function that pauses a running job of the given type.
39+
func runPauseJob(
40+
jobType string,
41+
) func(ctx context.Context, o operation.Operation, c cluster.Cluster) registry.OperationCleanup {
42+
return func(ctx context.Context, o operation.Operation, c cluster.Cluster) registry.OperationCleanup {
43+
conn := c.Conn(ctx, o.L(), 1, option.VirtualClusterName(roachtestflags.VirtualCluster))
44+
defer conn.Close()
4145

42-
//fetch running ldr jobs
43-
jobs, err := conn.QueryContext(ctx, "(WITH x AS (SHOW JOBS) SELECT job_id FROM x WHERE job_type = 'LOGICAL REPLICATION' AND status = 'running')")
44-
if err != nil {
45-
o.Fatal(err)
46-
}
46+
// Query for running jobs of the specified type
47+
query := `
48+
WITH jobs AS (SHOW JOBS)
49+
SELECT job_id FROM jobs
50+
WHERE job_type = $1 AND status = 'running'
51+
`
52+
rows, err := conn.QueryContext(ctx, query, jobType)
53+
if err != nil {
54+
o.Fatal(err)
55+
}
56+
57+
var jobIds []string
58+
for rows.Next() {
59+
var jobId string
60+
if err := rows.Scan(&jobId); err != nil {
61+
o.Fatal(err)
62+
}
63+
jobIds = append(jobIds, jobId)
64+
}
4765

48-
var jobIds []string
49-
for jobs.Next() {
50-
var jobId string
51-
if err := jobs.Scan(&jobId); err != nil {
66+
if len(jobIds) == 0 {
67+
o.Fatal(fmt.Sprintf("no running %s jobs found", jobType))
68+
}
69+
70+
// Randomly pick a job to pause
71+
rng, _ := randutil.NewPseudoRand()
72+
jobId := jobIds[rng.Intn(len(jobIds))]
73+
74+
o.Status(fmt.Sprintf("pausing %s job %s", jobType, jobId))
75+
pauseStmt := fmt.Sprintf("PAUSE JOB %s WITH REASON = 'roachtest operation'", jobId)
76+
if _, err := conn.ExecContext(ctx, pauseStmt); err != nil {
5277
o.Fatal(err)
5378
}
54-
jobIds = append(jobIds, jobId)
55-
}
5679

57-
//pick a random ldr job
58-
rng, _ := randutil.NewPseudoRand()
59-
jobId := jobIds[rng.Intn(len(jobIds))]
80+
o.Status(fmt.Sprintf("paused %s job %s", jobType, jobId))
6081

61-
o.Status(fmt.Sprintf("pausing LDR job %s", jobId))
62-
pauseJobStmt := fmt.Sprintf("PAUSE JOB %s WITH REASON = 'roachtest operation'", jobId)
63-
_, err = conn.ExecContext(ctx, pauseJobStmt)
64-
if err != nil {
65-
o.Fatal(err)
82+
return &resumeJob{jobId: jobId}
6683
}
84+
}
6785

68-
o.Status(fmt.Sprintf("paused LDR job %s", jobId))
69-
return &resumePausedJob{
70-
jobId: jobId,
86+
// registerPauseJob registers pause-job operations for various job types (e.g., BACKUP, RESTORE, LOGICAL REPLICATION).
87+
func registerPauseJob(r registry.Registry) {
88+
jobTypes := []struct {
89+
JobType string
90+
OpName string
91+
Dependency registry.OperationDependency
92+
Owner registry.Owner
93+
Description string
94+
}{
95+
{
96+
JobType: "LOGICAL REPLICATION",
97+
OpName: "pause-job/logical-replication",
98+
Dependency: registry.OperationRequiresLDRJobRunning,
99+
Owner: registry.OwnerDisasterRecovery,
100+
},
101+
{
102+
JobType: "BACKUP",
103+
OpName: "pause-job/backup",
104+
Dependency: registry.OperationRequiresRunningBackupJob,
105+
Owner: registry.OwnerDisasterRecovery,
106+
},
107+
{
108+
JobType: "RESTORE",
109+
OpName: "pause-job/restore",
110+
Dependency: registry.OperationRequiresRunningRestoreJob,
111+
Owner: registry.OwnerDisasterRecovery,
112+
},
71113
}
72-
}
73114

74-
func registerPauseLDRJob(r registry.Registry) {
75-
r.AddOperation(registry.OperationSpec{
76-
Name: "pause-ldr",
77-
Owner: registry.OwnerDisasterRecovery,
78-
Timeout: 15 * time.Minute,
79-
CompatibleClouds: registry.AllClouds,
80-
Dependencies: []registry.OperationDependency{registry.OperationRequiresLDRJobRunning},
81-
Run: pauseLDRJob,
82-
})
115+
for _, jt := range jobTypes {
116+
r.AddOperation(registry.OperationSpec{
117+
Name: jt.OpName,
118+
Owner: jt.Owner,
119+
Timeout: 15 * time.Minute,
120+
CompatibleClouds: registry.AllClouds,
121+
Dependencies: []registry.OperationDependency{jt.Dependency},
122+
Run: runPauseJob(jt.JobType),
123+
CanRunConcurrently: registry.OperationCanRunConcurrently,
124+
})
125+
}
83126
}

pkg/cmd/roachtest/operations/register.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func RegisterOperations(r registry.Registry) {
2727
registerBackupRestore(r)
2828
registerManualCompaction(r)
2929
registerResize(r)
30-
registerPauseLDRJob(r)
30+
registerPauseJob(r)
3131
registerLicenseThrottle(r)
3232
registerSessionVariables(r)
3333
registerDebugZip(r)

pkg/cmd/roachtest/registry/operation_spec.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ const (
2727
OperationRequiresZeroUnavailableRanges
2828
OperationRequiresZeroUnderreplicatedRanges
2929
OperationRequiresLDRJobRunning
30+
OperationRequiresRunningBackupJob
31+
OperationRequiresRunningRestoreJob
3032
)
3133

3234
// OperationIsolation specifies to what extent the operation runner will try

pkg/cmd/roachtest/roachtestutil/operations/dependency.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,18 @@ func CheckDependencies(
5252
return ok, err
5353
}
5454

55+
case registry.OperationRequiresRunningBackupJob:
56+
ok, err := checkBackupJobRunning(ctx, c, l)
57+
if err != nil || !ok {
58+
return ok, err
59+
}
60+
61+
case registry.OperationRequiresRunningRestoreJob:
62+
ok, err := checkRestoreJobRunning(ctx, c, l)
63+
if err != nil || !ok {
64+
return ok, err
65+
}
66+
5567
default:
5668
panic(fmt.Sprintf("unknown operation dependency %d", dep))
5769
}
@@ -128,3 +140,37 @@ func checkLDRJobRunning(
128140
_ = jobsCur.Scan(&jobId)
129141
return jobId != "", nil
130142
}
143+
144+
func checkBackupJobRunning(
145+
ctx context.Context, c cluster.Cluster, l *logger.Logger,
146+
) (ok bool, _ error) {
147+
conn := c.Conn(ctx, l, 1, option.VirtualClusterName("system"))
148+
defer conn.Close()
149+
150+
jobsCur, err := conn.QueryContext(ctx,
151+
"(WITH x AS (SHOW JOBS) SELECT job_id FROM x WHERE job_type = 'BACKUP' AND status = 'running' LIMIT 1)")
152+
if err != nil {
153+
return false, err
154+
}
155+
jobsCur.Next()
156+
var jobId string
157+
_ = jobsCur.Scan(&jobId)
158+
return jobId != "", nil
159+
}
160+
161+
func checkRestoreJobRunning(
162+
ctx context.Context, c cluster.Cluster, l *logger.Logger,
163+
) (ok bool, _ error) {
164+
conn := c.Conn(ctx, l, 1, option.VirtualClusterName("system"))
165+
defer conn.Close()
166+
167+
jobsCur, err := conn.QueryContext(ctx,
168+
"WITH x AS (SHOW JOBS) SELECT job_id FROM x WHERE job_type = 'RESTORE' AND status = 'running' LIMIT 1)")
169+
if err != nil {
170+
return false, err
171+
}
172+
jobsCur.Next()
173+
var jobId string
174+
_ = jobsCur.Scan(&jobId)
175+
return jobId != "", nil
176+
}

0 commit comments

Comments
 (0)