diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/metadata/NodeShardingInfo.java b/core/src/main/java/com/datastax/oss/driver/api/core/metadata/NodeShardingInfo.java index ee9ef908e40..d01ca74a9d0 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/metadata/NodeShardingInfo.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/metadata/NodeShardingInfo.java @@ -16,4 +16,18 @@ public interface NodeShardingInfo { * to the Node. */ public int shardId(Token t); + + /** + * Returns shard aware port number where Scylla is listening for CQL connections. If present, it + * works almost the same way as port 9042 typically does; the difference is that client-side port + * number is used as an indicator to which shard client wants to connect. + */ + public Integer getShardAwarePort(); + + /** + * Returns shard aware SSL port number where Scylla is listening for encrypted CQL connections. If + * present, it works almost the same way as port 9142 typically does; the difference is that + * client-side port number is used as an indicator to which shard client wants to connect. + */ + public Integer getShardAwarePortSsl(); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/ShardingInfo.java b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/ShardingInfo.java index e7a23de6fc8..81eaaa8dd75 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/ShardingInfo.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/ShardingInfo.java @@ -35,18 +35,29 @@ public class ShardingInfo implements NodeShardingInfo { private static final String SCYLLA_PARTITIONER = "SCYLLA_PARTITIONER"; private static final String SCYLLA_SHARDING_ALGORITHM = "SCYLLA_SHARDING_ALGORITHM"; private static final String SCYLLA_SHARDING_IGNORE_MSB = "SCYLLA_SHARDING_IGNORE_MSB"; + private static final String SCYLLA_SHARD_AWARE_PORT_PARAM_KEY = "SCYLLA_SHARD_AWARE_PORT"; + private static final String SCYLLA_SHARD_AWARE_PORT_SSL_PARAM_KEY = "SCYLLA_SHARD_AWARE_PORT_SSL"; private final int shardsCount; private final String partitioner; private final String shardingAlgorithm; private final int shardingIgnoreMSB; + private final Integer shardAwarePort; + private final Integer shardAwarePortSsl; private ShardingInfo( - int shardsCount, String partitioner, String shardingAlgorithm, int shardingIgnoreMSB) { + int shardsCount, + String partitioner, + String shardingAlgorithm, + int shardingIgnoreMSB, + Integer shardAwarePort, + Integer shardAwarePortSsl) { this.shardsCount = shardsCount; this.partitioner = partitioner; this.shardingAlgorithm = shardingAlgorithm; this.shardingIgnoreMSB = shardingIgnoreMSB; + this.shardAwarePort = shardAwarePort; + this.shardAwarePortSsl = shardAwarePortSsl; } @Override @@ -78,6 +89,16 @@ public int shardId(Token t) { return (int) (sum >>> 32); } + @Override + public Integer getShardAwarePort() { + return shardAwarePort; + } + + @Override + public Integer getShardAwarePortSsl() { + return shardAwarePortSsl; + } + public static class ConnectionShardingInfo { public final int shardId; public final ShardingInfo shardingInfo; @@ -94,6 +115,8 @@ public static ConnectionShardingInfo parseShardingInfo(Map> String partitioner = parseString(params, SCYLLA_PARTITIONER); String shardingAlgorithm = parseString(params, SCYLLA_SHARDING_ALGORITHM); Integer shardingIgnoreMSB = parseInt(params, SCYLLA_SHARDING_IGNORE_MSB); + Integer shardAwarePort = parseInt(params, SCYLLA_SHARD_AWARE_PORT_PARAM_KEY); + Integer shardAwarePortSsl = parseInt(params, SCYLLA_SHARD_AWARE_PORT_SSL_PARAM_KEY); if (shardId == null || shardsCount == null || partitioner == null @@ -104,7 +127,14 @@ public static ConnectionShardingInfo parseShardingInfo(Map> return null; } return new ConnectionShardingInfo( - shardId, new ShardingInfo(shardsCount, partitioner, shardingAlgorithm, shardingIgnoreMSB)); + shardId, + new ShardingInfo( + shardsCount, + partitioner, + shardingAlgorithm, + shardingIgnoreMSB, + shardAwarePort, + shardAwarePortSsl)); } private static String parseString(Map> params, String key) { diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/protocol/ShardingInfoTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/protocol/ShardingInfoTest.java new file mode 100644 index 00000000000..07686493d46 --- /dev/null +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/protocol/ShardingInfoTest.java @@ -0,0 +1,45 @@ +package com.datastax.oss.driver.internal.core.protocol; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.junit.Test; + +public class ShardingInfoTest { + + @Test + public void should_create_ShardingInfo_from_valid_params() { + // Mostly just a sanity check. Parses example data returned by docker instance. + Map> params = new HashMap<>(); + params.put("COMPRESSION", Arrays.asList("lz4", "snappy")); + params.put("CQL_VERSION", Collections.singletonList("3.3.1")); + params.put( + "SCYLLA_LWT_ADD_METADATA_MARK", + Collections.singletonList("LWT_OPTIMIZATION_META_BIT_MASK=2147483648")); + params.put("SCYLLA_NR_SHARDS", Collections.singletonList("12")); + params.put( + "SCYLLA_PARTITIONER", + Collections.singletonList("org.apache.cassandra.dht.Murmur3Partitioner")); + params.put("SCYLLA_RATE_LIMIT_ERROR", Collections.singletonList("ERROR_CODE=61440")); + params.put("SCYLLA_SHARD", Collections.singletonList("0")); + params.put("SCYLLA_SHARDING_ALGORITHM", Collections.singletonList("biased-token-round-robin")); + params.put("SCYLLA_SHARDING_IGNORE_MSB", Collections.singletonList("12")); + params.put("SCYLLA_SHARD_AWARE_PORT", Collections.singletonList("19042")); + params.put("TABLETS_ROUTING_V1", Collections.emptyList()); + + ShardingInfo.ConnectionShardingInfo info = ShardingInfo.parseShardingInfo(params); + + assertThat(info).isNotNull(); + assertThat(info.shardId).isEqualTo(0); + assertThat(info.shardingInfo.getShardsCount()).isEqualTo(12); + assertThat(info.shardingInfo.getPartitioner()) + .isEqualTo("org.apache.cassandra.dht.Murmur3Partitioner"); + assertThat(info.shardingInfo.getShardingAlgorithm()).isEqualTo("biased-token-round-robin"); + assertThat(info.shardingInfo.getShardAwarePort()).isEqualTo(19042); + assertThat(info.shardingInfo.getShardAwarePortSsl()).isNull(); + } +}