18 changes: 7 additions & 11 deletions pkg/sql/catalog/descs/collection.go
@@ -273,21 +273,17 @@ func (tc *Collection) IsVersionBumpOfUncommittedDescriptor(id descpb.ID) bool {
return tc.uncommitted.versionBumpOnly[id]
}

// HasUncommittedNewOrDroppedDescriptors returns true if the collection contains
// any uncommitted descriptors that are newly created or dropped.
func (tc *Collection) HasUncommittedNewOrDroppedDescriptors() bool {
isNewDescriptor := false
err := tc.uncommitted.iterateUncommittedByID(func(desc catalog.Descriptor) error {
// CountUncommittedNewOrDroppedDescriptors returns the number of uncommitted
// descriptors that are newly created or dropped.
func (tc *Collection) CountUncommittedNewOrDroppedDescriptors() int {
count := 0
_ = tc.uncommitted.iterateUncommittedByID(func(desc catalog.Descriptor) error {
if desc.GetVersion() == 1 || desc.Dropped() {
isNewDescriptor = true
return iterutil.StopIteration()
count++
}
return nil
})
if err != nil {
return false
}
return isNewDescriptor
return count
}

// HasUncommittedTypes returns true if the Collection contains uncommitted
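The switch from a boolean to a count is what lets the value double as a rows-affected estimate for the automatic-stats refresher. Below is a condensed, illustrative sketch of the call site this PR adds in conn_executor.go further down; the function name and parameter plumbing are assumptions, with imports spelled out for completeness.

package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
	"github.com/cockroachdb/cockroach/pkg/sql/stats"
)

// notifyDescriptorMutations condenses the call site added in conn_executor.go
// further down: every newly created or dropped descriptor is treated as
// roughly one mutated row in system.descriptor, and that count is forwarded
// to the automatic-stats refresher. The function name and the way the
// dependencies are passed in are illustrative, not part of this PR.
func notifyDescriptorMutations(
	ctx context.Context, col *descs.Collection, txn *kv.Txn, refresher *stats.Refresher,
) {
	cnt := col.CountUncommittedNewOrDroppedDescriptors()
	if cnt == 0 {
		return
	}
	// Lease the system.descriptor table descriptor so the refresher knows
	// which table the mutations apply to.
	desc, err := col.ByIDWithLeased(txn).Get().Table(ctx, keys.DescriptorTableID)
	if err != nil {
		return
	}
	refresher.NotifyMutation(desc, cnt)
}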
1 change: 1 addition & 0 deletions pkg/sql/catalog/schematelemetry/BUILD.bazel
@@ -12,6 +12,7 @@ go_library(
deps = [
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/scheduledjobs",
"//pkg/security/username",
"//pkg/server/telemetry",
27 changes: 27 additions & 0 deletions pkg/sql/catalog/schematelemetry/schema_telemetry_job.go
@@ -7,9 +7,11 @@ package schematelemetry

import (
"context"
"math"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
@@ -63,6 +65,31 @@ func (t schemaTelemetryResumer) Resume(ctx context.Context, execCtx interface{})
if k := p.ExecCfg().SchemaTelemetryTestingKnobs; k != nil {
knobs = *k
}
// Notify the stats refresher to update the system.descriptor table stats,
// and update the object count in schema changer metrics.
err := p.ExecCfg().InternalDB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error {
desc, err := txn.Descriptors().ByIDWithLeased(txn.KV()).Get().Table(ctx, keys.DescriptorTableID)
if err != nil {
return err
}
p.ExecCfg().StatsRefresher.NotifyMutation(desc, math.MaxInt64 /* rowCount */)

// Note: This won't be perfectly up-to-date, but it will make sure the
// metric gets updated periodically. It also gets updated after every
// schema change.
tableStats, err := p.ExecCfg().TableStatsCache.GetTableStats(ctx, desc, nil /* typeResolver */)
if err != nil {
return err
}
if len(tableStats) > 0 {
// Use the row count from the most recent statistic.
p.ExecCfg().SchemaChangerMetrics.ObjectCount.Update(int64(tableStats[0].RowCount))
}
return nil
})
if err != nil {
return errors.Wrap(err, "failed to notify stats refresher to update system.descriptor table stats")
}

// Outside of tests, scan the catalog tables AS OF SYSTEM TIME slightly in the
// past. Schema telemetry is not latency-sensitive to the point where a few
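The math.MaxInt64 row count passed to NotifyMutation is what guarantees the refresher considers system.descriptor stale. The snippet below is a rough, deterministic model of that staleness decision, for intuition only: the real check in the stats refresher is randomized, and the constants are assumed to match the defaults of sql.stats.automatic_collection.min_stale_rows and sql.stats.automatic_collection.fraction_stale_rows rather than being taken from this PR.

package example

// Assumed cluster-setting defaults (not taken from this PR):
//   sql.stats.automatic_collection.min_stale_rows      = 500
//   sql.stats.automatic_collection.fraction_stale_rows = 0.2
const (
	minStaleRows      = 500
	fractionStaleRows = 0.2
)

// wouldRefresh roughly models when the automatic-stats refresher decides a
// table's statistics are stale. Passing math.MaxInt64 as rowsAffected, as the
// telemetry job does above, always clears the threshold and so effectively
// forces a refresh of system.descriptor's statistics, while the small
// per-descriptor counts sent from conn_executor.go simply accumulate across
// transactions until the threshold is reached.
func wouldRefresh(accumulatedRowsAffected, tableRowCount int64) bool {
	target := int64(fractionStaleRows*float64(tableRowCount)) + minStaleRows
	return accumulatedRowsAffected >= target
}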
15 changes: 12 additions & 3 deletions pkg/sql/conn_executor.go
@@ -4166,11 +4166,10 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
if err := ex.waitForInitialVersionForNewDescriptors(cachedRegions); err != nil {
return advanceInfo{}, err
}
}
if ex.extraTxnState.descCollection.HasUncommittedNewOrDroppedDescriptors() {

execCfg := ex.planner.ExecCfg()
if err := UpdateDescriptorCount(ex.Ctx(), execCfg, execCfg.SchemaChangerMetrics); err != nil {
log.Dev.Warningf(ex.Ctx(), "failed to scan descriptor table: %v", err)
log.Dev.Warningf(ex.Ctx(), "failed to update descriptor count metric: %v", err)
}
}
fallthrough
@@ -4569,6 +4568,16 @@ func (ex *connExecutor) notifyStatsRefresherOfNewTables(ctx context.Context) {
ex.planner.execCfg.StatsRefresher.NotifyMutation(desc, math.MaxInt32 /* rowsAffected */)
}
}
if cnt := ex.extraTxnState.descCollection.CountUncommittedNewOrDroppedDescriptors(); cnt > 0 {
// Notify the refresher of a mutation on the system.descriptor table.
// We conservatively assume that any transaction which creates or
// drops descriptors touched a corresponding number of rows in
// system.descriptor.
desc, err := ex.extraTxnState.descCollection.ByIDWithLeased(ex.planner.txn).Get().Table(ctx, keys.DescriptorTableID)
if err != nil {
log.Dev.Warningf(ctx, "failed to fetch descriptor table to refresh stats: %v", err)
return
}
ex.planner.execCfg.StatsRefresher.NotifyMutation(desc, cnt)
}
}

func (ex *connExecutor) getDescIDGenerator() eval.DescIDGenerator {
17 changes: 0 additions & 17 deletions pkg/sql/schema_changer.go
@@ -3574,20 +3574,3 @@ func (p *planner) CanCreateCrossDBSequenceRef() error {
}
return nil
}

// UpdateDescriptorCount updates our sql.schema_changer.object_count gauge with
// a fresh count of objects in the system.descriptor table.
func UpdateDescriptorCount(
ctx context.Context, execCfg *ExecutorConfig, metric *SchemaChangerMetrics,
) error {
return DescsTxn(ctx, execCfg, func(ctx context.Context, txn isql.Txn, col *descs.Collection) error {
row, err := txn.QueryRow(ctx, "sql-schema-changer-object-count", txn.KV(),
`SELECT count(*) FROM system.descriptor`)
if err != nil {
return err
}
count := *row[0].(*tree.DInt)
metric.ObjectCount.Update(int64(count))
return nil
})
}
26 changes: 26 additions & 0 deletions pkg/sql/schema_changer_metrics.go
@@ -6,7 +6,11 @@
package sql

import (
"context"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/util/metric"
)
@@ -42,3 +46,25 @@ func NewSchemaChangerMetrics() *SchemaChangerMetrics {
ObjectCount: metric.NewGauge(metaObjects),
}
}

// UpdateDescriptorCount updates our sql.schema_changer.object_count gauge with
// a fresh count of objects in the system.descriptor table.
func UpdateDescriptorCount(
ctx context.Context, execCfg *ExecutorConfig, metric *SchemaChangerMetrics,
) error {
return execCfg.InternalDB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error {
desc, err := txn.Descriptors().ByIDWithLeased(txn.KV()).Get().Table(ctx, keys.DescriptorTableID)
if err != nil {
return err
}
tableStats, err := execCfg.TableStatsCache.GetTableStats(ctx, desc, nil /* typeResolver */)
if err != nil {
return err
}
if len(tableStats) > 0 {
// Use the row count from the most recent statistic.
metric.ObjectCount.Update(int64(tableStats[0].RowCount))
}
return nil
})
}
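A sketch of how the relocated UpdateDescriptorCount could be exercised end to end. The serverutils wiring, the ExecutorConfig cast, and the Gauge.Value() read are assumptions based on common CockroachDB test patterns rather than code from this PR, and the gauge only reflects whatever statistics the cache currently holds, so it can lag a fresh ANALYZE.

package sql_test

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/sql"
	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
	"github.com/stretchr/testify/require"
)

// TestUpdateDescriptorCountSketch is illustrative only and not part of this PR.
func TestUpdateDescriptorCountSketch(t *testing.T) {
	ctx := context.Background()
	s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
	defer s.Stopper().Stop(ctx)

	// Give system.descriptor at least one statistic for the cache to serve.
	_, err := db.Exec(`ANALYZE system.descriptor`)
	require.NoError(t, err)

	execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
	require.NoError(t, sql.UpdateDescriptorCount(ctx, &execCfg, execCfg.SchemaChangerMetrics))

	// The gauge now mirrors the most recent cached row-count estimate for
	// system.descriptor (possibly still zero if the stats cache has not yet
	// picked up the fresh statistic).
	t.Logf("sql.schema_changer.object_count = %d", execCfg.SchemaChangerMetrics.ObjectCount.Value())
}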
60 changes: 28 additions & 32 deletions pkg/sql/schemachanger/schemachanger_test.go
@@ -1165,6 +1165,9 @@ func TestApproxMaxSchemaObjects(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderDuress(t, "slow test, requires polling to wait for auto stats job")
skip.UnderShort(t, "slow test, requires polling to wait for auto stats job")

ctx := context.Background()

// Test with both declarative schema changer modes.
@@ -1174,40 +1177,41 @@
defer s.Stopper().Stop(ctx)

tdb := sqlutils.MakeSQLRunner(sqlDB)
tdb.Exec(t, "SET CLUSTER SETTING sql.stats.automatic_collection.min_stale_rows = 1")

// Configure the declarative schema changer mode.
if useDeclarative {
tdb.Exec(t, `SET sql.defaults.use_declarative_schema_changer = 'on'`)
tdb.Exec(t, `SET use_declarative_schema_changer = 'on'`)
} else {
tdb.Exec(t, `SET sql.defaults.use_declarative_schema_changer = 'off'`)
tdb.Exec(t, `SET use_declarative_schema_changer = 'off'`)
}

// Get the current count of descriptors to set a realistic limit.
var currentCount int
tdb.QueryRow(t, `SELECT count(*) FROM system.descriptor`).Scan(&currentCount)
var maxObjects int
updateMaxObjects := func() {
// Manually refresh stats.
tdb.Exec(t, "ANALYZE system.public.descriptor")

// Set the limit to be slightly more than current count.
maxObjects := currentCount + 5
tdb.Exec(t, fmt.Sprintf(`SET CLUSTER SETTING sql.schema.approx_max_object_count = %d`, maxObjects))
// Get the current count of descriptors to set a realistic limit.
var currentCount int
tdb.QueryRow(t, `SELECT count(*) FROM system.descriptor`).Scan(&currentCount)

// Create a test database and use it.
tdb.Exec(t, `CREATE DATABASE testdb`)
tdb.Exec(t, `USE testdb`)
// Set the limit to be slightly more than current count.
maxObjects = currentCount + 1
tdb.Exec(t, fmt.Sprintf(`SET CLUSTER SETTING sql.schema.approx_max_object_count = %d`, maxObjects))
}
updateMaxObjects()

// Test that different object types are subject to the limit.
objectTypes := []string{"table", "database", "schema", "type", "function"}
for _, objectType := range objectTypes {
t.Run(objectType, func(t *testing.T) {
// Increase the limit before each subtest to avoid interference.
maxObjects += 10
tdb.Exec(t, fmt.Sprintf(`SET CLUSTER SETTING sql.schema.approx_max_object_count = %d`, maxObjects))

// Try to create objects until we hit the limit.
// ANALYZE before starting to ensure stats are fresh.
tdb.Exec(t, `ANALYZE system.descriptor`)
updateMaxObjects()

objNum := 0
for {
testutils.SucceedsWithin(t, func() error {
var createStmt string
switch objectType {
case "table":
Expand All @@ -1224,30 +1228,22 @@ func TestApproxMaxSchemaObjects(t *testing.T) {

_, err := sqlDB.Exec(createStmt)
if err != nil {
// Check if we got the expected error.
// Check if we got the expected error and message.
if pqErr := (*pq.Error)(nil); errors.As(err, &pqErr) {
if string(pqErr.Code) == pgcode.ConfigurationLimitExceeded.String() {
// Verify the error message mentions "approximate maximum".
testutils.IsError(err, "would exceed approximate maximum")
break
if testutils.IsError(err, "would exceed approximate maximum") {
return nil
}
}
}
// Some other error occurred.
t.Fatal(err)
return err
}
objNum++

// Re-analyze periodically to update stats.
if objNum%2 == 0 {
tdb.Exec(t, `ANALYZE system.descriptor`)
}

// Safety check: if we created way more objects than expected,
// something is wrong.
if objNum > 30 {
t.Fatalf("created %d %ss without hitting limit (max=%d)", objNum, objectType, maxObjects)
}
}
// Haven't hit the limit yet, keep trying.
return errors.Errorf("created %d %ss without hitting limit (max=%d)", objNum, objectType, maxObjects)
}, 5*time.Minute)
})
}
})