Skip to content

Commit 65c953b

Browse files
committed
sql: unblock user table changes from long-running transactions
Currently, the `CREATE USER` and the `GRANT role` operations use a bumped descriptor to invalidate caches of the user table. This blocked those operations until long-running transactions using the old table version committed. This change adds special handling of version bumps to wait for visibility of the change rather than convergence across the cluster. It no longer creates (blocking) jobs for the version bumps. Epic: CRDB-49398 Fixes: #138691 Release note (sql change): The `CREATE USER` and `GRANT role` operations now wait for full-cluster visibility of the new user table version rather than blocking on convergence.
1 parent 52d5277 commit 65c953b

File tree

14 files changed

+430
-89
lines changed

14 files changed

+430
-89
lines changed

pkg/bench/rttanalysis/testdata/benchmark_expectations

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
exp,benchmark
2-
14,AlterRole/alter_role_with_1_option
3-
17,AlterRole/alter_role_with_2_options
4-
23,AlterRole/alter_role_with_3_options
2+
12,AlterRole/alter_role_with_1_option
3+
15,AlterRole/alter_role_with_2_options
4+
19,AlterRole/alter_role_with_3_options
55
13,AlterTableAddCheckConstraint/alter_table_add_1_check_constraint
66
13,AlterTableAddCheckConstraint/alter_table_add_2_check_constraints
77
13,AlterTableAddCheckConstraint/alter_table_add_3_check_constraints
@@ -15,9 +15,9 @@ exp,benchmark
1515
14,AlterTableConfigureZone/alter_table_configure_zone_5_replicas
1616
14,AlterTableConfigureZone/alter_table_configure_zone_7_replicas_
1717
14,AlterTableConfigureZone/alter_table_configure_zone_ranges
18-
21,AlterTableDropColumn/alter_table_drop_1_column
19-
21,AlterTableDropColumn/alter_table_drop_2_columns
20-
21,AlterTableDropColumn/alter_table_drop_3_columns
18+
22,AlterTableDropColumn/alter_table_drop_1_column
19+
22,AlterTableDropColumn/alter_table_drop_2_columns
20+
22,AlterTableDropColumn/alter_table_drop_3_columns
2121
13,AlterTableDropConstraint/alter_table_drop_1_check_constraint
2222
13,AlterTableDropConstraint/alter_table_drop_2_check_constraints
2323
13,AlterTableDropConstraint/alter_table_drop_3_check_constraints
@@ -28,20 +28,20 @@ exp,benchmark
2828
11,AlterTableUnsplit/alter_table_unsplit_at_2_values
2929
14,AlterTableUnsplit/alter_table_unsplit_at_3_values
3030
2-4,Audit/select_from_an_audit_table
31-
20,CreateRole/create_role_with_1_option
32-
23,CreateRole/create_role_with_2_options
33-
26,CreateRole/create_role_with_3_options
34-
21,CreateRole/create_role_with_no_options
31+
13,CreateRole/create_role_with_1_option
32+
16,CreateRole/create_role_with_2_options
33+
19,CreateRole/create_role_with_3_options
34+
14,CreateRole/create_role_with_no_options
3535
14,"Discard/DISCARD_ALL,_1_tables_in_1_db"
3636
19,"Discard/DISCARD_ALL,_2_tables_in_2_dbs"
3737
0,"Discard/DISCARD_ALL,_no_tables"
3838
15,DropDatabase/drop_database_0_tables
3939
16,DropDatabase/drop_database_1_table
4040
16,DropDatabase/drop_database_2_tables
4141
16,DropDatabase/drop_database_3_tables
42-
29-30,DropRole/drop_1_role
43-
37,DropRole/drop_2_roles
44-
45-49,DropRole/drop_3_roles
42+
22,DropRole/drop_1_role
43+
30,DropRole/drop_2_roles
44+
39,DropRole/drop_3_roles
4545
13,DropSequence/drop_1_sequence
4646
15,DropSequence/drop_2_sequences
4747
17,DropSequence/drop_3_sequences
@@ -59,8 +59,8 @@ exp,benchmark
5959
8,Grant/grant_all_on_1_table
6060
8,Grant/grant_all_on_2_tables
6161
8,Grant/grant_all_on_3_tables
62-
15,GrantRole/grant_1_role
63-
19,GrantRole/grant_2_roles
62+
12,GrantRole/grant_1_role
63+
16,GrantRole/grant_2_roles
6464
12-15,Jobs/cancel_job
6565
3,Jobs/crdb_internal.system_jobs
6666
3-5,Jobs/jobs_page_default
@@ -79,19 +79,19 @@ exp,benchmark
7979
4,ORMQueries/django_column_introspection_1_table
8080
4,ORMQueries/django_column_introspection_4_tables
8181
4,ORMQueries/django_column_introspection_8_tables
82-
6,ORMQueries/django_comment_introspection_with_comments
83-
6,ORMQueries/django_table_introspection_1_table
84-
6,ORMQueries/django_table_introspection_8_tables
82+
5,ORMQueries/django_comment_introspection_with_comments
83+
5,ORMQueries/django_table_introspection_1_table
84+
5,ORMQueries/django_table_introspection_8_tables
8585
0,ORMQueries/has_column_privilege_using_attnum
8686
0,ORMQueries/has_column_privilege_using_column_name
8787
0,ORMQueries/has_schema_privilege
8888
0,ORMQueries/has_sequence_privilege
8989
0,ORMQueries/has_table_privilege
90-
5,ORMQueries/hasura_column_descriptions
90+
6,ORMQueries/hasura_column_descriptions
9191
20,ORMQueries/hasura_column_descriptions_8_tables
9292
5,ORMQueries/hasura_column_descriptions_modified
9393
4,ORMQueries/information_schema._pg_index_position
94-
4,ORMQueries/introspection_description_join
94+
5,ORMQueries/introspection_description_join
9595
4,ORMQueries/liquibase_migrations
9696
8,ORMQueries/liquibase_migrations_on_multiple_dbs
9797
5,ORMQueries/npgsql_fields
@@ -105,14 +105,14 @@ exp,benchmark
105105
3,ORMQueries/pg_namespace
106106
4,ORMQueries/pg_type
107107
6,ORMQueries/prisma_column_descriptions
108-
3,ORMQueries/prisma_column_descriptions_updated
108+
4,ORMQueries/prisma_column_descriptions_updated
109109
5,ORMQueries/prisma_types_16
110110
5,ORMQueries/prisma_types_4
111111
8,Revoke/revoke_all_on_1_table
112112
8,Revoke/revoke_all_on_2_tables
113113
8,Revoke/revoke_all_on_3_tables
114-
13,RevokeRole/revoke_1_role
115-
15,RevokeRole/revoke_2_roles
114+
10,RevokeRole/revoke_1_role
115+
12,RevokeRole/revoke_2_roles
116116
4,ShowGrants/grant_2_roles
117117
4,ShowGrants/grant_3_roles
118118
4,ShowGrants/grant_4_roles
@@ -129,7 +129,7 @@ exp,benchmark
129129
0,UDFResolution/select_from_udf
130130
2,UseManyRoles/use_2_roles
131131
2,UseManyRoles/use_50_roles
132-
1,VirtualTableQueries/select_crdb_internal.invalid_objects_with_1_fk
132+
2,VirtualTableQueries/select_crdb_internal.invalid_objects_with_1_fk
133133
1,VirtualTableQueries/select_crdb_internal.tables_with_1_fk
134134
0,VirtualTableQueries/show_create_all_routines
135135
0,VirtualTableQueries/show_create_all_triggers

