Skip to content

Commit cba826d

Browse files
author
Marcelo Vanzin
committed
[SPARK-17742][CORE] Handle child process exit in SparkLauncher.
Currently the launcher handle does not monitor the child spark-submit process it launches; this means that if the child exits with an error, the handle's state will never change, and the launching application will not know that the Spark application has failed. This change adds code to monitor the child process and updates the handle's state appropriately when the child process exits. Tested with added unit tests. Author: Marcelo Vanzin <[email protected]> Closes apache#18877 from vanzin/SPARK-17742.
1 parent 14bdb25 commit cba826d

File tree

5 files changed

+99
-34
lines changed

5 files changed

+99
-34
lines changed

core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,11 +116,11 @@ public void testChildProcLauncher() throws Exception {
116116
.setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
117117
.addSparkArg(opts.CLASS, "ShouldBeOverriddenBelow")
118118
.setMainClass(SparkLauncherTestApp.class.getName())
119+
.redirectError()
119120
.addAppArgs("proc");
120121
final Process app = launcher.launch();
121122

122-
new OutputRedirector(app.getInputStream(), TF);
123-
new OutputRedirector(app.getErrorStream(), TF);
123+
new OutputRedirector(app.getInputStream(), getClass().getName() + ".child", TF);
124124
assertEquals(0, app.waitFor());
125125
}
126126

launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java

