Skip to content

Commit 977db8f

Browse files
committed
fix: in spanner and postgres, use min or max checkpointintervals
1 parent 9e58a47 commit 977db8f

File tree

4 files changed

+18
-3
lines changed

4 files changed

+18
-3
lines changed

internal/datastore/crdb/watch.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ func (cds *crdbDatastore) DefaultsWatchOptions() datastore.WatchOptions {
7878
}
7979

8080
func (cds *crdbDatastore) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) {
81-
fmt.Printf("%+v\n", options)
8281
updates := make(chan datastore.RevisionChanges, options.WatchBufferLength)
8382
errs := make(chan error, 1)
8483

internal/datastore/postgres/watch.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/authzed/spicedb/internal/datastore/common"
1515
"github.com/authzed/spicedb/internal/datastore/postgres/schema"
16+
log "github.com/authzed/spicedb/internal/logging"
1617
"github.com/authzed/spicedb/internal/sharederrors"
1718
"github.com/authzed/spicedb/pkg/datastore"
1819
core "github.com/authzed/spicedb/pkg/proto/core/v1"
@@ -26,7 +27,6 @@ const (
2627

2728
func (pgd *pgDatastore) DefaultsWatchOptions() datastore.WatchOptions {
2829
return datastore.WatchOptions{
29-
CheckpointInterval: minimumWatchSleep,
3030
WatchBufferLength: defaultWatchBufferLength,
3131
WatchBufferWriteTimeout: defaultWatchBufferWriteTimeout,
3232
// Postgres does not use WatchConnectTimeout
@@ -94,6 +94,11 @@ func (pgd *pgDatastore) Watch(
9494

9595
afterRevision := afterRevisionRaw.(postgresRevision)
9696

97+
if options.CheckpointInterval < minimumWatchSleep {
98+
log.Warn().Msgf("--watch-api-heartbeat set too small, using %d", minimumWatchSleep)
99+
options.CheckpointInterval = minimumWatchSleep
100+
}
101+
97102
sendChange := func(change datastore.RevisionChanges) bool {
98103
select {
99104
case updates <- change:

internal/datastore/spanner/options.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ const (
7373
defaultRevisionQuantization = 5 * time.Second
7474
defaultFollowerReadDelay = 0 * time.Second
7575
defaultMaxRevisionStalenessPercent = 0.1
76+
minimumCheckpointInterval = 100 * time.Millisecond
77+
maximumCheckpointInterval = 300000 * time.Millisecond
7678
defaultWatchBufferLength = 128
7779
defaultWatchBufferWriteTimeout = 1 * time.Second
7880
defaultDisableStats = false

internal/datastore/spanner/watch.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818

1919
"github.com/authzed/spicedb/internal/datastore/common"
2020
"github.com/authzed/spicedb/internal/datastore/revisions"
21+
log "github.com/authzed/spicedb/internal/logging"
2122
"github.com/authzed/spicedb/pkg/datastore"
2223
core "github.com/authzed/spicedb/pkg/proto/core/v1"
2324
"github.com/authzed/spicedb/pkg/spiceerrors"
@@ -55,7 +56,6 @@ func parseDatabaseName(db string) (project, instance, database string, err error
5556

5657
func (sd *spannerDatastore) DefaultsWatchOptions() datastore.WatchOptions {
5758
return datastore.WatchOptions{
58-
CheckpointInterval: 100 * time.Millisecond,
5959
WatchBufferLength: defaultWatchBufferLength,
6060
WatchBufferWriteTimeout: defaultWatchBufferWriteTimeout,
6161
// Spanner does not use WatchConnectTimeout
@@ -88,6 +88,15 @@ func (sd *spannerDatastore) watch(
8888
defer close(updates)
8989
defer close(errs)
9090

91+
if opts.CheckpointInterval < minimumCheckpointInterval {
92+
log.Warn().Msgf("--watch-api-heartbeat set too small, using %d", minimumCheckpointInterval)
93+
opts.CheckpointInterval = minimumCheckpointInterval
94+
}
95+
if opts.CheckpointInterval > maximumCheckpointInterval {
96+
log.Warn().Msgf("--watch-api-heartbeat set too high, using %d", maximumCheckpointInterval)
97+
opts.CheckpointInterval = maximumCheckpointInterval
98+
}
99+
91100
sendError := func(err error) {
92101
if errors.Is(ctx.Err(), context.Canceled) || common.IsCancellationError(err) {
93102
errs <- datastore.NewWatchCanceledErr()

0 commit comments

Comments
 (0)