Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,18 @@ public interface NodeShardingInfo {
* to the Node.
*/
public int shardId(Token t);

/**
* Returns shard aware port number where Scylla is listening for CQL connections. If present, it
* works almost the same way as port 9042 typically does; the difference is that client-side port
* number is used as an indicator to which shard client wants to connect.
*/
public Integer getShardAwarePort();

/**
* Returns shard aware SSL port number where Scylla is listening for encrypted CQL connections. If
* present, it works almost the same way as port 9142 typically does; the difference is that
* client-side port number is used as an indicator to which shard client wants to connect.
*/
public Integer getShardAwarePortSsl();
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,29 @@ public class ShardingInfo implements NodeShardingInfo {
private static final String SCYLLA_PARTITIONER = "SCYLLA_PARTITIONER";
private static final String SCYLLA_SHARDING_ALGORITHM = "SCYLLA_SHARDING_ALGORITHM";
private static final String SCYLLA_SHARDING_IGNORE_MSB = "SCYLLA_SHARDING_IGNORE_MSB";
private static final String SCYLLA_SHARD_AWARE_PORT_PARAM_KEY = "SCYLLA_SHARD_AWARE_PORT";
private static final String SCYLLA_SHARD_AWARE_PORT_SSL_PARAM_KEY = "SCYLLA_SHARD_AWARE_PORT_SSL";

private final int shardsCount;
private final String partitioner;
private final String shardingAlgorithm;
private final int shardingIgnoreMSB;
private final Integer shardAwarePort;
private final Integer shardAwarePortSsl;

private ShardingInfo(
int shardsCount, String partitioner, String shardingAlgorithm, int shardingIgnoreMSB) {
int shardsCount,
String partitioner,
String shardingAlgorithm,
int shardingIgnoreMSB,
Integer shardAwarePort,
Integer shardAwarePortSsl) {
this.shardsCount = shardsCount;
this.partitioner = partitioner;
this.shardingAlgorithm = shardingAlgorithm;
this.shardingIgnoreMSB = shardingIgnoreMSB;
this.shardAwarePort = shardAwarePort;
this.shardAwarePortSsl = shardAwarePortSsl;
}

@Override
Expand Down Expand Up @@ -78,6 +89,16 @@ public int shardId(Token t) {
return (int) (sum >>> 32);
}

@Override
public Integer getShardAwarePort() {
return shardAwarePort;
}

@Override
public Integer getShardAwarePortSsl() {
return shardAwarePortSsl;
}

public static class ConnectionShardingInfo {
public final int shardId;
public final ShardingInfo shardingInfo;
Expand All @@ -94,6 +115,8 @@ public static ConnectionShardingInfo parseShardingInfo(Map<String, List<String>>
String partitioner = parseString(params, SCYLLA_PARTITIONER);
String shardingAlgorithm = parseString(params, SCYLLA_SHARDING_ALGORITHM);
Integer shardingIgnoreMSB = parseInt(params, SCYLLA_SHARDING_IGNORE_MSB);
Integer shardAwarePort = parseInt(params, SCYLLA_SHARD_AWARE_PORT_PARAM_KEY);
Integer shardAwarePortSsl = parseInt(params, SCYLLA_SHARD_AWARE_PORT_SSL_PARAM_KEY);
if (shardId == null
|| shardsCount == null
|| partitioner == null
Expand All @@ -104,7 +127,14 @@ public static ConnectionShardingInfo parseShardingInfo(Map<String, List<String>>
return null;
}
return new ConnectionShardingInfo(
shardId, new ShardingInfo(shardsCount, partitioner, shardingAlgorithm, shardingIgnoreMSB));
shardId,
new ShardingInfo(
shardsCount,
partitioner,
shardingAlgorithm,
shardingIgnoreMSB,
shardAwarePort,
shardAwarePortSsl));
}

private static String parseString(Map<String, List<String>> params, String key) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.datastax.oss.driver.internal.core.protocol;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.Test;

public class ShardingInfoTest {

@Test
public void should_create_ShardingInfo_from_valid_params() {
// Mostly just a sanity check. Parses example data returned by docker instance.
Map<String, List<String>> params = new HashMap<>();
params.put("COMPRESSION", Arrays.asList("lz4", "snappy"));
params.put("CQL_VERSION", Collections.singletonList("3.3.1"));
params.put(
"SCYLLA_LWT_ADD_METADATA_MARK",
Collections.singletonList("LWT_OPTIMIZATION_META_BIT_MASK=2147483648"));
params.put("SCYLLA_NR_SHARDS", Collections.singletonList("12"));
params.put(
"SCYLLA_PARTITIONER",
Collections.singletonList("org.apache.cassandra.dht.Murmur3Partitioner"));
params.put("SCYLLA_RATE_LIMIT_ERROR", Collections.singletonList("ERROR_CODE=61440"));
params.put("SCYLLA_SHARD", Collections.singletonList("0"));
params.put("SCYLLA_SHARDING_ALGORITHM", Collections.singletonList("biased-token-round-robin"));
params.put("SCYLLA_SHARDING_IGNORE_MSB", Collections.singletonList("12"));
params.put("SCYLLA_SHARD_AWARE_PORT", Collections.singletonList("19042"));
params.put("TABLETS_ROUTING_V1", Collections.emptyList());

ShardingInfo.ConnectionShardingInfo info = ShardingInfo.parseShardingInfo(params);

assertThat(info).isNotNull();
assertThat(info.shardId).isEqualTo(0);
assertThat(info.shardingInfo.getShardsCount()).isEqualTo(12);
assertThat(info.shardingInfo.getPartitioner())
.isEqualTo("org.apache.cassandra.dht.Murmur3Partitioner");
assertThat(info.shardingInfo.getShardingAlgorithm()).isEqualTo("biased-token-round-robin");
assertThat(info.shardingInfo.getShardAwarePort()).isEqualTo(19042);
assertThat(info.shardingInfo.getShardAwarePortSsl()).isNull();
}
}
Loading