Skip to content

Commit da118b2

Browse files
committed
Tablets support
Introduces basic tablets support for version 4.x of the driver. Metadata about tablets will be kept in TabletMap that gets continuously updated through the tablets-routing-v1 extension. Each time the BoundStatement targets the wrong node and shard combination the server supporting tablets should respond with tablet metadata inside custom payload of its response. This metadata will be transparently processed and used for future queries. Tablets metadata will by enabled by default. Until now driver was using TokenMaps to choose replicas and appropriate shards. Having a token was enough information to do that. Now driver will first attempt tablet-based lookup and only after failing to find corresponding tablet it will defer to TokenMap lookup. Since to find a correct tablet besides the token we need the keyspace and table names, many of the methods were extended to also accept those as parameters. RequestHandlerTestHarness was adjusted to mock also MetadataManager. Before it used to mock only `session.getMetadata()` call but the same can be obtained by `context.getMetadataManager().getMetadata()`. Using the second method was causing test failures.
1 parent 7acacbb commit da118b2

File tree

21 files changed

+1044
-14
lines changed

21 files changed

+1044
-14
lines changed

core/src/main/java/com/datastax/oss/driver/api/core/cql/PreparedStatement.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
*/
2222
package com.datastax.oss.driver.api.core.cql;
2323

