Skip to content

Commit e20f79c

Browse files
authored
JAVA-2922: Switch to modern framing format inside a channel handler (#1532)
1 parent 9ffe103 commit e20f79c

File tree

3 files changed

+81
-36
lines changed

3 files changed

+81
-36
lines changed

changelog/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
- [improvement] JAVA-2705: Remove protocol v5 beta status, add v6-beta.
1111
- [bug] JAVA-2923: Detect and use Guava's new HostAndPort.getHost method.
12+
- [bug] JAVA-2922: Switch to modern framing format inside a channel handler.
1213

1314
## 3.10.2
1415

driver-core/src/main/java/com/datastax/driver/core/Connection.java

Lines changed: 2 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -351,11 +351,6 @@ private AsyncFunction<Message.Response, Void> onStartupResponse(
351351
return new AsyncFunction<Message.Response, Void>() {
352352
@Override
353353
public ListenableFuture<Void> apply(Message.Response response) throws Exception {
354-
355-
if (protocolVersion.compareTo(ProtocolVersion.V5) >= 0 && response.type != ERROR) {
356-
switchToV5Framing();
357-
}
358-
359354
switch (response.type) {
360355
case READY:
361356
return checkClusterName(protocolVersion, initExecutor);
@@ -1728,6 +1723,8 @@ protected void initChannel(SocketChannel channel) throws Exception {
17281723
pipeline.addLast("frameDecoder", new Frame.Decoder());
17291724
pipeline.addLast("frameEncoder", frameEncoder);
17301725

1726+
pipeline.addLast("framingFormatHandler", new FramingFormatHandler(connection.factory));
1727+
17311728
if (compressor != null
17321729
// Frame-level compression is only done in legacy protocol versions. In V5 and above, it
17331730
// happens at a higher level ("segment" that groups multiple frames), so never install
@@ -1767,37 +1764,6 @@ private Message.ProtocolEncoder messageEncoderFor(ProtocolVersion version) {
17671764
}
17681765
}
17691766

1770-
/**
1771-
* Rearranges the pipeline to deal with the new framing structure in protocol v5 and above. This
1772-
* has to be done manually, because it only happens once we've confirmed that the server supports
1773-
* v5.
1774-
*/
1775-
void switchToV5Framing() {
1776-
// We want to do this on the event loop, to make sure it doesn't race with incoming requests
1777-
assert channel.eventLoop().inEventLoop();
1778-
1779-
ChannelPipeline pipeline = channel.pipeline();
1780-
SegmentCodec segmentCodec =
1781-
new SegmentCodec(
1782-
channel.alloc(), factory.configuration.getProtocolOptions().getCompression());
1783-
1784-
// Outbound: "message -> segment -> bytes" instead of "message -> frame -> bytes"
1785-
Message.ProtocolEncoder requestEncoder =
1786-
(Message.ProtocolEncoder) pipeline.get("messageEncoder");
1787-
pipeline.replace(
1788-
"messageEncoder",
1789-
"messageToSegmentEncoder",
1790-
new MessageToSegmentEncoder(channel.alloc(), requestEncoder));
1791-
pipeline.replace(
1792-
"frameEncoder", "segmentToBytesEncoder", new SegmentToBytesEncoder(segmentCodec));
1793-
1794-
// Inbound: "frame <- segment <- bytes" instead of "frame <- bytes"
1795-
pipeline.replace(
1796-
"frameDecoder", "bytesToSegmentDecoder", new BytesToSegmentDecoder(segmentCodec));
1797-
pipeline.addAfter(
1798-
"bytesToSegmentDecoder", "segmentToFrameDecoder", new SegmentToFrameDecoder());
1799-
}
1800-
18011767
/** A component that "owns" a connection, and should be notified when it dies. */
18021768
interface Owner {
18031769
void onConnectionDefunct(Connection connection);
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright DataStax, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.driver.core;
17+
18+
import com.datastax.driver.core.Message.Response.Type;
19+
import io.netty.channel.ChannelHandlerContext;
20+
import io.netty.channel.ChannelPipeline;
21+
import io.netty.handler.codec.MessageToMessageDecoder;
22+
import java.util.List;
23+
24+
/**
25+
* A handler to deal with different protocol framing formats.
26+
*
27+
* <p>This handler detects when a handshake is successful; then, if necessary, adapts the pipeline
28+
* to the modern framing format introduced in protocol v5.
29+
*/
30+
public class FramingFormatHandler extends MessageToMessageDecoder<Frame> {
31+
32+
private final Connection.Factory factory;
33+
34+
FramingFormatHandler(Connection.Factory factory) {
35+
this.factory = factory;
36+
}
37+
38+
@Override
39+
protected void decode(ChannelHandlerContext ctx, Frame frame, List<Object> out) throws Exception {
40+
boolean handshakeSuccessful =
41+
frame.header.opcode == Type.READY.opcode || frame.header.opcode == Type.AUTHENTICATE.opcode;
42+
if (handshakeSuccessful) {
43+
// By default, the pipeline is configured for legacy framing since this is the format used
44+
// by all protocol versions until handshake; after handshake however, we need to switch to
45+
// modern framing for protocol v5 and higher.
46+
if (frame.header.version.compareTo(ProtocolVersion.V5) >= 0) {
47+
switchToModernFraming(ctx);
48+
}
49+
// once the handshake is successful, the framing format cannot change anymore;
50+
// we can safely remove ourselves from the pipeline.
51+
ctx.pipeline().remove("framingFormatHandler");
52+
}
53+
out.add(frame);
54+
}
55+
56+
private void switchToModernFraming(ChannelHandlerContext ctx) {
57+
ChannelPipeline pipeline = ctx.pipeline();
58+
SegmentCodec segmentCodec =
59+
new SegmentCodec(
60+
ctx.channel().alloc(), factory.configuration.getProtocolOptions().getCompression());
61+
62+
// Outbound: "message -> segment -> bytes" instead of "message -> frame -> bytes"
63+
Message.ProtocolEncoder requestEncoder =
64+
(Message.ProtocolEncoder) pipeline.get("messageEncoder");
65+
pipeline.replace(
66+
"messageEncoder",
67+
"messageToSegmentEncoder",
68+
new MessageToSegmentEncoder(ctx.channel().alloc(), requestEncoder));
69+
pipeline.replace(
70+
"frameEncoder", "segmentToBytesEncoder", new SegmentToBytesEncoder(segmentCodec));
71+
72+
// Inbound: "frame <- segment <- bytes" instead of "frame <- bytes"
73+
pipeline.replace(
74+
"frameDecoder", "bytesToSegmentDecoder", new BytesToSegmentDecoder(segmentCodec));
75+
pipeline.addAfter(
76+
"bytesToSegmentDecoder", "segmentToFrameDecoder", new SegmentToFrameDecoder());
77+
}
78+
}

0 commit comments

Comments
 (0)