pkg/cli/testdata/doctor/test_examine_cluster

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,5 @@ debug doctor examine cluster
33
debug doctor examine cluster
44
Examining 69 descriptors and 68 namespace entries...
55
ParentID 100, ParentSchemaID 101: relation "foo" (105): expected matching namespace entry, found none
6-
Examining 13 jobs...
6+
Examining 12 jobs...
77
ERROR: validation failed

pkg/cli/testdata/doctor/test_examine_cluster_dropped

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@ debug doctor examine cluster
22
----
33
debug doctor examine cluster
44
Examining 68 descriptors and 68 namespace entries...
5-
Examining 11 jobs...
5+
Examining 10 jobs...
66
No problems found!

pkg/sql/catalog/descs/collection.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,11 @@ func (tc *Collection) IsNewUncommittedDescriptor(id descpb.ID) bool {
267267
return false
268268
}
269269

270+
// IsVersionBumpOfUncommittedDescriptor returns true if the descriptor is only having its version bumped (without mutations) in this transaction.
271+
func (tc *Collection) IsVersionBumpOfUncommittedDescriptor(id descpb.ID) bool {
272+
return tc.uncommitted.versionBumpOnly[id]
273+
}
274+
270275
// HasUncommittedNewOrDroppedDescriptors returns true if the collection contains
271276
// any uncommitted descriptors that are newly created or dropped.
272277
func (tc *Collection) HasUncommittedNewOrDroppedDescriptors() bool {
@@ -330,9 +335,30 @@ func (tc *Collection) AddUncommittedDescriptor(
330335
desc.DescriptorType(), desc.GetName(), desc.GetID())
331336
}
332337
tc.markAsShadowedName(desc.GetID())
338+
339+
// It's the responsibility of the caller to restore the flag (see MaybeMarkVersionBump)
340+
tc.uncommitted.versionBumpOnly[desc.GetID()] = false
341+
333342
return tc.uncommitted.upsert(ctx, desc)
334343
}
335344

