Skip to content

Commit ce88073

Browse files
craig[bot]fqazi
andcommitted
Merge #151897
151897: catalog/lease: handle errRenewLease in purgeOldVersions r=fqazi a=fqazi Previously, when purge old versions attempted to acquire a lease on the newest version of a descriptor it was possible that a errRenewLease could be hit. This would happen if we weren't able to referesh the sqlliveness sessions fast enough and descriptors never being cleaned up from the system.lease table. To address this, this patch adds defensive code to clean up all leases in storge if this happens. It also fixes incorrect handling of inative descriptors that could lead to leaked descriptors. Fixes: #151656 Release note: None Co-authored-by: Faizan Qazi <[email protected]>
2 parents 55223e2 + c8a127c commit ce88073

File tree

3 files changed

+51
-21
lines changed

3 files changed

+51
-21
lines changed

pkg/sql/catalog/lease/descriptor_version_state.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,11 @@ func (s *descriptorVersionState) hasExpired(ctx context.Context, timestamp hlc.T
138138
return s.getExpiration(ctx).LessEq(timestamp)
139139
}
140140

141+
// hasFixedExpiration returns if the descriptor has a fixed expiration.
142+
func (s *descriptorVersionState) hasFixedExpiration() bool {
143+
return s.expiration.Load() != nil
144+
}
145+
141146
func (s *descriptorVersionState) incRefCount(ctx context.Context, expensiveLogEnabled bool) {
142147
s.refcount.Add(1)
143148
if expensiveLogEnabled {

pkg/sql/catalog/lease/lease.go

Lines changed: 41 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1119,13 +1119,8 @@ func releaseLease(ctx context.Context, lease *storedLease, m *Manager) (released
11191119
// which will cause existing in-use leases to be eagerly released once
11201120
// they're not in use any more.
11211121
// If t has no active leases, nothing is done.
1122-
func purgeOldVersions(
1123-
ctx context.Context,
1124-
db *kv.DB,
1125-
id descpb.ID,
1126-
dropped bool,
1127-
minVersion descpb.DescriptorVersion,
1128-
m *Manager,
1122+
func (m *Manager) purgeOldVersions(
1123+
ctx context.Context, db *kv.DB, id descpb.ID, dropped bool, minVersion descpb.DescriptorVersion,
11291124
) error {
11301125
t := m.findDescriptorState(id, false /*create*/)
11311126
if t == nil {
@@ -1193,10 +1188,40 @@ func purgeOldVersions(
11931188
return err
11941189
}
11951190

1196-
// Acquire a refcount on the descriptor on the latest version to maintain an
1197-
// active lease, so that it doesn't get released when removeInactives()
1198-
// is called below. Release this lease after calling removeInactives().
1199-
desc, _, err := t.findForTimestamp(ctx, m.storage.clock.Now())
1191+
var err error
1192+
var desc *descriptorVersionState
1193+
for r := retry.StartWithCtx(ctx,
1194+
retry.Options{
1195+
MaxDuration: time.Second * 30}); r.Next(); {
1196+
// Acquire a refcount on the descriptor on the latest version to maintain an
1197+
// active lease, so that it doesn't get released when removeInactives()
1198+
// is called below. Release this lease after calling removeInactives().
1199+
desc, _, err = t.findForTimestamp(ctx, m.storage.clock.Now())
1200+
if err == nil || !errors.Is(err, errRenewLease) {
1201+
break
1202+
}
1203+
// We encountered an error telling us to renew the lease.
1204+
newest := m.findNewest(id)
1205+
// Assert this should never happen due to a fixed expiration, since the range
1206+
// feed is responsible for purging old versions and acquiring new versions.
1207+
if newest.hasFixedExpiration() {
1208+
return errors.AssertionFailedf("the latest version of the descriptor has" +
1209+
"a fixed expiration, this should never happen")
1210+
}
1211+
// Otherwise, we ran into some type of transient issue, where the sqllivness
1212+
// session was expired. This could happen if the sqlliveness range is slow
1213+
// for some reason.
1214+
log.Infof(ctx, "unable to acquire lease on latest descriptor "+
1215+
"version of ID: %d, retrying...", id)
1216+
}
1217+
// As a last resort, we will release all versions of the descriptor. This is
1218+
// suboptimal, but the safest option.
1219+
if errors.Is(err, errRenewLease) {
1220+
log.Warningf(ctx, "unable to acquire lease on latest descriptor "+
1221+
"version of ID: %d, cleaning up all versions from storage.", id)
1222+
err = nil
1223+
}
1224+
12001225
if isInactive := catalog.HasInactiveDescriptorError(err); err == nil || isInactive {
12011226
removeInactives(isInactive)
12021227
if desc != nil {
@@ -1866,7 +1891,7 @@ func (m *Manager) StartRefreshLeasesTask(ctx context.Context, s *stop.Stopper, d
18661891
defer m.leaseGeneration.Add(1)
18671892
state := m.findNewest(id)
18681893
if state != nil {
1869-
if err := purgeOldVersions(ctx, db, id, true /* dropped */, state.GetVersion(), m); err != nil {
1894+
if err := m.purgeOldVersions(ctx, db, id, true /* dropped */, state.GetVersion()); err != nil {
18701895
log.Warningf(ctx, "error purging leases for deleted descriptor %d",
18711896
id)
18721897
}
@@ -1926,7 +1951,7 @@ func (m *Manager) StartRefreshLeasesTask(ctx context.Context, s *stop.Stopper, d
19261951
// descriptor versions, which could have been acquired concurrently.
19271952
// For example the range feed sees version 2 and a query concurrently
19281953
// acquires version 1.
1929-
if err := purgeOldVersions(ctx, db, desc.GetID(), dropped, desc.GetVersion(), m); err != nil {
1954+
if err := m.purgeOldVersions(ctx, db, desc.GetID(), dropped, desc.GetVersion()); err != nil {
19301955
log.Warningf(ctx, "error purging leases for descriptor %d(%s): %s",
19311956
desc.GetID(), desc.GetName(), err)
19321957
}
@@ -2347,8 +2372,8 @@ func (m *Manager) refreshSomeLeases(ctx context.Context, refreshAndPurgeAllDescr
23472372

23482373
if errors.Is(err, catalog.ErrDescriptorNotFound) || errors.Is(err, catalog.ErrDescriptorDropped) {
23492374
// Lease renewal failed due to removed descriptor; Remove this descriptor from cache.
2350-
if err := purgeOldVersions(
2351-
ctx, m.storage.db.KV(), id, true /* dropped */, 0 /* minVersion */, m,
2375+
if err := m.purgeOldVersions(
2376+
ctx, m.storage.db.KV(), id, true /* dropped */, 0, /* minVersion */
23522377
); err != nil {
23532378
log.Warningf(ctx, "error purging leases for descriptor %d: %v",
23542379
id, err)
@@ -2363,7 +2388,7 @@ func (m *Manager) refreshSomeLeases(ctx context.Context, refreshAndPurgeAllDescr
23632388
if refreshAndPurgeAllDescriptors {
23642389
// If we are refreshing all descriptors, then we want to purge older versions as
23652390
// we are doing this operation.
2366-
err := purgeOldVersions(ctx, m.storage.db.KV(), id, false /* dropped */, 0 /* minVersion */, m)
2391+
err := m.purgeOldVersions(ctx, m.storage.db.KV(), id, false /* dropped */, 0 /* minVersion */)
23672392
if err != nil {
23682393
log.Warningf(ctx, "error purging leases for descriptor %d: %v",
23692394
id, err)

pkg/sql/catalog/lease/lease_internal_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -208,8 +208,8 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR);
208208
t.Fatalf("found %d versions instead of 2", numLeases)
209209
}
210210
ctx := context.Background()
211-
if err := purgeOldVersions(
212-
ctx, kvDB, tableDesc.GetID(), false, 2 /* minVersion */, leaseManager); err != nil {
211+
if err := leaseManager.purgeOldVersions(
212+
ctx, kvDB, tableDesc.GetID(), false, 2 /* minVersion */); err != nil {
213213
t.Fatal(err)
214214
}
215215

@@ -241,8 +241,8 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR);
241241
if numLeases := getNumVersions(ts); numLeases != 2 {
242242
t.Fatalf("found %d versions instead of 2", numLeases)
243243
}
244-
if err := purgeOldVersions(
245-
context.Background(), kvDB, tableDesc.GetID(), false, 2 /* minVersion */, leaseManager); err != nil {
244+
if err := leaseManager.purgeOldVersions(
245+
context.Background(), kvDB, tableDesc.GetID(), false, 2 /* minVersion */); err != nil {
246246
t.Fatal(err)
247247
}
248248
if numLeases := getNumVersions(ts); numLeases != 1 {
@@ -347,7 +347,7 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR);
347347

348348
// Purge old versions and make sure that the newest lease survives the
349349
// purge.
350-
if err := purgeOldVersions(ctx, kvDB, tableDesc.GetID(), false, 2 /* minVersion */, leaseManager); err != nil {
350+
if err := leaseManager.purgeOldVersions(ctx, kvDB, tableDesc.GetID(), false, 2 /* minVersion */); err != nil {
351351
t.Fatal(err)
352352
}
353353
if numLeases := getNumVersions(ts); numLeases != 1 {

0 commit comments

Comments
 (0)