Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions e2e/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ require (
github.com/creasty/defaults v1.8.0 // indirect
github.com/dave/jennifer v1.7.1 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/envoyproxy/protoc-gen-validate v1.3.0 // indirect
github.com/fatih/structtag v1.2.0 // indirect
Expand All @@ -60,6 +61,7 @@ require (
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/onsi/ginkgo/v2 v2.26.0 // indirect
github.com/onsi/gomega v1.38.2 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/planetscale/vtprotobuf v0.6.1-0.20240917153116-6f2963f01587 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/rs/zerolog v1.34.0 // indirect
Expand Down
5 changes: 4 additions & 1 deletion e2e/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ github.com/brianvoe/gofakeit/v6 v6.28.0 h1:Xib46XXuQfmlLS2EXRuJpqcw8St6qSZz75OUo
github.com/brianvoe/gofakeit/v6 v6.28.0/go.mod h1:Xj58BMSnFqcn/fAQeSK+/PLtC5kSb7FJIq4JyGa8vEs=
github.com/ccoveille/go-safecast/v2 v2.0.0 h1:+5eyITXAUj3wMjad6cRVJKGnC7vDS55zk0INzJagub0=
github.com/ccoveille/go-safecast/v2 v2.0.0/go.mod h1:JIYA4CAR33blIDuE6fSwCp2sz1oOBahXnvmdBhOAABs=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM=
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
Expand All @@ -116,6 +115,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dlmiddlecote/sqlstats v1.0.2 h1:gSU11YN23D/iY50A2zVYwgXgy072khatTsIW6UPjUtI=
github.com/dlmiddlecote/sqlstats v1.0.2/go.mod h1:0CWaIh/Th+z2aI6Q9Jpfg/o21zmGxWhbByHgQSCUQvY=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/ecordell/optgen v0.2.3 h1:DXuT9cYRInIJEh/dIOuLPgi7gYXrlfjzV/KsD80CXLE=
github.com/ecordell/optgen v0.2.3/go.mod h1:pqjipFkG6vAwvKgjPGWaZyqmtWAqdb2w6EcTnP+kgqQ=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
Expand Down Expand Up @@ -235,6 +236,8 @@ github.com/onsi/ginkgo/v2 v2.26.0/go.mod h1:qhEywmzWTBUY88kfO0BRvX4py7scov9yR+Az
github.com/onsi/gomega v1.38.2 h1:eZCjf2xjZAqe+LeWvKb5weQ+NcPwX84kqJ0cZNxok2A=
github.com/onsi/gomega v1.38.2/go.mod h1:W2MJcYxRGV63b418Ai34Ud0hEdTVXq9NW9+Sx6uXf3k=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/planetscale/vtprotobuf v0.6.1-0.20240917153116-6f2963f01587 h1:xzZOeCMQLA/W198ZkdVdt4EKFKJtS26B773zNU377ZY=
Expand Down
54 changes: 0 additions & 54 deletions internal/datastore/common/helpers.go
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI all this code got moved to avoid circular imports

Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,15 @@ package common

import (
"context"
"errors"
"fmt"
"strconv"
"strings"

"github.com/dustin/go-humanize"
"github.com/pbnjay/memory"
"google.golang.org/protobuf/types/known/structpb"

"github.com/authzed/spicedb/pkg/datastore"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
"github.com/authzed/spicedb/pkg/tuple"
)

// At startup, measure 75% of available free memory.
var freeMemory uint64

func init() {
freeMemory = memory.FreeMemory() / 100 * 75
}

// WriteRelationships is a convenience method to perform the same update operation on a set of relationships
func WriteRelationships(ctx context.Context, ds datastore.Datastore, op tuple.UpdateOperation, rels ...tuple.Relationship) (datastore.Revision, error) {
updates := make([]tuple.RelationshipUpdate, 0, len(rels))
Expand Down Expand Up @@ -59,45 +47,3 @@ func ContextualizedCaveatFrom(name string, context map[string]any) (*core.Contex
}
return caveat, nil
}

var errOverHundredPercent = errors.New("percentage greater than 100")

func parsePercent(str string, freeMem uint64) (uint64, error) {
percent := strings.TrimSuffix(str, "%")
parsedPercent, err := strconv.ParseUint(percent, 10, 64)
if err != nil {
return 0, fmt.Errorf("failed to parse percentage: %w", err)
}

if parsedPercent > 100 {
return 0, errOverHundredPercent
}

return freeMem / 100 * parsedPercent, nil
}

// WatchBufferSize takes a string and interprets it as
// either a percentage of memory (as a percentage of
// 75% of free memory as measured on startup)
// or a humanized byte string and returns the number of
// bytes or an error if the value cannot be interpreted.
// Returns 0 on an empty string.
func WatchBufferSize(sizeString string) (size uint64, err error) {
if sizeString == "" {
return 0, nil
}

if strings.HasSuffix(sizeString, "%") {
size, err := parsePercent(sizeString, freeMemory)
if err != nil {
return 0, fmt.Errorf("could not parse %s as percentage: %w", sizeString, err)
}
return size, nil
}

size, err = humanize.ParseBytes(sizeString)
if err != nil {
return 0, fmt.Errorf("could not parse %s as a number of bytes: %w", sizeString, err)
}
return size, nil
}
4 changes: 4 additions & 0 deletions internal/datastore/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ func (p *ctxProxy) RevisionFromString(serialized string) (datastore.Revision, er
return p.delegate.RevisionFromString(serialized)
}

func (p *ctxProxy) DefaultsWatchOptions() datastore.WatchOptions {
return p.delegate.DefaultsWatchOptions()
}

func (p *ctxProxy) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) {
return p.delegate.Watch(ctx, afterRevision, options)
}
Expand Down
54 changes: 23 additions & 31 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,24 +180,20 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas
config.followerReadDelay,
config.revisionQuantization,
),
CommonDecoder: revisions.CommonDecoder{Kind: revisions.HybridLogicalClock},
MigrationValidator: common.NewMigrationValidator(headMigration, config.allowedMigrations),
dburl: url,
acquireTimeout: config.acquireTimeout,
watchBufferLength: config.watchBufferLength,
watchChangeBufferMaximumSize: config.watchChangeBufferMaximumSize,
watchBufferWriteTimeout: config.watchBufferWriteTimeout,
watchConnectTimeout: config.watchConnectTimeout,
writeOverlapKeyer: keyer,
overlapKeyInit: keySetInit,
beginChangefeedQuery: changefeedQuery,
transactionNowQuery: transactionNowQuery,
analyzeBeforeStatistics: config.analyzeBeforeStatistics,
filterMaximumIDCount: config.filterMaximumIDCount,
supportsIntegrity: config.withIntegrity,
gcWindow: config.gcWindow,
watchEnabled: !config.watchDisabled,
schema: *schema.Schema(config.columnOptimizationOption, config.withIntegrity, false),
CommonDecoder: revisions.CommonDecoder{Kind: revisions.HybridLogicalClock},
MigrationValidator: common.NewMigrationValidator(headMigration, config.allowedMigrations),
dburl: url,
acquireTimeout: config.acquireTimeout,
writeOverlapKeyer: keyer,
overlapKeyInit: keySetInit,
beginChangefeedQuery: changefeedQuery,
transactionNowQuery: transactionNowQuery,
analyzeBeforeStatistics: config.analyzeBeforeStatistics,
filterMaximumIDCount: config.filterMaximumIDCount,
supportsIntegrity: config.withIntegrity,
gcWindow: config.gcWindow,
watchEnabled: !config.watchDisabled,
schema: *schema.Schema(config.columnOptimizationOption, config.withIntegrity, false),
}
ds.SetNowFunc(ds.headRevisionInternal)