24+
import com.datastax.oss.driver.api.core.CqlIdentifier;
2425
import com.datastax.oss.driver.api.core.CqlSession;
2526
import com.datastax.oss.driver.api.core.DefaultProtocolVersion;
2627
import com.datastax.oss.driver.api.core.metadata.token.Partitioner;
@@ -58,6 +59,13 @@ public interface PreparedStatement {
5859
@NonNull
5960
ColumnDefinitions getVariableDefinitions();
6061

62+
/**
63+
* Table name inferred from {@link PreparedStatement#getVariableDefinitions()} or {@code null} if
64+
* not possible.
65+
*/
66+
@Nullable
67+
CqlIdentifier inferTable();
68+
6169
/**
6270
* The partitioner to use for token-aware routing. If {@code null}, the cluster-wide partitioner
6371
* will be used.
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package com.datastax.oss.driver.api.core.metadata;
2+
3+
import java.util.Objects;
4+
5+
/** Simple keyspace name and table name pair. */
6+
public class KeyspaceTableNamePair {
7+
private final String keyspace;
8+
private final String tableName;
9+
10+
public KeyspaceTableNamePair(String keyspace, String tableName) {
11+
this.keyspace = keyspace;
12+
this.tableName = tableName;
13+
}
14+
15+
public String getKeyspace() {
16+
return keyspace;
17+
}
18+
19+
public String getTableName() {
20+
return tableName;
21+
}
22+
23+
@Override
24+
public String toString() {
25+
return "KeyspaceTableNamePair{"
26+
+ "keyspace='"
27+
+ keyspace
28+
+ '\''
29+
+ ", tableName='"
30+
+ tableName
31+
+ '\''
32+
+ '}';
33+
}
34+
35+
@Override
36+
public boolean equals(Object o) {
37+
if (this == o) return true;
38+
if (o == null || !(o instanceof KeyspaceTableNamePair)) return false;
39+
KeyspaceTableNamePair that = (KeyspaceTableNamePair) o;
40+
return keyspace.equals(that.keyspace) && tableName.equals(that.tableName);
41+
}
42+
43+
@Override
44+
public int hashCode() {
45+
return Objects.hash(keyspace, tableName);
46+
}
47+
}

core/src/main/java/com/datastax/oss/driver/api/core/metadata/Metadata.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,14 @@ default Optional<KeyspaceMetadata> getKeyspace(@NonNull String keyspaceName) {
115115
@NonNull
116116
Optional<TokenMap> getTokenMap();
117117

118+
/**
119+
* The tablet map for this cluster.
120+
*
121+
* <p>Starts as an empty map that will gradually receive updates on each query of a yet unknown
122+
* tablet.
123+
*/
124+
TabletMap getTabletMap();
125+
118126
/**
119127
* The cluster name to which this session is connected. The Optional returned should contain the
120128
* value from the server for <b>system.local.cluster_name</b>.
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.datastax.oss.driver.api.core.metadata;
2+
3+
/**
4+
* Simple class to hold Node representing a host and a shard number. Class itself makes no checks or
5+
* guarantees about existence of a shard on corresponding host.
6+
*/
7+
public class NodeShardPair {
8+
private final Node node;
9+
private final int shard;
10+
11+
public NodeShardPair(Node node, int shard) {
12+
this.node = node;
13+
this.shard = shard;
14+
}
15+
16+
public Node getNode() {
17+
return node;
18+
}
19+
20+
public int getShard() {
21+
return shard;
22+
}
23+
24+
@Override
25+
public String toString() {
26+
return "NodeShardPair{" + "host=" + node + ", shard=" + shard + '}';
27+
}
28+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package com.datastax.oss.driver.api.core.metadata;
2+
3+
import com.datastax.oss.driver.shaded.guava.common.annotations.Beta;
4+
import java.util.Set;
5+
import java.util.UUID;
6+
7+
/**
8+
* Represents a tablet as described in tablets-routing-v1 protocol extension with some additional
9+
* fields for ease of use.
10+
*/
11+
@Beta
12+
public interface Tablet extends Comparable<Tablet> {
13+
public String getKeyspaceName();
14+
15+
public UUID getTableId();
16+
17+
public String getTableName();
18+
19+
/**
20+
* Returns left endpoint of an interval. This interval is left-open, meaning the tablet does not
21+
* own the token equal to the first token.
22+
*
23+
* @return {@code long} value representing first token.
24+
*/
25+
public long getFirstToken();
26+
27+
/**
28+
* Returns right endpoint of an interval. This interval is right-closed, which means that last
29+
* token is owned by this tablet.
30+
*
31+
* @return {@code long} value representing last token.
32+
*/
33+
public long getLastToken();
34+
35+
public Set<Node> getReplicaNodes();
36+
37+
/**
38+
* Looks up the shard number for specific replica Node.
39+
*
40+
* @param node one of the replica nodes of this tablet.
41+
* @return Shard number for the replica or -1 if no such Node found.
42+
*/
43+
public int getShardForNode(Node node);
44+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package com.datastax.oss.driver.api.core.metadata;
2+
3+
import com.datastax.oss.driver.shaded.guava.common.annotations.Beta;
4+
import java.util.concurrent.ConcurrentMap;
5+
import java.util.concurrent.ConcurrentSkipListSet;
6+
7+
/** Holds all currently known tablet metadata. */
8+
@Beta
9+
public interface TabletMap {
10+
/**
11+
* Returns mapping from tables to the sets of their tablets.
12+
*
13+
* @return the Map keyed by (keyspace,table) pairs with Set of tablets as value type.
14+
*/
15+
public ConcurrentMap<KeyspaceTableNamePair, ConcurrentSkipListSet<Tablet>> getMapping();
16+
17+
/**
18+
* Adds a single tablet to the map. Handles removal of overlapping tablets.
19+
*
20+
* @param tablet
21+
*/
22+
public void addTablet(Tablet tablet);
23+
24+
/**
25+
* Returns {@link Tablet} instance
26+
*
27+
* @param keyspace tablet's keyspace
28+
* @param table tablet's table
29+
* @param token target token
30+
* @return {@link Tablet} responsible for provided token or {@code null} if no such tablet is
31+
* present.
32+
*/
33+
public Tablet getTablet(String keyspace, String table, long token);
34+
}

core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import com.datastax.oss.driver.internal.core.protocol.SegmentToFrameDecoder;
4444
import com.datastax.oss.driver.internal.core.protocol.ShardingInfo;
4545
import com.datastax.oss.driver.internal.core.protocol.ShardingInfo.ConnectionShardingInfo;
46+
import com.datastax.oss.driver.internal.core.protocol.TabletInfo;
4647
import com.datastax.oss.driver.internal.core.util.ProtocolUtils;
4748
import com.datastax.oss.driver.internal.core.util.concurrent.UncaughtExceptions;
4849
import com.datastax.oss.protocol.internal.Message;
@@ -94,6 +95,7 @@ class ProtocolInitHandler extends ConnectInitHandler {
9495
private ChannelHandlerContext ctx;
9596
private final boolean querySupportedOptions;
9697
private LwtInfo lwtInfo;
98+
private TabletInfo tabletInfo;
9799

98100
/**
99101
* @param querySupportedOptions whether to send OPTIONS as the first message, to request which
@@ -191,6 +193,9 @@ Message getRequest() {
191193
if (lwtInfo != null) {
192194
lwtInfo.addOption(startupOptions);
193195
}
196+
if (tabletInfo != null && tabletInfo.isEnabled()) {
197+
TabletInfo.addOption(startupOptions);
198+
}
194199
return request = new Startup(startupOptions);
195200
case GET_CLUSTER_NAME:
196201
return request = CLUSTER_NAME_QUERY;
@@ -230,6 +235,7 @@ void onResponse(Message response) {
230235
if (lwtInfo != null) {
231236
channel.attr(LWT_INFO_KEY).set(lwtInfo);
232237
}
238+
tabletInfo = TabletInfo.parseTabletInfo(res.options);
233239
step = Step.STARTUP;
234240
send();
235241
} else if (step == Step.STARTUP && response instanceof Ready) {

core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java

Lines changed: 98 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,13 @@
3131
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
3232
import com.datastax.oss.driver.api.core.connection.FrameTooLongException;
3333
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
34+
import com.datastax.oss.driver.api.core.cql.BoundStatement;
3435
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
36+
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
3537
import com.datastax.oss.driver.api.core.cql.Statement;
3638
import com.datastax.oss.driver.api.core.metadata.Node;
39+
import com.datastax.oss.driver.api.core.metadata.Tablet;
40+
import com.datastax.oss.driver.api.core.metadata.TabletMap;
3741
import com.datastax.oss.driver.api.core.metadata.TokenMap;
3842
import com.datastax.oss.driver.api.core.metadata.token.Partitioner;
3943
import com.datastax.oss.driver.api.core.metadata.token.Token;
@@ -59,8 +63,10 @@
5963
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
6064
import com.datastax.oss.driver.internal.core.metadata.DefaultNode;
6165
import com.datastax.oss.driver.internal.core.metadata.token.DefaultTokenMap;
66+
import com.datastax.oss.driver.internal.core.metadata.token.TokenLong64;
6267
import com.datastax.oss.driver.internal.core.metrics.NodeMetricUpdater;
6368
import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater;
69+
import com.datastax.oss.driver.internal.core.protocol.TabletInfo;
6470
import com.datastax.oss.driver.internal.core.session.DefaultSession;
6571
import com.datastax.oss.driver.internal.core.session.RepreparePayload;
6672
import com.datastax.oss.driver.internal.core.tracker.NoopRequestTracker;
@@ -284,6 +290,73 @@ private Token getRoutingToken(Statement statement) {
284290
return tokenMap == null ? null : ((DefaultTokenMap) tokenMap).getTokenFactory().hash(key);
285291
}
286292

293+
private CqlIdentifier getTabletRoutingKeyspace(Statement statement) {
294+
// Bundles keyspace checks into one function call
295+
if (statement == null) {
296+
return null;
297+
}
298+
CqlIdentifier result = statement.getKeyspace();
299+
if (result == null) {
300+
result = statement.getRoutingKeyspace();
301+
}
302+
if (result == null) {
303+
result = session.getKeyspace().orElse(null);
304+
}
305+
return result;
306+
}
307+
308+
private CqlIdentifier getTabletRoutingTable(Statement statement) {
309+
// Tries to infer target table from column definitions of this statement
310+
if (statement == null) {
311+
return null;
312+
}
313+
if (statement instanceof BoundStatement) {
314+
return ((BoundStatement) statement).getPreparedStatement().inferTable();
315+
} else if (statement instanceof PreparedStatement) {
316+
return ((PreparedStatement) statement).inferTable();
317+
} else {
318+
return null;
319+
}
320+
}
321+
322+
public Integer getShardFromTabletMap(Statement statement, Node node, Token token) {
323+
TabletMap tabletMap = context.getMetadataManager().getMetadata().getTabletMap();
324+
if (!(token instanceof TokenLong64)) {
325+
LOG.trace(
326+
"Token ({}) is not a TokenLong64. Not performing tablet shard lookup for statement {}.",
327+
token,
328+
statement);
329+
return null;
330+
}
331+
if (getTabletRoutingKeyspace(statement) == null || getTabletRoutingTable(statement) == null) {
332+
return null;
333+
}
334+
long tokenValue = ((TokenLong64) token).getValue();
335+
String statementKeyspace = getTabletRoutingKeyspace(statement).asInternal();
336+
String statementTable = getTabletRoutingTable(statement).asInternal();
337+
Tablet targetTablet = tabletMap.getTablet(statementKeyspace, statementTable, tokenValue);
338+
if (targetTablet == null) {
339+
LOG.trace(
340+
"Could not determine shard for token {} and table {}.{} on Node {}: Could not find corresponding tablet, returning null.",
341+
token,
342+
statementKeyspace,
343+
statementTable,
344+
node);
345+
return null;
346+
}
347+
int shard = targetTablet.getShardForNode(node);
348+
if (shard == -1) {
349+
LOG.trace(
350+
"Could not find shard corresponding to token {} and Node {} for table {} in keyspace {}. Returning null.",
351+
token,
352+
node,
353+
statementTable,
354+
statementKeyspace);
355+
return null;
356+
}
357+
return shard;
358+
}
359+
287360
/**
288361
* Sends the request to the next available node.
289362
*
@@ -309,9 +382,20 @@ private void sendRequest(
309382
Node node = retriedNode;
310383
DriverChannel channel = null;
311384
if (node == null
312-
|| (channel = session.getChannel(node, logPrefix, getRoutingToken(statement))) == null) {
385+
|| (channel =
386+
session.getChannel(
387+
node,
388+
logPrefix,
389+
getRoutingToken(statement),
390+
getShardFromTabletMap(statement, node, getRoutingToken(statement))))
391+
== null) {
313392
while (!result.isDone() && (node = queryPlan.poll()) != null) {
314-
channel = session.getChannel(node, logPrefix, getRoutingToken(statement));
393+
channel =
394+
session.getChannel(
395+
node,
396+
logPrefix,
397+
getRoutingToken(statement),
398+
getShardFromTabletMap(statement, node, getRoutingToken(statement)));
315399
if (channel != null) {
316400
break;
317401
} else {
@@ -420,6 +504,18 @@ private void setFinalResult(
420504
totalLatencyNanos,
421505
TimeUnit.NANOSECONDS);
422506
}
507+
if (resultSet.getColumnDefinitions().size() > 0
508+
&& resultSet
509+
.getExecutionInfo()
510+
.getIncomingPayload()
511+
.containsKey(TabletInfo.TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY)) {
512+
context
513+
.getMetadataManager()
514+
.addTabletFromPayload(
515+
resultSet.getColumnDefinitions().get(0).getKeyspace(),
516+
resultSet.getColumnDefinitions().get(0).getTable(),
517+
resultSet.getExecutionInfo().getIncomingPayload());
518+
}
423519
}
424520
// log the warnings if they have NOT been disabled
425521
if (!executionInfo.getWarnings().isEmpty()

core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultPreparedStatement.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import com.datastax.oss.driver.internal.core.data.ValuesHelper;
3737
import com.datastax.oss.driver.internal.core.session.RepreparePayload;
3838
import edu.umd.cs.findbugs.annotations.NonNull;
39+
import edu.umd.cs.findbugs.annotations.Nullable;
3940
import java.nio.ByteBuffer;
4041
import java.time.Duration;
4142
import java.util.List;
@@ -140,6 +141,15 @@ public ColumnDefinitions getVariableDefinitions() {
140141
return variableDefinitions;
141142
}
142143

144+
@Nullable
145+
@Override
146+
public CqlIdentifier inferTable() {
147+
if (variableDefinitions.size() > 0) {
148+
return variableDefinitions.get(0).getTable();
149+
}
150+
return null;
151+
}
152+
143153
@Override
144154
public Partitioner getPartitioner() {
145155
return partitioner;

0 commit comments

Comments
 (0)