Skip to content

Commit 5b92eac

Browse files
committed
Improve ByteBuffer#flip logic when using TLS
Fixes #307
1 parent 5c3b0f2 commit 5b92eac

File tree

2 files changed

+13
-10
lines changed

2 files changed

+13
-10
lines changed

src/main/java/com/rabbitmq/client/impl/nio/SslEngineByteBufferInputStream.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,16 +66,15 @@ public int read() throws IOException {
6666
}
6767

6868
int bytesRead = NioHelper.read(channel, cipherIn);
69-
if (bytesRead > 0) {
70-
cipherIn.flip();
71-
} else {
69+
// see https://github.com/rabbitmq/rabbitmq-java-client/issues/307
70+
if (bytesRead <= 0) {
7271
bytesRead = NioHelper.retryRead(channel, cipherIn);
7372
if(bytesRead <= 0) {
7473
throw new IllegalStateException("Should be reading something from the network");
7574
}
76-
// see https://github.com/rabbitmq/rabbitmq-java-client/issues/307
77-
cipherIn.flip();
7875
}
76+
cipherIn.flip();
77+
7978
plainIn.clear();
8079
result = sslEngine.unwrap(cipherIn, plainIn);
8180

src/test/java/com/rabbitmq/client/test/ssl/NioTlsUnverifiedConnection.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,14 @@ public void configure(SSLEngine sslEngine) throws IOException {
9595
}
9696
}
9797

98-
@Test public void largeMessage() throws Exception {
99-
CountDownLatch latch = new CountDownLatch(1);
100-
connection = basicGetBasicConsume(connection, "tls.nio.queue", latch, 1 * 1000 * 1000);
101-
boolean messagesReceived = latch.await(5, TimeUnit.SECONDS);
102-
assertTrue("Message has not been received", messagesReceived);
98+
@Test public void messageSize() throws Exception {
99+
int [] sizes = new int [] {100, 1000, 10 * 1000, 1 * 1000 * 1000, 5 * 1000 * 1000};
100+
for(int size : sizes) {
101+
CountDownLatch latch = new CountDownLatch(1);
102+
connection = basicGetBasicConsume(connection, "tls.nio.queue", latch, size);
103+
boolean messagesReceived = latch.await(5, TimeUnit.SECONDS);
104+
assertTrue("Message has not been received", messagesReceived);
105+
}
103106
}
104107

105108
private Connection basicGetBasicConsume(Connection connection, String queue, final CountDownLatch latch, int msgSize)
@@ -116,6 +119,7 @@ private Connection basicGetBasicConsume(Connection connection, String queue, fin
116119
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
117120
getChannel().basicAck(envelope.getDeliveryTag(), false);
118121
latch.countDown();
122+
getChannel().basicCancel(consumerTag);
119123
}
120124
});
121125

0 commit comments

Comments
 (0)