diff --git a/e2e/go.mod b/e2e/go.mod index 3e65e2ea4..63bc38afa 100644 --- a/e2e/go.mod +++ b/e2e/go.mod @@ -43,6 +43,7 @@ require ( github.com/creasty/defaults v1.8.0 // indirect github.com/dave/jennifer v1.7.1 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/dustin/go-humanize v1.0.1 // indirect github.com/emirpasic/gods v1.18.1 // indirect github.com/envoyproxy/protoc-gen-validate v1.3.0 // indirect github.com/fatih/structtag v1.2.0 // indirect @@ -60,6 +61,7 @@ require ( github.com/mattn/go-isatty v0.0.20 // indirect github.com/onsi/ginkgo/v2 v2.26.0 // indirect github.com/onsi/gomega v1.38.2 // indirect + github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240917153116-6f2963f01587 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/rs/zerolog v1.34.0 // indirect diff --git a/e2e/go.sum b/e2e/go.sum index f1a0c0814..d0bfda53c 100644 --- a/e2e/go.sum +++ b/e2e/go.sum @@ -89,7 +89,6 @@ github.com/brianvoe/gofakeit/v6 v6.28.0 h1:Xib46XXuQfmlLS2EXRuJpqcw8St6qSZz75OUo github.com/brianvoe/gofakeit/v6 v6.28.0/go.mod h1:Xj58BMSnFqcn/fAQeSK+/PLtC5kSb7FJIq4JyGa8vEs= github.com/ccoveille/go-safecast/v2 v2.0.0 h1:+5eyITXAUj3wMjad6cRVJKGnC7vDS55zk0INzJagub0= github.com/ccoveille/go-safecast/v2 v2.0.0/go.mod h1:JIYA4CAR33blIDuE6fSwCp2sz1oOBahXnvmdBhOAABs= -github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -116,6 +115,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dlmiddlecote/sqlstats v1.0.2 h1:gSU11YN23D/iY50A2zVYwgXgy072khatTsIW6UPjUtI= github.com/dlmiddlecote/sqlstats v1.0.2/go.mod h1:0CWaIh/Th+z2aI6Q9Jpfg/o21zmGxWhbByHgQSCUQvY= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/ecordell/optgen v0.2.3 h1:DXuT9cYRInIJEh/dIOuLPgi7gYXrlfjzV/KsD80CXLE= github.com/ecordell/optgen v0.2.3/go.mod h1:pqjipFkG6vAwvKgjPGWaZyqmtWAqdb2w6EcTnP+kgqQ= github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= @@ -235,6 +236,8 @@ github.com/onsi/ginkgo/v2 v2.26.0/go.mod h1:qhEywmzWTBUY88kfO0BRvX4py7scov9yR+Az github.com/onsi/gomega v1.38.2 h1:eZCjf2xjZAqe+LeWvKb5weQ+NcPwX84kqJ0cZNxok2A= github.com/onsi/gomega v1.38.2/go.mod h1:W2MJcYxRGV63b418Ai34Ud0hEdTVXq9NW9+Sx6uXf3k= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0= +github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/planetscale/vtprotobuf v0.6.1-0.20240917153116-6f2963f01587 h1:xzZOeCMQLA/W198ZkdVdt4EKFKJtS26B773zNU377ZY= diff --git a/internal/datastore/common/helpers.go b/internal/datastore/common/helpers.go index 2d0d0a9da..8f341340a 100644 --- a/internal/datastore/common/helpers.go +++ b/internal/datastore/common/helpers.go @@ -2,13 +2,8 @@ package common import ( "context" - "errors" "fmt" - "strconv" - "strings" - "github.com/dustin/go-humanize" - "github.com/pbnjay/memory" "google.golang.org/protobuf/types/known/structpb" "github.com/authzed/spicedb/pkg/datastore" @@ -16,13 +11,6 @@ import ( "github.com/authzed/spicedb/pkg/tuple" ) -// At startup, measure 75% of available free memory. -var freeMemory uint64 - -func init() { - freeMemory = memory.FreeMemory() / 100 * 75 -} - // WriteRelationships is a convenience method to perform the same update operation on a set of relationships func WriteRelationships(ctx context.Context, ds datastore.Datastore, op tuple.UpdateOperation, rels ...tuple.Relationship) (datastore.Revision, error) { updates := make([]tuple.RelationshipUpdate, 0, len(rels)) @@ -59,45 +47,3 @@ func ContextualizedCaveatFrom(name string, context map[string]any) (*core.Contex } return caveat, nil } - -var errOverHundredPercent = errors.New("percentage greater than 100") - -func parsePercent(str string, freeMem uint64) (uint64, error) { - percent := strings.TrimSuffix(str, "%") - parsedPercent, err := strconv.ParseUint(percent, 10, 64) - if err != nil { - return 0, fmt.Errorf("failed to parse percentage: %w", err) - } - - if parsedPercent > 100 { - return 0, errOverHundredPercent - } - - return freeMem / 100 * parsedPercent, nil -} - -// WatchBufferSize takes a string and interprets it as -// either a percentage of memory (as a percentage of -// 75% of free memory as measured on startup) -// or a humanized byte string and returns the number of -// bytes or an error if the value cannot be interpreted. -// Returns 0 on an empty string. -func WatchBufferSize(sizeString string) (size uint64, err error) { - if sizeString == "" { - return 0, nil - } - - if strings.HasSuffix(sizeString, "%") { - size, err := parsePercent(sizeString, freeMemory) - if err != nil { - return 0, fmt.Errorf("could not parse %s as percentage: %w", sizeString, err) - } - return size, nil - } - - size, err = humanize.ParseBytes(sizeString) - if err != nil { - return 0, fmt.Errorf("could not parse %s as a number of bytes: %w", sizeString, err) - } - return size, nil -} diff --git a/internal/datastore/context.go b/internal/datastore/context.go index 110870097..71f9c1e06 100644 --- a/internal/datastore/context.go +++ b/internal/datastore/context.go @@ -66,6 +66,10 @@ func (p *ctxProxy) RevisionFromString(serialized string) (datastore.Revision, er return p.delegate.RevisionFromString(serialized) } +func (p *ctxProxy) DefaultsWatchOptions() datastore.WatchOptions { + return p.delegate.DefaultsWatchOptions() +} + func (p *ctxProxy) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) { return p.delegate.Watch(ctx, afterRevision, options) } diff --git a/internal/datastore/crdb/crdb.go b/internal/datastore/crdb/crdb.go index c47bd3e55..063029271 100644 --- a/internal/datastore/crdb/crdb.go +++ b/internal/datastore/crdb/crdb.go @@ -180,24 +180,20 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas config.followerReadDelay, config.revisionQuantization, ), - CommonDecoder: revisions.CommonDecoder{Kind: revisions.HybridLogicalClock}, - MigrationValidator: common.NewMigrationValidator(headMigration, config.allowedMigrations), - dburl: url, - acquireTimeout: config.acquireTimeout, - watchBufferLength: config.watchBufferLength, - watchChangeBufferMaximumSize: config.watchChangeBufferMaximumSize, - watchBufferWriteTimeout: config.watchBufferWriteTimeout, - watchConnectTimeout: config.watchConnectTimeout, - writeOverlapKeyer: keyer, - overlapKeyInit: keySetInit, - beginChangefeedQuery: changefeedQuery, - transactionNowQuery: transactionNowQuery, - analyzeBeforeStatistics: config.analyzeBeforeStatistics, - filterMaximumIDCount: config.filterMaximumIDCount, - supportsIntegrity: config.withIntegrity, - gcWindow: config.gcWindow, - watchEnabled: !config.watchDisabled, - schema: *schema.Schema(config.columnOptimizationOption, config.withIntegrity, false), + CommonDecoder: revisions.CommonDecoder{Kind: revisions.HybridLogicalClock}, + MigrationValidator: common.NewMigrationValidator(headMigration, config.allowedMigrations), + dburl: url, + acquireTimeout: config.acquireTimeout, + writeOverlapKeyer: keyer, + overlapKeyInit: keySetInit, + beginChangefeedQuery: changefeedQuery, + transactionNowQuery: transactionNowQuery, + analyzeBeforeStatistics: config.analyzeBeforeStatistics, + filterMaximumIDCount: config.filterMaximumIDCount, + supportsIntegrity: config.withIntegrity, + gcWindow: config.gcWindow, + watchEnabled: !config.watchDisabled, + schema: *schema.Schema(config.columnOptimizationOption, config.withIntegrity, false), } ds.SetNowFunc(ds.headRevisionInternal) @@ -262,19 +258,15 @@ type crdbDatastore struct { revisions.CommonDecoder *common.MigrationValidator - dburl string - readPool, writePool *pool.RetryPool - collectors []prometheus.Collector - watchBufferLength uint16 - watchChangeBufferMaximumSize uint64 - watchBufferWriteTimeout time.Duration - watchConnectTimeout time.Duration - writeOverlapKeyer overlapKeyer - overlapKeyInit func(ctx context.Context) keySet - analyzeBeforeStatistics bool - gcWindow time.Duration - schema common.SchemaInformation - acquireTimeout time.Duration + dburl string + readPool, writePool *pool.RetryPool + collectors []prometheus.Collector + writeOverlapKeyer overlapKeyer + overlapKeyInit func(ctx context.Context) keySet + analyzeBeforeStatistics bool + gcWindow time.Duration + schema common.SchemaInformation + acquireTimeout time.Duration beginChangefeedQuery string transactionNowQuery string diff --git a/internal/datastore/crdb/crdb_test.go b/internal/datastore/crdb/crdb_test.go index e2b43de72..29bc8e4a6 100644 --- a/internal/datastore/crdb/crdb_test.go +++ b/internal/datastore/crdb/crdb_test.go @@ -74,7 +74,7 @@ func crdbTestVersion() string { func TestCRDBDatastoreWithoutIntegrity(t *testing.T) { t.Parallel() b := testdatastore.RunCRDBForTesting(t, "", crdbTestVersion()) - test.All(t, test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, watchBufferLength uint16) (datastore.Datastore, error) { + test.All(t, test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, _ uint16) (datastore.Datastore, error) { ctx := context.Background() ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore { ds, err := NewCRDBDatastore( @@ -82,7 +82,6 @@ func TestCRDBDatastoreWithoutIntegrity(t *testing.T) { uri, GCWindow(gcWindow), RevisionQuantization(revisionQuantization), - WatchBufferLength(watchBufferLength), OverlapStrategy(overlapStrategyPrefix), DebugAnalyzeBeforeStatistics(), WithAcquireTimeout(5*time.Second), @@ -202,7 +201,7 @@ func TestCRDBDatastoreWithIntegrity(t *testing.T) { //nolint:tparallel t.Parallel() b := testdatastore.RunCRDBForTesting(t, "", crdbTestVersion()) - test.All(t, test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, watchBufferLength uint16) (datastore.Datastore, error) { + test.All(t, test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, _ uint16) (datastore.Datastore, error) { ctx := context.Background() ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore { ds, err := NewCRDBDatastore( @@ -210,7 +209,6 @@ func TestCRDBDatastoreWithIntegrity(t *testing.T) { //nolint:tparallel uri, GCWindow(gcWindow), RevisionQuantization(revisionQuantization), - WatchBufferLength(watchBufferLength), OverlapStrategy(overlapStrategyPrefix), DebugAnalyzeBeforeStatistics(), WithIntegrity(true), @@ -229,7 +227,7 @@ func TestCRDBDatastoreWithIntegrity(t *testing.T) { //nolint:tparallel return ds, nil }), false) - unwrappedTester := test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, watchBufferLength uint16) (datastore.Datastore, error) { + unwrappedTester := test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, _ uint16) (datastore.Datastore, error) { ctx := context.Background() ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore { ds, err := NewCRDBDatastore( @@ -237,7 +235,6 @@ func TestCRDBDatastoreWithIntegrity(t *testing.T) { //nolint:tparallel uri, GCWindow(gcWindow), RevisionQuantization(revisionQuantization), - WatchBufferLength(watchBufferLength), OverlapStrategy(overlapStrategyPrefix), DebugAnalyzeBeforeStatistics(), WithIntegrity(true), @@ -331,7 +328,7 @@ func TestWatchFeatureDetection(t *testing.T) { headRevision, err := ds.HeadRevision(ctx) require.NoError(t, err) - _, errChan := ds.Watch(ctx, headRevision, datastore.WatchJustRelationships()) + _, errChan := ds.Watch(ctx, headRevision, datastore.WatchJustRelationships(ds)) err = <-errChan require.Error(t, err) require.Contains(t, err.Error(), "watch is currently disabled") diff --git a/internal/datastore/crdb/options.go b/internal/datastore/crdb/options.go index 9e745fc21..619ae6c9b 100644 --- a/internal/datastore/crdb/options.go +++ b/internal/datastore/crdb/options.go @@ -15,10 +15,6 @@ type crdbOptions struct { readPoolOpts, writePoolOpts pgxcommon.PoolOptions connectRate time.Duration - watchBufferLength uint16 - watchChangeBufferMaximumSize uint64 - watchBufferWriteTimeout time.Duration - watchConnectTimeout time.Duration revisionQuantization time.Duration followerReadDelay time.Duration maxRevisionStalenessPercent float64 @@ -76,9 +72,6 @@ type Option func(*crdbOptions) func generateConfig(options []Option) (crdbOptions, error) { computed := crdbOptions{ gcWindow: 24 * time.Hour, - watchBufferLength: defaultWatchBufferLength, - watchBufferWriteTimeout: defaultWatchBufferWriteTimeout, - watchConnectTimeout: defaultWatchConnectTimeout, revisionQuantization: defaultRevisionQuantization, followerReadDelay: defaultFollowerReadDelay, maxRevisionStalenessPercent: defaultMaxRevisionStalenessPercent, @@ -251,34 +244,6 @@ func WriteConnsMaxOpen(conns int) Option { return func(po *crdbOptions) { po.writePoolOpts.MaxOpenConns = &conns } } -// WatchBufferLength is the number of entries that can be stored in the watch -// buffer while awaiting read by the client. -// -// This value defaults to 128. -func WatchBufferLength(watchBufferLength uint16) Option { - return func(po *crdbOptions) { po.watchBufferLength = watchBufferLength } -} - -// WatchBufferWriteTimeout is the maximum timeout for writing to the watch buffer, -// after which the caller to the watch will be disconnected. -func WatchBufferWriteTimeout(watchBufferWriteTimeout time.Duration) Option { - return func(po *crdbOptions) { po.watchBufferWriteTimeout = watchBufferWriteTimeout } -} - -// WatchBufferMaximumSize is the maximum size in bytes of the watch buffer. -// If this value is exceeded the caller will receive an error. -func WatchChangeBufferMaximumSize(maxSize uint64) Option { - return func(po *crdbOptions) { po.watchChangeBufferMaximumSize = maxSize } -} - -// WatchConnectTimeout is the maximum timeout for connecting the watch stream -// to the datastore. -// -// This value defaults to 1 second. -func WatchConnectTimeout(watchConnectTimeout time.Duration) Option { - return func(po *crdbOptions) { po.watchConnectTimeout = watchConnectTimeout } -} - // RevisionQuantization is the time bucket size to which advertised revisions // will be rounded. // diff --git a/internal/datastore/crdb/pool_test.go b/internal/datastore/crdb/pool_test.go index e1346fc41..5cd66fa64 100644 --- a/internal/datastore/crdb/pool_test.go +++ b/internal/datastore/crdb/pool_test.go @@ -107,7 +107,6 @@ func TestTxReset(t *testing.T) { uri, GCWindow(24*time.Hour), RevisionQuantization(5*time.Second), - WatchBufferLength(128), MaxRetries(tt.maxRetries), WithAcquireTimeout(5*time.Second), ) diff --git a/internal/datastore/crdb/watch.go b/internal/datastore/crdb/watch.go index e81342007..d5132a393 100644 --- a/internal/datastore/crdb/watch.go +++ b/internal/datastore/crdb/watch.go @@ -68,13 +68,17 @@ type changeDetails struct { } } -func (cds *crdbDatastore) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) { - watchBufferLength := options.WatchBufferLength - if watchBufferLength == 0 { - watchBufferLength = cds.watchBufferLength +func (cds *crdbDatastore) DefaultsWatchOptions() datastore.WatchOptions { + return datastore.WatchOptions{ + CheckpointInterval: 1 * time.Second, + WatchBufferLength: defaultWatchBufferLength, + WatchBufferWriteTimeout: defaultWatchBufferWriteTimeout, + WatchConnectTimeout: defaultWatchConnectTimeout, } +} - updates := make(chan datastore.RevisionChanges, watchBufferLength) +func (cds *crdbDatastore) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) { + updates := make(chan datastore.RevisionChanges, options.WatchBufferLength) errs := make(chan error, 1) // If checkpoints + schema is requested, this is likely used by the schema watching cache, @@ -121,17 +125,12 @@ func (cds *crdbDatastore) watch( defer close(updates) defer close(errs) - watchConnectTimeout := opts.WatchConnectTimeout - if watchConnectTimeout <= 0 { - watchConnectTimeout = cds.watchConnectTimeout - } - // get non-pooled connection for watch // "applications should explicitly create dedicated connections to consume // changefeed data, instead of using a connection pool as most client // drivers do by default." // see: https://www.cockroachlabs.com/docs/v22.2/changefeed-for#considerations - conn, err := pgxcommon.ConnectWithInstrumentationAndTimeout(ctx, cds.dburl, watchConnectTimeout) + conn, err := pgxcommon.ConnectWithInstrumentationAndTimeout(ctx, cds.dburl, opts.WatchConnectTimeout) if err != nil { errs <- err return @@ -153,18 +152,7 @@ func (cds *crdbDatastore) watch( return } - if opts.CheckpointInterval < 0 { - errs <- errors.New("invalid checkpoint interval given") - return - } - - // Default: 1s - resolvedDuration := 1 * time.Second - if opts.CheckpointInterval > 0 { - resolvedDuration = opts.CheckpointInterval - } - - resolvedDurationString := strconv.FormatInt(resolvedDuration.Milliseconds(), 10) + "ms" + resolvedDurationString := strconv.FormatInt(opts.CheckpointInterval.Milliseconds(), 10) + "ms" interpolated := fmt.Sprintf(cds.beginChangefeedQuery, strings.Join(tableNames, ","), afterRevision, resolvedDurationString) sendError := func(err error) { @@ -186,11 +174,6 @@ func (cds *crdbDatastore) watch( errs <- err } - watchBufferWriteTimeout := opts.WatchBufferWriteTimeout - if watchBufferWriteTimeout <= 0 { - watchBufferWriteTimeout = cds.watchBufferWriteTimeout - } - sendChange := func(change datastore.RevisionChanges) error { select { case updates <- change: @@ -200,7 +183,7 @@ func (cds *crdbDatastore) watch( // If we cannot immediately write, setup the timer and try again. } - timer := time.NewTimer(watchBufferWriteTimeout) + timer := time.NewTimer(opts.WatchBufferWriteTimeout) defer timer.Stop() select { @@ -344,12 +327,7 @@ func (cds *crdbDatastore) processChanges(ctx context.Context, changes pgx.Rows, content: opts.Content, } } else { - watchBufferSize := opts.MaximumBufferedChangesByteSize - if watchBufferSize == 0 { - watchBufferSize = cds.watchChangeBufferMaximumSize - } - - tracked = common.NewChanges(revisions.HLCKeyFunc, opts.Content, watchBufferSize) + tracked = common.NewChanges(revisions.HLCKeyFunc, opts.Content, opts.MaximumBufferedChangesByteSize) } for changes.Next() { diff --git a/internal/datastore/memdb/memdb.go b/internal/datastore/memdb/memdb.go index 610c14201..5cd7a501d 100644 --- a/internal/datastore/memdb/memdb.go +++ b/internal/datastore/memdb/memdb.go @@ -40,7 +40,7 @@ const DisableGC = time.Duration(math.MaxInt64) // // If the watchBufferLength value of 0 is set then a default value of 128 will be used. func NewMemdbDatastore( - watchBufferLength uint16, + watchBufferLength uint16, // TODO(miparnisari): remove revisionQuantization, gcWindow time.Duration, ) (datastore.Datastore, error) { @@ -57,10 +57,6 @@ func NewMemdbDatastore( return nil, err } - if watchBufferLength == 0 { - watchBufferLength = defaultWatchBufferLength - } - uniqueID := uuid.NewString() return &memdbDatastore{ CommonDecoder: revisions.CommonDecoder{ @@ -74,11 +70,9 @@ func NewMemdbDatastore( }, }, - negativeGCWindow: gcWindow.Nanoseconds() * -1, - quantizationPeriod: revisionQuantization.Nanoseconds(), - watchBufferLength: watchBufferLength, - watchBufferWriteTimeout: 100 * time.Millisecond, - uniqueID: uniqueID, + negativeGCWindow: gcWindow.Nanoseconds() * -1, + quantizationPeriod: revisionQuantization.Nanoseconds(), + uniqueID: uniqueID, }, nil } @@ -91,11 +85,9 @@ type memdbDatastore struct { revisions []snapshot // GUARDED_BY(RWMutex) activeWriteTxn *memdb.Txn // GUARDED_BY(RWMutex) - negativeGCWindow int64 - quantizationPeriod int64 - watchBufferLength uint16 - watchBufferWriteTimeout time.Duration - uniqueID string + negativeGCWindow int64 + quantizationPeriod int64 + uniqueID string } type snapshot struct { diff --git a/internal/datastore/memdb/memdb_test.go b/internal/datastore/memdb/memdb_test.go index f21c440c5..6fd24dcdd 100644 --- a/internal/datastore/memdb/memdb_test.go +++ b/internal/datastore/memdb/memdb_test.go @@ -127,7 +127,7 @@ func TestAnythingAfterCloseDoesNotPanic(t *testing.T) { err = ds.Close() require.NoError(err) - _, errChan := ds.Watch(t.Context(), lowestRevision, datastore.WatchJustRelationships()) + _, errChan := ds.Watch(t.Context(), lowestRevision, datastore.WatchJustRelationships(ds)) select { case err := <-errChan: diff --git a/internal/datastore/memdb/watch.go b/internal/datastore/memdb/watch.go index eaa481232..e1eb821b3 100644 --- a/internal/datastore/memdb/watch.go +++ b/internal/datastore/memdb/watch.go @@ -14,13 +14,17 @@ import ( const errWatchError = "watch error: %w" -func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, options datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) { - watchBufferLength := options.WatchBufferLength - if watchBufferLength == 0 { - watchBufferLength = mdb.watchBufferLength +func (mdb *memdbDatastore) DefaultsWatchOptions() datastore.WatchOptions { + return datastore.WatchOptions{ + WatchBufferLength: defaultWatchBufferLength, + WatchBufferWriteTimeout: 100 * time.Millisecond, + // memdb does not use CheckpointInterval, WatchConnectTimeout, or MaximumBufferedChangesByteSize + // memdb does not support EmitImmediatelyStrategy } +} - updates := make(chan datastore.RevisionChanges, watchBufferLength) +func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, options datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) { + updates := make(chan datastore.RevisionChanges, options.WatchBufferLength) errs := make(chan error, 1) if options.EmissionStrategy == datastore.EmitImmediatelyStrategy { @@ -29,11 +33,6 @@ func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, opt return updates, errs } - watchBufferWriteTimeout := options.WatchBufferWriteTimeout - if watchBufferWriteTimeout == 0 { - watchBufferWriteTimeout = mdb.watchBufferWriteTimeout - } - sendChange := func(change datastore.RevisionChanges) bool { select { case updates <- change: @@ -43,7 +42,7 @@ func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, opt // If we cannot immediately write, setup the timer and try again. } - timer := time.NewTimer(watchBufferWriteTimeout) + timer := time.NewTimer(options.WatchBufferWriteTimeout) defer timer.Stop() select { diff --git a/internal/datastore/mysql/datastore.go b/internal/datastore/mysql/datastore.go index 977985a05..b1b172a17 100644 --- a/internal/datastore/mysql/datastore.go +++ b/internal/datastore/mysql/datastore.go @@ -254,30 +254,27 @@ func newMySQLDatastore(ctx context.Context, uri string, replicaIndex int, option ) store := &mysqlDatastore{ - MigrationValidator: common.NewMigrationValidator(headMigration, config.allowedMigrations), - db: db, - driver: driver, - collectors: collectors, - url: uri, - revisionQuantization: config.revisionQuantization, - gcWindow: config.gcWindow, - gcInterval: config.gcInterval, - gcTimeout: config.gcMaxOperationTime, - gcCtx: gcCtx, - cancelGc: cancelGc, - watchEnabled: !config.watchDisabled, - watchBufferLength: config.watchBufferLength, - watchChangeBufferMaximumSize: config.watchChangeBufferMaximumSize, - watchBufferWriteTimeout: config.watchBufferWriteTimeout, - optimizedRevisionQuery: revisionQuery, - validTransactionQuery: validTransactionQuery, - createTxn: createTxn, - createBaseTxn: createBaseTxn, - QueryBuilder: queryBuilder, - readTxOptions: &sql.TxOptions{Isolation: sql.LevelSerializable, ReadOnly: true}, - maxRetries: config.maxRetries, - analyzeBeforeStats: config.analyzeBeforeStats, - schema: *schema, + MigrationValidator: common.NewMigrationValidator(headMigration, config.allowedMigrations), + db: db, + driver: driver, + collectors: collectors, + url: uri, + revisionQuantization: config.revisionQuantization, + gcWindow: config.gcWindow, + gcInterval: config.gcInterval, + gcTimeout: config.gcMaxOperationTime, + gcCtx: gcCtx, + cancelGc: cancelGc, + watchEnabled: !config.watchDisabled, + optimizedRevisionQuery: revisionQuery, + validTransactionQuery: validTransactionQuery, + createTxn: createTxn, + createBaseTxn: createBaseTxn, + QueryBuilder: queryBuilder, + readTxOptions: &sql.TxOptions{Isolation: sql.LevelSerializable, ReadOnly: true}, + maxRetries: config.maxRetries, + analyzeBeforeStats: config.analyzeBeforeStats, + schema: *schema, CachedOptimizedRevisions: revisions.NewCachedOptimizedRevisions( maxRevisionStaleness, ), @@ -484,17 +481,14 @@ type mysqlDatastore struct { analyzeBeforeStats bool collectors []prometheus.Collector - revisionQuantization time.Duration - gcWindow time.Duration - gcInterval time.Duration - gcTimeout time.Duration - watchBufferLength uint16 - watchChangeBufferMaximumSize uint64 - watchBufferWriteTimeout time.Duration - watchEnabled bool - maxRetries uint8 - filterMaximumIDCount uint16 - schema common.SchemaInformation + revisionQuantization time.Duration + gcWindow time.Duration + gcInterval time.Duration + gcTimeout time.Duration + watchEnabled bool + maxRetries uint8 + filterMaximumIDCount uint16 + schema common.SchemaInformation optimizedRevisionQuery string validTransactionQuery string diff --git a/internal/datastore/mysql/datastore_test.go b/internal/datastore/mysql/datastore_test.go index 16f8e6574..54663fc90 100644 --- a/internal/datastore/mysql/datastore_test.go +++ b/internal/datastore/mysql/datastore_test.go @@ -674,7 +674,6 @@ func QuantizedRevisionTest(t *testing.T, b testdatastore.RunningEngineForTest) { primaryInstanceID, RevisionQuantization(5*time.Second), GCWindow(24*time.Hour), - WatchBufferLength(1), ) require.NoError(err) return ds diff --git a/internal/datastore/mysql/options.go b/internal/datastore/mysql/options.go index 63f6e195c..c93cf0546 100644 --- a/internal/datastore/mysql/options.go +++ b/internal/datastore/mysql/options.go @@ -33,29 +33,26 @@ const ( ) type mysqlOptions struct { - revisionQuantization time.Duration - gcWindow time.Duration - gcInterval time.Duration - gcMaxOperationTime time.Duration - maxRevisionStalenessPercent float64 - followerReadDelay time.Duration - watchBufferLength uint16 - watchChangeBufferMaximumSize uint64 - watchBufferWriteTimeout time.Duration - tablePrefix string - enablePrometheusStats bool - maxOpenConns int - connMaxIdleTime time.Duration - connMaxLifetime time.Duration - analyzeBeforeStats bool - maxRetries uint8 - lockWaitTimeoutSeconds *uint8 - gcEnabled bool - credentialsProviderName string - filterMaximumIDCount uint16 - allowedMigrations []string - columnOptimizationOption common.ColumnOptimizationOption - watchDisabled bool + revisionQuantization time.Duration + gcWindow time.Duration + gcInterval time.Duration + gcMaxOperationTime time.Duration + maxRevisionStalenessPercent float64 + followerReadDelay time.Duration + tablePrefix string + enablePrometheusStats bool + maxOpenConns int + connMaxIdleTime time.Duration + connMaxLifetime time.Duration + analyzeBeforeStats bool + maxRetries uint8 + lockWaitTimeoutSeconds *uint8 + gcEnabled bool + credentialsProviderName string + filterMaximumIDCount uint16 + allowedMigrations []string + columnOptimizationOption common.ColumnOptimizationOption + watchDisabled bool } // Option provides the facility to configure how clients within the @@ -67,8 +64,6 @@ func generateConfig(options []Option) (mysqlOptions, error) { gcWindow: defaultGarbageCollectionWindow, gcInterval: defaultGarbageCollectionInterval, gcMaxOperationTime: defaultGarbageCollectionMaxOperationTime, - watchBufferLength: defaultWatchBufferLength, - watchBufferWriteTimeout: defaultWatchBufferWriteTimeout, maxOpenConns: defaultMaxOpenConns, connMaxIdleTime: defaultConnMaxIdleTime, connMaxLifetime: defaultConnMaxLifetime, @@ -105,28 +100,6 @@ func generateConfig(options []Option) (mysqlOptions, error) { return computed, nil } -// WatchBufferLength is the number of entries that can be stored in the watch -// buffer while awaiting read by the client. -// -// This value defaults to 128. -func WatchBufferLength(watchBufferLength uint16) Option { - return func(mo *mysqlOptions) { - mo.watchBufferLength = watchBufferLength - } -} - -// WatchBufferWriteTimeout is the maximum timeout for writing to the watch buffer, -// after which the caller to the watch will be disconnected. -func WatchBufferWriteTimeout(watchBufferWriteTimeout time.Duration) Option { - return func(mo *mysqlOptions) { mo.watchBufferWriteTimeout = watchBufferWriteTimeout } -} - -// WatchBufferMaximumSize is the maximum size in bytes of the watch buffer. -// If this value is exceeded the caller will receive an error. -func WatchChangeBufferMaximumSize(maxSize uint64) Option { - return func(mo *mysqlOptions) { mo.watchChangeBufferMaximumSize = maxSize } -} - // RevisionQuantization is the time bucket size to which advertised // revisions will be rounded. // diff --git a/internal/datastore/mysql/watch.go b/internal/datastore/mysql/watch.go index 553e3ccd9..da0df2392 100644 --- a/internal/datastore/mysql/watch.go +++ b/internal/datastore/mysql/watch.go @@ -17,16 +17,20 @@ const ( watchSleep = 100 * time.Millisecond ) +func (mds *mysqlDatastore) DefaultsWatchOptions() datastore.WatchOptions { + return datastore.WatchOptions{ + WatchBufferLength: defaultWatchBufferLength, + WatchBufferWriteTimeout: defaultWatchBufferWriteTimeout, + // MySQL does not use CheckpointInterval or WatchConnectTimeout + // MySQL does not support EmitImmediatelyStrategy or WatchSchema + } +} + // Watch notifies the caller about all changes to tuples. // // All events following afterRevision will be sent to the caller. func (mds *mysqlDatastore) Watch(ctx context.Context, afterRevisionRaw datastore.Revision, options datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) { - watchBufferLength := options.WatchBufferLength - if watchBufferLength == 0 { - watchBufferLength = mds.watchBufferLength - } - - updates := make(chan datastore.RevisionChanges, watchBufferLength) + updates := make(chan datastore.RevisionChanges, options.WatchBufferLength) errs := make(chan error, 1) if !mds.watchEnabled { @@ -53,11 +57,6 @@ func (mds *mysqlDatastore) Watch(ctx context.Context, afterRevisionRaw datastore return updates, errs } - watchBufferWriteTimeout := options.WatchBufferWriteTimeout - if watchBufferWriteTimeout <= 0 { - watchBufferWriteTimeout = mds.watchBufferWriteTimeout - } - sendChange := func(change datastore.RevisionChanges) bool { select { case updates <- change: @@ -67,7 +66,7 @@ func (mds *mysqlDatastore) Watch(ctx context.Context, afterRevisionRaw datastore // If we cannot immediately write, setup the timer and try again. } - timer := time.NewTimer(watchBufferWriteTimeout) + timer := time.NewTimer(options.WatchBufferWriteTimeout) defer timer.Stop() select { @@ -138,12 +137,7 @@ func (mds *mysqlDatastore) loadChanges( return changes, newRevision, err } - watchBufferSize := options.MaximumBufferedChangesByteSize - if watchBufferSize == 0 { - watchBufferSize = mds.watchChangeBufferMaximumSize - } - - stagedChanges := common.NewChanges(revisions.TransactionIDKeyFunc, options.Content, watchBufferSize) + stagedChanges := common.NewChanges(revisions.TransactionIDKeyFunc, options.Content, options.MaximumBufferedChangesByteSize) // Load any metadata for the revision range. sql, args, err := mds.LoadRevisionRange.Where(sq.Or{ diff --git a/internal/datastore/postgres/options.go b/internal/datastore/postgres/options.go index 1e2e1a812..eba0c4e50 100644 --- a/internal/datastore/postgres/options.go +++ b/internal/datastore/postgres/options.go @@ -16,16 +16,13 @@ type postgresOptions struct { credentialsProviderName string - watchBufferLength uint16 - watchChangeBufferMaximumSize uint64 - watchBufferWriteTimeout time.Duration - revisionQuantization time.Duration - followerReadDelay time.Duration - gcWindow time.Duration - gcInterval time.Duration - gcMaxOperationTime time.Duration - maxRetries uint8 - filterMaximumIDCount uint16 + revisionQuantization time.Duration + followerReadDelay time.Duration + gcWindow time.Duration + gcInterval time.Duration + gcMaxOperationTime time.Duration + maxRetries uint8 + filterMaximumIDCount uint16 enablePrometheusStats bool analyzeBeforeStatistics bool @@ -93,8 +90,6 @@ func generateConfig(options []Option) (postgresOptions, error) { gcWindow: defaultGarbageCollectionWindow, gcInterval: defaultGarbageCollectionInterval, gcMaxOperationTime: defaultGarbageCollectionMaxOperationTime, - watchBufferLength: defaultWatchBufferLength, - watchBufferWriteTimeout: defaultWatchBufferWriteTimeout, revisionQuantization: defaultQuantization, maxRevisionStalenessPercent: defaultMaxRevisionStalenessPercent, enablePrometheusStats: defaultEnablePrometheusStats, @@ -270,26 +265,6 @@ func WriteConnsMaxOpen(conns int) Option { return func(po *postgresOptions) { po.writePoolOpts.MaxOpenConns = &conns } } -// WatchBufferLength is the number of entries that can be stored in the watch -// buffer while awaiting read by the client. -// -// This value defaults to 128. -func WatchBufferLength(watchBufferLength uint16) Option { - return func(po *postgresOptions) { po.watchBufferLength = watchBufferLength } -} - -// WatchBufferWriteTimeout is the maximum timeout for writing to the watch buffer, -// after which the caller to the watch will be disconnected. -func WatchBufferWriteTimeout(watchBufferWriteTimeout time.Duration) Option { - return func(po *postgresOptions) { po.watchBufferWriteTimeout = watchBufferWriteTimeout } -} - -// WatchBufferMaximumSize is the maximum size in bytes of the watch buffer. -// If this value is exceeded the caller will receive an error. -func WatchChangeBufferMaximumSize(maxSize uint64) Option { - return func(po *postgresOptions) { po.watchChangeBufferMaximumSize = maxSize } -} - // RevisionQuantization is the time bucket size to which advertised // revisions will be rounded. // diff --git a/internal/datastore/postgres/postgres.go b/internal/datastore/postgres/postgres.go index 982a45d45..072b77f2e 100644 --- a/internal/datastore/postgres/postgres.go +++ b/internal/datastore/postgres/postgres.go @@ -306,33 +306,30 @@ func newPostgresDatastore( CachedOptimizedRevisions: revisions.NewCachedOptimizedRevisions( maxRevisionStaleness, ), - MigrationValidator: common.NewMigrationValidator(headMigration, config.allowedMigrations), - dburl: pgURL, - readPool: pgxcommon.MustNewInterceptorPooler(readPool, config.queryInterceptor), - writePool: nil, /* disabled by default */ - collectors: collectors, - watchBufferLength: config.watchBufferLength, - watchChangeBufferMaximumSize: config.watchChangeBufferMaximumSize, - watchBufferWriteTimeout: config.watchBufferWriteTimeout, - optimizedRevisionQuery: revisionQuery, - validTransactionQuery: validTransactionQuery, - revisionHeartbeatQuery: revisionHeartbeatQuery, - gcWindow: config.gcWindow, - gcInterval: config.gcInterval, - gcTimeout: config.gcMaxOperationTime, - analyzeBeforeStatistics: config.analyzeBeforeStatistics, - watchEnabled: watchEnabled, - workerCtx: gcCtx, - cancelGc: cancelGc, - readTxOptions: pgx.TxOptions{IsoLevel: pgx.RepeatableRead, AccessMode: pgx.ReadOnly}, - maxRetries: config.maxRetries, - credentialsProvider: credentialsProvider, - isPrimary: isPrimary, - inStrictReadMode: config.readStrictMode, - filterMaximumIDCount: config.filterMaximumIDCount, - schema: *schema.Schema(config.columnOptimizationOption, false), - quantizationPeriodNanos: quantizationPeriodNanos, - isolationLevel: isolationLevel, + MigrationValidator: common.NewMigrationValidator(headMigration, config.allowedMigrations), + dburl: pgURL, + readPool: pgxcommon.MustNewInterceptorPooler(readPool, config.queryInterceptor), + writePool: nil, /* disabled by default */ + collectors: collectors, + optimizedRevisionQuery: revisionQuery, + validTransactionQuery: validTransactionQuery, + revisionHeartbeatQuery: revisionHeartbeatQuery, + gcWindow: config.gcWindow, + gcInterval: config.gcInterval, + gcTimeout: config.gcMaxOperationTime, + analyzeBeforeStatistics: config.analyzeBeforeStatistics, + watchEnabled: watchEnabled, + workerCtx: gcCtx, + cancelGc: cancelGc, + readTxOptions: pgx.TxOptions{IsoLevel: pgx.RepeatableRead, AccessMode: pgx.ReadOnly}, + maxRetries: config.maxRetries, + credentialsProvider: credentialsProvider, + isPrimary: isPrimary, + inStrictReadMode: config.readStrictMode, + filterMaximumIDCount: config.filterMaximumIDCount, + schema: *schema.Schema(config.columnOptimizationOption, false), + quantizationPeriodNanos: quantizationPeriodNanos, + isolationLevel: isolationLevel, } if isPrimary && config.readStrictMode { @@ -379,9 +376,6 @@ type pgDatastore struct { dburl string readPool, writePool pgxcommon.ConnPooler collectors []prometheus.Collector - watchBufferLength uint16 - watchChangeBufferMaximumSize uint64 - watchBufferWriteTimeout time.Duration optimizedRevisionQuery string validTransactionQuery string revisionHeartbeatQuery string diff --git a/internal/datastore/postgres/postgres_shared_test.go b/internal/datastore/postgres/postgres_shared_test.go index f090f6fa5..935ab3720 100644 --- a/internal/datastore/postgres/postgres_shared_test.go +++ b/internal/datastore/postgres/postgres_shared_test.go @@ -82,13 +82,12 @@ func testPostgresDatastore(t *testing.T, config postgresTestConfig) { ctx := context.Background() // NOTE: gc tests take exclusive locks, so they are run under non-parallel. - test.OnlyGCTests(t, test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, watchBufferLength uint16) (datastore.Datastore, error) { + test.OnlyGCTests(t, test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, _ uint16) (datastore.Datastore, error) { ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore { ds, err := newPostgresDatastore(ctx, uri, primaryInstanceID, RevisionQuantization(revisionQuantization), GCWindow(gcWindow), GCInterval(gcInterval), - WatchBufferLength(watchBufferLength), DebugAnalyzeBeforeStatistics(), MigrationPhase(config.migrationPhase), WithRevisionHeartbeat(false), // heartbeat revision messes with tests that assert over revisions @@ -105,7 +104,6 @@ func testPostgresDatastore(t *testing.T, config postgresTestConfig) { RevisionQuantization(0), GCWindow(1000*time.Second), GCInterval(veryLargeGCInterval), - WatchBufferLength(50), MigrationPhase(config.migrationPhase), ReadConnsMinOpen(10), ReadConnsMaxOpen(10), @@ -119,13 +117,12 @@ func testPostgresDatastore(t *testing.T, config postgresTestConfig) { b := testdatastore.RunPostgresForTesting(t, "", config.targetMigration, config.pgVersion, config.pgbouncer) ctx := context.Background() - test.AllWithExceptions(t, test.DatastoreTesterFunc(func(revisionQuantization, _, gcWindow time.Duration, watchBufferLength uint16) (datastore.Datastore, error) { + test.AllWithExceptions(t, test.DatastoreTesterFunc(func(revisionQuantization, _, gcWindow time.Duration, _ uint16) (datastore.Datastore, error) { ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore { ds, err := newPostgresDatastore(ctx, uri, primaryInstanceID, RevisionQuantization(revisionQuantization), GCWindow(gcWindow), GCInterval(veryLargeGCInterval), - WatchBufferLength(watchBufferLength), DebugAnalyzeBeforeStatistics(), MigrationPhase(config.migrationPhase), WithRevisionHeartbeat(false), // heartbeat revision messes with tests that assert over revisions @@ -142,7 +139,6 @@ func testPostgresDatastore(t *testing.T, config postgresTestConfig) { RevisionQuantization(0), GCWindow(1*time.Millisecond), GCInterval(veryLargeGCInterval), - WatchBufferLength(1), MigrationPhase(config.migrationPhase), )) @@ -169,7 +165,6 @@ func testPostgresDatastore(t *testing.T, config postgresTestConfig) { RevisionQuantization(0), GCWindow(1*time.Millisecond), GCInterval(veryLargeGCInterval), - WatchBufferLength(1), MigrationPhase(config.migrationPhase), )) @@ -179,7 +174,6 @@ func testPostgresDatastore(t *testing.T, config postgresTestConfig) { RevisionQuantization(0), GCWindow(1*time.Millisecond), GCInterval(veryLargeGCInterval), - WatchBufferLength(1), MigrationPhase(config.migrationPhase), )) @@ -189,7 +183,6 @@ func testPostgresDatastore(t *testing.T, config postgresTestConfig) { RevisionQuantization(0), GCWindow(1*time.Millisecond), GCInterval(veryLargeGCInterval), - WatchBufferLength(50), MigrationPhase(config.migrationPhase), WithRevisionHeartbeat(false), )) @@ -200,7 +193,6 @@ func testPostgresDatastore(t *testing.T, config postgresTestConfig) { RevisionQuantization(0), GCWindow(1*time.Millisecond), GCInterval(veryLargeGCInterval), - WatchBufferLength(50), MigrationPhase(config.migrationPhase), )) @@ -210,7 +202,6 @@ func testPostgresDatastore(t *testing.T, config postgresTestConfig) { RevisionQuantization(0), GCWindow(1*time.Millisecond), GCInterval(veryLargeGCInterval), - WatchBufferLength(1), MigrationPhase(config.migrationPhase), )) @@ -220,7 +211,6 @@ func testPostgresDatastore(t *testing.T, config postgresTestConfig) { RevisionQuantization(0), GCWindow(1*time.Millisecond), GCInterval(veryLargeGCInterval), - WatchBufferLength(50), MigrationPhase(config.migrationPhase), )) @@ -230,7 +220,6 @@ func testPostgresDatastore(t *testing.T, config postgresTestConfig) { RevisionQuantization(0), GCWindow(1*time.Millisecond), GCInterval(veryLargeGCInterval), - WatchBufferLength(50), MigrationPhase(config.migrationPhase), )) @@ -239,7 +228,6 @@ func testPostgresDatastore(t *testing.T, config postgresTestConfig) { ContinuousCheckpointTest, RevisionQuantization(100*time.Millisecond), GCInterval(veryLargeGCInterval), - WatchBufferLength(50), MigrationPhase(config.migrationPhase), WithRevisionHeartbeat(true), )) @@ -250,7 +238,6 @@ func testPostgresDatastore(t *testing.T, config postgresTestConfig) { RevisionQuantization(0), GCWindow(1*time.Millisecond), GCInterval(veryLargeGCInterval), - WatchBufferLength(50), MigrationPhase(config.migrationPhase), )) @@ -260,7 +247,6 @@ func testPostgresDatastore(t *testing.T, config postgresTestConfig) { RevisionQuantization(0), GCWindow(1*time.Millisecond), GCInterval(veryLargeGCInterval), - WatchBufferLength(50), MigrationPhase(config.migrationPhase), )) @@ -270,7 +256,6 @@ func testPostgresDatastore(t *testing.T, config postgresTestConfig) { RevisionQuantization(0), GCWindow(1000*time.Second), GCInterval(veryLargeGCInterval), - WatchBufferLength(50), MigrationPhase(config.migrationPhase), )) @@ -280,7 +265,6 @@ func testPostgresDatastore(t *testing.T, config postgresTestConfig) { RevisionQuantization(0), GCWindow(1000*time.Second), GCInterval(veryLargeGCInterval), - WatchBufferLength(50), MigrationPhase(config.migrationPhase), )) } @@ -291,7 +275,6 @@ func testPostgresDatastore(t *testing.T, config postgresTestConfig) { RevisionQuantization(0), GCWindow(1*time.Millisecond), GCInterval(veryLargeGCInterval), - WatchBufferLength(1), MigrationPhase(config.migrationPhase), )) @@ -301,7 +284,6 @@ func testPostgresDatastore(t *testing.T, config postgresTestConfig) { RevisionQuantization(0), GCWindow(1*time.Millisecond), GCInterval(veryLargeGCInterval), - WatchBufferLength(1), MigrationPhase(config.migrationPhase), )) }) @@ -318,13 +300,12 @@ func testPostgresDatastoreWithoutCommitTimestamps(t *testing.T, config postgresT // NOTE: watch API requires the commit timestamps, so we skip those tests here. // NOTE: gc tests take exclusive locks, so they are run under non-parallel. - test.AllWithExceptions(t, test.DatastoreTesterFunc(func(revisionQuantization, _, gcWindow time.Duration, watchBufferLength uint16) (datastore.Datastore, error) { + test.AllWithExceptions(t, test.DatastoreTesterFunc(func(revisionQuantization, _, gcWindow time.Duration, _ uint16) (datastore.Datastore, error) { ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore { ds, err := newPostgresDatastore(ctx, uri, primaryInstanceID, RevisionQuantization(revisionQuantization), GCWindow(gcWindow), GCInterval(veryLargeGCInterval), - WatchBufferLength(watchBufferLength), DebugAnalyzeBeforeStatistics(), WithRevisionHeartbeat(false), ) @@ -338,13 +319,12 @@ func testPostgresDatastoreWithoutCommitTimestamps(t *testing.T, config postgresT t.Run(fmt.Sprintf("postgres-%s-gc", pgVersion), func(t *testing.T) { ctx := context.Background() b := testdatastore.RunPostgresForTestingWithCommitTimestamps(t, "", "head", false, pgVersion, enablePgbouncer) - test.OnlyGCTests(t, test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, watchBufferLength uint16) (datastore.Datastore, error) { + test.OnlyGCTests(t, test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, _ uint16) (datastore.Datastore, error) { ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore { ds, err := newPostgresDatastore(ctx, uri, primaryInstanceID, RevisionQuantization(revisionQuantization), GCWindow(gcWindow), GCInterval(gcInterval), - WatchBufferLength(watchBufferLength), DebugAnalyzeBeforeStatistics(), WithRevisionHeartbeat(false), ) @@ -926,7 +906,6 @@ func QuantizedRevisionTest(t *testing.T, b testdatastore.RunningEngineForTest) { primaryInstanceID, RevisionQuantization(tc.quantization), GCWindow(24*time.Hour), - WatchBufferLength(1), FollowerReadDelay(tc.followerReadDelay), WithRevisionHeartbeat(false), ) @@ -1004,7 +983,6 @@ func OverlappingRevisionTest(t *testing.T, b testdatastore.RunningEngineForTest) primaryInstanceID, RevisionQuantization(tc.quantization), GCWindow(24*time.Hour), - WatchBufferLength(1), FollowerReadDelay(tc.followerReadDelay), WithRevisionHeartbeat(false), ) @@ -1166,7 +1144,7 @@ func ConcurrentRevisionWatchTest(t *testing.T, ds datastore.Datastore) { seenWatchRevisionsLock := sync.Mutex{} go func() { - changes, _ := ds.Watch(withCancel, rev, datastore.WatchJustRelationships()) + changes, _ := ds.Watch(withCancel, rev, datastore.WatchJustRelationships(ds)) waitForWatch <- struct{}{} @@ -1334,7 +1312,7 @@ func OverlappingRevisionWatchTest(t *testing.T, ds datastore.Datastore) { require.NoError(err) // Call watch and ensure it terminates with having only read the two expected sets of changes. - changes, errChan := ds.Watch(ctx, rev, datastore.WatchJustRelationships()) + changes, errChan := ds.Watch(ctx, rev, datastore.WatchJustRelationships(ds)) transactionCount := 0 loop: for { @@ -1450,7 +1428,6 @@ func WatchNotEnabledTest(t *testing.T, _ testdatastore.RunningEngineForTest, pgV RevisionQuantization(0), GCWindow(time.Millisecond*1), GCInterval(veryLargeGCInterval), - WatchBufferLength(1), ) require.NoError(err) return ds @@ -1461,7 +1438,7 @@ func WatchNotEnabledTest(t *testing.T, _ testdatastore.RunningEngineForTest, pgV _, errChan := ds.Watch( context.Background(), revision, - datastore.WatchJustRelationships(), + datastore.WatchJustRelationships(ds), ) err := <-errChan require.Error(err) @@ -1479,7 +1456,6 @@ func BenchmarkPostgresQuery(b *testing.B) { RevisionQuantization(0), GCWindow(time.Millisecond*1), GCInterval(veryLargeGCInterval), - WatchBufferLength(1), ) require.NoError(b, err) return ds @@ -1517,7 +1493,6 @@ func datastoreWithInterceptorAndTestData(t *testing.T, interceptor pgcommon.Quer RevisionQuantization(0), GCWindow(time.Millisecond*1), GCInterval(veryLargeGCInterval), - WatchBufferLength(1), WithQueryInterceptor(interceptor), ) require.NoError(err) @@ -1903,7 +1878,7 @@ func NullCaveatWatchTest(t *testing.T, ds datastore.Datastore) { require.NoError(err) // Run the watch API. - changes, errchan := ds.Watch(ctx, lowestRevision, datastore.WatchJustRelationships()) + changes, errchan := ds.Watch(ctx, lowestRevision, datastore.WatchJustRelationships(ds)) require.Empty(errchan) // Manually insert a relationship with a NULL caveat. This is allowed, but can only happen due to @@ -1974,9 +1949,9 @@ func RevisionTimestampAndTransactionIDTest(t *testing.T, ds datastore.Datastore) require.NoError(err) // Run the watch API. - changes, errchan := ds.Watch(ctx, lowestRevision, datastore.WatchOptions{ - Content: datastore.WatchRelationships | datastore.WatchSchema | datastore.WatchCheckpoints, - }) + watchOptions := ds.DefaultsWatchOptions() + watchOptions.Content = datastore.WatchRelationships | datastore.WatchSchema | datastore.WatchCheckpoints + changes, errchan := ds.Watch(ctx, lowestRevision, watchOptions) require.Empty(errchan) pds := ds.(*pgDatastore) @@ -2038,10 +2013,10 @@ func ContinuousCheckpointTest(t *testing.T, ds datastore.Datastore) { require.NoError(err) // Run the watch API. - changes, errchan := ds.Watch(ctx, lowestRevision, datastore.WatchOptions{ - Content: datastore.WatchCheckpoints, - CheckpointInterval: 100 * time.Millisecond, - }) + watchOptions := ds.DefaultsWatchOptions() + watchOptions.Content = datastore.WatchCheckpoints + watchOptions.CheckpointInterval = 100 * time.Millisecond + changes, errchan := ds.Watch(ctx, lowestRevision, watchOptions) require.Empty(errchan) var checkpointCount int diff --git a/internal/datastore/postgres/postgres_test.go b/internal/datastore/postgres/postgres_test.go index 78c23b161..d2544b46e 100644 --- a/internal/datastore/postgres/postgres_test.go +++ b/internal/datastore/postgres/postgres_test.go @@ -57,7 +57,6 @@ func TestPostgresDatastoreGC(t *testing.T) { RevisionQuantization(0), GCWindow(1*time.Millisecond), GCInterval(veryLargeGCInterval), - WatchBufferLength(1), MigrationPhase(config.migrationPhase), )) @@ -67,7 +66,6 @@ func TestPostgresDatastoreGC(t *testing.T) { RevisionQuantization(0), GCWindow(1*time.Millisecond), GCInterval(veryLargeGCInterval), - WatchBufferLength(1), MigrationPhase(config.migrationPhase), )) @@ -77,7 +75,6 @@ func TestPostgresDatastoreGC(t *testing.T) { RevisionQuantization(0), GCWindow(1*time.Millisecond), GCInterval(veryLargeGCInterval), - WatchBufferLength(1), MigrationPhase(config.migrationPhase), WithRevisionHeartbeat(false), )) diff --git a/internal/datastore/postgres/watch.go b/internal/datastore/postgres/watch.go index 89533fc0e..bcf7eb183 100644 --- a/internal/datastore/postgres/watch.go +++ b/internal/datastore/postgres/watch.go @@ -13,6 +13,7 @@ import ( "github.com/authzed/spicedb/internal/datastore/common" "github.com/authzed/spicedb/internal/datastore/postgres/schema" + log "github.com/authzed/spicedb/internal/logging" "github.com/authzed/spicedb/internal/sharederrors" "github.com/authzed/spicedb/pkg/datastore" core "github.com/authzed/spicedb/pkg/proto/core/v1" @@ -24,6 +25,15 @@ const ( minimumWatchSleep = 100 * time.Millisecond ) +func (pgd *pgDatastore) DefaultsWatchOptions() datastore.WatchOptions { + return datastore.WatchOptions{ + WatchBufferLength: defaultWatchBufferLength, + WatchBufferWriteTimeout: defaultWatchBufferWriteTimeout, + // Postgres does not use WatchConnectTimeout + // Postgres does not support EmitImmediatelyStrategy + } +} + var ( // This query must cast an xid8 to xid, which is a safe operation as long as the // xid8 is one of the last ~2 billion transaction IDs generated. We should be garbage @@ -67,12 +77,7 @@ func (pgd *pgDatastore) Watch( afterRevisionRaw datastore.Revision, options datastore.WatchOptions, ) (<-chan datastore.RevisionChanges, <-chan error) { - watchBufferLength := options.WatchBufferLength - if watchBufferLength == 0 { - watchBufferLength = pgd.watchBufferLength - } - - updates := make(chan datastore.RevisionChanges, watchBufferLength) + updates := make(chan datastore.RevisionChanges, options.WatchBufferLength) errs := make(chan error, 1) if !pgd.watchEnabled { @@ -88,14 +93,10 @@ func (pgd *pgDatastore) Watch( } afterRevision := afterRevisionRaw.(postgresRevision) - watchSleep := options.CheckpointInterval - if watchSleep < minimumWatchSleep { - watchSleep = minimumWatchSleep - } - watchBufferWriteTimeout := options.WatchBufferWriteTimeout - if watchBufferWriteTimeout <= 0 { - watchBufferWriteTimeout = pgd.watchBufferWriteTimeout + if options.CheckpointInterval < minimumWatchSleep { + log.Warn().Msgf("--watch-api-heartbeat set too small, using %d", minimumWatchSleep) + options.CheckpointInterval = minimumWatchSleep } sendChange := func(change datastore.RevisionChanges) bool { @@ -107,7 +108,7 @@ func (pgd *pgDatastore) Watch( // If we cannot immediately write, setup the timer and try again. } - timer := time.NewTimer(watchBufferWriteTimeout) + timer := time.NewTimer(options.WatchBufferWriteTimeout) defer timer.Stop() select { @@ -192,7 +193,7 @@ func (pgd *pgDatastore) Watch( } } else { select { - case <-time.NewTimer(watchSleep).C: + case <-time.NewTimer(options.CheckpointInterval).C: break case <-ctx.Done(): errs <- datastore.NewWatchCanceledErr() @@ -252,12 +253,7 @@ func (pgd *pgDatastore) loadChanges(ctx context.Context, revisions []postgresRev filter := make(map[uint64]int, len(revisions)) txidToRevision := make(map[uint64]postgresRevision, len(revisions)) - watchBufferSize := options.MaximumBufferedChangesByteSize - if watchBufferSize == 0 { - watchBufferSize = pgd.watchChangeBufferMaximumSize - } - - tracked := common.NewChanges(revisionKeyFunc, options.Content, watchBufferSize) + tracked := common.NewChanges(revisionKeyFunc, options.Content, options.MaximumBufferedChangesByteSize) for i, rev := range revisions { if rev.optionalTxID.Uint64 < xmin { diff --git a/internal/datastore/proxy/checkingreplicated_test.go b/internal/datastore/proxy/checkingreplicated_test.go index b950ad085..6481eb259 100644 --- a/internal/datastore/proxy/checkingreplicated_test.go +++ b/internal/datastore/proxy/checkingreplicated_test.go @@ -122,6 +122,10 @@ func (f fakeDatastore) RevisionFromString(_ string) (datastore.Revision, error) return nil, nil } +func (f fakeDatastore) DefaultsWatchOptions() datastore.WatchOptions { + return datastore.WatchOptions{} +} + func (f fakeDatastore) Watch(_ context.Context, _ datastore.Revision, _ datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) { return nil, nil } diff --git a/internal/datastore/proxy/counting.go b/internal/datastore/proxy/counting.go index 60be581d0..b97a68f47 100644 --- a/internal/datastore/proxy/counting.go +++ b/internal/datastore/proxy/counting.go @@ -181,6 +181,10 @@ func (p *countingProxy) RevisionFromString(serialized string) (datastore.Revisio return p.delegate.RevisionFromString(serialized) } +func (p *countingProxy) DefaultsWatchOptions() datastore.WatchOptions { + return p.delegate.DefaultsWatchOptions() +} + func (p *countingProxy) Watch(ctx context.Context, afterRevision datastore.Revision, watchOptions datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) { return p.delegate.Watch(ctx, afterRevision, watchOptions) } diff --git a/internal/datastore/proxy/indexcheck/fakedatastore_test.go b/internal/datastore/proxy/indexcheck/fakedatastore_test.go index 6143c13a3..d68446b2a 100644 --- a/internal/datastore/proxy/indexcheck/fakedatastore_test.go +++ b/internal/datastore/proxy/indexcheck/fakedatastore_test.go @@ -59,6 +59,10 @@ func (f fakeDatastore) RevisionFromString(_ string) (datastore.Revision, error) return nil, nil } +func (f fakeDatastore) DefaultsWatchOptions() datastore.WatchOptions { + return datastore.WatchOptions{} +} + func (f fakeDatastore) Watch(_ context.Context, _ datastore.Revision, _ datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) { return nil, nil } diff --git a/internal/datastore/proxy/indexcheck/indexcheck.go b/internal/datastore/proxy/indexcheck/indexcheck.go index 4846ca8ba..2346ad89b 100644 --- a/internal/datastore/proxy/indexcheck/indexcheck.go +++ b/internal/datastore/proxy/indexcheck/indexcheck.go @@ -72,6 +72,10 @@ func (p *indexcheckingProxy) RevisionFromString(serialized string) (datastore.Re return p.delegate.RevisionFromString(serialized) } +func (p *indexcheckingProxy) DefaultsWatchOptions() datastore.WatchOptions { + return p.delegate.DefaultsWatchOptions() +} + func (p *indexcheckingProxy) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) { return p.delegate.Watch(ctx, afterRevision, options) } diff --git a/internal/datastore/proxy/indexcheck/indexcheck_test.go b/internal/datastore/proxy/indexcheck/indexcheck_test.go index 91d385658..cfe9c1834 100644 --- a/internal/datastore/proxy/indexcheck/indexcheck_test.go +++ b/internal/datastore/proxy/indexcheck/indexcheck_test.go @@ -120,7 +120,13 @@ func TestIndexCheckingProxyMethods(t *testing.T) { }) t.Run("Watch", func(t *testing.T) { - changes, errs := proxy.Watch(t.Context(), nil, datastore.WatchOptions{}) + watchOptions, err := datastore.BuildAndValidateWatchOptions( + datastore.ServerWatchOptions{}, + datastore.ClientWatchOptions{}, + proxy.DefaultsWatchOptions(), + ) + require.NoError(t, err) + changes, errs := proxy.Watch(t.Context(), nil, watchOptions) require.Nil(t, changes) require.Nil(t, errs) }) diff --git a/internal/datastore/proxy/observable.go b/internal/datastore/proxy/observable.go index 9e3ef5934..d848b1a5f 100644 --- a/internal/datastore/proxy/observable.go +++ b/internal/datastore/proxy/observable.go @@ -121,6 +121,10 @@ func (p *observableProxy) RevisionFromString(serialized string) (datastore.Revis return p.delegate.RevisionFromString(serialized) } +func (p *observableProxy) DefaultsWatchOptions() datastore.WatchOptions { + return p.delegate.DefaultsWatchOptions() +} + func (p *observableProxy) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) { return p.delegate.Watch(ctx, afterRevision, options) } diff --git a/internal/datastore/proxy/proxy_test/mock.go b/internal/datastore/proxy/proxy_test/mock.go index 268a93b70..47d2614c8 100644 --- a/internal/datastore/proxy/proxy_test/mock.go +++ b/internal/datastore/proxy/proxy_test/mock.go @@ -72,6 +72,10 @@ func (dm *MockDatastore) RevisionFromString(s string) (datastore.Revision, error return args.Get(0).(datastore.Revision), args.Error(1) } +func (dm *MockDatastore) DefaultsWatchOptions() datastore.WatchOptions { + return datastore.WatchOptions{} +} + func (dm *MockDatastore) Watch(_ context.Context, afterRevision datastore.Revision, _ datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) { args := dm.Called(afterRevision) return args.Get(0).(<-chan datastore.RevisionChanges), args.Get(1).(<-chan error) diff --git a/internal/datastore/proxy/readonly_test.go b/internal/datastore/proxy/readonly_test.go index 69ba1b8fb..58f888065 100644 --- a/internal/datastore/proxy/readonly_test.go +++ b/internal/datastore/proxy/readonly_test.go @@ -121,7 +121,7 @@ func TestWatchPassthrough(t *testing.T) { make(<-chan error), ).Times(1) - ds.Watch(ctx, expectedRevision, datastore.WatchJustRelationships()) + ds.Watch(ctx, expectedRevision, datastore.WatchJustRelationships(ds)) delegate.AssertExpectations(t) } diff --git a/internal/datastore/proxy/relationshipintegrity.go b/internal/datastore/proxy/relationshipintegrity.go index 7c76833b0..cb7bc428e 100644 --- a/internal/datastore/proxy/relationshipintegrity.go +++ b/internal/datastore/proxy/relationshipintegrity.go @@ -214,6 +214,10 @@ func (r *relationshipIntegrityProxy) RevisionFromString(serialized string) (data return r.ds.RevisionFromString(serialized) } +func (r *relationshipIntegrityProxy) DefaultsWatchOptions() datastore.WatchOptions { + return r.ds.DefaultsWatchOptions() +} + func (r *relationshipIntegrityProxy) Statistics(ctx context.Context) (datastore.Stats, error) { return r.ds.Statistics(ctx) } diff --git a/internal/datastore/proxy/relationshipintegrity_test.go b/internal/datastore/proxy/relationshipintegrity_test.go index 525924111..34a0c06d1 100644 --- a/internal/datastore/proxy/relationshipintegrity_test.go +++ b/internal/datastore/proxy/relationshipintegrity_test.go @@ -256,7 +256,7 @@ func TestWatchIntegrityFailureDueToInvalidHashSignature(t *testing.T) { pds, err := NewRelationshipIntegrityProxy(ds, DefaultKeyForTesting, nil) require.NoError(t, err) - watchEvents, errChan := pds.Watch(t.Context(), headRev, datastore.WatchJustRelationships()) + watchEvents, errChan := pds.Watch(t.Context(), headRev, datastore.WatchJustRelationships(ds)) // Insert an invalid integrity hash for one of the relationships to be invalid by bypassing // the proxy. diff --git a/internal/datastore/proxy/schemacaching/watchingcache.go b/internal/datastore/proxy/schemacaching/watchingcache.go index 20987905f..0601d6c93 100644 --- a/internal/datastore/proxy/schemacaching/watchingcache.go +++ b/internal/datastore/proxy/schemacaching/watchingcache.go @@ -157,6 +157,22 @@ func (p *watchingCachingProxy) startSync(ctx context.Context) error { return err } + watchOptions, err := datastore.BuildAndValidateWatchOptions( + datastore.ServerWatchOptions{ + CheckpointInterval: p.watchHeartbeat, + }, + datastore.ClientWatchOptions{ + Content: datastore.WatchSchema | datastore.WatchCheckpoints, + }, + p.DefaultsWatchOptions(), + ) + if err != nil { + p.namespaceCache.setFallbackMode() + p.caveatCache.setFallbackMode() + log.Warn().Err(err).Msg("error building watch options") + return err + } + // Start watching for expired entries to be GCed. go (func() { log.Debug().Str("revision", headRev.String()).Msg("starting watching cache GC goroutine") @@ -241,10 +257,8 @@ func (p *watchingCachingProxy) startSync(ctx context.Context) error { log.Info().Str("revision", headRev.String()).Int("count", len(caveats)).Msg("populated caveat watching cache") log.Debug().Str("revision", headRev.String()).Dur("watch-heartbeat", p.watchHeartbeat).Msg("beginning schema watch") - ssc, serrc := p.Watch(ctx, headRev, datastore.WatchOptions{ - Content: datastore.WatchSchema | datastore.WatchCheckpoints, - CheckpointInterval: p.watchHeartbeat, - }) + + ssc, serrc := p.Watch(ctx, headRev, watchOptions) spiceerrors.DebugAssertNotNilf(ssc, "ssc is nil") spiceerrors.DebugAssertNotNilf(serrc, "serrc is nil") diff --git a/internal/datastore/proxy/schemacaching/watchingcache_test.go b/internal/datastore/proxy/schemacaching/watchingcache_test.go index fb8489560..163ee8c9c 100644 --- a/internal/datastore/proxy/schemacaching/watchingcache_test.go +++ b/internal/datastore/proxy/schemacaching/watchingcache_test.go @@ -968,6 +968,10 @@ func (*fakeDatastore) Statistics(context.Context) (datastore.Stats, error) { return datastore.Stats{}, fmt.Errorf("not implemented") } +func (*fakeDatastore) DefaultsWatchOptions() datastore.WatchOptions { + return datastore.WatchOptions{} +} + func (fds *fakeDatastore) Watch(_ context.Context, _ datastore.Revision, opts datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) { if opts.Content&datastore.WatchSchema != datastore.WatchSchema { panic("unexpected option") diff --git a/internal/datastore/proxy/singleflight.go b/internal/datastore/proxy/singleflight.go index 410ef0093..6891b8d6d 100644 --- a/internal/datastore/proxy/singleflight.go +++ b/internal/datastore/proxy/singleflight.go @@ -64,6 +64,10 @@ func (p *singleflightProxy) RevisionFromString(serialized string) (datastore.Rev return p.delegate.RevisionFromString(serialized) } +func (p *singleflightProxy) DefaultsWatchOptions() datastore.WatchOptions { + return p.delegate.DefaultsWatchOptions() +} + func (p *singleflightProxy) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) { return p.delegate.Watch(ctx, afterRevision, options) } diff --git a/internal/datastore/spanner/options.go b/internal/datastore/spanner/options.go index d32a98525..bc0a497af 100644 --- a/internal/datastore/spanner/options.go +++ b/internal/datastore/spanner/options.go @@ -38,26 +38,23 @@ const ( ) type spannerOptions struct { - watchBufferLength uint16 - watchChangeBufferMaximumSize uint64 - watchBufferWriteTimeout time.Duration - revisionQuantization time.Duration - followerReadDelay time.Duration - maxRevisionStalenessPercent float64 - credentialsFilePath string - credentialsJSON []byte - emulatorHost string - disableStats bool - readMaxOpen int - writeMaxOpen int - minSessions uint64 - maxSessions uint64 - migrationPhase string - allowedMigrations []string - filterMaximumIDCount uint16 - columnOptimizationOption common.ColumnOptimizationOption - watchDisabled bool - datastoreMetricsOption DatastoreMetricsOption + revisionQuantization time.Duration + followerReadDelay time.Duration + maxRevisionStalenessPercent float64 + credentialsFilePath string + credentialsJSON []byte + emulatorHost string + disableStats bool + readMaxOpen int + writeMaxOpen int + minSessions uint64 + maxSessions uint64 + migrationPhase string + allowedMigrations []string + filterMaximumIDCount uint16 + columnOptimizationOption common.ColumnOptimizationOption + watchDisabled bool + datastoreMetricsOption DatastoreMetricsOption } type migrationPhase uint8 @@ -76,6 +73,8 @@ const ( defaultRevisionQuantization = 5 * time.Second defaultFollowerReadDelay = 0 * time.Second defaultMaxRevisionStalenessPercent = 0.1 + minimumCheckpointInterval = 100 * time.Millisecond + maximumCheckpointInterval = 300000 * time.Millisecond defaultWatchBufferLength = 128 defaultWatchBufferWriteTimeout = 1 * time.Second defaultDisableStats = false @@ -94,8 +93,6 @@ func generateConfig(options []Option) (spannerOptions, error) { // This determines if there are more CPU cores to increase the default number of connections defaultNumberConnections := max(1, math.Round(float64(runtime.GOMAXPROCS(0)))) computed := spannerOptions{ - watchBufferLength: defaultWatchBufferLength, - watchBufferWriteTimeout: defaultWatchBufferWriteTimeout, revisionQuantization: defaultRevisionQuantization, followerReadDelay: defaultFollowerReadDelay, maxRevisionStalenessPercent: defaultMaxRevisionStalenessPercent, @@ -135,28 +132,6 @@ func generateConfig(options []Option) (spannerOptions, error) { return computed, nil } -// WatchBufferLength is the number of entries that can be stored in the watch -// buffer while awaiting read by the client. -// -// This value defaults to 128. -func WatchBufferLength(watchBufferLength uint16) Option { - return func(so *spannerOptions) { - so.watchBufferLength = watchBufferLength - } -} - -// WatchBufferWriteTimeout is the maximum timeout for writing to the watch buffer, -// after which the caller to the watch will be disconnected. -func WatchBufferWriteTimeout(watchBufferWriteTimeout time.Duration) Option { - return func(so *spannerOptions) { so.watchBufferWriteTimeout = watchBufferWriteTimeout } -} - -// WatchBufferMaximumSize is the maximum size in bytes of the watch buffer. -// If this value is exceeded the caller will receive an error. -func WatchChangeBufferMaximumSize(maxSize uint64) Option { - return func(so *spannerOptions) { so.watchChangeBufferMaximumSize = maxSize } -} - // RevisionQuantization is the time bucket size to which advertised revisions // will be rounded. // diff --git a/internal/datastore/spanner/spanner.go b/internal/datastore/spanner/spanner.go index c535bc7c2..8abaec7d2 100644 --- a/internal/datastore/spanner/spanner.go +++ b/internal/datastore/spanner/spanner.go @@ -85,10 +85,7 @@ type spannerDatastore struct { revisions.CommonDecoder *common.MigrationValidator - watchBufferLength uint16 - watchChangeBufferMaximumSize uint64 - watchBufferWriteTimeout time.Duration - watchEnabled bool + watchEnabled bool client *spanner.Client config spannerOptions @@ -237,9 +234,6 @@ func NewSpannerDatastore(ctx context.Context, database string, opts ...Option) ( client: client, config: config, database: database, - watchBufferWriteTimeout: config.watchBufferWriteTimeout, - watchChangeBufferMaximumSize: config.watchChangeBufferMaximumSize, - watchBufferLength: config.watchBufferLength, watchEnabled: !config.watchDisabled, cachedEstimatedBytesPerRelationship: atomic.Uint64{}, tableSizesStatsTable: tableSizesStatsTable, diff --git a/internal/datastore/spanner/spanner_test.go b/internal/datastore/spanner/spanner_test.go index 3a4872484..c7a747764 100644 --- a/internal/datastore/spanner/spanner_test.go +++ b/internal/datastore/spanner/spanner_test.go @@ -32,11 +32,11 @@ func TestSpannerDatastore(t *testing.T) { b := testdatastore.RunSpannerForTesting(t, "", "head") // Transaction tests are excluded because, for reasons unknown, one cannot read its own write in one transaction in the Spanner emulator. - test.AllWithExceptions(t, test.DatastoreTesterFunc(func(revisionQuantization, _, _ time.Duration, watchBufferLength uint16) (datastore.Datastore, error) { + test.AllWithExceptions(t, test.DatastoreTesterFunc(func(revisionQuantization, _, _ time.Duration, _ uint16) (datastore.Datastore, error) { ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore { ds, err := NewSpannerDatastore(ctx, uri, RevisionQuantization(revisionQuantization), - WatchBufferLength(watchBufferLength)) + ) require.NoError(t, err) return ds }) diff --git a/internal/datastore/spanner/watch.go b/internal/datastore/spanner/watch.go index 1d07d4555..d8c2e22ca 100644 --- a/internal/datastore/spanner/watch.go +++ b/internal/datastore/spanner/watch.go @@ -18,6 +18,7 @@ import ( "github.com/authzed/spicedb/internal/datastore/common" "github.com/authzed/spicedb/internal/datastore/revisions" + log "github.com/authzed/spicedb/internal/logging" "github.com/authzed/spicedb/pkg/datastore" core "github.com/authzed/spicedb/pkg/proto/core/v1" "github.com/authzed/spicedb/pkg/spiceerrors" @@ -53,13 +54,17 @@ func parseDatabaseName(db string) (project, instance, database string, err error return matches[1], matches[2], matches[3], nil } -func (sd *spannerDatastore) Watch(ctx context.Context, afterRevision datastore.Revision, opts datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) { - watchBufferLength := opts.WatchBufferLength - if watchBufferLength == 0 { - watchBufferLength = sd.watchBufferLength +func (sd *spannerDatastore) DefaultsWatchOptions() datastore.WatchOptions { + return datastore.WatchOptions{ + WatchBufferLength: defaultWatchBufferLength, + WatchBufferWriteTimeout: defaultWatchBufferWriteTimeout, + // Spanner does not use WatchConnectTimeout + // Spanner does not support EmitImmediatelyStrategy } +} - updates := make(chan datastore.RevisionChanges, watchBufferLength) +func (sd *spannerDatastore) Watch(ctx context.Context, afterRevision datastore.Revision, opts datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) { + updates := make(chan datastore.RevisionChanges, opts.WatchBufferLength) errs := make(chan error, 2) // we may try to send >1 error if opts.EmissionStrategy == datastore.EmitImmediatelyStrategy { @@ -83,10 +88,13 @@ func (sd *spannerDatastore) watch( defer close(updates) defer close(errs) - // NOTE: 100ms is the minimum allowed. - heartbeatInterval := opts.CheckpointInterval - if heartbeatInterval < 100*time.Millisecond { - heartbeatInterval = 100 * time.Millisecond + if opts.CheckpointInterval < minimumCheckpointInterval { + log.Warn().Msgf("--watch-api-heartbeat set too small, using %d", minimumCheckpointInterval) + opts.CheckpointInterval = minimumCheckpointInterval + } + if opts.CheckpointInterval > maximumCheckpointInterval { + log.Warn().Msgf("--watch-api-heartbeat set too high, using %d", maximumCheckpointInterval) + opts.CheckpointInterval = maximumCheckpointInterval } sendError := func(err error) { @@ -108,11 +116,6 @@ func (sd *spannerDatastore) watch( return } - watchBufferWriteTimeout := opts.WatchBufferWriteTimeout - if watchBufferWriteTimeout <= 0 { - watchBufferWriteTimeout = sd.watchBufferWriteTimeout - } - sendChange := func(change datastore.RevisionChanges) bool { select { case updates <- change: @@ -122,7 +125,7 @@ func (sd *spannerDatastore) watch( // If we cannot immediately write, setup the timer and try again. } - timer := time.NewTimer(watchBufferWriteTimeout) + timer := time.NewTimer(opts.WatchBufferWriteTimeout) defer timer.Stop() select { @@ -155,7 +158,7 @@ func (sd *spannerDatastore) watch( CombinedChangeStreamName, changestreams.Config{ StartTimestamp: afterRevision.Time().Add(1 * time.Nanosecond), // records with commit_timestamp greater than or equal to start_timestamp will be returned - HeartbeatInterval: heartbeatInterval, + HeartbeatInterval: opts.CheckpointInterval, SpannerClientOptions: []option.ClientOption{ option.WithCredentialsFile(sd.config.credentialsFilePath), }, @@ -196,11 +199,6 @@ func (sd *spannerDatastore) watch( // but we only want to send them as *one* group. txnBuffer := xsync.NewMap[string, *common.Changes[revisions.TimestampRevision, int64]]() - watchBufferSize := opts.MaximumBufferedChangesByteSize - if watchBufferSize == 0 { - watchBufferSize = sd.watchChangeBufferMaximumSize - } - // NOTE: the callback below might be called concurrently across partitions. err = reader.Read(ctx, func(result *changestreams.ReadResult) error { // See: https://cloud.google.com/spanner/docs/change-streams/details @@ -211,7 +209,7 @@ func (sd *spannerDatastore) watch( modType := dcr.ModType // options are INSERT, UPDATE, DELETE // Get or create tracked changes for this transaction. - tracked, _ := txnBuffer.LoadOrStore(txnID, common.NewChanges(revisions.TimestampIDKeyFunc, opts.Content, watchBufferSize)) + tracked, _ := txnBuffer.LoadOrStore(txnID, common.NewChanges(revisions.TimestampIDKeyFunc, opts.Content, opts.MaximumBufferedChangesByteSize)) // See: https://cloud.google.com/spanner/docs/ttl // > TTL supports auditing its deletions through change streams. Change diff --git a/internal/services/server.go b/internal/services/server.go index d003235b5..d731ac5a7 100644 --- a/internal/services/server.go +++ b/internal/services/server.go @@ -1,8 +1,6 @@ package services import ( - "time" - "google.golang.org/grpc" healthpb "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/reflection" @@ -55,7 +53,7 @@ func RegisterGrpcServices( schemaServiceOption SchemaServiceOption, watchServiceOption WatchServiceOption, permSysConfig v1svc.PermissionsServerConfig, - watchHeartbeatDuration time.Duration, + watchServiceConfig v1svc.ServerWatchConfig, ) { healthManager.RegisterReportedService(OverallServerHealthCheckKey) @@ -64,7 +62,7 @@ func RegisterGrpcServices( healthManager.RegisterReportedService(v1.PermissionsService_ServiceDesc.ServiceName) if watchServiceOption == WatchServiceEnabled { - v1.RegisterWatchServiceServer(srv, v1svc.NewWatchServer(watchHeartbeatDuration)) + v1.RegisterWatchServiceServer(srv, v1svc.NewWatchServer(watchServiceConfig)) healthManager.RegisterReportedService(v1.WatchService_ServiceDesc.ServiceName) } diff --git a/internal/services/v1/watch.go b/internal/services/v1/watch.go index 38148a864..ef991c4ba 100644 --- a/internal/services/v1/watch.go +++ b/internal/services/v1/watch.go @@ -26,16 +26,24 @@ type watchServer struct { v1.UnimplementedWatchServiceServer shared.WithStreamServiceSpecificInterceptor - heartbeatDuration time.Duration + serverConfig ServerWatchConfig +} + +type ServerWatchConfig struct { + CheckpointInterval time.Duration + WatchBufferLength uint16 + WatchBufferWriteTimeout time.Duration + WatchConnectTimeout time.Duration + MaximumBufferedChangesByteSize string } // NewWatchServer creates an instance of the watch server. -func NewWatchServer(heartbeatDuration time.Duration) v1.WatchServiceServer { +func NewWatchServer(config ServerWatchConfig) v1.WatchServiceServer { s := &watchServer{ WithStreamServiceSpecificInterceptor: shared.WithStreamServiceSpecificInterceptor{ Stream: grpcvalidate.StreamServerInterceptor(), }, - heartbeatDuration: heartbeatDuration, + serverConfig: config, } return s } @@ -90,10 +98,22 @@ func (ws *watchServer) Watch(req *v1.WatchRequest, stream v1.WatchService_WatchS DispatchCount: 1, }) - updates, errchan := ds.Watch(ctx, afterRevision, datastore.WatchOptions{ - Content: convertWatchKindToContent(req.OptionalUpdateKinds), - CheckpointInterval: ws.heartbeatDuration, - }) + clientRequest := datastore.ClientWatchOptions{ + Content: convertWatchKindToContent(req.OptionalUpdateKinds), + } + dsConfig := datastore.ServerWatchOptions{ + CheckpointInterval: ws.serverConfig.CheckpointInterval, + WatchBufferLength: ws.serverConfig.WatchBufferLength, + WatchBufferWriteTimeout: ws.serverConfig.WatchBufferWriteTimeout, + WatchConnectTimeout: ws.serverConfig.WatchConnectTimeout, + MaximumBufferedChangesByteSize: ws.serverConfig.MaximumBufferedChangesByteSize, + } + watchOptions, err := datastore.BuildAndValidateWatchOptions(dsConfig, clientRequest, ds.DefaultsWatchOptions()) + if err != nil { + return err + } + + updates, errchan := ds.Watch(ctx, afterRevision, watchOptions) for { select { case update, ok := <-updates: diff --git a/pkg/cmd/datastore/datastore.go b/pkg/cmd/datastore/datastore.go index 51d09f3ac..0efc86d55 100644 --- a/pkg/cmd/datastore/datastore.go +++ b/pkg/cmd/datastore/datastore.go @@ -12,7 +12,6 @@ import ( "github.com/ccoveille/go-safecast/v2" "github.com/spf13/pflag" - "github.com/authzed/spicedb/internal/datastore/common" "github.com/authzed/spicedb/internal/datastore/crdb" "github.com/authzed/spicedb/internal/datastore/memdb" "github.com/authzed/spicedb/internal/datastore/mysql" @@ -301,10 +300,10 @@ func RegisterDatastoreFlagsWithPrefix(flagSet *pflag.FlagSet, prefix string, opt flagSet.StringVar(&opts.TablePrefix, flagName("datastore-mysql-table-prefix"), "", "prefix to add to the name of all SpiceDB database tables") flagSet.StringVar(&opts.MigrationPhase, flagName("datastore-migration-phase"), "", "datastore-specific flag that should be used to signal to a datastore which phase of a multi-step migration it is in") flagSet.StringArrayVar(&opts.AllowedMigrations, flagName("datastore-allowed-migrations"), []string{}, "migration levels that will not fail the health check (in addition to the current head migration)") - flagSet.Uint16Var(&opts.WatchBufferLength, flagName("datastore-watch-buffer-length"), 1024, "how large the watch buffer should be before blocking") - flagSet.StringVar(&opts.WatchChangeBufferMaximumSize, flagName("datastore-watch-change-buffer-maximum-size"), "15%", "how much memory to reserve for the watch change buffer, either as a quantity of bytes (e.g. 5Gi) or a percentage of available memory (e.g. 50%). if this value is exceeded, the watch will error and must be restarted.") - flagSet.DurationVar(&opts.WatchBufferWriteTimeout, flagName("datastore-watch-buffer-write-timeout"), 1*time.Second, "how long the watch buffer should queue before forcefully disconnecting the reader") - flagSet.DurationVar(&opts.WatchConnectTimeout, flagName("datastore-watch-connect-timeout"), 1*time.Second, "how long the watch connection to the underlying datastore should wait before timing out (CockroachDB driver only)") + flagSet.Uint16Var(&opts.WatchBufferLength, flagName("datastore-watch-buffer-length"), defaults.WatchBufferLength, "how large the watch buffer should be before blocking") + flagSet.StringVar(&opts.WatchChangeBufferMaximumSize, flagName("datastore-watch-change-buffer-maximum-size"), defaults.WatchChangeBufferMaximumSize, "how much memory to reserve for the watch change buffer, either as a quantity of bytes (e.g. 5Gi) or a percentage of available memory (e.g. 50%). if this value is exceeded, the watch will error and must be restarted.") + flagSet.DurationVar(&opts.WatchBufferWriteTimeout, flagName("datastore-watch-buffer-write-timeout"), defaults.WatchBufferWriteTimeout, "how long the watch buffer should queue before forcefully disconnecting the reader") + flagSet.DurationVar(&opts.WatchConnectTimeout, flagName("datastore-watch-connect-timeout"), defaults.WatchConnectTimeout, "how long the watch connection to the underlying datastore should wait before timing out (CockroachDB driver only)") flagSet.BoolVar(&opts.DisableWatchSupport, flagName("datastore-disable-watch-support"), false, "disable watch support (only enable if you absolutely do not need watch)") flagSet.BoolVar(&opts.IncludeQueryParametersInTraces, flagName("datastore-include-query-parameters-in-traces"), false, "include query parameters in traces (Postgres and CockroachDB drivers only)") flagSet.DurationVar(&opts.WriteAcquisitionTimeout, flagName("write-conn-acquisition-timeout"), defaults.WriteAcquisitionTimeout, "amount of time that the server will wait for a connection to the datastore to become available when performing a write operation before throwing a ResourceExhausted error. 0 means wait indefinitely. (CockroachDB driver only)") @@ -537,11 +536,6 @@ func newCRDBDatastore(ctx context.Context, opts Config) (datastore.Datastore, er return nil, errors.New("max-retries could not be cast to uint8") } - watchChangeBufferMaximumSize, err := common.WatchBufferSize(opts.WatchChangeBufferMaximumSize) - if err != nil { - return nil, err - } - return crdb.NewCRDBDatastore( ctx, opts.URI, @@ -565,10 +559,6 @@ func newCRDBDatastore(ctx context.Context, opts Config) (datastore.Datastore, er crdb.MaxRetries(maxRetries), crdb.OverlapKey(opts.OverlapKey), crdb.OverlapStrategy(opts.OverlapStrategy), - crdb.WatchBufferLength(opts.WatchBufferLength), - crdb.WatchChangeBufferMaximumSize(watchChangeBufferMaximumSize), - crdb.WatchBufferWriteTimeout(opts.WatchBufferWriteTimeout), - crdb.WatchConnectTimeout(opts.WatchConnectTimeout), crdb.WithEnablePrometheusStats(opts.EnableDatastoreMetrics), crdb.WithEnableConnectionBalancing(opts.EnableConnectionBalancing), crdb.ConnectRate(opts.ConnectRate), @@ -613,18 +603,12 @@ func commonPostgresDatastoreOptions(opts Config) ([]postgres.Option, error) { return nil, errors.New("max-retries could not be cast to uint8") } - watchChangeBufferMaximumSize, err := common.WatchBufferSize(opts.WatchChangeBufferMaximumSize) - if err != nil { - return nil, err - } - return []postgres.Option{ postgres.EnableTracing(), postgres.WithEnablePrometheusStats(opts.EnableDatastoreMetrics), postgres.MaxRetries(maxRetries), postgres.FilterMaximumIDCount(opts.FilterMaximumIDCount), postgres.WithColumnOptimization(opts.ExperimentalColumnOptimization), - postgres.WatchChangeBufferMaximumSize(watchChangeBufferMaximumSize), postgres.IncludeQueryParametersInTraces(opts.IncludeQueryParametersInTraces), }, nil } @@ -671,8 +655,6 @@ func newPostgresPrimaryDatastore(ctx context.Context, opts Config) (datastore.Da postgres.WriteConnHealthCheckInterval(opts.WriteConnPool.HealthCheckInterval), postgres.GCInterval(opts.GCInterval), postgres.GCMaxOperationTime(opts.GCMaxOperationTime), - postgres.WatchBufferLength(opts.WatchBufferLength), - postgres.WatchBufferWriteTimeout(opts.WatchBufferWriteTimeout), postgres.WithWatchDisabled(opts.DisableWatchSupport), postgres.MigrationPhase(opts.MigrationPhase), postgres.AllowedMigrations(opts.AllowedMigrations), @@ -698,11 +680,6 @@ func newSpannerDatastore(ctx context.Context, opts Config) (datastore.Datastore, metricsOption = spanner.DatastoreMetricsOptionNone } - watchChangeBufferMaximumSize, err := common.WatchBufferSize(opts.WatchChangeBufferMaximumSize) - if err != nil { - return nil, err - } - return spanner.NewSpannerDatastore( ctx, opts.URI, @@ -711,9 +688,6 @@ func newSpannerDatastore(ctx context.Context, opts Config) (datastore.Datastore, spanner.MaxRevisionStalenessPercent(opts.MaxRevisionStalenessPercent), spanner.CredentialsFile(opts.SpannerCredentialsFile), spanner.CredentialsJSON(opts.SpannerCredentialsJSON), - spanner.WatchBufferLength(opts.WatchBufferLength), - spanner.WatchChangeBufferMaximumSize(watchChangeBufferMaximumSize), - spanner.WatchBufferWriteTimeout(opts.WatchBufferWriteTimeout), spanner.EmulatorHost(opts.SpannerEmulatorHost), spanner.DisableStats(opts.DisableStats), spanner.WithDatastoreMetricsOption(metricsOption), @@ -761,19 +735,11 @@ func commonMySQLDatastoreOptions(opts Config) ([]mysql.Option, error) { return nil, errors.New("max-retries could not be cast to uint8") } - watchChangeBufferMaximumSize, err := common.WatchBufferSize(opts.WatchChangeBufferMaximumSize) - if err != nil { - return nil, err - } - return []mysql.Option{ mysql.TablePrefix(opts.TablePrefix), mysql.MaxRetries(maxRetries), mysql.OverrideLockWaitTimeout(1), mysql.WithEnablePrometheusStats(opts.EnableDatastoreMetrics), - mysql.WatchBufferLength(opts.WatchBufferLength), - mysql.WatchBufferWriteTimeout(opts.WatchBufferWriteTimeout), - mysql.WatchChangeBufferMaximumSize(watchChangeBufferMaximumSize), mysql.MaxRevisionStalenessPercent(opts.MaxRevisionStalenessPercent), mysql.RevisionQuantization(opts.RevisionQuantization), mysql.FilterMaximumIDCount(opts.FilterMaximumIDCount), diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index 1599035aa..cb439955b 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -462,7 +462,13 @@ func (c *Config) Complete(ctx context.Context) (RunnableServer, error) { v1SchemaServiceOption, watchServiceOption, permSysConfig, - c.WatchHeartbeat, + v1svc.ServerWatchConfig{ + CheckpointInterval: c.WatchHeartbeat, + WatchBufferLength: c.DatastoreConfig.WatchBufferLength, + WatchBufferWriteTimeout: c.DatastoreConfig.WatchBufferWriteTimeout, + WatchConnectTimeout: c.DatastoreConfig.WatchConnectTimeout, + MaximumBufferedChangesByteSize: c.DatastoreConfig.WatchChangeBufferMaximumSize, + }, ) }, grpc.ChainUnaryInterceptor(unaryMiddleware...), diff --git a/pkg/cmd/testserver/testserver.go b/pkg/cmd/testserver/testserver.go index 611beb156..70cd3f9d7 100644 --- a/pkg/cmd/testserver/testserver.go +++ b/pkg/cmd/testserver/testserver.go @@ -3,7 +3,6 @@ package testserver import ( "context" "fmt" - "time" "github.com/rs/zerolog" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" @@ -111,7 +110,7 @@ func (c *Config) Complete(ctx context.Context) (RunnableTestServer, error) { ExpiringRelationshipsEnabled: true, CaveatTypeSet: cts, }, - 1*time.Second, + v1svc.ServerWatchConfig{}, ) } diff --git a/pkg/datastore/datastore.go b/pkg/datastore/datastore.go index c708a88ae..5f9b65263 100644 --- a/pkg/datastore/datastore.go +++ b/pkg/datastore/datastore.go @@ -8,9 +8,12 @@ import ( "iter" "slices" "sort" + "strconv" "strings" "time" + "github.com/dustin/go-humanize" + "github.com/pbnjay/memory" "github.com/rs/zerolog" "google.golang.org/protobuf/types/known/structpb" @@ -21,6 +24,13 @@ import ( "github.com/authzed/spicedb/pkg/tuple" ) +// At startup, measure 75% of available free memory. +var freeMemory uint64 + +func init() { + freeMemory = memory.FreeMemory() / 100 * 75 +} + var Engines []string // SortedEngineIDs returns the full set of engine IDs, sorted. @@ -592,39 +602,57 @@ const ( WatchCheckpoints WatchContent = 1 << 2 ) -// WatchOptions are options for a Watch call. -type WatchOptions struct { - // Content is the content to watch. - Content WatchContent - +// ServerWatchOptions contains server-level configuration for Watch operations. +// These values do NOT change during the lifetime of a server. +type ServerWatchOptions struct { // CheckpointInterval is the interval to use for checkpointing in the watch. - // If given the zero value, the datastore's default will be used. If smaller - // than the datastore's minimum, the minimum will be used. CheckpointInterval time.Duration - // WatchBufferLength is the length of the buffer for the watch channel. If - // given the zero value, the datastore's default will be used. + // WatchBufferLength is the length of the buffer for the watch channel. WatchBufferLength uint16 // WatchBufferWriteTimeout is the timeout for writing to the watch channel. - // If given the zero value, the datastore's default will be used. WatchBufferWriteTimeout time.Duration // WatchConnectTimeout is the timeout for connecting to the watch channel. - // If given the zero value, the datastore's default will be used. - // May not be supported by the datastore. WatchConnectTimeout time.Duration // MaximumBufferedChangesByteSize is the maximum byte size of the buffered changes struct. - // If unspecified, no maximum will be enforced. If the maximum is reached before + // If the maximum is reached before // the changes can be sent, the watch will be closed with an error. - MaximumBufferedChangesByteSize uint64 + MaximumBufferedChangesByteSize string +} + +// ClientWatchOptions contains client-specified options for a Watch operation. +// These values come from the client API request. +type ClientWatchOptions struct { + // Content is the content to watch. + Content WatchContent - // EmissionStrategy defines when are changes streamed to the client. If unspecified, changes will be buffered until - // they can be checkpointed, which is the default behavior. + // EmissionStrategy defines when are changes streamed to the client. + // If unspecified, changes will be buffered until they can be checkpointed, which is the default behavior. EmissionStrategy EmissionStrategy } +// WatchOptions are ALL options for a Watch call. +// Some datastore implementations may ignore one or more of these. +type WatchOptions struct { + // See ClientWatchOptions.Content + Content WatchContent + // See ClientWatchOptions.EmissionStrategy + EmissionStrategy EmissionStrategy + // See ServerWatchOptions.CheckpointInterval + CheckpointInterval time.Duration + // See ServerWatchOptions.WatchBufferLength + WatchBufferLength uint16 + // See ServerWatchOptions.WatchBufferWriteTimeout + WatchBufferWriteTimeout time.Duration + // See ServerWatchOptions.WatchConnectTimeout + WatchConnectTimeout time.Duration + // See ServerWatchOptions.MaximumBufferedChangesByteSize + MaximumBufferedChangesByteSize uint64 +} + // EmissionStrategy describes when changes are emitted to the client. type EmissionStrategy int @@ -641,26 +669,111 @@ const ( ) // WatchJustRelationships returns watch options for just relationships. -func WatchJustRelationships() WatchOptions { - return WatchOptions{ - Content: WatchRelationships, - } +func WatchJustRelationships(ds Datastore) WatchOptions { + v, _ := BuildAndValidateWatchOptions( + ServerWatchOptions{}, + ClientWatchOptions{Content: WatchRelationships}, + ds.DefaultsWatchOptions(), + ) + return v } // WatchJustSchema returns watch options for just schema. -func WatchJustSchema() WatchOptions { - return WatchOptions{ - Content: WatchSchema, +func WatchJustSchema(ds ReadOnlyDatastore) WatchOptions { + v, _ := BuildAndValidateWatchOptions( + ServerWatchOptions{}, + ClientWatchOptions{Content: WatchSchema}, + ds.DefaultsWatchOptions(), + ) + return v +} + +var errOverHundredPercent = errors.New("percentage greater than 100") + +func parsePercent(str string, freeMem uint64) (uint64, error) { + percent := strings.TrimSuffix(str, "%") + parsedPercent, err := strconv.ParseUint(percent, 10, 64) + if err != nil { + return 0, fmt.Errorf("failed to parse percentage: %w", err) + } + + if parsedPercent > 100 { + return 0, errOverHundredPercent } + + return freeMem / 100 * parsedPercent, nil } -// WithCheckpointInterval sets the checkpoint interval on a watch options, returning -// an updated options struct. -func (wo WatchOptions) WithCheckpointInterval(interval time.Duration) WatchOptions { - return WatchOptions{ - Content: wo.Content, - CheckpointInterval: interval, +// watchBufferSize takes a string and interprets it as +// either a percentage of memory (as a percentage of +// 75% of free memory as measured on startup) +// or a humanized byte string and returns the number of +// bytes or an error if the value cannot be interpreted. +// Returns 0 on an empty string. +func watchBufferSize(sizeString string) (size uint64, err error) { + if sizeString == "" { + return 0, nil } + + if strings.HasSuffix(sizeString, "%") { + size, err := parsePercent(sizeString, freeMemory) + if err != nil { + return 0, fmt.Errorf("could not parse %s as percentage: %w", sizeString, err) + } + return size, nil + } + + size, err = humanize.ParseBytes(sizeString) + if err != nil { + return 0, fmt.Errorf("could not parse %s as a number of bytes: %w", sizeString, err) + } + return size, nil +} + +// BuildAndValidateWatchOptions constructs complete WatchOptions by merging server options, +// client options, and datastore defaults. +// Datastore defaults take precedence over server options. +// Client options cannot be overridden. +func BuildAndValidateWatchOptions( + serverOptions ServerWatchOptions, + clientOptions ClientWatchOptions, + datastoreDefaults WatchOptions, +) (WatchOptions, error) { + watchChangeBufferMaximumSize, err := watchBufferSize(serverOptions.MaximumBufferedChangesByteSize) + if err != nil { + return WatchOptions{}, err + } + + options := WatchOptions{ + Content: clientOptions.Content, + EmissionStrategy: clientOptions.EmissionStrategy, + CheckpointInterval: serverOptions.CheckpointInterval, + WatchBufferLength: serverOptions.WatchBufferLength, + WatchBufferWriteTimeout: serverOptions.WatchBufferWriteTimeout, + WatchConnectTimeout: serverOptions.WatchConnectTimeout, + MaximumBufferedChangesByteSize: watchChangeBufferMaximumSize, + } + + if datastoreDefaults.CheckpointInterval > 0 { + options.CheckpointInterval = datastoreDefaults.CheckpointInterval + } + if options.CheckpointInterval < 0 { + return WatchOptions{}, errors.New("invalid checkpoint interval given") + } + if datastoreDefaults.WatchBufferLength > 0 { + options.WatchBufferLength = datastoreDefaults.WatchBufferLength + } + if datastoreDefaults.WatchBufferWriteTimeout > 0 { + options.WatchBufferWriteTimeout = datastoreDefaults.WatchBufferWriteTimeout + } + if datastoreDefaults.WatchConnectTimeout > 0 { + options.WatchConnectTimeout = datastoreDefaults.WatchConnectTimeout + } + if datastoreDefaults.MaximumBufferedChangesByteSize > 0 { + options.MaximumBufferedChangesByteSize = datastoreDefaults.MaximumBufferedChangesByteSize + } + + return options, nil } // ReadOnlyDatastore is an interface for reading relationships from the datastore. @@ -696,7 +809,8 @@ type ReadOnlyDatastore interface { RevisionFromString(serialized string) (Revision, error) // Watch notifies the caller about changes to the datastore, based on the specified options. - // + // The specified options must be built and validated by the caller. + // Some datastores may intentionally ignore some options. // All events following afterRevision will be sent to the caller. Changes made *in* afterRevision will not be included. // // When the changes channel is closed, callers MUST discard any changes received (they will be the zero value). @@ -711,6 +825,11 @@ type ReadOnlyDatastore interface { // - Other errors - the watch should not be retried due to a fatal error. Watch(ctx context.Context, afterRevision Revision, options WatchOptions) (<-chan RevisionChanges, <-chan error) + // DefaultsWatchOptions returns the default watch options for this datastore. + // These defaults are used when building WatchOptions from ServerWatchOptions and ClientWatchOptions. + // Each datastore should return appropriate defaults based on its capabilities and constraints. + DefaultsWatchOptions() WatchOptions + // ReadyState returns a state indicating whether the datastore is ready to accept data. // Datastores that require database schema creation will return not-ready until the migrations // have been run to create the necessary tables. diff --git a/pkg/datastore/datastore_test.go b/pkg/datastore/datastore_test.go index aa3982b5b..c50bc00d2 100644 --- a/pkg/datastore/datastore_test.go +++ b/pkg/datastore/datastore_test.go @@ -3,6 +3,7 @@ package datastore import ( "context" "testing" + "time" "github.com/stretchr/testify/require" @@ -12,6 +13,37 @@ import ( "github.com/authzed/spicedb/pkg/tuple" ) +func TestBuildAndValidateWatchOptions(t *testing.T) { + t.Run("WatchBufferWriteTimeout", func(t *testing.T) { + c, err := BuildAndValidateWatchOptions(ServerWatchOptions{WatchBufferWriteTimeout: 1 * time.Second}, ClientWatchOptions{}, WatchOptions{WatchBufferWriteTimeout: 2 * time.Second}) + require.NoError(t, err) + require.NotNil(t, c) + require.Equal(t, 2*time.Second, c.WatchBufferWriteTimeout) + }) + t.Run("WatchConnectTimeout", func(t *testing.T) { + c, err := BuildAndValidateWatchOptions(ServerWatchOptions{WatchConnectTimeout: 1 * time.Second}, ClientWatchOptions{}, WatchOptions{WatchConnectTimeout: 2 * time.Second}) + require.NoError(t, err) + require.NotNil(t, c) + require.Equal(t, 2*time.Second, c.WatchConnectTimeout) + }) + t.Run("WatchBufferLength", func(t *testing.T) { + c, err := BuildAndValidateWatchOptions(ServerWatchOptions{WatchBufferLength: 100}, ClientWatchOptions{}, WatchOptions{WatchBufferLength: 200}) + require.NoError(t, err) + require.NotNil(t, c) + require.Equal(t, uint16(200), c.WatchBufferLength) + }) + t.Run("CheckpointInterval", func(t *testing.T) { + c, err := BuildAndValidateWatchOptions(ServerWatchOptions{CheckpointInterval: 1}, ClientWatchOptions{}, WatchOptions{CheckpointInterval: 2}) + require.NoError(t, err) + require.NotNil(t, c) + require.Equal(t, time.Duration(2), c.CheckpointInterval) + }) + t.Run("Invalid CheckpointInterval", func(t *testing.T) { + _, err := BuildAndValidateWatchOptions(ServerWatchOptions{CheckpointInterval: -1}, ClientWatchOptions{}, WatchOptions{CheckpointInterval: -1}) + require.Error(t, err) + }) +} + func TestRelationshipsFilterFromPublicFilter(t *testing.T) { tests := []struct { name string @@ -641,6 +673,10 @@ func (f fakeDatastore) RevisionFromString(_ string) (Revision, error) { return nil, nil } +func (f fakeDatastore) DefaultsWatchOptions() WatchOptions { + return WatchOptions{} +} + func (f fakeDatastore) Watch(_ context.Context, _ Revision, _ WatchOptions) (<-chan RevisionChanges, <-chan error) { panic("should never be called") } diff --git a/pkg/datastore/mocks/mock_datastore.go b/pkg/datastore/mocks/mock_datastore.go index 3f954dcd0..3e741630a 100644 --- a/pkg/datastore/mocks/mock_datastore.go +++ b/pkg/datastore/mocks/mock_datastore.go @@ -734,6 +734,20 @@ func (mr *MockReadOnlyDatastoreMockRecorder) Close() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockReadOnlyDatastore)(nil).Close)) } +// DefaultsWatchOptions mocks base method. +func (m *MockReadOnlyDatastore) DefaultsWatchOptions() datastore.WatchOptions { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DefaultsWatchOptions") + ret0, _ := ret[0].(datastore.WatchOptions) + return ret0 +} + +// DefaultsWatchOptions indicates an expected call of DefaultsWatchOptions. +func (mr *MockReadOnlyDatastoreMockRecorder) DefaultsWatchOptions() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DefaultsWatchOptions", reflect.TypeOf((*MockReadOnlyDatastore)(nil).DefaultsWatchOptions)) +} + // Features mocks base method. func (m *MockReadOnlyDatastore) Features(ctx context.Context) (*datastore.Features, error) { m.ctrl.T.Helper() @@ -950,6 +964,20 @@ func (mr *MockDatastoreMockRecorder) Close() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockDatastore)(nil).Close)) } +// DefaultsWatchOptions mocks base method. +func (m *MockDatastore) DefaultsWatchOptions() datastore.WatchOptions { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DefaultsWatchOptions") + ret0, _ := ret[0].(datastore.WatchOptions) + return ret0 +} + +// DefaultsWatchOptions indicates an expected call of DefaultsWatchOptions. +func (mr *MockDatastoreMockRecorder) DefaultsWatchOptions() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DefaultsWatchOptions", reflect.TypeOf((*MockDatastore)(nil).DefaultsWatchOptions)) +} + // Features mocks base method. func (m *MockDatastore) Features(ctx context.Context) (*datastore.Features, error) { m.ctrl.T.Helper() @@ -1271,6 +1299,20 @@ func (mr *MockSQLDatastoreMockRecorder) Close() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockSQLDatastore)(nil).Close)) } +// DefaultsWatchOptions mocks base method. +func (m *MockSQLDatastore) DefaultsWatchOptions() datastore.WatchOptions { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DefaultsWatchOptions") + ret0, _ := ret[0].(datastore.WatchOptions) + return ret0 +} + +// DefaultsWatchOptions indicates an expected call of DefaultsWatchOptions. +func (mr *MockSQLDatastoreMockRecorder) DefaultsWatchOptions() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DefaultsWatchOptions", reflect.TypeOf((*MockSQLDatastore)(nil).DefaultsWatchOptions)) +} + // Features mocks base method. func (m *MockSQLDatastore) Features(ctx context.Context) (*datastore.Features, error) { m.ctrl.T.Helper() @@ -1536,6 +1578,20 @@ func (mr *MockStrictReadDatastoreMockRecorder) Close() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockStrictReadDatastore)(nil).Close)) } +// DefaultsWatchOptions mocks base method. +func (m *MockStrictReadDatastore) DefaultsWatchOptions() datastore.WatchOptions { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DefaultsWatchOptions") + ret0, _ := ret[0].(datastore.WatchOptions) + return ret0 +} + +// DefaultsWatchOptions indicates an expected call of DefaultsWatchOptions. +func (mr *MockStrictReadDatastoreMockRecorder) DefaultsWatchOptions() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DefaultsWatchOptions", reflect.TypeOf((*MockStrictReadDatastore)(nil).DefaultsWatchOptions)) +} + // Features mocks base method. func (m *MockStrictReadDatastore) Features(ctx context.Context) (*datastore.Features, error) { m.ctrl.T.Helper() @@ -1786,6 +1842,20 @@ func (mr *MockStartableDatastoreMockRecorder) Close() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockStartableDatastore)(nil).Close)) } +// DefaultsWatchOptions mocks base method. +func (m *MockStartableDatastore) DefaultsWatchOptions() datastore.WatchOptions { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DefaultsWatchOptions") + ret0, _ := ret[0].(datastore.WatchOptions) + return ret0 +} + +// DefaultsWatchOptions indicates an expected call of DefaultsWatchOptions. +func (mr *MockStartableDatastoreMockRecorder) DefaultsWatchOptions() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DefaultsWatchOptions", reflect.TypeOf((*MockStartableDatastore)(nil).DefaultsWatchOptions)) +} + // Features mocks base method. func (m *MockStartableDatastore) Features(ctx context.Context) (*datastore.Features, error) { m.ctrl.T.Helper() @@ -2036,6 +2106,20 @@ func (mr *MockRepairableDatastoreMockRecorder) Close() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockRepairableDatastore)(nil).Close)) } +// DefaultsWatchOptions mocks base method. +func (m *MockRepairableDatastore) DefaultsWatchOptions() datastore.WatchOptions { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DefaultsWatchOptions") + ret0, _ := ret[0].(datastore.WatchOptions) + return ret0 +} + +// DefaultsWatchOptions indicates an expected call of DefaultsWatchOptions. +func (mr *MockRepairableDatastoreMockRecorder) DefaultsWatchOptions() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DefaultsWatchOptions", reflect.TypeOf((*MockRepairableDatastore)(nil).DefaultsWatchOptions)) +} + // Features mocks base method. func (m *MockRepairableDatastore) Features(ctx context.Context) (*datastore.Features, error) { m.ctrl.T.Helper() diff --git a/pkg/datastore/test/caveat.go b/pkg/datastore/test/caveat.go index 0820d507e..8cc59c2a4 100644 --- a/pkg/datastore/test/caveat.go +++ b/pkg/datastore/test/caveat.go @@ -349,7 +349,7 @@ func expectRelChange(t *testing.T, ds datastore.Datastore, revBeforeWrite datast ctx, cancel := context.WithCancel(t.Context()) defer cancel() - chanRevisionChanges, chanErr := ds.Watch(ctx, revBeforeWrite, datastore.WatchJustRelationships()) + chanRevisionChanges, chanErr := ds.Watch(ctx, revBeforeWrite, datastore.WatchJustRelationships(ds)) require.Empty(t, chanErr) changeWait := time.NewTimer(waitForChangesTimeout) diff --git a/pkg/datastore/test/watch.go b/pkg/datastore/test/watch.go index be3f9781f..bfc5a9ea8 100644 --- a/pkg/datastore/test/watch.go +++ b/pkg/datastore/test/watch.go @@ -242,7 +242,7 @@ func WatchCancelTest(t *testing.T, tester DatastoreTester) { startWatchRevision := setupDatastore(ds, require) ctx, cancel := context.WithCancel(t.Context()) - changes, errchan := ds.Watch(ctx, startWatchRevision, datastore.WatchJustRelationships()) + changes, errchan := ds.Watch(ctx, startWatchRevision, datastore.WatchJustRelationships(ds)) require.Empty(errchan) _, err = common.WriteRelationships(ctx, ds, tuple.UpdateOperationCreate, makeTestRel("test", "test")) @@ -294,7 +294,7 @@ func WatchWithTouchTest(t *testing.T, tester DatastoreTester) { require.NoError(err) // TOUCH a relationship and ensure watch sees it. - changes, errchan := ds.Watch(ctx, lowestRevision, datastore.WatchJustRelationships()) + changes, errchan := ds.Watch(ctx, lowestRevision, datastore.WatchJustRelationships(ds)) require.Empty(errchan) afterTouchRevision, err := common.WriteRelationships(ctx, ds, tuple.UpdateOperationTouch, @@ -323,7 +323,7 @@ func WatchWithTouchTest(t *testing.T, tester DatastoreTester) { ) // TOUCH the relationship again with no changes and ensure it does *not* appear in the watch. - changes, errchan = ds.Watch(ctx, afterTouchRevision, datastore.WatchJustRelationships()) + changes, errchan = ds.Watch(ctx, afterTouchRevision, datastore.WatchJustRelationships(ds)) require.Empty(errchan) _, err = common.WriteRelationships(ctx, ds, tuple.UpdateOperationTouch, tuple.MustParse("document:firstdoc#viewer@user:tom")) @@ -342,7 +342,7 @@ func WatchWithTouchTest(t *testing.T, tester DatastoreTester) { ) // TOUCH the relationship again with a caveat name change and ensure it does appear in the watch. - changes, errchan = ds.Watch(ctx, afterTouchRevision, datastore.WatchJustRelationships()) + changes, errchan = ds.Watch(ctx, afterTouchRevision, datastore.WatchJustRelationships(ds)) require.Empty(errchan) afterNameChange, err := common.WriteRelationships(ctx, ds, tuple.UpdateOperationTouch, tuple.MustParse("document:firstdoc#viewer@user:tom[somecaveat]")) @@ -363,7 +363,7 @@ func WatchWithTouchTest(t *testing.T, tester DatastoreTester) { ) // TOUCH the relationship again with a caveat context change and ensure it does appear in the watch. - changes, errchan = ds.Watch(ctx, afterNameChange, datastore.WatchJustRelationships()) + changes, errchan = ds.Watch(ctx, afterNameChange, datastore.WatchJustRelationships(ds)) require.Empty(errchan) _, err = common.WriteRelationships(ctx, ds, tuple.UpdateOperationTouch, tuple.MustParse("document:firstdoc#viewer@user:tom[somecaveat:{\"somecondition\": 42}]")) @@ -398,7 +398,7 @@ func WatchWithExpirationTest(t *testing.T, tester DatastoreTester) { lowestRevision, err := ds.HeadRevision(ctx) require.NoError(err) - changes, errchan := ds.Watch(ctx, lowestRevision, datastore.WatchJustRelationships()) + changes, errchan := ds.Watch(ctx, lowestRevision, datastore.WatchJustRelationships(ds)) require.Empty(errchan) metadata, err := structpb.NewStruct(map[string]any{"somekey": "somevalue"}) @@ -443,7 +443,7 @@ func WatchWithMetadataTest(t *testing.T, tester DatastoreTester) { lowestRevision, err := ds.HeadRevision(ctx) require.NoError(err) - changes, errchan := ds.Watch(ctx, lowestRevision, datastore.WatchJustRelationships()) + changes, errchan := ds.Watch(ctx, lowestRevision, datastore.WatchJustRelationships(ds)) require.Empty(errchan) metadata, err := structpb.NewStruct(map[string]any{"somekey": "somevalue"}) @@ -483,7 +483,7 @@ func WatchWithDeleteTest(t *testing.T, tester DatastoreTester) { require.NoError(err) // TOUCH a relationship and ensure watch sees it. - changes, errchan := ds.Watch(ctx, lowestRevision, datastore.WatchJustRelationships()) + changes, errchan := ds.Watch(ctx, lowestRevision, datastore.WatchJustRelationships(ds)) require.Empty(errchan) afterTouchRevision, err := common.WriteRelationships(ctx, ds, tuple.UpdateOperationTouch, @@ -512,7 +512,7 @@ func WatchWithDeleteTest(t *testing.T, tester DatastoreTester) { ) // DELETE the relationship - changes, errchan = ds.Watch(ctx, afterTouchRevision, datastore.WatchJustRelationships()) + changes, errchan = ds.Watch(ctx, afterTouchRevision, datastore.WatchJustRelationships(ds)) require.Empty(errchan) _, err = common.WriteRelationships(ctx, ds, tuple.UpdateOperationDelete, tuple.MustParse("document:firstdoc#viewer@user:tom")) @@ -574,7 +574,7 @@ func WatchSchemaTest(t *testing.T, tester DatastoreTester) { lowestRevision, err := ds.HeadRevision(ctx) require.NoError(err) - changes, errchan := ds.Watch(ctx, lowestRevision, datastore.WatchJustSchema()) + changes, errchan := ds.Watch(ctx, lowestRevision, datastore.WatchJustSchema(ds)) require.Empty(errchan) // Addition @@ -801,6 +801,7 @@ func WatchCheckpointsTest(t *testing.T, tester DatastoreTester) { changes, errchan := ds.Watch(ctx, lowestRevision, datastore.WatchOptions{ Content: datastore.WatchCheckpoints | datastore.WatchRelationships | datastore.WatchSchema, CheckpointInterval: 100 * time.Millisecond, + WatchBufferLength: 1000, }) require.Empty(errchan)