Skip to content

Commit 6423b58

Browse files
committed
chore: refactor Watch API flags (close #2861)
1 parent 034d32d commit 6423b58

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+579
-597
lines changed

e2e/go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ require (
4343
github.com/creasty/defaults v1.8.0 // indirect
4444
github.com/dave/jennifer v1.7.1 // indirect
4545
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
46+
github.com/dustin/go-humanize v1.0.1 // indirect
4647
github.com/emirpasic/gods v1.18.1 // indirect
4748
github.com/envoyproxy/protoc-gen-validate v1.3.0 // indirect
4849
github.com/fatih/structtag v1.2.0 // indirect
@@ -60,6 +61,7 @@ require (
6061
github.com/mattn/go-isatty v0.0.20 // indirect
6162
github.com/onsi/ginkgo/v2 v2.26.0 // indirect
6263
github.com/onsi/gomega v1.38.2 // indirect
64+
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
6365
github.com/planetscale/vtprotobuf v0.6.1-0.20240917153116-6f2963f01587 // indirect
6466
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
6567
github.com/rs/zerolog v1.34.0 // indirect

e2e/go.sum

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ github.com/brianvoe/gofakeit/v6 v6.28.0 h1:Xib46XXuQfmlLS2EXRuJpqcw8St6qSZz75OUo
8989
github.com/brianvoe/gofakeit/v6 v6.28.0/go.mod h1:Xj58BMSnFqcn/fAQeSK+/PLtC5kSb7FJIq4JyGa8vEs=
9090
github.com/ccoveille/go-safecast/v2 v2.0.0 h1:+5eyITXAUj3wMjad6cRVJKGnC7vDS55zk0INzJagub0=
9191
github.com/ccoveille/go-safecast/v2 v2.0.0/go.mod h1:JIYA4CAR33blIDuE6fSwCp2sz1oOBahXnvmdBhOAABs=
92-
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
9392
github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM=
9493
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
9594
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
@@ -116,6 +115,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
116115
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
117116
github.com/dlmiddlecote/sqlstats v1.0.2 h1:gSU11YN23D/iY50A2zVYwgXgy072khatTsIW6UPjUtI=
118117
github.com/dlmiddlecote/sqlstats v1.0.2/go.mod h1:0CWaIh/Th+z2aI6Q9Jpfg/o21zmGxWhbByHgQSCUQvY=
118+
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
119+
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
119120
github.com/ecordell/optgen v0.2.3 h1:DXuT9cYRInIJEh/dIOuLPgi7gYXrlfjzV/KsD80CXLE=
120121
github.com/ecordell/optgen v0.2.3/go.mod h1:pqjipFkG6vAwvKgjPGWaZyqmtWAqdb2w6EcTnP+kgqQ=
121122
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
@@ -235,6 +236,8 @@ github.com/onsi/ginkgo/v2 v2.26.0/go.mod h1:qhEywmzWTBUY88kfO0BRvX4py7scov9yR+Az
235236
github.com/onsi/gomega v1.38.2 h1:eZCjf2xjZAqe+LeWvKb5weQ+NcPwX84kqJ0cZNxok2A=
236237
github.com/onsi/gomega v1.38.2/go.mod h1:W2MJcYxRGV63b418Ai34Ud0hEdTVXq9NW9+Sx6uXf3k=
237238
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
239+
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0=
240+
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y=
238241
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
239242
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
240243
github.com/planetscale/vtprotobuf v0.6.1-0.20240917153116-6f2963f01587 h1:xzZOeCMQLA/W198ZkdVdt4EKFKJtS26B773zNU377ZY=

internal/datastore/common/helpers.go

Lines changed: 0 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -2,27 +2,15 @@ package common
22

33
import (
44
"context"
5-
"errors"
65
"fmt"
7-
"strconv"
8-
"strings"
96

10-
"github.com/dustin/go-humanize"
11-
"github.com/pbnjay/memory"
127
"google.golang.org/protobuf/types/known/structpb"
138

149
"github.com/authzed/spicedb/pkg/datastore"
1510
core "github.com/authzed/spicedb/pkg/proto/core/v1"
1611
"github.com/authzed/spicedb/pkg/tuple"
1712
)
1813

19-
// At startup, measure 75% of available free memory.
20-
var freeMemory uint64
21-
22-
func init() {
23-
freeMemory = memory.FreeMemory() / 100 * 75
24-
}
25-
2614
// WriteRelationships is a convenience method to perform the same update operation on a set of relationships
2715
func WriteRelationships(ctx context.Context, ds datastore.Datastore, op tuple.UpdateOperation, rels ...tuple.Relationship) (datastore.Revision, error) {
2816
updates := make([]tuple.RelationshipUpdate, 0, len(rels))
@@ -59,45 +47,3 @@ func ContextualizedCaveatFrom(name string, context map[string]any) (*core.Contex
5947
}
6048
return caveat, nil
6149
}
62-
63-
var errOverHundredPercent = errors.New("percentage greater than 100")
64-
65-
func parsePercent(str string, freeMem uint64) (uint64, error) {
66-
percent := strings.TrimSuffix(str, "%")
67-
parsedPercent, err := strconv.ParseUint(percent, 10, 64)
68-
if err != nil {
69-
return 0, fmt.Errorf("failed to parse percentage: %w", err)
70-
}
71-
72-
if parsedPercent > 100 {
73-
return 0, errOverHundredPercent
74-
}
75-
76-
return freeMem / 100 * parsedPercent, nil
77-
}
78-
79-
// WatchBufferSize takes a string and interprets it as
80-
// either a percentage of memory (as a percentage of
81-
// 75% of free memory as measured on startup)
82-
// or a humanized byte string and returns the number of
83-
// bytes or an error if the value cannot be interpreted.
84-
// Returns 0 on an empty string.
85-
func WatchBufferSize(sizeString string) (size uint64, err error) {
86-
if sizeString == "" {
87-
return 0, nil
88-
}
89-
90-
if strings.HasSuffix(sizeString, "%") {
91-
size, err := parsePercent(sizeString, freeMemory)
92-
if err != nil {
93-
return 0, fmt.Errorf("could not parse %s as percentage: %w", sizeString, err)
94-
}
95-
return size, nil
96-
}
97-
98-
size, err = humanize.ParseBytes(sizeString)
99-
if err != nil {
100-
return 0, fmt.Errorf("could not parse %s as a number of bytes: %w", sizeString, err)
101-
}
102-
return size, nil
103-
}

internal/datastore/context.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ func (p *ctxProxy) RevisionFromString(serialized string) (datastore.Revision, er
6666
return p.delegate.RevisionFromString(serialized)
6767
}
6868

69+
func (p *ctxProxy) DefaultsWatchOptions() datastore.WatchOptions {
70+
return p.delegate.DefaultsWatchOptions()
71+
}
72+
6973
func (p *ctxProxy) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) {
7074
return p.delegate.Watch(ctx, afterRevision, options)
7175
}

internal/datastore/crdb/crdb.go

Lines changed: 23 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -180,24 +180,20 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas
180180
config.followerReadDelay,
181181
config.revisionQuantization,
182182
),
183-
CommonDecoder: revisions.CommonDecoder{Kind: revisions.HybridLogicalClock},
184-
MigrationValidator: common.NewMigrationValidator(headMigration, config.allowedMigrations),
185-
dburl: url,
186-
acquireTimeout: config.acquireTimeout,
187-
watchBufferLength: config.watchBufferLength,
188-
watchChangeBufferMaximumSize: config.watchChangeBufferMaximumSize,
189-
watchBufferWriteTimeout: config.watchBufferWriteTimeout,
190-
watchConnectTimeout: config.watchConnectTimeout,
191-
writeOverlapKeyer: keyer,
192-
overlapKeyInit: keySetInit,
193-
beginChangefeedQuery: changefeedQuery,
194-
transactionNowQuery: transactionNowQuery,
195-
analyzeBeforeStatistics: config.analyzeBeforeStatistics,
196-
filterMaximumIDCount: config.filterMaximumIDCount,
197-
supportsIntegrity: config.withIntegrity,
198-
gcWindow: config.gcWindow,
199-
watchEnabled: !config.watchDisabled,
200-
schema: *schema.Schema(config.columnOptimizationOption, config.withIntegrity, false),
183+
CommonDecoder: revisions.CommonDecoder{Kind: revisions.HybridLogicalClock},
184+
MigrationValidator: common.NewMigrationValidator(headMigration, config.allowedMigrations),
185+
dburl: url,
186+
acquireTimeout: config.acquireTimeout,
187+
writeOverlapKeyer: keyer,
188+
overlapKeyInit: keySetInit,
189+
beginChangefeedQuery: changefeedQuery,
190+
transactionNowQuery: transactionNowQuery,
191+
analyzeBeforeStatistics: config.analyzeBeforeStatistics,
192+
filterMaximumIDCount: config.filterMaximumIDCount,
193+
supportsIntegrity: config.withIntegrity,
194+
gcWindow: config.gcWindow,
195+
watchEnabled: !config.watchDisabled,
196+
schema: *schema.Schema(config.columnOptimizationOption, config.withIntegrity, false),
201197
}
202198
ds.SetNowFunc(ds.headRevisionInternal)
203199

