Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
### Fixed
- Regression introduced in 1.49.2: missing spans in ReadSchema calls (https://github.com/authzed/spicedb/pull/2947)
- Long-standing bug in the way Postgres revisions were compared: revisions that actually overlapped were sometimes erroneously ordered. (https://github.com/authzed/spicedb/pull/2958)
- Do not register CRDB metrics in `init()` functions (https://github.com/authzed/spicedb/pull/2966)

## [1.49.2] - 2026-03-02
### Added
Expand Down
47 changes: 30 additions & 17 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@
gcWindow: config.gcWindow,
watchEnabled: !config.watchDisabled,
schema: *schema.Schema(config.columnOptimizationOption, config.withIntegrity, false),
healthTracker: healthChecker,
}
ds.SetNowFunc(ds.headRevisionInternal)

Expand Down Expand Up @@ -226,14 +227,14 @@
if config.enableConnectionBalancing {
log.Ctx(initCtx).Info().Msg("starting cockroach connection balancer")
ds.pruneGroup, ds.ctx = errgroup.WithContext(ds.ctx)
writePoolBalancer := pool.NewNodeConnectionBalancer(ds.writePool, healthChecker, 5*time.Second)
readPoolBalancer := pool.NewNodeConnectionBalancer(ds.readPool, healthChecker, 5*time.Second)
ds.writePoolBalancer = pool.NewNodeConnectionBalancer(ds.writePool, healthChecker, 5*time.Second)
ds.readPoolBalancer = pool.NewNodeConnectionBalancer(ds.readPool, healthChecker, 5*time.Second)
ds.pruneGroup.Go(func() error {
writePoolBalancer.Prune(ds.ctx)
ds.writePoolBalancer.Prune(ds.ctx)
return nil
})
ds.pruneGroup.Go(func() error {
readPoolBalancer.Prune(ds.ctx)
ds.readPoolBalancer.Prune(ds.ctx)
return nil
})
ds.pruneGroup.Go(func() error {
Expand Down Expand Up @@ -261,19 +262,20 @@
revisions.CommonDecoder
*common.MigrationValidator

dburl string
readPool, writePool *pool.RetryPool
collectors []prometheus.Collector
watchBufferLength uint16
watchChangeBufferMaximumSize uint64
watchBufferWriteTimeout time.Duration
watchConnectTimeout time.Duration
writeOverlapKeyer overlapKeyer
overlapKeyInit func(ctx context.Context) keySet
analyzeBeforeStatistics bool
gcWindow time.Duration
schema common.SchemaInformation
acquireTimeout time.Duration
dburl string
readPool, writePool *pool.RetryPool
readPoolBalancer, writePoolBalancer *pool.NodeConnectionBalancer
collectors []prometheus.Collector
watchBufferLength uint16
watchChangeBufferMaximumSize uint64
watchBufferWriteTimeout time.Duration
watchConnectTimeout time.Duration
writeOverlapKeyer overlapKeyer
overlapKeyInit func(ctx context.Context) keySet
analyzeBeforeStatistics bool
gcWindow time.Duration
schema common.SchemaInformation
acquireTimeout time.Duration

beginChangefeedQuery string
transactionNowQuery string
Expand All @@ -289,6 +291,8 @@
supportsIntegrity bool
watchEnabled bool

healthTracker *pool.NodeHealthTracker

uniqueID atomic.Pointer[string]
}

Expand Down Expand Up @@ -481,6 +485,15 @@
}
cds.readPool.Close()
cds.writePool.Close()
if cds.writePoolBalancer != nil {
cds.writePoolBalancer.Close()
}

Check warning on line 490 in internal/datastore/crdb/crdb.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/crdb/crdb.go#L490

Added line #L490 was not covered by tests
if cds.readPoolBalancer != nil {
cds.readPoolBalancer.Close()
}

Check warning on line 493 in internal/datastore/crdb/crdb.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/crdb/crdb.go#L493

Added line #L493 was not covered by tests
if cds.healthTracker != nil {
cds.healthTracker.Close()
}

Check warning on line 496 in internal/datastore/crdb/crdb.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/crdb/crdb.go#L496

Added line #L496 was not covered by tests
for _, collector := range cds.collectors {
ok := prometheus.Unregister(collector)
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion internal/datastore/crdb/crdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -904,7 +904,7 @@ func TestRegisterPrometheusCollectors(t *testing.T) {

writePoolConfig, err := pgxpool.ParseConfig(fmt.Sprintf("postgres://db:password@pg.example.com:5432/mydb?pool_max_conns=%d", writeMaxConns))
require.NoError(t, err)
writePool, err := pool.NewRetryPool(t.Context(), "read", writePoolConfig, nil, 18, 20)
writePool, err := pool.NewRetryPool(t.Context(), "write", writePoolConfig, nil, 18, 20)
require.NoError(t, err)

// Create datastore with those pools
Expand Down
25 changes: 13 additions & 12 deletions internal/datastore/crdb/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,19 @@ type crdbOptions struct {
readPoolOpts, writePoolOpts pgxcommon.PoolOptions
connectRate time.Duration

watchBufferLength uint16
watchChangeBufferMaximumSize uint64
watchBufferWriteTimeout time.Duration
watchConnectTimeout time.Duration
revisionQuantization time.Duration
followerReadDelay time.Duration
maxRevisionStalenessPercent float64
gcWindow time.Duration
maxRetries uint8
overlapStrategy string
overlapKey string
enableConnectionBalancing bool
watchBufferLength uint16
watchChangeBufferMaximumSize uint64
watchBufferWriteTimeout time.Duration
watchConnectTimeout time.Duration
revisionQuantization time.Duration
followerReadDelay time.Duration
maxRevisionStalenessPercent float64
gcWindow time.Duration
maxRetries uint8
overlapStrategy string
overlapKey string
enableConnectionBalancing bool

analyzeBeforeStatistics bool
filterMaximumIDCount uint16
enablePrometheusStats bool
Expand Down
67 changes: 39 additions & 28 deletions internal/datastore/crdb/pool/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,6 @@
"github.com/authzed/spicedb/pkg/genutil"
)

var (
connectionsPerCRDBNodeCountGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "crdb_connections_per_node",
Help: "The number of active connections SpiceDB holds to each CockroachDB node, by pool (read/write). Imbalanced values across nodes suggest the connection balancer is unable to redistribute connections evenly.",
}, []string{"pool", "node_id"})

pruningTimeHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "crdb_pruning_duration",
Help: "Duration in milliseconds of one iteration of the CockroachDB connection balancer pruning excess connections from over-represented nodes. Elevated values indicate the balancer is struggling to rebalance connections.",
Buckets: []float64{.1, .2, .5, 1, 2, 5, 10, 20, 50, 100},
}, []string{"pool"})
)

func init() {
prometheus.MustRegister(connectionsPerCRDBNodeCountGauge)
prometheus.MustRegister(pruningTimeHistogram)
}

type balancePoolConn[C balanceConn] interface {
Conn() C
Release()
Expand Down Expand Up @@ -81,6 +63,9 @@
healthTracker *NodeHealthTracker
rnd *rand.Rand
seed int64

connectionsPerCRDBNodeCountGauge *prometheus.GaugeVec
pruningTimeHistogram prometheus.Histogram
}

// newNodeConnectionBalancer is generic over underlying connection types for
Expand All @@ -99,12 +84,35 @@
seed = 0
}
}
var (
connectionsPerCRDBNodeCountGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "crdb_connections_per_node",
ConstLabels: prometheus.Labels{
"pool": pool.ID(),
},
Help: "The number of active connections SpiceDB holds to each CockroachDB node, by pool (read/write). Imbalanced values across nodes suggest the connection balancer is unable to redistribute connections evenly.",
}, []string{"node_id"})

