Skip to content

Commit fc05d75

Browse files
authored
feat(shard-distributor): return executor metadata from spectator GetShardOwner (#7476)
**What changed?** Changed `GetShardOwner` to return an `ExecutorOwnership` struct containing both executor ID and metadata map, instead of just the executor ID string. Also adds a Spectators group so we can easily pass around all spectators. **Why?** Enables callers to access additional executor information like gRPC address for peer routing, without requiring separate lookups. This is needed for implementing canary peer chooser that routes requests to executors based on their addresses. **How did you test it?** Updated all tests to verify metadata is included in responses. Verified locally that ownership information includes metadata. **Potential risks** Low - this is an API enhancement that maintains backward compatibility by returning the same executor ID, just with additional metadata. **Release notes** **Documentation Changes** None --------- Signed-off-by: Jakob Haahr Taankvist <[email protected]>
1 parent 5487696 commit fc05d75

File tree

4 files changed

+81
-32
lines changed

4 files changed

+81
-32
lines changed

service/sharddistributor/client/spectatorclient/client.go

Lines changed: 45 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,52 @@ import (
2020

2121
//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination interface_mock.go . Spectator
2222

23+
type Spectators struct {
24+
spectators map[string]Spectator
25+
}
26+
27+
func (s *Spectators) ForNamespace(namespace string) (Spectator, error) {
28+
spectator, ok := s.spectators[namespace]
29+
if !ok {
30+
return nil, fmt.Errorf("spectator not found for namespace %s", namespace)
31+
}
32+
return spectator, nil
33+
}
34+
35+
func (s *Spectators) Start(ctx context.Context) error {
36+
for namespace, spectator := range s.spectators {
37+
if err := spectator.Start(ctx); err != nil {
38+
return fmt.Errorf("start spectator for namespace %s: %w", namespace, err)
39+
}
40+
}
41+
return nil
42+
}
43+
44+
func (s *Spectators) Stop() {
45+
for _, spectator := range s.spectators {
46+
spectator.Stop()
47+
}
48+
}
49+
50+
func NewSpectators(params Params) (*Spectators, error) {
51+
spectators := make(map[string]Spectator)
52+
for _, namespace := range params.Config.Namespaces {
53+
spectator, err := NewSpectatorWithNamespace(params, namespace.Namespace)
54+
if err != nil {
55+
return nil, fmt.Errorf("create spectator for namespace %s: %w", namespace.Namespace, err)
56+
}
57+
58+
spectators[namespace.Namespace] = spectator
59+
}
60+
return &Spectators{spectators: spectators}, nil
61+
}
62+
2363
type Spectator interface {
2464
Start(ctx context.Context) error
2565
Stop()
2666

27-
// GetShardOwner returns the owner of a shard. It first checks the local cache,
28-
// and if not found, falls back to querying the shard distributor directly.
29-
GetShardOwner(ctx context.Context, shardKey string) (string, error)
67+
// GetShardOwner returns the owner of a shard
68+
GetShardOwner(ctx context.Context, shardKey string) (*ShardOwner, error)
3069
}
3170

3271
type Params struct {
@@ -109,21 +148,9 @@ func createShardDistributorClient(yarpcClient sharddistributorv1.ShardDistributo
109148
// Module creates a spectator module using auto-selection (single namespace only)
110149
func Module() fx.Option {
111150
return fx.Module("shard-distributor-spectator-client",
112-
fx.Provide(NewSpectator),
113-
fx.Invoke(func(spectator Spectator, lc fx.Lifecycle) {
114-
lc.Append(fx.StartStopHook(spectator.Start, spectator.Stop))
115-
}),
116-
)
117-
}
118-
119-
// ModuleWithNamespace creates a spectator module for a specific namespace
120-
func ModuleWithNamespace(namespace string) fx.Option {
121-
return fx.Module(fmt.Sprintf("shard-distributor-spectator-client-%s", namespace),
122-
fx.Provide(func(params Params) (Spectator, error) {
123-
return NewSpectatorWithNamespace(params, namespace)
124-
}),
125-
fx.Invoke(func(spectator Spectator, lc fx.Lifecycle) {
126-
lc.Append(fx.StartStopHook(spectator.Start, spectator.Stop))
151+
fx.Provide(NewSpectators),
152+
fx.Invoke(func(spectators *Spectators, lc fx.Lifecycle) {
153+
lc.Append(fx.StartStopHook(spectators.Start, spectators.Stop))
127154
}),
128155
)
129156
}

service/sharddistributor/client/spectatorclient/clientimpl.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ func (s *spectatorImpl) watchLoop() {
103103

104104
// Server shutdown or network issue - recreate stream (load balancer will route to new server)
105105
s.logger.Info("Stream ended, reconnecting", tag.ShardNamespace(s.namespace))
106+
s.timeSource.Sleep(backoff.JitDuration(streamRetryInterval, streamRetryJitterCoeff))
106107
}
107108
}
108109

@@ -163,10 +164,10 @@ func (s *spectatorImpl) handleResponse(response *types.WatchNamespaceStateRespon
163164
tag.Counter(len(response.Executors)))
164165
}
165166

166-
// GetShardOwner returns the executor ID for a given shard.
167+
// GetShardOwner returns the full owner information including metadata for a given shard.
167168
// It first waits for the initial state to be received, then checks the cache.
168169
// If not found in cache, it falls back to querying the shard distributor directly.
169-
func (s *spectatorImpl) GetShardOwner(ctx context.Context, shardKey string) (string, error) {
170+
func (s *spectatorImpl) GetShardOwner(ctx context.Context, shardKey string) (*ShardOwner, error) {
170171
// Wait for first state to be received to avoid flooding shard distributor on startup
171172
s.firstStateWG.Wait()
172173

@@ -176,7 +177,7 @@ func (s *spectatorImpl) GetShardOwner(ctx context.Context, shardKey string) (str
176177
s.stateMu.RUnlock()
177178

178179
if owner != nil {
179-
return owner.ExecutorID, nil
180+
return owner, nil
180181
}
181182

182183
// Cache miss - fall back to RPC call
@@ -189,8 +190,11 @@ func (s *spectatorImpl) GetShardOwner(ctx context.Context, shardKey string) (str
189190
ShardKey: shardKey,
190191
})
191192
if err != nil {
192-
return "", fmt.Errorf("get shard owner from shard distributor: %w", err)
193+
return nil, fmt.Errorf("get shard owner from shard distributor: %w", err)
193194
}
194195

195-
return response.Owner, nil
196+
return &ShardOwner{
197+
ExecutorID: response.Owner,
198+
Metadata: response.Metadata,
199+
}, nil
196200
}

service/sharddistributor/client/spectatorclient/clientimpl_test.go

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ func TestWatchLoopBasicFlow(t *testing.T) {
4444
Executors: []*types.ExecutorShardAssignment{
4545
{
4646
ExecutorID: "executor-1",
47+
Metadata: map[string]string{
48+
"grpc_address": "127.0.0.1:7953",
49+
},
4750
AssignedShards: []*types.Shard{
4851
{ShardKey: "shard-1"},
4952
{ShardKey: "shard-2"},
@@ -72,11 +75,12 @@ func TestWatchLoopBasicFlow(t *testing.T) {
7275
// Query shard owner
7376
owner, err := spectator.GetShardOwner(context.Background(), "shard-1")
7477
assert.NoError(t, err)
75-
assert.Equal(t, "executor-1", owner)
78+
assert.Equal(t, "executor-1", owner.ExecutorID)
79+
assert.Equal(t, "127.0.0.1:7953", owner.Metadata["grpc_address"])
7680

7781
owner, err = spectator.GetShardOwner(context.Background(), "shard-2")
7882
assert.NoError(t, err)
79-
assert.Equal(t, "executor-1", owner)
83+
assert.Equal(t, "executor-1", owner.ExecutorID)
8084
}
8185

8286
func TestGetShardOwner_CacheMiss_FallbackToRPC(t *testing.T) {
@@ -103,7 +107,13 @@ func TestGetShardOwner_CacheMiss_FallbackToRPC(t *testing.T) {
103107
// First Recv returns state
104108
mockStream.EXPECT().Recv().Return(&types.WatchNamespaceStateResponse{
105109
Executors: []*types.ExecutorShardAssignment{
106-
{ExecutorID: "executor-1", AssignedShards: []*types.Shard{{ShardKey: "shard-1"}}},
110+
{
111+
ExecutorID: "executor-1",
112+
Metadata: map[string]string{
113+
"grpc_address": "127.0.0.1:7953",
114+
},
115+
AssignedShards: []*types.Shard{{ShardKey: "shard-1"}},
116+
},
107117
},
108118
}, nil)
109119

@@ -122,7 +132,12 @@ func TestGetShardOwner_CacheMiss_FallbackToRPC(t *testing.T) {
122132
Namespace: "test-ns",
123133
ShardKey: "unknown-shard",
124134
}).
125-
Return(&types.GetShardOwnerResponse{Owner: "executor-2"}, nil)
135+
Return(&types.GetShardOwnerResponse{
136+
Owner: "executor-2",
137+
Metadata: map[string]string{
138+
"grpc_address": "127.0.0.1:7954",
139+
},
140+
}, nil)
126141

127142
spectator.Start(context.Background())
128143
defer spectator.Stop()
@@ -132,12 +147,13 @@ func TestGetShardOwner_CacheMiss_FallbackToRPC(t *testing.T) {
132147
// Cache hit
133148
owner, err := spectator.GetShardOwner(context.Background(), "shard-1")
134149
assert.NoError(t, err)
135-
assert.Equal(t, "executor-1", owner)
150+
assert.Equal(t, "executor-1", owner.ExecutorID)
136151

137152
// Cache miss - should trigger RPC
138153
owner, err = spectator.GetShardOwner(context.Background(), "unknown-shard")
139154
assert.NoError(t, err)
140-
assert.Equal(t, "executor-2", owner)
155+
assert.Equal(t, "executor-2", owner.ExecutorID)
156+
assert.Equal(t, "127.0.0.1:7954", owner.Metadata["grpc_address"])
141157
}
142158

143159
func TestStreamReconnection(t *testing.T) {
@@ -188,7 +204,9 @@ func TestStreamReconnection(t *testing.T) {
188204
spectator.Start(context.Background())
189205
defer spectator.Stop()
190206

191-
// Advance time for retry
207+
// Wait for the goroutine to be blocked in Sleep, then advance time
208+
mockTimeSource.BlockUntil(1) // Wait for 1 goroutine to be blocked in Sleep
192209
mockTimeSource.Advance(2 * time.Second)
210+
193211
spectator.firstStateWG.Wait()
194212
}

service/sharddistributor/client/spectatorclient/interface_mock.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments (0)