Skip to content

Commit 278008f

Browse files
committed
added tests
Signed-off-by: Jakob Haahr Taankvist <[email protected]>
1 parent 0d02c5d commit 278008f

File tree

2 files changed

+267
-3
lines changed

2 files changed

+267
-3
lines changed

service/sharddistributor/client/spectatorclient/peer_chooser.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@ import (
1313

1414
"github.com/uber/cadence/common/log"
1515
"github.com/uber/cadence/common/log/tag"
16-
"github.com/uber/cadence/service/sharddistributor/canary/metadata"
1716
)
1817

19-
const NamespaceHeader = "x-shard-distributor-namespace"
18+
const (
19+
NamespaceHeader = "x-shard-distributor-namespace"
20+
grpcAddressMetadataKey = "grpc_address"
21+
)
2022

2123
// SpectatorPeerChooserInterface extends peer.Chooser with SetSpectators method
2224
type SpectatorPeerChooserInterface interface {
@@ -122,7 +124,7 @@ func (c *SpectatorPeerChooser) Choose(ctx context.Context, req *transport.Reques
122124
}
123125

124126
// Extract GRPC address from owner metadata
125-
grpcAddress, ok := owner.Metadata[metadata.MetadataKeyGRPCAddress]
127+
grpcAddress, ok := owner.Metadata[grpcAddressMetadataKey]
126128
if !ok || grpcAddress == "" {
127129
return nil, nil, yarpcerrors.InternalErrorf("no grpc_address in metadata for executor %s owning shard %s", owner.ExecutorID, req.ShardKey)
128130
}
Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
package spectatorclient
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
8+
"github.com/golang/mock/gomock"
9+
"github.com/stretchr/testify/assert"
10+
"go.uber.org/yarpc/api/peer"
11+
"go.uber.org/yarpc/api/transport"
12+
"go.uber.org/yarpc/yarpcerrors"
13+
14+
"github.com/uber/cadence/common/log/testlogger"
15+
)
16+
17+
func TestSpectatorPeerChooser_Choose_Success(t *testing.T) {
18+
ctrl := gomock.NewController(t)
19+
defer ctrl.Finish()
20+
21+
mockTransport := newMockTransport(ctrl)
22+
mockSpectator := NewMockSpectator(ctrl)
23+
mockPeer := newMockPeer()
24+
25+
chooser := &SpectatorPeerChooser{
26+
transport: mockTransport,
27+
logger: testlogger.New(t),
28+
peers: make(map[string]peer.Peer),
29+
spectators: &mockSpectators{
30+
spectators: map[string]Spectator{"test-namespace": mockSpectator},
31+
},
32+
}
33+
34+
ctx := context.Background()
35+
req := &transport.Request{
36+
ShardKey: "shard-1",
37+
Headers: transport.NewHeaders().With(NamespaceHeader, "test-namespace"),
38+
}
39+
40+
mockSpectator.EXPECT().GetShardOwner(ctx, "shard-1").Return(&ExecutorOwnership{
41+
ExecutorID: "executor-1",
42+
Metadata: map[string]string{"grpc_address": "127.0.0.1:7953"},
43+
}, nil)
44+
mockTransport.expectRetainPeer(mockPeer, nil)
45+
46+
p, onFinish, err := chooser.Choose(ctx, req)
47+
48+
assert.NoError(t, err)
49+
assert.Equal(t, mockPeer, p)
50+
assert.NotNil(t, onFinish)
51+
assert.Len(t, chooser.peers, 1)
52+
}
53+
54+
func TestSpectatorPeerChooser_Choose_ReusesPeer(t *testing.T) {
55+
ctrl := gomock.NewController(t)
56+
defer ctrl.Finish()
57+
58+
mockSpectator := NewMockSpectator(ctrl)
59+
mockPeer := newMockPeer()
60+
61+
chooser := &SpectatorPeerChooser{
62+
logger: testlogger.New(t),
63+
peers: map[string]peer.Peer{"127.0.0.1:7953": mockPeer},
64+
spectators: &mockSpectators{
65+
spectators: map[string]Spectator{"test-namespace": mockSpectator},
66+
},
67+
}
68+
69+
req := &transport.Request{
70+
ShardKey: "shard-1",
71+
Headers: transport.NewHeaders().With(NamespaceHeader, "test-namespace"),
72+
}
73+
74+
mockSpectator.EXPECT().GetShardOwner(gomock.Any(), "shard-1").Return(&ExecutorOwnership{
75+
ExecutorID: "executor-1",
76+
Metadata: map[string]string{"grpc_address": "127.0.0.1:7953"},
77+
}, nil)
78+
79+
p, _, err := chooser.Choose(context.Background(), req)
80+
81+
assert.NoError(t, err)
82+
assert.Equal(t, mockPeer, p)
83+
assert.Len(t, chooser.peers, 1)
84+
}
85+
86+
func TestSpectatorPeerChooser_Choose_Errors(t *testing.T) {
87+
tests := []struct {
88+
name string
89+
shardKey string
90+
namespace string
91+
setupMock func(*gomock.Controller) Spectator
92+
expectedError string
93+
errorType func(error) bool
94+
}{
95+
{
96+
name: "missing shard key",
97+
shardKey: "",
98+
namespace: "test-ns",
99+
expectedError: "ShardKey to be non-empty",
100+
errorType: yarpcerrors.IsInvalidArgument,
101+
},
102+
{
103+
name: "missing namespace header",
104+
shardKey: "shard-1",
105+
namespace: "",
106+
expectedError: "x-shard-distributor-namespace",
107+
errorType: yarpcerrors.IsInvalidArgument,
108+
},
109+
{
110+
name: "spectator returns error",
111+
shardKey: "shard-1",
112+
namespace: "test-ns",
113+
setupMock: func(ctrl *gomock.Controller) Spectator {
114+
m := NewMockSpectator(ctrl)
115+
m.EXPECT().GetShardOwner(gomock.Any(), "shard-1").
116+
Return(nil, errors.New("shard not assigned"))
117+
return m
118+
},
119+
expectedError: "failed to get shard owner",
120+
errorType: yarpcerrors.IsUnavailable,
121+
},
122+
{
123+
name: "missing grpc_address in metadata",
124+
shardKey: "shard-1",
125+
namespace: "test-ns",
126+
setupMock: func(ctrl *gomock.Controller) Spectator {
127+
m := NewMockSpectator(ctrl)
128+
m.EXPECT().GetShardOwner(gomock.Any(), "shard-1").
129+
Return(&ExecutorOwnership{ExecutorID: "executor-1", Metadata: map[string]string{}}, nil)
130+
return m
131+
},
132+
expectedError: "no grpc_address in metadata",
133+
errorType: yarpcerrors.IsInternal,
134+
},
135+
}
136+
137+
for _, tt := range tests {
138+
t.Run(tt.name, func(t *testing.T) {
139+
ctrl := gomock.NewController(t)
140+
defer ctrl.Finish()
141+
142+
var spectators Spectators
143+
if tt.setupMock != nil {
144+
spectators = &mockSpectators{
145+
spectators: map[string]Spectator{"test-ns": tt.setupMock(ctrl)},
146+
}
147+
} else {
148+
spectators = &mockSpectators{spectators: map[string]Spectator{}}
149+
}
150+
151+
chooser := &SpectatorPeerChooser{
152+
logger: testlogger.New(t),
153+
peers: make(map[string]peer.Peer),
154+
spectators: spectators,
155+
}
156+
157+
req := &transport.Request{
158+
ShardKey: tt.shardKey,
159+
Headers: transport.NewHeaders().With(NamespaceHeader, tt.namespace),
160+
}
161+
162+
p, onFinish, err := chooser.Choose(context.Background(), req)
163+
164+
assert.Error(t, err)
165+
assert.Nil(t, p)
166+
assert.Nil(t, onFinish)
167+
assert.Contains(t, err.Error(), tt.expectedError)
168+
if tt.errorType != nil {
169+
assert.True(t, tt.errorType(err))
170+
}
171+
})
172+
}
173+
}
174+
175+
func TestSpectatorPeerChooser_Stop_ReleasesPeers(t *testing.T) {
176+
ctrl := gomock.NewController(t)
177+
defer ctrl.Finish()
178+
179+
mockTransport := newMockTransport(ctrl)
180+
mockPeer1, mockPeer2 := newMockPeer(), newMockPeer()
181+
182+
chooser := &SpectatorPeerChooser{
183+
transport: mockTransport,
184+
logger: testlogger.New(t),
185+
peers: map[string]peer.Peer{
186+
"127.0.0.1:7953": mockPeer1,
187+
"127.0.0.1:7954": mockPeer2,
188+
},
189+
}
190+
191+
mockTransport.expectReleasePeer(mockPeer1, nil)
192+
mockTransport.expectReleasePeer(mockPeer2, nil)
193+
194+
err := chooser.Stop()
195+
assert.NoError(t, err)
196+
assert.Empty(t, chooser.peers)
197+
}
198+
199+
// Test helpers
200+
201+
type mockSpectators struct {
202+
spectators map[string]Spectator
203+
}
204+
205+
func (m *mockSpectators) ForNamespace(namespace string) (Spectator, error) {
206+
s, ok := m.spectators[namespace]
207+
if !ok {
208+
return nil, errors.New("spectator not found")
209+
}
210+
return s, nil
211+
}
212+
213+
type mockTransport struct {
214+
ctrl *gomock.Controller
215+
retainPeerFunc func(peer.Identifier, peer.Subscriber) (peer.Peer, error)
216+
releasePeerFunc func(peer.Peer, peer.Subscriber) error
217+
}
218+
219+
func newMockTransport(ctrl *gomock.Controller) *mockTransport {
220+
return &mockTransport{ctrl: ctrl}
221+
}
222+
223+
func (m *mockTransport) expectRetainPeer(p peer.Peer, err error) {
224+
m.retainPeerFunc = func(peer.Identifier, peer.Subscriber) (peer.Peer, error) {
225+
return p, err
226+
}
227+
}
228+
229+
func (m *mockTransport) expectReleasePeer(p peer.Peer, err error) {
230+
old := m.releasePeerFunc
231+
m.releasePeerFunc = func(peer peer.Peer, sub peer.Subscriber) error {
232+
if peer == p {
233+
return err
234+
}
235+
if old != nil {
236+
return old(peer, sub)
237+
}
238+
return nil
239+
}
240+
}
241+
242+
func (m *mockTransport) RetainPeer(pid peer.Identifier, sub peer.Subscriber) (peer.Peer, error) {
243+
if m.retainPeerFunc != nil {
244+
return m.retainPeerFunc(pid, sub)
245+
}
246+
return nil, errors.New("unexpected call to RetainPeer")
247+
}
248+
249+
func (m *mockTransport) ReleasePeer(p peer.Peer, sub peer.Subscriber) error {
250+
if m.releasePeerFunc != nil {
251+
return m.releasePeerFunc(p, sub)
252+
}
253+
return errors.New("unexpected call to ReleasePeer")
254+
}
255+
256+
type mockPeer struct{}
257+
258+
func newMockPeer() peer.Peer { return &mockPeer{} }
259+
func (m *mockPeer) Identifier() string { return "mock-peer" }
260+
func (m *mockPeer) Status() peer.Status { return peer.Status{} }
261+
func (m *mockPeer) StartRequest() {}
262+
func (m *mockPeer) EndRequest() {}

0 commit comments

Comments
 (0)