
Commit 6c3f184

Merge pull request #151489 from spilchen/backport24.3-150771-151063-151323
release-24.3: sql/ttl: improve TTL replan decision logic
2 parents 122edda + 9666366

File tree

4 files changed: +470, -171 lines changed


pkg/cmd/roachtest/tests/ttl_restart.go

Lines changed: 156 additions & 168 deletions
@@ -53,6 +53,11 @@ const jobCheckSQL = `
     ORDER BY created DESC LIMIT 1
 `

+const (
+    maxRetryAttempts = 5
+    ttlJobWaitTime   = 8 * time.Minute
+)
+
 type ttlJobInfo struct {
     JobID         int
     CoordinatorID int
@@ -88,105 +93,80 @@ func runTTLRestart(ctx context.Context, t test.Test, c cluster.Cluster, numResta
     db := c.Conn(ctx, t.L(), 1)
     defer db.Close()

-    t.Status("create the table")
-    setup := []string{
-        // Speed up the test by doing the replan check often and with a low threshold.
-        "SET CLUSTER SETTING sql.ttl.replan_flow_frequency = '15s'",
-        "SET CLUSTER SETTING sql.ttl.replan_flow_threshold = '0.1'",
-        // Add additional logging to help debug the test on failure.
-        "SET CLUSTER SETTING server.debug.default_vmodule = 'ttljob_processor=1,distsql_plan_bulk=1'",
-        // Create the schema to be used in the test
-        "CREATE DATABASE IF NOT EXISTS ttldb",
-        "CREATE TABLE IF NOT EXISTS ttldb.tab1 (pk INT8 NOT NULL PRIMARY KEY, ts TIMESTAMP NOT NULL DEFAULT now():::TIMESTAMP)",
+    // Determine how many nodes we need TTL activity on based on restart scenario
+    requiredTTLNodes := 3 // Default for numRestartNodes=1
+    if numRestartNodes == 2 {
+        requiredTTLNodes = 2
     }
-    for _, stmt := range setup {
-        if _, err := db.ExecContext(ctx, stmt); err != nil {
-            return errors.Wrapf(err, "error with statement: %s", stmt)
+
+    var jobInfo ttlJobInfo
+    var ttlNodes map[int]struct{}
+
+    // Retry loop: attempt to setup table and get proper TTL distribution
+    for attempt := 1; attempt <= maxRetryAttempts; attempt++ {
+        if attempt > 1 {
+            t.L().Printf("Attempt %d/%d: Retrying table setup due to insufficient TTL distribution", attempt, maxRetryAttempts)
         }
-    }

-    t.Status("add manual splits so that ranges are distributed evenly across the cluster")
-    if _, err := db.ExecContext(ctx, "ALTER TABLE ttldb.tab1 SPLIT AT VALUES (6000), (12000)"); err != nil {
-        return errors.Wrapf(err, "error adding manual splits")
-    }
+        // Setup table and data
+        if err := setupTableAndData(ctx, t, db); err != nil {
+            return err
+        }

-    t.Status("insert data")
-    if _, err := db.ExecContext(ctx, "INSERT INTO ttldb.tab1 (pk) SELECT generate_series(1, 18000)"); err != nil {
-        return errors.Wrapf(err, "error ingesting data")
-    }
+        // Enable TTL and wait for job to start
+        var err error
+        jobInfo, err = enableTTLAndWaitForJob(ctx, t, c, db)
+        if err != nil {
+            return err
+        }

-    t.Status("relocate ranges to distribute across nodes")
-    // Moving ranges is put under a SucceedsSoon to account for errors like:
-    // "lease target replica not found in RangeDescriptor"
-    testutils.SucceedsSoon(t, func() error { return distributeLeases(ctx, t, db) })
-    leases, err := gatherLeaseDistribution(ctx, t, db)
-    if err != nil {
-        return errors.Wrapf(err, "error gathering lease distribution")
-    }
-    showLeaseDistribution(t, leases)
+        // Reset the connection so that we query from the coordinator as this is the
+        // only node that will stay up.
+        if err := db.Close(); err != nil {
+            return errors.Wrapf(err, "error closing connection")
+        }
+        db = c.Conn(ctx, t.L(), jobInfo.CoordinatorID)

-    t.Status("enable TTL")
-    ts := db.QueryRowContext(ctx, "SELECT EXTRACT(HOUR FROM now() + INTERVAL '2 minutes') AS hour, EXTRACT(MINUTE FROM now() + INTERVAL '2 minutes') AS minute")
-    var hour, minute int
-    if err := ts.Scan(&hour, &minute); err != nil {
-        return errors.Wrapf(err, "error generating cron expression")
-    }
-    ttlCronExpression := fmt.Sprintf("%d %d * * *", minute, hour)
-    t.L().Printf("using a cron expression of '%s'", ttlCronExpression)
-    ttlJobSettingSQL := fmt.Sprintf(`ALTER TABLE ttldb.tab1
-        SET (ttl_expiration_expression = $$(ts::timestamptz + '1 minutes')$$,
-        ttl_select_batch_size=100,
-        ttl_delete_batch_size=100,
-        ttl_select_rate_limit=100,
-        ttl_job_cron='%s')`,
-        ttlCronExpression)
-    if _, err := db.ExecContext(ctx, ttlJobSettingSQL); err != nil {
-        return errors.Wrapf(err, "error setting TTL attributes")
-    }
+        t.Status("check TTL activity distribution across nodes")
+        err = testutils.SucceedsWithinError(func() error {
+            var err error
+            ttlNodes, err = findNodesWithJobLogs(ctx, t, c, jobInfo.JobID)
+            if err != nil {
+                return err
+            }
+            if len(ttlNodes) < requiredTTLNodes {
+                return errors.Newf("TTL activity found on only %d nodes (need %d)", len(ttlNodes), requiredTTLNodes)
+            }
+            return nil
+        }, 1*time.Minute)

-    t.Status("wait for the TTL job to start")
-    var jobInfo ttlJobInfo
-    waitForTTLJob := func() error {
-        var err error
-        jobInfo, err = findRunningJob(ctx, t, c, db, nil,
-            false /* expectJobRestart */, false /* allowJobSucceeded */)
-        return err
-    }
-    // The ttl job is scheduled to run in about 2 minutes. Wait for a little
-    // while longer to give the job system enough time to start the job in case
-    // the system is slow.
-    testutils.SucceedsWithin(t, waitForTTLJob, 8*time.Minute)
-    t.L().Printf("TTL job (ID %d) is running at node %d", jobInfo.JobID, jobInfo.CoordinatorID)
-    // Reset the connection so that we query from the coordinator as this is the
-    // only node that will stay up.
-    if err := db.Close(); err != nil {
-        return errors.Wrapf(err, "error closing connection")
-    }
-    db = c.Conn(ctx, t.L(), jobInfo.CoordinatorID)
+        if err == nil {
+            // Success! TTL is distributed across enough nodes
+            t.L().Printf("TTL job %d found on nodes: %v", jobInfo.JobID, ttlNodes)
+            break
+        }
+
+        // Handle the error
+        if ttlNodes != nil && len(ttlNodes) < requiredTTLNodes {
+            t.L().Printf("Attempt %d/%d: TTL job %d found on nodes: %v", attempt, maxRetryAttempts, jobInfo.JobID, ttlNodes)
+            t.L().Printf("Attempt %d/%d: TTL activity found on only %d nodes (need %d)", attempt, maxRetryAttempts, len(ttlNodes), requiredTTLNodes)

-    t.Status("wait for TTL deletions to start happening")
-    waitForTTLProgressAcrossAllNodes := func() error {
-        if err := waitForTTLProgressAcrossAllRanges(ctx, db); err != nil {
-            return errors.Wrapf(err, "error waiting for TTL progress after restart")
+            if attempt == maxRetryAttempts {
+                // Final attempt failed - exit successfully as current behavior
+                t.L().Printf("After %d attempts, TTL activity found on only %d nodes (need %d for restart test). Test completed successfully.", maxRetryAttempts, len(ttlNodes), requiredTTLNodes)
+                return nil
+            }
+            // Continue to next attempt
+            continue
         }
-        return nil
+
+        // Other error that's not related to TTL distribution
+        return errors.Wrapf(err, "error waiting for TTL activity distribution")
     }
-    testutils.SucceedsWithin(t, waitForTTLProgressAcrossAllNodes, 1*time.Minute)

     t.Status("stop non-coordinator nodes")
     nonCoordinatorCount := c.Spec().NodeCount - 1
     stoppingAllNonCoordinators := numRestartNodes == nonCoordinatorCount
-    var ttlNodes map[int]struct{}
-    if !stoppingAllNonCoordinators {
-        // We need to stop a node that actually executed part of the TTL job.
-        // Relying on SQL isn't fully reliable due to potential cache staleness.
-        // Instead, we scan cockroach.log files for known TTL job log markers to
-        // identify nodes that were truly involved in the job execution.
-        ttlNodes, err = findNodesWithJobLogs(ctx, t, c, jobInfo.JobID)
-        if err != nil {
-            return errors.Wrapf(err, "error finding nodes with job logs")
-        }
-    }
     stoppedNodes := make([]int, 0)
     for node := 1; node <= c.Spec().NodeCount && len(stoppedNodes) < numRestartNodes; node++ {
         if node == jobInfo.CoordinatorID {
@@ -207,6 +187,7 @@ func runTTLRestart(ctx context.Context, t test.Test, c cluster.Cluster, numResta

     // If we haven't lost quorum, then the TTL job should restart and continue
     // working before restarting the down nodes.
+    var err error
     if numRestartNodes <= c.Spec().NodeCount/2 {
         t.Status("ensure TTL job restarts")
         testutils.SucceedsWithin(t, func() error {
@@ -310,88 +291,6 @@ func distributeLeases(ctx context.Context, t test.Test, db *gosql.DB) error {

 }

-// waitForTTLProgressAcrossAllRanges ensures that TTL deletions are happening across
-// all ranges. It builds a baseline of key counts for each leaseholder's ranges,
-// and later checks that each leaseholder made progress on at least one of those ranges,
-// regardless of current leaseholder assignment.
-func waitForTTLProgressAcrossAllRanges(ctx context.Context, db *gosql.DB) error {
-    query := `
-        WITH r AS (
-            SHOW RANGES FROM TABLE ttldb.tab1 WITH DETAILS
-        )
-        SELECT
-            range_id,
-            lease_holder,
-            count(*) AS key_count
-        FROM
-            r,
-            LATERAL crdb_internal.list_sql_keys_in_range(range_id)
-        GROUP BY
-            range_id,
-            lease_holder
-        ORDER BY
-            range_id`
-
-    // Map of leaseholder -> rangeID -> keyCount
-    baseline := make(map[int]map[int]int)
-
-    rows, err := db.QueryContext(ctx, query)
-    if err != nil {
-        return err
-    }
-    defer rows.Close()
-
-    for rows.Next() {
-        var rangeID, leaseHolder, keyCount int
-        if err := rows.Scan(&rangeID, &leaseHolder, &keyCount); err != nil {
-            return err
-        }
-        if _, ok := baseline[leaseHolder]; !ok {
-            baseline[leaseHolder] = make(map[int]int)
-        }
-        baseline[leaseHolder][rangeID] = keyCount
-    }
-
-    compareWithBaseline := func() error {
-        current := make(map[int]int) // rangeID -> keyCount
-
-        rows, err := db.QueryContext(ctx, query)
-        if err != nil {
-            return err
-        }
-        defer rows.Close()
-
-        for rows.Next() {
-            var rangeID, leaseHolder, keyCount int
-            if err := rows.Scan(&rangeID, &leaseHolder, &keyCount); err != nil {
-                return err
-            }
-            current[rangeID] = keyCount
-        }
-
-        for leaseHolder, ranges := range baseline {
-            madeProgress := false
-            for rangeID, oldCount := range ranges {
-                newCount, ok := current[rangeID]
-                if !ok {
-                    return errors.Newf("range %d (from leaseholder %d) not found in follow-up check", rangeID, leaseHolder)
-                }
-                if newCount < oldCount {
-                    madeProgress = true
-                    break
-                }
-            }
-            if !madeProgress {
-                return errors.Newf("leaseholder %d made no progress on any of their original ranges", leaseHolder)
-            }
-        }
-
-        return nil
-    }
-
-    return testutils.SucceedsWithinError(compareWithBaseline, 20*time.Second)
-}
-
 // findRunningJob checks the current state of the TTL job and returns metadata
 // about it. If a previous job state (lastJob) is provided, and expectJobRestart
 // is true, the function verifies that the job has restarted by comparing resume
@@ -499,7 +398,96 @@ func findNodesWithJobLogs(
         nodeList = append(nodeList, node)
     }
     sort.Ints(nodeList)
-    t.L().Printf("TTL job %d found on nodes: %v", jobID, nodeList)

     return nodesWithJob, nil
 }
+
+// setupTableAndData creates the ttldb database and tab1 table, inserts data,
+// and distributes ranges across nodes. This function can be called multiple
+// times to recreate the table for retry scenarios.
+func setupTableAndData(ctx context.Context, t test.Test, db *gosql.DB) error {
+    t.Status("create/recreate the table")
+    setup := []string{
+        // Speed up the test by doing the replan check often and with a low threshold.
+        "SET CLUSTER SETTING sql.ttl.replan_flow_frequency = '15s'",
+        "SET CLUSTER SETTING sql.ttl.replan_flow_threshold = '0.1'",
+        // Disable the stability window to ensure immediate replanning on node changes.
+        "SET CLUSTER SETTING sql.ttl.replan_stability_window = 1",
+        // Add additional logging to help debug the test on failure.
+        "SET CLUSTER SETTING server.debug.default_vmodule = 'ttljob_processor=1,distsql_plan_bulk=1'",
+        // Drop existing table if it exists
+        "DROP TABLE IF EXISTS ttldb.tab1",
+        // Create the schema to be used in the test
+        "CREATE DATABASE IF NOT EXISTS ttldb",
+        "CREATE TABLE IF NOT EXISTS ttldb.tab1 (pk INT8 NOT NULL PRIMARY KEY, ts TIMESTAMP NOT NULL DEFAULT now():::TIMESTAMP)",
+    }
+    for _, stmt := range setup {
+        if _, err := db.ExecContext(ctx, stmt); err != nil {
+            return errors.Wrapf(err, "error with statement: %s", stmt)
+        }
+    }
+
+    t.Status("add manual splits so that ranges are distributed evenly across the cluster")
+    if _, err := db.ExecContext(ctx, "ALTER TABLE ttldb.tab1 SPLIT AT VALUES (6000), (12000)"); err != nil {
+        return errors.Wrapf(err, "error adding manual splits")
+    }
+
+    t.Status("insert data")
+    if _, err := db.ExecContext(ctx, "INSERT INTO ttldb.tab1 (pk) SELECT generate_series(1, 18000)"); err != nil {
+        return errors.Wrapf(err, "error ingesting data")
+    }
+
+    t.Status("relocate ranges to distribute across nodes")
+    // Moving ranges is put under a SucceedsSoon to account for errors like:
+    // "lease target replica not found in RangeDescriptor"
+    testutils.SucceedsSoon(t, func() error { return distributeLeases(ctx, t, db) })
+    leases, err := gatherLeaseDistribution(ctx, t, db)
+    if err != nil {
+        return errors.Wrapf(err, "error gathering lease distribution")
+    }
+    showLeaseDistribution(t, leases)
+
+    return nil
+}
+
+// enableTTLAndWaitForJob enables TTL on the table with a cron expression
+// set to run in about 2 minutes, then waits for the job to start.
+func enableTTLAndWaitForJob(
+    ctx context.Context, t test.Test, c cluster.Cluster, db *gosql.DB,
+) (ttlJobInfo, error) {
+    var jobInfo ttlJobInfo
+
+    t.Status("enable TTL")
+    ts := db.QueryRowContext(ctx, "SELECT EXTRACT(HOUR FROM now() + INTERVAL '2 minutes') AS hour, EXTRACT(MINUTE FROM now() + INTERVAL '2 minutes') AS minute")
+    var hour, minute int
+    if err := ts.Scan(&hour, &minute); err != nil {
+        return jobInfo, errors.Wrapf(err, "error generating cron expression")
+    }
+    ttlCronExpression := fmt.Sprintf("%d %d * * *", minute, hour)
+    t.L().Printf("using a cron expression of '%s'", ttlCronExpression)
+    ttlJobSettingSQL := fmt.Sprintf(`ALTER TABLE ttldb.tab1
+        SET (ttl_expiration_expression = $$(ts::timestamptz + '1 minutes')$$,
+        ttl_select_batch_size=100,
+        ttl_delete_batch_size=100,
+        ttl_select_rate_limit=100,
+        ttl_job_cron='%s')`,
+        ttlCronExpression)
+    if _, err := db.ExecContext(ctx, ttlJobSettingSQL); err != nil {
+        return jobInfo, errors.Wrapf(err, "error setting TTL attributes")
+    }
+
+    t.Status("wait for the TTL job to start")
+    waitForTTLJob := func() error {
+        var err error
+        jobInfo, err = findRunningJob(ctx, t, c, db, nil,
+            false /* expectJobRestart */, false /* allowJobSucceeded */)
+        return err
+    }
+    // The ttl job is scheduled to run in about 2 minutes. Wait for a little
+    // while longer to give the job system enough time to start the job in case
+    // the system is slow.
+    testutils.SucceedsWithin(t, waitForTTLJob, ttlJobWaitTime)
+    t.L().Printf("TTL job (ID %d) is running at node %d", jobInfo.JobID, jobInfo.CoordinatorID)
+
+    return jobInfo, nil
+}

pkg/sql/ttl/ttljob/BUILD.bazel

Lines changed: 2 additions & 0 deletions
@@ -63,6 +63,7 @@ go_test(
     size = "large",
     srcs = [
         "main_test.go",
+        "ttljob_internal_test.go",
         "ttljob_plans_test.go",
         "ttljob_processor_internal_test.go",
         "ttljob_processor_test.go",
@@ -100,6 +101,7 @@ go_test(
         "//pkg/sql/isql",
         "//pkg/sql/lexbase",
         "//pkg/sql/parser",
+        "//pkg/sql/physicalplan",
         "//pkg/sql/randgen",
         "//pkg/sql/rowenc",
         "//pkg/sql/sem/eval",
