Skip to content

Commit bb57ab8

Browse files
committed
sql: unblock user create from long-running transactions
The changes in #150747 to unblock user create were partially reverted in #152297 due to it introducing a race condition that caused tests to flake. This change restores that behavior and accounts for the race by updating the lease manager before acquiring a lease on tables used for user access management. Epic: CRDB-49398 Fixes: #138691 Release note: None
1 parent b39eb11 commit bb57ab8

File tree

5 files changed

+132
-101
lines changed

5 files changed

+132
-101
lines changed

pkg/sql/catalog/lease/lease.go

Lines changed: 88 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1002,7 +1002,7 @@ func (m *Manager) AcquireFreshestFromStore(ctx context.Context, id descpb.ID) er
10021002
attemptsMade := 0
10031003
for {
10041004
// Acquire a fresh lease.
1005-
didAcquire, err := acquireNodeLease(ctx, m, id, AcquireFreshestBlock)
1005+
didAcquire, err := m.acquireNodeLease(ctx, id, AcquireFreshestBlock)
10061006
if err != nil {
10071007
return err
10081008
}
@@ -1024,8 +1024,8 @@ func (m *Manager) AcquireFreshestFromStore(ctx context.Context, id descpb.ID) er
10241024
// being dropped or offline, the error will be of type inactiveTableError.
10251025
// The boolean returned is true if this call was actually responsible for the
10261026
// lease acquisition.
1027-
func acquireNodeLease(
1028-
ctx context.Context, m *Manager, id descpb.ID, typ AcquireType,
1027+
func (m *Manager) acquireNodeLease(
1028+
ctx context.Context, id descpb.ID, typ AcquireType,
10291029
) (bool, error) {
10301030
start := timeutil.Now()
10311031
log.VEventf(ctx, 2, "acquiring lease for descriptor %d...", id)
@@ -1037,7 +1037,7 @@ func acquireNodeLease(
10371037
},
10381038
func(ctx context.Context) (interface{}, error) {
10391039
if m.IsDraining() {
1040-
return nil, errLeaseManagerIsDraining
1040+
return false, errLeaseManagerIsDraining
10411041
}
10421042
newest := m.findNewest(id)
10431043
var currentVersion descpb.DescriptorVersion
@@ -1051,26 +1051,75 @@ func acquireNodeLease(
10511051
if err != nil {
10521052
return false, errors.Wrapf(err, "lease acquisition was unable to resolve liveness session")
10531053
}
1054-
desc, regionPrefix, err := m.storage.acquire(ctx, session, id, currentVersion, currentSessionID)
1055-
if err != nil {
1056-
return nil, err
1057-
}
1058-
// If a nil descriptor is returned, then the latest version has already
1059-
// been leased. So, nothing needs to be done here.
1060-
if desc == nil {
1061-
return true, nil
1054+
1055+
doAcquisition := func() (catalog.Descriptor, error) {
1056+
desc, _, err := m.storage.acquire(ctx, session, id, currentVersion, currentSessionID)
1057+
if err != nil {
1058+
return nil, err
1059+
}
1060+
1061+
return desc, nil
10621062
}
1063-
t := m.findDescriptorState(id, false /* create */)
1064-
if t == nil {
1065-
return nil, errors.AssertionFailedf("could not find descriptor state for id %d", id)
1063+
1064+
doUpsertion := func(desc catalog.Descriptor) error {
1065+
t := m.findDescriptorState(id, false /* create */)
1066+
if t == nil {
1067+
return errors.AssertionFailedf("could not find descriptor state for id %d", id)
1068+
}
1069+
t.mu.Lock()
1070+
t.mu.takenOffline = false
1071+
defer t.mu.Unlock()
1072+
err = t.upsertLeaseLocked(ctx, desc, session, m.storage.getRegionPrefix())
1073+
if err != nil {
1074+
return err
1075+
}
1076+
1077+
return nil
10661078
}
1067-
t.mu.Lock()
1068-
t.mu.takenOffline = false
1069-
defer t.mu.Unlock()
1070-
err = t.upsertLeaseLocked(ctx, desc, session, regionPrefix)
1071-
if err != nil {
1072-
return nil, err
1079+
1080+
// These tables are special and can have their versions bumped
1081+
// without blocking on other nodes converging to that version.
1082+
if newest != nil && (id == keys.UsersTableID ||
1083+
id == keys.RoleMembersTableID ||
1084+
id == keys.RoleOptionsTableID ||
1085+
m.isMaybeSystemPrivilegesTable(ctx, id)) {
1086+
1087+
// The two-version invariant allows an update in lease manager
1088+
// without (immediately) acquiring a new lease. This prevents
1089+
// a race on where the lease is acquired but the manager isn't
1090+
// yet updated.
1091+
desc, err := m.maybeGetDescriptorWithoutValidation(ctx, id, true /* existenceRequired */)
1092+
if err != nil {
1093+
return false, err
1094+
}
1095+
err = doUpsertion(desc)
1096+
if err != nil {
1097+
return false, err
1098+
}
1099+
1100+
desc, err = doAcquisition()
1101+
if err != nil {
1102+
return false, err
1103+
}
1104+
if desc == nil {
1105+
return true, nil
1106+
}
1107+
} else {
1108+
desc, err := doAcquisition()
1109+
if err != nil {
1110+
return false, err
1111+
}
1112+
// If a nil descriptor is returned, then the latest version has already
1113+
// been leased. So, nothing needs to be done here.
1114+
if desc == nil {
1115+
return true, nil
1116+
}
1117+
err = doUpsertion(desc)
1118+
if err != nil {
1119+
return false, err
1120+
}
10731121
}
1122+
10741123
return true, nil
10751124
})
10761125
if m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent != nil {
@@ -1084,6 +1133,22 @@ func acquireNodeLease(
10841133
return didAcquire, nil
10851134
}
10861135

1136+
var systemPrivilegesTableDescID atomic.Uint32
1137+
1138+
// isMaybeSystemPrivilegesTable tries to determine if the given descriptor is
1139+
// for the system privileges table. It depends on the namecache to resolve the
1140+
// table descriptor after which it's memoized.
1141+
func (m *Manager) isMaybeSystemPrivilegesTable(ctx context.Context, id descpb.ID) bool {
1142+
if systemPrivilegesTableDescID.Load() == uint32(descpb.InvalidID) {
1143+
if privilegesDesc, _ := m.names.get(ctx, keys.SystemDatabaseID, keys.SystemPublicSchemaID, "privileges", hlc.Timestamp{}); privilegesDesc != nil {
1144+
systemPrivilegesTableDescID.CompareAndSwap(uint32(descpb.InvalidID), uint32(privilegesDesc.GetID()))
1145+
privilegesDesc.Release(ctx)
1146+
}
1147+
}
1148+
1149+
return uint32(id) == systemPrivilegesTableDescID.Load()
1150+
}
1151+
10871152
// releaseLease deletes an entry from system.lease.
10881153
func releaseLease(ctx context.Context, lease *storedLease, m *Manager) (released bool) {
10891154
// Force the release to happen synchronously, if we are draining or, when we
@@ -1729,7 +1794,7 @@ func (m *Manager) Acquire(
17291794
t.markAcquisitionStart(ctx)
17301795
defer t.markAcquisitionDone(ctx)
17311796
// Renew lease and retry. This will block until the lease is acquired.
1732-
_, errLease := acquireNodeLease(ctx, m, id, AcquireBlock)
1797+
_, errLease := m.acquireNodeLease(ctx, id, AcquireBlock)
17331798
return errLease
17341799
}(); err != nil {
17351800
return nil, err
@@ -2371,7 +2436,7 @@ func (m *Manager) refreshSomeLeases(ctx context.Context, refreshAndPurgeAllDescr
23712436
return
23722437
}
23732438
}
2374-
if _, err := acquireNodeLease(ctx, m, id, AcquireBackground); err != nil {
2439+
if _, err := m.acquireNodeLease(ctx, id, AcquireBackground); err != nil {
23752440
log.Dev.Errorf(ctx, "refreshing descriptor: %d lease failed: %s", id, err)
23762441

23772442
if errors.Is(err, catalog.ErrDescriptorNotFound) || errors.Is(err, catalog.ErrDescriptorDropped) {

pkg/sql/catalog/lease/lease_internal_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -571,8 +571,8 @@ CREATE TABLE t.%s (k CHAR PRIMARY KEY, v CHAR);
571571
}
572572
expiration := lease.Expiration(context.Background())
573573
// Acquire another lease.
574-
if _, err := acquireNodeLease(
575-
context.Background(), leaseManager, tableDesc.GetID(), AcquireBlock,
574+
if _, err := leaseManager.acquireNodeLease(
575+
context.Background(), tableDesc.GetID(), AcquireBlock,
576576
); err != nil {
577577
t.Fatal(err)
578578
}

pkg/sql/catalog/lease/lease_test.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -558,13 +558,6 @@ func TestWaitForNewVersion(testingT *testing.T) {
558558
defer log.Scope(testingT).Close(testingT)
559559

560560
var params base.TestClusterArgs
561-
params.ServerArgs.Knobs = base.TestingKnobs{
562-
SQLLeaseManager: &lease.ManagerTestingKnobs{
563-
LeaseStoreTestingKnobs: lease.StorageTestingKnobs{
564-
LeaseAcquiredEvent: nil, // TODO
565-
},
566-
},
567-
}
568561
t := newLeaseTest(testingT, params)
569562
defer t.cleanup()
570563

@@ -587,7 +580,10 @@ func TestWaitForNewVersion(testingT *testing.T) {
587580
defer cancel()
588581

589582
_, err := leaseMgr.WaitForNewVersion(timeoutCtx, descID, nil, retry.Options{})
590-
require.ErrorIs(t, err, context.DeadlineExceeded)
583+
if !(sqltestutils.IsClientSideQueryCanceledErr(err) ||
584+
errors.Is(err, context.DeadlineExceeded)) {
585+
t.Fatalf("The client or the context should have timed out. Unexpected error: %v", err)
586+
}
591587
}
592588

593589
t.mustAcquire(3, descID)

pkg/sql/grant_revoke_system.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,10 +128,11 @@ VALUES ($1, $2, $3, $4, (
128128
return err
129129
}
130130
userPrivs, found := syntheticPrivDesc.FindUser(user)
131+
emptyPrivs := !found || userPrivs.Privileges == 0
131132

132133
// For Public role and virtual tables, leave an empty
133134
// row to indicate that SELECT has been revoked.
134-
if !found && (n.grantOn == privilege.VirtualTable && user == username.PublicRoleName()) {
135+
if emptyPrivs && (n.grantOn == privilege.VirtualTable && user == username.PublicRoleName()) {
135136
_, err := params.p.InternalSQLTxn().ExecEx(
136137
params.ctx,
137138
`insert-system-privilege`,
@@ -151,7 +152,7 @@ VALUES ($1, $2, $3, $4, (
151152

152153
// If there are no entries remaining on the PrivilegeDescriptor for the user
153154
// we can remove the entire row for the user.
154-
if !found {
155+
if emptyPrivs {
155156
_, err := params.p.InternalSQLTxn().ExecEx(
156157
params.ctx,
157158
`delete-system-privilege`,

pkg/sql/tests/allow_role_memberships_to_change_during_transaction_test.go

Lines changed: 35 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,15 @@ import (
1212
"time"
1313

1414
"github.com/cockroachdb/cockroach/pkg/base"
15-
"github.com/cockroachdb/cockroach/pkg/testutils"
1615
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
1716
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
1817
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
1918
"github.com/cockroachdb/cockroach/pkg/util/log"
2019
"github.com/stretchr/testify/require"
2120
)
2221

23-
// TestAllowRoleMembershipsToChangeDuringTransaction ensures that when the
24-
// corresponding session variable is set for all sessions using cached
25-
// role memberships, performing new grants do not need to wait for open
26-
// transactions to complete.
22+
// TestAllowRoleMembershipsToChangeDuringTransaction ensures user operations
23+
// are not blocked by transactions with the corresponding session variable set.
2724
func TestAllowRoleMembershipsToChangeDuringTransaction(t *testing.T) {
2825
defer leaktest.AfterTest(t)()
2926
defer log.Scope(t).Close(t)
@@ -40,14 +37,15 @@ func TestAllowRoleMembershipsToChangeDuringTransaction(t *testing.T) {
4037
}
4138
}
4239

43-
// Create three users: foo, bar, and baz.
40+
// Create four users: foo, bar, biz, and baz.
4441
// Use one of these users to hold open a transaction which uses a lease
4542
// on the role_memberships table. Ensure that initially granting does
4643
// wait on that transaction. Then set the session variable and ensure
4744
// that it does not.
4845
tdb := sqlutils.MakeSQLRunner(sqlDB)
4946
tdb.Exec(t, "CREATE USER foo PASSWORD 'foo'")
5047
tdb.Exec(t, "CREATE USER bar")
48+
tdb.Exec(t, "CREATE USER biz")
5149
tdb.Exec(t, "CREATE USER baz")
5250
tdb.Exec(t, "GRANT admin TO foo")
5351
tdb.Exec(t, "CREATE DATABASE db2")
@@ -56,20 +54,27 @@ func TestAllowRoleMembershipsToChangeDuringTransaction(t *testing.T) {
5654
fooDB, cleanupFoo := openUser("foo", "db2")
5755
defer cleanupFoo()
5856

59-
// In this first test, we want to make sure that the query cannot proceed
60-
// until the outer transaction commits. We launch the grant while the
61-
// transaction is open, wait a tad, make sure it hasn't made progress,
62-
// then we commit the relevant transaction and ensure that it does finish.
57+
// Ensure that the outer transaction blocks the user admin operations until
58+
// it is committed and its lease on the initial versions of the tables are
59+
// released.
6360
t.Run("normal path waits", func(t *testing.T) {
64-
fooTx, err := fooDB.BeginTx(ctx, nil)
61+
outerTx, err := fooDB.BeginTx(ctx, nil)
6562
require.NoError(t, err)
63+
defer func() { _ = outerTx.Rollback() }()
64+
6665
var count int
6766
require.NoError(t,
68-
fooTx.QueryRow("SELECT count(*) FROM t").Scan(&count))
67+
outerTx.QueryRow("SELECT count(*) FROM t").Scan(&count))
6968
require.Equal(t, 0, count)
69+
70+
// the first user operation shouldn't block
71+
_, err = sqlDB.Exec("GRANT biz TO bar;")
72+
require.NoError(t, err)
73+
74+
// the outer tx still has a lease on the initial table version so this should block
7075
errCh := make(chan error, 1)
7176
go func() {
72-
_, err := sqlDB.Exec("GRANT bar TO baz;")
77+
_, err = sqlDB.Exec("GRANT baz TO bar;")
7378
errCh <- err
7479
}()
7580
select {
@@ -79,74 +84,38 @@ func TestAllowRoleMembershipsToChangeDuringTransaction(t *testing.T) {
7984
case err := <-errCh:
8085
t.Fatalf("expected transaction to block, got err: %v", err)
8186
}
82-
require.NoError(t, fooTx.Commit())
87+
require.NoError(t, outerTx.Commit())
8388
require.NoError(t, <-errCh)
8489
})
8590

86-
// In this test we ensure that we can perform role grant and revoke
87-
// operations while the transaction which uses the relevant roles
88-
// remains open. We ensure that the transaction still succeeds and
89-
// that the operations occur in a timely manner.
90-
t.Run("session variable prevents waiting during GRANT and REVOKE", func(t *testing.T) {
91+
// When the session variable is set, the outer transaction should not block
92+
// the user admin operations as it releases the leases after every
93+
// statement.
94+
t.Run("session variable prevents blocking", func(t *testing.T) {
9195
fooConn, err := fooDB.Conn(ctx)
9296
require.NoError(t, err)
9397
defer func() { _ = fooConn.Close() }()
94-
_, err = fooConn.ExecContext(ctx, "SET allow_role_memberships_to_change_during_transaction = true;")
98+
99+
outerTx, err := fooConn.BeginTx(ctx, nil)
95100
require.NoError(t, err)
96-
fooTx, err := fooConn.BeginTx(ctx, nil)
101+
defer func() { _ = outerTx.Rollback() }()
102+
103+
_, err = outerTx.ExecContext(ctx, "SET allow_role_memberships_to_change_during_transaction = true;")
97104
require.NoError(t, err)
105+
98106
var count int
99107
require.NoError(t,
100-
fooTx.QueryRow("SELECT count(*) FROM t").Scan(&count))
108+
outerTx.QueryRow("SELECT count(*) FROM t").Scan(&count))
101109
require.Equal(t, 0, count)
102-
conn, err := sqlDB.Conn(ctx)
103-
require.NoError(t, err)
104-
defer func() { _ = conn.Close() }()
105-
// Set a timeout on the SQL operations to ensure that they both
106-
// happen in a timely manner.
107-
grantRevokeTimeout, cancel := context.WithTimeout(
108-
ctx, testutils.DefaultSucceedsSoonDuration,
109-
)
110-
defer cancel()
111-
112-
_, err = conn.ExecContext(grantRevokeTimeout, "GRANT foo TO baz;")
113-
require.NoError(t, err)
114-
_, err = conn.ExecContext(grantRevokeTimeout, "REVOKE bar FROM baz;")
115-
require.NoError(t, err)
116110

117-
// Ensure the transaction we held open commits without issue.
118-
require.NoError(t, fooTx.Commit())
119-
})
120-
121-
t.Run("session variable prevents waiting during CREATE and DROP role", func(t *testing.T) {
122-
fooConn, err := fooDB.Conn(ctx)
123-
require.NoError(t, err)
124-
defer func() { _ = fooConn.Close() }()
125-
_, err = fooConn.ExecContext(ctx, "SET allow_role_memberships_to_change_during_transaction = true;")
126-
require.NoError(t, err)
127-
fooTx, err := fooConn.BeginTx(ctx, nil)
128-
require.NoError(t, err)
129-
// We need to use show roles because that access the system.users table.
130-
_, err = fooTx.Exec("SHOW ROLES")
111+
// the first user operation shouldn't block
112+
_, err = sqlDB.Exec("GRANT biz TO bar;")
131113
require.NoError(t, err)
132114

133-
conn, err := sqlDB.Conn(ctx)
134-
require.NoError(t, err)
135-
defer func() { _ = conn.Close() }()
136-
// Set a timeout on the SQL operations to ensure that they both
137-
// happen in a timely manner.
138-
grantRevokeTimeout, cancel := context.WithTimeout(
139-
ctx, testutils.DefaultSucceedsSoonDuration,
140-
)
141-
defer cancel()
142-
143-
_, err = conn.ExecContext(grantRevokeTimeout, "CREATE ROLE new_role;")
144-
require.NoError(t, err)
145-
_, err = conn.ExecContext(grantRevokeTimeout, "DROP ROLE new_role;")
115+
// nor should the second
116+
_, err = sqlDB.Exec("GRANT baz TO bar;")
146117
require.NoError(t, err)
147118

148-
// Ensure the transaction we held open commits without issue.
149-
require.NoError(t, fooTx.Commit())
119+
require.NoError(t, outerTx.Commit())
150120
})
151-
152121
}

0 commit comments

Comments
 (0)