Skip to content

Commit ab393bc

Browse files
authored
Merge pull request #151792 from fqazi/backport25.3-151202
release-25.3: catalog/lease: fix hang on range feed restart
2 parents 512e905 + 739c945 commit ab393bc

File tree

2 files changed

+39
-7
lines changed

2 files changed

+39
-7
lines changed

pkg/sql/catalog/lease/lease.go

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import (
4141
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
4242
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
4343
kvstorage "github.com/cockroachdb/cockroach/pkg/storage"
44+
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
4445
"github.com/cockroachdb/cockroach/pkg/util/hlc"
4546
"github.com/cockroachdb/cockroach/pkg/util/log"
4647
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
@@ -1086,6 +1087,9 @@ type Manager struct {
10861087

10871088
// rangeFeed current range feed on system.descriptors.
10881089
rangeFeed *rangefeed.RangeFeed
1090+
1091+
// rangeFeedRestartInProgress tracks if a range feed restart is in progress.
1092+
rangeFeedRestartInProgress bool
10891093
}
10901094

10911095
// closeTimeStamp for the range feed, which is the timestamp
@@ -1712,6 +1716,19 @@ func (m *Manager) GetSafeReplicationTS() hlc.Timestamp {
17121716
return m.closeTimestamp.Load().(hlc.Timestamp)
17131717
}
17141718

1719+
// closeRangeFeed closes the currently open range feed, which will involve
1720+
// temporarily releasing the lease manager mutex.
1721+
func (m *Manager) closeRangeFeedLocked() {
1722+
// We cannot terminate the range feed while holding the lease manager
1723+
// lock, since there may be event handlers that need the lock that need to
1724+
// drain.
1725+
oldRangeFeed := m.mu.rangeFeed
1726+
m.mu.rangeFeed = nil
1727+
m.mu.Unlock() // nolint:deferunlockcheck
1728+
oldRangeFeed.Close()
1729+
m.mu.Lock() // nolint:deferunlockcheck
1730+
}
1731+
17151732
// watchForUpdates will watch a rangefeed on the system.descriptor table for
17161733
// updates.
17171734
func (m *Manager) watchForUpdates(ctx context.Context) {
@@ -1773,14 +1790,12 @@ func (m *Manager) watchForUpdates(ctx context.Context) {
17731790
m.closeTimestamp.Store(checkpoint.ResolvedTS)
17741791
}
17751792

1776-
// If we already started a range feed terminate it first
1793+
// Assert that the range feed is already terminated.
17771794
if m.mu.rangeFeed != nil {
1778-
m.mu.rangeFeed.Close()
1779-
m.mu.rangeFeed = nil
1780-
if m.testingKnobs.RangeFeedResetChannel != nil {
1781-
close(m.testingKnobs.RangeFeedResetChannel)
1782-
m.testingKnobs.RangeFeedResetChannel = nil
1795+
if buildutil.CrdbTestBuild {
1796+
panic(errors.AssertionFailedf("range feed was not closed before a restart attempt"))
17831797
}
1798+
log.Warningf(ctx, "range feed was not closed before a restart attempt")
17841799
}
17851800
// Ignore errors here because they indicate that the server is shutting down.
17861801
// Also note that the range feed automatically shuts down when the server
@@ -1937,12 +1952,28 @@ func (m *Manager) handleRangeFeedError(ctx context.Context) {
19371952
}
19381953

19391954
func (m *Manager) restartLeasingRangeFeedLocked(ctx context.Context) {
1955+
// If someone else is already starting a range feed then exit early.
1956+
if m.mu.rangeFeedRestartInProgress {
1957+
return
1958+
}
19401959
log.Warning(ctx, "attempting restart of leasing range feed")
1960+
// We will temporarily release the lock closing the range feed,
1961+
// in case things need to drain before termination. It is possible for
1962+
// another restart to enter once we release the lock.
1963+
m.mu.rangeFeedRestartInProgress = true
1964+
if m.mu.rangeFeed != nil {
1965+
m.closeRangeFeedLocked()
1966+
if m.testingKnobs.RangeFeedResetChannel != nil {
1967+
close(m.testingKnobs.RangeFeedResetChannel)
1968+
m.testingKnobs.RangeFeedResetChannel = nil
1969+
}
1970+
}
19411971
// Attempt a range feed restart if it has been down too long.
19421972
m.watchForUpdates(ctx)
19431973
// Track when the last restart occurred.
19441974
m.mu.rangeFeedIsUnavailableAt = timeutil.Now()
19451975
m.mu.rangeFeedCheckpoints = 0
1976+
m.mu.rangeFeedRestartInProgress = false
19461977
}
19471978

19481979
// cleanupExpiredSessionLeases expires session based leases marked for removal,

pkg/sql/catalog/lease/lease_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3187,7 +3187,8 @@ func TestAmbiguousResultIsRetried(t *testing.T) {
31873187
func TestLeaseDescriptorRangeFeedFailure(t *testing.T) {
31883188
defer leaktest.AfterTest(t)()
31893189
defer log.Scope(t).Close(t)
3190-
skip.UnderDuress(t)
3190+
// Too slow for execution under race.
3191+
skip.UnderRace(t)
31913192

31923193
settings := cluster.MakeTestingClusterSettings()
31933194
ctx := context.Background()

0 commit comments

Comments
 (0)