Skip to content

Commit 0f51590

Browse files
authored
Capture dotnet application error stack trace (#1047)
1 parent b2fa350 commit 0f51590

File tree

12 files changed

+252
-28
lines changed

12 files changed

+252
-28
lines changed
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
 * Licensed to the .NET Foundation under one or more agreements.
 * The .NET Foundation licenses this file to you under the MIT license.
 * See the LICENSE file in the project root for more information.
 */

package org.apache.spark.deploy.dotnet

import org.apache.spark.SparkException

/**
 * This exception type describes an exception thrown by a .NET user application.
 *
 * @param exitCode Exit code returned by the .NET application.
 * @param dotNetStackTrace Stacktrace extracted from .NET application logs, if one was captured.
 */
private[spark] class DotNetUserAppException(exitCode: Int, dotNetStackTrace: Option[String])
  extends SparkException(
    // Build the message via Option.fold: default text when no trace was captured,
    // otherwise append the .NET stack trace extracted from the process output.
    dotNetStackTrace.fold(s"User application exited with $exitCode")(trace =>
      s"User application exited with $exitCode and .NET exception: $trace"))

src/scala/microsoft-spark-2-4/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,18 @@ import java.util.Locale
1414
import java.util.concurrent.{Semaphore, TimeUnit}
1515

1616
import org.apache.commons.io.FilenameUtils
17+
import org.apache.commons.io.output.TeeOutputStream
1718
import org.apache.hadoop.fs.Path
1819
import org.apache.spark
1920
import org.apache.spark.api.dotnet.DotnetBackend
2021
import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil}
2122
import org.apache.spark.internal.Logging
22-
import org.apache.spark.internal.config.dotnet.Dotnet.DOTNET_IGNORE_SPARK_PATCH_VERSION_CHECK
23+
import org.apache.spark.internal.config.dotnet.Dotnet.{
24+
DOTNET_IGNORE_SPARK_PATCH_VERSION_CHECK,
25+
ERROR_BUFFER_SIZE, ERROR_REDIRECITON_ENABLED
26+
}
2327
import org.apache.spark.util.dotnet.{Utils => DotnetUtils}
24-
import org.apache.spark.util.{RedirectThread, Utils}
28+
import org.apache.spark.util.{CircularBuffer, RedirectThread, Utils}
2529
import org.apache.spark.{SecurityManager, SparkConf, SparkUserAppException}
2630

