1- // Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
1+ // Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
22//
33// This software, the RabbitMQ Java client library, is triple-licensed under the
44// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -73,9 +73,9 @@ public void run() {
7373
7474 for (SelectionKey selectionKey : selector .keys ()) {
7575 SocketChannelFrameHandlerState state = (SocketChannelFrameHandlerState ) selectionKey .attachment ();
76- if (state .getConnection () != null && state .getConnection (). getHeartbeat () > 0 ) {
77- long now = System .currentTimeMillis ();
78- if ((now - state .getLastActivity ()) > state .getConnection (). getHeartbeat () * 1000 * 2 ) {
76+ if (state .getConnection () != null && state .getHeartbeatNanoSeconds () > 0 ) {
77+ long now = System .nanoTime ();
78+ if ((now - state .getLastActivity ()) > state .getHeartbeatNanoSeconds () * 2 ) {
7979 try {
8080 handleHeartbeatFailure (state );
8181 } catch (Exception e ) {
@@ -91,7 +91,7 @@ public void run() {
9191 if (!writeRegistered && registrations .isEmpty () && writeRegistrations .isEmpty ()) {
9292 // we can block, registrations will call Selector.wakeup()
9393 select = selector .select (1000 );
94- if (selector .keys ().size () == 0 ) {
94+ if (selector .keys ().isEmpty () ) {
9595 // we haven't been doing anything for a while, shutdown state
9696 boolean clean = context .cleanUp ();
9797 if (clean ) {
@@ -135,11 +135,9 @@ public void run() {
135135 if (!key .isValid ()) {
136136 continue ;
137137 }
138-
139- if (key .isReadable ()) {
140- final SocketChannelFrameHandlerState state = (SocketChannelFrameHandlerState ) key .attachment ();
141-
142- try {
138+ final SocketChannelFrameHandlerState state = (SocketChannelFrameHandlerState ) key .attachment ();
139+ try {
140+ if (key .isReadable ()) {
143141 if (!state .getChannel ().isOpen ()) {
144142 key .cancel ();
145143 continue ;
@@ -175,14 +173,14 @@ public void run() {
175173 }
176174 }
177175
178- state .setLastActivity (System .currentTimeMillis ());
179- } catch (final Exception e ) {
180- LOGGER .warn ("Error during reading frames" , e );
181- handleIoError (state , e );
182- key .cancel ();
183- } finally {
184- buffer .clear ();
176+ state .setLastActivity (System .nanoTime ());
185177 }
178+ } catch (final Exception e ) {
179+ LOGGER .warn ("Error during reading frames" , e );
180+ handleIoError (state , e );
181+ key .cancel ();
182+ } finally {
183+ buffer .clear ();
186184 }
187185 }
188186 }
@@ -222,9 +220,8 @@ public void run() {
222220 continue ;
223221 }
224222
225- if (key .isWritable ()) {
226- boolean cancelKey = true ;
227- try {
223+ try {
224+ if (key .isWritable ()) {
228225 if (!state .getChannel ().isOpen ()) {
229226 key .cancel ();
230227 continue ;
@@ -243,17 +240,12 @@ public void run() {
243240 written ++;
244241 }
245242 outputStream .flush ();
246- if (!state .getWriteQueue ().isEmpty ()) {
247- cancelKey = true ;
248- }
249- } catch (Exception e ) {
250- handleIoError (state , e );
251- } finally {
252- state .endWriteSequence ();
253- if (cancelKey ) {
254- key .cancel ();
255- }
256243 }
244+ } catch (Exception e ) {
245+ handleIoError (state , e );
246+ } finally {
247+ state .endWriteSequence ();
248+ key .cancel ();
257249 }
258250 }
259251 }
@@ -269,7 +261,7 @@ protected void handleIoError(SocketChannelFrameHandlerState state, Throwable ex)
269261 } else {
270262 try {
271263 state .close ();
272- } catch (IOException e ) {
264+ } catch (IOException ignored ) {
273265
274266 }
275267 }
@@ -284,7 +276,7 @@ protected void handleHeartbeatFailure(SocketChannelFrameHandlerState state) {
284276 } else {
285277 try {
286278 state .close ();
287- } catch (IOException e ) {
279+ } catch (IOException ignored ) {
288280
289281 }
290282 }
0 commit comments