345+
// MaybeMarkVersionBump provides a defer-friendly function that updates the
346+
// version bump only flag for the descriptor so it reflects previous mutations
347+
// to the descriptor along with the current mutation.
348+
// The returned function is to be called after AddUncommittedDescriptor or
349+
// functions that call it (such as WriteDescToBatch).
350+
func (tc *Collection) MaybeMarkVersionBump(
351+
desc catalog.MutableDescriptor, isVersionBump bool,
352+
) func() {
353+
prev, ok := tc.uncommitted.versionBumpOnly[desc.GetID()]
354+
355+
return func() {
356+
tc.uncommitted.versionBumpOnly[desc.GetID()] =
357+
(!ok || prev) && // if the flag isn't set or it was previously set up
358+
isVersionBump
359+
}
360+
}
361+
336362
// WriteDescToBatch calls MaybeIncrementVersion, adds the descriptor to the
337363
// collection as an uncommitted descriptor, and writes it into b.
338364
func (tc *Collection) WriteDescToBatch(

pkg/sql/catalog/descs/uncommitted_descriptors.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,21 @@ type uncommittedDescriptors struct {
4242
uncommitted nstree.NameMap
4343
original, mutable nstree.IDMap
4444

45+
// versionBumpOnly stores descriptors that have been modified only by
46+
// changing their version counter, without any other mutations. These bumps
47+
// are used to trigger cache invalidation and are tracked to avoid the
48+
// typical block on version convergence behavior.
49+
versionBumpOnly map[descpb.ID]bool
50+
4551
// memAcc is the actual account of an injected, upstream monitor
4652
// to track memory usage of uncommittedDescriptors.
4753
memAcc mon.BoundAccount
4854
}
4955

5056
func makeUncommittedDescriptors(monitor *mon.BytesMonitor) uncommittedDescriptors {
5157
return uncommittedDescriptors{
52-
memAcc: monitor.MakeBoundAccount(),
58+
versionBumpOnly: map[descpb.ID]bool{},
59+
memAcc: monitor.MakeBoundAccount(),
5360
}
5461
}
5562

@@ -61,10 +68,11 @@ func (ud *uncommittedDescriptors) reset(ctx context.Context) {
6168
ud.memAcc.Clear(ctx)
6269
old := *ud
6370
*ud = uncommittedDescriptors{
64-
original: old.original,
65-
uncommitted: old.uncommitted,
66-
mutable: old.mutable,
67-
memAcc: old.memAcc,
71+
original: old.original,
72+
uncommitted: old.uncommitted,
73+
mutable: old.mutable,
74+
versionBumpOnly: map[descpb.ID]bool{},
75+
memAcc: old.memAcc,
6876
}
6977
}
7078

pkg/sql/catalog/lease/lease.go

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,46 @@ SELECT DISTINCT session_id FROM system.lease WHERE desc_id=%d AND crdb_internal.
265265
return sessionIDs, nil
266266
}
267267

268+
// countSessionsHoldingStaleDescriptor finds sessionIDs that are holding a lease
269+
// on the previous version of desc but not the current version.
270+
func countSessionsHoldingStaleDescriptor(
271+
ctx context.Context, txn isql.Txn, desc catalog.Descriptor, region string,
272+
) (int, error) {
273+
b := strings.Builder{}
274+
275+
// Counts sessions that have previous version of the descriptor but not the current version
276+
b.WriteString(fmt.Sprintf(`
277+
SELECT count(DISTINCT l1.session_id)
278+
FROM system.lease l1
279+
WHERE l1.desc_id = %d
280+
AND l1.version < %d
281+
AND crdb_internal.sql_liveness_is_alive(l1.session_id)
282+
AND NOT EXISTS (
283+
SELECT 1 FROM system.lease l2
284+
WHERE l2.desc_id = l1.desc_id
285+
AND l2.session_id = l1.session_id
286+
AND l2.version = %d
287+
`, desc.GetID(), desc.GetVersion(), desc.GetVersion()))
288+
if region != "" {
289+
b.WriteString(fmt.Sprintf(" AND l2.crdb_region='%s'", region))
290+
}
291+
b.WriteString(")")
292+
if region != "" {
293+
b.WriteString(fmt.Sprintf(" AND l1.crdb_region='%s'", region))
294+
}
295+
296+
rows, err := txn.QueryBuffered(ctx, "count-sessions-holding-stale-descriptor", txn.KV(), b.String())
297+
if err != nil {
298+
return 0, err
299+
}
300+
301+
if len(rows) == 0 {
302+
return 0, nil
303+
}
304+
305+
return int(tree.MustBeDInt(rows[0][0])), nil
306+
}
307+
268308
// WaitForInitialVersion waits for a lease to be acquired on a newly created
269309
// object on any nodes that have already leased the schema out. This ensures
270310
// that their leaseGeneration is incremented before the user commit completes,
@@ -511,6 +551,100 @@ func (m *Manager) WaitForOneVersion(
511551
return desc, nil
512552
}
513553

