From a4a3180a9facccb95b777a7c30e5d2dcdcb2c1e6 Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Fri, 3 Oct 2025 04:55:57 +0000 Subject: [PATCH 1/2] sql: update stats for descriptor table This is needed in order to enforce the sql.schema.approx_max_object_count cluster setting, which relies on optimizer table statistics to find the count. Since the schemachanger uses the KV API to write to the descriptor table, we need to explicitly notify the stats refresher when it should compute new stats for the table. Release note: None --- pkg/sql/catalog/descs/collection.go | 18 +++---- pkg/sql/conn_executor.go | 12 ++++- pkg/sql/schemachanger/schemachanger_test.go | 60 ++++++++++----------- 3 files changed, 46 insertions(+), 44 deletions(-) diff --git a/pkg/sql/catalog/descs/collection.go b/pkg/sql/catalog/descs/collection.go index bd77d9c7ba00..1bd83146bcaf 100644 --- a/pkg/sql/catalog/descs/collection.go +++ b/pkg/sql/catalog/descs/collection.go @@ -273,21 +273,17 @@ func (tc *Collection) IsVersionBumpOfUncommittedDescriptor(id descpb.ID) bool { return tc.uncommitted.versionBumpOnly[id] } -// HasUncommittedNewOrDroppedDescriptors returns true if the collection contains -// any uncommitted descriptors that are newly created or dropped. -func (tc *Collection) HasUncommittedNewOrDroppedDescriptors() bool { - isNewDescriptor := false - err := tc.uncommitted.iterateUncommittedByID(func(desc catalog.Descriptor) error { +// CountUncommittedNewOrDroppedDescriptors returns the number of uncommitted +// descriptors that are newly created or dropped. +func (tc *Collection) CountUncommittedNewOrDroppedDescriptors() int { + count := 0 + _ = tc.uncommitted.iterateUncommittedByID(func(desc catalog.Descriptor) error { if desc.GetVersion() == 1 || desc.Dropped() { - isNewDescriptor = true - return iterutil.StopIteration() + count++ } return nil }) - if err != nil { - return false - } - return isNewDescriptor + return count } // HasUncommittedTypes returns true if the Collection contains uncommitted diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 75f258545b5f..d1fbcd6555bf 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -4167,7 +4167,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( return advanceInfo{}, err } } - if ex.extraTxnState.descCollection.HasUncommittedNewOrDroppedDescriptors() { + if ex.extraTxnState.descCollection.CountUncommittedNewOrDroppedDescriptors() > 0 { execCfg := ex.planner.ExecCfg() if err := UpdateDescriptorCount(ex.Ctx(), execCfg, execCfg.SchemaChangerMetrics); err != nil { log.Dev.Warningf(ex.Ctx(), "failed to scan descriptor table: %v", err) @@ -4569,6 +4569,16 @@ func (ex *connExecutor) notifyStatsRefresherOfNewTables(ctx context.Context) { ex.planner.execCfg.StatsRefresher.NotifyMutation(desc, math.MaxInt32 /* rowsAffected */) } } + if cnt := ex.extraTxnState.descCollection.CountUncommittedNewOrDroppedDescriptors(); cnt > 0 { + // Notify the refresher of a mutation on the system.descriptor table. 
+		// We conservatively assume that any transaction which creates or drops descriptors modified that many rows of system.descriptor.
+		desc, err := ex.extraTxnState.descCollection.ByIDWithLeased(ex.planner.txn).Get().Table(ctx, keys.DescriptorTableID)
+		if err != nil {
+			log.Dev.Warningf(ctx, "failed to fetch descriptor table to refresh stats: %v", err)
+			return
+		}
+		ex.planner.execCfg.StatsRefresher.NotifyMutation(desc, cnt)
+	}
 }
 
 func (ex *connExecutor) getDescIDGenerator() eval.DescIDGenerator {
diff --git a/pkg/sql/schemachanger/schemachanger_test.go b/pkg/sql/schemachanger/schemachanger_test.go
index 478e82f20604..31bc0044dab5 100644
--- a/pkg/sql/schemachanger/schemachanger_test.go
+++ b/pkg/sql/schemachanger/schemachanger_test.go
@@ -1165,6 +1165,9 @@ func TestApproxMaxSchemaObjects(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	skip.UnderDuress(t, "slow test, requires polling to wait for auto stats job")
+	skip.UnderShort(t, "slow test, requires polling to wait for auto stats job")
+
 	ctx := context.Background()
 
 	// Test with both declarative schema changer modes.
@@ -1174,40 +1177,41 @@ func TestApproxMaxSchemaObjects(t *testing.T) {
 		defer s.Stopper().Stop(ctx)
 
 		tdb := sqlutils.MakeSQLRunner(sqlDB)
+		tdb.Exec(t, "SET CLUSTER SETTING sql.stats.automatic_collection.min_stale_rows = 1")
 
 		// Configure the declarative schema changer mode.
 		if useDeclarative {
+			tdb.Exec(t, `SET CLUSTER SETTING sql.defaults.use_declarative_schema_changer = 'on'`)
 			tdb.Exec(t, `SET use_declarative_schema_changer = 'on'`)
 		} else {
+			tdb.Exec(t, `SET CLUSTER SETTING sql.defaults.use_declarative_schema_changer = 'off'`)
 			tdb.Exec(t, `SET use_declarative_schema_changer = 'off'`)
 		}
 
-		// Get the current count of descriptors to set a realistic limit.
-		var currentCount int
-		tdb.QueryRow(t, `SELECT count(*) FROM system.descriptor`).Scan(&currentCount)
+		var maxObjects int
+		updateMaxObjects := func() {
+			// Manually refresh stats.
+			tdb.Exec(t, "ANALYZE system.public.descriptor")
 
-		// Set the limit to be slightly more than current count.
-		maxObjects := currentCount + 5
-		tdb.Exec(t, fmt.Sprintf(`SET CLUSTER SETTING sql.schema.approx_max_object_count = %d`, maxObjects))
+			// Get the current count of descriptors to set a realistic limit.
+			var currentCount int
+			tdb.QueryRow(t, `SELECT count(*) FROM system.descriptor`).Scan(&currentCount)
 
-		// Create a test database and use it.
-		tdb.Exec(t, `CREATE DATABASE testdb`)
-		tdb.Exec(t, `USE testdb`)
+			// Set the limit to be slightly more than current count.
+			maxObjects = currentCount + 1
+			tdb.Exec(t, fmt.Sprintf(`SET CLUSTER SETTING sql.schema.approx_max_object_count = %d`, maxObjects))
+		}
+		updateMaxObjects()
 
 		// Test that different object types are subject to the limit.
 		objectTypes := []string{"table", "database", "schema", "type", "function"}
 		for _, objectType := range objectTypes {
 			t.Run(objectType, func(t *testing.T) {
 				// Increase the limit before each subtest to avoid interference.
-				maxObjects += 10
-				tdb.Exec(t, fmt.Sprintf(`SET CLUSTER SETTING sql.schema.approx_max_object_count = %d`, maxObjects))
-
-				// Try to create objects until we hit the limit.
-				// ANALYZE before starting to ensure stats are fresh.
-				tdb.Exec(t, `ANALYZE system.descriptor`)
+				updateMaxObjects()
 
 				objNum := 0
-				for {
+				testutils.SucceedsWithin(t, func() error {
 					var createStmt string
 					switch objectType {
 					case "table":
@@ -1224,30 +1228,22 @@ func TestApproxMaxSchemaObjects(t *testing.T) {
 
 					_, err := sqlDB.Exec(createStmt)
 					if err != nil {
-						// Check if we got the expected error.
+						// Check if we got the expected error and message.
 						if pqErr := (*pq.Error)(nil); errors.As(err, &pqErr) {
 							if string(pqErr.Code) == pgcode.ConfigurationLimitExceeded.String() {
-								// Verify the error message mentions "approximate maximum".
-								testutils.IsError(err, "would exceed approximate maximum")
-								break
+								if testutils.IsError(err, "would exceed approximate maximum") {
+									return nil
+								}
 							}
 						}
 						// Some other error occurred.
-						t.Fatal(err)
+						return err
 					}
 					objNum++
 
-					// Re-analyze periodically to update stats.
-					if objNum%2 == 0 {
-						tdb.Exec(t, `ANALYZE system.descriptor`)
-					}
-
-					// Safety check: if we created way more objects than expected,
-					// something is wrong.
-					if objNum > 30 {
-						t.Fatalf("created %d %ss without hitting limit (max=%d)", objNum, objectType, maxObjects)
-					}
-				}
+					// Haven't hit the limit yet, keep trying.
+					return errors.Errorf("created %d %ss without hitting limit (max=%d)", objNum, objectType, maxObjects)
+				}, 5*time.Minute)
 			})
 		}
 	})

From 4fd7db62ff0641e63291a423c2bd0b24fd9017eb Mon Sep 17 00:00:00 2001
From: Rafi Shamim
Date: Fri, 3 Oct 2025 05:23:54 +0000
Subject: [PATCH 2/2] schematelemetry: update object count gauge using table stats

In order to support larger numbers of schema objects, we switch away
from doing a full table scan on system.descriptor to update the object
count gauge. Instead, we now derive the count from the table statistics
on system.descriptor. The schematelemetry job is updated so it notifies
the stats refresher to keep those stats up to date. Note that the stats
refresher is already notified with a partial row count whenever a
transaction creates or drops descriptors.

Release note: None
---
 pkg/sql/catalog/schematelemetry/BUILD.bazel  |  1 +
 .../schematelemetry/schema_telemetry_job.go  | 27 +++++++++++++++++++
 pkg/sql/conn_executor.go                     |  5 ++--
 pkg/sql/schema_changer.go                    | 17 ------------
 pkg/sql/schema_changer_metrics.go            | 26 ++++++++++++++++++
 5 files changed, 56 insertions(+), 20 deletions(-)

diff --git a/pkg/sql/catalog/schematelemetry/BUILD.bazel b/pkg/sql/catalog/schematelemetry/BUILD.bazel
index f388982d2e8f..190a8593c0f9 100644
--- a/pkg/sql/catalog/schematelemetry/BUILD.bazel
+++ b/pkg/sql/catalog/schematelemetry/BUILD.bazel
@@ -12,6 +12,7 @@ go_library(
     deps = [
         "//pkg/jobs",
         "//pkg/jobs/jobspb",
+        "//pkg/keys",
         "//pkg/scheduledjobs",
         "//pkg/security/username",
         "//pkg/server/telemetry",
diff --git a/pkg/sql/catalog/schematelemetry/schema_telemetry_job.go b/pkg/sql/catalog/schematelemetry/schema_telemetry_job.go
index 6161022b55d9..d8050658e390 100644
--- a/pkg/sql/catalog/schematelemetry/schema_telemetry_job.go
+++ b/pkg/sql/catalog/schematelemetry/schema_telemetry_job.go
@@ -7,9 +7,11 @@ package schematelemetry
 
 import (
 	"context"
+	"math"
 
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/server/telemetry"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql"
@@ -63,6 +65,31 @@ func (t schemaTelemetryResumer) Resume(ctx context.Context, execCtx interface{})
 	if k := p.ExecCfg().SchemaTelemetryTestingKnobs; k != nil {
 		knobs = *k
 	}
+	// Notify the stats refresher to update the system.descriptor table stats,
+	// and update the object count in schema changer metrics.
+	err := p.ExecCfg().InternalDB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error {
+		desc, err := txn.Descriptors().ByIDWithLeased(txn.KV()).Get().Table(ctx, keys.DescriptorTableID)
+		if err != nil {
+			return err
+		}
+		p.ExecCfg().StatsRefresher.NotifyMutation(desc, math.MaxInt32 /* rowsAffected */)
+
+		// Note: This won't be perfectly up-to-date, but it will make sure the
+		// metric gets updated periodically. It also gets updated after every
+		// schema change.
+		tableStats, err := p.ExecCfg().TableStatsCache.GetTableStats(ctx, desc, nil /* typeResolver */)
+		if err != nil {
+			return err
+		}
+		if len(tableStats) > 0 {
+			// Use the row count from the most recent statistic.
+			p.ExecCfg().SchemaChangerMetrics.ObjectCount.Update(int64(tableStats[0].RowCount))
+		}
+		return nil
+	})
+	if err != nil {
+		return errors.Wrap(err, "failed to notify stats refresher to update system.descriptor table stats")
+	}
 
 	// Outside of tests, scan the catalog tables AS OF SYSTEM TIME slightly in the
 	// past. Schema telemetry is not latency-sensitive to the point where a few
diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go
index d1fbcd6555bf..983758c847d2 100644
--- a/pkg/sql/conn_executor.go
+++ b/pkg/sql/conn_executor.go
@@ -4166,11 +4166,10 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
 		if err := ex.waitForInitialVersionForNewDescriptors(cachedRegions); err != nil {
 			return advanceInfo{}, err
 		}
-	}
-	if ex.extraTxnState.descCollection.CountUncommittedNewOrDroppedDescriptors() > 0 {
+
 		execCfg := ex.planner.ExecCfg()
 		if err := UpdateDescriptorCount(ex.Ctx(), execCfg, execCfg.SchemaChangerMetrics); err != nil {
-			log.Dev.Warningf(ex.Ctx(), "failed to scan descriptor table: %v", err)
+			log.Dev.Warningf(ex.Ctx(), "failed to update descriptor count metric: %v", err)
 		}
 	}
 	fallthrough
diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go
index 6e33caf208d5..d97c014f5000 100644
--- a/pkg/sql/schema_changer.go
+++ b/pkg/sql/schema_changer.go
@@ -3574,20 +3574,3 @@ func (p *planner) CanCreateCrossDBSequenceRef() error {
 	}
 	return nil
 }
-
-// UpdateDescriptorCount updates our sql.schema_changer.object_count gauge with
-// a fresh count of objects in the system.descriptor table.
-func UpdateDescriptorCount(
-	ctx context.Context, execCfg *ExecutorConfig, metric *SchemaChangerMetrics,
-) error {
-	return DescsTxn(ctx, execCfg, func(ctx context.Context, txn isql.Txn, col *descs.Collection) error {
-		row, err := txn.QueryRow(ctx, "sql-schema-changer-object-count", txn.KV(),
-			`SELECT count(*) FROM system.descriptor`)
-		if err != nil {
-			return err
-		}
-		count := *row[0].(*tree.DInt)
-		metric.ObjectCount.Update(int64(count))
-		return nil
-	})
-}
diff --git a/pkg/sql/schema_changer_metrics.go b/pkg/sql/schema_changer_metrics.go
index d37a1237a409..608ef8282b9d 100644
--- a/pkg/sql/schema_changer_metrics.go
+++ b/pkg/sql/schema_changer_metrics.go
@@ -6,7 +6,11 @@
 package sql
 
 import (
+	"context"
+
+	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/server/telemetry"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
 	"github.com/cockroachdb/cockroach/pkg/util/metric"
 )
@@ -42,3 +46,25 @@ func NewSchemaChangerMetrics() *SchemaChangerMetrics {
 		ObjectCount: metric.NewGauge(metaObjects),
 	}
 }
+
+// UpdateDescriptorCount updates our sql.schema_changer.object_count gauge with
+// an approximate count of objects in system.descriptor, derived from table stats.
+func UpdateDescriptorCount( + ctx context.Context, execCfg *ExecutorConfig, metric *SchemaChangerMetrics, +) error { + return execCfg.InternalDB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error { + desc, err := txn.Descriptors().ByIDWithLeased(txn.KV()).Get().Table(ctx, keys.DescriptorTableID) + if err != nil { + return err + } + tableStats, err := execCfg.TableStatsCache.GetTableStats(ctx, desc, nil /* typeResolver */) + if err != nil { + return err + } + if len(tableStats) > 0 { + // Use the row count from the most recent statistic. + metric.ObjectCount.Update(int64(tableStats[0].RowCount)) + } + return nil + }) +}
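
Reviewer note (not part of the patch): the approximate count that both the gauge and the sql.schema.approx_max_object_count check now consume is simply the row_count of the most recent statistic on system.descriptor, so it can be observed directly from SQL. Below is a minimal Go sketch of that query using a plain database/sql client. The helper name, the lib/pq driver, and the connection string are illustrative assumptions; the production code paths in the patch go through TableStatsCache and the internal executor instead.

package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // assumed pgwire driver; any Postgres-compatible driver works
)

// approxDescriptorCount returns the latest statistics-based row count for
// system.descriptor, i.e. the same approximate object count that the
// sql.schema_changer.object_count gauge and the approx_max_object_count
// check rely on. (Illustrative helper, not part of the patch.)
func approxDescriptorCount(ctx context.Context, db *sql.DB) (int64, error) {
	// Refresh stats explicitly so the number reflects recent schema changes;
	// in a running cluster the auto stats job and the notifications added in
	// these patches keep it fresh.
	if _, err := db.ExecContext(ctx, "ANALYZE system.public.descriptor"); err != nil {
		return 0, err
	}
	var count int64
	err := db.QueryRowContext(ctx, `
		SELECT row_count
		  FROM [SHOW STATISTICS FOR TABLE system.descriptor]
		 ORDER BY created DESC
		 LIMIT 1`).Scan(&count)
	return count, err
}

func main() {
	ctx := context.Background()
	// Connection string is a placeholder; point it at any test cluster.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	n, err := approxDescriptorCount(ctx, db)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("approximate object count: %d\n", n)
}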