Commit 0d02c5d

Add SpectatorPeerChooser for shard-aware routing
Implement a YARPC peer chooser that routes requests to the correct executor based on shard ownership. This is the shard distributor equivalent of Cadence's RingpopPeerChooser.

Flow:
1. Client calls RPC with yarpc.WithShardKey("shard-key")
2. Chooser queries Spectator for shard owner
3. Extracts grpc_address from owner metadata
4. Creates/reuses peer for that address
5. Returns peer to YARPC for connection

The peer chooser maintains a cache of peers and handles concurrent access safely. It uses the x-shard-distributor-namespace header to determine which namespace's spectator to query.

Dependencies:
- Requires spectator GetShardOwner to return metadata (see previous commit)

Signed-off-by: Jakob Haahr Taankvist <[email protected]>
1 parent: 38c377d
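For illustration, a minimal caller-side sketch of the flow described in the commit message, assuming a generated YARPC client method stands behind `invoke`; the function and parameter names here are hypothetical, and only yarpc.WithShardKey and yarpc.WithHeader are taken from the YARPC API:

    package main

    import (
        "context"

        "go.uber.org/yarpc"
    )

    // callWithShardRouting shows the caller side: the shard key (not an ip:port)
    // drives routing, and the namespace header tells the chooser which spectator
    // to consult. `invoke` is a stand-in for any generated YARPC client call.
    func callWithShardRouting(
        ctx context.Context,
        invoke func(context.Context, ...yarpc.CallOption) error,
        shardKey, namespace string,
    ) error {
        return invoke(ctx,
            yarpc.WithShardKey(shardKey),
            yarpc.WithHeader("x-shard-distributor-namespace", namespace),
        )
    }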

1 file changed: 174 additions & 0 deletions
@@ -0,0 +1,174 @@
package spectatorclient

import (
    "context"
    "fmt"
    "sync"

    "go.uber.org/fx"
    "go.uber.org/yarpc/api/peer"
    "go.uber.org/yarpc/api/transport"
    "go.uber.org/yarpc/peer/hostport"
    "go.uber.org/yarpc/yarpcerrors"

    "github.com/uber/cadence/common/log"
    "github.com/uber/cadence/common/log/tag"
    "github.com/uber/cadence/service/sharddistributor/canary/metadata"
)

const NamespaceHeader = "x-shard-distributor-namespace"

// SpectatorPeerChooserInterface extends peer.Chooser with the SetSpectators method.
type SpectatorPeerChooserInterface interface {
    peer.Chooser
    SetSpectators(spectators Spectators)
}

// SpectatorPeerChooser is a peer.Chooser that uses the Spectator to route requests
// to the correct executor based on shard ownership.
// This is the shard distributor equivalent of Cadence's RingpopPeerChooser.
//
// Flow:
// 1. Client calls RPC with yarpc.WithShardKey("shard-key")
// 2. Choose() is called with req.ShardKey = "shard-key"
// 3. Query Spectator for shard owner
// 4. Extract grpc_address from owner metadata
// 5. Create/reuse peer for that address
// 6. Return peer to YARPC for connection
type SpectatorPeerChooser struct {
    spectators Spectators
    transport  peer.Transport
    logger     log.Logger
    namespace  string

    mu    sync.RWMutex
    peers map[string]peer.Peer // grpc_address -> peer
}

type SpectatorPeerChooserParams struct {
    fx.In

    Transport peer.Transport
    Logger    log.Logger
}

// NewSpectatorPeerChooser creates a new peer chooser that routes based on shard distributor ownership.
func NewSpectatorPeerChooser(
    params SpectatorPeerChooserParams,
) SpectatorPeerChooserInterface {
    return &SpectatorPeerChooser{
        transport: params.Transport,
        logger:    params.Logger,
        peers:     make(map[string]peer.Peer),
    }
}

// Start satisfies the peer.Chooser interface.
func (c *SpectatorPeerChooser) Start() error {
    c.logger.Info("Starting shard distributor peer chooser", tag.ShardNamespace(c.namespace))
    return nil
}

// Stop satisfies the peer.Chooser interface.
func (c *SpectatorPeerChooser) Stop() error {
    c.logger.Info("Stopping shard distributor peer chooser", tag.ShardNamespace(c.namespace))

    // Release all peers
    c.mu.Lock()
    defer c.mu.Unlock()

    for addr, p := range c.peers {
        if err := c.transport.ReleasePeer(p, &noOpSubscriber{}); err != nil {
            c.logger.Error("Failed to release peer", tag.Error(err), tag.Address(addr))
        }
    }
    c.peers = make(map[string]peer.Peer)

    return nil
}

// IsRunning satisfies the peer.Chooser interface.
func (c *SpectatorPeerChooser) IsRunning() bool {
    return true
}

// Choose returns a peer for the given shard key by:
// 0. Looking up the spectator for the namespace using the x-shard-distributor-namespace header
// 1. Looking up the shard owner via the Spectator
// 2. Extracting the grpc_address from the owner's metadata
// 3. Creating/reusing a peer for that address
//
// The ShardKey in the request is the actual shard key (e.g., workflow ID, shard ID),
// NOT the ip:port address. This is the key distinction from directPeerChooser.
func (c *SpectatorPeerChooser) Choose(ctx context.Context, req *transport.Request) (peer.Peer, func(error), error) {
    if req.ShardKey == "" {
        return nil, nil, yarpcerrors.InvalidArgumentErrorf("chooser requires ShardKey to be non-empty")
    }

    // Get the spectator for the namespace
    namespace, ok := req.Headers.Get(NamespaceHeader)
    if !ok || namespace == "" {
        return nil, nil, yarpcerrors.InvalidArgumentErrorf("chooser requires x-shard-distributor-namespace header to be non-empty")
    }

    spectator, err := c.spectators.ForNamespace(namespace)
    if err != nil {
        return nil, nil, yarpcerrors.InvalidArgumentErrorf("failed to get spectator for namespace %s: %w", namespace, err)
    }

    // Query spectator for shard owner
    owner, err := spectator.GetShardOwner(ctx, req.ShardKey)
    if err != nil {
        return nil, nil, yarpcerrors.UnavailableErrorf("failed to get shard owner for key %s: %v", req.ShardKey, err)
    }

    // Extract GRPC address from owner metadata
    grpcAddress, ok := owner.Metadata[metadata.MetadataKeyGRPCAddress]
    if !ok || grpcAddress == "" {
        return nil, nil, yarpcerrors.InternalErrorf("no grpc_address in metadata for executor %s owning shard %s", owner.ExecutorID, req.ShardKey)
    }

    // Check if we already have a peer for this address
    c.mu.RLock()
    p, ok := c.peers[grpcAddress]
    if ok {
        c.mu.RUnlock()
        return p, func(error) {}, nil
    }
    c.mu.RUnlock()

    // Create new peer for this address
    p, err = c.addPeer(grpcAddress)
    if err != nil {
        return nil, nil, yarpcerrors.InternalErrorf("failed to add peer for address %s: %v", grpcAddress, err)
    }

    return p, func(error) {}, nil
}

func (c *SpectatorPeerChooser) SetSpectators(spectators Spectators) {
    c.spectators = spectators
}

func (c *SpectatorPeerChooser) addPeer(grpcAddress string) (peer.Peer, error) {
    c.mu.Lock()
    defer c.mu.Unlock()

    // Check again in case another goroutine added it
    if p, ok := c.peers[grpcAddress]; ok {
        return p, nil
    }

    p, err := c.transport.RetainPeer(hostport.Identify(grpcAddress), &noOpSubscriber{})
    if err != nil {
        return nil, fmt.Errorf("retain peer failed: %w", err)
    }

    c.peers[grpcAddress] = p
    c.logger.Info("Added peer to shard distributor peer chooser", tag.Address(grpcAddress))
    return p, nil
}

// noOpSubscriber is a no-op implementation of peer.Subscriber
type noOpSubscriber struct{}

func (*noOpSubscriber) NotifyStatusChanged(peer.Identifier) {}
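A rough sketch of how this chooser could be wired into a YARPC gRPC outbound. It is not part of this commit: the dispatcher name, outbound name, and direct construction are illustrative assumptions (the real wiring presumably goes through fx, given the fx.In params struct), and it assumes the function sits in the same spectatorclient package:

    // buildDispatcher is a hypothetical wiring example: the same gRPC transport
    // is used both to construct the chooser and to build the outbound that
    // routes through it. Names ("shard-distributor-canary", "executor") are
    // illustrative. The caller is expected to Start()/Stop() the dispatcher.
    //
    // Assumed extra imports: "go.uber.org/yarpc", "go.uber.org/yarpc/transport/grpc".
    func buildDispatcher(logger log.Logger, spectators Spectators) *yarpc.Dispatcher {
        grpcTransport := grpc.NewTransport()

        chooser := NewSpectatorPeerChooser(SpectatorPeerChooserParams{
            Transport: grpcTransport,
            Logger:    logger,
        })
        chooser.SetSpectators(spectators)

        return yarpc.NewDispatcher(yarpc.Config{
            Name: "shard-distributor-canary",
            Outbounds: yarpc.Outbounds{
                "executor": {
                    Unary: grpcTransport.NewOutbound(chooser),
                },
            },
        })
    }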
