Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.datastax.oss.driver.api.core.metadata;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Objects;

/** Simple keyspace name and table name pair. */
public class KeyspaceTableNamePair {
@NonNull private final CqlIdentifier keyspace;
@NonNull private final CqlIdentifier tableName;

public KeyspaceTableNamePair(@NonNull CqlIdentifier keyspace, @NonNull CqlIdentifier tableName) {
this.keyspace = keyspace;
this.tableName = tableName;
}

@NonNull
public CqlIdentifier getKeyspace() {
return keyspace;
}

@NonNull
public CqlIdentifier getTableName() {
return tableName;
}

@Override
public String toString() {
return "KeyspaceTableNamePair{"
+ "keyspace='"
+ keyspace
+ '\''
+ ", tableName='"
+ tableName
+ '\''
+ '}';
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || !(o instanceof KeyspaceTableNamePair)) return false;
KeyspaceTableNamePair that = (KeyspaceTableNamePair) o;
return keyspace.equals(that.keyspace) && tableName.equals(that.tableName);
}

@Override
public int hashCode() {
return Objects.hash(keyspace, tableName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,14 @@ default Optional<KeyspaceMetadata> getKeyspace(@NonNull String keyspaceName) {
@NonNull
Optional<TokenMap> getTokenMap();

/**
* The tablet map for this cluster.
*
* <p>Starts as an empty map that will gradually receive updates on each query of a yet unknown
* tablet.
*/
TabletMap getTabletMap();

/**
* The cluster name to which this session is connected. The Optional returned should contain the
* value from the server for <b>system.local.cluster_name</b>.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.datastax.oss.driver.api.core.metadata;

import com.datastax.oss.driver.shaded.guava.common.annotations.Beta;
import java.util.Set;

/**
* Represents a tablet as described in tablets-routing-v1 protocol extension with some additional
* fields for ease of use.
*/
@Beta
public interface Tablet extends Comparable<Tablet> {
/**
* Returns left endpoint of an interval. This interval is left-open, meaning the tablet does not
* own the token equal to the first token.
*
* @return {@code long} value representing first token.
*/
public long getFirstToken();

/**
* Returns right endpoint of an interval. This interval is right-closed, which means that last
* token is owned by this tablet.
*
* @return {@code long} value representing last token.
*/
public long getLastToken();

public Set<Node> getReplicaNodes();

/**
* Looks up the shard number for specific replica Node.
*
* @param node one of the replica nodes of this tablet.
* @return Shard number for the replica or -1 if no such Node found.
*/
public int getShardForNode(Node node);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.datastax.oss.driver.api.core.metadata;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.shaded.guava.common.annotations.Beta;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;

/** Holds all currently known tablet metadata. */
@Beta
public interface TabletMap {
/**
* Returns mapping from tables to the sets of their tablets.
*
* @return the Map keyed by (keyspace,table) pairs with Set of tablets as value type.
*/
public ConcurrentMap<KeyspaceTableNamePair, ConcurrentSkipListSet<Tablet>> getMapping();

/**
* Adds a single tablet to the map. Handles removal of overlapping tablets.
*
* @param keyspace target keyspace
* @param table target table
* @param tablet tablet instance to add
*/
public void addTablet(CqlIdentifier keyspace, CqlIdentifier table, Tablet tablet);

/**
* Returns {@link Tablet} instance
*
* @param keyspace tablet's keyspace
* @param table tablet's table
* @param token target token
* @return {@link Tablet} responsible for provided token or {@code null} if no such tablet is
* present.
*/
public Tablet getTablet(CqlIdentifier keyspace, CqlIdentifier table, long token);
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,17 @@ public interface Request {
@Nullable
CqlIdentifier getRoutingKeyspace();

/**
* The table to use for tablet-aware routing. Infers the table from available ColumnDefinitions or
* {@code null} if it is not possible.
*
* @return
*/
@Nullable
default CqlIdentifier getRoutingTable() {
return null;
}

/**
* The partition key to use for token-aware routing.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.datastax.oss.driver.internal.core.protocol.SegmentToFrameDecoder;
import com.datastax.oss.driver.internal.core.protocol.ShardingInfo;
import com.datastax.oss.driver.internal.core.protocol.ShardingInfo.ConnectionShardingInfo;
import com.datastax.oss.driver.internal.core.protocol.TabletInfo;
import com.datastax.oss.driver.internal.core.util.ProtocolUtils;
import com.datastax.oss.driver.internal.core.util.concurrent.UncaughtExceptions;
import com.datastax.oss.protocol.internal.Message;
Expand Down Expand Up @@ -94,6 +95,7 @@ class ProtocolInitHandler extends ConnectInitHandler {
private ChannelHandlerContext ctx;
private final boolean querySupportedOptions;
private LwtInfo lwtInfo;
private TabletInfo tabletInfo;

/**
* @param querySupportedOptions whether to send OPTIONS as the first message, to request which
Expand Down Expand Up @@ -191,6 +193,9 @@ Message getRequest() {
if (lwtInfo != null) {
lwtInfo.addOption(startupOptions);
}
if (tabletInfo != null && tabletInfo.isEnabled()) {
TabletInfo.addOption(startupOptions);
}
return request = new Startup(startupOptions);
case GET_CLUSTER_NAME:
return request = CLUSTER_NAME_QUERY;
Expand Down Expand Up @@ -230,6 +235,7 @@ void onResponse(Message response) {
if (lwtInfo != null) {
channel.attr(LWT_INFO_KEY).set(lwtInfo);
}
tabletInfo = TabletInfo.parseTabletInfo(res.options);
step = Step.STARTUP;
send();
} else if (step == Step.STARTUP && response instanceof Ready) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.Tablet;
import com.datastax.oss.driver.api.core.metadata.TabletMap;
import com.datastax.oss.driver.api.core.metadata.TokenMap;
import com.datastax.oss.driver.api.core.metadata.token.Partitioner;
import com.datastax.oss.driver.api.core.metadata.token.Token;
Expand All @@ -59,8 +61,10 @@
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.metadata.DefaultNode;
import com.datastax.oss.driver.internal.core.metadata.token.DefaultTokenMap;
import com.datastax.oss.driver.internal.core.metadata.token.TokenLong64;
import com.datastax.oss.driver.internal.core.metrics.NodeMetricUpdater;
import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater;
import com.datastax.oss.driver.internal.core.protocol.TabletInfo;
import com.datastax.oss.driver.internal.core.session.DefaultSession;
import com.datastax.oss.driver.internal.core.session.RepreparePayload;
import com.datastax.oss.driver.internal.core.tracker.NoopRequestTracker;
Expand Down Expand Up @@ -284,6 +288,51 @@ private Token getRoutingToken(Statement statement) {
return tokenMap == null ? null : ((DefaultTokenMap) tokenMap).getTokenFactory().hash(key);
}

public Integer getShardFromTabletMap(Statement statement, Node node, Token token) {
TabletMap tabletMap = context.getMetadataManager().getMetadata().getTabletMap();
if (!(token instanceof TokenLong64)) {
LOG.trace(
"Token ({}) is not a TokenLong64. Not performing tablet shard lookup for statement {}.",
token,
statement);
return null;
}
CqlIdentifier statementKeyspace = statement.getKeyspace();
if (statementKeyspace == null) {
statementKeyspace = statement.getRoutingKeyspace();
}
if (statementKeyspace == null) {
statementKeyspace = this.keyspace;
}
CqlIdentifier statementTable = statement.getRoutingTable();
if (statementKeyspace == null || statementTable == null) {
return null;
}
long tokenValue = ((TokenLong64) token).getValue();

Tablet targetTablet = tabletMap.getTablet(statementKeyspace, statementTable, tokenValue);
if (targetTablet == null) {
LOG.trace(
"Could not determine shard for token {} and table {}.{} on Node {}: Could not find corresponding tablet, returning null.",
token,
statementKeyspace,
statementTable,
node);
return null;
}
int shard = targetTablet.getShardForNode(node);
if (shard == -1) {
LOG.trace(
"Could not find shard corresponding to token {} and Node {} for table {} in keyspace {}. Returning null.",
token,
node,
statementTable,
statementKeyspace);
return null;
}
return shard;
}

/**
* Sends the request to the next available node.
*
Expand All @@ -309,9 +358,20 @@ private void sendRequest(
Node node = retriedNode;
DriverChannel channel = null;
if (node == null
|| (channel = session.getChannel(node, logPrefix, getRoutingToken(statement))) == null) {
|| (channel =
session.getChannel(
node,
logPrefix,
getRoutingToken(statement),
getShardFromTabletMap(statement, node, getRoutingToken(statement))))
== null) {
while (!result.isDone() && (node = queryPlan.poll()) != null) {
channel = session.getChannel(node, logPrefix, getRoutingToken(statement));
channel =
session.getChannel(
node,
logPrefix,
getRoutingToken(statement),
getShardFromTabletMap(statement, node, getRoutingToken(statement)));
if (channel != null) {
break;
} else {
Expand Down Expand Up @@ -420,6 +480,18 @@ private void setFinalResult(
totalLatencyNanos,
TimeUnit.NANOSECONDS);
}
if (resultSet.getColumnDefinitions().size() > 0
&& resultSet
.getExecutionInfo()
.getIncomingPayload()
.containsKey(TabletInfo.TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY)) {
context
.getMetadataManager()
.addTabletFromPayload(
resultSet.getColumnDefinitions().get(0).getKeyspace(),
resultSet.getColumnDefinitions().get(0).getTable(),
resultSet.getExecutionInfo().getIncomingPayload());
}
}
// log the warnings if they have NOT been disabled
if (!executionInfo.getWarnings().isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,17 @@ public CqlIdentifier getRoutingKeyspace() {
return null;
}

@Override
public CqlIdentifier getRoutingTable() {
for (BatchableStatement<?> statement : statements) {
CqlIdentifier ks = statement.getRoutingTable();
if (ks != null) {
return ks;
}
}
return null;
}

@NonNull
@Override
public BatchStatement setRoutingKeyspace(CqlIdentifier newRoutingKeyspace) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,12 @@ public CqlIdentifier getRoutingKeyspace() {
}
}

@Override
public CqlIdentifier getRoutingTable() {
ColumnDefinitions definitions = preparedStatement.getVariableDefinitions();
return (definitions.size() == 0) ? null : definitions.get(0).getTable();
}

@NonNull
@Override
public BoundStatement setRoutingKeyspace(@Nullable CqlIdentifier newRoutingKeyspace) {
Expand Down
Loading