Skip to content

Commit 9e58a47

Browse files
committed
chore: more refactor
1 parent f03a9fa commit 9e58a47

File tree

8 files changed

+40
-42
lines changed

8 files changed

+40
-42
lines changed

internal/datastore/mysql/watch.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,8 @@ const (
1919

2020
func (mds *mysqlDatastore) DefaultsWatchOptions() datastore.WatchOptions {
2121
return datastore.WatchOptions{
22-
WatchBufferLength: defaultWatchBufferLength,
23-
WatchBufferWriteTimeout: defaultWatchBufferWriteTimeout,
24-
MaximumBufferedChangesByteSize: 0, // 0 means no limit
22+
WatchBufferLength: defaultWatchBufferLength,
23+
WatchBufferWriteTimeout: defaultWatchBufferWriteTimeout,
2524
// MySQL does not use CheckpointInterval or WatchConnectTimeout
2625
// MySQL does not support EmitImmediatelyStrategy or WatchSchema
2726
}

internal/datastore/postgres/watch.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,9 @@ const (
2626

2727
func (pgd *pgDatastore) DefaultsWatchOptions() datastore.WatchOptions {
2828
return datastore.WatchOptions{
29-
CheckpointInterval: minimumWatchSleep,
30-
WatchBufferLength: defaultWatchBufferLength,
31-
WatchBufferWriteTimeout: defaultWatchBufferWriteTimeout,
32-
MaximumBufferedChangesByteSize: 0, // 0 means no limit
29+
CheckpointInterval: minimumWatchSleep,
30+
WatchBufferLength: defaultWatchBufferLength,
31+
WatchBufferWriteTimeout: defaultWatchBufferWriteTimeout,
3332
// Postgres does not use WatchConnectTimeout
3433
// Postgres does not support EmitImmediatelyStrategy
3534
}

internal/datastore/proxy/indexcheck/indexcheck_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func TestIndexCheckingProxyMethods(t *testing.T) {
121121

122122
t.Run("Watch", func(t *testing.T) {
123123
watchOptions, err := datastore.BuildAndValidateWatchOptions(
124-
datastore.ServerWatchConfig{},
124+
datastore.ServerWatchOptions{},
125125
datastore.ClientWatchOptions{},
126126
proxy.DefaultsWatchOptions(),
127127
)

internal/datastore/proxy/schemacaching/watchingcache.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ func (p *watchingCachingProxy) startSync(ctx context.Context) error {
158158
}
159159

160160
watchOptions, err := datastore.BuildAndValidateWatchOptions(
161-
datastore.ServerWatchConfig{
161+
datastore.ServerWatchOptions{
162162
CheckpointInterval: p.watchHeartbeat,
163163
},
164164
datastore.ClientWatchOptions{

internal/datastore/spanner/watch.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,9 @@ func parseDatabaseName(db string) (project, instance, database string, err error
5555

5656
func (sd *spannerDatastore) DefaultsWatchOptions() datastore.WatchOptions {
5757
return datastore.WatchOptions{
58-
CheckpointInterval: 100 * time.Millisecond,
59-
WatchBufferLength: defaultWatchBufferLength,
60-
WatchBufferWriteTimeout: defaultWatchBufferWriteTimeout,
61-
MaximumBufferedChangesByteSize: 0, // 0 means no limit
58+
CheckpointInterval: 100 * time.Millisecond,
59+
WatchBufferLength: defaultWatchBufferLength,
60+
WatchBufferWriteTimeout: defaultWatchBufferWriteTimeout,
6261
// Spanner does not use WatchConnectTimeout
6362
// Spanner does not support EmitImmediatelyStrategy
6463
}

internal/services/v1/watch.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func (ws *watchServer) Watch(req *v1.WatchRequest, stream v1.WatchService_WatchS
101101
clientRequest := datastore.ClientWatchOptions{
102102
Content: convertWatchKindToContent(req.OptionalUpdateKinds),
103103
}
104-
dsConfig := datastore.ServerWatchConfig{
104+
dsConfig := datastore.ServerWatchOptions{
105105
CheckpointInterval: ws.serverConfig.CheckpointInterval,
106106
WatchBufferLength: ws.serverConfig.WatchBufferLength,
107107
WatchBufferWriteTimeout: ws.serverConfig.WatchBufferWriteTimeout,

pkg/datastore/datastore.go

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -602,9 +602,9 @@ const (
602602
WatchCheckpoints WatchContent = 1 << 2
603603
)
604604

605-
// ServerWatchConfig contains server-level configuration for Watch operations.
605+
// ServerWatchOptions contains server-level configuration for Watch operations.
606606
// These values do NOT change during the lifetime of a server.
607-
type ServerWatchConfig struct {
607+
type ServerWatchOptions struct {
608608
// CheckpointInterval is the interval to use for checkpointing in the watch.
609609
CheckpointInterval time.Duration
610610

@@ -635,22 +635,21 @@ type ClientWatchOptions struct {
635635
}
636636

637637
// WatchOptions are ALL options for a Watch call.
638-
// This struct combines server configuration, client requests, and datastore defaults
639-
// into a single set of options passed to the datastore Watch implementation.
638+
// Some datastore implementations may ignore one or more of these.
640639
type WatchOptions struct {
641640
// See ClientWatchOptions.Content
642641
Content WatchContent
643642
// See ClientWatchOptions.EmissionStrategy
644643
EmissionStrategy EmissionStrategy
645-
// See ServerWatchConfig.CheckpointInterval
644+
// See ServerWatchOptions.CheckpointInterval
646645
CheckpointInterval time.Duration
647-
// See ServerWatchConfig.WatchBufferLength
646+
// See ServerWatchOptions.WatchBufferLength
648647
WatchBufferLength uint16
649-
// See ServerWatchConfig.WatchBufferWriteTimeout
648+
// See ServerWatchOptions.WatchBufferWriteTimeout
650649
WatchBufferWriteTimeout time.Duration
651-
// See ServerWatchConfig.WatchConnectTimeout
650+
// See ServerWatchOptions.WatchConnectTimeout
652651
WatchConnectTimeout time.Duration
653-
// See ServerWatchConfig.MaximumBufferedChangesByteSize
652+
// See ServerWatchOptions.MaximumBufferedChangesByteSize
654653
MaximumBufferedChangesByteSize uint64
655654
}
656655

@@ -672,7 +671,7 @@ const (
672671
// WatchJustRelationships returns watch options for just relationships.
673672
func WatchJustRelationships(ds Datastore) WatchOptions {
674673
v, _ := BuildAndValidateWatchOptions(
675-
ServerWatchConfig{},
674+
ServerWatchOptions{},
676675
ClientWatchOptions{Content: WatchRelationships},
677676
ds.DefaultsWatchOptions(),
678677
)
@@ -682,7 +681,7 @@ func WatchJustRelationships(ds Datastore) WatchOptions {
682681
// WatchJustSchema returns watch options for just schema.
683682
func WatchJustSchema(ds ReadOnlyDatastore) WatchOptions {
684683
v, _ := BuildAndValidateWatchOptions(
685-
ServerWatchConfig{},
684+
ServerWatchOptions{},
686685
ClientWatchOptions{Content: WatchSchema},
687686
ds.DefaultsWatchOptions(),
688687
)
@@ -731,25 +730,27 @@ func watchBufferSize(sizeString string) (size uint64, err error) {
731730
return size, nil
732731
}
733732

734-
// BuildAndValidateWatchOptions constructs complete WatchOptions by merging server configuration,
735-
// client requests, and datastore defaults.
733+
// BuildAndValidateWatchOptions constructs complete WatchOptions by merging server options,
734+
// client options, and datastore defaults.
735+
// Datastore defaults take precedence over server options.
736+
// Client options cannot be overridden.
736737
func BuildAndValidateWatchOptions(
737-
serverConfig ServerWatchConfig,
738-
clientRequest ClientWatchOptions,
738+
serverOptions ServerWatchOptions,
739+
clientOptions ClientWatchOptions,
739740
datastoreDefaults WatchOptions,
740741
) (WatchOptions, error) {
741-
watchChangeBufferMaximumSize, err := watchBufferSize(serverConfig.MaximumBufferedChangesByteSize)
742+
watchChangeBufferMaximumSize, err := watchBufferSize(serverOptions.MaximumBufferedChangesByteSize)
742743
if err != nil {
743744
return WatchOptions{}, err
744745
}
745746

746747
options := WatchOptions{
747-
Content: clientRequest.Content,
748-
EmissionStrategy: clientRequest.EmissionStrategy,
749-
CheckpointInterval: serverConfig.CheckpointInterval,
750-
WatchBufferLength: serverConfig.WatchBufferLength,
751-
WatchBufferWriteTimeout: serverConfig.WatchBufferWriteTimeout,
752-
WatchConnectTimeout: serverConfig.WatchConnectTimeout,
748+
Content: clientOptions.Content,
749+
EmissionStrategy: clientOptions.EmissionStrategy,
750+
CheckpointInterval: serverOptions.CheckpointInterval,
751+
WatchBufferLength: serverOptions.WatchBufferLength,
752+
WatchBufferWriteTimeout: serverOptions.WatchBufferWriteTimeout,
753+
WatchConnectTimeout: serverOptions.WatchConnectTimeout,
753754
MaximumBufferedChangesByteSize: watchChangeBufferMaximumSize,
754755
}
755756

@@ -825,7 +826,7 @@ type ReadOnlyDatastore interface {
825826
Watch(ctx context.Context, afterRevision Revision, options WatchOptions) (<-chan RevisionChanges, <-chan error)
826827

827828
// DefaultsWatchOptions returns the default watch options for this datastore.
828-
// These defaults are used when building WatchOptions from ServerWatchConfig and ClientWatchOptions.
829+
// These defaults are used when building WatchOptions from ServerWatchOptions and ClientWatchOptions.
829830
// Each datastore should return appropriate defaults based on its capabilities and constraints.
830831
DefaultsWatchOptions() WatchOptions
831832

pkg/datastore/datastore_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,31 +15,31 @@ import (
1515

1616
func TestBuildAndValidateWatchOptions(t *testing.T) {
1717
t.Run("WatchBufferWriteTimeout", func(t *testing.T) {
18-
c, err := BuildAndValidateWatchOptions(ServerWatchConfig{WatchBufferWriteTimeout: 1 * time.Second}, ClientWatchOptions{}, WatchOptions{WatchBufferWriteTimeout: 2 * time.Second})
18+
c, err := BuildAndValidateWatchOptions(ServerWatchOptions{WatchBufferWriteTimeout: 1 * time.Second}, ClientWatchOptions{}, WatchOptions{WatchBufferWriteTimeout: 2 * time.Second})
1919
require.NoError(t, err)
2020
require.NotNil(t, c)
2121
require.Equal(t, 2*time.Second, c.WatchBufferWriteTimeout)
2222
})
2323
t.Run("WatchConnectTimeout", func(t *testing.T) {
24-
c, err := BuildAndValidateWatchOptions(ServerWatchConfig{WatchConnectTimeout: 1 * time.Second}, ClientWatchOptions{}, WatchOptions{WatchConnectTimeout: 2 * time.Second})
24+
c, err := BuildAndValidateWatchOptions(ServerWatchOptions{WatchConnectTimeout: 1 * time.Second}, ClientWatchOptions{}, WatchOptions{WatchConnectTimeout: 2 * time.Second})
2525
require.NoError(t, err)
2626
require.NotNil(t, c)
2727
require.Equal(t, 2*time.Second, c.WatchConnectTimeout)
2828
})
2929
t.Run("WatchBufferLength", func(t *testing.T) {
30-
c, err := BuildAndValidateWatchOptions(ServerWatchConfig{WatchBufferLength: 100}, ClientWatchOptions{}, WatchOptions{WatchBufferLength: 200})
30+
c, err := BuildAndValidateWatchOptions(ServerWatchOptions{WatchBufferLength: 100}, ClientWatchOptions{}, WatchOptions{WatchBufferLength: 200})
3131
require.NoError(t, err)
3232
require.NotNil(t, c)
3333
require.Equal(t, uint16(200), c.WatchBufferLength)
3434
})
3535
t.Run("CheckpointInterval", func(t *testing.T) {
36-
c, err := BuildAndValidateWatchOptions(ServerWatchConfig{CheckpointInterval: 1}, ClientWatchOptions{}, WatchOptions{CheckpointInterval: 2})
36+
c, err := BuildAndValidateWatchOptions(ServerWatchOptions{CheckpointInterval: 1}, ClientWatchOptions{}, WatchOptions{CheckpointInterval: 2})
3737
require.NoError(t, err)
3838
require.NotNil(t, c)
3939
require.Equal(t, time.Duration(2), c.CheckpointInterval)
4040
})
4141
t.Run("Invalid CheckpointInterval", func(t *testing.T) {
42-
_, err := BuildAndValidateWatchOptions(ServerWatchConfig{CheckpointInterval: -1}, ClientWatchOptions{}, WatchOptions{CheckpointInterval: -1})
42+
_, err := BuildAndValidateWatchOptions(ServerWatchOptions{CheckpointInterval: -1}, ClientWatchOptions{}, WatchOptions{CheckpointInterval: -1})
4343
require.Error(t, err)
4444
})
4545
}

0 commit comments

Comments
 (0)