Expand Down Expand Up @@ -262,19 +258,15 @@ type crdbDatastore struct {
revisions.CommonDecoder
*common.MigrationValidator

dburl string
readPool, writePool *pool.RetryPool
collectors []prometheus.Collector
watchBufferLength uint16
watchChangeBufferMaximumSize uint64
watchBufferWriteTimeout time.Duration
watchConnectTimeout time.Duration
writeOverlapKeyer overlapKeyer
overlapKeyInit func(ctx context.Context) keySet
analyzeBeforeStatistics bool
gcWindow time.Duration
schema common.SchemaInformation
acquireTimeout time.Duration
dburl string
readPool, writePool *pool.RetryPool
collectors []prometheus.Collector
writeOverlapKeyer overlapKeyer
overlapKeyInit func(ctx context.Context) keySet
analyzeBeforeStatistics bool
gcWindow time.Duration
schema common.SchemaInformation
acquireTimeout time.Duration

beginChangefeedQuery string
transactionNowQuery string
Expand Down
11 changes: 4 additions & 7 deletions internal/datastore/crdb/crdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,14 @@ func crdbTestVersion() string {
func TestCRDBDatastoreWithoutIntegrity(t *testing.T) {
t.Parallel()
b := testdatastore.RunCRDBForTesting(t, "", crdbTestVersion())
test.All(t, test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, watchBufferLength uint16) (datastore.Datastore, error) {
test.All(t, test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, _ uint16) (datastore.Datastore, error) {
ctx := context.Background()
ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore {
ds, err := NewCRDBDatastore(
ctx,
uri,
GCWindow(gcWindow),
RevisionQuantization(revisionQuantization),
WatchBufferLength(watchBufferLength),
OverlapStrategy(overlapStrategyPrefix),
DebugAnalyzeBeforeStatistics(),
WithAcquireTimeout(5*time.Second),
Expand Down Expand Up @@ -202,15 +201,14 @@ func TestCRDBDatastoreWithIntegrity(t *testing.T) { //nolint:tparallel
t.Parallel()
b := testdatastore.RunCRDBForTesting(t, "", crdbTestVersion())

test.All(t, test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, watchBufferLength uint16) (datastore.Datastore, error) {
test.All(t, test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, _ uint16) (datastore.Datastore, error) {
ctx := context.Background()
ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore {
ds, err := NewCRDBDatastore(
ctx,
uri,
GCWindow(gcWindow),
RevisionQuantization(revisionQuantization),
WatchBufferLength(watchBufferLength),
OverlapStrategy(overlapStrategyPrefix),
DebugAnalyzeBeforeStatistics(),
WithIntegrity(true),
Expand All @@ -229,15 +227,14 @@ func TestCRDBDatastoreWithIntegrity(t *testing.T) { //nolint:tparallel
return ds, nil
}), false)

unwrappedTester := test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, watchBufferLength uint16) (datastore.Datastore, error) {
unwrappedTester := test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, _ uint16) (datastore.Datastore, error) {
ctx := context.Background()
ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore {
ds, err := NewCRDBDatastore(
ctx,
uri,
GCWindow(gcWindow),
RevisionQuantization(revisionQuantization),
WatchBufferLength(watchBufferLength),
OverlapStrategy(overlapStrategyPrefix),
DebugAnalyzeBeforeStatistics(),
WithIntegrity(true),
Expand Down Expand Up @@ -331,7 +328,7 @@ func TestWatchFeatureDetection(t *testing.T) {
headRevision, err := ds.HeadRevision(ctx)
require.NoError(t, err)

_, errChan := ds.Watch(ctx, headRevision, datastore.WatchJustRelationships())
_, errChan := ds.Watch(ctx, headRevision, datastore.WatchJustRelationships(ds))
err = <-errChan
require.Error(t, err)
require.Contains(t, err.Error(), "watch is currently disabled")
Expand Down
35 changes: 0 additions & 35 deletions internal/datastore/crdb/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ type crdbOptions struct {
readPoolOpts, writePoolOpts pgxcommon.PoolOptions
connectRate time.Duration

watchBufferLength uint16
watchChangeBufferMaximumSize uint64
watchBufferWriteTimeout time.Duration
watchConnectTimeout time.Duration
revisionQuantization time.Duration
followerReadDelay time.Duration
maxRevisionStalenessPercent float64
Expand Down Expand Up @@ -76,9 +72,6 @@ type Option func(*crdbOptions)
func generateConfig(options []Option) (crdbOptions, error) {
computed := crdbOptions{
gcWindow: 24 * time.Hour,
watchBufferLength: defaultWatchBufferLength,
watchBufferWriteTimeout: defaultWatchBufferWriteTimeout,
watchConnectTimeout: defaultWatchConnectTimeout,
revisionQuantization: defaultRevisionQuantization,
followerReadDelay: defaultFollowerReadDelay,
maxRevisionStalenessPercent: defaultMaxRevisionStalenessPercent,
Expand Down Expand Up @@ -251,34 +244,6 @@ func WriteConnsMaxOpen(conns int) Option {
return func(po *crdbOptions) { po.writePoolOpts.MaxOpenConns = &conns }
}

// WatchBufferLength is the number of entries that can be stored in the watch
// buffer while awaiting read by the client.
//
// This value defaults to 128.
func WatchBufferLength(watchBufferLength uint16) Option {
return func(po *crdbOptions) { po.watchBufferLength = watchBufferLength }
}

// WatchBufferWriteTimeout is the maximum timeout for writing to the watch buffer,
// after which the caller to the watch will be disconnected.
func WatchBufferWriteTimeout(watchBufferWriteTimeout time.Duration) Option {
return func(po *crdbOptions) { po.watchBufferWriteTimeout = watchBufferWriteTimeout }
}

// WatchBufferMaximumSize is the maximum size in bytes of the watch buffer.
// If this value is exceeded the caller will receive an error.
func WatchChangeBufferMaximumSize(maxSize uint64) Option {
return func(po *crdbOptions) { po.watchChangeBufferMaximumSize = maxSize }
}

// WatchConnectTimeout is the maximum timeout for connecting the watch stream
// to the datastore.
//
// This value defaults to 1 second.
func WatchConnectTimeout(watchConnectTimeout time.Duration) Option {
return func(po *crdbOptions) { po.watchConnectTimeout = watchConnectTimeout }
}

// RevisionQuantization is the time bucket size to which advertised revisions
// will be rounded.
//
Expand Down
1 change: 0 additions & 1 deletion internal/datastore/crdb/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ func TestTxReset(t *testing.T) {
uri,
GCWindow(24*time.Hour),
RevisionQuantization(5*time.Second),
WatchBufferLength(128),
MaxRetries(tt.maxRetries),
WithAcquireTimeout(5*time.Second),
)
Expand Down
Loading
Loading