
Commit 71df42c

Merge branch 'add-spectator-peer-chooser' into add-canary-pinger-new
2 parents 1115b5b + 1597904 commit 71df42c

6 files changed, +440 -32 lines changed

service/sharddistributor/client/spectatorclient/client.go

Lines changed: 43 additions & 18 deletions
@@ -20,13 +20,50 @@ import (
 
 //go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination interface_mock.go . Spectator
 
+type Spectators map[string]Spectator
+
+func (s Spectators) ForNamespace(namespace string) (Spectator, error) {
+	spectator, ok := s[namespace]
+	if !ok {
+		return nil, fmt.Errorf("spectator not found for namespace %s", namespace)
+	}
+	return spectator, nil
+}
+
+func (s Spectators) Start(ctx context.Context) error {
+	for namespace, spectator := range s {
+		if err := spectator.Start(ctx); err != nil {
+			return fmt.Errorf("start spectator for namespace %s: %w", namespace, err)
+		}
+	}
+	return nil
+}
+
+func (s Spectators) Stop() {
+	for _, spectator := range s {
+		spectator.Stop()
+	}
+}
+
+func NewSpectators(params Params) (Spectators, error) {
+	spectators := make(Spectators)
+	for _, namespace := range params.Config.Namespaces {
+		spectator, err := NewSpectatorWithNamespace(params, namespace.Namespace)
+		if err != nil {
+			return nil, fmt.Errorf("create spectator for namespace %s: %w", namespace.Namespace, err)
+		}
+
+		spectators[namespace.Namespace] = spectator
+	}
+	return spectators, nil
+}
+
 type Spectator interface {
 	Start(ctx context.Context) error
 	Stop()
 
-	// GetShardOwner returns the owner of a shard. It first checks the local cache,
-	// and if not found, falls back to querying the shard distributor directly.
-	GetShardOwner(ctx context.Context, shardKey string) (string, error)
+	// GetShardOwner returns the owner of a shard
+	GetShardOwner(ctx context.Context, shardKey string) (*ShardOwner, error)
 }
 
 type Params struct {
@@ -109,21 +146,9 @@ func createShardDistributorClient(yarpcClient sharddistributorv1.ShardDistributo
 // Module creates a spectator module using auto-selection (single namespace only)
 func Module() fx.Option {
 	return fx.Module("shard-distributor-spectator-client",
-		fx.Provide(NewSpectator),
-		fx.Invoke(func(spectator Spectator, lc fx.Lifecycle) {
-			lc.Append(fx.StartStopHook(spectator.Start, spectator.Stop))
-		}),
-	)
-}
-
-// ModuleWithNamespace creates a spectator module for a specific namespace
-func ModuleWithNamespace(namespace string) fx.Option {
-	return fx.Module(fmt.Sprintf("shard-distributor-spectator-client-%s", namespace),
-		fx.Provide(func(params Params) (Spectator, error) {
-			return NewSpectatorWithNamespace(params, namespace)
-		}),
-		fx.Invoke(func(spectator Spectator, lc fx.Lifecycle) {
-			lc.Append(fx.StartStopHook(spectator.Start, spectator.Stop))
+		fx.Provide(NewSpectators),
+		fx.Invoke(func(spectators Spectators, lc fx.Lifecycle) {
+			lc.Append(fx.StartStopHook(spectators.Start, spectators.Stop))
 		}),
 	)
 }
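
Note: with this change, consumers inject a single Spectators collection instead of one Spectator per fx module; Module() provides the collection and hooks Spectators.Start/Stop into the fx lifecycle so all configured namespaces start together. A minimal usage sketch under that assumption (the helper name and parameters are illustrative, not part of this commit):

package example

import (
	"context"

	"github.com/uber/cadence/service/sharddistributor/client/spectatorclient"
)

// lookupOwnerAddress is a hypothetical helper: it picks the spectator for a
// namespace from the injected Spectators map and returns the owning
// executor's gRPC address from the ShardOwner metadata.
func lookupOwnerAddress(ctx context.Context, spectators spectatorclient.Spectators, namespace, shardKey string) (string, error) {
	spectator, err := spectators.ForNamespace(namespace)
	if err != nil {
		return "", err
	}
	owner, err := spectator.GetShardOwner(ctx, shardKey)
	if err != nil {
		return "", err
	}
	return owner.Metadata["grpc_address"], nil
}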

service/sharddistributor/client/spectatorclient/clientimpl.go

Lines changed: 9 additions & 5 deletions
@@ -103,6 +103,7 @@ func (s *spectatorImpl) watchLoop() {
 
 		// Server shutdown or network issue - recreate stream (load balancer will route to new server)
 		s.logger.Info("Stream ended, reconnecting", tag.ShardNamespace(s.namespace))
+		s.timeSource.Sleep(backoff.JitDuration(streamRetryInterval, streamRetryJitterCoeff))
 	}
 }
 
@@ -163,10 +164,10 @@ func (s *spectatorImpl) handleResponse(response *types.WatchNamespaceStateRespon
 		tag.Counter(len(response.Executors)))
 }
 
-// GetShardOwner returns the executor ID for a given shard.
+// GetShardOwner returns the full owner information including metadata for a given shard.
 // It first waits for the initial state to be received, then checks the cache.
 // If not found in cache, it falls back to querying the shard distributor directly.
-func (s *spectatorImpl) GetShardOwner(ctx context.Context, shardKey string) (string, error) {
+func (s *spectatorImpl) GetShardOwner(ctx context.Context, shardKey string) (*ShardOwner, error) {
 	// Wait for first state to be received to avoid flooding shard distributor on startup
 	s.firstStateWG.Wait()
 
@@ -176,7 +177,7 @@ func (s *spectatorImpl) GetShardOwner(ctx context.Context, shardKey string) (str
 	s.stateMu.RUnlock()
 
 	if owner != nil {
-		return owner.ExecutorID, nil
+		return owner, nil
 	}
 
 	// Cache miss - fall back to RPC call
@@ -189,8 +190,11 @@ func (s *spectatorImpl) GetShardOwner(ctx context.Context, shardKey string) (str
 		ShardKey: shardKey,
 	})
 	if err != nil {
-		return "", fmt.Errorf("get shard owner from shard distributor: %w", err)
+		return nil, fmt.Errorf("get shard owner from shard distributor: %w", err)
 	}
 
-	return response.Owner, nil
+	return &ShardOwner{
+		ExecutorID: response.Owner,
+		Metadata:   response.Metadata,
+	}, nil
 }
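
The hunks above construct and return *ShardOwner, but its definition sits outside this diff. Judging from the fields used here (ExecutorID, Metadata), it presumably looks roughly like the sketch below; the actual committed definition may differ:

package spectatorclient

// ShardOwner as inferred from usage in this diff (owner.ExecutorID,
// owner.Metadata["grpc_address"]); the committed definition may differ.
type ShardOwner struct {
	ExecutorID string            // executor that currently owns the shard
	Metadata   map[string]string // transport metadata, e.g. "grpc_address" -> host:port
}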

service/sharddistributor/client/spectatorclient/clientimpl_test.go

Lines changed: 25 additions & 7 deletions
@@ -44,6 +44,9 @@ func TestWatchLoopBasicFlow(t *testing.T) {
 		Executors: []*types.ExecutorShardAssignment{
 			{
 				ExecutorID: "executor-1",
+				Metadata: map[string]string{
+					"grpc_address": "127.0.0.1:7953",
+				},
 				AssignedShards: []*types.Shard{
 					{ShardKey: "shard-1"},
 					{ShardKey: "shard-2"},
@@ -72,11 +75,12 @@ func TestWatchLoopBasicFlow(t *testing.T) {
 	// Query shard owner
 	owner, err := spectator.GetShardOwner(context.Background(), "shard-1")
 	assert.NoError(t, err)
-	assert.Equal(t, "executor-1", owner)
+	assert.Equal(t, "executor-1", owner.ExecutorID)
+	assert.Equal(t, "127.0.0.1:7953", owner.Metadata["grpc_address"])
 
 	owner, err = spectator.GetShardOwner(context.Background(), "shard-2")
 	assert.NoError(t, err)
-	assert.Equal(t, "executor-1", owner)
+	assert.Equal(t, "executor-1", owner.ExecutorID)
 }
 
 func TestGetShardOwner_CacheMiss_FallbackToRPC(t *testing.T) {
@@ -103,7 +107,13 @@ func TestGetShardOwner_CacheMiss_FallbackToRPC(t *testing.T) {
 	// First Recv returns state
 	mockStream.EXPECT().Recv().Return(&types.WatchNamespaceStateResponse{
 		Executors: []*types.ExecutorShardAssignment{
-			{ExecutorID: "executor-1", AssignedShards: []*types.Shard{{ShardKey: "shard-1"}}},
+			{
+				ExecutorID: "executor-1",
+				Metadata: map[string]string{
+					"grpc_address": "127.0.0.1:7953",
+				},
+				AssignedShards: []*types.Shard{{ShardKey: "shard-1"}},
+			},
 		},
 	}, nil)
 
@@ -122,7 +132,12 @@ func TestGetShardOwner_CacheMiss_FallbackToRPC(t *testing.T) {
 		Namespace: "test-ns",
 		ShardKey:  "unknown-shard",
 	}).
-		Return(&types.GetShardOwnerResponse{Owner: "executor-2"}, nil)
+		Return(&types.GetShardOwnerResponse{
+			Owner: "executor-2",
+			Metadata: map[string]string{
+				"grpc_address": "127.0.0.1:7954",
+			},
+		}, nil)
 
 	spectator.Start(context.Background())
 	defer spectator.Stop()
@@ -132,12 +147,13 @@ func TestGetShardOwner_CacheMiss_FallbackToRPC(t *testing.T) {
 	// Cache hit
 	owner, err := spectator.GetShardOwner(context.Background(), "shard-1")
 	assert.NoError(t, err)
-	assert.Equal(t, "executor-1", owner)
+	assert.Equal(t, "executor-1", owner.ExecutorID)
 
 	// Cache miss - should trigger RPC
 	owner, err = spectator.GetShardOwner(context.Background(), "unknown-shard")
 	assert.NoError(t, err)
-	assert.Equal(t, "executor-2", owner)
+	assert.Equal(t, "executor-2", owner.ExecutorID)
+	assert.Equal(t, "127.0.0.1:7954", owner.Metadata["grpc_address"])
 }
 
 func TestStreamReconnection(t *testing.T) {
@@ -188,7 +204,9 @@ func TestStreamReconnection(t *testing.T) {
 	spectator.Start(context.Background())
 	defer spectator.Stop()
 
-	// Advance time for retry
+	// Wait for the goroutine to be blocked in Sleep, then advance time
+	mockTimeSource.BlockUntil(1) // Wait for 1 goroutine to be blocked in Sleep
 	mockTimeSource.Advance(2 * time.Second)
+
 	spectator.firstStateWG.Wait()
 }
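
The BlockUntil(1) added above closes a race in the reconnection test: if the clock is advanced before the watch goroutine reaches Sleep, the goroutine later blocks on a deadline that never arrives and the test hangs. The isolated sketch below shows the same pattern, assuming the mocked time source comes from common/clock and exposes the Sleep/BlockUntil/Advance methods used in the test:

package example

import (
	"testing"
	"time"

	"github.com/uber/cadence/common/clock"
)

// TestSleepThenAdvance demonstrates the BlockUntil/Advance pattern in isolation.
func TestSleepThenAdvance(t *testing.T) {
	timeSource := clock.NewMockedTimeSource()
	done := make(chan struct{})

	go func() {
		// Stands in for the watch loop's retry backoff.
		timeSource.Sleep(time.Second)
		close(done)
	}()

	timeSource.BlockUntil(1)            // wait until the goroutine is parked in Sleep
	timeSource.Advance(2 * time.Second) // then move the fake clock past the deadline
	<-done
}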

service/sharddistributor/client/spectatorclient/interface_mock.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default.
New file: 176 additions & 0 deletions
@@ -0,0 +1,176 @@
+package spectatorclient
+
+import (
+	"context"
+	"fmt"
+	"sync"
+
+	"go.uber.org/fx"
+	"go.uber.org/yarpc/api/peer"
+	"go.uber.org/yarpc/api/transport"
+	"go.uber.org/yarpc/peer/hostport"
+	"go.uber.org/yarpc/yarpcerrors"
+
+	"github.com/uber/cadence/common/log"
+	"github.com/uber/cadence/common/log/tag"
+)
+
+const (
+	NamespaceHeader        = "x-shard-distributor-namespace"
+	grpcAddressMetadataKey = "grpc_address"
+)
+
+// SpectatorPeerChooserInterface extends peer.Chooser with SetSpectators method
+type SpectatorPeerChooserInterface interface {
+	peer.Chooser
+	SetSpectators(spectators Spectators)
+}
+
+// SpectatorPeerChooser is a peer.Chooser that uses the Spectator to route requests
+// to the correct executor based on shard ownership.
+// This is the shard distributor equivalent of Cadence's RingpopPeerChooser.
+//
+// Flow:
+// 1. Client calls RPC with yarpc.WithShardKey("shard-key")
+// 2. Choose() is called with req.ShardKey = "shard-key"
+// 3. Query Spectator for shard owner
+// 4. Extract grpc_address from owner metadata
+// 5. Create/reuse peer for that address
+// 6. Return peer to YARPC for connection
+type SpectatorPeerChooser struct {
+	spectators Spectators
+	transport  peer.Transport
+	logger     log.Logger
+	namespace  string
+
+	mu    sync.RWMutex
+	peers map[string]peer.Peer // grpc_address -> peer
+}
+
+type SpectatorPeerChooserParams struct {
+	fx.In
+	Transport peer.Transport
+	Logger    log.Logger
+}
+
+// NewSpectatorPeerChooser creates a new peer chooser that routes based on shard distributor ownership
+func NewSpectatorPeerChooser(
+	params SpectatorPeerChooserParams,
+) SpectatorPeerChooserInterface {
+	return &SpectatorPeerChooser{
+		transport: params.Transport,
+		logger:    params.Logger,
+		peers:     make(map[string]peer.Peer),
+	}
+}
+
+// Start satisfies the peer.Chooser interface
+func (c *SpectatorPeerChooser) Start() error {
+	c.logger.Info("Starting shard distributor peer chooser", tag.ShardNamespace(c.namespace))
+	return nil
+}
+
+// Stop satisfies the peer.Chooser interface
+func (c *SpectatorPeerChooser) Stop() error {
+	c.logger.Info("Stopping shard distributor peer chooser", tag.ShardNamespace(c.namespace))
+
+	// Release all peers
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	for addr, p := range c.peers {
+		if err := c.transport.ReleasePeer(p, &noOpSubscriber{}); err != nil {
+			c.logger.Error("Failed to release peer", tag.Error(err), tag.Address(addr))
+		}
+	}
+	c.peers = make(map[string]peer.Peer)
+
+	return nil
+}
+
+// IsRunning satisfies the peer.Chooser interface
+func (c *SpectatorPeerChooser) IsRunning() bool {
+	return true
+}
+
+// Choose returns a peer for the given shard key by:
+// 0. Looking up the spectator for the namespace using the x-shard-distributor-namespace header
+// 1. Looking up the shard owner via the Spectator
+// 2. Extracting the grpc_address from the owner's metadata
+// 3. Creating/reusing a peer for that address
+//
+// The ShardKey in the request is the actual shard key (e.g., workflow ID, shard ID),
+// NOT the ip:port address. This is the key distinction from directPeerChooser.
+func (c *SpectatorPeerChooser) Choose(ctx context.Context, req *transport.Request) (peer.Peer, func(error), error) {
+	if req.ShardKey == "" {
+		return nil, nil, yarpcerrors.InvalidArgumentErrorf("chooser requires ShardKey to be non-empty")
+	}
+
+	// Get the spectator for the namespace
+	namespace, ok := req.Headers.Get(NamespaceHeader)
+	if !ok || namespace == "" {
+		return nil, nil, yarpcerrors.InvalidArgumentErrorf("chooser requires x-shard-distributor-namespace header to be non-empty")
+	}
+
+	spectator, err := c.spectators.ForNamespace(namespace)
+	if err != nil {
+		return nil, nil, yarpcerrors.InvalidArgumentErrorf("failed to get spectator for namespace %s: %w", namespace, err)
+	}
+
+	// Query spectator for shard owner
+	owner, err := spectator.GetShardOwner(ctx, req.ShardKey)
+	if err != nil {
+		return nil, nil, yarpcerrors.UnavailableErrorf("failed to get shard owner for key %s: %v", req.ShardKey, err)
+	}
+
+	// Extract GRPC address from owner metadata
+	grpcAddress, ok := owner.Metadata[grpcAddressMetadataKey]
+	if !ok || grpcAddress == "" {
+		return nil, nil, yarpcerrors.InternalErrorf("no grpc_address in metadata for executor %s owning shard %s", owner.ExecutorID, req.ShardKey)
+	}
+
+	// Check if we already have a peer for this address
+	c.mu.RLock()
+	p, ok := c.peers[grpcAddress]
+	if ok {
+		c.mu.RUnlock()
+		return p, func(error) {}, nil
+	}
+	c.mu.RUnlock()
+
+	// Create new peer for this address
+	p, err = c.addPeer(grpcAddress)
+	if err != nil {
+		return nil, nil, yarpcerrors.InternalErrorf("failed to add peer for address %s: %v", grpcAddress, err)
+	}
+
+	return p, func(error) {}, nil
+}
+
+func (c *SpectatorPeerChooser) SetSpectators(spectators Spectators) {
+	c.spectators = spectators
+}
+
+func (c *SpectatorPeerChooser) addPeer(grpcAddress string) (peer.Peer, error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	// Check again in case another goroutine added it
+	if p, ok := c.peers[grpcAddress]; ok {
+		return p, nil
+	}
+
+	p, err := c.transport.RetainPeer(hostport.Identify(grpcAddress), &noOpSubscriber{})
+	if err != nil {
+		return nil, fmt.Errorf("retain peer failed: %w", err)
+	}
+
+	c.peers[grpcAddress] = p
+	c.logger.Info("Added peer to shard distributor peer chooser", tag.Address(grpcAddress))
+	return p, nil
+}
+
+// noOpSubscriber is a no-op implementation of peer.Subscriber
+type noOpSubscriber struct{}
+
+func (*noOpSubscriber) NotifyStatusChanged(peer.Identifier) {}
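
For callers, the chooser only works if each outgoing request carries both pieces of routing information read in Choose(): the shard key and the namespace header. A hedged sketch of how a call site might supply them with standard YARPC call options (the wrapped call and parameter names are placeholders, not part of this commit):

package example

import (
	"context"

	"go.uber.org/yarpc"
)

// callWithShardRouting shows the two call options the chooser depends on:
// yarpc.WithShardKey populates req.ShardKey, and the application header is
// what Choose() reads via req.Headers.Get(NamespaceHeader).
func callWithShardRouting(
	ctx context.Context,
	doCall func(ctx context.Context, opts ...yarpc.CallOption) error, // placeholder for a generated client method
	namespace, shardKey string,
) error {
	return doCall(
		ctx,
		yarpc.WithShardKey(shardKey),
		yarpc.WithHeader("x-shard-distributor-namespace", namespace),
	)
}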
