@@ -93,6 +93,8 @@ func runTTLRestart(ctx context.Context, t test.Test, c cluster.Cluster, numResta
93
93
// Speed up the test by doing the replan check often and with a low threshold.
94
94
"SET CLUSTER SETTING sql.ttl.replan_flow_frequency = '15s'" ,
95
95
"SET CLUSTER SETTING sql.ttl.replan_flow_threshold = '0.1'" ,
96
+ // Disable the stability window to ensure immediate replanning on node changes.
97
+ "SET CLUSTER SETTING sql.ttl.replan_stability_window = 1" ,
96
98
// Add additional logging to help debug the test on failure.
97
99
"SET CLUSTER SETTING server.debug.default_vmodule = 'ttljob_processor=1,distsql_plan_bulk=1'" ,
98
100
// Create the schema to be used in the test
@@ -165,8 +167,13 @@ func runTTLRestart(ctx context.Context, t test.Test, c cluster.Cluster, numResta
165
167
db = c .Conn (ctx , t .L (), jobInfo .CoordinatorID )
166
168
167
169
t .Status ("wait for TTL deletions to start happening" )
170
+ // Take baseline once and reuse it for all progress checks
171
+ baseline , err := takeProgressBaseline (ctx , t , db )
172
+ if err != nil {
173
+ return errors .Wrapf (err , "error taking TTL progress baseline" )
174
+ }
168
175
waitForTTLProgressAcrossAllNodes := func () error {
169
- if err := waitForTTLProgressAcrossAllRanges (ctx , db ); err != nil {
176
+ if err := checkTTLProgressAgainstBaseline (ctx , db , baseline ); err != nil {
170
177
return errors .Wrapf (err , "error waiting for TTL progress after restart" )
171
178
}
172
179
return nil
@@ -310,11 +317,11 @@ func distributeLeases(ctx context.Context, t test.Test, db *gosql.DB) error {
310
317
311
318
}
312
319
313
- // waitForTTLProgressAcrossAllRanges ensures that TTL deletions are happening across
314
- // all ranges. It builds a baseline of key counts for each leaseholder's ranges,
315
- // and later checks that each leaseholder made progress on at least one of those ranges,
316
- // regardless of current leaseholder assignment.
317
- func waitForTTLProgressAcrossAllRanges ( ctx context. Context , db * gosql. DB ) error {
320
+ // takeProgressBaseline captures the initial key counts for each range and its leaseholder.
321
+ // This baseline will be used later to check if TTL progress is being made.
322
+ func takeProgressBaseline (
323
+ ctx context. Context , t test. Test , db * gosql. DB ,
324
+ ) ( map [ int ] map [ int ] int , error ) {
318
325
query := `
319
326
WITH r AS (
320
327
SHOW RANGES FROM TABLE ttldb.tab1 WITH DETAILS
@@ -337,59 +344,79 @@ func waitForTTLProgressAcrossAllRanges(ctx context.Context, db *gosql.DB) error
337
344
338
345
rows , err := db .QueryContext (ctx , query )
339
346
if err != nil {
340
- return err
347
+ return nil , err
341
348
}
342
349
defer rows .Close ()
343
350
344
351
for rows .Next () {
345
352
var rangeID , leaseHolder , keyCount int
346
353
if err := rows .Scan (& rangeID , & leaseHolder , & keyCount ); err != nil {
347
- return err
354
+ return nil , err
348
355
}
349
356
if _ , ok := baseline [leaseHolder ]; ! ok {
350
357
baseline [leaseHolder ] = make (map [int ]int )
351
358
}
352
359
baseline [leaseHolder ][rangeID ] = keyCount
353
360
}
354
361
355
- compareWithBaseline := func () error {
356
- current := make ( map [ int ] int ) // rangeID -> keyCount
362
+ return baseline , nil
363
+ }
357
364
358
- rows , err := db .QueryContext (ctx , query )
359
- if err != nil {
360
- return err
361
- }
362
- defer rows .Close ()
365
+ // checkTTLProgressAgainstBaseline checks if each leaseholder has made progress
366
+ // on at least one of their original ranges compared to the provided baseline.
367
+ func checkTTLProgressAgainstBaseline (
368
+ ctx context.Context , db * gosql.DB , baseline map [int ]map [int ]int ,
369
+ ) error {
370
+ query := `
371
+ WITH r AS (
372
+ SHOW RANGES FROM TABLE ttldb.tab1 WITH DETAILS
373
+ )
374
+ SELECT
375
+ range_id,
376
+ lease_holder,
377
+ count(*) AS key_count
378
+ FROM
379
+ r,
380
+ LATERAL crdb_internal.list_sql_keys_in_range(range_id)
381
+ GROUP BY
382
+ range_id,
383
+ lease_holder
384
+ ORDER BY
385
+ range_id`
363
386
364
- for rows .Next () {
365
- var rangeID , leaseHolder , keyCount int
366
- if err := rows .Scan (& rangeID , & leaseHolder , & keyCount ); err != nil {
367
- return err
368
- }
369
- current [rangeID ] = keyCount
387
+ current := make (map [int ]int ) // rangeID -> keyCount
388
+
389
+ rows , err := db .QueryContext (ctx , query )
390
+ if err != nil {
391
+ return err
392
+ }
393
+ defer rows .Close ()
394
+
395
+ for rows .Next () {
396
+ var rangeID , leaseHolder , keyCount int
397
+ if err := rows .Scan (& rangeID , & leaseHolder , & keyCount ); err != nil {
398
+ return err
370
399
}
400
+ current [rangeID ] = keyCount
401
+ }
371
402
372
- for leaseHolder , ranges := range baseline {
373
- madeProgress := false
374
- for rangeID , oldCount := range ranges {
375
- newCount , ok := current [rangeID ]
376
- if ! ok {
377
- return errors .Newf ("range %d (from leaseholder %d) not found in follow-up check" , rangeID , leaseHolder )
378
- }
379
- if newCount < oldCount {
380
- madeProgress = true
381
- break
382
- }
403
+ for leaseHolder , ranges := range baseline {
404
+ madeProgress := false
405
+ for rangeID , oldCount := range ranges {
406
+ newCount , ok := current [rangeID ]
407
+ if ! ok {
408
+ return errors .Newf ("range %d (from leaseholder %d) not found in follow-up check" , rangeID , leaseHolder )
383
409
}
384
- if ! madeProgress {
385
- return errors . Newf ( "leaseholder %d made no progress on any of their original ranges" , leaseHolder )
410
+ if newCount < oldCount {
411
+ madeProgress = true
386
412
}
387
413
}
388
-
389
- return nil
414
+ if ! madeProgress {
415
+ return errors .Newf ("leaseholder %d made no progress on any of their original ranges" , leaseHolder )
416
+ }
390
417
}
391
418
392
- return testutils . SucceedsWithinError ( compareWithBaseline , 20 * time . Second )
419
+ return nil
393
420
}
394
421
395
422
// findRunningJob checks the current state of the TTL job and returns metadata
0 commit comments