Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 201 additions & 39 deletions pkg/cmd/roachtest/tests/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
gosql "database/sql"
"fmt"
"math/rand"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
Expand All @@ -29,10 +31,11 @@ import (

var (
tableCount = 5000
nodeCount = 4
nodeCount = 20
tableNamePrefix = "t"
tableSchema = "(id INT PRIMARY KEY, s STRING)"
showJobsTimeout = time.Minute * 2
showChangefeedJobsTimeout = showJobsTimeout * 5
pollerMinFrequencySeconds = 30
roachtestTimeout = time.Minute * 45
workloadDuration = roachtestTimeout - time.Minute*10
Expand All @@ -46,9 +49,8 @@ func registerJobs(r registry.Registry) {
Owner: registry.OwnerDisasterRecovery,
Cluster: jobsSpec,
EncryptionSupport: registry.EncryptionMetamorphic,
Leases: registry.MetamorphicLeases,
CompatibleClouds: registry.OnlyGCE,
Suites: registry.Suites(registry.Nightly),
Suites: registry.Suites(registry.Weekly),
Timeout: roachtestTimeout,
Run: runJobsStress,
})
Expand All @@ -64,18 +66,17 @@ func runJobsStress(ctx context.Context, t test.Test, c cluster.Cluster) {

if c.IsLocal() {
tableCount = 250
showJobsTimeout = 30 * time.Second
showJobsTimeout = 30 * time.Minute
pollerMinFrequencySeconds = 5
workloadDuration = time.Minute * 5
}
sqlDB := sqlutils.MakeSQLRunner(conn)
sqlDB.Exec(t, "CREATE DATABASE d")
sqlDB.Exec(t, "SET DATABASE=d")

// Because this roachtest spins up and pauses/cancels 5k changefeed jobs
// really quickly, run the adopt interval which by default only runs every 30s
// and adopts 10 jobs at a time.
sqlDB.Exec(t, "SET CLUSTER SETTING jobs.registry.interval.adopt='5s'")
// Simulate contention of a cluster 10x the size by reducing job registry
// intervals.
sqlDB.Exec(t, "SET CLUSTER SETTING jobs.registry.interval.base=0.1")

rng, seed := randutil.NewLockedPseudoRand()
t.L().Printf("Rand seed: %d", seed)
Expand All @@ -97,13 +98,12 @@ func runJobsStress(ctx context.Context, t test.Test, c cluster.Cluster) {
return nil
})

randomPoller := func(f func(ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand) error) func(ctx context.Context, _ *logger.Logger) error {
poller := func(waitTime time.Duration, f func(ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand) error) func(ctx context.Context, _ *logger.Logger) error {

return func(ctx context.Context, _ *logger.Logger) error {
var pTimer timeutil.Timer
defer pTimer.Stop()
for {
waitTime := time.Duration(rng.Intn(pollerMinFrequencySeconds)+1) * time.Second
pTimer.Reset(waitTime)
select {
case <-ctx.Done():
Expand All @@ -120,13 +120,25 @@ func runJobsStress(ctx context.Context, t test.Test, c cluster.Cluster) {
}
}
}
for i := 0; i < nodeCount; i++ {
group.Go(poller(time.Second, checkJobQueryLatency))
}

waitTime := time.Duration(rng.Intn(pollerMinFrequencySeconds)+1) * time.Second
jobIDToTableName := make(map[jobspb.JobID]string)
var jobIDToTableNameMu syncutil.Mutex

controlChangefeedsWithMap := func(ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand) error {
return controlChangefeeds(ctx, t, c, rng, jobIDToTableName, &jobIDToTableNameMu)
}
group.Go(poller(waitTime, controlChangefeedsWithMap))

group.Go(randomPoller(checkJobQueryLatency))
group.Go(poller(time.Minute*5, splitScatterMergeJobsTable))

group.Go(randomPoller(pauseResumeChangefeeds))
group.Go(poller(time.Minute, checkPersistentUnclaimedJobs))

group.Go(func(ctx context.Context, _ *logger.Logger) error {
createTablesWithChangefeeds(ctx, t, c, rng)
createTablesWithChangefeeds(ctx, t, c, rng, jobIDToTableName, &jobIDToTableNameMu)
return nil
})

Expand All @@ -135,10 +147,16 @@ func runJobsStress(ctx context.Context, t test.Test, c cluster.Cluster) {

require.NoError(t, group.WaitE())
checkJobSystemHealth(ctx, t, c, rng)
require.NoError(t, checkJobQueryLatency(ctx, t, c, rng))
}

func createTablesWithChangefeeds(
ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand,
ctx context.Context,
t test.Test,
c cluster.Cluster,
rng *rand.Rand,
jobIDToTableName map[jobspb.JobID]string,
jobIDToTableNameMu *syncutil.Mutex,
) {
t.L().Printf("Creating %d tables with changefeeds", tableCount)

Expand All @@ -155,7 +173,14 @@ func createTablesWithChangefeeds(
tableName := tableNamePrefix + fmt.Sprintf("%d", i)
sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE %s %s`, tableName, tableSchema))
sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO %s VALUES (1, 'x'),(2,'y')`, tableName))
sqlDB.Exec(t, fmt.Sprintf("CREATE CHANGEFEED FOR %s INTO 'null://' WITH gc_protect_expires_after='2m', protect_data_from_gc_on_pause", tableName))

var jobID jobspb.JobID
sqlDB.QueryRow(t, fmt.Sprintf("CREATE CHANGEFEED FOR %s INTO 'null://' WITH gc_protect_expires_after='6m', protect_data_from_gc_on_pause", tableName)).Scan(&jobID)

jobIDToTableNameMu.Lock()
jobIDToTableName[jobID] = tableName
jobIDToTableNameMu.Unlock()

if i%(tableCount/5) == 0 {
t.L().Printf("Created %d tables so far", i)
}
Expand All @@ -169,17 +194,17 @@ func checkJobQueryLatency(
ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand,
) error {
conn := c.Conn(ctx, t.L(), rng.Intn(nodeCount)+1)
defer conn.Close()
if err := checkQueryLatency(ctx, "SHOW JOBS", conn, t.L(), showJobsTimeout); err != nil {
return err
}

var randomChangefeedJobID int
if err := conn.QueryRowContext(ctx, "SELECT job_id FROM [SHOW JOBS] ORDER BY random() LIMIT 1").Scan(&randomChangefeedJobID); err != nil {
toLog := false
if rng.Intn(100) == 0 {
toLog = true
}
defer conn.Close()
if err := checkQueryLatency(ctx, "SHOW JOBS", conn, t.L(), showJobsTimeout, toLog); err != nil {
return err
}

if err := checkQueryLatency(ctx, redact.Sprintf("SHOW JOB %d", randomChangefeedJobID), conn, t.L(), showJobsTimeout/10); err != nil {
if err := checkQueryLatency(ctx, "SHOW CHANGEFEED JOBS", conn, t.L(), showChangefeedJobsTimeout, toLog); err != nil {
return err
}
return nil
Expand All @@ -191,6 +216,7 @@ func checkQueryLatency(
conn *gosql.DB,
l *logger.Logger,
timeout time.Duration,
toLog bool,
) error {
queryBegin := timeutil.Now()
var err error
Expand All @@ -207,49 +233,185 @@ func checkQueryLatency(
})
return errors.CombineErrors(err, explainErr)
}
l.Printf("%s ran in %.2f seconds", query, timeutil.Since(queryBegin).Seconds())
if toLog {
l.Printf("%s ran in %.2f seconds", query, timeutil.Since(queryBegin).Seconds())
}
return nil
}

// controlChangefeeds exercises job control statements against the changefeed
// jobs recorded in system.jobs. On a connection to a randomly chosen node it
// runs three passes:
//
//  1. PAUSE JOB on a random sample of the running changefeeds (20% of their
//     count),
//  2. RESUME JOB on a random sample of the paused changefeeds (20% of their
//     count),
//  3. RESUME JOB on up to 200 of the most recently created canceled
//     changefeeds.
//
// Samples are drawn with replacement, so the same job may be picked more than
// once within a pass. Whenever a control statement fails, the table recorded
// for that job in jobIDToTableName is looked up and a new changefeed is
// created for it; the new job ID is then added to the map. All map accesses
// are guarded by jobIDToTableNameMu.
//
// An error is returned only if gathering the candidate job IDs fails or ctx is
// done; failures of individual control statements are counted and logged.
func controlChangefeeds(
	ctx context.Context,
	t test.Test,
	c cluster.Cluster,
	rng *rand.Rand,
	jobIDToTableName map[jobspb.JobID]string,
	jobIDToTableNameMu *syncutil.Mutex,
) error {
	conn := c.Conn(ctx, t.L(), rng.Intn(nodeCount)+1)
	defer conn.Close()

	// gatherJobIDs runs gatherQuery and returns the job IDs it yields.
	gatherJobIDs := func(gatherQuery string) ([]jobspb.JobID, error) {
		rows, err := conn.QueryContext(ctx, gatherQuery)
		if err != nil {
			return nil, err
		}
		defer rows.Close()

		var jobs []jobspb.JobID
		for rows.Next() {
			var jobID jobspb.JobID
			if err := rows.Scan(&jobID); err != nil {
				return nil, err
			}
			jobs = append(jobs, jobID)
		}
		// rows.Next returns false both at the end of the result set and on an
		// iteration error; without this check a failed iteration would look
		// like a short list of jobs.
		if err := rows.Err(); err != nil {
			return nil, err
		}
		return jobs, nil
	}

	// recreateChangefeed creates a new changefeed for the table that jobID was
	// watching and records the new job ID. It reports whether a changefeed was
	// created.
	recreateChangefeed := func(jobID jobspb.JobID) bool {
		jobIDToTableNameMu.Lock()
		tableName, exists := jobIDToTableName[jobID]
		jobIDToTableNameMu.Unlock()
		if !exists {
			t.L().Printf("No table name found for job %d, skipping recreation", jobID)
			return false
		}

		var newJobID jobspb.JobID
		createErr := conn.QueryRowContext(ctx, fmt.Sprintf("CREATE CHANGEFEED FOR %s INTO 'null://' WITH gc_protect_expires_after='6m', protect_data_from_gc_on_pause", tableName)).Scan(&newJobID)
		if createErr != nil {
			t.L().Printf("Failed to recreate changefeed for table %s (job %d): %v", tableName, jobID, createErr)
			return false
		}

		// The mapping for the old jobID is left in place.
		// NOTE(review): this means a job whose control statement keeps failing
		// (e.g. a canceled job picked again on a later poll) gets another
		// changefeed each time — confirm that is the intended load.
		jobIDToTableNameMu.Lock()
		jobIDToTableName[newJobID] = tableName
		jobIDToTableNameMu.Unlock()
		return true
	}

	// jobAction gathers candidate jobs with gatherQuery and runs actionQuery on
	// a random sample of them whose size is pctAction times the candidate
	// count.
	jobAction := func(action, gatherQuery, actionQuery string, pctAction float32) error {
		jobs, err := gatherJobIDs(gatherQuery)
		if err != nil {
			return err
		}

		errCount := 0
		recreatedCount := 0
		// count is zero whenever jobs is empty, so rng.Intn below never sees a
		// zero argument.
		count := int(float32(len(jobs)) * pctAction)
		for i := 0; i < count; i++ {
			// Stop promptly once the test is shutting down rather than issuing
			// statements that can only fail.
			if err := ctx.Err(); err != nil {
				return err
			}
			jobID := jobs[rng.Intn(len(jobs))]
			if _, err := conn.ExecContext(ctx, actionQuery, jobID); err != nil {
				errCount++
				if recreateChangefeed(jobID) {
					recreatedCount++
				}
			}
		}
		t.L().Printf("%s on %d of %d jobs, recreated %d changefeeds, of %d total eligible jobs", action, count-errCount, count, recreatedCount, len(jobs))
		return nil
	}

	if err := jobAction("pause running jobs", "SELECT id from system.jobs where job_type='CHANGEFEED' and status='running'", "PAUSE JOB $1", 0.2); err != nil {
		return err
	}
	if err := jobAction("resume paused jobs", "SELECT id from system.jobs where job_type='CHANGEFEED' and status='paused'", "RESUME JOB $1", 0.2); err != nil {
		return err
	}

	// RESUME JOB is expected to fail on a canceled job, so this pass exists to
	// drive the recreation path: it replaces up to 200 canceled changefeeds
	// with new ones.
	return jobAction("resume canceled jobs", "SELECT id from system.jobs where job_type='CHANGEFEED' and status='canceled' order by created DESC limit 200", "RESUME JOB $1", 1)
}

// splitScatterMergeJobsTable asks a randomly chosen node to split system.jobs
// at three randomly selected job IDs. The split is best-effort: a failure is
// logged and the function still returns nil.
//
// NOTE(review): despite the name, only a SPLIT AT statement is issued here; no
// scatter or merge is performed by this function.
func splitScatterMergeJobsTable(
	ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand,
) error {
	const splitStmt = "ALTER TABLE system.jobs SPLIT AT (select id from system.jobs order by random() limit 3)"

	db := c.Conn(ctx, t.L(), rng.Intn(nodeCount)+1)
	defer db.Close()

	if _, splitErr := db.ExecContext(ctx, splitStmt); splitErr != nil {
		t.L().Printf("Error splitting %s", splitErr)
	}
	return nil
}

func pauseResumeChangefeeds(
func checkPersistentUnclaimedJobs(
ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand,
) error {
conn := c.Conn(ctx, t.L(), rng.Intn(nodeCount)+1)
defer conn.Close()

rows, err := conn.QueryContext(ctx, "SELECT job_id FROM [SHOW CHANGEFEED JOBS]")
sqlDB := sqlutils.MakeSQLRunner(conn)

// First, get currently unclaimed running jobs
rows, err := sqlDB.DB.QueryContext(ctx, `
SELECT id
FROM system.jobs
WHERE claim_instance_id IS NULL
AND status NOT IN ('paused', 'canceled', 'failed');
`)
if err != nil {
return err
}
defer rows.Close()

var jobs []jobspb.JobID
var currentUnclaimedJobIDs []jobspb.JobID
for rows.Next() {
var jobID jobspb.JobID
if err := rows.Scan(&jobID); err != nil {
return err
}
jobs = append(jobs, jobID)
currentUnclaimedJobIDs = append(currentUnclaimedJobIDs, jobID)
}
rows.Close()

if len(jobs) < tableCount/10 {
// If no unclaimed jobs, nothing to check
if len(currentUnclaimedJobIDs) == 0 {
return nil
}
jobAction := func(cmd string, count int) {
errCount := 0
for i := 0; i < count; i++ {
jobIdx := rng.Intn(len(jobs))
_, err := conn.Exec(cmd, jobs[jobIdx])
if err != nil {
errCount++

// Check progressively back in time (1m, 2m, 3m, 4m, 5m)
for minutesAgo := 1; minutesAgo <= 5; minutesAgo++ {
// Build the job ID list for the WHERE IN clause
jobIDList := make([]string, len(currentUnclaimedJobIDs))
for i, id := range currentUnclaimedJobIDs {
jobIDList[i] = fmt.Sprintf("%d", id)
}
jobIDsStr := "(" + strings.Join(jobIDList, ", ") + ")"

t.L().Printf("Checking for persistent unclaimed jobs %s from %d minutes ago", jobIDsStr, minutesAgo)

// Query unclaimed jobs at this time in the past using AOST, filtered by current unclaimed job IDs
query := fmt.Sprintf(`
SELECT id
FROM system.jobs AS OF SYSTEM TIME '-%dm'
WHERE claim_instance_id IS NULL
AND status = 'running'
AND id IN %s
`, minutesAgo, jobIDsStr)

rows, err := sqlDB.DB.QueryContext(ctx, query)
if err != nil {
return err
}

var stillUnclaimedJobIDs []jobspb.JobID
for rows.Next() {
var jobID jobspb.JobID
if err := rows.Scan(&jobID); err != nil {
rows.Close()
return err
}
stillUnclaimedJobIDs = append(stillUnclaimedJobIDs, jobID)
}
rows.Close()

// If none of the current unclaimed jobs were unclaimed at this time, they're all new
if len(stillUnclaimedJobIDs) == 0 {
return nil
}
t.L().Printf("Failed to run %s on %d of %d jobs", cmd, errCount, count)

// If we've reached 5 minutes and still have unclaimed jobs, that's an error
if minutesAgo == 5 {
return errors.Errorf("there exists %d jobs that have been persistently unclaimed for at least 5 minutes: %v", len(stillUnclaimedJobIDs), stillUnclaimedJobIDs)
}

// Update the list to only check the jobs that are still unclaimed
currentUnclaimedJobIDs = stillUnclaimedJobIDs
}
jobAction("PAUSE JOB $1", len(jobs)/10)
jobAction("RESUME JOB $1", len(jobs)/10)
return nil

return errors.New("unreachable code reached in checkPersistentUnclaimedJobs")
}

func checkJobSystemHealth(ctx context.Context, t test.Test, c cluster.Cluster, rng *rand.Rand) {
Expand Down
Loading