Skip to content

Commit 0b38b8d

Browse files
shvo123artembilan
authored andcommitted
GH-3705: Close TcpNioConn.ChannelOutStr.selector
Fixes #3705 Closing/destroying `ChannelOutputStream` object does not close the selector therefore it retains redundant pipes/FD that cen be seen using lsof command or ls /proc/ * Close `TcpNioConnection.ChannelOutputStream.selector` in the `ChannelOutputStream` * Close `TcpNioConnection.ChannelOutputStream` when connection is closed * Code style clean up **Cherry-pick to `5.3.x` & `5.4.x`**
1 parent 6c769ca commit 0b38b8d

File tree

1 file changed

+17
-16
lines changed
  • spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection

1 file changed

+17
-16
lines changed

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNioConnection.java

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -56,6 +56,7 @@
5656
* @author Gary Russell
5757
* @author John Anderson
5858
* @author Artem Bilan
59+
* @author David Herschler Shvo
5960
*
6061
* @since 2.0
6162
*
@@ -74,7 +75,7 @@ public class TcpNioConnection extends TcpConnectionSupport {
7475

7576
private final SocketChannel socketChannel;
7677

77-
private final ChannelOutputStream channelOutputStream;
78+
private final ChannelOutputStream channelOutputStream = new ChannelOutputStream();
7879

7980
private final ChannelInputStream channelInputStream = new ChannelInputStream();
8081

@@ -115,7 +116,6 @@ public TcpNioConnection(SocketChannel socketChannel, boolean server, boolean loo
115116

116117
super(socketChannel.socket(), server, lookupHost, applicationEventPublisher, connectionFactoryName);
117118
this.socketChannel = socketChannel;
118-
this.channelOutputStream = new ChannelOutputStream();
119119
}
120120

121121
public void setPipeTimeout(long pipeTimeout) {
@@ -126,20 +126,25 @@ public void setPipeTimeout(long pipeTimeout) {
126126
public void close() {
127127
setNoReadErrorOnClose(true);
128128
doClose();
129+
super.close();
129130
}
130131

131132
private void doClose() {
132133
try {
133134
this.channelInputStream.close();
134135
}
135-
catch (@SuppressWarnings(UNUSED) IOException e) {
136+
catch (@SuppressWarnings(UNUSED) IOException ex) {
137+
}
138+
try {
139+
this.channelOutputStream.close();
140+
}
141+
catch (@SuppressWarnings(UNUSED) Exception ex) {
136142
}
137143
try {
138144
this.socketChannel.close();
139145
}
140-
catch (@SuppressWarnings(UNUSED) Exception e) {
146+
catch (@SuppressWarnings(UNUSED) Exception ex) {
141147
}
142-
super.close();
143148
}
144149

145150
@Override
@@ -376,9 +381,8 @@ private synchronized Message<?> convert() throws IOException {
376381
throw new IOException("Interrupted waiting for IO", e);
377382
}
378383
}
379-
Message<?> message = null;
380384
try {
381-
message = getMapper().toMessage(this);
385+
return getMapper().toMessage(this);
382386
}
383387
catch (Exception e) {
384388
closeConnection(true);
@@ -394,7 +398,6 @@ private synchronized Message<?> convert() throws IOException {
394398
}
395399
return null;
396400
}
397-
return message;
398401
}
399402

400403
private void sendToChannel(Message<?> message) {
@@ -486,7 +489,7 @@ private void checkForAssembler() {
486489
catch (RejectedExecutionException e) {
487490
this.executionControl.decrementAndGet();
488491
logger.info("Insufficient threads in the assembler fixed thread pool; consider increasing " +
489-
"this task executor pool size");
492+
"this task executor pool size");
490493
throw e;
491494
}
492495
}
@@ -619,12 +622,10 @@ public void write(int b) throws IOException {
619622
}
620623

621624
@Override
622-
public void close() {
623-
doClose();
624-
}
625-
626-
@Override
627-
public void flush() {
625+
public void close() throws IOException {
626+
if (this.selector != null) {
627+
this.selector.close();
628+
}
628629
}
629630

630631
@Override

0 commit comments

Comments
 (0)