Skip to content

Commit cdf6665

Browse files
committed
sql: update stats for descriptor table
This is needed in order to enforce the sql.schema.approx_max_object_count cluster setting, which relies on optimizer table statistics to find the count. Since the schemachanger uses the KV API to write to the descriptor table, we need to explicitly notify the stats refresher when it should compute new stats for the table. Release note: None
1 parent 2a1771e commit cdf6665

File tree

3 files changed

+46
-44
lines changed

3 files changed

+46
-44
lines changed

pkg/sql/catalog/descs/collection.go

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -273,21 +273,17 @@ func (tc *Collection) IsVersionBumpOfUncommittedDescriptor(id descpb.ID) bool {
273273
return tc.uncommitted.versionBumpOnly[id]
274274
}
275275

276-
// HasUncommittedNewOrDroppedDescriptors returns true if the collection contains
277-
// any uncommitted descriptors that are newly created or dropped.
278-
func (tc *Collection) HasUncommittedNewOrDroppedDescriptors() bool {
279-
isNewDescriptor := false
280-
err := tc.uncommitted.iterateUncommittedByID(func(desc catalog.Descriptor) error {
276+
// CountUncommittedNewOrDroppedDescriptors returns the number of uncommitted
277+
// descriptors that are newly created or dropped.
278+
func (tc *Collection) CountUncommittedNewOrDroppedDescriptors() int {
279+
count := 0
280+
_ = tc.uncommitted.iterateUncommittedByID(func(desc catalog.Descriptor) error {
281281
if desc.GetVersion() == 1 || desc.Dropped() {
282-
isNewDescriptor = true
283-
return iterutil.StopIteration()
282+
count++
284283
}
285284
return nil
286285
})
287-
if err != nil {
288-
return false
289-
}
290-
return isNewDescriptor
286+
return count
291287
}
292288

293289
// HasUncommittedTypes returns true if the Collection contains uncommitted

pkg/sql/conn_executor.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4176,7 +4176,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
41764176
return advanceInfo{}, err
41774177
}
41784178
}
4179-
if ex.extraTxnState.descCollection.HasUncommittedNewOrDroppedDescriptors() {
4179+
if ex.extraTxnState.descCollection.CountUncommittedNewOrDroppedDescriptors() > 0 {
41804180
execCfg := ex.planner.ExecCfg()
41814181
if err := UpdateDescriptorCount(ex.Ctx(), execCfg, execCfg.SchemaChangerMetrics); err != nil {
41824182
log.Dev.Warningf(ex.Ctx(), "failed to scan descriptor table: %v", err)
@@ -4578,6 +4578,16 @@ func (ex *connExecutor) notifyStatsRefresherOfNewTables(ctx context.Context) {
45784578
ex.planner.execCfg.StatsRefresher.NotifyMutation(desc, math.MaxInt32 /* rowsAffected */)
45794579
}
45804580
}
4581+
if cnt := ex.extraTxnState.descCollection.CountUncommittedNewOrDroppedDescriptors(); cnt > 0 {
4582+
// Notify the refresher of a mutation on the system.descriptor table.
4583+
// We conservatively assume that any transaction which creates or
4584+
desc, err := ex.extraTxnState.descCollection.ByIDWithLeased(ex.planner.txn).Get().Table(ctx, keys.DescriptorTableID)
4585+
if err != nil {
4586+
log.Dev.Warningf(ctx, "failed to fetch descriptor table to refresh stats: %v", err)
4587+
return
4588+
}
4589+
ex.planner.execCfg.StatsRefresher.NotifyMutation(desc, cnt)
4590+
}
45814591
}
45824592

45834593
func (ex *connExecutor) getDescIDGenerator() eval.DescIDGenerator {

pkg/sql/schemachanger/schemachanger_test.go

Lines changed: 28 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1165,6 +1165,9 @@ func TestApproxMaxSchemaObjects(t *testing.T) {
11651165
defer leaktest.AfterTest(t)()
11661166
defer log.Scope(t).Close(t)
11671167

1168+
skip.UnderDuress(t, "slow test, requires polling to wait for auto stats job")
1169+
skip.UnderShort(t, "slow test, requires polling to wait for auto stats job")
1170+
11681171
ctx := context.Background()
11691172

11701173
// Test with both declarative schema changer modes.
@@ -1174,40 +1177,41 @@ func TestApproxMaxSchemaObjects(t *testing.T) {
11741177
defer s.Stopper().Stop(ctx)
11751178

11761179
tdb := sqlutils.MakeSQLRunner(sqlDB)
1180+
tdb.Exec(t, "SET CLUSTER SETTING sql.stats.automatic_collection.min_stale_rows = 1")
11771181

11781182
// Configure the declarative schema changer mode.
11791183
if useDeclarative {
1184+
tdb.Exec(t, `SET sql.defaults.use_declarative_schema_changer = 'on'`)
11801185
tdb.Exec(t, `SET use_declarative_schema_changer = 'on'`)
11811186
} else {
1187+
tdb.Exec(t, `SET sql.defaults.use_declarative_schema_changer = 'off'`)
11821188
tdb.Exec(t, `SET use_declarative_schema_changer = 'off'`)
11831189
}
11841190

1185-
// Get the current count of descriptors to set a realistic limit.
1186-
var currentCount int
1187-
tdb.QueryRow(t, `SELECT count(*) FROM system.descriptor`).Scan(&currentCount)
1191+
var maxObjects int
1192+
updateMaxObjects := func() {
1193+
// Manually refresh stats.
1194+
tdb.Exec(t, "ANALYZE system.public.descriptor")
11881195

1189-
// Set the limit to be slightly more than current count.
1190-
maxObjects := currentCount + 5
1191-
tdb.Exec(t, fmt.Sprintf(`SET CLUSTER SETTING sql.schema.approx_max_object_count = %d`, maxObjects))
1196+
// Get the current count of descriptors to set a realistic limit.
1197+
var currentCount int
1198+
tdb.QueryRow(t, `SELECT count(*) FROM system.descriptor`).Scan(&currentCount)
11921199

1193-
// Create a test database and use it.
1194-
tdb.Exec(t, `CREATE DATABASE testdb`)
1195-
tdb.Exec(t, `USE testdb`)
1200+
// Set the limit to be slightly more than current count.
1201+
maxObjects = currentCount + 1
1202+
tdb.Exec(t, fmt.Sprintf(`SET CLUSTER SETTING sql.schema.approx_max_object_count = %d`, maxObjects))
1203+
}
1204+
updateMaxObjects()
11961205

11971206
// Test that different object types are subject to the limit.
11981207
objectTypes := []string{"table", "database", "schema", "type", "function"}
11991208
for _, objectType := range objectTypes {
12001209
t.Run(objectType, func(t *testing.T) {
12011210
// Increase the limit before each subtest to avoid interference.
1202-
maxObjects += 10
1203-
tdb.Exec(t, fmt.Sprintf(`SET CLUSTER SETTING sql.schema.approx_max_object_count = %d`, maxObjects))
1204-
1205-
// Try to create objects until we hit the limit.
1206-
// ANALYZE before starting to ensure stats are fresh.
1207-
tdb.Exec(t, `ANALYZE system.descriptor`)
1211+
updateMaxObjects()
12081212

12091213
objNum := 0
1210-
for {
1214+
testutils.SucceedsWithin(t, func() error {
12111215
var createStmt string
12121216
switch objectType {
12131217
case "table":
@@ -1224,30 +1228,22 @@ func TestApproxMaxSchemaObjects(t *testing.T) {
12241228

12251229
_, err := sqlDB.Exec(createStmt)
12261230
if err != nil {
1227-
// Check if we got the expected error.
1231+
// Check if we got the expected error and message.
12281232
if pqErr := (*pq.Error)(nil); errors.As(err, &pqErr) {
12291233
if string(pqErr.Code) == pgcode.ConfigurationLimitExceeded.String() {
1230-
// Verify the error message mentions "approximate maximum".
1231-
testutils.IsError(err, "would exceed approximate maximum")
1232-
break
1234+
if testutils.IsError(err, "would exceed approximate maximum") {
1235+
return nil
1236+
}
12331237
}
12341238
}
12351239
// Some other error occurred.
1236-
t.Fatal(err)
1240+
return err
12371241
}
12381242
objNum++
12391243

1240-
// Re-analyze periodically to update stats.
1241-
if objNum%2 == 0 {
1242-
tdb.Exec(t, `ANALYZE system.descriptor`)
1243-
}
1244-
1245-
// Safety check: if we created way more objects than expected,
1246-
// something is wrong.
1247-
if objNum > 30 {
1248-
t.Fatalf("created %d %ss without hitting limit (max=%d)", objNum, objectType, maxObjects)
1249-
}
1250-
}
1244+
// Haven't hit the limit yet, keep trying.
1245+
return errors.Errorf("created %d %ss without hitting limit (max=%d)", objNum, objectType, maxObjects)
1246+
}, 5*time.Minute)
12511247
})
12521248
}
12531249
})

0 commit comments

Comments
 (0)