Skip to content

Commit f740e07

Browse files
authored
DotnetBackend handles .NET process in a more robust way (#424)
1 parent 4dbfb34 commit f740e07

File tree

3 files changed

+102
-12
lines changed

3 files changed

+102
-12
lines changed

src/scala/microsoft-spark-2.3.x/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala

Lines changed: 34 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -110,6 +110,7 @@ object DotnetRunner extends Logging {
110110
if (initialized.tryAcquire(backendTimeout, TimeUnit.SECONDS)) {
111111
if (!runInDebugMode) {
112112
var returnCode = -1
113+
var process: Process = null
113114
try {
114115
val builder = new ProcessBuilder(processParameters)
115116
val env = builder.environment()
@@ -120,19 +121,21 @@ object DotnetRunner extends Logging {
120121
logInfo(s"Adding key=$key and value=$value to environment")
121122
}
122123
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
123-
val process = builder.start()
124+
process = builder.start()
124125

125126
// Redirect stdin of JVM process to stdin of .NET process.
126127
new RedirectThread(System.in, process.getOutputStream, "redirect JVM input").start()
127128
// Redirect stdout and stderr of .NET process.
128129
new RedirectThread(process.getInputStream, System.out, "redirect .NET stdout").start()
129130
new RedirectThread(process.getErrorStream, System.out, "redirect .NET stderr").start()
130131

131-
returnCode = process.waitFor()
132-
closeBackend(dotnetBackend)
132+
process.waitFor()
133133
} catch {
134134
case t: Throwable =>
135-
logError(s"${t.getMessage} \n ${t.getStackTrace}")
135+
logThrowable(t)
136+
} finally {
137+
returnCode = closeDotnetProcess(process)
138+
closeBackend(dotnetBackend)
136139
}
137140

138141
if (returnCode != 0) {
@@ -232,6 +235,30 @@ object DotnetRunner extends Logging {
232235
dotnetBackend.close()
233236
}
234237

238+
/**
 * Stops the .NET process if it is still running and reports its exit code.
 *
 * This method is intended to be safe to call from a `finally` block: it never
 * throws, so cleanup that follows it in the caller always runs.
 *
 * @param dotnetProcess the process to close; may be null if it was never started
 * @return the exit code of the process, or -1 if the process was never started
 *         or its exit code could not be obtained
 */
private def closeDotnetProcess(dotnetProcess: Process): Int = {
  if (dotnetProcess == null) {
    // The process was never started, so there is nothing to close.
    -1
  } else if (!dotnetProcess.isAlive) {
    // Already terminated on its own: just report its exit code.
    dotnetProcess.exitValue()
  } else {
    // Try to (gracefully on Linux) kill the process and resort to force if interrupted
    logInfo("Closing .NET process")
    try {
      dotnetProcess.destroy()
      dotnetProcess.waitFor()
    } catch {
      case _: InterruptedException =>
        logInfo("Thread interrupted while waiting for graceful close. Forcefully closing .NET process")
        try {
          dotnetProcess.destroyForcibly().waitFor()
        } catch {
          // An exception thrown here is not covered by the outer handler, so it
          // must be caught locally to keep this method from throwing.
          case t: Throwable =>
            logThrowable(t)
            -1
        } finally {
          // Catching InterruptedException clears the interrupt status; restore it
          // so code further up the stack can still observe the interruption.
          Thread.currentThread().interrupt()
        }
      case t: Throwable =>
        logThrowable(t)
        -1
    }
  }
}
261+
235262
private def initializeSettings(args: Array[String]): (Boolean, Int) = {
236263
val runInDebugMode = (args.length == 1 || args.length == 2) && args(0).equalsIgnoreCase(
237264
"debug")
@@ -246,4 +273,7 @@ object DotnetRunner extends Logging {
246273

247274
(runInDebugMode, portNumber)
248275
}
276+
277+
/**
 * Logs a throwable at error level, including its class name, message, stack
 * trace and any chained causes.
 *
 * The text is rendered with `printStackTrace` rather than assembled from
 * `getMessage` and `getStackTrace`, because the latter omits the exception type
 * and the cause chain and prints "null" for exceptions without a message.
 *
 * @param throwable the throwable to log
 */
private def logThrowable(throwable: Throwable): Unit = {
  val rendered = new java.io.StringWriter()
  val printer = new java.io.PrintWriter(rendered)
  throwable.printStackTrace(printer)
  printer.flush()
  logError(rendered.toString)
}
249279
}

src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala

Lines changed: 34 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -110,6 +110,7 @@ object DotnetRunner extends Logging {
110110
if (initialized.tryAcquire(backendTimeout, TimeUnit.SECONDS)) {
111111
if (!runInDebugMode) {
112112
var returnCode = -1
113+
var process: Process = null
113114
try {
114115
val builder = new ProcessBuilder(processParameters)
115116
val env = builder.environment()
@@ -120,19 +121,21 @@ object DotnetRunner extends Logging {
120121
logInfo(s"Adding key=$key and value=$value to environment")
121122
}
122123
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
123-
val process = builder.start()
124+
process = builder.start()
124125

125126
// Redirect stdin of JVM process to stdin of .NET process.
126127
new RedirectThread(System.in, process.getOutputStream, "redirect JVM input").start()
127128
// Redirect stdout and stderr of .NET process.
128129
new RedirectThread(process.getInputStream, System.out, "redirect .NET stdout").start()
129130
new RedirectThread(process.getErrorStream, System.out, "redirect .NET stderr").start()
130131

131-
returnCode = process.waitFor()
132-
closeBackend(dotnetBackend)
132+
process.waitFor()
133133
} catch {
134134
case t: Throwable =>
135-
logError(s"${t.getMessage} \n ${t.getStackTrace}")
135+
logThrowable(t)
136+
} finally {
137+
returnCode = closeDotnetProcess(process)
138+
closeBackend(dotnetBackend)
136139
}
137140

138141
if (returnCode != 0) {
@@ -232,6 +235,30 @@ object DotnetRunner extends Logging {
232235
dotnetBackend.close()
233236
}
234237

238+
/**
 * Stops the .NET process if it is still running and reports its exit code.
 *
 * This method is intended to be safe to call from a `finally` block: it never
 * throws, so cleanup that follows it in the caller always runs.
 *
 * @param dotnetProcess the process to close; may be null if it was never started
 * @return the exit code of the process, or -1 if the process was never started
 *         or its exit code could not be obtained
 */
private def closeDotnetProcess(dotnetProcess: Process): Int = {
  if (dotnetProcess == null) {
    // The process was never started, so there is nothing to close.
    -1
  } else if (!dotnetProcess.isAlive) {
    // Already terminated on its own: just report its exit code.
    dotnetProcess.exitValue()
  } else {
    // Try to (gracefully on Linux) kill the process and resort to force if interrupted
    logInfo("Closing .NET process")
    try {
      dotnetProcess.destroy()
      dotnetProcess.waitFor()
    } catch {
      case _: InterruptedException =>
        logInfo("Thread interrupted while waiting for graceful close. Forcefully closing .NET process")
        try {
          dotnetProcess.destroyForcibly().waitFor()
        } catch {
          // An exception thrown here is not covered by the outer handler, so it
          // must be caught locally to keep this method from throwing.
          case t: Throwable =>
            logThrowable(t)
            -1
        } finally {
          // Catching InterruptedException clears the interrupt status; restore it
          // so code further up the stack can still observe the interruption.
          Thread.currentThread().interrupt()
        }
      case t: Throwable =>
        logThrowable(t)
        -1
    }
  }
}
261+
235262
private def initializeSettings(args: Array[String]): (Boolean, Int) = {
236263
val runInDebugMode = (args.length == 1 || args.length == 2) && args(0).equalsIgnoreCase(
237264
"debug")
@@ -246,4 +273,7 @@ object DotnetRunner extends Logging {
246273

247274
(runInDebugMode, portNumber)
248275
}
276+
277+
/**
 * Logs a throwable at error level, including its class name, message, stack
 * trace and any chained causes.
 *
 * The text is rendered with `printStackTrace` rather than assembled from
 * `getMessage` and `getStackTrace`, because the latter omits the exception type
 * and the cause chain and prints "null" for exceptions without a message.
 *
 * @param throwable the throwable to log
 */
private def logThrowable(throwable: Throwable): Unit = {
  val rendered = new java.io.StringWriter()
  val printer = new java.io.PrintWriter(rendered)
  throwable.printStackTrace(printer)
  printer.flush()
  logError(rendered.toString)
}
249279
}

src/scala/microsoft-spark-3.0.x/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala

Lines changed: 34 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -110,6 +110,7 @@ object DotnetRunner extends Logging {
110110
if (initialized.tryAcquire(backendTimeout, TimeUnit.SECONDS)) {
111111
if (!runInDebugMode) {
112112
var returnCode = -1
113+
var process: Process = null
113114
try {
114115
val builder = new ProcessBuilder(processParameters)
115116
val env = builder.environment()
@@ -120,19 +121,21 @@ object DotnetRunner extends Logging {
120121
logInfo(s"Adding key=$key and value=$value to environment")
121122
}
122123
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
123-
val process = builder.start()
124+
process = builder.start()
124125

125126
// Redirect stdin of JVM process to stdin of .NET process.
126127
new RedirectThread(System.in, process.getOutputStream, "redirect JVM input").start()
127128
// Redirect stdout and stderr of .NET process.
128129
new RedirectThread(process.getInputStream, System.out, "redirect .NET stdout").start()
129130
new RedirectThread(process.getErrorStream, System.out, "redirect .NET stderr").start()
130131

131-
returnCode = process.waitFor()
132-
closeBackend(dotnetBackend)
132+
process.waitFor()
133133
} catch {
134134
case t: Throwable =>
135-
logError(s"${t.getMessage} \n ${t.getStackTrace}")
135+
logThrowable(t)
136+
} finally {
137+
returnCode = closeDotnetProcess(process)
138+
closeBackend(dotnetBackend)
136139
}
137140

138141
if (returnCode != 0) {
@@ -232,6 +235,30 @@ object DotnetRunner extends Logging {
232235
dotnetBackend.close()
233236
}
234237

238+
/**
 * Stops the .NET process if it is still running and reports its exit code.
 *
 * This method is intended to be safe to call from a `finally` block: it never
 * throws, so cleanup that follows it in the caller always runs.
 *
 * @param dotnetProcess the process to close; may be null if it was never started
 * @return the exit code of the process, or -1 if the process was never started
 *         or its exit code could not be obtained
 */
private def closeDotnetProcess(dotnetProcess: Process): Int = {
  if (dotnetProcess == null) {
    // The process was never started, so there is nothing to close.
    -1
  } else if (!dotnetProcess.isAlive) {
    // Already terminated on its own: just report its exit code.
    dotnetProcess.exitValue()
  } else {
    // Try to (gracefully on Linux) kill the process and resort to force if interrupted
    logInfo("Closing .NET process")
    try {
      dotnetProcess.destroy()
      dotnetProcess.waitFor()
    } catch {
      case _: InterruptedException =>
        logInfo("Thread interrupted while waiting for graceful close. Forcefully closing .NET process")
        try {
          dotnetProcess.destroyForcibly().waitFor()
        } catch {
          // An exception thrown here is not covered by the outer handler, so it
          // must be caught locally to keep this method from throwing.
          case t: Throwable =>
            logThrowable(t)
            -1
        } finally {
          // Catching InterruptedException clears the interrupt status; restore it
          // so code further up the stack can still observe the interruption.
          Thread.currentThread().interrupt()
        }
      case t: Throwable =>
        logThrowable(t)
        -1
    }
  }
}
261+
235262
private def initializeSettings(args: Array[String]): (Boolean, Int) = {
236263
val runInDebugMode = (args.length == 1 || args.length == 2) && args(0).equalsIgnoreCase(
237264
"debug")
@@ -246,4 +273,7 @@ object DotnetRunner extends Logging {
246273

247274
(runInDebugMode, portNumber)
248275
}
276+
277+
/**
 * Logs a throwable at error level, including its class name, message, stack
 * trace and any chained causes.
 *
 * The text is rendered with `printStackTrace` rather than assembled from
 * `getMessage` and `getStackTrace`, because the latter omits the exception type
 * and the cause chain and prints "null" for exceptions without a message.
 *
 * @param throwable the throwable to log
 */
private def logThrowable(throwable: Throwable): Unit = {
  val rendered = new java.io.StringWriter()
  val printer = new java.io.PrintWriter(rendered)
  throwable.printStackTrace(printer)
  printer.flush()
  logError(rendered.toString)
}
249279
}

0 commit comments

Comments
 (0)