Skip to content

Commit 592e3a4

Browse files
committed
[SPARK-25218][CORE] Fix potential resource leaks in TransportServer and SocketAuthHelper
## What changes were proposed in this pull request? Make sure TransportServer and SocketAuthHelper close the resources for all types of errors. ## How was this patch tested? Jenkins Closes apache#22210 from zsxwing/SPARK-25218. Authored-by: Shixiong Zhu <[email protected]> Signed-off-by: Shixiong Zhu <[email protected]>
1 parent 8198ea5 commit 592e3a4

File tree

3 files changed

+54
-37
lines changed

3 files changed

+54
-37
lines changed

common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -77,16 +77,16 @@ public ByteBuffer nioByteBuffer() throws IOException {
7777
return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
7878
}
7979
} catch (IOException e) {
80+
String errorMessage = "Error in reading " + this;
8081
try {
8182
if (channel != null) {
8283
long size = channel.size();
83-
throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
84-
e);
84+
errorMessage = "Error in reading " + this + " (actual file length " + size + ")";
8585
}
8686
} catch (IOException ignored) {
8787
// ignore
8888
}
89-
throw new IOException("Error in opening " + this, e);
89+
throw new IOException(errorMessage, e);
9090
} finally {
9191
JavaUtils.closeQuietly(channel);
9292
}
@@ -95,26 +95,24 @@ public ByteBuffer nioByteBuffer() throws IOException {
9595
@Override
9696
public InputStream createInputStream() throws IOException {
9797
FileInputStream is = null;
98+
boolean shouldClose = true;
9899
try {
99100
is = new FileInputStream(file);
100101
ByteStreams.skipFully(is, offset);
101-
return new LimitedInputStream(is, length);
102+
InputStream r = new LimitedInputStream(is, length);
103+
shouldClose = false;
104+
return r;
102105
} catch (IOException e) {
103-
try {
104-
if (is != null) {
105-
long size = file.length();
106-
throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
107-
e);
108-
}
109-
} catch (IOException ignored) {
110-
// ignore
111-
} finally {
106+
String errorMessage = "Error in reading " + this;
107+
if (is != null) {
108+
long size = file.length();
109+
errorMessage = "Error in reading " + this + " (actual file length " + size + ")";
110+
}
111+
throw new IOException(errorMessage, e);
112+
} finally {
113+
if (shouldClose) {
112114
JavaUtils.closeQuietly(is);
113115
}
114-
throw new IOException("Error in opening " + this, e);
115-
} catch (RuntimeException e) {
116-
JavaUtils.closeQuietly(is);
117-
throw e;
118116
}
119117
}
120118

common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,14 @@ public TransportServer(
7070
this.appRpcHandler = appRpcHandler;
7171
this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));
7272

73+
boolean shouldClose = true;
7374
try {
7475
init(hostToBind, portToBind);
75-
} catch (RuntimeException e) {
76-
JavaUtils.closeQuietly(this);
77-
throw e;
76+
shouldClose = false;
77+
} finally {
78+
if (shouldClose) {
79+
JavaUtils.closeQuietly(this);
80+
}
7881
}
7982
}
8083

core/src/main/scala/org/apache/spark/security/SocketAuthHelper.scala

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -42,43 +42,59 @@ private[spark] class SocketAuthHelper(conf: SparkConf) {
4242
* Read the auth secret from the socket and compare to the expected value. Write the reply back
4343
* to the socket.
4444
*
45-
* If authentication fails, this method will close the socket.
45+
* If authentication fails or error is thrown, this method will close the socket.
4646
*
4747
* @param s The client socket.
4848
* @throws IllegalArgumentException If authentication fails.
4949
*/
5050
def authClient(s: Socket): Unit = {
51-
// Set the socket timeout while checking the auth secret. Reset it before returning.
52-
val currentTimeout = s.getSoTimeout()
51+
var shouldClose = true
5352
try {
54-
s.setSoTimeout(10000)
55-
val clientSecret = readUtf8(s)
56-
if (secret == clientSecret) {
57-
writeUtf8("ok", s)
58-
} else {
59-
writeUtf8("err", s)
60-
JavaUtils.closeQuietly(s)
53+
// Set the socket timeout while checking the auth secret. Reset it before returning.
54+
val currentTimeout = s.getSoTimeout()
55+
try {
56+
s.setSoTimeout(10000)
57+
val clientSecret = readUtf8(s)
58+
if (secret == clientSecret) {
59+
writeUtf8("ok", s)
60+
shouldClose = false
61+
} else {
62+
writeUtf8("err", s)
63+
throw new IllegalArgumentException("Authentication failed.")
64+
}
65+
} finally {
66+
s.setSoTimeout(currentTimeout)
6167
}
6268
} finally {
63-
s.setSoTimeout(currentTimeout)
69+
if (shouldClose) {
70+
JavaUtils.closeQuietly(s)
71+
}
6472
}
6573
}
6674

6775
/**
6876
* Authenticate with a server by writing the auth secret and checking the server's reply.
6977
*
70-
* If authentication fails, this method will close the socket.
78+
* If authentication fails or error is thrown, this method will close the socket.
7179
*
7280
* @param s The socket connected to the server.
7381
* @throws IllegalArgumentException If authentication fails.
7482
*/
7583
def authToServer(s: Socket): Unit = {
76-
writeUtf8(secret, s)
84+
var shouldClose = true
85+
try {
86+
writeUtf8(secret, s)
7787

78-
val reply = readUtf8(s)
79-
if (reply != "ok") {
80-
JavaUtils.closeQuietly(s)
81-
throw new IllegalArgumentException("Authentication failed.")
88+
val reply = readUtf8(s)
89+
if (reply != "ok") {
90+
throw new IllegalArgumentException("Authentication failed.")
91+
} else {
92+
shouldClose = false
93+
}
94+
} finally {
95+
if (shouldClose) {
96+
JavaUtils.closeQuietly(s)
97+
}
8298
}
8399
}
84100

0 commit comments

Comments
 (0)