diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ada4af36..e4af8f70d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ### Fixed - Regression introduced in 1.49.2: missing spans in ReadSchema calls (https://github.com/authzed/spicedb/pull/2947) - Long standing bug in the way postgres revisions were being compared. Sometimes revisions that were actually overlapping were erroneously being ordered. (https://github.com/authzed/spicedb/pull/2958) +- Do not register CRDB metrics in `init()` functions (https://github.com/authzed/spicedb/pull/2966) ## [1.49.2] - 2026-03-02 ### Added diff --git a/internal/datastore/crdb/crdb.go b/internal/datastore/crdb/crdb.go index 70fc8e20f..6a42f9d6a 100644 --- a/internal/datastore/crdb/crdb.go +++ b/internal/datastore/crdb/crdb.go @@ -197,6 +197,7 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas gcWindow: config.gcWindow, watchEnabled: !config.watchDisabled, schema: *schema.Schema(config.columnOptimizationOption, config.withIntegrity, false), + healthTracker: healthChecker, } ds.SetNowFunc(ds.headRevisionInternal) @@ -226,14 +227,14 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas if config.enableConnectionBalancing { log.Ctx(initCtx).Info().Msg("starting cockroach connection balancer") ds.pruneGroup, ds.ctx = errgroup.WithContext(ds.ctx) - writePoolBalancer := pool.NewNodeConnectionBalancer(ds.writePool, healthChecker, 5*time.Second) - readPoolBalancer := pool.NewNodeConnectionBalancer(ds.readPool, healthChecker, 5*time.Second) + ds.writePoolBalancer = pool.NewNodeConnectionBalancer(ds.writePool, healthChecker, 5*time.Second) + ds.readPoolBalancer = pool.NewNodeConnectionBalancer(ds.readPool, healthChecker, 5*time.Second) ds.pruneGroup.Go(func() error { - writePoolBalancer.Prune(ds.ctx) + ds.writePoolBalancer.Prune(ds.ctx) return nil }) ds.pruneGroup.Go(func() error { - readPoolBalancer.Prune(ds.ctx) + ds.readPoolBalancer.Prune(ds.ctx) return nil }) ds.pruneGroup.Go(func() error { @@ -261,19 +262,20 @@ type crdbDatastore struct { revisions.CommonDecoder *common.MigrationValidator - dburl string - readPool, writePool *pool.RetryPool - collectors []prometheus.Collector - watchBufferLength uint16 - watchChangeBufferMaximumSize uint64 - watchBufferWriteTimeout time.Duration - watchConnectTimeout time.Duration - writeOverlapKeyer overlapKeyer - overlapKeyInit func(ctx context.Context) keySet - analyzeBeforeStatistics bool - gcWindow time.Duration - schema common.SchemaInformation - acquireTimeout time.Duration + dburl string + readPool, writePool *pool.RetryPool + readPoolBalancer, writePoolBalancer *pool.NodeConnectionBalancer + collectors []prometheus.Collector + watchBufferLength uint16 + watchChangeBufferMaximumSize uint64 + watchBufferWriteTimeout time.Duration + watchConnectTimeout time.Duration + writeOverlapKeyer overlapKeyer + overlapKeyInit func(ctx context.Context) keySet + analyzeBeforeStatistics bool + gcWindow time.Duration + schema common.SchemaInformation + acquireTimeout time.Duration beginChangefeedQuery string transactionNowQuery string @@ -289,6 +291,8 @@ type crdbDatastore struct { supportsIntegrity bool watchEnabled bool + healthTracker *pool.NodeHealthTracker + uniqueID atomic.Pointer[string] } @@ -481,6 +485,15 @@ func (cds *crdbDatastore) Close() error { } cds.readPool.Close() cds.writePool.Close() + if cds.writePoolBalancer != nil { + cds.writePoolBalancer.Close() + } + if cds.readPoolBalancer != nil { + cds.readPoolBalancer.Close() + } + if cds.healthTracker != nil { + cds.healthTracker.Close() + } for _, collector := range cds.collectors { ok := prometheus.Unregister(collector) if !ok { diff --git a/internal/datastore/crdb/crdb_test.go b/internal/datastore/crdb/crdb_test.go index f1e3d0c28..8104074bb 100644 --- a/internal/datastore/crdb/crdb_test.go +++ b/internal/datastore/crdb/crdb_test.go @@ -904,7 +904,7 @@ func TestRegisterPrometheusCollectors(t *testing.T) { writePoolConfig, err := pgxpool.ParseConfig(fmt.Sprintf("postgres://db:password@pg.example.com:5432/mydb?pool_max_conns=%d", writeMaxConns)) require.NoError(t, err) - writePool, err := pool.NewRetryPool(t.Context(), "read", writePoolConfig, nil, 18, 20) + writePool, err := pool.NewRetryPool(t.Context(), "write", writePoolConfig, nil, 18, 20) require.NoError(t, err) // Create datastore with those pools diff --git a/internal/datastore/crdb/options.go b/internal/datastore/crdb/options.go index 9e745fc21..e2716488c 100644 --- a/internal/datastore/crdb/options.go +++ b/internal/datastore/crdb/options.go @@ -15,18 +15,19 @@ type crdbOptions struct { readPoolOpts, writePoolOpts pgxcommon.PoolOptions connectRate time.Duration - watchBufferLength uint16 - watchChangeBufferMaximumSize uint64 - watchBufferWriteTimeout time.Duration - watchConnectTimeout time.Duration - revisionQuantization time.Duration - followerReadDelay time.Duration - maxRevisionStalenessPercent float64 - gcWindow time.Duration - maxRetries uint8 - overlapStrategy string - overlapKey string - enableConnectionBalancing bool + watchBufferLength uint16 + watchChangeBufferMaximumSize uint64 + watchBufferWriteTimeout time.Duration + watchConnectTimeout time.Duration + revisionQuantization time.Duration + followerReadDelay time.Duration + maxRevisionStalenessPercent float64 + gcWindow time.Duration + maxRetries uint8 + overlapStrategy string + overlapKey string + enableConnectionBalancing bool + analyzeBeforeStatistics bool filterMaximumIDCount uint16 enablePrometheusStats bool diff --git a/internal/datastore/crdb/pool/balancer.go b/internal/datastore/crdb/pool/balancer.go index dbbbd23d2..261e7d39b 100644 --- a/internal/datastore/crdb/pool/balancer.go +++ b/internal/datastore/crdb/pool/balancer.go @@ -20,24 +20,6 @@ import ( "github.com/authzed/spicedb/pkg/genutil" ) -var ( - connectionsPerCRDBNodeCountGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: "crdb_connections_per_node", - Help: "The number of active connections SpiceDB holds to each CockroachDB node, by pool (read/write). Imbalanced values across nodes suggest the connection balancer is unable to redistribute connections evenly.", - }, []string{"pool", "node_id"}) - - pruningTimeHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: "crdb_pruning_duration", - Help: "Duration in milliseconds of one iteration of the CockroachDB connection balancer pruning excess connections from over-represented nodes. Elevated values indicate the balancer is struggling to rebalance connections.", - Buckets: []float64{.1, .2, .5, 1, 2, 5, 10, 20, 50, 100}, - }, []string{"pool"}) -) - -func init() { - prometheus.MustRegister(connectionsPerCRDBNodeCountGauge) - prometheus.MustRegister(pruningTimeHistogram) -} - type balancePoolConn[C balanceConn] interface { Conn() C Release() @@ -81,6 +63,9 @@ type nodeConnectionBalancer[P balancePoolConn[C], C balanceConn] struct { healthTracker *NodeHealthTracker rnd *rand.Rand seed int64 + + connectionsPerCRDBNodeCountGauge *prometheus.GaugeVec + pruningTimeHistogram prometheus.Histogram } // newNodeConnectionBalancer is generic over underlying connection types for @@ -99,12 +84,35 @@ func newNodeConnectionBalancer[P balancePoolConn[C], C balanceConn](pool balance seed = 0 } } + var ( + connectionsPerCRDBNodeCountGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "crdb_connections_per_node", + ConstLabels: prometheus.Labels{ + "pool": pool.ID(), + }, + Help: "The number of active connections SpiceDB holds to each CockroachDB node, by pool (read/write). Imbalanced values across nodes suggest the connection balancer is unable to redistribute connections evenly.", + }, []string{"node_id"}) + + pruningTimeHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "crdb_pruning_duration", + ConstLabels: prometheus.Labels{ + "pool": pool.ID(), + }, + Help: "Duration in milliseconds of one iteration of the CockroachDB connection balancer pruning excess connections from over-represented nodes. Elevated values indicate the balancer is struggling to rebalance connections.", + Buckets: []float64{.1, .2, .5, 1, 2, 5, 10, 20, 50, 100}, + }) + ) + + prometheus.MustRegister(connectionsPerCRDBNodeCountGauge) + prometheus.MustRegister(pruningTimeHistogram) return &nodeConnectionBalancer[P, C]{ - ticker: time.NewTicker(interval), - sem: semaphore.NewWeighted(1), - healthTracker: healthTracker, - pool: pool, - seed: seed, + ticker: time.NewTicker(interval), + sem: semaphore.NewWeighted(1), + healthTracker: healthTracker, + pool: pool, + seed: seed, + connectionsPerCRDBNodeCountGauge: connectionsPerCRDBNodeCountGauge, + pruningTimeHistogram: pruningTimeHistogram, // nolint:gosec // use of non cryptographically secure random number generator is not concern here, // as it's used for shuffling the nodes to balance the connections when the number of @@ -113,6 +121,11 @@ func newNodeConnectionBalancer[P balancePoolConn[C], C balanceConn](pool balance } } +func (p *nodeConnectionBalancer[P, C]) Close() { + prometheus.Unregister(p.connectionsPerCRDBNodeCountGauge) + prometheus.Unregister(p.pruningTimeHistogram) +} + // Prune starts periodically checking idle connections and killing ones that are determined to be unbalanced. func (p *nodeConnectionBalancer[P, C]) Prune(ctx context.Context) { for { @@ -137,7 +150,7 @@ func (p *nodeConnectionBalancer[P, C]) Prune(ctx context.Context) { func (p *nodeConnectionBalancer[P, C]) mustPruneConnections(ctx context.Context) { start := time.Now() defer func() { - pruningTimeHistogram.WithLabelValues(p.pool.ID()).Observe(float64(time.Since(start).Milliseconds())) + p.pruningTimeHistogram.Observe(float64(time.Since(start).Milliseconds())) }() conns := p.pool.AcquireAllIdle(ctx) defer func() { @@ -182,8 +195,7 @@ func (p *nodeConnectionBalancer[P, C]) mustPruneConnections(ctx context.Context) p.healthTracker.RLock() for node := range p.healthTracker.nodesEverSeen { if _, ok := connectionCounts[node]; !ok { - connectionsPerCRDBNodeCountGauge.DeletePartialMatch(map[string]string{ - "pool": p.pool.ID(), + p.connectionsPerCRDBNodeCountGauge.DeletePartialMatch(map[string]string{ "node_id": strconv.FormatUint(uint64(node), 10), }) } @@ -205,8 +217,7 @@ func (p *nodeConnectionBalancer[P, C]) mustPruneConnections(ctx context.Context) initialPerNodeMax := p.pool.MaxConns() / nodeCount for i, node := range nodes { count := connectionCounts[node] - connectionsPerCRDBNodeCountGauge.WithLabelValues( - p.pool.ID(), + p.connectionsPerCRDBNodeCountGauge.WithLabelValues( strconv.FormatUint(uint64(node), 10), ).Set(float64(count)) diff --git a/internal/datastore/crdb/pool/balancer_test.go b/internal/datastore/crdb/pool/balancer_test.go index 230eafa8c..c98586041 100644 --- a/internal/datastore/crdb/pool/balancer_test.go +++ b/internal/datastore/crdb/pool/balancer_test.go @@ -141,6 +141,9 @@ func TestNodeConnectionBalancerPrune(t *testing.T) { t.Run(tt.name, func(t *testing.T) { tracker, err := NewNodeHealthChecker("") require.NoError(t, err) + t.Cleanup(func() { + tracker.Close() + }) for _, n := range tt.nodes { tracker.healthyNodes[n] = struct{}{} } diff --git a/internal/datastore/crdb/pool/health.go b/internal/datastore/crdb/pool/health.go index d1c511b94..0ae5bcc5a 100644 --- a/internal/datastore/crdb/pool/health.go +++ b/internal/datastore/crdb/pool/health.go @@ -2,6 +2,7 @@ package pool import ( "context" + "fmt" "math/rand" "sync" "time" @@ -18,15 +19,6 @@ import ( const errorBurst = 2 -var healthyCRDBNodeCountGauge = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "crdb_healthy_nodes", - Help: "the number of healthy crdb nodes detected by spicedb", -}) - -func init() { - prometheus.MustRegister(healthyCRDBNodeCountGauge) -} - // NodeHealthTracker detects changes in the node pool by polling the cluster periodically and recording // the node ids that are seen. This is used to detect new nodes that come online that have either previously // been marked unhealthy due to connection errors or due to scale up. @@ -34,10 +26,11 @@ func init() { // Consumers can manually mark a node healthy or unhealthy as well. type NodeHealthTracker struct { sync.RWMutex - connConfig *pgx.ConnConfig - healthyNodes map[uint32]struct{} // GUARDED_BY(RWMutex) - nodesEverSeen map[uint32]*rate.Limiter // GUARDED_BY(RWMutex) - newLimiter func() *rate.Limiter + connConfig *pgx.ConnConfig + healthyNodes map[uint32]struct{} // GUARDED_BY(RWMutex) + nodesEverSeen map[uint32]*rate.Limiter // GUARDED_BY(RWMutex) + newLimiter func() *rate.Limiter + healthyCRDBNodeCountGauge prometheus.Gauge } // NewNodeHealthChecker builds a health checker that polls the cluster at the given url. @@ -47,16 +40,31 @@ func NewNodeHealthChecker(url string) (*NodeHealthTracker, error) { return nil, err } + healthyCRDBNodeCountGauge := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "crdb_healthy_nodes", + Help: "the number of healthy crdb nodes detected by spicedb", + }) + + err = prometheus.Register(healthyCRDBNodeCountGauge) + if err != nil { + return nil, fmt.Errorf("failed to register crdb healthy nodes metric: %w", err) + } + return &NodeHealthTracker{ - connConfig: connConfig, - healthyNodes: make(map[uint32]struct{}, 0), - nodesEverSeen: make(map[uint32]*rate.Limiter, 0), + healthyCRDBNodeCountGauge: healthyCRDBNodeCountGauge, + connConfig: connConfig, + healthyNodes: make(map[uint32]struct{}, 0), + nodesEverSeen: make(map[uint32]*rate.Limiter, 0), newLimiter: func() *rate.Limiter { return rate.NewLimiter(rate.Every(1*time.Minute), errorBurst) }, }, nil } +func (t *NodeHealthTracker) Close() { + _ = prometheus.Unregister(t.healthyCRDBNodeCountGauge) +} + // Poll starts polling the cluster and recording the node IDs that it sees. func (t *NodeHealthTracker) Poll(ctx context.Context, interval time.Duration) { ticker := jitterbug.New(interval, jitterbug.Uniform{ @@ -104,7 +112,7 @@ func (t *NodeHealthTracker) SetNodeHealth(nodeID uint32, healthy bool) { t.Lock() defer t.Unlock() defer func() { - healthyCRDBNodeCountGauge.Set(float64(len(t.healthyNodes))) + t.healthyCRDBNodeCountGauge.Set(float64(len(t.healthyNodes))) }() if _, ok := t.nodesEverSeen[nodeID]; !ok { diff --git a/internal/datastore/crdb/pool/health_test.go b/internal/datastore/crdb/pool/health_test.go index 3894a0895..3ff42d7de 100644 --- a/internal/datastore/crdb/pool/health_test.go +++ b/internal/datastore/crdb/pool/health_test.go @@ -2,20 +2,16 @@ package pool import ( "testing" - "time" "github.com/stretchr/testify/require" - "golang.org/x/time/rate" ) func TestNodeHealthTracker(t *testing.T) { - tracker := &NodeHealthTracker{ - healthyNodes: make(map[uint32]struct{}), - nodesEverSeen: make(map[uint32]*rate.Limiter), - newLimiter: func() *rate.Limiter { - return rate.NewLimiter(rate.Every(1*time.Minute), 2) - }, - } + tracker, err := NewNodeHealthChecker("postgres://user:password@localhost:5432/dbname") + require.NoError(t, err) + t.Cleanup(func() { + tracker.Close() + }) tracker.SetNodeHealth(1, true) require.True(t, tracker.IsHealthy(1)) diff --git a/internal/datastore/crdb/pool/pool.go b/internal/datastore/crdb/pool/pool.go index 4699b33bd..360f7c265 100644 --- a/internal/datastore/crdb/pool/pool.go +++ b/internal/datastore/crdb/pool/pool.go @@ -28,16 +28,6 @@ type pgxPool interface { Stat() *pgxpool.Stat } -var resetHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "crdb_client_resets", - Help: "Distribution of the number of client-side transaction restarts per transaction attempt. Restarts occur when CockroachDB returns a serialization failure (40001) and the driver retries the transaction from scratch. Sustained high values indicate transaction contention.", - Buckets: []float64{0, 1, 2, 5, 10, 20, 50}, -}) - -func init() { - prometheus.MustRegister(resetHistogram) -} - type ctxDisableRetries struct{} var ( @@ -46,9 +36,10 @@ var ( ) type RetryPool struct { - pool pgxPool - id string - healthTracker *NodeHealthTracker + pool pgxPool + id string + healthTracker *NodeHealthTracker + resetHistogram prometheus.Histogram sync.RWMutex maxRetries uint8 @@ -144,6 +135,20 @@ func NewRetryPool(ctx context.Context, name string, config *pgxpool.Config, heal return nil, err } + p.resetHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "crdb_client_resets", + ConstLabels: prometheus.Labels{ + "pool_name": name, // this is needed to avoid "duplicate metrics collector registration attempted" + }, + Help: "Distribution of the number of client-side transaction restarts per transaction attempt. Restarts occur when CockroachDB returns a serialization failure (40001) and the driver retries the transaction from scratch. Sustained high values indicate transaction contention.", + Buckets: []float64{0, 1, 2, 5, 10, 20, 50}, + }) + + err = prometheus.Register(p.resetHistogram) + if err != nil { + return nil, fmt.Errorf("error registering CRDB reset histogram: %w", err) + } + p.pool = pool return p, nil } @@ -253,6 +258,7 @@ func (p *RetryPool) Config() *pgxpool.Config { // Close closes the underlying pgxpool.Pool func (p *RetryPool) Close() { p.pool.Close() + prometheus.Unregister(p.resetHistogram) } // Stat returns the underlying pgxpool.Pool stats @@ -315,7 +321,7 @@ func (p *RetryPool) withRetries(ctx context.Context, acquireTimeout time.Duratio var retries uint8 defer func() { - resetHistogram.Observe(float64(retries)) + p.resetHistogram.Observe(float64(retries)) }() maxRetries := p.maxRetries diff --git a/internal/datastore/crdb/pool/pool_test.go b/internal/datastore/crdb/pool/pool_test.go index b7d5ef640..2451d9ccc 100644 --- a/internal/datastore/crdb/pool/pool_test.go +++ b/internal/datastore/crdb/pool/pool_test.go @@ -7,11 +7,9 @@ import ( "testing/synctest" "time" - "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "golang.org/x/time/rate" ) // TestPool implements pgxPool interface for testing @@ -53,7 +51,12 @@ func NewTestPool() *TestPool { return nil }, configFunc: func() *pgxpool.Config { - return &pgxpool.Config{} + url := "postgres://jack:secret@localhost:5432/mydb?sslmode=verify-ca&pool_max_conns=10&pool_max_conn_lifetime=1h30m" + c, err := pgxpool.ParseConfig(url) + if err != nil { + panic(err) + } + return c }, closeFunc: func() {}, statFunc: func() *pgxpool.Stat { @@ -63,21 +66,19 @@ func NewTestPool() *TestPool { } // createTestRetryPool creates a RetryPool for testing with dependency injection -func createTestRetryPool(testPool *TestPool) *RetryPool { - return &RetryPool{ - pool: testPool, - id: "test-pool", - healthTracker: &NodeHealthTracker{ - healthyNodes: make(map[uint32]struct{}), - nodesEverSeen: make(map[uint32]*rate.Limiter), - newLimiter: func() *rate.Limiter { - return rate.NewLimiter(rate.Every(1*time.Minute), 2) - }, - }, - maxRetries: 3, - nodeForConn: make(map[*pgx.Conn]uint32), - gc: make(map[*pgx.Conn]struct{}), - } +func createTestRetryPool(t *testing.T, testPool *TestPool) *RetryPool { + ht, err := NewNodeHealthChecker("postgres://user:password@localhost:5432/dbname") + require.NoError(t, err) + t.Cleanup(func() { + ht.Close() + }) + + retrypool, err := NewRetryPool(t.Context(), "name", testPool.Config(), ht, 3, 0) + require.NoError(t, err) + t.Cleanup(func() { + retrypool.Close() + }) + return retrypool } func TestContextCancelledDuringBlockingAcquire(t *testing.T) { @@ -90,7 +91,7 @@ func TestContextCancelledDuringBlockingAcquire(t *testing.T) { return nil, ctx.Err() } - retryPool := createTestRetryPool(testPool) + retryPool := createTestRetryPool(t, testPool) ctx, cancel := context.WithCancel(t.Context()) // Cancel the context after a short delay @@ -125,7 +126,7 @@ func TestAcquireTimeoutReturnsErrAcquire(t *testing.T) { } } - retryPool := createTestRetryPool(testPool) + retryPool := createTestRetryPool(t, testPool) ctx := t.Context() acquireTimeout := 50 * time.Millisecond @@ -146,7 +147,7 @@ func TestAcquireSucceedsButTopLevelContextCancelled(t *testing.T) { t.Parallel() testPool := NewTestPool() - retryPool := createTestRetryPool(testPool) + retryPool := createTestRetryPool(t, testPool) ctx, cancel := context.WithCancel(t.Context()) cancel() // Cancel immediately @@ -168,7 +169,7 @@ func TestAcquireErrorWithConnectionReturned(t *testing.T) { return &pgxpool.Conn{}, errors.New("pool exhausted") } - retryPool := createTestRetryPool(testPool) + retryPool := createTestRetryPool(t, testPool) ctx := context.Background() err := retryPool.withRetries(ctx, 0, func(conn *pgxpool.Conn) error { @@ -198,7 +199,7 @@ func TestAcquireSucceedsWithinTimeout(t *testing.T) { } } - retryPool := createTestRetryPool(testPool) + retryPool := createTestRetryPool(t, testPool) ctx := t.Context() acquireTimeout := 50 * time.Millisecond functionCalled := false @@ -225,7 +226,7 @@ func TestNoAcquireTimeoutUsesOriginalContext(t *testing.T) { return &pgxpool.Conn{}, nil } - retryPool := createTestRetryPool(testPool) + retryPool := createTestRetryPool(t, testPool) originalCtx := context.Background() err := retryPool.withRetries(originalCtx, 0, func(conn *pgxpool.Conn) error { @@ -247,7 +248,7 @@ func TestAcquireTimeoutCreatesSeparateContext(t *testing.T) { return &pgxpool.Conn{}, nil } - retryPool := createTestRetryPool(testPool) + retryPool := createTestRetryPool(t, testPool) originalCtx := context.Background() acquireTimeout := 50 * time.Millisecond startTime := time.Now() @@ -277,7 +278,7 @@ func TestAcquireTimeoutContextCausePreserved(t *testing.T) { return nil, ctx.Err() } - retryPool := createTestRetryPool(testPool) + retryPool := createTestRetryPool(t, testPool) ctx := t.Context() acquireTimeout := 10 * time.Millisecond @@ -301,7 +302,7 @@ func TestSuccessfulFunctionExecution(t *testing.T) { return &pgxpool.Conn{}, nil } - retryPool := createTestRetryPool(testPool) + retryPool := createTestRetryPool(t, testPool) ctx := t.Context() functionCalled := false diff --git a/internal/datastore/proxy/schemacaching/watchingcache.go b/internal/datastore/proxy/schemacaching/watchingcache.go index d69b85e6b..fed5aa7ac 100644 --- a/internal/datastore/proxy/schemacaching/watchingcache.go +++ b/internal/datastore/proxy/schemacaching/watchingcache.go @@ -56,10 +56,6 @@ var definitionsReadTotalCounter = prometheus.NewCounterVec(prometheus.CounterOpt const maximumRetryCount = 10 -func init() { - prometheus.MustRegister(namespacesFallbackModeGauge, caveatsFallbackModeGauge, schemaCacheRevisionGauge, definitionsReadCachedCounter, definitionsReadTotalCounter) -} - // watchingCachingProxy is a datastore proxy that caches schema (namespaces and caveat definitions) // and updates its cache via a WatchSchema call. If the supplied datastore to be wrapped does not support // this API, or the data is not available in this case or an error occurs, the updating cache fallsback @@ -74,6 +70,7 @@ type watchingCachingProxy struct { namespaceCache *schemaWatchCache[*core.NamespaceDefinition] caveatCache *schemaWatchCache[*core.CaveatDefinition] + metrics []prometheus.Collector } // createWatchingCacheProxy creates and returns a watching cache proxy. @@ -83,7 +80,12 @@ func createWatchingCacheProxy(delegate datastore.Datastore, c cache.Cache[cache. c: c, } + metrics := []prometheus.Collector{namespacesFallbackModeGauge, caveatsFallbackModeGauge, schemaCacheRevisionGauge, definitionsReadCachedCounter, definitionsReadTotalCounter} + for _, metric := range metrics { + _ = prometheus.Register(metric) + } proxy := &watchingCachingProxy{ + metrics: metrics, Datastore: delegate, fallbackCache: fallbackCache, @@ -354,6 +356,9 @@ func (p *watchingCachingProxy) startSync(ctx context.Context) error { // Close stops all resources. // The caller must have canceled the context passed to Start. func (p *watchingCachingProxy) Close() error { + for _, metric := range p.metrics { + prometheus.Unregister(metric) + } p.caveatCache.setFallbackMode() p.namespaceCache.setFallbackMode() diff --git a/internal/services/steelthreadtesting/steelthread_test.go b/internal/services/steelthreadtesting/steelthread_test.go index ffc1eef55..53017a6a9 100644 --- a/internal/services/steelthreadtesting/steelthread_test.go +++ b/internal/services/steelthreadtesting/steelthread_test.go @@ -56,6 +56,9 @@ func TestNonMemdbSteelThreads(t *testing.T) { dsconfig.WithMaxRetries(50), dsconfig.WithExperimentalColumnOptimization(true), dsconfig.WithWriteAcquisitionTimeout(5*time.Second))) + t.Cleanup(func() { + _ = ds.Close() + }) ds = indexcheck.WrapWithIndexCheckingDatastoreProxyIfApplicable(ds) runSteelThreadTest(t, tc, ds)