Skip to content

Commit 3082fc2

Browse files
craig[bot]bghal
andcommitted
Merge #153040
153040: sql: unblock user create from long-running transactions r=bghal a=bghal The changes in #150747 to unblock user create were partially reverted in #152297 due to it introducing a race condition that caused tests to flake. This change restores that behavior and accounts for the race by updating the lease manager before acquiring a lease on tables used for user access management. Epic: CRDB-49398 Fixes: #138691 Release note: None Co-authored-by: Brendan Gerrity <[email protected]>
2 parents e4db2dd + bb57ab8 commit 3082fc2

7 files changed

+133
-109
lines changed

pkg/sql/catalog/lease/lease.go

Lines changed: 88 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1002,7 +1002,7 @@ func (m *Manager) AcquireFreshestFromStore(ctx context.Context, id descpb.ID) er
10021002
attemptsMade := 0
10031003
for {
10041004
// Acquire a fresh lease.
1005-
didAcquire, err := acquireNodeLease(ctx, m, id, AcquireFreshestBlock)
1005+
didAcquire, err := m.acquireNodeLease(ctx, id, AcquireFreshestBlock)
10061006
if err != nil {
10071007
return err
10081008
}
@@ -1024,8 +1024,8 @@ func (m *Manager) AcquireFreshestFromStore(ctx context.Context, id descpb.ID) er
10241024
// being dropped or offline, the error will be of type inactiveTableError.
10251025
// The boolean returned is true if this call was actually responsible for the
10261026
// lease acquisition.
1027-
func acquireNodeLease(
1028-
ctx context.Context, m *Manager, id descpb.ID, typ AcquireType,
1027+
func (m *Manager) acquireNodeLease(
1028+
ctx context.Context, id descpb.ID, typ AcquireType,
10291029
) (bool, error) {
10301030
start := timeutil.Now()
10311031
log.VEventf(ctx, 2, "acquiring lease for descriptor %d...", id)
@@ -1037,7 +1037,7 @@ func acquireNodeLease(
10371037
},
10381038
func(ctx context.Context) (interface{}, error) {
10391039
if m.IsDraining() {
1040-
return nil, errLeaseManagerIsDraining
1040+
return false, errLeaseManagerIsDraining
10411041
}
10421042
newest := m.findNewest(id)
10431043
var currentVersion descpb.DescriptorVersion
@@ -1051,26 +1051,75 @@ func acquireNodeLease(
10511051
if err != nil {
10521052
return false, errors.Wrapf(err, "lease acquisition was unable to resolve liveness session")
10531053
}
1054-
desc, regionPrefix, err := m.storage.acquire(ctx, session, id, currentVersion, currentSessionID)
1055-
if err != nil {
1056-
return nil, err
1057-
}
1058-
// If a nil descriptor is returned, then the latest version has already
1059-
// been leased. So, nothing needs to be done here.
1060-
if desc == nil {
1061-
return true, nil
1054+
1055+
doAcquisition := func() (catalog.Descriptor, error) {
1056+
desc, _, err := m.storage.acquire(ctx, session, id, currentVersion, currentSessionID)
1057+
if err != nil {
1058+
return nil, err
1059+
}
1060+
1061+
return desc, nil
10621062
}
1063-
t := m.findDescriptorState(id, false /* create */)
1064-
if t == nil {
1065-
return nil, errors.AssertionFailedf("could not find descriptor state for id %d", id)
1063+
1064+
doUpsertion := func(desc catalog.Descriptor) error {
1065+
t := m.findDescriptorState(id, false /* create */)
1066+
if t == nil {
1067+
return errors.AssertionFailedf("could not find descriptor state for id %d", id)
1068+
}
1069+
t.mu.Lock()
1070+
t.mu.takenOffline = false
1071+
defer t.mu.Unlock()
1072+
err = t.upsertLeaseLocked(ctx, desc, session, m.storage.getRegionPrefix())
1073+
if err != nil {
1074+
return err
1075+
}
1076+
1077+
return nil
10661078
}
1067-
t.mu.Lock()
1068-
t.mu.takenOffline = false
1069-
defer t.mu.Unlock()
1070-
err = t.upsertLeaseLocked(ctx, desc, session, regionPrefix)
1071-
if err != nil {
1072-
return nil, err
1079+
1080+
// These tables are special and can have their versions bumped
1081+
// without blocking on other nodes converging to that version.
1082+
if newest != nil && (id == keys.UsersTableID ||
1083+
id == keys.RoleMembersTableID ||
1084+
id == keys.RoleOptionsTableID ||
1085+
m.isMaybeSystemPrivilegesTable(ctx, id)) {
1086+
1087+
// The two-version invariant allows an update in lease manager
1088+
// without (immediately) acquiring a new lease. This prevents
1089+
// a race on where the lease is acquired but the manager isn't
1090+
// yet updated.
1091+
desc, err := m.maybeGetDescriptorWithoutValidation(ctx, id, true /* existenceRequired */)
1092+
if err != nil {
1093+
return false, err
1094+
}
1095+
err = doUpsertion(desc)
1096+
if err != nil {
1097+
return false, err
1098+
}
1099+
1100+
desc, err = doAcquisition()
1101+
if err != nil {
1102+
return false, err
1103+
}
1104+
if desc == nil {
1105+
return true, nil
1106+
}
1107+
} else {
1108+
desc, err := doAcquisition()
1109+
if err != nil {
1110+
return false, err
1111+
}
1112+
// If a nil descriptor is returned, then the latest version has already
1113+
// been leased. So, nothing needs to be done here.
1114+
if desc == nil {
1115+
return true, nil
1116+
}
1117+
err = doUpsertion(desc)
1118+
if err != nil {
1119+
return false, err
1120+
}
10731121
}
1122+
10741123
return true, nil
10751124
})
10761125
if m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent != nil {
@@ -1084,6 +1133,22 @@ func acquireNodeLease(
10841133
return didAcquire, nil
10851134
}
10861135

1136+
var systemPrivilegesTableDescID atomic.Uint32
1137+
1138+
// isMaybeSystemPrivilegesTable tries to determine if the given descriptor is
1139+
// for the system privileges table. It depends on the namecache to resolve the
1140+
// table descriptor after which it's memoized.
1141+
func (m *Manager) isMaybeSystemPrivilegesTable(ctx context.Context, id descpb.ID) bool {
1142+
if systemPrivilegesTableDescID.Load() == uint32(descpb.InvalidID) {
1143+
if privilegesDesc, _ := m.names.get(ctx, keys.SystemDatabaseID, keys.SystemPublicSchemaID, "privileges", hlc.Timestamp{}); privilegesDesc != nil {
1144+
systemPrivilegesTableDescID.CompareAndSwap(uint32(descpb.InvalidID), uint32(privilegesDesc.GetID()))
1145+
privilegesDesc.Release(ctx)
1146+
}
1147+
}
1148+
1149+
return uint32(id) == systemPrivilegesTableDescID.Load()
1150+
}
1151+
10871152
// releaseLease deletes an entry from system.lease.
10881153
func releaseLease(ctx context.Context, lease *storedLease, m *Manager) (released bool) {
10891154
// Force the release to happen synchronously, if we are draining or, when we
@@ -1729,7 +1794,7 @@ func (m *Manager) Acquire(
17291794
t.markAcquisitionStart(ctx)
17301795
defer t.markAcquisitionDone(ctx)
17311796
// Renew lease and retry. This will block until the lease is acquired.
1732-
_, errLease := acquireNodeLease(ctx, m, id, AcquireBlock)
1797+
_, errLease := m.acquireNodeLease(ctx, id, AcquireBlock)
17331798
return errLease
17341799
}(); err != nil {
17351800
return nil, err
@@ -2371,7 +2436,7 @@ func (m *Manager) refreshSomeLeases(ctx context.Context, refreshAndPurgeAllDescr
23712436
return
23722437
}
23732438
}
2374-
if _, err := acquireNodeLease(ctx, m, id, AcquireBackground); err != nil {
2439+
if _, err := m.acquireNodeLease(ctx, id, AcquireBackground); err != nil {
23752440
log.Dev.Errorf(ctx, "refreshing descriptor: %d lease failed: %s", id, err)
23762441

23772442
if errors.Is(err, catalog.ErrDescriptorNotFound) || errors.Is(err, catalog.ErrDescriptorDropped) {

pkg/sql/catalog/lease/lease_internal_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -571,8 +571,8 @@ CREATE TABLE t.%s (k CHAR PRIMARY KEY, v CHAR);
571571
}
572572
expiration := lease.Expiration(context.Background())
573573
// Acquire another lease.
574-
if _, err := acquireNodeLease(
575-
context.Background(), leaseManager, tableDesc.GetID(), AcquireBlock,
574+
if _, err := leaseManager.acquireNodeLease(
575+
context.Background(), tableDesc.GetID(), AcquireBlock,
576576
); err != nil {
577577
t.Fatal(err)
578578
}

pkg/sql/catalog/lease/lease_test.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -557,16 +557,7 @@ func TestWaitForNewVersion(testingT *testing.T) {
557557
defer leaktest.AfterTest(testingT)()
558558
defer log.Scope(testingT).Close(testingT)
559559

560-
skip.WithIssue(testingT, 152051)
561-
562560
var params base.TestClusterArgs
563-
params.ServerArgs.Knobs = base.TestingKnobs{
564-
SQLLeaseManager: &lease.ManagerTestingKnobs{
565-
LeaseStoreTestingKnobs: lease.StorageTestingKnobs{
566-
LeaseAcquiredEvent: nil, // TODO
567-
},
568-
},
569-
}
570561
t := newLeaseTest(testingT, params)
571562
defer t.cleanup()
572563

@@ -589,7 +580,10 @@ func TestWaitForNewVersion(testingT *testing.T) {
589580
defer cancel()
590581

591582
_, err := leaseMgr.WaitForNewVersion(timeoutCtx, descID, nil, retry.Options{})
592-
require.ErrorIs(t, err, context.DeadlineExceeded)
583+
if !(sqltestutils.IsClientSideQueryCanceledErr(err) ||
584+
errors.Is(err, context.DeadlineExceeded)) {
585+
t.Fatalf("The client or the context should have timed out. Unexpected error: %v", err)
586+
}
593587
}
594588

595589
t.mustAcquire(3, descID)

pkg/sql/conn_executor_jobs.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,7 @@ func (ex *connExecutor) waitForNewVersionPropagation(
8484
continue
8585
}
8686

87-
// TODO(sql-foundations): Change this back to WaitForNewVersion once we
88-
// address the flaky behavior around privilege and role membership caching.
89-
if _, err := ex.planner.LeaseMgr().WaitForOneVersion(ex.Ctx(), idVersion.ID, cachedRegions,
87+
if _, err := ex.planner.LeaseMgr().WaitForNewVersion(ex.Ctx(), idVersion.ID, cachedRegions,
9088
retry.Options{
9189
InitialBackoff: time.Millisecond,
9290
MaxBackoff: time.Second,

pkg/sql/grant_revoke_system.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,10 +128,11 @@ VALUES ($1, $2, $3, $4, (
128128
return err
129129
}
130130
userPrivs, found := syntheticPrivDesc.FindUser(user)
131+
emptyPrivs := !found || userPrivs.Privileges == 0
131132

132133
// For Public role and virtual tables, leave an empty
133134
// row to indicate that SELECT has been revoked.
134-
if !found && (n.grantOn == privilege.VirtualTable && user == username.PublicRoleName()) {
135+
if emptyPrivs && (n.grantOn == privilege.VirtualTable && user == username.PublicRoleName()) {
135136
_, err := params.p.InternalSQLTxn().ExecEx(
136137
params.ctx,
137138
`insert-system-privilege`,
@@ -151,7 +152,7 @@ VALUES ($1, $2, $3, $4, (
151152

152153
// If there are no entries remaining on the PrivilegeDescriptor for the user
153154
// we can remove the entire row for the user.
154-
if !found {
155+
if emptyPrivs {
155156
_, err := params.p.InternalSQLTxn().ExecEx(
156157
params.ctx,
157158
`delete-system-privilege`,

0 commit comments

Comments
 (0)