@@ -2079,11 +2079,17 @@ func (m *Manager) deleteOrphanedLeasesFromStaleSession(
2079
2079
region = locality .Tiers [0 ].Value
2080
2080
}
2081
2081
2082
+ log .Infof (ctx , "starting orphaned lease cleanup from stale sessions in region %s" , region )
2083
+
2082
2084
var distinctSessions []tree.Datums
2083
2085
aostTime := hlc.Timestamp {WallTime : initialTimestamp }
2084
2086
distinctSessionQuery := `SELECT DISTINCT(session_id) FROM system.lease AS OF SYSTEM TIME %s WHERE crdb_region=$1 AND NOT crdb_internal.sql_liveness_is_alive(session_id, true) LIMIT $2`
2085
2087
syntheticDescriptors := catalog.Descriptors {systemschema .LeaseTable ()}
2086
2088
const limit = 50
2089
+
2090
+ totalSessionsProcessed := 0
2091
+ totalLeasesDeleted := 0
2092
+
2087
2093
for r := retry .StartWithCtx (ctx , retryOpts ); r .Next (); {
2088
2094
// Get a list of distinct, dead session IDs that exist in the system.lease
2089
2095
// table.
@@ -2105,20 +2111,35 @@ func (m *Manager) deleteOrphanedLeasesFromStaleSession(
2105
2111
}
2106
2112
}
2107
2113
2114
+ if len (distinctSessions ) > 0 {
2115
+ log .Infof (ctx , "found %d dead sessions from which to clean up orphaned leases" , len (distinctSessions ))
2116
+ }
2117
+
2108
2118
// Delete rows in our lease table with orphaned sessions.
2109
2119
for _ , sessionRow := range distinctSessions {
2110
2120
sessionID := sqlliveness .SessionID (tree .MustBeDBytes (sessionRow [0 ]))
2111
- if err = deleteLeaseWithSessionIDWithBatch (ctx , ex , retryOpts , syntheticDescriptors , sessionID , region , limit ); err != nil {
2112
- log .Warningf (ctx , "unable to delete orphaned leases: %v" , err )
2121
+ sessionLeasesDeleted , err := deleteLeaseWithSessionIDWithBatch (ctx , ex , retryOpts , syntheticDescriptors , sessionID , region , limit )
2122
+ if err != nil {
2123
+ log .Warningf (ctx , "unable to delete orphaned leases for session %s: %v" , sessionID , err )
2113
2124
break
2114
2125
}
2126
+ totalLeasesDeleted += sessionLeasesDeleted
2127
+ log .Infof (ctx , "deleted %d orphaned leases for dead session %s" , sessionLeasesDeleted , sessionID )
2115
2128
}
2116
2129
2130
+ totalSessionsProcessed += len (distinctSessions )
2131
+
2117
2132
// No more dead sessions to clean up.
2118
2133
if len (distinctSessions ) < limit {
2134
+ log .Infof (ctx , "completed orphaned lease cleanup for region %s: %d sessions processed, %d leases deleted" ,
2135
+ region , totalSessionsProcessed , totalLeasesDeleted )
2119
2136
return
2120
2137
}
2121
2138
2139
+ // Log progress for large cleanup operations.
2140
+ log .Infof (ctx , "orphaned lease cleanup progress for region %s: %d sessions processed, %d leases deleted so far" ,
2141
+ region , totalSessionsProcessed , totalLeasesDeleted )
2142
+
2122
2143
// Advance our aostTime timestamp so that our query to detect leases with
2123
2144
// dead sessions is aware of new deletes and does not keep selecting the
2124
2145
// same leases.
@@ -2127,7 +2148,7 @@ func (m *Manager) deleteOrphanedLeasesFromStaleSession(
2127
2148
}
2128
2149
2129
2150
// deleteLeaseWithSessionIDWithBatch uses batchSize to batch deletes for leases
2130
- // with the given sessionID in system.lease.
2151
+ // with the given sessionID in system.lease. Returns the total number of leases deleted.
2131
2152
func deleteLeaseWithSessionIDWithBatch (
2132
2153
ctx context.Context ,
2133
2154
ex isql.Executor ,
@@ -2136,7 +2157,8 @@ func deleteLeaseWithSessionIDWithBatch(
2136
2157
sessionID sqlliveness.SessionID ,
2137
2158
region string ,
2138
2159
batchSize int ,
2139
- ) error {
2160
+ ) (int , error ) {
2161
+ totalDeleted := 0
2140
2162
for r := retry .StartWithCtx (ctx , retryOpts ); r .Next (); {
2141
2163
var rowsDeleted int
2142
2164
deleteOrphanedQuery := `DELETE FROM system.lease WHERE session_id=$1 AND crdb_region=$2 LIMIT $3`
@@ -2151,16 +2173,17 @@ func deleteLeaseWithSessionIDWithBatch(
2151
2173
return err
2152
2174
}); err != nil {
2153
2175
if ! startup .IsRetryableReplicaError (err ) {
2154
- return err
2176
+ return totalDeleted , err
2155
2177
}
2156
2178
}
2179
+ totalDeleted += rowsDeleted
2157
2180
2158
2181
// No more rows to clean up.
2159
2182
if rowsDeleted < batchSize {
2160
2183
break
2161
2184
}
2162
2185
}
2163
- return nil
2186
+ return totalDeleted , nil
2164
2187
}
2165
2188
2166
2189
func (m * Manager ) deleteOrphanedLeasesWithSameInstanceID (
@@ -2196,8 +2219,15 @@ func (m *Manager) deleteOrphanedLeasesWithSameInstanceID(
2196
2219
log .Warningf (ctx , "unable to read orphaned leases: %v" , err )
2197
2220
return
2198
2221
}
2222
+
2223
+ totalLeases := len (rows )
2224
+ log .Infof (ctx , "found %d orphaned leases to clean up for instance ID %d" , totalLeases , instanceID )
2225
+ if totalLeases == 0 {
2226
+ return
2227
+ }
2228
+
2199
2229
var wg sync.WaitGroup
2200
- defer wg . Wait ()
2230
+ var releasedCount atomic. Int64
2201
2231
for i := range rows {
2202
2232
// Early exit?
2203
2233
row := rows [i ]
@@ -2222,11 +2252,23 @@ func (m *Manager) deleteOrphanedLeasesWithSameInstanceID(
2222
2252
WaitForSem : true ,
2223
2253
},
2224
2254
func (ctx context.Context ) {
2255
+ defer wg .Done ()
2225
2256
m .storage .release (ctx , m .stopper , lease )
2257
+ released := releasedCount .Add (1 )
2226
2258
log .Infof (ctx , "released orphaned lease: %+v" , lease )
2227
- wg .Done ()
2259
+
2260
+ // Log progress every 100 leases for large cleanup operations.
2261
+ if released % 100 == 0 || released == int64 (totalLeases ) {
2262
+ log .Infof (ctx , "orphaned lease cleanup progress for instance ID %d: %d/%d leases released" ,
2263
+ instanceID , released , totalLeases )
2264
+ }
2228
2265
}); err != nil {
2266
+ log .Warningf (ctx , "could not start async task for releasing orphaned lease %+v: %v" , lease , err )
2229
2267
wg .Done ()
2230
2268
}
2231
2269
}
2270
+
2271
+ wg .Wait ()
2272
+ log .Infof (ctx , "completed orphaned lease cleanup for instance ID %d: %d/%d leases released" ,
2273
+ instanceID , releasedCount .Load (), totalLeases )
2232
2274
}
0 commit comments