Skip to content

Commit 17e03b1

Browse files
Bouncheckdkropachev
authored andcommitted
Add shard aware ports to ShardingInfo
1 parent 5727caa commit 17e03b1

File tree

3 files changed

+91
-2
lines changed

3 files changed

+91
-2
lines changed

core/src/main/java/com/datastax/oss/driver/api/core/metadata/NodeShardingInfo.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,18 @@ public interface NodeShardingInfo {
1616
* to the Node.
1717
*/
1818
public int shardId(Token t);
19+
20+
/**
21+
* Returns shard aware port number where Scylla is listening for CQL connections. If present, it
22+
* works almost the same way as port 9042 typically does; the difference is that client-side port
23+
* number is used as an indicator to which shard client wants to connect.
24+
*/
25+
public Integer getShardAwarePort();
26+
27+
/**
28+
* Returns shard aware SSL port number where Scylla is listening for encrypted CQL connections. If
29+
* present, it works almost the same way as port 9142 typically does; the difference is that
30+
* client-side port number is used as an indicator to which shard client wants to connect.
31+
*/
32+
public Integer getShardAwarePortSsl();
1933
}

core/src/main/java/com/datastax/oss/driver/internal/core/protocol/ShardingInfo.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,18 +35,29 @@ public class ShardingInfo implements NodeShardingInfo {
3535
private static final String SCYLLA_PARTITIONER = "SCYLLA_PARTITIONER";
3636
private static final String SCYLLA_SHARDING_ALGORITHM = "SCYLLA_SHARDING_ALGORITHM";
3737
private static final String SCYLLA_SHARDING_IGNORE_MSB = "SCYLLA_SHARDING_IGNORE_MSB";
38+
private static final String SCYLLA_SHARD_AWARE_PORT_PARAM_KEY = "SCYLLA_SHARD_AWARE_PORT";
39+
private static final String SCYLLA_SHARD_AWARE_PORT_SSL_PARAM_KEY = "SCYLLA_SHARD_AWARE_PORT_SSL";
3840

3941
private final int shardsCount;
4042
private final String partitioner;
4143
private final String shardingAlgorithm;
4244
private final int shardingIgnoreMSB;
45+
private final Integer shardAwarePort;
46+
private final Integer shardAwarePortSsl;
4347

4448
private ShardingInfo(
45-
int shardsCount, String partitioner, String shardingAlgorithm, int shardingIgnoreMSB) {
49+
int shardsCount,
50+
String partitioner,
51+
String shardingAlgorithm,
52+
int shardingIgnoreMSB,
53+
Integer shardAwarePort,
54+
Integer shardAwarePortSsl) {
4655
this.shardsCount = shardsCount;
4756
this.partitioner = partitioner;
4857
this.shardingAlgorithm = shardingAlgorithm;
4958
this.shardingIgnoreMSB = shardingIgnoreMSB;
59+
this.shardAwarePort = shardAwarePort;
60+
this.shardAwarePortSsl = shardAwarePortSsl;
5061
}
5162

5263
@Override
@@ -78,6 +89,16 @@ public int shardId(Token t) {
7889
return (int) (sum >>> 32);
7990
}
8091

92+
@Override
93+
public Integer getShardAwarePort() {
94+
return shardAwarePort;
95+
}
96+
97+
@Override
98+
public Integer getShardAwarePortSsl() {
99+
return shardAwarePortSsl;
100+
}
101+
81102
public static class ConnectionShardingInfo {
82103
public final int shardId;
83104
public final ShardingInfo shardingInfo;
@@ -94,6 +115,8 @@ public static ConnectionShardingInfo parseShardingInfo(Map<String, List<String>>
94115
String partitioner = parseString(params, SCYLLA_PARTITIONER);
95116
String shardingAlgorithm = parseString(params, SCYLLA_SHARDING_ALGORITHM);
96117
Integer shardingIgnoreMSB = parseInt(params, SCYLLA_SHARDING_IGNORE_MSB);
118+
Integer shardAwarePort = parseInt(params, SCYLLA_SHARD_AWARE_PORT_PARAM_KEY);
119+
Integer shardAwarePortSsl = parseInt(params, SCYLLA_SHARD_AWARE_PORT_SSL_PARAM_KEY);
97120
if (shardId == null
98121
|| shardsCount == null
99122
|| partitioner == null
@@ -104,7 +127,14 @@ public static ConnectionShardingInfo parseShardingInfo(Map<String, List<String>>
104127
return null;
105128
}
106129
return new ConnectionShardingInfo(
107-
shardId, new ShardingInfo(shardsCount, partitioner, shardingAlgorithm, shardingIgnoreMSB));
130+
shardId,
131+
new ShardingInfo(
132+
shardsCount,
133+
partitioner,
134+
shardingAlgorithm,
135+
shardingIgnoreMSB,
136+
shardAwarePort,
137+
shardAwarePortSsl));
108138
}
109139

110140
private static String parseString(Map<String, List<String>> params, String key) {
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package com.datastax.oss.driver.internal.core.protocol;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
5+
import java.util.Arrays;
6+
import java.util.Collections;
7+
import java.util.HashMap;
8+
import java.util.List;
9+
import java.util.Map;
10+
import org.junit.Test;
11+
12+
public class ShardingInfoTest {
13+
14+
@Test
15+
public void should_create_ShardingInfo_from_valid_params() {
16+
// Mostly just a sanity check. Parses example data returned by docker instance.
17+
Map<String, List<String>> params = new HashMap<>();
18+
params.put("COMPRESSION", Arrays.asList("lz4", "snappy"));
19+
params.put("CQL_VERSION", Collections.singletonList("3.3.1"));
20+
params.put(
21+
"SCYLLA_LWT_ADD_METADATA_MARK",
22+
Collections.singletonList("LWT_OPTIMIZATION_META_BIT_MASK=2147483648"));
23+
params.put("SCYLLA_NR_SHARDS", Collections.singletonList("12"));
24+
params.put(
25+
"SCYLLA_PARTITIONER",
26+
Collections.singletonList("org.apache.cassandra.dht.Murmur3Partitioner"));
27+
params.put("SCYLLA_RATE_LIMIT_ERROR", Collections.singletonList("ERROR_CODE=61440"));
28+
params.put("SCYLLA_SHARD", Collections.singletonList("0"));
29+
params.put("SCYLLA_SHARDING_ALGORITHM", Collections.singletonList("biased-token-round-robin"));
30+
params.put("SCYLLA_SHARDING_IGNORE_MSB", Collections.singletonList("12"));
31+
params.put("SCYLLA_SHARD_AWARE_PORT", Collections.singletonList("19042"));
32+
params.put("TABLETS_ROUTING_V1", Collections.emptyList());
33+
34+
ShardingInfo.ConnectionShardingInfo info = ShardingInfo.parseShardingInfo(params);
35+
36+
assertThat(info).isNotNull();
37+
assertThat(info.shardId).isEqualTo(0);
38+
assertThat(info.shardingInfo.getShardsCount()).isEqualTo(12);
39+
assertThat(info.shardingInfo.getPartitioner())
40+
.isEqualTo("org.apache.cassandra.dht.Murmur3Partitioner");
41+
assertThat(info.shardingInfo.getShardingAlgorithm()).isEqualTo("biased-token-round-robin");
42+
assertThat(info.shardingInfo.getShardAwarePort()).isEqualTo(19042);
43+
assertThat(info.shardingInfo.getShardAwarePortSsl()).isNull();
44+
}
45+
}

0 commit comments

Comments
 (0)