@@ -20,6 +20,8 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
+	"github.com/cockroachdb/cockroach/pkg/jobs"
+	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/roachprod"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/blobfixture"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/install"
@@ -35,6 +37,8 @@ type TpccFixture struct {
 	WorkloadWarehouses     int
 	MinutesPerIncremental  int
 	IncrementalChainLength int
+	CompactionThreshold    int
+	CompactionWindow       int
 	RestoredSizeEstimate   string
 }
 
@@ -45,6 +49,8 @@ var TinyFixture = TpccFixture{
 	ImportWarehouses:       10,
 	WorkloadWarehouses:     10,
 	IncrementalChainLength: 4,
+	CompactionThreshold:    3,
+	CompactionWindow:       2,
 	RestoredSizeEstimate:   "700MiB",
 }
 
@@ -55,6 +61,8 @@ var SmallFixture = TpccFixture{
 	ImportWarehouses:       5000,
 	WorkloadWarehouses:     1000,
 	IncrementalChainLength: 48,
+	CompactionThreshold:    3,
+	CompactionWindow:       2,
 	RestoredSizeEstimate:   "350GiB",
 }
 
@@ -65,6 +73,8 @@ var MediumFixture = TpccFixture{
 	ImportWarehouses:       30000,
 	WorkloadWarehouses:     5000,
 	IncrementalChainLength: 400,
+	CompactionThreshold:    3,
+	CompactionWindow:       2,
 	RestoredSizeEstimate:   "2TiB",
 }
 
@@ -77,6 +87,8 @@ var LargeFixture = TpccFixture{
 	ImportWarehouses:       300000,
 	WorkloadWarehouses:     7500,
 	IncrementalChainLength: 400,
+	CompactionThreshold:    3,
+	CompactionWindow:       2,
 	RestoredSizeEstimate:   "20TiB",
 }
 
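Note that every fixture size opts into backup compaction with the same settings: a threshold of 3 and a window of 2. When CompactionThreshold is non-zero, scheduleBackups (below) applies these fields as cluster settings before creating the backup schedule; the statements it issues are equivalent to the following SQL, with the values taken from the fixtures above:

    SET CLUSTER SETTING backup.compaction.threshold = 3;
    SET CLUSTER SETTING backup.compaction.window_size = 2;
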
@@ -205,37 +217,136 @@ func (bd *backupDriver) runWorkload(ctx context.Context) (func(), error) {
 // scheduleBackups begins the backup schedule.
 func (bd *backupDriver) scheduleBackups(ctx context.Context) {
 	bd.t.L().Printf("creating backup schedule", bd.sp.fixture.WorkloadWarehouses)
-
-	createScheduleStatement := CreateScheduleStatement(bd.registry.URI(bd.fixture.DataPath))
 	conn := bd.c.Conn(ctx, bd.t.L(), 1)
+	defer conn.Close()
+	if bd.sp.fixture.CompactionThreshold > 0 {
+		bd.t.L().Printf(
+			"enabling compaction with threshold %d and window size %d",
+			bd.sp.fixture.CompactionThreshold, bd.sp.fixture.CompactionWindow,
+		)
+		_, err := conn.Exec(fmt.Sprintf(
+			"SET CLUSTER SETTING backup.compaction.threshold = %d", bd.sp.fixture.CompactionThreshold,
+		))
+		require.NoError(bd.t, err)
+		_, err = conn.Exec(fmt.Sprintf(
+			"SET CLUSTER SETTING backup.compaction.window_size = %d", bd.sp.fixture.CompactionWindow,
+		))
+		require.NoError(bd.t, err)
+	}
+	createScheduleStatement := CreateScheduleStatement(bd.registry.URI(bd.fixture.DataPath))
 	_, err := conn.Exec(createScheduleStatement)
 	require.NoError(bd.t, err)
 }
 
 // monitorBackups pauses the schedule once the target number of backups in the
 // chain have been taken.
-func (bd *backupDriver) monitorBackups(ctx context.Context) {
-	sql := sqlutils.MakeSQLRunner(bd.c.Conn(ctx, bd.t.L(), 1))
+func (bd *backupDriver) monitorBackups(ctx context.Context) error {
+	conn := bd.c.Conn(ctx, bd.t.L(), 1)
+	defer conn.Close()
+	sql := sqlutils.MakeSQLRunner(conn)
 	fixtureURI := bd.registry.URI(bd.fixture.DataPath)
-	for {
+	const (
+		WaitingFirstFull = iota
+		RunningIncrementals
+		WaitingCompactions
+		Done
+	)
+	state := WaitingFirstFull
+	for state != Done {
 		time.Sleep(1 * time.Minute)
-		var activeScheduleCount int
-		scheduleCountQuery := fmt.Sprintf(`SELECT count(*) FROM [SHOW SCHEDULES] WHERE label='%s' AND schedule_status='ACTIVE'`, scheduleLabel)
-		sql.QueryRow(bd.t, scheduleCountQuery).Scan(&activeScheduleCount)
-		if activeScheduleCount < 2 {
-			bd.t.L().Printf(`First full backup still running`)
-			continue
+		compSuccess, compRunning, compFailed, err := bd.compactionJobStates(sql)
+		if err != nil {
+			return err
+		}
+		switch state {
+		case WaitingFirstFull:
+			var activeScheduleCount int
+			scheduleCountQuery := fmt.Sprintf(
+				`SELECT count(*) FROM [SHOW SCHEDULES] WHERE label='%s' AND schedule_status='ACTIVE'`, scheduleLabel,
+			)
+			sql.QueryRow(bd.t, scheduleCountQuery).Scan(&activeScheduleCount)
+			if activeScheduleCount < 2 {
+				bd.t.L().Printf(`First full backup still running`)
+			} else {
+				state = RunningIncrementals
+			}
+		case RunningIncrementals:
+			var backupCount int
+			backupCountQuery := fmt.Sprintf(
+				`SELECT count(DISTINCT end_time) FROM [SHOW BACKUP FROM LATEST IN '%s']`, fixtureURI.String(),
+			)
+			sql.QueryRow(bd.t, backupCountQuery).Scan(&backupCount)
+			bd.t.L().Printf(`%d scheduled backups taken`, backupCount)
+
+			if bd.sp.fixture.CompactionThreshold > 0 {
+				bd.t.L().Printf("%d compaction jobs succeeded, %d running", len(compSuccess), len(compRunning))
+				if len(compFailed) > 0 {
+					return errors.Newf("compaction jobs with ids %v failed", compFailed)
+				}
+			}
+
+			if backupCount >= bd.sp.fixture.IncrementalChainLength {
+				pauseSchedulesQuery := fmt.Sprintf(
+					`PAUSE SCHEDULES WITH x AS (SHOW SCHEDULES) SELECT id FROM x WHERE label = '%s'`, scheduleLabel,
+				)
+				sql.Exec(bd.t, pauseSchedulesQuery)
+				if len(compRunning) > 0 {
+					state = WaitingCompactions
+				} else {
+					state = Done
+				}
+			}
+		case WaitingCompactions:
+			if len(compFailed) > 0 {
+				return errors.Newf("compaction jobs with ids %v failed", compFailed)
+			} else if len(compRunning) > 0 {
+				bd.t.L().Printf("waiting for %d compaction jobs to finish", len(compRunning))
+			} else {
+				state = Done
+			}
+		}
+	}
+	return nil
+}
+
+// compactionJobStates returns the state of the compaction jobs, returning
+// the IDs of the jobs that succeeded, are running, and failed.
+func (bd *backupDriver) compactionJobStates(
+	sql *sqlutils.SQLRunner,
+) ([]jobspb.JobID, []jobspb.JobID, []jobspb.JobID, error) {
+	if bd.sp.fixture.CompactionThreshold == 0 {
+		return nil, nil, nil, nil
+	}
+	type Job struct {
+		jobID  jobspb.JobID
+		status jobs.State
+	}
+	compactionQuery := `SELECT job_id, status FROM [SHOW JOBS] WHERE job_type = 'BACKUP' AND
+		description ILIKE 'COMPACT BACKUPS%'`
+	rows := sql.Query(bd.t, compactionQuery)
+	defer rows.Close()
+	var compJobs []Job
+	for rows.Next() {
+		var job Job
+		if err := rows.Scan(&job.jobID, &job.status); err != nil {
+			return nil, nil, nil, errors.Wrapf(err, "error scanning compaction job")
 		}
-		var backupCount int
-		backupCountQuery := fmt.Sprintf(`SELECT count(DISTINCT end_time) FROM [SHOW BACKUP FROM LATEST IN '%s']`, fixtureURI.String())
-		sql.QueryRow(bd.t, backupCountQuery).Scan(&backupCount)
-		bd.t.L().Printf(`%d scheduled backups taken`, backupCount)
-		if backupCount >= bd.sp.fixture.IncrementalChainLength {
-			pauseSchedulesQuery := fmt.Sprintf(`PAUSE SCHEDULES WITH x AS (SHOW SCHEDULES) SELECT id FROM x WHERE label = '%s'`, scheduleLabel)
-			sql.Exec(bd.t, pauseSchedulesQuery)
-			break
+		compJobs = append(compJobs, job)
+	}
+	var successes, running, failures []jobspb.JobID
+	for _, job := range compJobs {
+		switch job.status {
+		case jobs.StateSucceeded:
+			successes = append(successes, job.jobID)
+		case jobs.StateRunning:
+			running = append(running, job.jobID)
+		case jobs.StateFailed:
+			failures = append(failures, job.jobID)
+		default:
+			bd.t.L().Printf(`unexpected compaction job %d in state %s`, job.jobID, job.status)
 		}
 	}
+	return successes, running, failures, nil
 }
 
 func fixtureDirectory() string {
@@ -357,7 +468,7 @@ func registerBackupFixtures(r registry.Registry) {
 			require.NoError(t, err)
 
 			bd.scheduleBackups(ctx)
-			bd.monitorBackups(ctx)
+			require.NoError(t, bd.monitorBackups(ctx))
 
 			stopWorkload()
 
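For reference, the compaction bookkeeping above reduces to a single SHOW JOBS query: compactionJobStates selects BACKUP jobs whose description marks them as compactions and buckets the returned job IDs by state. Run by hand against the cluster, the equivalent query is roughly:

    SELECT job_id, status
    FROM [SHOW JOBS]
    WHERE job_type = 'BACKUP' AND description ILIKE 'COMPACT BACKUPS%';

monitorBackups then uses the succeeded/running/failed buckets to decide when the schedule can be paused and whether it still needs to wait on in-flight compaction jobs.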