@@ -2401,11 +2401,17 @@ func (m *Manager) deleteOrphanedLeasesFromStaleSession(
2401
2401
region = locality .Tiers [0 ].Value
2402
2402
}
2403
2403
2404
+ log .Infof (ctx , "starting orphaned lease cleanup from stale sessions in region %s" , region )
2405
+
2404
2406
var distinctSessions []tree.Datums
2405
2407
aostTime := hlc.Timestamp {WallTime : initialTimestamp }
2406
2408
distinctSessionQuery := `SELECT DISTINCT(session_id) FROM system.lease AS OF SYSTEM TIME %s WHERE crdb_region=$1 AND NOT crdb_internal.sql_liveness_is_alive(session_id, true) LIMIT $2`
2407
2409
syntheticDescriptors := catalog.Descriptors {systemschema .LeaseTable ()}
2408
2410
const limit = 50
2411
+
2412
+ totalSessionsProcessed := 0
2413
+ totalLeasesDeleted := 0
2414
+
2409
2415
for r := retry .StartWithCtx (ctx , retryOpts ); r .Next (); {
2410
2416
// Get a list of distinct, dead session IDs that exist in the system.lease
2411
2417
// table.
@@ -2427,20 +2433,35 @@ func (m *Manager) deleteOrphanedLeasesFromStaleSession(
2427
2433
}
2428
2434
}
2429
2435
2436
+ if len (distinctSessions ) > 0 {
2437
+ log .Infof (ctx , "found %d dead sessions from which to clean up orphaned leases" , len (distinctSessions ))
2438
+ }
2439
+
2430
2440
// Delete rows in our lease table with orphaned sessions.
2431
2441
for _ , sessionRow := range distinctSessions {
2432
2442
sessionID := sqlliveness .SessionID (tree .MustBeDBytes (sessionRow [0 ]))
2433
- if err = deleteLeaseWithSessionIDWithBatch (ctx , ex , retryOpts , syntheticDescriptors , sessionID , region , limit ); err != nil {
2434
- log .Warningf (ctx , "unable to delete orphaned leases: %v" , err )
2443
+ sessionLeasesDeleted , err := deleteLeaseWithSessionIDWithBatch (ctx , ex , retryOpts , syntheticDescriptors , sessionID , region , limit )
2444
+ if err != nil {
2445
+ log .Warningf (ctx , "unable to delete orphaned leases for session %s: %v" , sessionID , err )
2435
2446
break
2436
2447
}
2448
+ totalLeasesDeleted += sessionLeasesDeleted
2449
+ log .Infof (ctx , "deleted %d orphaned leases for dead session %s" , sessionLeasesDeleted , sessionID )
2437
2450
}
2438
2451
2452
+ totalSessionsProcessed += len (distinctSessions )
2453
+
2439
2454
// No more dead sessions to clean up.
2440
2455
if len (distinctSessions ) < limit {
2456
+ log .Infof (ctx , "completed orphaned lease cleanup for region %s: %d sessions processed, %d leases deleted" ,
2457
+ region , totalSessionsProcessed , totalLeasesDeleted )
2441
2458
return
2442
2459
}
2443
2460
2461
+ // Log progress for large cleanup operations.
2462
+ log .Infof (ctx , "orphaned lease cleanup progress for region %s: %d sessions processed, %d leases deleted so far" ,
2463
+ region , totalSessionsProcessed , totalLeasesDeleted )
2464
+
2444
2465
// Advance our aostTime timstamp so that our query to detect leases with
2445
2466
// dead sessions is aware of new deletes and does not keep selecting the
2446
2467
// same leases.
@@ -2449,7 +2470,7 @@ func (m *Manager) deleteOrphanedLeasesFromStaleSession(
2449
2470
}
2450
2471
2451
2472
// deleteLeaseWithSessionIDWithBatch uses batchSize to batch deletes for leases
2452
- // with the given sessionID in system.lease.
2473
+ // with the given sessionID in system.lease. Returns the total number of leases deleted.
2453
2474
func deleteLeaseWithSessionIDWithBatch (
2454
2475
ctx context.Context ,
2455
2476
ex isql.Executor ,
@@ -2458,7 +2479,8 @@ func deleteLeaseWithSessionIDWithBatch(
2458
2479
sessionID sqlliveness.SessionID ,
2459
2480
region string ,
2460
2481
batchSize int ,
2461
- ) error {
2482
+ ) (int , error ) {
2483
+ totalDeleted := 0
2462
2484
for r := retry .StartWithCtx (ctx , retryOpts ); r .Next (); {
2463
2485
var rowsDeleted int
2464
2486
deleteOrphanedQuery := `DELETE FROM system.lease WHERE session_id=$1 AND crdb_region=$2 LIMIT $3`
@@ -2473,16 +2495,17 @@ func deleteLeaseWithSessionIDWithBatch(
2473
2495
return err
2474
2496
}); err != nil {
2475
2497
if ! startup .IsRetryableReplicaError (err ) {
2476
- return err
2498
+ return totalDeleted , err
2477
2499
}
2478
2500
}
2501
+ totalDeleted += rowsDeleted
2479
2502
2480
2503
// No more rows to clean up.
2481
2504
if rowsDeleted < batchSize {
2482
2505
break
2483
2506
}
2484
2507
}
2485
- return nil
2508
+ return totalDeleted , nil
2486
2509
}
2487
2510
2488
2511
func (m * Manager ) deleteOrphanedLeasesWithSameInstanceID (
@@ -2518,8 +2541,15 @@ func (m *Manager) deleteOrphanedLeasesWithSameInstanceID(
2518
2541
log .Warningf (ctx , "unable to read orphaned leases: %v" , err )
2519
2542
return
2520
2543
}
2544
+
2545
+ totalLeases := len (rows )
2546
+ log .Infof (ctx , "found %d orphaned leases to clean up for instance ID %d" , totalLeases , instanceID )
2547
+ if totalLeases == 0 {
2548
+ return
2549
+ }
2550
+
2521
2551
var wg sync.WaitGroup
2522
- defer wg . Wait ()
2552
+ var releasedCount atomic. Int64
2523
2553
for i := range rows {
2524
2554
// Early exit?
2525
2555
row := rows [i ]
@@ -2544,13 +2574,25 @@ func (m *Manager) deleteOrphanedLeasesWithSameInstanceID(
2544
2574
WaitForSem : true ,
2545
2575
},
2546
2576
func (ctx context.Context ) {
2577
+ defer wg .Done ()
2547
2578
m .storage .release (ctx , m .stopper , lease )
2579
+ released := releasedCount .Add (1 )
2548
2580
log .Infof (ctx , "released orphaned lease: %+v" , lease )
2549
- wg .Done ()
2581
+
2582
+ // Log progress every 100 leases for large cleanup operations.
2583
+ if released % 100 == 0 || released == int64 (totalLeases ) {
2584
+ log .Infof (ctx , "orphaned lease cleanup progress for instance ID %d: %d/%d leases released" ,
2585
+ instanceID , released , totalLeases )
2586
+ }
2550
2587
}); err != nil {
2588
+ log .Warningf (ctx , "could not start async task for releasing orphaned lease %+v: %v" , lease , err )
2551
2589
wg .Done ()
2552
2590
}
2553
2591
}
2592
+
2593
+ wg .Wait ()
2594
+ log .Infof (ctx , "completed orphaned lease cleanup for instance ID %d: %d/%d leases released" ,
2595
+ instanceID , releasedCount .Load (), totalLeases )
2554
2596
}
2555
2597
2556
2598
// TestingGetBoundAccount returns the bound account used by the lease manager.
0 commit comments