diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/metadata/KeyspaceTableNamePair.java b/core/src/main/java/com/datastax/oss/driver/api/core/metadata/KeyspaceTableNamePair.java new file mode 100644 index 00000000000..e3858cb79b1 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/api/core/metadata/KeyspaceTableNamePair.java @@ -0,0 +1,51 @@ +package com.datastax.oss.driver.api.core.metadata; + +import com.datastax.oss.driver.api.core.CqlIdentifier; +import edu.umd.cs.findbugs.annotations.NonNull; +import java.util.Objects; + +/** Simple keyspace name and table name pair; equality and hash code use both identifiers. */ +public class KeyspaceTableNamePair { + @NonNull private final CqlIdentifier keyspace; + @NonNull private final CqlIdentifier tableName; + + public KeyspaceTableNamePair(@NonNull CqlIdentifier keyspace, @NonNull CqlIdentifier tableName) { + this.keyspace = keyspace; + this.tableName = tableName; + } + + @NonNull + public CqlIdentifier getKeyspace() { + return keyspace; + } + + @NonNull + public CqlIdentifier getTableName() { + return tableName; + } + + @Override + public String toString() { + return "KeyspaceTableNamePair{" + + "keyspace='" + + keyspace + + '\'' + + ", tableName='" + + tableName + + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof KeyspaceTableNamePair)) return false; + KeyspaceTableNamePair that = (KeyspaceTableNamePair) o; + return keyspace.equals(that.keyspace) && tableName.equals(that.tableName); + } + + @Override + public int hashCode() { + return Objects.hash(keyspace, tableName); + } +} diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/metadata/Metadata.java b/core/src/main/java/com/datastax/oss/driver/api/core/metadata/Metadata.java index 287298c44fd..9945c3c2d35 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/metadata/Metadata.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/metadata/Metadata.java @@ -115,6 +115,14 @@
default Optional getKeyspace(@NonNull String keyspaceName) { @NonNull Optional getTokenMap(); + /** + * The tablet map for this cluster. + * + *

Starts as an empty map that will gradually receive updates on each query of a yet unknown + * tablet. + */ + TabletMap getTabletMap(); + /** * The cluster name to which this session is connected. The Optional returned should contain the * value from the server for system.local.cluster_name. diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/metadata/Tablet.java b/core/src/main/java/com/datastax/oss/driver/api/core/metadata/Tablet.java new file mode 100644 index 00000000000..f279927cf34 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/api/core/metadata/Tablet.java @@ -0,0 +1,37 @@ +package com.datastax.oss.driver.api.core.metadata; + +import com.datastax.oss.driver.shaded.guava.common.annotations.Beta; +import java.util.Set; + +/** + * Represents a tablet as described in tablets-routing-v1 protocol extension with some additional + * fields for ease of use. + */ +@Beta +public interface Tablet extends Comparable { + /** + * Returns left endpoint of an interval. This interval is left-open, meaning the tablet does not + * own the token equal to the first token. + * + * @return {@code long} value representing first token. + */ + public long getFirstToken(); + + /** + * Returns right endpoint of an interval. This interval is right-closed, which means that last + * token is owned by this tablet. + * + * @return {@code long} value representing last token. + */ + public long getLastToken(); + + public Set getReplicaNodes(); + + /** + * Looks up the shard number for specific replica Node. + * + * @param node one of the replica nodes of this tablet. + * @return Shard number for the replica or -1 if no such Node found. 
+ */ + public int getShardForNode(Node node); +} diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/metadata/TabletMap.java b/core/src/main/java/com/datastax/oss/driver/api/core/metadata/TabletMap.java new file mode 100644 index 00000000000..9df8c629779 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/api/core/metadata/TabletMap.java @@ -0,0 +1,37 @@ +package com.datastax.oss.driver.api.core.metadata; + +import com.datastax.oss.driver.api.core.CqlIdentifier; +import com.datastax.oss.driver.shaded.guava.common.annotations.Beta; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListSet; + +/** Holds all currently known tablet metadata. */ +@Beta +public interface TabletMap { + /** + * Returns mapping from tables to the sets of their tablets. + * + * @return the Map keyed by (keyspace,table) pairs with Set of tablets as value type. + */ + public ConcurrentMap> getMapping(); + + /** + * Adds a single tablet to the map. Handles removal of overlapping tablets. + * + * @param keyspace target keyspace + * @param table target table + * @param tablet tablet instance to add + */ + public void addTablet(CqlIdentifier keyspace, CqlIdentifier table, Tablet tablet); + + /** + * Returns {@link Tablet} instance + * + * @param keyspace tablet's keyspace + * @param table tablet's table + * @param token target token + * @return {@link Tablet} responsible for provided token or {@code null} if no such tablet is + * present. 
+ */ + public Tablet getTablet(CqlIdentifier keyspace, CqlIdentifier table, long token); +} diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/session/Request.java b/core/src/main/java/com/datastax/oss/driver/api/core/session/Request.java index d6139defa44..3d591387e7b 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/session/Request.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/session/Request.java @@ -95,6 +95,17 @@ public interface Request { @Nullable CqlIdentifier getRoutingKeyspace(); + /** + * The table to use for tablet-aware routing. Implementations may infer it from the available + * column definitions; this default implementation always returns {@code null}. + * + * @return the routing table, or {@code null} if it cannot be determined. + */ + @Nullable + default CqlIdentifier getRoutingTable() { + return null; + } + /** * The partition key to use for token-aware routing. * diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java index 9570e960536..b5a677ff3c4 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java @@ -43,6 +43,7 @@ import com.datastax.oss.driver.internal.core.protocol.SegmentToFrameDecoder; import com.datastax.oss.driver.internal.core.protocol.ShardingInfo; import com.datastax.oss.driver.internal.core.protocol.ShardingInfo.ConnectionShardingInfo; +import com.datastax.oss.driver.internal.core.protocol.TabletInfo; import com.datastax.oss.driver.internal.core.util.ProtocolUtils; import com.datastax.oss.driver.internal.core.util.concurrent.UncaughtExceptions; import com.datastax.oss.protocol.internal.Message; @@ -94,6 +95,7 @@ class ProtocolInitHandler extends ConnectInitHandler { private ChannelHandlerContext ctx; private final boolean querySupportedOptions; private LwtInfo lwtInfo; + private TabletInfo tabletInfo;
/** * @param querySupportedOptions whether to send OPTIONS as the first message, to request which @@ -191,6 +193,9 @@ Message getRequest() { if (lwtInfo != null) { lwtInfo.addOption(startupOptions); } + if (tabletInfo != null && tabletInfo.isEnabled()) { + TabletInfo.addOption(startupOptions); + } return request = new Startup(startupOptions); case GET_CLUSTER_NAME: return request = CLUSTER_NAME_QUERY; @@ -230,6 +235,7 @@ void onResponse(Message response) { if (lwtInfo != null) { channel.attr(LWT_INFO_KEY).set(lwtInfo); } + tabletInfo = TabletInfo.parseTabletInfo(res.options); step = Step.STARTUP; send(); } else if (step == Step.STARTUP && response instanceof Ready) { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java index bb2f4ebcb84..fcddce6dcce 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java @@ -34,6 +34,8 @@ import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.cql.Statement; import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metadata.Tablet; +import com.datastax.oss.driver.api.core.metadata.TabletMap; import com.datastax.oss.driver.api.core.metadata.TokenMap; import com.datastax.oss.driver.api.core.metadata.token.Partitioner; import com.datastax.oss.driver.api.core.metadata.token.Token; @@ -59,8 +61,10 @@ import com.datastax.oss.driver.internal.core.context.InternalDriverContext; import com.datastax.oss.driver.internal.core.metadata.DefaultNode; import com.datastax.oss.driver.internal.core.metadata.token.DefaultTokenMap; +import com.datastax.oss.driver.internal.core.metadata.token.TokenLong64; import com.datastax.oss.driver.internal.core.metrics.NodeMetricUpdater; import 
com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater; +import com.datastax.oss.driver.internal.core.protocol.TabletInfo; import com.datastax.oss.driver.internal.core.session.DefaultSession; import com.datastax.oss.driver.internal.core.session.RepreparePayload; import com.datastax.oss.driver.internal.core.tracker.NoopRequestTracker; @@ -284,6 +288,51 @@ private Token getRoutingToken(Statement statement) { return tokenMap == null ? null : ((DefaultTokenMap) tokenMap).getTokenFactory().hash(key); } + public Integer getShardFromTabletMap(Statement statement, Node node, Token token) { + TabletMap tabletMap = context.getMetadataManager().getMetadata().getTabletMap(); + if (!(token instanceof TokenLong64)) { + LOG.trace( + "Token ({}) is not a TokenLong64. Not performing tablet shard lookup for statement {}.", + token, + statement); + return null; + } + CqlIdentifier statementKeyspace = statement.getKeyspace(); + if (statementKeyspace == null) { + statementKeyspace = statement.getRoutingKeyspace(); + } + if (statementKeyspace == null) { + statementKeyspace = this.keyspace; + } + CqlIdentifier statementTable = statement.getRoutingTable(); + if (statementKeyspace == null || statementTable == null) { + return null; + } + long tokenValue = ((TokenLong64) token).getValue(); + + Tablet targetTablet = tabletMap.getTablet(statementKeyspace, statementTable, tokenValue); + if (targetTablet == null) { + LOG.trace( + "Could not determine shard for token {} and table {}.{} on Node {}: Could not find corresponding tablet, returning null.", + token, + statementKeyspace, + statementTable, + node); + return null; + } + int shard = targetTablet.getShardForNode(node); + if (shard == -1) { + LOG.trace( + "Could not find shard corresponding to token {} and Node {} for table {} in keyspace {}. Returning null.", + token, + node, + statementTable, + statementKeyspace); + return null; + } + return shard; + } + /** * Sends the request to the next available node. 
* @@ -309,9 +358,20 @@ private void sendRequest( Node node = retriedNode; DriverChannel channel = null; if (node == null - || (channel = session.getChannel(node, logPrefix, getRoutingToken(statement))) == null) { + || (channel = + session.getChannel( + node, + logPrefix, + getRoutingToken(statement), + getShardFromTabletMap(statement, node, getRoutingToken(statement)))) + == null) { while (!result.isDone() && (node = queryPlan.poll()) != null) { - channel = session.getChannel(node, logPrefix, getRoutingToken(statement)); + channel = + session.getChannel( + node, + logPrefix, + getRoutingToken(statement), + getShardFromTabletMap(statement, node, getRoutingToken(statement))); if (channel != null) { break; } else { @@ -420,6 +480,18 @@ private void setFinalResult( totalLatencyNanos, TimeUnit.NANOSECONDS); } + if (resultSet.getColumnDefinitions().size() > 0 + && resultSet + .getExecutionInfo() + .getIncomingPayload() + .containsKey(TabletInfo.TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY)) { + context + .getMetadataManager() + .addTabletFromPayload( + resultSet.getColumnDefinitions().get(0).getKeyspace(), + resultSet.getColumnDefinitions().get(0).getTable(), + resultSet.getExecutionInfo().getIncomingPayload()); + } } // log the warnings if they have NOT been disabled if (!executionInfo.getWarnings().isEmpty() diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultBatchStatement.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultBatchStatement.java index 0068d3c8b98..3b14e0a24a5 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultBatchStatement.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultBatchStatement.java @@ -472,6 +472,17 @@ public CqlIdentifier getRoutingKeyspace() { return null; } + @Override + public CqlIdentifier getRoutingTable() { + for (BatchableStatement statement : statements) { + CqlIdentifier ks = statement.getRoutingTable(); + if (ks != null) { + 
return ks; + } + } + return null; + } + @NonNull @Override public BatchStatement setRoutingKeyspace(CqlIdentifier newRoutingKeyspace) { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultBoundStatement.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultBoundStatement.java index c815be00263..036be4a16c7 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultBoundStatement.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultBoundStatement.java @@ -301,6 +301,12 @@ public CqlIdentifier getRoutingKeyspace() { } } + @Override + public CqlIdentifier getRoutingTable() { + ColumnDefinitions definitions = preparedStatement.getVariableDefinitions(); + return (definitions.size() == 0) ? null : definitions.get(0).getTable(); + } + @NonNull @Override public BoundStatement setRoutingKeyspace(@Nullable CqlIdentifier newRoutingKeyspace) { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java index b3a71827edc..2c52941f139 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java @@ -32,6 +32,8 @@ import com.datastax.oss.driver.api.core.loadbalancing.NodeDistanceEvaluator; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.metadata.NodeState; +import com.datastax.oss.driver.api.core.metadata.Tablet; +import com.datastax.oss.driver.api.core.metadata.TabletMap; import com.datastax.oss.driver.api.core.metadata.TokenMap; import com.datastax.oss.driver.api.core.metadata.token.Partitioner; import com.datastax.oss.driver.api.core.metadata.token.Token; @@ -45,6 +47,7 @@ import 
com.datastax.oss.driver.internal.core.loadbalancing.nodeset.MultiDcNodeSet; import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.NodeSet; import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.SingleDcNodeSet; +import com.datastax.oss.driver.internal.core.metadata.token.TokenLong64; import com.datastax.oss.driver.internal.core.util.ArrayUtils; import com.datastax.oss.driver.internal.core.util.collection.CompositeQueryPlan; import com.datastax.oss.driver.internal.core.util.collection.LazyQueryPlan; @@ -285,17 +288,17 @@ protected Set getReplicas(@Nullable Request request, @Nullable Session ses } Optional maybeTokenMap = context.getMetadataManager().getMetadata().getTokenMap(); - if (!maybeTokenMap.isPresent()) { - return Collections.emptySet(); - } + TabletMap tabletMap = context.getMetadataManager().getMetadata().getTabletMap(); // Note: we're on the hot path and the getXxx methods are potentially more than simple getters, // so we only call each method when strictly necessary (which is why the code below looks a bit // weird). CqlIdentifier keyspace; + CqlIdentifier table; Token token; ByteBuffer key; - Partitioner partitioner = null; + Partitioner partitioner; + try { keyspace = request.getKeyspace(); if (keyspace == null) { @@ -308,6 +311,8 @@ protected Set getReplicas(@Nullable Request request, @Nullable Session ses return Collections.emptySet(); } + table = request.getRoutingTable(); + token = request.getRoutingToken(); key = (token == null) ? 
request.getRoutingKey() : null; if (token == null && key == null) { @@ -321,6 +326,27 @@ protected Set getReplicas(@Nullable Request request, @Nullable Session ses return Collections.emptySet(); } + if (table != null) { + if (token == null) { + if (partitioner != null) { + token = partitioner.hash(key); + } + } + if (token instanceof TokenLong64) { + Tablet targetTablet = + tabletMap.getTablet(keyspace, table, ((TokenLong64) token).getValue()); + if (targetTablet != null) { + Set replicas = targetTablet.getReplicaNodes(); + if (!replicas.isEmpty()) { + return replicas; + } + } + } + } + + if (!maybeTokenMap.isPresent()) { + return Collections.emptySet(); + } TokenMap tokenMap = maybeTokenMap.get(); return token != null ? tokenMap.getReplicas(keyspace, token) diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/AddTabletRefresh.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/AddTabletRefresh.java new file mode 100644 index 00000000000..92f6463f60e --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/AddTabletRefresh.java @@ -0,0 +1,26 @@ +package com.datastax.oss.driver.internal.core.metadata; + +import com.datastax.oss.driver.api.core.CqlIdentifier; +import com.datastax.oss.driver.api.core.metadata.Tablet; +import com.datastax.oss.driver.internal.core.context.InternalDriverContext; + +/** Updates tablet metadata by adding provided Tablet to the TabletMap. 
*/ +public class AddTabletRefresh implements MetadataRefresh { + + final CqlIdentifier keyspace; + final CqlIdentifier table; + final Tablet tablet; + + public AddTabletRefresh(CqlIdentifier keyspace, CqlIdentifier table, Tablet tablet) { + this.keyspace = keyspace; + this.table = table; + this.tablet = tablet; + } + + @Override + public Result compute( + DefaultMetadata oldMetadata, boolean tokenMapEnabled, InternalDriverContext context) { + oldMetadata.tabletMap.addTablet(keyspace, table, tablet); + return new Result(oldMetadata); + } +} diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultMetadata.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultMetadata.java index c34486029fe..c334a5b931d 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultMetadata.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultMetadata.java @@ -18,6 +18,7 @@ import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.metadata.Metadata; import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metadata.TabletMap; import com.datastax.oss.driver.api.core.metadata.TokenMap; import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; import com.datastax.oss.driver.internal.core.context.InternalDriverContext; @@ -46,22 +47,34 @@ public class DefaultMetadata implements Metadata { private static final Logger LOG = LoggerFactory.getLogger(DefaultMetadata.class); public static DefaultMetadata EMPTY = - new DefaultMetadata(Collections.emptyMap(), Collections.emptyMap(), null, null); + new DefaultMetadata( + Collections.emptyMap(), Collections.emptyMap(), null, null, DefaultTabletMap.emptyMap()); protected final Map nodes; protected final Map keyspaces; protected final TokenMap tokenMap; protected final String clusterName; + protected final TabletMap tabletMap; protected DefaultMetadata( 
Map nodes, Map keyspaces, TokenMap tokenMap, String clusterName) { + this(nodes, keyspaces, tokenMap, clusterName, DefaultTabletMap.emptyMap()); + } + + protected DefaultMetadata( + Map nodes, + Map keyspaces, + TokenMap tokenMap, + String clusterName, + TabletMap tabletMap) { this.nodes = nodes; this.keyspaces = keyspaces; this.tokenMap = tokenMap; this.clusterName = clusterName; + this.tabletMap = tabletMap; } @NonNull @@ -82,6 +95,11 @@ public Optional getTokenMap() { return Optional.ofNullable(tokenMap); } + @Override + public TabletMap getTabletMap() { + return tabletMap; + } + @NonNull @Override public Optional getClusterName() { @@ -91,6 +109,8 @@ public Optional getClusterName() { /** * Refreshes the current metadata with the given list of nodes. * + *

Does not rebuild the tablet map. + * * @param tokenMapEnabled whether to rebuild the token map or not; if this is {@code false} the * current token map will be copied into the new metadata without being recomputed. * @param tokensChanged whether we observed a change of tokens for at least one node. This will @@ -114,7 +134,19 @@ public DefaultMetadata withNodes( this.keyspaces, rebuildTokenMap( newNodes, keyspaces, tokenMapEnabled, forceFullRebuild, tokenFactory, context), - context.getChannelFactory().getClusterName()); + context.getChannelFactory().getClusterName(), + this.tabletMap); + } + + /** + * Refreshes the current metadata with new TabletMap. + * + * @param newTabletMap replacement TabletMap. + * @return new metadata. + */ + public DefaultMetadata withTabletMap(TabletMap newTabletMap) { + return new DefaultMetadata( + this.nodes, this.keyspaces, this.tokenMap, this.clusterName, newTabletMap); } public DefaultMetadata withSchema( @@ -125,7 +157,8 @@ public DefaultMetadata withSchema( this.nodes, ImmutableMap.copyOf(newKeyspaces), rebuildTokenMap(nodes, newKeyspaces, tokenMapEnabled, false, null, context), - context.getChannelFactory().getClusterName()); + context.getChannelFactory().getClusterName(), + this.tabletMap); } @Nullable diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTabletMap.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTabletMap.java new file mode 100644 index 00000000000..5e4b6418cc8 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTabletMap.java @@ -0,0 +1,245 @@ +package com.datastax.oss.driver.internal.core.metadata; + +import com.datastax.oss.driver.api.core.CqlIdentifier; +import com.datastax.oss.driver.api.core.data.TupleValue; +import com.datastax.oss.driver.api.core.metadata.KeyspaceTableNamePair; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metadata.Tablet; +import 
com.datastax.oss.driver.api.core.metadata.TabletMap; +import com.datastax.oss.driver.shaded.guava.common.annotations.Beta; +import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting; +import edu.umd.cs.findbugs.annotations.NonNull; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Holds currently known tablet mappings. Updated lazily through received custom payloads described + * in Scylla's CQL protocol extensions (tablets-routing-v1). + * + *

Thread-safety notes: This class uses ConcurrentMap and ConcurrentSkipListSet underneath. It is + * safe to have multiple threads accessing it. However, multiple modifications, i.e. multiple calls + * of {@link DefaultTabletMap#addTablet(CqlIdentifier, CqlIdentifier, Tablet)}, will race with each + * other. This may result in unexpected state of this structure when used in a vacuum. For example, + * it may end up containing overlapping tablet ranges.

In actual use by the driver, {@link + * MetadataManager} solves this by running modifications sequentially. It schedules them on {@link + * MetadataManager#adminExecutor}'s thread. + */ +@Beta +public class DefaultTabletMap implements TabletMap { + private static final Logger LOG = LoggerFactory.getLogger(DefaultTabletMap.class); + + @NonNull + private final ConcurrentMap> mapping; + + private DefaultTabletMap( + @NonNull ConcurrentMap> mapping) { + this.mapping = mapping; + } + + public static DefaultTabletMap emptyMap() { + return new DefaultTabletMap(new ConcurrentHashMap<>()); + } + + @Override + @NonNull + public ConcurrentMap> getMapping() { + return mapping; + } + + @Override + public Tablet getTablet(CqlIdentifier keyspace, CqlIdentifier table, long token) { + KeyspaceTableNamePair key = new KeyspaceTableNamePair(keyspace, table); + NavigableSet set = mapping.get(key); + if (set == null) { + LOG.trace( + "There is no tablets for {}.{} in current mapping. Returning null.", keyspace, table); + return null; + } + Tablet result = set.ceiling(DefaultTablet.malformedTablet(token)); + if (result == null || result.getFirstToken() >= token) { + LOG.trace( + "Could not find tablet for {}.{} that owns token {}. Returning null.", + keyspace, + table, + token); + return null; + } + return result; + } + + @Override + public void addTablet(CqlIdentifier keyspace, CqlIdentifier table, Tablet tablet) { + LOG.trace("Adding tablet for {}.{} with contents {}", keyspace, table, tablet); + KeyspaceTableNamePair ktPair = new KeyspaceTableNamePair(keyspace, table); + + // Get existing tablets for given table + NavigableSet existingTablets = + mapping.computeIfAbsent(ktPair, k -> new ConcurrentSkipListSet<>()); + // Single tablet token range is represented by (firstToken, lastToken] interval + // We need to do two sweeps: remove overlapping tablets by looking at lastToken of existing + // tablets + // and then by looking at firstToken of existing tablets.
Currently, the tablets are sorted + // according + // to their lastTokens. + + // First sweep: remove all tablets whose lastToken is inside this interval + Iterator it = existingTablets.headSet(tablet, true).descendingIterator(); + while (it.hasNext()) { + Tablet nextTablet = it.next(); + if (nextTablet.getLastToken() <= tablet.getFirstToken()) { + break; + } + it.remove(); + } + + // Second sweep: remove all tablets that have their lastToken greater than that of + // the tablet that is being added AND their firstToken is smaller than lastToken of new + // addition. + // After the first sweep, this theoretically should remove at most 1 tablet. + it = existingTablets.tailSet(tablet, true).iterator(); + while (it.hasNext()) { + Tablet nextTablet = it.next(); + if (nextTablet.getFirstToken() >= tablet.getLastToken()) { + break; + } + it.remove(); + } + + // Add new (now) non-overlapping tablet + existingTablets.add(tablet); + } + + /** + * Represents a single tablet created from tablets-routing-v1 custom payload. Its {@code + * compareTo} implementation intentionally relies solely on {@code lastToken} in order to allow + * for quick lookup on sorted Collections based just on the token value. Its token range is + * (firstToken, lastToken], meaning firstToken is not included. + */ + public static class DefaultTablet implements Tablet { + private final long firstToken; + private final long lastToken; + @NonNull private final Set replicaNodes; + @NonNull private final Map replicaShards; + + @VisibleForTesting + DefaultTablet( + long firstToken, + long lastToken, + @NonNull Set replicaNodes, + @NonNull Map replicaShards) { + this.firstToken = firstToken; + this.lastToken = lastToken; + this.replicaNodes = replicaNodes; + this.replicaShards = replicaShards; + } + + /** + * Creates a new instance of DefaultTablet based on provided decoded payload. + * + * @param tupleValue Decoded tablets-routing-v1 payload + * @param nodes Mapping of UUIDs to Node instances. 
+ * @return the new DefaultTablet + */ + public static DefaultTablet parseTabletPayloadV1(TupleValue tupleValue, Map nodes) { + + long firstToken = tupleValue.getLong(0); + long lastToken = tupleValue.getLong(1); + + Set replicaNodes = new HashSet<>(); + Map replicaShards = new HashMap<>(); + List list = tupleValue.getList(2, TupleValue.class); + assert list != null; + for (TupleValue tuple : list) { + Node node = nodes.get(tuple.getUuid(0)); + if (node != null) { + int shard = tuple.getInt(1); + replicaNodes.add(node); + replicaShards.put(node, shard); + } + } + + return new DefaultTablet(firstToken, lastToken, replicaNodes, replicaShards); + } + + /** + * Creates a {@link DefaultTablet} instance with given {@code lastToken}, identical {@code + * firstToken} and unspecified other fields. Used for lookup of sorted collections of proper + * {@link DefaultTablet}. + * + * @param lastToken + * @return New {@link DefaultTablet} object + */ + public static DefaultTablet malformedTablet(long lastToken) { + return new DefaultTablet( + lastToken, lastToken, Collections.emptySet(), Collections.emptyMap()); + } + + @Override + public long getFirstToken() { + return firstToken; + } + + @Override + public long getLastToken() { + return lastToken; + } + + @Override + public Set getReplicaNodes() { + return replicaNodes; + } + + @Override + public int getShardForNode(Node n) { + return replicaShards.getOrDefault(n, -1); + } + + @Override + public String toString() { + return "DefaultTablet{" + + "firstToken=" + + firstToken + + ", lastToken=" + + lastToken + + ", replicaNodes=" + + replicaNodes + + ", replicaShards=" + + replicaShards + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof DefaultTablet)) return false; + DefaultTablet that = (DefaultTablet) o; + return firstToken == that.firstToken + && lastToken == that.lastToken + && replicaNodes.equals(that.replicaNodes) + && replicaShards.equals(that.replicaShards); + 
} + + @Override + public int hashCode() { + return Objects.hash(firstToken, lastToken, replicaNodes, replicaShards); + } + + @Override + public int compareTo(Tablet tablet) { + return Long.compare(this.lastToken, tablet.getLastToken()); + } + } +} diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java index 0f28d354c46..97332ce5be1 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java @@ -16,11 +16,17 @@ package com.datastax.oss.driver.internal.core.metadata; import com.datastax.oss.driver.api.core.AsyncAutoCloseable; +import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.api.core.data.TupleValue; import com.datastax.oss.driver.api.core.metadata.EndPoint; import com.datastax.oss.driver.api.core.metadata.Metadata; import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metadata.Tablet; +import com.datastax.oss.driver.api.core.type.DataTypes; +import com.datastax.oss.driver.api.core.type.TupleType; +import com.datastax.oss.driver.api.core.type.codec.TypeCodec; import com.datastax.oss.driver.internal.core.config.ConfigChangeEvent; import com.datastax.oss.driver.internal.core.context.InternalDriverContext; import com.datastax.oss.driver.internal.core.control.ControlConnection; @@ -29,6 +35,7 @@ import com.datastax.oss.driver.internal.core.metadata.schema.queries.SchemaQueriesFactory; import com.datastax.oss.driver.internal.core.metadata.schema.queries.SchemaRows; import com.datastax.oss.driver.internal.core.metadata.schema.refresh.SchemaRefresh; +import 
com.datastax.oss.driver.internal.core.protocol.TabletInfo; import com.datastax.oss.driver.internal.core.util.Loggers; import com.datastax.oss.driver.internal.core.util.NanoTime; import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures; @@ -39,8 +46,10 @@ import edu.umd.cs.findbugs.annotations.NonNull; import io.netty.util.concurrent.EventExecutor; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -71,6 +80,7 @@ public class MetadataManager implements AsyncAutoCloseable { private volatile boolean tokenMapEnabled; private volatile Set contactPoints; private volatile boolean wasImplicitContactPoint; + private volatile TypeCodec tabletPayloadCodec = null; public MetadataManager(InternalDriverContext context) { this(context, DefaultMetadata.EMPTY); @@ -492,6 +502,10 @@ private Metadata parseAndApplySchemaRows(SchemaRows schemaRows) { return metadata; } + private void addTablet(CqlIdentifier keyspace, CqlIdentifier table, Tablet tablet) { + apply(new AddTabletRefresh(keyspace, table, tablet)); + } + private void close() { if (closeWasCalled) { return; @@ -520,4 +534,39 @@ Void apply(MetadataRefresh refresh) { } return null; } + + private TypeCodec getTabletPayloadCodec() { + if (tabletPayloadCodec == null) { + TupleType payloadOuterTuple = + DataTypes.tupleOf( + DataTypes.BIGINT, + DataTypes.BIGINT, + DataTypes.listOf(DataTypes.tupleOf(DataTypes.UUID, DataTypes.INT))); + tabletPayloadCodec = context.getCodecRegistry().codecFor(payloadOuterTuple); + } + return tabletPayloadCodec; + } + + public void addTabletFromPayload( + CqlIdentifier keyspace, + CqlIdentifier table, + @NonNull Map incomingPayload) { + // Assumes payload is present + TupleValue tabletTuple = + getTabletPayloadCodec() + .decode( + 
incomingPayload.get(TabletInfo.TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY), + context.getProtocolVersion()); + if (tabletTuple == null) { + LOG.warn( + "Custom payload containing tablet information for table {}.{} decoded to null. This should not ever " + + "happen.", + keyspace, + table); + return; + } + DefaultTabletMap.DefaultTablet tabletToAdd = + DefaultTabletMap.DefaultTablet.parseTabletPayloadV1(tabletTuple, getMetadata().getNodes()); + RunOrSchedule.on(adminExecutor, () -> singleThreaded.addTablet(keyspace, table, tabletToAdd)); + } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/RemoveNodeRefresh.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/RemoveNodeRefresh.java index 5135d04fda4..f7a4c776559 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/RemoveNodeRefresh.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/RemoveNodeRefresh.java @@ -65,8 +65,10 @@ public Result compute( return new Result(oldMetadata); } else { LOG.debug("[{}] Removing node {}", logPrefix, removedNode); + LOG.debug("[{}] Tablet metadata will be wiped and rebuilt due to node removal.", logPrefix); + DefaultMetadata newerMetadata = oldMetadata.withTabletMap(DefaultTabletMap.emptyMap()); return new Result( - oldMetadata.withNodes(newNodesBuilder.build(), tokenMapEnabled, false, null, context), + newerMetadata.withNodes(newNodesBuilder.build(), tokenMapEnabled, false, null, context), ImmutableList.of(NodeStateEvent.removed((DefaultNode) removedNode))); } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java b/core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java index 48818554f3f..a6cd87925f3 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java @@ -157,7 +157,7 @@ public boolean 
isInvalidKeyspace() { * to the caller to fail fast and move to the next node. */ public DriverChannel next() { - return next(null); + return next(null, null); } /** @@ -167,7 +167,7 @@ public DriverChannel next() { * to the caller to fail fast and move to the next node. *

There is no need to return the channel. */ - public DriverChannel next(@Nullable Token routingKey) { + public DriverChannel next(@Nullable Token routingKey, @Nullable Integer shardSuggestion) { if (!singleThreaded.initialized) { return null; } @@ -175,10 +175,20 @@ public DriverChannel next(@Nullable Token routingKey) { return channels[0].next(); } - int shardId = - routingKey != null - ? singleThreaded.shardingInfo.shardId(routingKey) - : ThreadLocalRandom.current().nextInt(channels.length); + int shardId = -1; + if (shardSuggestion != null) { + if (shardSuggestion >= channels.length) { + LOG.warn("Shard suggestion is out of channels array bounds. Ignoring."); + } else { + shardId = shardSuggestion; + } + } + if (shardId == -1) { + shardId = + routingKey != null + ? singleThreaded.shardingInfo.shardId(routingKey) + : ThreadLocalRandom.current().nextInt(channels.length); + } if (channels[shardId].size() > 0) { return channels[shardId].next(); diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/TabletInfo.java b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/TabletInfo.java new file mode 100644 index 00000000000..8c33e803fd5 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/TabletInfo.java @@ -0,0 +1,33 @@ +package com.datastax.oss.driver.internal.core.protocol; + +import java.util.List; +import java.util.Map; + +public class TabletInfo { + private static final String SCYLLA_TABLETS_STARTUP_OPTION_KEY = "TABLETS_ROUTING_V1"; + private static final String SCYLLA_TABLETS_STARTUP_OPTION_VALUE = ""; + public static final String TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY = "tablets-routing-v1"; + + private boolean enabled = false; + + private TabletInfo(boolean enabled) { + this.enabled = enabled; + } + + // Currently pertains only to TABLETS_ROUTING_V1 + public boolean isEnabled() { + return enabled; + } + + public static TabletInfo parseTabletInfo(Map> supported) { + List values = 
supported.get(SCYLLA_TABLETS_STARTUP_OPTION_KEY); + return new TabletInfo( + values != null + && values.size() == 1 + && values.get(0).equals(SCYLLA_TABLETS_STARTUP_OPTION_VALUE)); + } + + public static void addOption(Map options) { + options.put(SCYLLA_TABLETS_STARTUP_OPTION_KEY, SCYLLA_TABLETS_STARTUP_OPTION_VALUE); + } +} diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/session/DefaultSession.java b/core/src/main/java/com/datastax/oss/driver/internal/core/session/DefaultSession.java index 647b75892ad..5a80d5fb5d4 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/session/DefaultSession.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/session/DefaultSession.java @@ -239,18 +239,27 @@ public ResultT execute( @Nullable public DriverChannel getChannel(@NonNull Node node, @NonNull String logPrefix) { - return getChannel(node, logPrefix, null); + return getChannel(node, logPrefix, null, null); } @Nullable public DriverChannel getChannel( @NonNull Node node, @NonNull String logPrefix, @Nullable Token routingKey) { + return getChannel(node, logPrefix, routingKey, null); + } + + @Nullable + public DriverChannel getChannel( + @NonNull Node node, + @NonNull String logPrefix, + @Nullable Token routingKey, + @Nullable Integer shardSuggestion) { ChannelPool pool = poolManager.getPools().get(node); if (pool == null) { LOG.trace("[{}] No pool to {}, skipping", logPrefix, node); return null; } else { - DriverChannel channel = pool.next(routingKey); + DriverChannel channel = pool.next(routingKey, shardSuggestion); if (channel == null) { LOG.trace("[{}] Pool returned no channel for {}, skipping", logPrefix, node); return null; diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/RequestHandlerTestHarness.java b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/RequestHandlerTestHarness.java index 65fe4a405f2..815fb69f6b3 100644 --- 
a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/RequestHandlerTestHarness.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/RequestHandlerTestHarness.java @@ -50,6 +50,7 @@ import com.datastax.oss.driver.internal.core.context.NettyOptions; import com.datastax.oss.driver.internal.core.metadata.DefaultMetadata; import com.datastax.oss.driver.internal.core.metadata.LoadBalancingPolicyWrapper; +import com.datastax.oss.driver.internal.core.metadata.MetadataManager; import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater; import com.datastax.oss.driver.internal.core.pool.ChannelPool; import com.datastax.oss.driver.internal.core.servererrors.DefaultWriteTypeRegistry; @@ -98,6 +99,7 @@ public static Builder builder() { @Mock protected TimestampGenerator timestampGenerator; @Mock protected ProtocolVersionRegistry protocolVersionRegistry; @Mock protected SessionMetricUpdater sessionMetricUpdater; + @Mock protected MetadataManager metadataManager; protected RequestHandlerTestHarness(Builder builder) { MockitoAnnotations.initMocks(this); @@ -139,8 +141,17 @@ protected RequestHandlerTestHarness(Builder builder) { when(timestampGenerator.next()).thenReturn(Statement.NO_DEFAULT_TIMESTAMP); when(context.getTimestampGenerator()).thenReturn(timestampGenerator); + when(context.getMetadataManager()).thenReturn(metadataManager); + when(metadataManager.getMetadata()).thenReturn(DefaultMetadata.EMPTY); pools = builder.buildMockPools(); + // Call variation introduced with Tablets (shardSuggestion field) + when(session.getChannel(any(Node.class), anyString(), any(), any())) + .thenAnswer( + invocation -> { + Node node = invocation.getArgument(0); + return pools.get(node).next(); + }); when(session.getChannel(any(Node.class), anyString(), any())) .thenAnswer( invocation -> { diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTabletMapTest.java 
b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTabletMapTest.java new file mode 100644 index 00000000000..7253301d9b8 --- /dev/null +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTabletMapTest.java @@ -0,0 +1,115 @@ +package com.datastax.oss.driver.internal.core.metadata; + +import static org.mockito.Mockito.mock; + +import com.datastax.oss.driver.api.core.CqlIdentifier; +import com.datastax.oss.driver.api.core.metadata.KeyspaceTableNamePair; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metadata.Tablet; +import com.datastax.oss.driver.api.core.metadata.TabletMap; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.junit.Test; +import org.testng.Assert; + +public class DefaultTabletMapTest { + + @Test + public void should_remove_overlapping_tablets() { + TabletMap tabletMap = DefaultTabletMap.emptyMap(); + Tablet tablet1 = + new DefaultTabletMap.DefaultTablet(0, 1, Collections.emptySet(), Collections.emptyMap()); + Tablet tablet2 = + new DefaultTabletMap.DefaultTablet(1, 2, Collections.emptySet(), Collections.emptyMap()); + Tablet tablet3 = + new DefaultTabletMap.DefaultTablet(2, 3, Collections.emptySet(), Collections.emptyMap()); + Tablet tablet4 = + new DefaultTabletMap.DefaultTablet( + -100, 100, Collections.emptySet(), Collections.emptyMap()); + + Tablet tablet5 = + new DefaultTabletMap.DefaultTablet(-10, 10, Collections.emptySet(), Collections.emptyMap()); + Tablet tablet6 = + new DefaultTabletMap.DefaultTablet(9, 20, Collections.emptySet(), Collections.emptyMap()); + + KeyspaceTableNamePair key1 = + new KeyspaceTableNamePair(CqlIdentifier.fromCql("ks"), CqlIdentifier.fromCql("tab")); + + tabletMap.addTablet(CqlIdentifier.fromCql("ks"), CqlIdentifier.fromCql("tab"), tablet1); + Assert.assertEquals(tabletMap.getMapping().size(), 1); + 
Assert.assertEquals(tabletMap.getMapping().get(key1).size(), 1); + + tabletMap.addTablet(CqlIdentifier.fromCql("ks"), CqlIdentifier.fromCql("tab"), tablet2); + Assert.assertEquals(tabletMap.getMapping().size(), 1); + Assert.assertEquals(tabletMap.getMapping().get(key1).size(), 2); + + tabletMap.addTablet(CqlIdentifier.fromCql("ks"), CqlIdentifier.fromCql("tab"), tablet3); + Assert.assertEquals(tabletMap.getMapping().size(), 1); + Assert.assertEquals(tabletMap.getMapping().get(key1).size(), 3); + + tabletMap.addTablet(CqlIdentifier.fromCql("ks"), CqlIdentifier.fromCql("tab"), tablet4); + Assert.assertEquals(tabletMap.getMapping().size(), 1); + Assert.assertEquals(tabletMap.getMapping().get(key1).size(), 1); + + KeyspaceTableNamePair key2 = + new KeyspaceTableNamePair(CqlIdentifier.fromCql("ks"), CqlIdentifier.fromCql("tab2")); + + tabletMap.addTablet(CqlIdentifier.fromCql("ks"), CqlIdentifier.fromCql("tab2"), tablet5); + Assert.assertEquals(tabletMap.getMapping().size(), 2); + Assert.assertEquals(tabletMap.getMapping().get(key2).size(), 1); + + tabletMap.addTablet(CqlIdentifier.fromCql("ks"), CqlIdentifier.fromCql("tab2"), tablet6); + Assert.assertEquals(tabletMap.getMapping().size(), 2); + Assert.assertEquals(tabletMap.getMapping().get(key2).size(), 1); + Assert.assertTrue(tabletMap.getMapping().get(key2).contains(tablet6)); + Assert.assertEquals( + tablet6, + tabletMap.getTablet(CqlIdentifier.fromCql("ks"), CqlIdentifier.fromCql("tab2"), 10L)); + } + + @Test + public void tablet_range_should_not_include_first_token() { + TabletMap tabletMap = DefaultTabletMap.emptyMap(); + Tablet tablet1 = + new DefaultTabletMap.DefaultTablet( + -123, 123, Collections.emptySet(), Collections.emptyMap()); + tabletMap.addTablet(CqlIdentifier.fromCql("ks"), CqlIdentifier.fromCql("tab"), tablet1); + Tablet result = + tabletMap.getTablet(CqlIdentifier.fromCql("ks"), CqlIdentifier.fromCql("tab"), -123); + Assert.assertEquals(result, null); + } + + @Test + public void 
tablet_range_should_include_last_token() { + TabletMap tabletMap = DefaultTabletMap.emptyMap(); + Tablet tablet1 = + new DefaultTabletMap.DefaultTablet( + -123, 456, Collections.emptySet(), Collections.emptyMap()); + tabletMap.addTablet(CqlIdentifier.fromCql("ks"), CqlIdentifier.fromCql("tab"), tablet1); + Tablet result = + tabletMap.getTablet(CqlIdentifier.fromCql("ks"), CqlIdentifier.fromCql("tab"), 456); + Assert.assertEquals(result, tablet1); + } + + @Test + public void should_return_correct_shard() { + Node node1 = mock(DefaultNode.class); + Node node2 = mock(DefaultNode.class); + Set replicaNodes = new HashSet(); + replicaNodes.add(node1); + replicaNodes.add(node2); + Map replicaShards = new HashMap<>(); + replicaShards.put(node1, 1); + replicaShards.put(node2, 2); + TabletMap tabletMap = DefaultTabletMap.emptyMap(); + Tablet tablet1 = new DefaultTabletMap.DefaultTablet(-123, 456, replicaNodes, replicaShards); + tabletMap.addTablet(CqlIdentifier.fromCql("ks"), CqlIdentifier.fromCql("tab"), tablet1); + Tablet result = + tabletMap.getTablet(CqlIdentifier.fromCql("ks"), CqlIdentifier.fromCql("tab"), 456); + Assert.assertEquals(result.getShardForNode(node1), 1); + Assert.assertEquals(result.getShardForNode(node2), 2); + } +} diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/DefaultMetadataTabletMapIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/DefaultMetadataTabletMapIT.java new file mode 100644 index 00000000000..32a5b2b29c2 --- /dev/null +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/DefaultMetadataTabletMapIT.java @@ -0,0 +1,151 @@ +package com.datastax.oss.driver.core.metadata; + +import com.datastax.oss.driver.api.core.CqlIdentifier; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.config.DefaultDriverOption; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import 
com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.metadata.KeyspaceTableNamePair; +import com.datastax.oss.driver.api.core.metadata.Tablet; +import com.datastax.oss.driver.api.testinfra.CassandraSkip; +import com.datastax.oss.driver.api.testinfra.ScyllaRequirement; +import com.datastax.oss.driver.api.testinfra.ccm.CustomCcmRule; +import com.datastax.oss.driver.api.testinfra.session.SessionRule; +import com.datastax.oss.driver.api.testinfra.session.SessionUtils; +import com.datastax.oss.driver.internal.core.protocol.TabletInfo; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListSet; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.RuleChain; +import org.junit.rules.TestRule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@ScyllaRequirement( + minOSS = "6.0.0", + minEnterprise = "2024.2", + description = "Needs to support tablets") +@CassandraSkip(description = "Tablets are ScyllaDB-only extension") +public class DefaultMetadataTabletMapIT { + private static final Logger LOG = LoggerFactory.getLogger(DefaultMetadataTabletMapIT.class); + private static final CustomCcmRule CCM_RULE = + CustomCcmRule.builder() + .withNodes(2) + .withCassandraConfiguration( + "experimental_features", "['consistent-topology-changes','tablets']") + .build(); + private static final SessionRule SESSION_RULE = + SessionRule.builder(CCM_RULE) + .withConfigLoader( + SessionUtils.configLoaderBuilder() + .withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofSeconds(5)) + .build()) + .build(); + + @ClassRule + public static final TestRule CHAIN = RuleChain.outerRule(CCM_RULE).around(SESSION_RULE); + + private static final int INITIAL_TABLETS = 32; + private static final int QUERIES 
= 1600; + private static final int REPLICATION_FACTOR = 2; + private static String KEYSPACE_NAME = "tabletsTest"; + private static String TABLE_NAME = "tabletsTable"; + private static String CREATE_KEYSPACE_QUERY = + "CREATE KEYSPACE IF NOT EXISTS " + + KEYSPACE_NAME + + " WITH replication = {'class': " + + "'NetworkTopologyStrategy', " + + "'replication_factor': '" + + REPLICATION_FACTOR + + "'} AND durable_writes = true AND tablets = " + + "{'initial': " + + INITIAL_TABLETS + + "};"; + private static String CREATE_TABLE_QUERY = + "CREATE TABLE IF NOT EXISTS " + + KEYSPACE_NAME + + "." + + TABLE_NAME + + " (pk int, ck int, PRIMARY KEY(pk, ck));"; + + @Test + public void should_receive_each_tablet_exactly_once() { + CqlSession session = SESSION_RULE.session(); + + session.execute(CREATE_KEYSPACE_QUERY); + session.execute(CREATE_TABLE_QUERY); + + for (int i = 1; i <= QUERIES; i++) { + session.execute( + "INSERT INTO " + + KEYSPACE_NAME + + "." + + TABLE_NAME + + " (pk,ck) VALUES (" + + i + + "," + + i + + ");"); + } + + PreparedStatement preparedStatement = + session.prepare( + SimpleStatement.builder( + "select pk,ck from " + + KEYSPACE_NAME + + "." + + TABLE_NAME + + " WHERE pk = ? AND ck = ?") + .setTracing(true) + .build()); + // preparedStatement.enableTracing(); + int counter = 0; + for (int i = 1; i <= QUERIES; i++) { + ResultSet rs = session.execute(preparedStatement.bind(i, i).setTracing(true)); + Map payload = rs.getExecutionInfo().getIncomingPayload(); + if (payload.containsKey(TabletInfo.TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY)) { + counter++; + } + } + + LOG.debug("Ran first set of queries"); + + // With enough queries we should hit a wrong node for each tablet exactly once. 
+ Assert.assertEquals(INITIAL_TABLETS, counter); + + ConcurrentMap> tabletMapping = + session.getMetadata().getTabletMap().getMapping(); + KeyspaceTableNamePair ktPair = + new KeyspaceTableNamePair( + CqlIdentifier.fromCql(KEYSPACE_NAME), CqlIdentifier.fromCql(TABLE_NAME)); + Assert.assertTrue(tabletMapping.containsKey(ktPair)); + + Set tablets = tabletMapping.get(ktPair); + Assert.assertEquals(INITIAL_TABLETS, tablets.size()); + + for (Tablet tab : tablets) { + Assert.assertEquals(REPLICATION_FACTOR, tab.getReplicaNodes().size()); + } + + // All tablet information should be available by now (unless for some reason cluster did sth on + // its own) + // We should not receive any tablet payloads now, since they are sent only on mismatch. + for (int i = 1; i <= QUERIES; i++) { + + ResultSet rs = session.execute(preparedStatement.bind(i, i)); + Map payload = rs.getExecutionInfo().getIncomingPayload(); + + if (payload.containsKey(TabletInfo.TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY)) { + throw new RuntimeException( + "Received non empty payload with tablets routing information: " + payload); + } + } + } +}