Skip to content

Commit ad0b94a

Browse files
nikagradkropachev
authored andcommitted
[#596] Implementation of SCYLLA_USE_METADATA_ID protocol feature negotiation. Introducing ProtocolFeatureStore.
[#596] Initial implementation of SCYLLA_USE_METADATA_ID feature. [#596] Fixing NPE when accessing sharding info from empty ProtocolFeatureStore. [#596] Storing `ProtocolFeatureStore` in `ProtocolEncoder`/`ProtocolDecoder`. Getting read of sharing it via `Channel` attributes. [#596] Addressing missing `ProtocolFeatureStore` in a `statement` param of `executeAsync` to fix ITs. [#596] Adjusting the way feature store is handled in `DefaultResultSetFuture`. Removing feature store from its constructor. [#596] Moving channel attribute management for feature store to utility methods of `ProtocolFeatureStore`. [#596] Setting `Host`'s feature store separately from `Channel`. Code clean up Apply suggestion from @dkropachev Co-authored-by: Dmitry Kropachev <[email protected]> [#596] `ProtocolEncoder` clean up.
1 parent 819827e commit ad0b94a

17 files changed

+474
-208
lines changed

driver-core/src/main/java/com/datastax/driver/core/ArrayBackedResultSet.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ static ArrayBackedResultSet fromMessage(
6868
SessionManager session,
6969
ProtocolVersion protocolVersion,
7070
ExecutionInfo info,
71-
Statement statement) {
71+
Statement statement,
72+
ProtocolFeatureStore featureStore) {
7273

7374
switch (msg.kind) {
7475
case ROWS:
@@ -94,7 +95,8 @@ static ArrayBackedResultSet fromMessage(
9495
// CASSANDRA-10786).
9596
MD5Digest newMetadataId = r.metadata.metadataId;
9697
assert !(actualStatement instanceof BoundStatement)
97-
|| ProtocolFeature.PREPARED_METADATA_CHANGES.isSupportedBy(protocolVersion)
98+
|| ProtocolFeatures.PREPARED_METADATA_CHANGES.isSupportedBy(
99+
protocolVersion, featureStore)
98100
|| newMetadataId == null;
99101
if (newMetadataId != null) {
100102
BoundStatement bs = ((BoundStatement) actualStatement);

driver-core/src/main/java/com/datastax/driver/core/BatchStatement.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,10 +228,10 @@ public int requestSizeInBytes(ProtocolVersion protocolVersion, CodecRegistry cod
228228
// overestimate by a
229229
// few bytes.
230230
size += CBUtil.sizeOfConsistencyLevel(getSerialConsistencyLevel());
231-
if (ProtocolFeature.CLIENT_TIMESTAMPS.isSupportedBy(protocolVersion)) {
231+
if (ProtocolFeatures.CLIENT_TIMESTAMPS.isSupportedBy(protocolVersion)) {
232232
size += 8; // timestamp
233233
}
234-
if (ProtocolFeature.CUSTOM_PAYLOADS.isSupportedBy(protocolVersion)
234+
if (ProtocolFeatures.CUSTOM_PAYLOADS.isSupportedBy(protocolVersion)
235235
&& getOutgoingPayload() != null) {
236236
size += CBUtil.sizeOfBytesMap(getOutgoingPayload());
237237
}

driver-core/src/main/java/com/datastax/driver/core/BoundStatement.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,8 @@ public int requestSizeInBytes(ProtocolVersion protocolVersion, CodecRegistry cod
325325
try {
326326
size +=
327327
CBUtil.sizeOfShortBytes(preparedStatement().getPreparedId().boundValuesMetadata.id.bytes);
328-
if (ProtocolFeature.PREPARED_METADATA_CHANGES.isSupportedBy(protocolVersion)) {
328+
ProtocolFeatureStore featureStore = getHost().getProtocolFeatureStore();
329+
if (ProtocolFeatures.PREPARED_METADATA_CHANGES.isSupportedBy(protocolVersion, featureStore)) {
329330
size +=
330331
CBUtil.sizeOfShortBytes(preparedStatement().getPreparedId().resultSetMetadata.id.bytes);
331332
}
@@ -353,10 +354,10 @@ public int requestSizeInBytes(ProtocolVersion protocolVersion, CodecRegistry cod
353354
size += CBUtil.sizeOfValue(getPagingState());
354355
}
355356
size += CBUtil.sizeOfConsistencyLevel(getSerialConsistencyLevel());
356-
if (ProtocolFeature.CLIENT_TIMESTAMPS.isSupportedBy(protocolVersion)) {
357+
if (ProtocolFeatures.CLIENT_TIMESTAMPS.isSupportedBy(protocolVersion)) {
357358
size += 8; // timestamp
358359
}
359-
if (ProtocolFeature.CUSTOM_PAYLOADS.isSupportedBy(protocolVersion)
360+
if (ProtocolFeatures.CUSTOM_PAYLOADS.isSupportedBy(protocolVersion)
360361
&& getOutgoingPayload() != null) {
361362
size += CBUtil.sizeOfBytesMap(getOutgoingPayload());
362363
}

driver-core/src/main/java/com/datastax/driver/core/Connection.java

Lines changed: 27 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ enum State {
155155

156156
private final AtomicReference<Owner> ownerRef = new AtomicReference<Owner>();
157157
private final ApplicationInfo applicationInfo;
158+
private ProtocolFeatureStore protocolFeatureStore;
158159

159160
/**
160161
* Create a new connection to a Cassandra node and associate it with the given pool.
@@ -449,35 +450,31 @@ private AsyncFunction<Message.Response, Void> onOptionsResponse(
449450
final ProtocolVersion protocolVersion, final Executor initExecutor) {
450451
return new AsyncFunction<Message.Response, Void>() {
451452
@Override
452-
public ListenableFuture<Void> apply(Message.Response response) throws Exception {
453+
public ListenableFuture<Void> apply(Message.Response response) {
453454
switch (response.type) {
454455
case SUPPORTED:
455-
Responses.Supported msg = (Supported) response;
456-
ShardingInfo.ConnectionShardingInfo sharding =
457-
ShardingInfo.parseShardingInfo(msg.supported);
458-
if (sharding != null) {
459-
getHost().setShardingInfo(sharding.shardingInfo);
460-
Connection.this.shardId = sharding.shardId;
456+
Supported supported = (Supported) response;
457+
protocolFeatureStore = ProtocolFeatureStore.parseSupportedOptions(supported.supported);
458+
protocolFeatureStore.storeInChannel(channel);
459+
getHost().setProtocolFeatureStore(protocolFeatureStore);
460+
461+
ShardingInfo.ConnectionShardingInfo shardingInfo =
462+
protocolFeatureStore.getConnectionShardingInfo();
463+
if (protocolFeatureStore.getConnectionShardingInfo() != null) {
464+
Connection.this.shardId = shardingInfo.shardId;
461465
if (Connection.this.requestedShardId != -1
462-
&& Connection.this.requestedShardId != sharding.shardId) {
466+
&& Connection.this.requestedShardId != shardingInfo.shardId) {
463467
logger.warn(
464468
"Advanced shard awareness: requested connection to shard {}, but connected to {}. Is there a NAT between client and server?",
465469
Connection.this.requestedShardId,
466-
sharding.shardId);
470+
shardingInfo.shardId);
467471
// Owner is a HostConnectionPool if we are using adv. shard awareness
468472
((HostConnectionPool) Connection.this.ownerRef.get())
469473
.tempBlockAdvShardAwareness(ADV_SHARD_AWARENESS_BLOCK_ON_NAT);
470474
}
471475
} else {
472-
getHost().setShardingInfo(null);
473476
Connection.this.shardId = 0;
474477
}
475-
LwtInfo lwt = LwtInfo.parseLwtInfo(msg.supported);
476-
if (lwt != null) {
477-
getHost().setLwtInfo(lwt);
478-
}
479-
TabletInfo tabletInfo = TabletInfo.parseTabletInfo(msg.supported);
480-
getHost().setTabletInfo(tabletInfo);
481478
return MoreFutures.VOID_SUCCESS;
482479
case ERROR:
483480
Responses.Error error = (Responses.Error) response;
@@ -506,20 +503,13 @@ private AsyncFunction<Void, Void> onOptionsReady(
506503
@Override
507504
public ListenableFuture<Void> apply(Void input) throws Exception {
508505
ProtocolOptions protocolOptions = factory.configuration.getProtocolOptions();
509-
Map<String, String> extraOptions = new HashMap<String, String>();
506+
Map<String, String> extraOptions = new HashMap<>();
510507
if (applicationInfo != null) {
511508
applicationInfo.addOption(extraOptions);
512509
}
513-
LwtInfo lwtInfo = getHost().getLwtInfo();
514-
if (lwtInfo != null) {
515-
lwtInfo.addOption(extraOptions);
516-
}
517-
TabletInfo tabletInfo = getHost().getTabletInfo();
518-
if (tabletInfo != null
519-
&& tabletInfo.isEnabled()
520-
&& ProtocolFeature.CUSTOM_PAYLOADS.isSupportedBy(protocolVersion)) {
521-
logger.debug("Enabling tablet support in OPTIONS message");
522-
TabletInfo.addOption(extraOptions);
510+
511+
if (protocolFeatureStore != null) {
512+
protocolFeatureStore.populateStartupOptions(protocolVersion, extraOptions);
523513
}
524514

525515
Future startupResponseFuture =
@@ -1065,6 +1055,10 @@ public int shardId() {
10651055
return shardId == null ? 0 : shardId;
10661056
}
10671057

1058+
public ProtocolFeatureStore getProtocolFeatureStore() {
1059+
return protocolFeatureStore;
1060+
}
1061+
10681062
/**
10691063
* If the connection is part of a pool, return it to the pool. The connection should generally not
10701064
* be reused after that.
@@ -1955,21 +1949,6 @@ interface DefaultResponseHandler {
19551949
}
19561950

19571951
private static class Initializer extends ChannelInitializer<SocketChannel> {
1958-
// Stateless handlers
1959-
private static final Message.ProtocolDecoder messageDecoder = new Message.ProtocolDecoder();
1960-
private static final Message.ProtocolEncoder messageEncoderV1 =
1961-
new Message.ProtocolEncoder(ProtocolVersion.V1);
1962-
private static final Message.ProtocolEncoder messageEncoderV2 =
1963-
new Message.ProtocolEncoder(ProtocolVersion.V2);
1964-
private static final Message.ProtocolEncoder messageEncoderV3 =
1965-
new Message.ProtocolEncoder(ProtocolVersion.V3);
1966-
private static final Message.ProtocolEncoder messageEncoderV4 =
1967-
new Message.ProtocolEncoder(ProtocolVersion.V4);
1968-
private static final Message.ProtocolEncoder messageEncoderV5 =
1969-
new Message.ProtocolEncoder(ProtocolVersion.V5);
1970-
private static final Message.ProtocolEncoder messageEncoderV6 =
1971-
new Message.ProtocolEncoder(ProtocolVersion.V6);
1972-
private static final Frame.Encoder frameEncoder = new Frame.Encoder();
19731952

19741953
private final ProtocolVersion protocolVersion;
19751954
private final Connection connection;
@@ -2033,7 +2012,7 @@ protected void initChannel(SocketChannel channel) throws Exception {
20332012
}
20342013

20352014
pipeline.addLast("frameDecoder", new Frame.Decoder());
2036-
pipeline.addLast("frameEncoder", frameEncoder);
2015+
pipeline.addLast("frameEncoder", new Frame.Encoder());
20372016

20382017
pipeline.addLast("framingFormatHandler", new FramingFormatHandler(connection.factory));
20392018

@@ -2046,7 +2025,7 @@ protected void initChannel(SocketChannel channel) throws Exception {
20462025
pipeline.addLast("frameCompressor", new Frame.Compressor(compressor));
20472026
}
20482027

2049-
pipeline.addLast("messageDecoder", messageDecoder);
2028+
pipeline.addLast("messageDecoder", new Message.ProtocolDecoder(null));
20502029
pipeline.addLast("messageEncoder", messageEncoderFor(protocolVersion));
20512030

20522031
pipeline.addLast("idleStateHandler", idleStateHandler);
@@ -2056,23 +2035,11 @@ protected void initChannel(SocketChannel channel) throws Exception {
20562035
nettyOptions.afterChannelInitialized(channel);
20572036
}
20582037

2059-
private Message.ProtocolEncoder messageEncoderFor(ProtocolVersion version) {
2060-
switch (version) {
2061-
case V1:
2062-
return messageEncoderV1;
2063-
case V2:
2064-
return messageEncoderV2;
2065-
case V3:
2066-
return messageEncoderV3;
2067-
case V4:
2068-
return messageEncoderV4;
2069-
case V5:
2070-
return messageEncoderV5;
2071-
case V6:
2072-
return messageEncoderV6;
2073-
default:
2074-
throw new DriverInternalError("Unsupported protocol version " + protocolVersion);
2038+
private static Message.ProtocolEncoder messageEncoderFor(ProtocolVersion version) {
2039+
if (version.toInt() > ProtocolVersion.V6.toInt()) {
2040+
throw new DriverInternalError("Unsupported protocol version " + version);
20752041
}
2042+
return new Message.ProtocolEncoder(version, null);
20762043
}
20772044
}
20782045

driver-core/src/main/java/com/datastax/driver/core/DefaultResultSetFuture.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,15 +87,29 @@ public void onSet(
8787
table,
8888
rm.getCustomPayload().get(TabletInfo.TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY));
8989
}
90+
9091
switch (rm.kind) {
9192
case SET_KEYSPACE:
9293
// propagate the keyspace change to other connections
9394
session.poolsState.setKeyspace(((Responses.Result.SetKeyspace) rm).keyspace);
94-
set(ArrayBackedResultSet.fromMessage(rm, session, protocolVersion, info, statement));
95+
set(
96+
ArrayBackedResultSet.fromMessage(
97+
rm,
98+
session,
99+
protocolVersion,
100+
info,
101+
statement,
102+
connection.getProtocolFeatureStore()));
95103
break;
96104
case SCHEMA_CHANGE:
97105
ResultSet rs =
98-
ArrayBackedResultSet.fromMessage(rm, session, protocolVersion, info, statement);
106+
ArrayBackedResultSet.fromMessage(
107+
rm,
108+
session,
109+
protocolVersion,
110+
info,
111+
statement,
112+
connection.getProtocolFeatureStore());
99113
final Cluster.Manager cluster = session.cluster.manager;
100114
if (!cluster.configuration.getQueryOptions().isMetadataEnabled()) {
101115
cluster.waitForSchemaAgreementAndSignal(connection, this, rs);
@@ -224,7 +238,14 @@ public void run() {
224238
}
225239
break;
226240
default:
227-
set(ArrayBackedResultSet.fromMessage(rm, session, protocolVersion, info, statement));
241+
set(
242+
ArrayBackedResultSet.fromMessage(
243+
rm,
244+
session,
245+
protocolVersion,
246+
info,
247+
statement,
248+
connection.getProtocolFeatureStore()));
228249
break;
229250
}
230251
break;

driver-core/src/main/java/com/datastax/driver/core/FramingFormatHandler.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.datastax.driver.core;
1717

1818
import com.datastax.driver.core.Message.Response.Type;
19+
import com.datastax.driver.core.exceptions.DriverInternalError;
1920
import io.netty.channel.ChannelHandlerContext;
2021
import io.netty.channel.ChannelPipeline;
2122
import io.netty.handler.codec.MessageToMessageDecoder;
@@ -43,8 +44,11 @@ protected void decode(ChannelHandlerContext ctx, Frame frame, List<Object> out)
4344
// By default, the pipeline is configured for legacy framing since this is the format used
4445
// by all protocol versions until handshake; after handshake however, we need to switch to
4546
// modern framing for protocol v5 and higher.
47+
ProtocolFeatureStore featureStore = ProtocolFeatureStore.loadFromChannel(ctx.channel());
4648
if (frame.header.version.compareTo(ProtocolVersion.V5) >= 0) {
4749
switchToModernFraming(ctx);
50+
} else if (featureStore != null && featureStore.isUseMetadataId()) {
51+
switchToCQL4MetadataId(ctx, frame.header.version, featureStore);
4852
}
4953
// once the handshake is successful, the framing format cannot change anymore;
5054
// we can safely remove ourselves from the pipeline.
@@ -53,6 +57,16 @@ protected void decode(ChannelHandlerContext ctx, Frame frame, List<Object> out)
5357
out.add(frame);
5458
}
5559

60+
private void switchToCQL4MetadataId(
61+
ChannelHandlerContext ctx,
62+
ProtocolVersion protocolVersion,
63+
ProtocolFeatureStore featureStore) {
64+
ChannelPipeline pipeline = ctx.pipeline();
65+
pipeline.replace("messageDecoder", "messageDecoder", new Message.ProtocolDecoder(featureStore));
66+
pipeline.replace(
67+
"messageEncoder", "messageEncoder", messageEncoderFor(protocolVersion, featureStore));
68+
}
69+
5670
private void switchToModernFraming(ChannelHandlerContext ctx) {
5771
ChannelPipeline pipeline = ctx.pipeline();
5872
SegmentCodec segmentCodec =
@@ -75,4 +89,12 @@ private void switchToModernFraming(ChannelHandlerContext ctx) {
7589
pipeline.addAfter(
7690
"bytesToSegmentDecoder", "segmentToFrameDecoder", new SegmentToFrameDecoder());
7791
}
92+
93+
private static Message.ProtocolEncoder messageEncoderFor(
94+
ProtocolVersion version, ProtocolFeatureStore protocolFeatureStore) {
95+
if (version.toInt() > ProtocolVersion.V6.toInt()) {
96+
throw new DriverInternalError("Unsupported protocol version " + version);
97+
}
98+
return new Message.ProtocolEncoder(version, protocolFeatureStore);
99+
}
78100
}

0 commit comments

Comments
 (0)