Skip to content

Commit 491ec11

Browse files
Sahil TakiarMarcelo Vanzin
authored andcommitted
[SPARK-23785][LAUNCHER] LauncherBackend doesn't check state of connection before setting state
## What changes were proposed in this pull request? Changed `LauncherBackend` `set` method so that it checks if the connection is open or not before writing to it (uses `isConnected`). ## How was this patch tested? None Author: Sahil Takiar <[email protected]> Closes apache#20893 from sahilTakiar/master.
1 parent 505480c commit 491ec11

File tree

2 files changed

+23
-3
lines changed

2 files changed

+23
-3
lines changed

core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,13 @@ private[spark] abstract class LauncherBackend {
6767
}
6868

6969
def setAppId(appId: String): Unit = {
70-
if (connection != null) {
70+
if (connection != null && isConnected) {
7171
connection.send(new SetAppId(appId))
7272
}
7373
}
7474

7575
def setState(state: SparkAppHandle.State): Unit = {
76-
if (connection != null && lastState != state) {
76+
if (connection != null && isConnected && lastState != state) {
7777
connection.send(new SetState(state))
7878
lastState = state
7979
}
@@ -114,10 +114,10 @@ private[spark] abstract class LauncherBackend {
114114

115115
override def close(): Unit = {
116116
try {
117+
_isConnected = false
117118
super.close()
118119
} finally {
119120
onDisconnected()
120-
_isConnected = false
121121
}
122122
}
123123

launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,26 @@ public void testStreamFiltering() throws Exception {
185185
}
186186
}
187187

188+
@Test
189+
public void testAppHandleDisconnect() throws Exception {
190+
LauncherServer server = LauncherServer.getOrCreateServer();
191+
ChildProcAppHandle handle = new ChildProcAppHandle(server);
192+
String secret = server.registerHandle(handle);
193+
194+
TestClient client = null;
195+
try {
196+
Socket s = new Socket(InetAddress.getLoopbackAddress(), server.getPort());
197+
client = new TestClient(s);
198+
client.send(new Hello(secret, "1.4.0"));
199+
handle.disconnect();
200+
waitForError(client, secret);
201+
} finally {
202+
handle.kill();
203+
close(client);
204+
client.clientThread.join();
205+
}
206+
}
207+
188208
private void close(Closeable c) {
189209
if (c != null) {
190210
try {

0 commit comments

Comments
 (0)