
Commit 33254c8

craig[bot], fqazi, and michae2 committed
145462: catalog/lease: purge old versions on range feed recovery r=fqazi a=fqazi

catalog/lease: purge old versions on range feed recovery

Previously, the lease manager's range feed recovery logic would pick up newer
versions of descriptors after a range feed failure. However, it would not purge
old versions, which could leave old versions still in use. To address this,
this patch always purges old versions when refreshing during a range feed
failure.

Fixes: #145422

Release note (bug fix): Addresses a bug that can lead to hung schema changes
after recovery from availability issues.

This patch also fixes TestLeaseDescriptorRangeFeedFailure, which did not work
correctly when the range feed recovery logic was added.

145494: sql: fix telemetry for CREATE STATISTICS r=DrewKimball,msbutler a=michae2

My change in #144316 accidentally coupled telemetry for CREATE STATISTICS with
the cluster setting `sql.stats.automatic_job_check_before_creating_job`.
Michael even pointed this out, but I failed to understand! (Sorry, Michael!)

Decouple telemetry from the cluster setting, so that telemetry behavior matches
how it was before #144316.

Epic: None

Release note: None

Co-authored-by: Faizan Qazi <[email protected]>
Co-authored-by: Michael Erickson <[email protected]>
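The gist of the lease fix, as a minimal standalone sketch (all types and names
below are illustrative only and do not come from the CockroachDB source): on
range feed recovery, each cached descriptor is not only refreshed to its newest
version but also stripped of older cached versions.

package main

import "fmt"

// descriptorCache is a toy stand-in for the lease manager's per-descriptor
// version cache; the field and method names are illustrative only.
type descriptorCache struct {
	// versions[id] holds the cached versions of a descriptor, oldest first.
	versions map[int][]int
}

// recoverFromFeedFailure models the fixed recovery path: for every cached
// descriptor, re-read the latest version and purge the older ones, so stale
// versions cannot remain in use once the range feed comes back.
func (c *descriptorCache) recoverFromFeedFailure(latest func(id int) int) {
	for id := range c.versions {
		newest := latest(id)
		// Before the fix, recovery only added the newest version; dropping the
		// older entries is the behavior this commit adds.
		c.versions[id] = []int{newest}
	}
}

func main() {
	c := &descriptorCache{versions: map[int][]int{42: {3, 4}}}
	c.recoverFromFeedFailure(func(id int) int { return 5 })
	fmt.Println(c.versions) // map[42:[5]]
}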
3 parents 93518dd + 7e7c4fe + 57844ae commit 33254c8

File tree

4 files changed: 46 additions, 32 deletions

pkg/sql/catalog/lease/helpers_test.go
pkg/sql/catalog/lease/lease.go
pkg/sql/catalog/lease/lease_test.go
pkg/sql/create_stats.go


pkg/sql/catalog/lease/helpers_test.go

Lines changed: 1 addition & 1 deletion
@@ -290,7 +290,7 @@ func (m *Manager) Publish(
 }
 
 func (m *Manager) TestingRefreshSomeLeases(ctx context.Context) {
-	m.refreshSomeLeases(ctx, false /*refreshAll*/)
+	m.refreshSomeLeases(ctx, false /*refreshAndPurgeAllDescriptors*/)
 }
 
 func (m *Manager) TestingDescriptorStateIsNil(id descpb.ID) bool {

pkg/sql/catalog/lease/lease.go

Lines changed: 16 additions & 7 deletions
@@ -1886,7 +1886,7 @@ func (m *Manager) RunBackgroundLeasingTask(ctx context.Context) {
 				// If the range feed recovers after a failure, re-read all
 				// descriptors.
 				if refreshAllDescriptors {
-					m.refreshSomeLeases(ctx, true /*refreshAll*/)
+					m.refreshSomeLeases(ctx, true /*refreshAndPurgeAllDescriptors*/)
 				}
 			}
 			rangeFeedProgressWatchDogTimeout,
@@ -1895,7 +1895,7 @@ func (m *Manager) RunBackgroundLeasingTask(ctx context.Context) {
 		case err := <-m.rangefeedErrCh:
 			log.Warningf(ctx, "lease rangefeed failed with error: %s", err.Error())
 			m.handleRangeFeedError(ctx)
-			m.refreshSomeLeases(ctx, true /*refreshAll*/)
+			m.refreshSomeLeases(ctx, true /*refreshAndPurgeAllDescriptors*/)
 		case <-refreshTimer.C:
 			refreshTimer.Reset(getRefreshTimerDuration() / 2)
 
@@ -1999,8 +1999,9 @@ func (m *Manager) cleanupExpiredSessionLeases(ctx context.Context) {
 	}
 }
 
-// Refresh some of the current leases.
-func (m *Manager) refreshSomeLeases(ctx context.Context, includeAll bool) {
+// Refresh some of the current leases. If refreshAndPurgeAllDescriptors is set,
+// then all descriptors are refreshed, and old versions are purged.
+func (m *Manager) refreshSomeLeases(ctx context.Context, refreshAndPurgeAllDescriptors bool) {
 	limit := leaseRefreshLimit.Get(&m.storage.settings.SV)
 	if limit <= 0 {
 		return
@@ -2013,7 +2014,7 @@ func (m *Manager) refreshSomeLeases(ctx context.Context, includeAll bool) {
 	ids := make([]descpb.ID, 0, len(m.mu.descriptors))
 	var i int64
 	for k, desc := range m.mu.descriptors {
-		if i++; i > limit && !includeAll {
+		if i++; i > limit && !refreshAndPurgeAllDescriptors {
 			break
 		}
 		takenOffline := func() bool {
@@ -2048,7 +2049,6 @@ func (m *Manager) refreshSomeLeases(ctx context.Context, includeAll bool) {
 				return
 			}
 		}
-
 		if _, err := acquireNodeLease(ctx, m, id, AcquireBackground); err != nil {
 			log.Errorf(ctx, "refreshing descriptor: %d lease failed: %s", id, err)
 
@@ -2057,7 +2057,7 @@ func (m *Manager) refreshSomeLeases(ctx context.Context, includeAll bool) {
 			if err := purgeOldVersions(
 				ctx, m.storage.db.KV(), id, true /* dropped */, 0 /* minVersion */, m,
 			); err != nil {
-				log.Warningf(ctx, "error purging leases for descriptor %d: %s",
+				log.Warningf(ctx, "error purging leases for descriptor %d: %v",
 					id, err)
 			}
 			func() {
@@ -2067,6 +2067,15 @@ func (m *Manager) refreshSomeLeases(ctx context.Context, includeAll bool) {
 				}()
 			}
 		}
+			if refreshAndPurgeAllDescriptors {
+				// If we are refreshing all descriptors, then we want to purge older versions as
+				// we are doing this operation.
+				err := purgeOldVersions(ctx, m.storage.db.KV(), id, false /* dropped */, 0 /* minVersion */, m)
+				if err != nil {
+					log.Warningf(ctx, "error purging leases for descriptor %d: %v",
+						id, err)
+				}
+			}
 		}); err != nil {
 			log.Infof(ctx, "didnt refresh descriptor: %d lease: %s", id, err)
 			wg.Done()
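For readability, here is a reduced sketch of the two call sites above
(hypothetical signature and helper names; the real loop lives in
Manager.RunBackgroundLeasingTask and handles the watchdog and other cases as
well): the periodic timer path refreshes a bounded batch of leases, while the
range feed error path now refreshes every descriptor and purges old versions.

package lease_sketch

import (
	"context"
	"time"
)

// backgroundLoop is a reduced sketch of the leasing task's dispatch logic.
// refreshLeases stands in for Manager.refreshSomeLeases and is not the real
// signature.
func backgroundLoop(
	ctx context.Context,
	rangefeedErr <-chan error,
	tick <-chan time.Time,
	refreshLeases func(ctx context.Context, refreshAndPurgeAllDescriptors bool),
) {
	for {
		select {
		case <-rangefeedErr:
			// Range feed failed: re-read every descriptor and purge old
			// versions so nothing keeps using a stale lease after recovery.
			refreshLeases(ctx, true /* refreshAndPurgeAllDescriptors */)
		case <-tick:
			// Routine maintenance: refresh only a bounded batch, no purge.
			refreshLeases(ctx, false /* refreshAndPurgeAllDescriptors */)
		case <-ctx.Done():
			return
		}
	}
}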

pkg/sql/catalog/lease/lease_test.go

Lines changed: 17 additions & 14 deletions
@@ -3200,30 +3200,31 @@ func TestLeaseDescriptorRangeFeedFailure(t *testing.T) {
 	var srv serverutils.TestClusterInterface
 	var enableAfterStageKnob atomic.Bool
 	var rangeFeedResetChan chan struct{}
+	grp := ctxgroup.WithContext(ctx)
 	srv = serverutils.StartCluster(t, 3, base.TestClusterArgs{
 		ServerArgs: base.TestServerArgs{
 			Settings: settings,
 			Knobs: base.TestingKnobs{
 				SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{
 					BeforeStage: func(p scplan.Plan, stageIdx int) error {
-						// Once this stage completes, we can "resume" the range feed,
-						// so the update is detected.
-						if p.Params.ExecutionPhase == scop.PostCommitPhase &&
-							enableAfterStageKnob.Load() &&
-							strings.Contains(p.Statements[0].Statement, "ALTER TABLE t1 ADD COLUMN j INT DEFAULT 64") {
-							rangeFeedResetChan = srv.ApplicationLayer(1).LeaseManager().(*lease.Manager).TestingSetDisableRangeFeedCheckpointFn(true)
+						// Skip if the knob is disabled
+						if !enableAfterStageKnob.Load() ||
+							!strings.Contains(p.Statements[0].Statement, "ALTER TABLE t1 ADD COLUMN j INT8 DEFAULT 64") ||
+							p.Params.ExecutionPhase != scop.PostCommitPhase {
+							return nil
 						}
-						return nil
-					},
-					AfterStage: func(p scplan.Plan, stageIdx int) error {
 						// Once this stage completes, we can "resume" the range feed,
 						// so the update is detected.
-						if p.Params.ExecutionPhase == scop.PostCommitPhase &&
-							enableAfterStageKnob.Load() &&
-							strings.Contains(p.Statements[0].Statement, "ALTER TABLE t1 ADD COLUMN j INT DEFAULT 64") {
-							<-rangeFeedResetChan
-							srv.ApplicationLayer(1).LeaseManager().(*lease.Manager).TestingSetDisableRangeFeedCheckpointFn(false)
+						if stageIdx == 0 {
+							rangeFeedResetChan = srv.ApplicationLayer(1).LeaseManager().(*lease.Manager).TestingSetDisableRangeFeedCheckpointFn(true)
 							enableAfterStageKnob.Swap(false)
+							grp.Go(func() error {
+								// Once this channel is closed we know for certain the range feed has
+								// recovered. Allow updates so that descriptors are refreshed and purged.
+								<-rangeFeedResetChan
+								srv.ApplicationLayer(1).LeaseManager().(*lease.Manager).TestingSetDisableRangeFeedCheckpointFn(false)
+								return nil
+							})
 						}
 						return nil
 					},
@@ -3260,6 +3261,8 @@ func TestLeaseDescriptorRangeFeedFailure(t *testing.T) {
 			t.Fatal("no error encountered")
 		}
 	}
+	require.NoError(t, grp.Wait())
+	require.Falsef(t, enableAfterStageKnob.Load(), "testing knob was not hit")
 }
 
 // TestLeaseTableWriteFailure is used to ensure that sqlliveness heart-beating
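The reworked test relies on a common pattern: work kicked off from a testing
knob runs in a group, and the test both waits on the group and asserts that the
knob actually fired. A minimal standalone version of the same pattern, using
golang.org/x/sync/errgroup in place of the ctxgroup helper used above (the test
name and helpers here are illustrative):

package lease_sketch_test

import (
	"sync/atomic"
	"testing"

	"github.com/stretchr/testify/require"
	"golang.org/x/sync/errgroup"
)

func TestKnobPattern(t *testing.T) {
	var knobArmed atomic.Bool
	knobArmed.Store(true)

	var grp errgroup.Group
	done := make(chan struct{})

	// Stands in for the BeforeStage knob: trip once, then hand the slow part
	// (waiting for range feed recovery in the real test) to the group.
	knob := func() {
		if !knobArmed.Swap(false) {
			return
		}
		grp.Go(func() error {
			<-done // wait for the "recovery" signal
			return nil
		})
	}

	knob()      // simulate the schema-change stage hitting the knob
	close(done) // simulate the range feed recovering

	// Fail the test if the spawned goroutine returned an error, and verify
	// that the knob actually fired (i.e. it is no longer armed).
	require.NoError(t, grp.Wait())
	require.Falsef(t, knobArmed.Load(), "testing knob was not hit")
}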

pkg/sql/create_stats.go

Lines changed: 12 additions & 10 deletions
@@ -156,16 +156,18 @@ func (n *createStatsNode) runJob(ctx context.Context) error {
 	details := record.Details.(jobspb.CreateStatsDetails)
 
 	jobCheckBefore := automaticJobCheckBeforeCreatingJob.Get(n.p.ExecCfg().SV())
-	if (n.Name == jobspb.AutoStatsName || n.Name == jobspb.AutoPartialStatsName) && jobCheckBefore {
-		// Don't start the job if there is already a CREATE STATISTICS job running.
-		// (To handle race conditions we check this again after the job starts,
-		// but this check is used to prevent creating a large number of jobs that
-		// immediately fail).
-		if err := checkRunningJobs(
-			ctx, nil /* job */, n.p, n.Name == jobspb.AutoPartialStatsName, n.p.ExecCfg().JobRegistry,
-			details.Table.ID,
-		); err != nil {
-			return err
+	if n.Name == jobspb.AutoStatsName || n.Name == jobspb.AutoPartialStatsName {
+		if jobCheckBefore {
+			// Don't start the job if there is already a CREATE STATISTICS job running.
+			// (To handle race conditions we check this again after the job starts,
+			// but this check is used to prevent creating a large number of jobs that
+			// immediately fail).
+			if err := checkRunningJobs(
+				ctx, nil /* job */, n.p, n.Name == jobspb.AutoPartialStatsName, n.p.ExecCfg().JobRegistry,
+				details.Table.ID,
+			); err != nil {
+				return err
+			}
 		}
 	} else {
 		telemetry.Inc(sqltelemetry.CreateStatisticsUseCounter)
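The effect of the restructuring above: the cluster setting now gates only the
pre-flight duplicate-job check for auto stats jobs, while the telemetry counter
for manual CREATE STATISTICS is incremented regardless of the setting, as it
was before #144316. A simplified sketch of the resulting branching, with
placeholder helpers rather than the real planner code:

package createstats_sketch

import "context"

// runStatsJob sketches the fixed branching; isAutoJob and the other
// parameters are placeholders, not functions from pkg/sql.
func runStatsJob(
	ctx context.Context,
	isAutoJob bool,
	jobCheckBefore bool, // value of sql.stats.automatic_job_check_before_creating_job
	checkRunningJobs func(context.Context) error,
	incrementTelemetry func(),
	startJob func(context.Context) error,
) error {
	if isAutoJob {
		// Only the duplicate-job check is gated by the cluster setting.
		if jobCheckBefore {
			if err := checkRunningJobs(ctx); err != nil {
				return err
			}
		}
	} else {
		// Manual CREATE STATISTICS: telemetry is recorded regardless of the
		// cluster setting, restoring the earlier behavior.
		incrementTelemetry()
	}
	return startJob(ctx)
}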