Lines changed: 49 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class ChildProcAppHandle implements SparkAppHandle {
3434
private final String secret;
3535
private final LauncherServer server;
3636

37-
private Process childProc;
37+
private volatile Process childProc;
3838
private boolean disposed;
3939
private LauncherConnection connection;
4040
private List<Listener> listeners;
@@ -96,18 +96,14 @@ public synchronized void disconnect() {
9696

9797
@Override
9898
public synchronized void kill() {
99-
if (!disposed) {
100-
disconnect();
101-
}
99+
disconnect();
102100
if (childProc != null) {
103-
try {
104-
childProc.exitValue();
105-
} catch (IllegalThreadStateException e) {
101+
if (childProc.isAlive()) {
106102
childProc.destroyForcibly();
107-
} finally {
108-
childProc = null;
109103
}
104+
childProc = null;
110105
}
106+
setState(State.KILLED);
111107
}
112108

113109
String getSecret() {
@@ -118,7 +114,13 @@ void setChildProc(Process childProc, String loggerName, InputStream logStream) {
118114
this.childProc = childProc;
119115
if (logStream != null) {
120116
this.redirector = new OutputRedirector(logStream, loggerName,
121-
SparkLauncher.REDIRECTOR_FACTORY);
117+
SparkLauncher.REDIRECTOR_FACTORY, this);
118+
} else {
119+
// If there is no log redirection, spawn a thread that will wait for the child process
120+
// to finish.
121+
Thread waiter = SparkLauncher.REDIRECTOR_FACTORY.newThread(this::monitorChild);
122+
waiter.setDaemon(true);
123+
waiter.start();
122124
}
123125
}
124126

@@ -134,7 +136,7 @@ LauncherConnection getConnection() {
134136
return connection;
135137
}
136138

137-
void setState(State s) {
139+
synchronized void setState(State s) {
138140
if (!state.isFinal()) {
139141
state = s;
140142
fireEvent(false);
@@ -144,17 +146,48 @@ void setState(State s) {
144146
}
145147
}
146148

147-
void setAppId(String appId) {
149+
synchronized void setAppId(String appId) {
148150
this.appId = appId;
149151
fireEvent(true);
150152
}
151153

152-
// Visible for testing.
153-
boolean isRunning() {
154-
return childProc == null || childProc.isAlive() || (redirector != null && redirector.isAlive());
154+
/**
155+
* Wait for the child process to exit and update the handle's state if necessary, according to
156+
* the exit code.
157+
*/
158+
void monitorChild() {
159+
while (childProc.isAlive()) {
160+
try {
161+
childProc.waitFor();
162+
} catch (Exception e) {
163+
LOG.log(Level.WARNING, "Exception waiting for child process to exit.", e);
164+
}
165+
}
166+
167+
synchronized (this) {
168+
if (disposed) {
169+
return;
170+
}
171+
172+
disconnect();
173+
174+
int ec;
175+
try {
176+
ec = childProc.exitValue();
177+
} catch (Exception e) {
178+
LOG.log(Level.WARNING, "Exception getting child process exit code, assuming failure.", e);
179+
ec = 1;
180+
}
181+
182+
// Only override the success state; leave other fail states alone.
183+
if (!state.isFinal() || (ec != 0 && state == State.FINISHED)) {
184+
state = State.LOST;
185+
fireEvent(false);
186+
}
187+
}
155188
}
156189

157-
private synchronized void fireEvent(boolean isInfoChanged) {
190+
private void fireEvent(boolean isInfoChanged) {
158191
if (listeners != null) {
159192
for (Listener l : listeners) {
160193
if (isInfoChanged) {

launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,24 @@ class OutputRedirector {
3434
private final BufferedReader reader;
3535
private final Logger sink;
3636
private final Thread thread;
37+
private final ChildProcAppHandle callback;
3738

3839
private volatile boolean active;
3940

40-
OutputRedirector(InputStream in, ThreadFactory tf) {
41-
this(in, OutputRedirector.class.getName(), tf);
41+
OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) {
42+
this(in, loggerName, tf, null);
4243
}
4344

44-
OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) {
45+
OutputRedirector(
46+
InputStream in,
47+
String loggerName,
48+
ThreadFactory tf,
49+
ChildProcAppHandle callback) {
4550
this.active = true;
4651
this.reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
4752
this.thread = tf.newThread(this::redirect);
4853
this.sink = Logger.getLogger(loggerName);
54+
this.callback = callback;
4955
thread.start();
5056
}
5157

@@ -59,6 +65,10 @@ private void redirect() {
5965
}
6066
} catch (IOException e) {
6167
sink.log(Level.FINE, "Error reading child process output.", e);
68+
} finally {
69+
if (callback != null) {
70+
callback.monitorChild();
71+
}
6272
}
6373
}
6474

launcher/src/test/java/org/apache/spark/launcher/OutputRedirectionSuite.java renamed to launcher/src/test/java/org/apache/spark/launcher/ChildProcAppHandleSuite.java

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Arrays;
2626
import java.util.EnumSet;
2727
import java.util.List;
28+
import java.util.concurrent.TimeUnit;
2829
import java.util.stream.Collectors;
2930
import static java.nio.file.attribute.PosixFilePermission.*;
3031

@@ -39,7 +40,7 @@
3940

4041
import static org.apache.spark.launcher.CommandBuilderUtils.*;
4142

42-
public class OutputRedirectionSuite extends BaseSuite {
43+
public class ChildProcAppHandleSuite extends BaseSuite {
4344

4445
private static final List<String> MESSAGES = new ArrayList<>();
4546

@@ -99,7 +100,8 @@ public void testRedirectLastWins() throws Exception {
99100
public void testRedirectToLog() throws Exception {
100101
assumeFalse(isWindows());
101102

102-
ChildProcAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher().startApplication();
103+
SparkAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
104+
.startApplication();
103105
waitFor(handle);
104106

105107
assertTrue(MESSAGES.contains("output"));
@@ -112,7 +114,7 @@ public void testRedirectErrorToLog() throws Exception {
112114

113115
Path err = Files.createTempFile("stderr", "txt");
114116

115-
ChildProcAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
117+
SparkAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
116118
.redirectError(err.toFile())
117119
.startApplication();
118120
waitFor(handle);
@@ -127,7 +129,7 @@ public void testRedirectOutputToLog() throws Exception {
127129

128130
Path out = Files.createTempFile("stdout", "txt");
129131

130-
ChildProcAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
132+
SparkAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
131133
.redirectOutput(out.toFile())
132134
.startApplication();
133135
waitFor(handle);
@@ -173,17 +175,37 @@ public void testRedirectErrorTwiceFails() throws Exception {
173175
.waitFor();
174176
}
175177

176-
private void waitFor(ChildProcAppHandle handle) throws Exception {
178+
@Test
179+
public void testProcMonitorWithOutputRedirection() throws Exception {
180+
File err = Files.createTempFile("out", "txt").toFile();
181+
SparkAppHandle handle = new TestSparkLauncher()
182+
.redirectError()
183+
.redirectOutput(err)
184+
.startApplication();
185+
waitFor(handle);
186+
assertEquals(SparkAppHandle.State.LOST, handle.getState());
187+
}
188+
189+
@Test
190+
public void testProcMonitorWithLogRedirection() throws Exception {
191+
SparkAppHandle handle = new TestSparkLauncher()
192+
.redirectToLog(getClass().getName())
193+
.startApplication();
194+
waitFor(handle);
195+
assertEquals(SparkAppHandle.State.LOST, handle.getState());
196+
}
197+
198+
private void waitFor(SparkAppHandle handle) throws Exception {
199+
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
177200
try {
178-
while (handle.isRunning()) {
179-
Thread.sleep(10);
201+
while (!handle.getState().isFinal()) {
202+
assertTrue("Timed out waiting for handle to transition to final state.",
203+
System.nanoTime() < deadline);
204+
TimeUnit.MILLISECONDS.sleep(10);
180205
}
181206
} finally {
182-
// Explicit unregister from server since the handle doesn't yet do that when the
183-
// process finishes by itself.
184-
LauncherServer server = LauncherServer.getServerInstance();
185-
if (server != null) {
186-
server.unregister(handle);
207+
if (!handle.getState().isFinal()) {
208+
handle.kill();
187209
}
188210
}
189211
}

launcher/src/test/resources/log4j.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ log4j.appender.childproc.target=System.err
2929
log4j.appender.childproc.layout=org.apache.log4j.PatternLayout
3030
log4j.appender.childproc.layout.ConversionPattern=%t: %m%n
3131

32-
log4j.appender.outputredirtest=org.apache.spark.launcher.OutputRedirectionSuite$LogAppender
32+
log4j.appender.outputredirtest=org.apache.spark.launcher.ChildProcAppHandleSuite$LogAppender
3333
log4j.logger.org.apache.spark.launcher.app.outputredirtest=INFO, outputredirtest
3434
log4j.logger.org.apache.spark.launcher.app.outputredirtest.additivity=false
3535

0 commit comments

Comments
 (0)