Skip to content

Commit ca86563

Browse files
committed
Add peer token unit tests
1 parent c1b02e4 commit ca86563

File tree

3 files changed

+129
-3
lines changed

3 files changed

+129
-3
lines changed

proxy/proxy.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -375,11 +375,15 @@ func (p *Proxy) buildNodes() (err error) {
375375
return errors.New("tokens must be provided for all peer proxies if tokens are provided for this proxy")
376376
}
377377
nodes = append(nodes, &node{
378-
addr: addr,
379-
dc: dc,
378+
addr: addr,
379+
dc: dc,
380+
tokens: peer.Tokens,
380381
})
381382
}
382383

384+
// If tokens are not provided then we calculate tokens by sorting the addresses and assigning an even portion of the
385+
// ring to each proxy. This should be deterministic in multiple independent proxies, assuming they have the same
386+
// list of peers.
383387
if calculateTokens && len(nodes) > 1 {
384388
sort.Slice(nodes, func(i, j int) bool {
385389
return compareIPAddr(nodes[i].addr, nodes[j].addr) < 0

proxy/run.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ var config struct {
4949
ProtocolVersion string `yaml:"protocol-version" help:"Initial protocol version to use when connecting to the backend cluster (default: v4, options: v3, v4, v5, DSEv1, DSEv2)" default:"v4" short:"n" env:"PROTOCOL_VERSION"`
5050
MaxProtocolVersion string `yaml:"max-protocol-version" help:"Max protocol version supported by the backend cluster (default: v4, options: v3, v4, v5, DSEv1, DSEv2)" default:"v4" short:"m" env:"MAX_PROTOCOL_VERSION"`
5151
Bind string `yaml:"bind" help:"Address to use to bind server" short:"a" default:":9042" env:"BIND"`
52-
Config *os.File `yaml:"-" help:"YAML configuration file" short:"f" env:"CONFIG_FILE"`
52+
Config *os.File `yaml:"-" help:"YAML configuration file" short:"f" env:"CONFIG_FILE"` // Not available in the configuration file
5353
Debug bool `yaml:"debug" help:"Show debug logging" default:"false" env:"DEBUG"`
5454
HealthCheck bool `yaml:"health-check" help:"Enable liveness and readiness checks" default:"false" env:"HEALTH_CHECK"`
5555
HttpBind string `yaml:"http-bind" help:"Address to use to bind HTTP server used for health checks" default:":8000" env:"HTTP_BIND"`

proxy/run_test.go

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,89 @@ func TestRun_ConfigFileWithPeers(t *testing.T) {
179179
assert.Equal(t, "-3074457345618258602", *tokens[0])
180180
}
181181

182+
func TestRun_ConfigFileWithTokensProvided(t *testing.T) {
183+
ctx, cancel := context.WithCancel(context.Background())
184+
defer cancel()
185+
186+
cluster := proxycore.NewMockCluster(net.ParseIP(testClusterStartIP), testClusterPort)
187+
188+
err := cluster.Add(ctx, 1)
189+
require.NoError(t, err)
190+
191+
defer cluster.Shutdown()
192+
193+
configFileName, err := writeTempYaml(struct {
194+
Bind string
195+
Port int
196+
RPCAddr string `yaml:"rpc-address"`
197+
DataCenter string `yaml:"data-center"`
198+
Tokens []string
199+
ContactPoints []string `yaml:"contact-points"`
200+
HealthCheck bool `yaml:"health-check"`
201+
HttpBind string `yaml:"http-bind"`
202+
Peers []PeerConfig
203+
}{
204+
Bind: "127.0.0.1:9042",
205+
RPCAddr: "127.0.0.1",
206+
DataCenter: "dc-1",
207+
Tokens: []string{"0", "1"}, // Provide custom tokens
208+
Port: testClusterPort,
209+
ContactPoints: []string{testClusterContactPoint},
210+
HealthCheck: true,
211+
HttpBind: testProxyHTTPBind,
212+
Peers: []PeerConfig{{
213+
RPCAddr: "127.0.0.2",
214+
Tokens: []string{"42", "613"}, // Same here
215+
}},
216+
})
217+
218+
go func() {
219+
rc := Run(ctx, []string{
220+
"--config", configFileName,
221+
})
222+
require.Equal(t, 0, rc)
223+
}()
224+
225+
waitUntil(10*time.Second, func() bool {
226+
res, err := http.Get(fmt.Sprintf("http://%s%s", testProxyHTTPBind, livenessPath))
227+
return err == nil && res.StatusCode == http.StatusOK
228+
})
229+
230+
cl := connectTestClient(t, ctx)
231+
232+
rs, err := cl.Query(ctx, primitive.ProtocolVersion4, &message.Query{
233+
Query: "SELECT rpc_address, tokens FROM system.local",
234+
})
235+
require.Equal(t, rs.RowCount(), 1)
236+
237+
rpcAddress, err := rs.Row(0).InetByName("rpc_address")
238+
require.NoError(t, err)
239+
assert.Equal(t, "127.0.0.1", rpcAddress.String())
240+
241+
val, err := rs.Row(0).ByName("tokens")
242+
require.NoError(t, err)
243+
tokens := val.([]*string)
244+
assert.NotEmpty(t, tokens)
245+
assert.Equal(t, "0", *tokens[0])
246+
assert.Equal(t, "1", *tokens[1])
247+
248+
rs, err = cl.Query(ctx, primitive.ProtocolVersion4, &message.Query{
249+
Query: "SELECT rpc_address, data_center, tokens FROM system.peers",
250+
})
251+
require.Equal(t, rs.RowCount(), 1)
252+
253+
rpcAddress, err = rs.Row(0).InetByName("rpc_address")
254+
require.NoError(t, err)
255+
assert.Equal(t, "127.0.0.2", rpcAddress.String())
256+
257+
val, err = rs.Row(0).ByName("tokens")
258+
require.NoError(t, err)
259+
tokens = val.([]*string)
260+
assert.NotEmpty(t, tokens)
261+
assert.Equal(t, "42", *tokens[0])
262+
assert.Equal(t, "613", *tokens[1])
263+
}
264+
182265
func TestRun_ConfigFileWithPeersAndNoRPCAddr(t *testing.T) {
183266
ctx, cancel := context.WithCancel(context.Background())
184267
defer cancel()
@@ -215,6 +298,45 @@ func TestRun_ConfigFileWithPeersAndNoRPCAddr(t *testing.T) {
215298
require.Equal(t, 1, rc)
216299
}
217300

301+
func TestRun_ConfigFileWithNoPeerTokens(t *testing.T) {
302+
ctx, cancel := context.WithCancel(context.Background())
303+
defer cancel()
304+
305+
cluster := proxycore.NewMockCluster(net.ParseIP(testClusterStartIP), testClusterPort)
306+
307+
err := cluster.Add(ctx, 1)
308+
require.NoError(t, err)
309+
310+
defer cluster.Shutdown()
311+
312+
configFileName, err := writeTempYaml(struct {
313+
Bind string
314+
Port int
315+
RPCAddr string `yaml:"rpc-address"`
316+
DataCenter string `yaml:"data-center"`
317+
Tokens []string
318+
ContactPoints []string `yaml:"contact-points"`
319+
Peers []PeerConfig
320+
}{
321+
ContactPoints: []string{testClusterContactPoint},
322+
Port: testClusterPort,
323+
Bind: "127.0.0.1:9042",
324+
RPCAddr: "127.0.0.1",
325+
Tokens: []string{"0"}, // Local tokens set
326+
Peers: []PeerConfig{{
327+
RPCAddr: "127.0.0.2",
328+
DC: "dc-2",
329+
// No peer tokens
330+
}},
331+
})
332+
require.NoError(t, err)
333+
334+
rc := Run(ctx, []string{
335+
"--config", configFileName,
336+
})
337+
require.Equal(t, 1, rc)
338+
}
339+
218340
func TestRun_ConfigFileWithInvalidPeer(t *testing.T) {
219341
ctx, cancel := context.WithCancel(context.Background())
220342
defer cancel()

0 commit comments

Comments
 (0)