Skip to content

Commit f753ef0

Browse files
craig[bot]fqazi
andcommitted
Merge #153619
153619: catalog/lease: initial work for support timestamp locking leasing r=fqazi a=fqazi Previously, when descriptors were leased it was possible to lease different versions of dependent descriptors, since the range feed used by the lease manager would make them instantly available. This meant that using leased descriptors for crdb_internal / pg_catalog functions with on going schema changes was not safe. To address this, this patch lays the initial ground work for locking leased descriptors to timestamps that the lease manager is aware of as consistent (i.e. all updates are received). Informs: #153618 Release note: None Co-authored-by: Faizan Qazi <[email protected]>
2 parents 73d1df8 + c932fa2 commit f753ef0

17 files changed

+350
-59
lines changed

pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func (f *dbTableDescFetcher) FetchTableDesc(
7474
) (catalog.TableDescriptor, error) {
7575
// Retrieve the target TableDescriptor from the lease manager. No caching
7676
// is attempted because the lease manager does its own caching.
77-
desc, err := f.leaseMgr.Acquire(ctx, ts, tableID)
77+
desc, err := f.leaseMgr.Acquire(ctx, lease.TimestampToReadTimestamp(ts), tableID)
7878
if err != nil {
7979
// Manager can return all kinds of errors during chaos, but based on
8080
// its usage, none of them should ever be terminal.

pkg/ccl/changefeedccl/schemafeed/schema_feed.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func (e TableEvent) Timestamp() hlc.Timestamp {
5858
// leaseAcquirer is an interface containing the methods on *lease.Manager used
5959
// by the schema feed.
6060
type leaseAcquirer interface {
61-
Acquire(ctx context.Context, timestamp hlc.Timestamp, id descpb.ID) (lease.LeasedDescriptor, error)
61+
Acquire(ctx context.Context, timestamp lease.ReadTimestamp, id descpb.ID) (lease.LeasedDescriptor, error)
6262
AcquireFreshestFromStore(ctx context.Context, id descpb.ID) error
6363
// TODO(yang): Investigate whether the codec can be stored in the schema feed itself.
6464
Codec() keys.SQLCodec
@@ -504,7 +504,7 @@ func (tf *schemaFeed) pauseOrResumePolling(ctx context.Context, atOrBefore hlc.T
504504

505505
if canPausePolling, err := tf.targets.EachTableIDWithBool(func(id descpb.ID) (bool, error) {
506506
// Check if target table is schema-locked at the current frontier.
507-
ld1, err := tf.leaseMgr.Acquire(ctx, frontier, id)
507+
ld1, err := tf.leaseMgr.Acquire(ctx, lease.TimestampToReadTimestamp(frontier), id)
508508
if err != nil {
509509
return false, err
510510
}
@@ -522,7 +522,7 @@ func (tf *schemaFeed) pauseOrResumePolling(ctx context.Context, atOrBefore hlc.T
522522
}
523523

524524
// Check if target table remains at the same version at atOrBefore.
525-
ld2, err := tf.leaseMgr.Acquire(ctx, atOrBefore, id)
525+
ld2, err := tf.leaseMgr.Acquire(ctx, lease.TimestampToReadTimestamp(atOrBefore), id)
526526
if err != nil {
527527
return false, err
528528
}

pkg/ccl/changefeedccl/schemafeed/schema_feed_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -323,14 +323,14 @@ type testLeaseAcquirer struct {
323323
}
324324

325325
func (t *testLeaseAcquirer) Acquire(
326-
ctx context.Context, timestamp hlc.Timestamp, id descpb.ID,
326+
ctx context.Context, timestamp lease.ReadTimestamp, id descpb.ID,
327327
) (lease.LeasedDescriptor, error) {
328328
if id != t.id {
329329
return nil, errors.Newf("unknown id: %d", id)
330330
}
331331

332-
i, ok := slices.BinarySearchFunc(t.descs, timestamp, func(desc *testLeasedDescriptor, timestamp hlc.Timestamp) int {
333-
return desc.timestamp.Compare(timestamp)
332+
i, ok := slices.BinarySearchFunc(t.descs, timestamp, func(desc *testLeasedDescriptor, timestamp lease.ReadTimestamp) int {
333+
return desc.timestamp.Compare(timestamp.GetTimestamp())
334334
})
335335
if ok {
336336
return t.descs[i], nil

pkg/crosscluster/logical/lww_kv_processor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ func (p *kvRowProcessor) getWriter(
384384
}
385385
}
386386

387-
l, err := p.cfg.LeaseManager.(*lease.Manager).Acquire(ctx, ts, id)
387+
l, err := p.cfg.LeaseManager.(*lease.Manager).Acquire(ctx, lease.TimestampToReadTimestamp(ts), id)
388388
if err != nil {
389389
return nil, err
390390
}

pkg/crosscluster/logical/tombstone_updater.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ func (tu *tombstoneUpdater) getDeleter(ctx context.Context, txn *kv.Txn) (row.De
159159
tu.ReleaseLeases(ctx)
160160

161161
var err error
162-
tu.leased.descriptor, err = tu.leaseMgr.Acquire(ctx, timestamp, tu.descID)
162+
tu.leased.descriptor, err = tu.leaseMgr.Acquire(ctx, lease.TimestampToReadTimestamp(timestamp), tu.descID)
163163
if err != nil {
164164
return row.Deleter{}, err
165165
}

pkg/sql/catalog/descs/collection.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1373,7 +1373,7 @@ func (tc *Collection) GetIndexComment(
13731373
// MaybeSetReplicationSafeTS modifies a txn to apply the replication safe timestamp,
13741374
// if we are executing against a PCR reader catalog.
13751375
func (tc *Collection) MaybeSetReplicationSafeTS(ctx context.Context, txn *kv.Txn) error {
1376-
now := txn.DB().Clock().Now()
1376+
now := tc.leased.lm.GetReadTimestamp(txn.DB().Clock().Now())
13771377
desc, err := tc.leased.lm.Acquire(ctx, now, keys.SystemDatabaseID)
13781378
if err != nil {
13791379
return err

pkg/sql/catalog/descs/leased_descriptors.go

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,14 @@ import (
2323
type LeaseManager interface {
2424
AcquireByName(
2525
ctx context.Context,
26-
timestamp hlc.Timestamp,
26+
timestamp lease.ReadTimestamp,
2727
parentID descpb.ID,
2828
parentSchemaID descpb.ID,
2929
name string,
3030
) (lease.LeasedDescriptor, error)
3131

3232
Acquire(
33-
ctx context.Context, timestamp hlc.Timestamp, id descpb.ID,
33+
ctx context.Context, timestamp lease.ReadTimestamp, id descpb.ID,
3434
) (lease.LeasedDescriptor, error)
3535

3636
IncGaugeAfterLeaseDuration(
@@ -40,10 +40,13 @@ type LeaseManager interface {
4040
GetSafeReplicationTS() hlc.Timestamp
4141

4242
GetLeaseGeneration() int64
43+
44+
GetReadTimestamp(timestamp hlc.Timestamp) lease.ReadTimestamp
4345
}
4446

4547
type deadlineHolder interface {
4648
ReadTimestamp() hlc.Timestamp
49+
ReadTimestampFixed() bool
4750
UpdateDeadline(ctx context.Context, deadline hlc.Timestamp) error
4851
}
4952

@@ -59,6 +62,10 @@ func (m maxTimestampBoundDeadlineHolder) ReadTimestamp() hlc.Timestamp {
5962
return m.maxTimestampBound.Prev()
6063
}
6164

65+
func (m maxTimestampBoundDeadlineHolder) ReadTimestampFixed() bool {
66+
return true
67+
}
68+
6269
// UpdateDeadline implements the deadlineHolder interface.
6370
func (m maxTimestampBoundDeadlineHolder) UpdateDeadline(
6471
ctx context.Context, deadline hlc.Timestamp,
@@ -75,8 +82,10 @@ func makeLeasedDescriptors(lm LeaseManager) leasedDescriptors {
7582
// leasedDescriptors holds references to all the descriptors leased in the
7683
// transaction, and supports access by name and by ID.
7784
type leasedDescriptors struct {
78-
lm LeaseManager
79-
cache nstree.NameMap
85+
lm LeaseManager
86+
cache nstree.NameMap
87+
leaseTimestamp lease.ReadTimestamp
88+
leaseTimestampSet bool
8089
}
8190

8291
// mismatchedExternalDataRowTimestamp is generated when the external row data timestamps
@@ -170,6 +179,22 @@ func (ld *leasedDescriptors) maybeAssertExternalRowDataTS(desc catalog.Descripto
170179
})
171180
}
172181

182+
// maybeInitReadTimestamp selects a read timestamp for the lease manager.
183+
func (ld *leasedDescriptors) maybeInitReadTimestamp(txn deadlineHolder) {
184+
if ld.leaseTimestampSet {
185+
return
186+
}
187+
readTimestamp := txn.ReadTimestamp()
188+
ld.leaseTimestampSet = true
189+
// Fixed timestamp queries will use descriptors at the user select timestamp.
190+
if txn.ReadTimestampFixed() {
191+
ld.leaseTimestamp = lease.TimestampToReadTimestamp(readTimestamp)
192+
return
193+
}
194+
// Otherwise, get a safe read timestamp from the lease manager.
195+
ld.leaseTimestamp = ld.lm.GetReadTimestamp(readTimestamp)
196+
}
197+
173198
// getLeasedDescriptorByName return a leased descriptor valid for the
174199
// transaction, acquiring one if necessary. Due to a bug in lease acquisition
175200
// for dropped descriptors, the descriptor may have to be read from the store,
@@ -193,9 +218,8 @@ func (ld *leasedDescriptors) getByName(
193218
desc = cached.(lease.LeasedDescriptor).Underlying()
194219
return desc, false, nil
195220
}
196-
197-
readTimestamp := txn.ReadTimestamp()
198-
ldesc, err := ld.lm.AcquireByName(ctx, readTimestamp, parentID, parentSchemaID, name)
221+
ld.maybeInitReadTimestamp(txn)
222+
ldesc, err := ld.lm.AcquireByName(ctx, ld.leaseTimestamp, parentID, parentSchemaID, name)
199223
const setTxnDeadline = true
200224
return ld.getResult(ctx, txn, setTxnDeadline, ldesc, err)
201225
}
@@ -209,9 +233,8 @@ func (ld *leasedDescriptors) getByID(
209233
if cached := ld.getCachedByID(ctx, id); cached != nil {
210234
return cached, false, nil
211235
}
212-
213-
readTimestamp := txn.ReadTimestamp()
214-
desc, err := ld.lm.Acquire(ctx, readTimestamp, id)
236+
ld.maybeInitReadTimestamp(txn)
237+
desc, err := ld.lm.Acquire(ctx, ld.leaseTimestamp, id)
215238
const setTxnDeadline = false
216239
return ld.getResult(ctx, txn, setTxnDeadline, desc, err)
217240
}
@@ -363,6 +386,7 @@ func (ld *leasedDescriptors) releaseAll(ctx context.Context) {
363386
return nil
364387
})
365388
ld.cache.Clear()
389+
ld.leaseTimestampSet = false
366390
}
367391

368392
func (ld *leasedDescriptors) release(ctx context.Context, descs []lease.IDVersion) {
@@ -371,6 +395,9 @@ func (ld *leasedDescriptors) release(ctx context.Context, descs []lease.IDVersio
371395
removed.(lease.LeasedDescriptor).Release(ctx)
372396
}
373397
}
398+
if ld.cache.Len() == 0 {
399+
ld.leaseTimestampSet = false
400+
}
374401
}
375402

376403
func (ld *leasedDescriptors) numDescriptors() int {

pkg/sql/catalog/lease/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ go_library(
1010
"kv_writer.go",
1111
"lease.go",
1212
"lease_test_utils.go",
13+
"lease_timestamp.go",
1314
"name_cache.go",
1415
"storage.go",
1516
"testutils.go",

pkg/sql/catalog/lease/descriptor_state.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ type descriptorState struct {
8282
// It returns true if the descriptor returned is the known latest version
8383
// of the descriptor.
8484
func (t *descriptorState) findForTimestamp(
85-
ctx context.Context, timestamp hlc.Timestamp,
85+
ctx context.Context, timestamp ReadTimestamp,
8686
) (*descriptorVersionState, bool, error) {
8787
expensiveLogEnabled := log.ExpensiveLogEnabled(ctx, 2)
8888
t.mu.Lock()
@@ -95,10 +95,12 @@ func (t *descriptorState) findForTimestamp(
9595

9696
// Walk back the versions to find one that is valid for the timestamp.
9797
for i := len(t.mu.active.data) - 1; i >= 0; i-- {
98-
// Check to see if the ModificationTime is valid.
99-
if desc := t.mu.active.data[i]; desc.GetModificationTime().LessEq(timestamp) {
98+
// Check to see if the ModificationTime is valid. If only the initial version
99+
// of the descriptor is known, then read it at the base timestamp.
100+
if desc := t.mu.active.data[i]; desc.GetModificationTime().LessEq(timestamp.GetTimestamp()) ||
101+
(len(t.mu.active.data) == 1 && desc.GetModificationTime().LessEq(timestamp.GetBaseTimestamp())) {
100102
latest := i+1 == len(t.mu.active.data)
101-
if !desc.hasExpired(ctx, timestamp) {
103+
if !desc.hasExpired(ctx, timestamp.GetBaseTimestamp()) {
102104
// Existing valid descriptor version.
103105
desc.incRefCount(ctx, expensiveLogEnabled)
104106
return desc, latest, nil

pkg/sql/catalog/lease/lease.go

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,12 @@ var WaitForInitialVersion = settings.RegisterBoolSetting(settings.ApplicationLev
102102
"enables waiting for the initial version of a descriptor",
103103
true)
104104

105+
var LockedLeaseTimestamp = settings.RegisterBoolSetting(settings.ApplicationLevel,
106+
"sql.catalog.descriptor_lease.use_locked_timestamps.enabled",
107+
"guarantees transactional version consistency for descriptors used by the lease manager,"+
108+
"descriptors used can be intentionally older to support this",
109+
false)
110+
105111
// WaitForNoVersion returns once there are no unexpired leases left
106112
// for any version of the descriptor.
107113
func (m *Manager) WaitForNoVersion(
@@ -1265,7 +1271,7 @@ func (m *Manager) purgeOldVersions(
12651271
// Acquire a refcount on the descriptor on the latest version to maintain an
12661272
// active lease, so that it doesn't get released when removeInactives()
12671273
// is called below. Release this lease after calling removeInactives().
1268-
desc, _, err = t.findForTimestamp(ctx, m.storage.clock.Now())
1274+
desc, _, err = t.findForTimestamp(ctx, TimestampToReadTimestamp(m.storage.clock.Now()))
12691275
if err == nil || !errors.Is(err, errRenewLease) {
12701276
break
12711277
}
@@ -1600,7 +1606,7 @@ func (m *Manager) SetRegionPrefix(val []byte) {
16001606
// id and fails because the id has been dropped by the TRUNCATE.
16011607
func (m *Manager) AcquireByName(
16021608
ctx context.Context,
1603-
timestamp hlc.Timestamp,
1609+
timestamp ReadTimestamp,
16041610
parentID descpb.ID,
16051611
parentSchemaID descpb.ID,
16061612
name string,
@@ -1622,9 +1628,9 @@ func (m *Manager) AcquireByName(
16221628
return desc, nil
16231629
}
16241630
// Check if we have cached an ID for this name.
1625-
descVersion, _ := m.names.get(ctx, parentID, parentSchemaID, name, timestamp)
1631+
descVersion, _ := m.names.get(ctx, parentID, parentSchemaID, name, timestamp.GetTimestamp())
16261632
if descVersion != nil {
1627-
if descVersion.GetModificationTime().LessEq(timestamp) {
1633+
if descVersion.GetModificationTime().LessEq(timestamp.GetTimestamp()) {
16281634
return validateDescriptorForReturn(descVersion)
16291635
}
16301636
// m.names.get() incremented the refcount, we decrement it to get a new
@@ -1643,7 +1649,7 @@ func (m *Manager) AcquireByName(
16431649
// lease with at least a bit of lifetime left in it. So, we do it the hard
16441650
// way: look in the database to resolve the name, then acquire a new lease.
16451651
var err error
1646-
id, err := m.resolveName(ctx, timestamp, parentID, parentSchemaID, name)
1652+
id, err := m.resolveName(ctx, timestamp.GetTimestamp(), parentID, parentSchemaID, name)
16471653
if err != nil {
16481654
return nil, err
16491655
}
@@ -1777,7 +1783,7 @@ type LeasedDescriptor interface {
17771783
// can only return an older version of a descriptor if the latest version
17781784
// can be leased; as it stands a dropped descriptor cannot be leased.
17791785
func (m *Manager) Acquire(
1780-
ctx context.Context, timestamp hlc.Timestamp, id descpb.ID,
1786+
ctx context.Context, timestamp ReadTimestamp, id descpb.ID,
17811787
) (LeasedDescriptor, error) {
17821788
for {
17831789
if m.IsDraining() {
@@ -1802,7 +1808,7 @@ func (m *Manager) Acquire(
18021808

18031809
case errors.Is(err, errReadOlderVersion):
18041810
// Read old versions from the store. This can block while reading.
1805-
versions, errRead := m.readOlderVersionForTimestamp(ctx, id, timestamp)
1811+
versions, errRead := m.readOlderVersionForTimestamp(ctx, id, timestamp.GetTimestamp())
18061812
if errRead != nil {
18071813
return nil, errRead
18081814
}
@@ -2135,6 +2141,9 @@ func (m *Manager) watchForUpdates(ctx context.Context) {
21352141
}
21362142

21372143
handleCheckpoint := func(ctx context.Context, checkpoint *kvpb.RangeFeedCheckpoint) {
2144+
if m.testingKnobs.TestingOnRangeFeedCheckPoint != nil {
2145+
m.testingKnobs.TestingOnRangeFeedCheckPoint()
2146+
}
21382147
// Track checkpoints that occur from the rangefeed to make sure progress
21392148
// is always made.
21402149
m.mu.Lock()
@@ -2908,6 +2917,23 @@ func (m *Manager) deleteOrphanedLeasesWithSameInstanceID(
29082917
instanceID, releasedCount.Load(), totalLeases)
29092918
}
29102919

2920+
// GetReadTimestamp returns a locked timestamp to use for lease management.
2921+
func (m *Manager) GetReadTimestamp(timestamp hlc.Timestamp) ReadTimestamp {
2922+
if LockedLeaseTimestamp.Get(&m.settings.SV) {
2923+
replicationTS := m.GetSafeReplicationTS()
2924+
if !replicationTS.IsEmpty() && replicationTS.Less(timestamp) {
2925+
return LeaseTimestamp{
2926+
ReadTimestamp: timestamp,
2927+
LeaseTimestamp: replicationTS,
2928+
}
2929+
}
2930+
}
2931+
// Fallback to existing behavior with timestamps.
2932+
return LeaseTimestamp{
2933+
ReadTimestamp: timestamp,
2934+
}
2935+
}
2936+
29112937
// TestingGetBoundAccount returns the bound account used by the lease manager.
29122938
func (m *Manager) TestingGetBoundAccount() *mon.ConcurrentBoundAccount {
29132939
return m.boundAccount

0 commit comments

Comments
 (0)