diff --git a/src/main/java/com/rabbitmq/client/impl/nio/NioLoop.java b/src/main/java/com/rabbitmq/client/impl/nio/NioLoop.java index ea9351e410..a16c83db06 100644 --- a/src/main/java/com/rabbitmq/client/impl/nio/NioLoop.java +++ b/src/main/java/com/rabbitmq/client/impl/nio/NioLoop.java @@ -1,4 +1,4 @@ -// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Java client library, is triple-licensed under the // Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 @@ -73,9 +73,9 @@ public void run() { for (SelectionKey selectionKey : selector.keys()) { SocketChannelFrameHandlerState state = (SocketChannelFrameHandlerState) selectionKey.attachment(); - if (state.getConnection() != null && state.getConnection().getHeartbeat() > 0) { - long now = System.currentTimeMillis(); - if ((now - state.getLastActivity()) > state.getConnection().getHeartbeat() * 1000 * 2) { + if (state.getConnection() != null && state.getHeartbeatNanoSeconds() > 0) { + long now = System.nanoTime(); + if ((now - state.getLastActivity()) > state.getHeartbeatNanoSeconds() * 2) { try { handleHeartbeatFailure(state); } catch (Exception e) { @@ -91,7 +91,7 @@ public void run() { if (!writeRegistered && registrations.isEmpty() && writeRegistrations.isEmpty()) { // we can block, registrations will call Selector.wakeup() select = selector.select(1000); - if (selector.keys().size() == 0) { + if (selector.keys().isEmpty()) { // we haven't been doing anything for a while, shutdown state boolean clean = context.cleanUp(); if (clean) { @@ -135,11 +135,9 @@ public void run() { if (!key.isValid()) { continue; } - - if (key.isReadable()) { - final SocketChannelFrameHandlerState state = (SocketChannelFrameHandlerState) key.attachment(); - - try { + final SocketChannelFrameHandlerState state = (SocketChannelFrameHandlerState) key.attachment(); + try { + if (key.isReadable()) { if (!state.getChannel().isOpen()) { key.cancel(); continue; @@ -175,14 +173,14 @@ public void run() { } } - state.setLastActivity(System.currentTimeMillis()); - } catch (final Exception e) { - LOGGER.warn("Error during reading frames", e); - handleIoError(state, e); - key.cancel(); - } finally { - buffer.clear(); + state.setLastActivity(System.nanoTime()); } + } catch (final Exception e) { + LOGGER.warn("Error during reading frames", e); + handleIoError(state, e); + key.cancel(); + } finally { + buffer.clear(); } } } @@ -222,9 +220,8 @@ public void run() { continue; } - if (key.isWritable()) { - boolean cancelKey = true; - try { + try { + if (key.isWritable()) { if (!state.getChannel().isOpen()) { key.cancel(); continue; @@ -243,17 +240,12 @@ public void run() { written++; } outputStream.flush(); - if (!state.getWriteQueue().isEmpty()) { - cancelKey = true; - } - } catch (Exception e) { - handleIoError(state, e); - } finally { - state.endWriteSequence(); - if (cancelKey) { - key.cancel(); - } } + } catch (Exception e) { + handleIoError(state, e); + } finally { + state.endWriteSequence(); + key.cancel(); } } } @@ -269,7 +261,7 @@ protected void handleIoError(SocketChannelFrameHandlerState state, Throwable ex) } else { try { state.close(); - } catch (IOException e) { + } catch (IOException ignored) { } } @@ -284,7 +276,7 @@ protected void handleHeartbeatFailure(SocketChannelFrameHandlerState state) { } else { try { state.close(); - } catch (IOException e) { + } catch (IOException ignored) { } } diff --git a/src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandler.java b/src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandler.java index 93eea9cd60..c75d988a57 100644 --- a/src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandler.java +++ b/src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandler.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.net.InetAddress; import java.net.SocketException; +import java.time.Duration; /** * @@ -61,6 +62,9 @@ public int getPort() { @Override public void setTimeout(int timeoutMs) throws SocketException { state.getChannel().socket().setSoTimeout(timeoutMs); + if (state.getConnection() != null) { + state.setHeartbeat(Duration.ofSeconds(state.getConnection().getHeartbeat())); + } } @Override diff --git a/src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandlerState.java b/src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandlerState.java index f8566fb876..9c84cadbe1 100644 --- a/src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandlerState.java +++ b/src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandlerState.java @@ -1,4 +1,4 @@ -// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Java client library, is triple-licensed under the // Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 @@ -26,7 +26,7 @@ import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; - +import java.time.Duration; /** * @@ -43,6 +43,7 @@ public class SocketChannelFrameHandlerState { private final NioQueue writeQueue; private volatile AMQConnection connection; + private volatile long heartbeatNanoSeconds = -1; /** should be used only in the NIO read thread */ private long lastActivity; @@ -157,6 +158,10 @@ public void setConnection(AMQConnection connection) { this.connection = connection; } + void setHeartbeat(Duration ht) { + this.heartbeatNanoSeconds = ht.toNanos(); + } + public void setLastActivity(long lastActivity) { this.lastActivity = lastActivity; } @@ -165,6 +170,10 @@ public long getLastActivity() { return lastActivity; } + long getHeartbeatNanoSeconds() { + return this.heartbeatNanoSeconds; + } + void prepareForWriteSequence() { if(ssl) { plainOut.clear();