Check warning on line 95 in internal/datastore/crdb/pool/balancer.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/crdb/pool/balancer.go#L95

Added line #L95 was not covered by tests
pruningTimeHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "crdb_pruning_duration",
ConstLabels: prometheus.Labels{
"pool": pool.ID(),
},
Help: "Duration in milliseconds of one iteration of the CockroachDB connection balancer pruning excess connections from over-represented nodes. Elevated values indicate the balancer is struggling to rebalance connections.",
Buckets: []float64{.1, .2, .5, 1, 2, 5, 10, 20, 50, 100},
})
)

Check warning on line 105 in internal/datastore/crdb/pool/balancer.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/crdb/pool/balancer.go#L105

Added line #L105 was not covered by tests
prometheus.MustRegister(connectionsPerCRDBNodeCountGauge)
prometheus.MustRegister(pruningTimeHistogram)
return &nodeConnectionBalancer[P, C]{
ticker: time.NewTicker(interval),
sem: semaphore.NewWeighted(1),
healthTracker: healthTracker,
pool: pool,
seed: seed,
ticker: time.NewTicker(interval),
sem: semaphore.NewWeighted(1),
healthTracker: healthTracker,
pool: pool,
seed: seed,
connectionsPerCRDBNodeCountGauge: connectionsPerCRDBNodeCountGauge,
pruningTimeHistogram: pruningTimeHistogram,
// nolint:gosec
// use of non cryptographically secure random number generator is not concern here,
// as it's used for shuffling the nodes to balance the connections when the number of
Expand All @@ -113,6 +121,11 @@
}
}