2731
import scala.collection.JavaConverters._
@@ -123,6 +127,17 @@ object DotnetRunner extends Logging {
123127
if (!runInDebugMode) {
124128
var returnCode = -1
125129
var process: Process = null
130+
val enableLogRedirection: Boolean = sys.props
131+
.getOrElse(
132+
ERROR_REDIRECITON_ENABLED.key,
133+
ERROR_REDIRECITON_ENABLED.defaultValue.get.toString).toBoolean
134+
val stderrBuffer: Option[CircularBuffer] = Option(enableLogRedirection).collect {
135+
case true => new CircularBuffer(
136+
sys.props.getOrElse(
137+
ERROR_BUFFER_SIZE.key,
138+
ERROR_BUFFER_SIZE.defaultValue.get.toString).toInt)
139+
}
140+
126141
try {
127142
val builder = new ProcessBuilder(processParameters)
128143
val env = builder.environment()
@@ -137,9 +152,15 @@ object DotnetRunner extends Logging {
137152

138153
// Redirect stdin of JVM process to stdin of .NET process.
139154
new RedirectThread(System.in, process.getOutputStream, "redirect JVM input").start()
140-
// Redirect stdout and stderr of .NET process.
141-
new RedirectThread(process.getInputStream, System.out, "redirect .NET stdout").start()
142-
new RedirectThread(process.getErrorStream, System.out, "redirect .NET stderr").start()
155+
// Redirect stdout and stderr of .NET process to System.out and to buffer
156+
// if log redirection is enabled. If not, redirect only to System.out.
157+
new RedirectThread(
158+
process.getInputStream,
159+
stderrBuffer match {
160+
case Some(buffer) => new TeeOutputStream(System.out, buffer)
161+
case _ => System.out
162+
},
163+
"redirect .NET stdout and stderr").start()
143164

144165
process.waitFor()
145166
} catch {
@@ -149,9 +170,12 @@ object DotnetRunner extends Logging {
149170
returnCode = closeDotnetProcess(process)
150171
closeBackend(dotnetBackend)
151172
}
152-
153173
if (returnCode != 0) {
154-
throw new SparkUserAppException(returnCode)
174+
if (stderrBuffer.isDefined) {
175+
throw new DotNetUserAppException(returnCode, Some(stderrBuffer.get.toString))
176+
} else {
177+
throw new SparkUserAppException(returnCode)
178+
}
155179
} else {
156180
logInfo(s".NET application exited successfully")
157181
}

src/scala/microsoft-spark-2-4/src/main/scala/org/apache/spark/internal/config/dotnet/Dotnet.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,14 @@ private[spark] object Dotnet {
1515
val DOTNET_IGNORE_SPARK_PATCH_VERSION_CHECK =
1616
ConfigBuilder("spark.dotnet.ignoreSparkPatchVersionCheck").booleanConf
1717
.createWithDefault(false)
18+
19+
// Enables forwarding the .NET application's error output (retained in a buffer
// by DotnetRunner) into the exception thrown when the application fails.
// NOTE(review): the name contains the typo "REDIRECITON" (should be
// "REDIRECTION"); kept as-is because callers reference this exact identifier.
val ERROR_REDIRECITON_ENABLED =
  ConfigBuilder("spark.nonjvm.error.forwarding.enabled")
    .booleanConf
    .createWithDefault(false)

// Maximum number of bytes of .NET error output retained for the exception
// message; must be non-negative (0 simply retains nothing).
val ERROR_BUFFER_SIZE =
  ConfigBuilder("spark.nonjvm.error.buffer.size")
    .intConf
    .checkValue(_ >= 0, "The error buffer size must not be negative")
    .createWithDefault(10240)
1828
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
 * Licensed to the .NET Foundation under one or more agreements.
 * The .NET Foundation licenses this file to you under the MIT license.
 * See the LICENSE file in the project root for more information.
 */

package org.apache.spark.deploy.dotnet

import org.apache.spark.SparkException

/**
 * This exception type describes an exception thrown by a .NET user application.
 *
 * @param exitCode Exit code returned by the .NET application.
 * @param dotNetStackTrace Stacktrace extracted from .NET application logs, if one was captured.
 */
private[spark] class DotNetUserAppException(exitCode: Int, dotNetStackTrace: Option[String])
  extends SparkException(
    // Build the message via Option.fold: default text when no trace was captured,
    // otherwise append the .NET stack trace extracted from the process output.
    dotNetStackTrace.fold(s"User application exited with $exitCode")(trace =>
      s"User application exited with $exitCode and .NET exception: $trace"))

src/scala/microsoft-spark-3-0/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,18 @@ import java.util.Locale
1414
import java.util.concurrent.{Semaphore, TimeUnit}
1515

1616
import org.apache.commons.io.FilenameUtils
17+
import org.apache.commons.io.output.TeeOutputStream
1718
import org.apache.hadoop.fs.Path
1819
import org.apache.spark
1920
import org.apache.spark.api.dotnet.DotnetBackend
2021
import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil}
2122
import org.apache.spark.internal.Logging
22-
import org.apache.spark.internal.config.dotnet.Dotnet.DOTNET_IGNORE_SPARK_PATCH_VERSION_CHECK
23+
import org.apache.spark.internal.config.dotnet.Dotnet.{
24+
DOTNET_IGNORE_SPARK_PATCH_VERSION_CHECK,
25+
ERROR_BUFFER_SIZE, ERROR_REDIRECITON_ENABLED
26+
}
2327
import org.apache.spark.util.dotnet.{Utils => DotnetUtils}
24-
import org.apache.spark.util.{RedirectThread, Utils}
28+
import org.apache.spark.util.{CircularBuffer, RedirectThread, Utils}
2529
import org.apache.spark.{SecurityManager, SparkConf, SparkUserAppException}
2630

2731
import scala.collection.JavaConverters._
@@ -122,6 +126,17 @@ object DotnetRunner extends Logging {
122126
if (!runInDebugMode) {
123127
var returnCode = -1
124128
var process: Process = null
129+
val enableLogRedirection: Boolean = sys.props
130+
.getOrElse(
131+
ERROR_REDIRECITON_ENABLED.key,
132+
ERROR_REDIRECITON_ENABLED.defaultValue.get.toString).toBoolean
133+
val stderrBuffer: Option[CircularBuffer] = Option(enableLogRedirection).collect {
134+
case true => new CircularBuffer(
135+
sys.props.getOrElse(
136+
ERROR_BUFFER_SIZE.key,
137+
ERROR_BUFFER_SIZE.defaultValue.get.toString).toInt)
138+
}
139+
125140
try {
126141
val builder = new ProcessBuilder(processParameters)
127142
val env = builder.environment()
@@ -136,9 +151,15 @@ object DotnetRunner extends Logging {
136151

137152
// Redirect stdin of JVM process to stdin of .NET process.
138153
new RedirectThread(System.in, process.getOutputStream, "redirect JVM input").start()
139-
// Redirect stdout and stderr of .NET process.
140-
new RedirectThread(process.getInputStream, System.out, "redirect .NET stdout").start()
141-
new RedirectThread(process.getErrorStream, System.out, "redirect .NET stderr").start()
154+
// Redirect stdout and stderr of .NET process to System.out and to buffer
155+
// if log redirection is enabled. If not, redirect only to System.out.
156+
new RedirectThread(
157+
process.getInputStream,
158+
stderrBuffer match {
159+
case Some(buffer) => new TeeOutputStream(System.out, buffer)
160+
case _ => System.out
161+
},
162+
"redirect .NET stdout and stderr").start()
142163

143164
process.waitFor()
144165
} catch {
@@ -148,9 +169,12 @@ object DotnetRunner extends Logging {
148169
returnCode = closeDotnetProcess(process)
149170
closeBackend(dotnetBackend)
150171
}
151-
152172
if (returnCode != 0) {
153-
throw new SparkUserAppException(returnCode)
173+
if (stderrBuffer.isDefined) {
174+
throw new DotNetUserAppException(returnCode, Some(stderrBuffer.get.toString))
175+
} else {
176+
throw new SparkUserAppException(returnCode)
177+
}
154178
} else {
155179
logInfo(s".NET application exited successfully")
156180
}

src/scala/microsoft-spark-3-0/src/main/scala/org/apache/spark/internal/config/dotnet/Dotnet.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,14 @@ private[spark] object Dotnet {
1515
val DOTNET_IGNORE_SPARK_PATCH_VERSION_CHECK =
1616
ConfigBuilder("spark.dotnet.ignoreSparkPatchVersionCheck").booleanConf
1717
.createWithDefault(false)
18+
19+
// Enables forwarding the .NET application's error output (retained in a buffer
// by DotnetRunner) into the exception thrown when the application fails.
// NOTE(review): the name contains the typo "REDIRECITON" (should be
// "REDIRECTION"); kept as-is because callers reference this exact identifier.
val ERROR_REDIRECITON_ENABLED =
  ConfigBuilder("spark.nonjvm.error.forwarding.enabled")
    .booleanConf
    .createWithDefault(false)

// Maximum number of bytes of .NET error output retained for the exception
// message; must be non-negative (0 simply retains nothing).
val ERROR_BUFFER_SIZE =
  ConfigBuilder("spark.nonjvm.error.buffer.size")
    .intConf
    .checkValue(_ >= 0, "The error buffer size must not be negative")
    .createWithDefault(10240)
1828
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
 * Licensed to the .NET Foundation under one or more agreements.
 * The .NET Foundation licenses this file to you under the MIT license.
 * See the LICENSE file in the project root for more information.
 */

package org.apache.spark.deploy.dotnet

import org.apache.spark.SparkException

/**
 * This exception type describes an exception thrown by a .NET user application.
 *
 * @param exitCode Exit code returned by the .NET application.
 * @param dotNetStackTrace Stacktrace extracted from .NET application logs, if one was captured.
 */
private[spark] class DotNetUserAppException(exitCode: Int, dotNetStackTrace: Option[String])
  extends SparkException(
    // Build the message via Option.fold: default text when no trace was captured,
    // otherwise append the .NET stack trace extracted from the process output.
    dotNetStackTrace.fold(s"User application exited with $exitCode")(trace =>
      s"User application exited with $exitCode and .NET exception: $trace"))

src/scala/microsoft-spark-3-1/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,18 @@ import java.util.Locale
1414
import java.util.concurrent.{Semaphore, TimeUnit}
1515

1616
import org.apache.commons.io.FilenameUtils
17+
import org.apache.commons.io.output.TeeOutputStream
1718
import org.apache.hadoop.fs.Path
1819
import org.apache.spark
1920
import org.apache.spark.api.dotnet.DotnetBackend
2021
import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil}
2122
import org.apache.spark.internal.Logging
22-
import org.apache.spark.internal.config.dotnet.Dotnet.DOTNET_IGNORE_SPARK_PATCH_VERSION_CHECK
23+
import org.apache.spark.internal.config.dotnet.Dotnet.{
24+
DOTNET_IGNORE_SPARK_PATCH_VERSION_CHECK,
25+
ERROR_BUFFER_SIZE, ERROR_REDIRECITON_ENABLED
26+
}
2327
import org.apache.spark.util.dotnet.{Utils => DotnetUtils}
24-
import org.apache.spark.util.{RedirectThread, Utils}
28+
import org.apache.spark.util.{CircularBuffer, RedirectThread, Utils}
2529
import org.apache.spark.{SecurityManager, SparkConf, SparkUserAppException}
2630

2731
import scala.collection.JavaConverters._
@@ -122,6 +126,17 @@ object DotnetRunner extends Logging {
122126
if (!runInDebugMode) {
123127
var returnCode = -1
124128
var process: Process = null
129+
val enableLogRedirection: Boolean = sys.props
130+
.getOrElse(
131+
ERROR_REDIRECITON_ENABLED.key,
132+
ERROR_REDIRECITON_ENABLED.defaultValue.get.toString).toBoolean
133+
val stderrBuffer: Option[CircularBuffer] = Option(enableLogRedirection).collect {
134+
case true => new CircularBuffer(
135+
sys.props.getOrElse(
136+
ERROR_BUFFER_SIZE.key,
137+
ERROR_BUFFER_SIZE.defaultValue.get.toString).toInt)
138+
}
139+
125140
try {
126141
val builder = new ProcessBuilder(processParameters)
127142
val env = builder.environment()
@@ -136,9 +151,15 @@ object DotnetRunner extends Logging {
136151

137152
// Redirect stdin of JVM process to stdin of .NET process.
138153
new RedirectThread(System.in, process.getOutputStream, "redirect JVM input").start()
139-
// Redirect stdout and stderr of .NET process.
140-
new RedirectThread(process.getInputStream, System.out, "redirect .NET stdout").start()
141-
new RedirectThread(process.getErrorStream, System.out, "redirect .NET stderr").start()
154+
// Redirect stdout and stderr of .NET process to System.out and to buffer
155+
// if log redirection is enabled. If not, redirect only to System.out.
156+
new RedirectThread(
157+
process.getInputStream,
158+
stderrBuffer match {
159+
case Some(buffer) => new TeeOutputStream(System.out, buffer)
160+
case _ => System.out
161+
},
162+
"redirect .NET stdout and stderr").start()
142163

143164
process.waitFor()
144165
} catch {
@@ -148,9 +169,12 @@ object DotnetRunner extends Logging {
148169
returnCode = closeDotnetProcess(process)
149170
closeBackend(dotnetBackend)
150171
}
151-
152172
if (returnCode != 0) {
153-
throw new SparkUserAppException(returnCode)
173+
if (stderrBuffer.isDefined) {
174+
throw new DotNetUserAppException(returnCode, Some(stderrBuffer.get.toString))
175+
} else {
176+
throw new SparkUserAppException(returnCode)
177+
}
154178
} else {
155179
logInfo(s".NET application exited successfully")
156180
}

src/scala/microsoft-spark-3-1/src/main/scala/org/apache/spark/internal/config/dotnet/Dotnet.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,14 @@ private[spark] object Dotnet {
1515
val DOTNET_IGNORE_SPARK_PATCH_VERSION_CHECK =
1616
ConfigBuilder("spark.dotnet.ignoreSparkPatchVersionCheck").booleanConf
1717
.createWithDefault(false)
18+
19+
// Enables forwarding the .NET application's error output (retained in a buffer
// by DotnetRunner) into the exception thrown when the application fails.
// NOTE(review): the name contains the typo "REDIRECITON" (should be
// "REDIRECTION"); kept as-is because callers reference this exact identifier.
val ERROR_REDIRECITON_ENABLED =
  ConfigBuilder("spark.nonjvm.error.forwarding.enabled")
    .booleanConf
    .createWithDefault(false)

// Maximum number of bytes of .NET error output retained for the exception
// message; must be non-negative (0 simply retains nothing).
val ERROR_BUFFER_SIZE =
  ConfigBuilder("spark.nonjvm.error.buffer.size")
    .intConf
    .checkValue(_ >= 0, "The error buffer size must not be negative")
    .createWithDefault(10240)
1828
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Licensed to the .NET Foundation under one or more agreements.
3+
* The .NET Foundation licenses this file to you under the MIT license.
4+
* See the LICENSE file in the project root for more information.
5+
*/
6+
7+
package org.apache.spark.deploy.dotnet
8+
9+
import org.apache.spark.SparkException
10+
11+
/**
12+
* This exception type describes an exception thrown by a .NET user application.
13+
*
14+
* @param exitCode Exit code returned by the .NET application.
15+
* @param dotNetStackTrace Stacktrace extracted from .NET application logs.
16+
*/
17+
private[spark] class DotNetUserAppException(exitCode: Int, dotNetStackTrace: Option[String])
18+
extends SparkException(
19+
dotNetStackTrace match {
20+
case None => s"User application exited with $exitCode"
21+
case Some(e) => s"User application exited with $exitCode and .NET exception: $e"
22+
})

0 commit comments

Comments
 (0)