@@ -2431,11 +2431,17 @@ func (m *Manager) deleteOrphanedLeasesFromStaleSession(
2431
2431
region = locality .Tiers [0 ].Value
2432
2432
}
2433
2433
2434
+ log .Infof (ctx , "starting orphaned lease cleanup from stale sessions in region %s" , region )
2435
+
2434
2436
var distinctSessions []tree.Datums
2435
2437
aostTime := hlc.Timestamp {WallTime : initialTimestamp }
2436
2438
distinctSessionQuery := `SELECT DISTINCT(session_id) FROM system.lease AS OF SYSTEM TIME %s WHERE crdb_region=$1 AND NOT crdb_internal.sql_liveness_is_alive(session_id, true) LIMIT $2`
2437
2439
syntheticDescriptors := catalog.Descriptors {systemschema .LeaseTable ()}
2438
2440
const limit = 50
2441
+
2442
+ totalSessionsProcessed := 0
2443
+ totalLeasesDeleted := 0
2444
+
2439
2445
for r := retry .StartWithCtx (ctx , retryOpts ); r .Next (); {
2440
2446
// Get a list of distinct, dead session IDs that exist in the system.lease
2441
2447
// table.
@@ -2457,20 +2463,35 @@ func (m *Manager) deleteOrphanedLeasesFromStaleSession(
2457
2463
}
2458
2464
}
2459
2465
2466
+ if len (distinctSessions ) > 0 {
2467
+ log .Infof (ctx , "found %d dead sessions from which to clean up orphaned leases" , len (distinctSessions ))
2468
+ }
2469
+
2460
2470
// Delete rows in our lease table with orphaned sessions.
2461
2471
for _ , sessionRow := range distinctSessions {
2462
2472
sessionID := sqlliveness .SessionID (tree .MustBeDBytes (sessionRow [0 ]))
2463
- if err = deleteLeaseWithSessionIDWithBatch (ctx , ex , retryOpts , syntheticDescriptors , sessionID , region , limit ); err != nil {
2464
- log .Warningf (ctx , "unable to delete orphaned leases: %v" , err )
2473
+ sessionLeasesDeleted , err := deleteLeaseWithSessionIDWithBatch (ctx , ex , retryOpts , syntheticDescriptors , sessionID , region , limit )
2474
+ if err != nil {
2475
+ log .Warningf (ctx , "unable to delete orphaned leases for session %s: %v" , sessionID , err )
2465
2476
break
2466
2477
}
2478
+ totalLeasesDeleted += sessionLeasesDeleted
2479
+ log .Infof (ctx , "deleted %d orphaned leases for dead session %s" , sessionLeasesDeleted , sessionID )
2467
2480
}
2468
2481
2482
+ totalSessionsProcessed += len (distinctSessions )
2483
+
2469
2484
// No more dead sessions to clean up.
2470
2485
if len (distinctSessions ) < limit {
2486
+ log .Infof (ctx , "completed orphaned lease cleanup for region %s: %d sessions processed, %d leases deleted" ,
2487
+ region , totalSessionsProcessed , totalLeasesDeleted )
2471
2488
return
2472
2489
}
2473
2490
2491
+ // Log progress for large cleanup operations.
2492
+ log .Infof (ctx , "orphaned lease cleanup progress for region %s: %d sessions processed, %d leases deleted so far" ,
2493
+ region , totalSessionsProcessed , totalLeasesDeleted )
2494
+
2474
2495
// Advance our aostTime timestamp so that our query to detect leases with
2475
2496
// dead sessions is aware of new deletes and does not keep selecting the
2476
2497
// same leases.
@@ -2479,7 +2500,7 @@ func (m *Manager) deleteOrphanedLeasesFromStaleSession(
2479
2500
}
2480
2501
2481
2502
// deleteLeaseWithSessionIDWithBatch uses batchSize to batch deletes for leases
2482
- // with the given sessionID in system.lease.
2503
+ // with the given sessionID in system.lease. Returns the total number of leases deleted.
2483
2504
func deleteLeaseWithSessionIDWithBatch (
2484
2505
ctx context.Context ,
2485
2506
ex isql.Executor ,
@@ -2488,7 +2509,8 @@ func deleteLeaseWithSessionIDWithBatch(
2488
2509
sessionID sqlliveness.SessionID ,
2489
2510
region string ,
2490
2511
batchSize int ,
2491
- ) error {
2512
+ ) (int , error ) {
2513
+ totalDeleted := 0
2492
2514
for r := retry .StartWithCtx (ctx , retryOpts ); r .Next (); {
2493
2515
var rowsDeleted int
2494
2516
deleteOrphanedQuery := `DELETE FROM system.lease WHERE session_id=$1 AND crdb_region=$2 LIMIT $3`
@@ -2503,16 +2525,17 @@ func deleteLeaseWithSessionIDWithBatch(
2503
2525
return err
2504
2526
}); err != nil {
2505
2527
if ! startup .IsRetryableReplicaError (err ) {
2506
- return err
2528
+ return totalDeleted , err
2507
2529
}
2508
2530
}
2531
+ totalDeleted += rowsDeleted
2509
2532
2510
2533
// No more rows to clean up.
2511
2534
if rowsDeleted < batchSize {
2512
2535
break
2513
2536
}
2514
2537
}
2515
- return nil
2538
+ return totalDeleted , nil
2516
2539
}
2517
2540
2518
2541
func (m * Manager ) deleteOrphanedLeasesWithSameInstanceID (
@@ -2548,8 +2571,15 @@ func (m *Manager) deleteOrphanedLeasesWithSameInstanceID(
2548
2571
log .Warningf (ctx , "unable to read orphaned leases: %v" , err )
2549
2572
return
2550
2573
}
2574
+
2575
+ totalLeases := len (rows )
2576
+ log .Infof (ctx , "found %d orphaned leases to clean up for instance ID %d" , totalLeases , instanceID )
2577
+ if totalLeases == 0 {
2578
+ return
2579
+ }
2580
+
2551
2581
var wg sync.WaitGroup
2552
- defer wg . Wait ()
2582
+ var releasedCount atomic. Int64
2553
2583
for i := range rows {
2554
2584
// Early exit?
2555
2585
row := rows [i ]
@@ -2574,13 +2604,25 @@ func (m *Manager) deleteOrphanedLeasesWithSameInstanceID(
2574
2604
WaitForSem : true ,
2575
2605
},
2576
2606
func (ctx context.Context ) {
2607
+ defer wg .Done ()
2577
2608
m .storage .release (ctx , m .stopper , lease )
2609
+ released := releasedCount .Add (1 )
2578
2610
log .Infof (ctx , "released orphaned lease: %+v" , lease )
2579
- wg .Done ()
2611
+
2612
+ // Log progress every 100 leases for large cleanup operations.
2613
+ if released % 100 == 0 || released == int64 (totalLeases ) {
2614
+ log .Infof (ctx , "orphaned lease cleanup progress for instance ID %d: %d/%d leases released" ,
2615
+ instanceID , released , totalLeases )
2616
+ }
2580
2617
}); err != nil {
2618
+ log .Warningf (ctx , "could not start async task for releasing orphaned lease %+v: %v" , lease , err )
2581
2619
wg .Done ()
2582
2620
}
2583
2621
}
2622
+
2623
+ wg .Wait ()
2624
+ log .Infof (ctx , "completed orphaned lease cleanup for instance ID %d: %d/%d leases released" ,
2625
+ instanceID , releasedCount .Load (), totalLeases )
2584
2626
}
2585
2627
2586
2628
// TestingGetBoundAccount returns the bound account used by the lease manager.
0 commit comments