|
34 | 34 | import java.nio.channels.SocketChannel; |
35 | 35 | import java.security.GeneralSecurityException; |
36 | 36 | import java.util.ArrayList; |
37 | | -import java.util.HashSet; |
38 | 37 | import java.util.Iterator; |
39 | 38 | import java.util.List; |
40 | 39 | import java.util.Set; |
41 | 40 | import java.util.concurrent.Callable; |
| 41 | +import java.util.concurrent.ConcurrentHashMap; |
42 | 42 | import java.util.concurrent.ExecutorService; |
43 | 43 | import java.util.concurrent.Executors; |
44 | 44 | import java.util.concurrent.Future; |
@@ -80,7 +80,7 @@ public abstract class NioConnection implements Callable<Boolean> { |
80 | 80 | protected ExecutorService _executor; |
81 | 81 | protected ExecutorService _sslHandshakeExecutor; |
82 | 82 | protected CAService caService; |
83 | | - protected Set<SocketChannel> socketChannels = new HashSet<>(); |
| 83 | + protected Set<SocketChannel> socketChannels = ConcurrentHashMap.newKeySet(); |
84 | 84 | protected Integer sslHandshakeTimeout = null; |
85 | 85 | private final int factoryMaxNewConnectionsCount; |
86 | 86 | protected boolean blockNewConnections; |
@@ -219,7 +219,7 @@ public Boolean call() throws NioConnectionException { |
219 | 219 | return true; |
220 | 220 | } |
221 | 221 |
|
222 | | - abstract void init() throws IOException; |
| 222 | + protected abstract void init() throws IOException; |
223 | 223 |
|
224 | 224 | abstract void registerLink(InetSocketAddress saddr, Link link); |
225 | 225 |
|
@@ -489,16 +489,47 @@ protected void write(final SelectionKey key) throws IOException { |
489 | 489 | } |
490 | 490 |
|
491 | 491 | protected void closeConnection(final SelectionKey key) { |
492 | | - if (key != null) { |
493 | | - final SocketChannel channel = (SocketChannel)key.channel(); |
494 | | - key.cancel(); |
| 492 | + if (key == null) { |
| 493 | + return; |
| 494 | + } |
| 495 | + |
| 496 | + SocketChannel channel = null; |
| 497 | + try { |
| 498 | + // 1. Check type and handle potential CancelledKeyException |
| 499 | + if (key.isValid() && key.channel() instanceof SocketChannel) { |
| 500 | + channel = (SocketChannel) key.channel(); |
| 501 | + } |
| 502 | + } catch (CancelledKeyException e) { |
| 503 | + logger.trace("Key already cancelled when trying to get channel in closeConnection."); |
| 504 | + } |
| 505 | + |
| 506 | + // 2. Cancel the key (safe to call even if already cancelled) |
| 507 | + key.cancel(); |
| 508 | + |
| 509 | + if (channel == null) { |
| 510 | + logger.trace("Channel was null, invalid, or not a SocketChannel for key: " + key); |
| 511 | + return; |
| 512 | + } |
| 513 | + |
| 514 | + // 3. Try to close the channel if we obtained it |
| 515 | + if (channel != null) { |
| 516 | + closeChannel(channel); |
| 517 | + } else { |
| 518 | + logger.trace("Channel was null, invalid, or not a SocketChannel for key: " + key); |
| 519 | + } |
| 520 | + } |
| 521 | + |
| 522 | + private void closeChannel(SocketChannel channel) { |
| 523 | + if (channel != null && channel.isOpen()) { |
495 | 524 | try { |
496 | | - if (channel != null) { |
497 | | - logger.debug("Closing socket {}", channel.socket()); |
498 | | - channel.close(); |
499 | | - } |
500 | | - } catch (final IOException ignore) { |
501 | | - logger.info("[ignored] channel"); |
| 525 | + logger.debug("Closing socket " + channel.socket()); |
| 526 | + channel.close(); |
| 527 | + } catch (IOException ignore) { |
| 528 | + logger.warn(String.format("[ignored] Exception closing channel: %s, due to %s", channel, ignore.getMessage())); |
| 529 | + } catch (Exception e) { |
| 530 | + logger.warn(String.format("Unexpected exception in closing channel %s", channel), e); |
| 531 | + } finally { |
| 532 | + socketChannels.remove(channel); |
502 | 533 | } |
503 | 534 | } |
504 | 535 | } |
@@ -530,14 +561,7 @@ public void close(final SelectionKey key) { |
530 | 561 | /* Release the resource used by the instance */ |
531 | 562 | public void cleanUp() throws IOException { |
532 | 563 | for (SocketChannel channel : socketChannels) { |
533 | | - if (channel != null && channel.isOpen()) { |
534 | | - try { |
535 | | - logger.info("Closing connection: {}", channel.getRemoteAddress()); |
536 | | - channel.close(); |
537 | | - } catch (IOException e) { |
538 | | - logger.warn("Unable to close connection due to {}", e.getMessage()); |
539 | | - } |
540 | | - } |
| 564 | + closeChannel(channel); |
541 | 565 | } |
542 | 566 | if (_selector != null) { |
543 | 567 | _selector.close(); |
|
0 commit comments