Skip to content

Commit f4bb653

Browse files
committed
Add tests for SpectatorPeerChooser
Tests cover: - Success path with peer creation - Peer reuse on subsequent calls - Error cases (missing shard key, namespace header, spectator not found) - Lifecycle methods (Start, Stop, IsRunning) - SetSpectators method Signed-off-by: Jakob Haahr Taankvist <[email protected]>
1 parent 9dfbbbd commit f4bb653

File tree

2 files changed

+192
-3
lines changed

2 files changed

+192
-3
lines changed

service/sharddistributor/client/spectatorclient/peer_chooser.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ const (
2323
// SpectatorPeerChooserInterface extends peer.Chooser with SetSpectators method
2424
type SpectatorPeerChooserInterface interface {
2525
peer.Chooser
26-
SetSpectators(spectators Spectators)
26+
SetSpectators(spectators *Spectators)
2727
}
2828

2929
// SpectatorPeerChooser is a peer.Chooser that uses the Spectator to route requests
@@ -38,7 +38,7 @@ type SpectatorPeerChooserInterface interface {
3838
// 5. Create/reuse peer for that address
3939
// 6. Return peer to YARPC for connection
4040
type SpectatorPeerChooser struct {
41-
spectators Spectators
41+
spectators *Spectators
4242
transport peer.Transport
4343
logger log.Logger
4444
namespace string
@@ -147,7 +147,7 @@ func (c *SpectatorPeerChooser) Choose(ctx context.Context, req *transport.Reques
147147
return p, func(error) {}, nil
148148
}
149149

150-
func (c *SpectatorPeerChooser) SetSpectators(spectators Spectators) {
150+
func (c *SpectatorPeerChooser) SetSpectators(spectators *Spectators) {
151151
c.spectators = spectators
152152
}
153153

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
package spectatorclient
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/stretchr/testify/assert"
8+
"github.com/stretchr/testify/require"
9+
"go.uber.org/mock/gomock"
10+
"go.uber.org/yarpc/api/peer"
11+
"go.uber.org/yarpc/api/transport"
12+
"go.uber.org/yarpc/transport/grpc"
13+
14+
"github.com/uber/cadence/common/log/testlogger"
15+
)
16+
17+
func TestSpectatorPeerChooser_Choose_MissingShardKey(t *testing.T) {
18+
chooser := &SpectatorPeerChooser{
19+
logger: testlogger.New(t),
20+
peers: make(map[string]peer.Peer),
21+
}
22+
23+
req := &transport.Request{
24+
ShardKey: "",
25+
Headers: transport.NewHeaders(),
26+
}
27+
28+
p, onFinish, err := chooser.Choose(context.Background(), req)
29+
30+
assert.Error(t, err)
31+
assert.Nil(t, p)
32+
assert.Nil(t, onFinish)
33+
assert.Contains(t, err.Error(), "ShardKey")
34+
}
35+
36+
func TestSpectatorPeerChooser_Choose_MissingNamespaceHeader(t *testing.T) {
37+
chooser := &SpectatorPeerChooser{
38+
logger: testlogger.New(t),
39+
peers: make(map[string]peer.Peer),
40+
}
41+
42+
req := &transport.Request{
43+
ShardKey: "shard-1",
44+
Headers: transport.NewHeaders(),
45+
}
46+
47+
p, onFinish, err := chooser.Choose(context.Background(), req)
48+
49+
assert.Error(t, err)
50+
assert.Nil(t, p)
51+
assert.Nil(t, onFinish)
52+
assert.Contains(t, err.Error(), "x-shard-distributor-namespace")
53+
}
54+
55+
func TestSpectatorPeerChooser_Choose_SpectatorNotFound(t *testing.T) {
56+
chooser := &SpectatorPeerChooser{
57+
logger: testlogger.New(t),
58+
peers: make(map[string]peer.Peer),
59+
spectators: &Spectators{spectators: make(map[string]Spectator)},
60+
}
61+
62+
req := &transport.Request{
63+
ShardKey: "shard-1",
64+
Headers: transport.NewHeaders().With(NamespaceHeader, "unknown-namespace"),
65+
}
66+
67+
p, onFinish, err := chooser.Choose(context.Background(), req)
68+
69+
assert.Error(t, err)
70+
assert.Nil(t, p)
71+
assert.Nil(t, onFinish)
72+
assert.Contains(t, err.Error(), "spectator not found")
73+
}
74+
75+
func TestSpectatorPeerChooser_StartStop(t *testing.T) {
76+
chooser := &SpectatorPeerChooser{
77+
logger: testlogger.New(t),
78+
peers: make(map[string]peer.Peer),
79+
}
80+
81+
err := chooser.Start()
82+
require.NoError(t, err)
83+
84+
assert.True(t, chooser.IsRunning())
85+
86+
err = chooser.Stop()
87+
assert.NoError(t, err)
88+
}
89+
90+
func TestSpectatorPeerChooser_SetSpectators(t *testing.T) {
91+
chooser := &SpectatorPeerChooser{
92+
logger: testlogger.New(t),
93+
}
94+
95+
spectators := &Spectators{spectators: make(map[string]Spectator)}
96+
chooser.SetSpectators(spectators)
97+
98+
assert.Equal(t, spectators, chooser.spectators)
99+
}
100+
101+
func TestSpectatorPeerChooser_Choose_Success(t *testing.T) {
102+
ctrl := gomock.NewController(t)
103+
defer ctrl.Finish()
104+
105+
mockSpectator := NewMockSpectator(ctrl)
106+
peerTransport := grpc.NewTransport()
107+
108+
chooser := &SpectatorPeerChooser{
109+
transport: peerTransport,
110+
logger: testlogger.New(t),
111+
peers: make(map[string]peer.Peer),
112+
spectators: &Spectators{
113+
spectators: map[string]Spectator{
114+
"test-namespace": mockSpectator,
115+
},
116+
},
117+
}
118+
119+
ctx := context.Background()
120+
req := &transport.Request{
121+
ShardKey: "shard-1",
122+
Headers: transport.NewHeaders().With(NamespaceHeader, "test-namespace"),
123+
}
124+
125+
// Mock spectator to return shard owner with grpc_address
126+
mockSpectator.EXPECT().
127+
GetShardOwner(ctx, "shard-1").
128+
Return(&ShardOwner{
129+
ExecutorID: "executor-1",
130+
Metadata: map[string]string{
131+
grpcAddressMetadataKey: "127.0.0.1:7953",
132+
},
133+
}, nil)
134+
135+
// Execute
136+
p, onFinish, err := chooser.Choose(ctx, req)
137+
138+
// Assert
139+
assert.NoError(t, err)
140+
assert.NotNil(t, p)
141+
assert.NotNil(t, onFinish)
142+
assert.Equal(t, "127.0.0.1:7953", p.Identifier())
143+
assert.Len(t, chooser.peers, 1)
144+
}
145+
146+
func TestSpectatorPeerChooser_Choose_ReusesPeer(t *testing.T) {
147+
ctrl := gomock.NewController(t)
148+
defer ctrl.Finish()
149+
150+
mockSpectator := NewMockSpectator(ctrl)
151+
peerTransport := grpc.NewTransport()
152+
153+
chooser := &SpectatorPeerChooser{
154+
transport: peerTransport,
155+
logger: testlogger.New(t),
156+
peers: make(map[string]peer.Peer),
157+
spectators: &Spectators{
158+
spectators: map[string]Spectator{
159+
"test-namespace": mockSpectator,
160+
},
161+
},
162+
}
163+
164+
req := &transport.Request{
165+
ShardKey: "shard-1",
166+
Headers: transport.NewHeaders().With(NamespaceHeader, "test-namespace"),
167+
}
168+
169+
// First call creates the peer
170+
mockSpectator.EXPECT().
171+
GetShardOwner(gomock.Any(), "shard-1").
172+
Return(&ShardOwner{
173+
ExecutorID: "executor-1",
174+
Metadata: map[string]string{
175+
grpcAddressMetadataKey: "127.0.0.1:7953",
176+
},
177+
}, nil).Times(2)
178+
179+
firstPeer, _, err := chooser.Choose(context.Background(), req)
180+
require.NoError(t, err)
181+
182+
// Second call should reuse the same peer
183+
secondPeer, _, err := chooser.Choose(context.Background(), req)
184+
185+
// Assert - should reuse existing peer
186+
assert.NoError(t, err)
187+
assert.Equal(t, firstPeer, secondPeer)
188+
assert.Len(t, chooser.peers, 1)
189+
}

0 commit comments

Comments
 (0)