diff --git a/pkg/cmd/roachtest/tests/jobs.go b/pkg/cmd/roachtest/tests/jobs.go
index b20bde07baf7..8ce3cec55376 100644
--- a/pkg/cmd/roachtest/tests/jobs.go
+++ b/pkg/cmd/roachtest/tests/jobs.go
@@ -10,6 +10,7 @@ import (
 	gosql "database/sql"
 	"fmt"
 	"math/rand"
+	"strings"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
@@ -21,6 +22,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/util/randutil"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/redact"
@@ -29,10 +31,11 @@ import (
 
 var (
 	tableCount                = 5000
-	nodeCount                 = 4
+	nodeCount                 = 20
 	tableNamePrefix           = "t"
 	tableSchema               = "(id INT PRIMARY KEY, s STRING)"
 	showJobsTimeout           = time.Minute * 2
+	showChangefeedJobsTimeout = showJobsTimeout * 5
 	pollerMinFrequencySeconds = 30
 	roachtestTimeout          = time.Minute * 45
 	workloadDuration          = roachtestTimeout - time.Minute*10
@@ -46,9 +49,8 @@ func registerJobs(r registry.Registry) {
 		Owner:             registry.OwnerDisasterRecovery,
 		Cluster:           jobsSpec,
 		EncryptionSupport: registry.EncryptionMetamorphic,
-		Leases:            registry.MetamorphicLeases,
 		CompatibleClouds:  registry.OnlyGCE,
-		Suites:            registry.Suites(registry.Nightly),
+		Suites:            registry.Suites(registry.Weekly),
 		Timeout:           roachtestTimeout,
 		Run:               runJobsStress,
 	})
@@ -64,7 +66,7 @@ func runJobsStress(ctx context.Context, t test.Test, c cluster.Cluster) {
 
 	if c.IsLocal() {
 		tableCount = 250
-		showJobsTimeout = 30 * time.Second
+		showJobsTimeout = 30 * time.Minute
 		pollerMinFrequencySeconds = 5
 		workloadDuration = time.Minute * 5
 	}
@@ -72,10 +74,9 @@ func runJobsStress(ctx context.Context, t test.Test, c cluster.Cluster) {
 	sqlDB.Exec(t, "CREATE DATABASE d")
 	sqlDB.Exec(t, "SET DATABASE=d")
 
-	// Because this roachtest spins up and pauses/cancels 5k changefeed jobs
-	// really quickly, run the adopt interval which by default only runs every 30s
-	// and adopts 10 jobs at a time.
-	sqlDB.Exec(t, "SET CLUSTER SETTING jobs.registry.interval.adopt='5s'")
+	// Simulate contention of a cluster 10x the size by reducing job registry
+	// intervals.
+ sqlDB.Exec(t, "SET CLUSTER SETTING jobs.registry.interval.base=0.1") rng, seed := randutil.NewLockedPseudoRand() t.L().Printf("Rand seed: %d", seed) @@ -97,13 +98,12 @@ func runJobsStress(ctx context.Context, t test.Test, c cluster.Cluster) { return nil }) - randomPoller := func(f func(ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand) error) func(ctx context.Context, _ *logger.Logger) error { + poller := func(waitTime time.Duration, f func(ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand) error) func(ctx context.Context, _ *logger.Logger) error { return func(ctx context.Context, _ *logger.Logger) error { var pTimer timeutil.Timer defer pTimer.Stop() for { - waitTime := time.Duration(rng.Intn(pollerMinFrequencySeconds)+1) * time.Second pTimer.Reset(waitTime) select { case <-ctx.Done(): @@ -120,13 +120,25 @@ func runJobsStress(ctx context.Context, t test.Test, c cluster.Cluster) { } } } + for i := 0; i < nodeCount; i++ { + group.Go(poller(time.Second, checkJobQueryLatency)) + } + + waitTime := time.Duration(rng.Intn(pollerMinFrequencySeconds)+1) * time.Second + jobIDToTableName := make(map[jobspb.JobID]string) + var jobIDToTableNameMu syncutil.Mutex + + controlChangefeedsWithMap := func(ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand) error { + return controlChangefeeds(ctx, t, c, rng, jobIDToTableName, &jobIDToTableNameMu) + } + group.Go(poller(waitTime, controlChangefeedsWithMap)) - group.Go(randomPoller(checkJobQueryLatency)) + group.Go(poller(time.Minute*5, splitScatterMergeJobsTable)) - group.Go(randomPoller(pauseResumeChangefeeds)) + group.Go(poller(time.Minute, checkPersistentUnclaimedJobs)) group.Go(func(ctx context.Context, _ *logger.Logger) error { - createTablesWithChangefeeds(ctx, t, c, rng) + createTablesWithChangefeeds(ctx, t, c, rng, jobIDToTableName, &jobIDToTableNameMu) return nil }) @@ -135,10 +147,16 @@ func runJobsStress(ctx context.Context, t test.Test, c cluster.Cluster) { require.NoError(t, group.WaitE()) checkJobSystemHealth(ctx, t, c, rng) + require.NoError(t, checkJobQueryLatency(ctx, t, c, rng)) } func createTablesWithChangefeeds( - ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand, + ctx context.Context, + t test.Test, + c cluster.Cluster, + rng *rand.Rand, + jobIDToTableName map[jobspb.JobID]string, + jobIDToTableNameMu *syncutil.Mutex, ) { t.L().Printf("Creating %d tables with changefeeds", tableCount) @@ -155,7 +173,14 @@ func createTablesWithChangefeeds( tableName := tableNamePrefix + fmt.Sprintf("%d", i) sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE %s %s`, tableName, tableSchema)) sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO %s VALUES (1, 'x'),(2,'y')`, tableName)) - sqlDB.Exec(t, fmt.Sprintf("CREATE CHANGEFEED FOR %s INTO 'null://' WITH gc_protect_expires_after='2m', protect_data_from_gc_on_pause", tableName)) + + var jobID jobspb.JobID + sqlDB.QueryRow(t, fmt.Sprintf("CREATE CHANGEFEED FOR %s INTO 'null://' WITH gc_protect_expires_after='6m', protect_data_from_gc_on_pause", tableName)).Scan(&jobID) + + jobIDToTableNameMu.Lock() + jobIDToTableName[jobID] = tableName + jobIDToTableNameMu.Unlock() + if i%(tableCount/5) == 0 { t.L().Printf("Created %d tables so far", i) } @@ -169,17 +194,17 @@ func checkJobQueryLatency( ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand, ) error { conn := c.Conn(ctx, t.L(), rng.Intn(nodeCount)+1) - defer conn.Close() - if err := checkQueryLatency(ctx, "SHOW JOBS", conn, t.L(), showJobsTimeout); err != nil { - return err - } - var 
randomChangefeedJobID int - if err := conn.QueryRowContext(ctx, "SELECT job_id FROM [SHOW JOBS] ORDER BY random() LIMIT 1").Scan(&randomChangefeedJobID); err != nil { + toLog := false + if rng.Intn(100) == 0 { + toLog = true + } + defer conn.Close() + if err := checkQueryLatency(ctx, "SHOW JOBS", conn, t.L(), showJobsTimeout, toLog); err != nil { return err } - if err := checkQueryLatency(ctx, redact.Sprintf("SHOW JOB %d", randomChangefeedJobID), conn, t.L(), showJobsTimeout/10); err != nil { + if err := checkQueryLatency(ctx, "SHOW CHANGEFEED JOBS", conn, t.L(), showChangefeedJobsTimeout, toLog); err != nil { return err } return nil @@ -191,6 +216,7 @@ func checkQueryLatency( conn *gosql.DB, l *logger.Logger, timeout time.Duration, + toLog bool, ) error { queryBegin := timeutil.Now() var err error @@ -207,49 +233,185 @@ func checkQueryLatency( }) return errors.CombineErrors(err, explainErr) } - l.Printf("%s ran in %.2f seconds", query, timeutil.Since(queryBegin).Seconds()) + if toLog { + l.Printf("%s ran in %.2f seconds", query, timeutil.Since(queryBegin).Seconds()) + } + return nil +} + +func controlChangefeeds( + ctx context.Context, + t test.Test, + c cluster.Cluster, + rng *rand.Rand, + jobIDToTableName map[jobspb.JobID]string, + jobIDToTableNameMu *syncutil.Mutex, +) error { + conn := c.Conn(ctx, t.L(), rng.Intn(nodeCount)+1) + defer conn.Close() + + jobAction := func(action, gatherQuery, actionQuery string, pctAction float32) error { + + rows, err := conn.QueryContext(ctx, gatherQuery) + if err != nil { + return err + } + defer rows.Close() + + var jobs []jobspb.JobID + for rows.Next() { + var jobID jobspb.JobID + if err := rows.Scan(&jobID); err != nil { + return err + } + jobs = append(jobs, jobID) + } + rows.Close() + + errCount := 0 + recreatedCount := 0 + count := int(float32(len(jobs)) * pctAction) + for i := 0; i < count; i++ { + jobIdx := rng.Intn(len(jobs)) + jobID := jobs[jobIdx] + _, err := conn.Exec(actionQuery, jobID) + if err != nil { + errCount++ + // Try to recreate the changefeed + jobIDToTableNameMu.Lock() + tableName, exists := jobIDToTableName[jobID] + jobIDToTableNameMu.Unlock() + if !exists { + t.L().Printf("No table name found for job %d, skipping recreation", jobID) + continue + } + var newJobID jobspb.JobID + err := conn.QueryRowContext(ctx, fmt.Sprintf("CREATE CHANGEFEED FOR %s INTO 'null://' WITH gc_protect_expires_after='6m', protect_data_from_gc_on_pause", tableName)).Scan(&newJobID) + if err == nil { + jobIDToTableNameMu.Lock() + jobIDToTableName[newJobID] = tableName + jobIDToTableNameMu.Unlock() + recreatedCount++ + } else { + t.L().Printf("Failed to recreate changefeed for table %s (job %d): %v", tableName, jobID, err) + } + } + } + t.L().Printf("%s on %d of %d jobs, recreated %d changefeeds, of %d total eligible jobs", action, count-errCount, count, recreatedCount, len(jobs)) + return nil + } + if err := jobAction("pause running jobs", "SELECT id from system.jobs where job_type='CHANGEFEED' and status='running'", "PAUSE JOB $1", 0.2); err != nil { + return err + } + if err := jobAction("resume paused jobs", "SELECT id from system.jobs where job_type='CHANGEFEED' and status='paused'", "RESUME JOB $1", 0.2); err != nil { + return err + } + + // This call will recreate up to 200 canceled jobs. The resume query will obviously fail on cancelled + // jobs. 
+ return jobAction("resume canceled jobs", "SELECT id from system.jobs where job_type='CHANGEFEED' and status='canceled' order by created DESC limit 200", "RESUME JOB $1", 1) +} + +func splitScatterMergeJobsTable( + ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand, +) error { + conn := c.Conn(ctx, t.L(), rng.Intn(nodeCount)+1) + defer conn.Close() + + _, err := conn.ExecContext(ctx, "ALTER TABLE system.jobs SPLIT AT (select id from system.jobs order by random() limit 3)") + if err != nil { + t.L().Printf("Error splitting %s", err) + } + return nil } -func pauseResumeChangefeeds( +func checkPersistentUnclaimedJobs( ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand, ) error { conn := c.Conn(ctx, t.L(), rng.Intn(nodeCount)+1) defer conn.Close() - rows, err := conn.QueryContext(ctx, "SELECT job_id FROM [SHOW CHANGEFEED JOBS]") + sqlDB := sqlutils.MakeSQLRunner(conn) + + // First, get currently unclaimed running jobs + rows, err := sqlDB.DB.QueryContext(ctx, ` + SELECT id + FROM system.jobs + WHERE claim_instance_id IS NULL + AND status NOT IN ('paused', 'canceled', 'failed'); + `) if err != nil { return err } defer rows.Close() - var jobs []jobspb.JobID + var currentUnclaimedJobIDs []jobspb.JobID for rows.Next() { var jobID jobspb.JobID if err := rows.Scan(&jobID); err != nil { return err } - jobs = append(jobs, jobID) + currentUnclaimedJobIDs = append(currentUnclaimedJobIDs, jobID) } rows.Close() - if len(jobs) < tableCount/10 { + // If no unclaimed jobs, nothing to check + if len(currentUnclaimedJobIDs) == 0 { return nil } - jobAction := func(cmd string, count int) { - errCount := 0 - for i := 0; i < count; i++ { - jobIdx := rng.Intn(len(jobs)) - _, err := conn.Exec(cmd, jobs[jobIdx]) - if err != nil { - errCount++ + + // Check progressively back in time (1m, 2m, 3m, 4m, 5m) + for minutesAgo := 1; minutesAgo <= 5; minutesAgo++ { + // Build the job ID list for the WHERE IN clause + jobIDList := make([]string, len(currentUnclaimedJobIDs)) + for i, id := range currentUnclaimedJobIDs { + jobIDList[i] = fmt.Sprintf("%d", id) + } + jobIDsStr := "(" + strings.Join(jobIDList, ", ") + ")" + + t.L().Printf("Checking for persistent unclaimed jobs %s from %d minutes ago", jobIDsStr, minutesAgo) + + // Query unclaimed jobs at this time in the past using AOST, filtered by current unclaimed job IDs + query := fmt.Sprintf(` + SELECT id + FROM system.jobs AS OF SYSTEM TIME '-%dm' + WHERE claim_instance_id IS NULL + AND status = 'running' + AND id IN %s + `, minutesAgo, jobIDsStr) + + rows, err := sqlDB.DB.QueryContext(ctx, query) + if err != nil { + return err + } + + var stillUnclaimedJobIDs []jobspb.JobID + for rows.Next() { + var jobID jobspb.JobID + if err := rows.Scan(&jobID); err != nil { + rows.Close() + return err } + stillUnclaimedJobIDs = append(stillUnclaimedJobIDs, jobID) + } + rows.Close() + + // If none of the current unclaimed jobs were unclaimed at this time, they're all new + if len(stillUnclaimedJobIDs) == 0 { + return nil } - t.L().Printf("Failed to run %s on %d of %d jobs", cmd, errCount, count) + + // If we've reached 5 minutes and still have unclaimed jobs, that's an error + if minutesAgo == 5 { + return errors.Errorf("there exists %d jobs that have been persistently unclaimed for at least 5 minutes: %v", len(stillUnclaimedJobIDs), stillUnclaimedJobIDs) + } + + // Update the list to only check the jobs that are still unclaimed + currentUnclaimedJobIDs = stillUnclaimedJobIDs } - jobAction("PAUSE JOB $1", len(jobs)/10) - jobAction("RESUME JOB $1", 
len(jobs)/10) - return nil + + return errors.New("unreachable code reached in checkPersistentUnclaimedJobs") } func checkJobSystemHealth(ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand) {