// Close unregisters this balancer's per-pool Prometheus collectors from the
// default registry. Because the collectors are created per balancer instance
// (with the pool ID as a const label) rather than in init(), unregistering
// here allows a later balancer with the same pool ID to register metrics of
// the same name without a duplicate-registration error.
func (p *nodeConnectionBalancer[P, C]) Close() {
	prometheus.Unregister(p.connectionsPerCRDBNodeCountGauge)
	prometheus.Unregister(p.pruningTimeHistogram)
}

// Prune starts periodically checking idle connections and killing ones that are determined to be unbalanced.
func (p *nodeConnectionBalancer[P, C]) Prune(ctx context.Context) {
for {
Expand All @@ -137,7 +150,7 @@
func (p *nodeConnectionBalancer[P, C]) mustPruneConnections(ctx context.Context) {
start := time.Now()
defer func() {
pruningTimeHistogram.WithLabelValues(p.pool.ID()).Observe(float64(time.Since(start).Milliseconds()))
p.pruningTimeHistogram.Observe(float64(time.Since(start).Milliseconds()))

Check warning on line 153 in internal/datastore/crdb/pool/balancer.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/crdb/pool/balancer.go#L153

Added line #L153 was not covered by tests
}()
conns := p.pool.AcquireAllIdle(ctx)
defer func() {
Expand Down Expand Up @@ -182,8 +195,7 @@
p.healthTracker.RLock()
for node := range p.healthTracker.nodesEverSeen {
if _, ok := connectionCounts[node]; !ok {
connectionsPerCRDBNodeCountGauge.DeletePartialMatch(map[string]string{
"pool": p.pool.ID(),
p.connectionsPerCRDBNodeCountGauge.DeletePartialMatch(map[string]string{

Check warning on line 198 in internal/datastore/crdb/pool/balancer.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/crdb/pool/balancer.go#L198

Added line #L198 was not covered by tests
"node_id": strconv.FormatUint(uint64(node), 10),
})
}
Expand All @@ -205,8 +217,7 @@
initialPerNodeMax := p.pool.MaxConns() / nodeCount
for i, node := range nodes {
count := connectionCounts[node]
connectionsPerCRDBNodeCountGauge.WithLabelValues(
p.pool.ID(),
p.connectionsPerCRDBNodeCountGauge.WithLabelValues(

Check warning on line 220 in internal/datastore/crdb/pool/balancer.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/crdb/pool/balancer.go#L220

Added line #L220 was not covered by tests
strconv.FormatUint(uint64(node), 10),
).Set(float64(count))

Expand Down
3 changes: 3 additions & 0 deletions internal/datastore/crdb/pool/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ func TestNodeConnectionBalancerPrune(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
tracker, err := NewNodeHealthChecker("")
require.NoError(t, err)
t.Cleanup(func() {
tracker.Close()
})
for _, n := range tt.nodes {
tracker.healthyNodes[n] = struct{}{}
}
Expand Down
42 changes: 25 additions & 17 deletions internal/datastore/crdb/pool/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import (
"context"
"fmt"
"math/rand"
"sync"
"time"
Expand All @@ -18,26 +19,18 @@

const errorBurst = 2

var healthyCRDBNodeCountGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "crdb_healthy_nodes",
Help: "the number of healthy crdb nodes detected by spicedb",
})

func init() {
prometheus.MustRegister(healthyCRDBNodeCountGauge)
}

// NodeHealthTracker detects changes in the node pool by polling the cluster periodically and recording
// the node ids that are seen. This is used to detect new nodes that come online that have either previously
// been marked unhealthy due to connection errors or due to scale up.
//
// Consumers can manually mark a node healthy or unhealthy as well.
type NodeHealthTracker struct {
sync.RWMutex
connConfig *pgx.ConnConfig
healthyNodes map[uint32]struct{} // GUARDED_BY(RWMutex)
nodesEverSeen map[uint32]*rate.Limiter // GUARDED_BY(RWMutex)
newLimiter func() *rate.Limiter
connConfig *pgx.ConnConfig
healthyNodes map[uint32]struct{} // GUARDED_BY(RWMutex)
nodesEverSeen map[uint32]*rate.Limiter // GUARDED_BY(RWMutex)
newLimiter func() *rate.Limiter
healthyCRDBNodeCountGauge prometheus.Gauge
}

// NewNodeHealthChecker builds a health checker that polls the cluster at the given url.
Expand All @@ -47,16 +40,31 @@
return nil, err
}

healthyCRDBNodeCountGauge := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "crdb_healthy_nodes",
Help: "the number of healthy crdb nodes detected by spicedb",
})

Check warning on line 47 in internal/datastore/crdb/pool/health.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/crdb/pool/health.go#L47

Added line #L47 was not covered by tests
err = prometheus.Register(healthyCRDBNodeCountGauge)
if err != nil {
return nil, fmt.Errorf("failed to register crdb healthy nodes metric: %w", err)
}

Check warning on line 51 in internal/datastore/crdb/pool/health.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/crdb/pool/health.go#L50-L51

Added lines #L50 - L51 were not covered by tests

return &NodeHealthTracker{
connConfig: connConfig,
healthyNodes: make(map[uint32]struct{}, 0),
nodesEverSeen: make(map[uint32]*rate.Limiter, 0),
healthyCRDBNodeCountGauge: healthyCRDBNodeCountGauge,
connConfig: connConfig,
healthyNodes: make(map[uint32]struct{}, 0),
nodesEverSeen: make(map[uint32]*rate.Limiter, 0),
newLimiter: func() *rate.Limiter {
return rate.NewLimiter(rate.Every(1*time.Minute), errorBurst)
},
}, nil
}

// Close unregisters the tracker's healthy-node count gauge from the default
// Prometheus registry so a subsequently created tracker can re-register a
// gauge with the same name. The boolean result of Unregister is deliberately
// discarded: false only means the collector was not registered, which is
// harmless on teardown.
func (t *NodeHealthTracker) Close() {
	_ = prometheus.Unregister(t.healthyCRDBNodeCountGauge)
}

// Poll starts polling the cluster and recording the node IDs that it sees.
func (t *NodeHealthTracker) Poll(ctx context.Context, interval time.Duration) {
ticker := jitterbug.New(interval, jitterbug.Uniform{
Expand Down Expand Up @@ -104,7 +112,7 @@
t.Lock()
defer t.Unlock()
defer func() {
healthyCRDBNodeCountGauge.Set(float64(len(t.healthyNodes)))
t.healthyCRDBNodeCountGauge.Set(float64(len(t.healthyNodes)))
}()

if _, ok := t.nodesEverSeen[nodeID]; !ok {
Expand Down
14 changes: 5 additions & 9 deletions internal/datastore/crdb/pool/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,16 @@ package pool

import (
"testing"
"time"

"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
)

func TestNodeHealthTracker(t *testing.T) {
tracker := &NodeHealthTracker{
healthyNodes: make(map[uint32]struct{}),
nodesEverSeen: make(map[uint32]*rate.Limiter),
newLimiter: func() *rate.Limiter {
return rate.NewLimiter(rate.Every(1*time.Minute), 2)
},
}
tracker, err := NewNodeHealthChecker("postgres://user:password@localhost:5432/dbname")
require.NoError(t, err)
t.Cleanup(func() {
tracker.Close()
})

tracker.SetNodeHealth(1, true)
require.True(t, tracker.IsHealthy(1))
Expand Down
Loading
Loading