diff --git a/pkg/sql/catalog/descs/collection.go b/pkg/sql/catalog/descs/collection.go index bd77d9c7ba00..1bd83146bcaf 100644 --- a/pkg/sql/catalog/descs/collection.go +++ b/pkg/sql/catalog/descs/collection.go @@ -273,21 +273,17 @@ func (tc *Collection) IsVersionBumpOfUncommittedDescriptor(id descpb.ID) bool { return tc.uncommitted.versionBumpOnly[id] } -// HasUncommittedNewOrDroppedDescriptors returns true if the collection contains -// any uncommitted descriptors that are newly created or dropped. -func (tc *Collection) HasUncommittedNewOrDroppedDescriptors() bool { - isNewDescriptor := false - err := tc.uncommitted.iterateUncommittedByID(func(desc catalog.Descriptor) error { +// CountUncommittedNewOrDroppedDescriptors returns the number of uncommitted +// descriptors that are newly created or dropped. +func (tc *Collection) CountUncommittedNewOrDroppedDescriptors() int { + count := 0 + _ = tc.uncommitted.iterateUncommittedByID(func(desc catalog.Descriptor) error { if desc.GetVersion() == 1 || desc.Dropped() { - isNewDescriptor = true - return iterutil.StopIteration() + count++ } return nil }) - if err != nil { - return false - } - return isNewDescriptor + return count } // HasUncommittedTypes returns true if the Collection contains uncommitted diff --git a/pkg/sql/catalog/schematelemetry/BUILD.bazel b/pkg/sql/catalog/schematelemetry/BUILD.bazel index f388982d2e8f..190a8593c0f9 100644 --- a/pkg/sql/catalog/schematelemetry/BUILD.bazel +++ b/pkg/sql/catalog/schematelemetry/BUILD.bazel @@ -12,6 +12,7 @@ go_library( deps = [ "//pkg/jobs", "//pkg/jobs/jobspb", + "//pkg/keys", "//pkg/scheduledjobs", "//pkg/security/username", "//pkg/server/telemetry", diff --git a/pkg/sql/catalog/schematelemetry/schema_telemetry_job.go b/pkg/sql/catalog/schematelemetry/schema_telemetry_job.go index 6161022b55d9..d8050658e390 100644 --- a/pkg/sql/catalog/schematelemetry/schema_telemetry_job.go +++ b/pkg/sql/catalog/schematelemetry/schema_telemetry_job.go @@ -7,9 +7,11 @@ package schematelemetry import ( "context" + "math" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" @@ -63,6 +65,31 @@ func (t schemaTelemetryResumer) Resume(ctx context.Context, execCtx interface{}) if k := p.ExecCfg().SchemaTelemetryTestingKnobs; k != nil { knobs = *k } + // Notify the stats refresher to update the system.descriptors table stats, + // and update the object count in schema changer metrics. + err := p.ExecCfg().InternalDB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error { + desc, err := txn.Descriptors().ByIDWithLeased(txn.KV()).Get().Table(ctx, keys.DescriptorTableID) + if err != nil { + return err + } + p.ExecCfg().StatsRefresher.NotifyMutation(desc, math.MaxInt64 /* rowCount */) + + // Note: This won't be perfectly up-to-date, but it will make sure the + // metric gets updated periodically. It also gets updated after every + // schema change. + tableStats, err := p.ExecCfg().TableStatsCache.GetTableStats(ctx, desc, nil /* typeResolver */) + if err != nil { + return err + } + if len(tableStats) > 0 { + // Use the row count from the most recent statistic. + p.ExecCfg().SchemaChangerMetrics.ObjectCount.Update(int64(tableStats[0].RowCount)) + } + return nil + }) + if err != nil { + return errors.Wrap(err, "failed to notify stats refresher to update system.descriptors table stats") + } // Outside of tests, scan the catalog tables AS OF SYSTEM TIME slightly in the // past. Schema telemetry is not latency-sensitive to the point where a few diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 75f258545b5f..983758c847d2 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -4166,11 +4166,10 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( if err := ex.waitForInitialVersionForNewDescriptors(cachedRegions); err != nil { return advanceInfo{}, err } - } - if ex.extraTxnState.descCollection.HasUncommittedNewOrDroppedDescriptors() { + execCfg := ex.planner.ExecCfg() if err := UpdateDescriptorCount(ex.Ctx(), execCfg, execCfg.SchemaChangerMetrics); err != nil { - log.Dev.Warningf(ex.Ctx(), "failed to scan descriptor table: %v", err) + log.Dev.Warningf(ex.Ctx(), "failed to update descriptor count metric: %v", err) } } fallthrough @@ -4569,6 +4568,16 @@ func (ex *connExecutor) notifyStatsRefresherOfNewTables(ctx context.Context) { ex.planner.execCfg.StatsRefresher.NotifyMutation(desc, math.MaxInt32 /* rowsAffected */) } } + if cnt := ex.extraTxnState.descCollection.CountUncommittedNewOrDroppedDescriptors(); cnt > 0 { + // Notify the refresher of a mutation on the system.descriptor table. + // We conservatively assume that any transaction which creates or + desc, err := ex.extraTxnState.descCollection.ByIDWithLeased(ex.planner.txn).Get().Table(ctx, keys.DescriptorTableID) + if err != nil { + log.Dev.Warningf(ctx, "failed to fetch descriptor table to refresh stats: %v", err) + return + } + ex.planner.execCfg.StatsRefresher.NotifyMutation(desc, cnt) + } } func (ex *connExecutor) getDescIDGenerator() eval.DescIDGenerator { diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index 6e33caf208d5..d97c014f5000 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -3574,20 +3574,3 @@ func (p *planner) CanCreateCrossDBSequenceRef() error { } return nil } - -// UpdateDescriptorCount updates our sql.schema_changer.object_count gauge with -// a fresh count of objects in the system.descriptor table. -func UpdateDescriptorCount( - ctx context.Context, execCfg *ExecutorConfig, metric *SchemaChangerMetrics, -) error { - return DescsTxn(ctx, execCfg, func(ctx context.Context, txn isql.Txn, col *descs.Collection) error { - row, err := txn.QueryRow(ctx, "sql-schema-changer-object-count", txn.KV(), - `SELECT count(*) FROM system.descriptor`) - if err != nil { - return err - } - count := *row[0].(*tree.DInt) - metric.ObjectCount.Update(int64(count)) - return nil - }) -} diff --git a/pkg/sql/schema_changer_metrics.go b/pkg/sql/schema_changer_metrics.go index d37a1237a409..608ef8282b9d 100644 --- a/pkg/sql/schema_changer_metrics.go +++ b/pkg/sql/schema_changer_metrics.go @@ -6,7 +6,11 @@ package sql import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/server/telemetry" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" "github.com/cockroachdb/cockroach/pkg/util/metric" ) @@ -42,3 +46,25 @@ func NewSchemaChangerMetrics() *SchemaChangerMetrics { ObjectCount: metric.NewGauge(metaObjects), } } + +// UpdateDescriptorCount updates our sql.schema_changer.object_count gauge with +// a fresh count of objects in the system.descriptor table. +func UpdateDescriptorCount( + ctx context.Context, execCfg *ExecutorConfig, metric *SchemaChangerMetrics, +) error { + return execCfg.InternalDB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error { + desc, err := txn.Descriptors().ByIDWithLeased(txn.KV()).Get().Table(ctx, keys.DescriptorTableID) + if err != nil { + return err + } + tableStats, err := execCfg.TableStatsCache.GetTableStats(ctx, desc, nil /* typeResolver */) + if err != nil { + return err + } + if len(tableStats) > 0 { + // Use the row count from the most recent statistic. + metric.ObjectCount.Update(int64(tableStats[0].RowCount)) + } + return nil + }) +} diff --git a/pkg/sql/schemachanger/schemachanger_test.go b/pkg/sql/schemachanger/schemachanger_test.go index 478e82f20604..31bc0044dab5 100644 --- a/pkg/sql/schemachanger/schemachanger_test.go +++ b/pkg/sql/schemachanger/schemachanger_test.go @@ -1165,6 +1165,9 @@ func TestApproxMaxSchemaObjects(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + skip.UnderDuress(t, "slow test, requires polling to wait for auto stats job") + skip.UnderShort(t, "slow test, requires polling to wait for auto stats job") + ctx := context.Background() // Test with both declarative schema changer modes. @@ -1174,40 +1177,41 @@ func TestApproxMaxSchemaObjects(t *testing.T) { defer s.Stopper().Stop(ctx) tdb := sqlutils.MakeSQLRunner(sqlDB) + tdb.Exec(t, "SET CLUSTER SETTING sql.stats.automatic_collection.min_stale_rows = 1") // Configure the declarative schema changer mode. if useDeclarative { + tdb.Exec(t, `SET sql.defaults.use_declarative_schema_changer = 'on'`) tdb.Exec(t, `SET use_declarative_schema_changer = 'on'`) } else { + tdb.Exec(t, `SET sql.defaults.use_declarative_schema_changer = 'off'`) tdb.Exec(t, `SET use_declarative_schema_changer = 'off'`) } - // Get the current count of descriptors to set a realistic limit. - var currentCount int - tdb.QueryRow(t, `SELECT count(*) FROM system.descriptor`).Scan(¤tCount) + var maxObjects int + updateMaxObjects := func() { + // Manually refresh stats. + tdb.Exec(t, "ANALYZE system.public.descriptor") - // Set the limit to be slightly more than current count. - maxObjects := currentCount + 5 - tdb.Exec(t, fmt.Sprintf(`SET CLUSTER SETTING sql.schema.approx_max_object_count = %d`, maxObjects)) + // Get the current count of descriptors to set a realistic limit. + var currentCount int + tdb.QueryRow(t, `SELECT count(*) FROM system.descriptor`).Scan(¤tCount) - // Create a test database and use it. - tdb.Exec(t, `CREATE DATABASE testdb`) - tdb.Exec(t, `USE testdb`) + // Set the limit to be slightly more than current count. + maxObjects = currentCount + 1 + tdb.Exec(t, fmt.Sprintf(`SET CLUSTER SETTING sql.schema.approx_max_object_count = %d`, maxObjects)) + } + updateMaxObjects() // Test that different object types are subject to the limit. objectTypes := []string{"table", "database", "schema", "type", "function"} for _, objectType := range objectTypes { t.Run(objectType, func(t *testing.T) { // Increase the limit before each subtest to avoid interference. - maxObjects += 10 - tdb.Exec(t, fmt.Sprintf(`SET CLUSTER SETTING sql.schema.approx_max_object_count = %d`, maxObjects)) - - // Try to create objects until we hit the limit. - // ANALYZE before starting to ensure stats are fresh. - tdb.Exec(t, `ANALYZE system.descriptor`) + updateMaxObjects() objNum := 0 - for { + testutils.SucceedsWithin(t, func() error { var createStmt string switch objectType { case "table": @@ -1224,30 +1228,22 @@ func TestApproxMaxSchemaObjects(t *testing.T) { _, err := sqlDB.Exec(createStmt) if err != nil { - // Check if we got the expected error. + // Check if we got the expected error and message. if pqErr := (*pq.Error)(nil); errors.As(err, &pqErr) { if string(pqErr.Code) == pgcode.ConfigurationLimitExceeded.String() { - // Verify the error message mentions "approximate maximum". - testutils.IsError(err, "would exceed approximate maximum") - break + if testutils.IsError(err, "would exceed approximate maximum") { + return nil + } } } // Some other error occurred. - t.Fatal(err) + return err } objNum++ - // Re-analyze periodically to update stats. - if objNum%2 == 0 { - tdb.Exec(t, `ANALYZE system.descriptor`) - } - - // Safety check: if we created way more objects than expected, - // something is wrong. - if objNum > 30 { - t.Fatalf("created %d %ss without hitting limit (max=%d)", objNum, objectType, maxObjects) - } - } + // Haven't hit the limit yet, keep trying. + return errors.Errorf("created %d %ss without hitting limit (max=%d)", objNum, objectType, maxObjects) + }, 5*time.Minute) }) } })