@@ -262,19 +258,15 @@ type crdbDatastore struct {
262258
revisions.CommonDecoder
263259
*common.MigrationValidator
264260

265-
dburl string
266-
readPool, writePool *pool.RetryPool
267-
collectors []prometheus.Collector
268-
watchBufferLength uint16
269-
watchChangeBufferMaximumSize uint64
270-
watchBufferWriteTimeout time.Duration
271-
watchConnectTimeout time.Duration
272-
writeOverlapKeyer overlapKeyer
273-
overlapKeyInit func(ctx context.Context) keySet
274-
analyzeBeforeStatistics bool
275-
gcWindow time.Duration
276-
schema common.SchemaInformation
277-
acquireTimeout time.Duration
261+
dburl string
262+
readPool, writePool *pool.RetryPool
263+
collectors []prometheus.Collector
264+
writeOverlapKeyer overlapKeyer
265+
overlapKeyInit func(ctx context.Context) keySet
266+
analyzeBeforeStatistics bool
267+
gcWindow time.Duration
268+
schema common.SchemaInformation
269+
acquireTimeout time.Duration
278270

279271
beginChangefeedQuery string
280272
transactionNowQuery string

internal/datastore/crdb/crdb_test.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -74,15 +74,14 @@ func crdbTestVersion() string {
7474
func TestCRDBDatastoreWithoutIntegrity(t *testing.T) {
7575
t.Parallel()
7676
b := testdatastore.RunCRDBForTesting(t, "", crdbTestVersion())
77-
test.All(t, test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, watchBufferLength uint16) (datastore.Datastore, error) {
77+
test.All(t, test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, _ uint16) (datastore.Datastore, error) {
7878
ctx := context.Background()
7979
ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore {
8080
ds, err := NewCRDBDatastore(
8181
ctx,
8282
uri,
8383
GCWindow(gcWindow),
8484
RevisionQuantization(revisionQuantization),
85-
WatchBufferLength(watchBufferLength),
8685
OverlapStrategy(overlapStrategyPrefix),
8786
DebugAnalyzeBeforeStatistics(),
8887
WithAcquireTimeout(5*time.Second),
@@ -202,15 +201,14 @@ func TestCRDBDatastoreWithIntegrity(t *testing.T) { //nolint:tparallel
202201
t.Parallel()
203202
b := testdatastore.RunCRDBForTesting(t, "", crdbTestVersion())
204203

205-
test.All(t, test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, watchBufferLength uint16) (datastore.Datastore, error) {
204+
test.All(t, test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, _ uint16) (datastore.Datastore, error) {
206205
ctx := context.Background()
207206
ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore {
208207
ds, err := NewCRDBDatastore(
209208
ctx,
210209
uri,
211210
GCWindow(gcWindow),
212211
RevisionQuantization(revisionQuantization),
213-
WatchBufferLength(watchBufferLength),
214212
OverlapStrategy(overlapStrategyPrefix),
215213
DebugAnalyzeBeforeStatistics(),
216214
WithIntegrity(true),
@@ -229,15 +227,14 @@ func TestCRDBDatastoreWithIntegrity(t *testing.T) { //nolint:tparallel
229227
return ds, nil
230228
}), false)
231229

232-
unwrappedTester := test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, watchBufferLength uint16) (datastore.Datastore, error) {
230+
unwrappedTester := test.DatastoreTesterFunc(func(revisionQuantization, gcInterval, gcWindow time.Duration, _ uint16) (datastore.Datastore, error) {
233231
ctx := context.Background()
234232
ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore {
235233
ds, err := NewCRDBDatastore(
236234
ctx,
237235
uri,
238236
GCWindow(gcWindow),
239237
RevisionQuantization(revisionQuantization),
240-
WatchBufferLength(watchBufferLength),
241238
OverlapStrategy(overlapStrategyPrefix),
242239
DebugAnalyzeBeforeStatistics(),
243240
WithIntegrity(true),
@@ -331,7 +328,7 @@ func TestWatchFeatureDetection(t *testing.T) {
331328
headRevision, err := ds.HeadRevision(ctx)
332329
require.NoError(t, err)
333330

334-
_, errChan := ds.Watch(ctx, headRevision, datastore.WatchJustRelationships())
331+
_, errChan := ds.Watch(ctx, headRevision, datastore.WatchJustRelationships(ds))
335332
err = <-errChan
336333
require.Error(t, err)
337334
require.Contains(t, err.Error(), "watch is currently disabled")

internal/datastore/crdb/options.go

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,6 @@ type crdbOptions struct {
1515
readPoolOpts, writePoolOpts pgxcommon.PoolOptions
1616
connectRate time.Duration
1717

18-
watchBufferLength uint16
19-
watchChangeBufferMaximumSize uint64
20-
watchBufferWriteTimeout time.Duration
21-
watchConnectTimeout time.Duration
2218
revisionQuantization time.Duration
2319
followerReadDelay time.Duration
2420
maxRevisionStalenessPercent float64
@@ -76,9 +72,6 @@ type Option func(*crdbOptions)
7672
func generateConfig(options []Option) (crdbOptions, error) {
7773
computed := crdbOptions{
7874
gcWindow: 24 * time.Hour,
79-
watchBufferLength: defaultWatchBufferLength,
80-
watchBufferWriteTimeout: defaultWatchBufferWriteTimeout,
81-
watchConnectTimeout: defaultWatchConnectTimeout,
8275
revisionQuantization: defaultRevisionQuantization,
8376
followerReadDelay: defaultFollowerReadDelay,
8477
maxRevisionStalenessPercent: defaultMaxRevisionStalenessPercent,
@@ -251,34 +244,6 @@ func WriteConnsMaxOpen(conns int) Option {
251244
return func(po *crdbOptions) { po.writePoolOpts.MaxOpenConns = &conns }
252245
}
253246

254-
// WatchBufferLength is the number of entries that can be stored in the watch
255-
// buffer while awaiting read by the client.
256-
//
257-
// This value defaults to 128.
258-
func WatchBufferLength(watchBufferLength uint16) Option {
259-
return func(po *crdbOptions) { po.watchBufferLength = watchBufferLength }
260-
}
261-
262-
// WatchBufferWriteTimeout is the maximum timeout for writing to the watch buffer,
263-
// after which the caller to the watch will be disconnected.
264-
func WatchBufferWriteTimeout(watchBufferWriteTimeout time.Duration) Option {
265-
return func(po *crdbOptions) { po.watchBufferWriteTimeout = watchBufferWriteTimeout }
266-
}
267-
268-
// WatchBufferMaximumSize is the maximum size in bytes of the watch buffer.
269-
// If this value is exceeded the caller will receive an error.
270-
func WatchChangeBufferMaximumSize(maxSize uint64) Option {
271-
return func(po *crdbOptions) { po.watchChangeBufferMaximumSize = maxSize }
272-
}
273-
274-
// WatchConnectTimeout is the maximum timeout for connecting the watch stream
275-
// to the datastore.
276-
//
277-
// This value defaults to 1 second.
278-
func WatchConnectTimeout(watchConnectTimeout time.Duration) Option {
279-
return func(po *crdbOptions) { po.watchConnectTimeout = watchConnectTimeout }
280-
}
281-
282247
// RevisionQuantization is the time bucket size to which advertised revisions
283248
// will be rounded.
284249
//

internal/datastore/crdb/pool_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,6 @@ func TestTxReset(t *testing.T) {
107107
uri,
108108
GCWindow(24*time.Hour),
109109
RevisionQuantization(5*time.Second),
110-
WatchBufferLength(128),
111110
MaxRetries(tt.maxRetries),
112111
WithAcquireTimeout(5*time.Second),
113112
)

0 commit comments

Comments
 (0)