Skip to content

Commit d934e3a

Browse files
committed
Change spectator GetShardOwner to return executor metadata
Previously GetShardOwner only returned the executor ID string. Now it returns an ExecutorOwnership struct containing both the executor ID and the full metadata map. This allows callers to access additional executor information like the gRPC address needed for peer routing. Changes: - Add ExecutorOwnership struct with ExecutorID and Metadata fields - Update GetShardOwner signature to return *ExecutorOwnership - Update spectatorImpl to build and return ownership info - Update tests to verify metadata is included in responses Signed-off-by: Jakob Haahr Taankvist <[email protected]>
1 parent ad55a1c commit d934e3a

File tree

4 files changed

+76
-31
lines changed

4 files changed

+76
-31
lines changed

service/sharddistributor/client/spectatorclient/client.go

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,50 @@ import (
2020

2121
//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination interface_mock.go . Spectator
2222

23+
type Spectators map[string]Spectator
24+
25+
func (s Spectators) ForNamespace(namespace string) (Spectator, error) {
26+
spectator, ok := s[namespace]
27+
if !ok {
28+
return nil, fmt.Errorf("spectator not found for namespace %s", namespace)
29+
}
30+
return spectator, nil
31+
}
32+
33+
func (s Spectators) Start(ctx context.Context) error {
34+
for namespace, spectator := range s {
35+
if err := spectator.Start(ctx); err != nil {
36+
return fmt.Errorf("start spectator for namespace %s: %w", namespace, err)
37+
}
38+
}
39+
return nil
40+
}
41+
42+
func (s Spectators) Stop() {
43+
for _, spectator := range s {
44+
spectator.Stop()
45+
}
46+
}
47+
48+
func NewSpectators(params Params) (Spectators, error) {
49+
spectators := make(Spectators)
50+
for _, namespace := range params.Config.Namespaces {
51+
spectator, err := NewSpectatorWithNamespace(params, namespace.Namespace)
52+
if err != nil {
53+
return nil, fmt.Errorf("create spectator for namespace %s: %w", namespace.Namespace, err)
54+
}
55+
56+
spectators[namespace.Namespace] = spectator
57+
}
58+
return spectators, nil
59+
}
60+
2361
type Spectator interface {
2462
Start(ctx context.Context) error
2563
Stop()
2664

27-
// GetShardOwner returns the owner of a shard. It first checks the local cache,
28-
// and if not found, falls back to querying the shard distributor directly.
29-
GetShardOwner(ctx context.Context, shardKey string) (string, error)
65+
// GetShardOwner returns the owner of a shard
66+
GetShardOwner(ctx context.Context, shardKey string) (*ShardOwner, error)
3067
}
3168

3269
type Params struct {
@@ -109,21 +146,9 @@ func createShardDistributorClient(yarpcClient sharddistributorv1.ShardDistributo
109146
// Module creates a spectator module using auto-selection (single namespace only)
110147
func Module() fx.Option {
111148
return fx.Module("shard-distributor-spectator-client",
112-
fx.Provide(NewSpectator),
113-
fx.Invoke(func(spectator Spectator, lc fx.Lifecycle) {
114-
lc.Append(fx.StartStopHook(spectator.Start, spectator.Stop))
115-
}),
116-
)
117-
}
118-
119-
// ModuleWithNamespace creates a spectator module for a specific namespace
120-
func ModuleWithNamespace(namespace string) fx.Option {
121-
return fx.Module(fmt.Sprintf("shard-distributor-spectator-client-%s", namespace),
122-
fx.Provide(func(params Params) (Spectator, error) {
123-
return NewSpectatorWithNamespace(params, namespace)
124-
}),
125-
fx.Invoke(func(spectator Spectator, lc fx.Lifecycle) {
126-
lc.Append(fx.StartStopHook(spectator.Start, spectator.Stop))
149+
fx.Provide(NewSpectators),
150+
fx.Invoke(func(spectators Spectators, lc fx.Lifecycle) {
151+
lc.Append(fx.StartStopHook(spectators.Start, spectators.Stop))
127152
}),
128153
)
129154
}

service/sharddistributor/client/spectatorclient/clientimpl.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ func (s *spectatorImpl) watchLoop() {
103103

104104
// Server shutdown or network issue - recreate stream (load balancer will route to new server)
105105
s.logger.Info("Stream ended, reconnecting", tag.ShardNamespace(s.namespace))
106+
s.timeSource.Sleep(backoff.JitDuration(streamRetryInterval, streamRetryJitterCoeff))
106107
}
107108
}
108109

@@ -163,10 +164,10 @@ func (s *spectatorImpl) handleResponse(response *types.WatchNamespaceStateRespon
163164
tag.Counter(len(response.Executors)))
164165
}
165166

166-
// GetShardOwner returns the executor ID for a given shard.
167+
// GetShardOwner returns the full owner information including metadata for a given shard.
167168
// It first waits for the initial state to be received, then checks the cache.
168169
// If not found in cache, it falls back to querying the shard distributor directly.
169-
func (s *spectatorImpl) GetShardOwner(ctx context.Context, shardKey string) (string, error) {
170+
func (s *spectatorImpl) GetShardOwner(ctx context.Context, shardKey string) (*ShardOwner, error) {
170171
// Wait for first state to be received to avoid flooding shard distributor on startup
171172
s.firstStateWG.Wait()
172173

@@ -176,7 +177,7 @@ func (s *spectatorImpl) GetShardOwner(ctx context.Context, shardKey string) (str
176177
s.stateMu.RUnlock()
177178

178179
if owner != nil {
179-
return owner.ExecutorID, nil
180+
return owner, nil
180181
}
181182

182183
// Cache miss - fall back to RPC call
@@ -189,8 +190,11 @@ func (s *spectatorImpl) GetShardOwner(ctx context.Context, shardKey string) (str
189190
ShardKey: shardKey,
190191
})
191192
if err != nil {
192-
return "", fmt.Errorf("get shard owner from shard distributor: %w", err)
193+
return nil, fmt.Errorf("get shard owner from shard distributor: %w", err)
193194
}
194195

195-
return response.Owner, nil
196+
return &ShardOwner{
197+
ExecutorID: response.Owner,
198+
Metadata: response.Metadata,
199+
}, nil
196200
}

service/sharddistributor/client/spectatorclient/clientimpl_test.go

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ func TestWatchLoopBasicFlow(t *testing.T) {
4444
Executors: []*types.ExecutorShardAssignment{
4545
{
4646
ExecutorID: "executor-1",
47+
Metadata: map[string]string{
48+
"grpc_address": "127.0.0.1:7953",
49+
},
4750
AssignedShards: []*types.Shard{
4851
{ShardKey: "shard-1"},
4952
{ShardKey: "shard-2"},
@@ -72,11 +75,12 @@ func TestWatchLoopBasicFlow(t *testing.T) {
7275
// Query shard owner
7376
owner, err := spectator.GetShardOwner(context.Background(), "shard-1")
7477
assert.NoError(t, err)
75-
assert.Equal(t, "executor-1", owner)
78+
assert.Equal(t, "executor-1", owner.ExecutorID)
79+
assert.Equal(t, "127.0.0.1:7953", owner.Metadata["grpc_address"])
7680

7781
owner, err = spectator.GetShardOwner(context.Background(), "shard-2")
7882
assert.NoError(t, err)
79-
assert.Equal(t, "executor-1", owner)
83+
assert.Equal(t, "executor-1", owner.ExecutorID)
8084
}
8185

8286
func TestGetShardOwner_CacheMiss_FallbackToRPC(t *testing.T) {
@@ -103,7 +107,13 @@ func TestGetShardOwner_CacheMiss_FallbackToRPC(t *testing.T) {
103107
// First Recv returns state
104108
mockStream.EXPECT().Recv().Return(&types.WatchNamespaceStateResponse{
105109
Executors: []*types.ExecutorShardAssignment{
106-
{ExecutorID: "executor-1", AssignedShards: []*types.Shard{{ShardKey: "shard-1"}}},
110+
{
111+
ExecutorID: "executor-1",
112+
Metadata: map[string]string{
113+
"grpc_address": "127.0.0.1:7953",
114+
},
115+
AssignedShards: []*types.Shard{{ShardKey: "shard-1"}},
116+
},
107117
},
108118
}, nil)
109119

@@ -122,7 +132,12 @@ func TestGetShardOwner_CacheMiss_FallbackToRPC(t *testing.T) {
122132
Namespace: "test-ns",
123133
ShardKey: "unknown-shard",
124134
}).
125-
Return(&types.GetShardOwnerResponse{Owner: "executor-2"}, nil)
135+
Return(&types.GetShardOwnerResponse{
136+
Owner: "executor-2",
137+
Metadata: map[string]string{
138+
"grpc_address": "127.0.0.1:7954",
139+
},
140+
}, nil)
126141

127142
spectator.Start(context.Background())
128143
defer spectator.Stop()
@@ -132,12 +147,13 @@ func TestGetShardOwner_CacheMiss_FallbackToRPC(t *testing.T) {
132147
// Cache hit
133148
owner, err := spectator.GetShardOwner(context.Background(), "shard-1")
134149
assert.NoError(t, err)
135-
assert.Equal(t, "executor-1", owner)
150+
assert.Equal(t, "executor-1", owner.ExecutorID)
136151

137152
// Cache miss - should trigger RPC
138153
owner, err = spectator.GetShardOwner(context.Background(), "unknown-shard")
139154
assert.NoError(t, err)
140-
assert.Equal(t, "executor-2", owner)
155+
assert.Equal(t, "executor-2", owner.ExecutorID)
156+
assert.Equal(t, "127.0.0.1:7954", owner.Metadata["grpc_address"])
141157
}
142158

143159
func TestStreamReconnection(t *testing.T) {

service/sharddistributor/client/spectatorclient/interface_mock.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)