Skip to content

Commit d1be409

Browse files
shvo123artembilan
authored andcommitted
GH-3705: Close TcpNioConn.ChannelOutStr.selector
Fixes #3705 Closing/destroying `ChannelOutputStream` object does not close the selector therefore it retains redundant pipes/FD that cen be seen using lsof command or ls /proc/ * Close `TcpNioConnection.ChannelOutputStream.selector` in the `ChannelOutputStream` * Close `TcpNioConnection.ChannelOutputStream` when connection is closed * Code style clean up **Cherry-pick to `5.3.x` & `5.4.x`** # Conflicts: # spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNioConnection.java
1 parent f6c6a72 commit d1be409

File tree

1 file changed

+17
-16
lines changed
  • spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection

1 file changed

+17
-16
lines changed

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNioConnection.java

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -54,6 +54,7 @@
5454
* @author Gary Russell
5555
* @author John Anderson
5656
* @author Artem Bilan
57+
* @author David Herschler Shvo
5758
*
5859
* @since 2.0
5960
*
@@ -70,7 +71,7 @@ public class TcpNioConnection extends TcpConnectionSupport {
7071

7172
private final SocketChannel socketChannel;
7273

73-
private final ChannelOutputStream channelOutputStream;
74+
private final ChannelOutputStream channelOutputStream = new ChannelOutputStream();
7475

7576
private final ChannelInputStream channelInputStream = new ChannelInputStream();
7677

@@ -113,7 +114,6 @@ public TcpNioConnection(SocketChannel socketChannel, boolean server, boolean loo
113114

114115
super(socketChannel.socket(), server, lookupHost, applicationEventPublisher, connectionFactoryName);
115116
this.socketChannel = socketChannel;
116-
this.channelOutputStream = new ChannelOutputStream();
117117
}
118118

119119
public void setPipeTimeout(long pipeTimeout) {
@@ -124,20 +124,25 @@ public void setPipeTimeout(long pipeTimeout) {
124124
public void close() {
125125
setNoReadErrorOnClose(true);
126126
doClose();
127+
super.close();
127128
}
128129

129130
private void doClose() {
130131
try {
131132
this.channelInputStream.close();
132133
}
133-
catch (@SuppressWarnings(UNUSED) IOException e) {
134+
catch (@SuppressWarnings(UNUSED) IOException ex) {
135+
}
136+
try {
137+
this.channelOutputStream.close();
138+
}
139+
catch (@SuppressWarnings(UNUSED) Exception ex) {
134140
}
135141
try {
136142
this.socketChannel.close();
137143
}
138-
catch (@SuppressWarnings(UNUSED) Exception e) {
144+
catch (@SuppressWarnings(UNUSED) Exception ex) {
139145
}
140-
super.close();
141146
}
142147

143148
@Override
@@ -364,9 +369,8 @@ private synchronized Message<?> convert() throws IOException {
364369
throw new IOException("Interrupted waiting for IO", e);
365370
}
366371
}
367-
Message<?> message = null;
368372
try {
369-
message = getMapper().toMessage(this);
373+
return getMapper().toMessage(this);
370374
}
371375
catch (Exception e) {
372376
closeConnection(true);
@@ -382,7 +386,6 @@ private synchronized Message<?> convert() throws IOException {
382386
}
383387
return null;
384388
}
385-
return message;
386389
}
387390

388391
private void sendToChannel(Message<?> message) {
@@ -477,7 +480,7 @@ private void checkForAssembler() {
477480
this.executionControl.decrementAndGet();
478481
if (logger.isInfoEnabled()) {
479482
logger.info("Insufficient threads in the assembler fixed thread pool; consider increasing " +
480-
"this task executor pool size");
483+
"this task executor pool size");
481484
}
482485
throw e;
483486
}
@@ -609,12 +612,10 @@ public void write(int b) throws IOException {
609612
}
610613

611614
@Override
612-
public void close() {
613-
doClose();
614-
}
615-
616-
@Override
617-
public void flush() {
615+
public void close() throws IOException {
616+
if (this.selector != null) {
617+
this.selector.close();
618+
}
618619
}
619620

620621
@Override

0 commit comments

Comments
 (0)