Skip to content

Commit 099cb3d

Browse files
committed
Add frame builder that supports TLS in NIO mode
References #319
1 parent 72e792a commit 099cb3d

File tree

9 files changed

+139
-201
lines changed

9 files changed

+139
-201
lines changed

src/main/java/com/rabbitmq/client/impl/nio/ByteBufferInputStream.java

Lines changed: 0 additions & 57 deletions
This file was deleted.

src/main/java/com/rabbitmq/client/impl/nio/FrameBuilder.java

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,31 +24,50 @@
2424
import java.nio.channels.ReadableByteChannel;
2525

2626
/**
27-
*
27+
* Class to create AMQP frames from a {@link ReadableByteChannel}.
28+
* Supports partial frames: a frame can be read in several attempts
29+
* from the {@link NioLoop}. This can happen when the channel won't
30+
* read any more bytes in the middle of a frame building. The state
31+
* of the outstanding frame is saved up, and the builder will
32+
* start where it left off when the {@link NioLoop} comes back to
33+
* this connection.
34+
* This class is not thread safe.
35+
* @since 4.4.0
2836
*/
2937
public class FrameBuilder {
3038

3139
private static final int PAYLOAD_OFFSET = 1 /* type */ + 2 /* channel */ + 4 /* payload size */;
3240

33-
private final ReadableByteChannel channel;
41+
protected final ReadableByteChannel channel;
3442

35-
private final ByteBuffer buffer;
43+
protected final ByteBuffer applicationBuffer;
3644

3745
private int frameType;
3846
private int frameChannel;
3947
private byte [] framePayload;
4048

4149
private int bytesRead = 0;
4250

51+
// to store the bytes of the outstanding data
52+
// 3 byte-long because the longest we read is an unsigned int
53+
// (not need to store the latest byte)
4354
private final int [] frameBuffer = new int[3];
4455

4556
public FrameBuilder(ReadableByteChannel channel, ByteBuffer buffer) {
4657
this.channel = channel;
47-
this.buffer = buffer;
58+
this.applicationBuffer = buffer;
4859
}
4960

61+
/**
62+
* Read a frame from the network.
63+
* This method returns null f a frame could not have been fully built from
64+
* the network. The client must then retry later (typically
65+
* when the channel notifies it has something to read).
66+
* @return a complete frame or null if a frame couldn't have been fully built
67+
* @throws IOException
68+
*/
5069
public Frame readFrame() throws IOException {
51-
while(readFromNetworkIfNecessary()) {
70+
while(somethingToRead()) {
5271
if (bytesRead == 0) {
5372
// type
5473
// FIXME check first byte isn't 'A' and thus a header indicating protocol version mismatch
@@ -90,18 +109,24 @@ public Frame readFrame() throws IOException {
90109
}
91110

92111
private int read() throws IOException {
93-
return NioHelper.read(channel, buffer);
112+
return NioHelper.read(channel, applicationBuffer);
94113
}
95114

96115
private int readFromBuffer() {
97-
return buffer.get() & 0xff;
116+
return applicationBuffer.get() & 0xff;
98117
}
99118

100-
private boolean readFromNetworkIfNecessary() throws IOException {
101-
if(!buffer.hasRemaining()) {
102-
buffer.clear();
119+
/**
120+
* Tells whether there's something to read in the application buffer or not.
121+
* Tries to read from the network if necessary.
122+
* @return true if there's something to read in the application buffer
123+
* @throws IOException
124+
*/
125+
protected boolean somethingToRead() throws IOException {
126+
if(!applicationBuffer.hasRemaining()) {
127+
applicationBuffer.clear();
103128
int read = read();
104-
buffer.flip();
129+
applicationBuffer.flip();
105130
if (read > 0) {
106131
return true;
107132
} else {

src/main/java/com/rabbitmq/client/impl/nio/NioHelper.java

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -29,29 +29,4 @@ static int read(ReadableByteChannel channel, ByteBuffer buffer) throws IOExcepti
2929
return read;
3030
}
3131

32-
static int retryRead(ReadableByteChannel channel, ByteBuffer buffer) throws IOException {
33-
int attempt = 0;
34-
int read = 0;
35-
while(attempt < 3) {
36-
try {
37-
Thread.sleep(100L);
38-
} catch (InterruptedException e) {
39-
// ignore
40-
}
41-
read = read(channel, buffer);
42-
if(read > 0) {
43-
break;
44-
}
45-
attempt++;
46-
}
47-
return read;
48-
}
49-
50-
static int readWithRetry(ReadableByteChannel channel, ByteBuffer buffer) throws IOException {
51-
int bytesRead = NioHelper.read(channel, buffer);
52-
if (bytesRead <= 0) {
53-
bytesRead = NioHelper.retryRead(channel, buffer);
54-
}
55-
return bytesRead;
56-
}
5732
}

src/main/java/com/rabbitmq/client/impl/nio/NioLoop.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.slf4j.Logger;
2121
import org.slf4j.LoggerFactory;
2222

23-
import java.io.DataInputStream;
2423
import java.io.DataOutputStream;
2524
import java.io.IOException;
2625
import java.nio.ByteBuffer;
@@ -144,17 +143,10 @@ public void run() {
144143
continue;
145144
}
146145

147-
DataInputStream inputStream = state.inputStream;
148-
149146
state.prepareForReadSequence();
150147

151148
while (state.continueReading()) {
152-
final Frame frame;
153-
if (state.frameBuilder == null) {
154-
frame = Frame.readFrom(inputStream);
155-
} else {
156-
frame = state.frameBuilder.readFrame();
157-
}
149+
final Frame frame = state.frameBuilder.readFrame();
158150

159151
if (frame != null) {
160152
try {

src/main/java/com/rabbitmq/client/impl/nio/NioParams.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626

2727
/**
2828
* Parameters used to configure the NIO mode of a {@link com.rabbitmq.client.ConnectionFactory}.
29-
*
29+
* @since 4.0.0
3030
*/
3131
public class NioParams {
3232

src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandlerState.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.slf4j.LoggerFactory;
2222

2323
import javax.net.ssl.SSLEngine;
24-
import java.io.DataInputStream;
2524
import java.io.DataOutputStream;
2625
import java.io.IOException;
2726
import java.nio.ByteBuffer;
@@ -75,8 +74,6 @@ public class SocketChannelFrameHandlerState {
7574

7675
final DataOutputStream outputStream;
7776

78-
final DataInputStream inputStream;
79-
8077
final FrameBuilder frameBuilder;
8178

8279
public SocketChannelFrameHandlerState(SocketChannel channel, NioLoopContext nioLoopsState, NioParams nioParams, SSLEngine sslEngine) {
@@ -96,9 +93,6 @@ public SocketChannelFrameHandlerState(SocketChannel channel, NioLoopContext nioL
9693
this.outputStream = new DataOutputStream(
9794
new ByteBufferOutputStream(channel, plainOut)
9895
);
99-
this.inputStream = new DataInputStream(
100-
new ByteBufferInputStream(channel, plainIn)
101-
);
10296

10397
this.frameBuilder = new FrameBuilder(channel, plainIn);
10498

@@ -112,11 +106,7 @@ public SocketChannelFrameHandlerState(SocketChannel channel, NioLoopContext nioL
112106
this.outputStream = new DataOutputStream(
113107
new SslEngineByteBufferOutputStream(sslEngine, plainOut, cipherOut, channel)
114108
);
115-
this.inputStream = new DataInputStream(
116-
new SslEngineByteBufferInputStream(sslEngine, plainIn, cipherIn, channel)
117-
);
118-
119-
this.frameBuilder = null;
109+
this.frameBuilder = new SslEngineFrameBuilder(sslEngine, plainIn, cipherIn, channel);
120110
}
121111

122112
}
@@ -202,6 +192,10 @@ boolean continueReading() throws IOException {
202192
if (!plainIn.hasRemaining() && !cipherIn.hasRemaining()) {
203193
// need to try to read something
204194
cipherIn.clear();
195+
196+
// FIXME this logic may be simplified:
197+
// flipping cipherIn and return cipherIn.hasRemaining should be enough
198+
205199
int bytesRead = NioHelper.read(channel, cipherIn);
206200
if (bytesRead <= 0) {
207201
return false;

src/main/java/com/rabbitmq/client/impl/nio/SslEngineByteBufferInputStream.java

Lines changed: 0 additions & 86 deletions
This file was deleted.

0 commit comments

Comments
 (0)