554+
// WaitForNewVersion returns once all leaseholders of any version of the
555+
// descriptor hold a lease of the current version.
556+
//
557+
// The MaxRetries and MaxDuration in retryOpts should not be set.
558+
func (m *Manager) WaitForNewVersion(
559+
ctx context.Context,
560+
descriptorId descpb.ID,
561+
retryOpts retry.Options,
562+
regions regionliveness.CachedDatabaseRegions,
563+
) (catalog.Descriptor, error) {
564+
if retryOpts.MaxRetries != 0 {
565+
return nil, errors.New("The MaxRetries option shouldn't be set in WaitForNewVersion")
566+
}
567+
if retryOpts.MaxDuration != 0 {
568+
return nil, errors.New("The MaxDuration option shouldn't be set in WaitForNewVersion")
569+
}
570+
571+
var desc catalog.Descriptor
572+
573+
var success bool
574+
// Block until each leaseholder on the previous version of the descriptor
575+
// also holds a lease on the current version of the descriptor (`for all
576+
// session: (session in Prev => session in Curr)` for the set theory
577+
// enjoyers).
578+
for r := retry.Start(retryOpts); r.Next(); {
579+
var err error
580+
desc, err = m.maybeGetDescriptorWithoutValidation(ctx, descriptorId, true)
581+
if err != nil {
582+
return nil, err
583+
}
584+
585+
db := m.storage.db
586+
587+
// Get the sessions with leases in each region.
588+
if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
589+
prober := regionliveness.NewLivenessProber(db.KV(), m.storage.codec, regions, m.settings)
590+
regionMap, err := prober.QueryLiveness(ctx, txn.KV())
591+
if err != nil {
592+
return err
593+
}
594+
595+
// On single region clusters we can query everything at once.
596+
if regionMap == nil {
597+
regionMap = regionliveness.LiveRegions{"": struct{}{}}
598+
}
599+
600+
var staleSessionCount int
601+
for region := range regionMap {
602+
var regionStaleSessionCount int
603+
var err error
604+
if hasTimeout, timeout := prober.GetProbeTimeout(); hasTimeout {
605+
err = timeutil.RunWithTimeout(ctx, "count-sessions-holding-stale-descriptor-by-region", timeout, func(ctx context.Context) (countErr error) {
606+
regionStaleSessionCount, countErr = countSessionsHoldingStaleDescriptor(ctx, txn, desc, region)
607+
return countErr
608+
})
609+
} else {
610+
regionStaleSessionCount, err = countSessionsHoldingStaleDescriptor(ctx, txn, desc, region)
611+
}
612+
if err != nil {
613+
return handleRegionLivenessErrors(ctx, prober, region, err)
614+
}
615+
616+
staleSessionCount += regionStaleSessionCount
617+
618+
if regionStaleSessionCount != 0 { // quit early
619+
if region == "" {
620+
log.Infof(ctx, "%d sessions holding stale descriptor", regionStaleSessionCount)
621+
} else {
622+
log.Infof(ctx, "Region '%s' has %d sessions holding stale descriptor", region, regionStaleSessionCount)
623+
}
624+
break
625+
}
626+
}
627+
628+
if staleSessionCount == 0 {
629+
success = true
630+
}
631+
632+
return nil
633+
}); err != nil {
634+
return nil, err
635+
}
636+
637+
if success {
638+
break
639+
}
640+
}
641+
if !success {
642+
return nil, errors.New("Exited lease acquisition loop before success")
643+
}
644+
645+
return desc, nil
646+
}
647+
514648
// IDVersion represents a descriptor ID, version pair that are
515649
// meant to map to a single immutable descriptor.
516650
type IDVersion struct {

0 commit comments

Comments
 (0)