
Commit 1732178
[SPARK-26923][R][SQL][FOLLOW-UP] Show stderr in the exception whenever possible in RRunner
### What changes were proposed in this pull request?

This is a followup of apache#23977. I made a mistake related to this line: apache@3725b13#diff-71c2cad03f08cb5f6c70462aa4e28d3aL112

Previously:

1. The reader iterator for the R worker read some initial data eagerly during RDD materialization, so it read data before the actual execution. For some reason, in this case, it showed the standard error from the R worker.
2. After that, when an error happened during the actual execution, stderr wasn't shown: apache@3725b13#diff-71c2cad03f08cb5f6c70462aa4e28d3aL260

After my change apache@3725b13#diff-71c2cad03f08cb5f6c70462aa4e28d3aL112, case 1. no longer applies and only the path in 2. remains, because such eager execution is now avoided (which is consistent with the PySpark code path).

This PR proposes to always do what 1. did, namely show the stderr from the R worker in the exception, both before and after execution, because it is quite possible that the R worker failed during the actual execution, and it is best to show its stderr whenever possible.

### Why are the changes needed?

It currently swallows the standard error from the R worker, which makes debugging harder.

### Does this PR introduce any user-facing change?

Yes.

```R
df <- createDataFrame(list(list(n=1)))
collect(dapply(df, function(x) {
  stop("asdkjasdjkbadskjbsdajbk")
  x
}, structType("a double")))
```

**Before:**

```
Error in handleErrors(returnStatus, conn) :
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 13.0 failed 1 times, most recent failure: Lost task 0.0 in stage 13.0 (TID 13, 192.168.35.193, executor driver): org.apache.spark.SparkException: R worker exited unexpectedly (cranshed)
	at org.apache.spark.api.r.RRunner$$anon$1.read(RRunner.scala:130)
	at org.apache.spark.api.r.BaseRRunner$ReaderIterator.hasNext(BaseRRunner.scala:118)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:726)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:337)
	at org.apache.spark.
```

**After:**

```
Error in handleErrors(returnStatus, conn) :
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, 192.168.35.193, executor driver): org.apache.spark.SparkException: R unexpectedly exited.
R worker produced errors: Error in computeFunc(inputData) : asdkjasdjkbadskjbsdajbk

	at org.apache.spark.api.r.BaseRRunner$ReaderIterator$$anonfun$1.applyOrElse(BaseRRunner.scala:144)
	at org.apache.spark.api.r.BaseRRunner$ReaderIterator$$anonfun$1.applyOrElse(BaseRRunner.scala:137)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.api.r.RRunner$$anon$1.read(RRunner.scala:128)
	at org.apache.spark.api.r.BaseRRunner$ReaderIterator.hasNext(BaseRRunner.scala:113)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegen
```

### How was this patch tested?

Manually tested, and a unit test was added.

Closes apache#26517 from HyukjinKwon/SPARK-26923-followup.
Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
1 parent ab981f1 commit 1732178

File tree

4 files changed: +20 -15 lines changed

R/pkg/tests/fulltests/test_sparkSQL.R

Lines changed: 7 additions & 0 deletions
```diff
@@ -3238,6 +3238,13 @@ test_that("Histogram", {
   expect_equal(histogram(df, "x")$counts, c(4, 0, 0, 0, 0, 0, 0, 0, 0, 1))
 })
 
+test_that("dapply() should show error message from R worker", {
+  df <- createDataFrame(list(list(n = 1)))
+  expect_error({
+    collect(dapply(df, function(x) stop("custom error message"), structType("a double")))
+  }, "custom error message")
+})
+
 test_that("dapply() and dapplyCollect() on a DataFrame", {
   df <- createDataFrame(
     list(list(1L, 1, "1"), list(2L, 2, "2"), list(3L, 3, "3")),
```

core/src/main/scala/org/apache/spark/api/r/BaseRRunner.scala

Lines changed: 11 additions & 6 deletions
```diff
@@ -82,12 +82,7 @@ private[spark] abstract class BaseRRunner[IN, OUT](
       serverSocket.close()
     }
 
-    try {
-      newReaderIterator(dataStream, errThread)
-    } catch {
-      case e: Exception =>
-        throw new SparkException("R computation failed with\n " + errThread.getLines(), e)
-    }
+    newReaderIterator(dataStream, errThread)
   }
 
   /**
@@ -138,6 +133,16 @@ private[spark] abstract class BaseRRunner[IN, OUT](
      * and then returns null.
      */
     protected def read(): OUT
+
+    protected val handleException: PartialFunction[Throwable, OUT] = {
+      case e: Exception =>
+        var msg = "R unexpectedly exited."
+        val lines = errThread.getLines()
+        if (lines.trim().nonEmpty) {
+          msg += s"\nR worker produced errors: $lines\n"
+        }
+        throw new SparkException(msg, e)
+    }
   }
 
   /**
```
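A note on the Scala idiom this hunk introduces: a `catch` clause accepts any expression of type `PartialFunction[Throwable, T]`, which is why `handleException` can be defined once on the shared reader iterator and reused verbatim by the concrete runners below. The following is a minimal standalone sketch of the same pattern; `RuntimeException` and the stub `stderrLines()` are stand-ins for `SparkException` and `errThread.getLines()`, not the real APIs.

```scala
import java.io.EOFException

object CatchHandlerSketch {
  // Stub standing in for errThread.getLines(), which buffers the R
  // worker's stderr in the real code.
  def stderrLines(): String = "Error in computeFunc(inputData) : boom"

  // Same shape as BaseRRunner's handleException: wrap any Exception,
  // appending the captured stderr when it is non-empty.
  val handleException: PartialFunction[Throwable, Int] = {
    case e: Exception =>
      var msg = "R unexpectedly exited."
      val lines = stderrLines()
      if (lines.trim().nonEmpty) {
        msg += s"\nR worker produced errors: $lines\n"
      }
      throw new RuntimeException(msg, e) // SparkException in Spark itself
  }

  // A PartialFunction value can be used directly as the catch clause.
  def read(): Int = try {
    throw new EOFException("stream closed") // simulate a crashed worker
  } catch handleException

  def main(args: Array[String]): Unit = {
    try read() catch { case e: RuntimeException => println(e.getMessage) }
  }
}
```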

core/src/main/scala/org/apache/spark/api/r/RRunner.scala

Lines changed: 1 addition & 4 deletions
```diff
@@ -125,10 +125,7 @@ private[spark] class RRunner[IN, OUT](
               eos = true
               null.asInstanceOf[OUT]
           }
-        } catch {
-          case eof: EOFException =>
-            throw new SparkException("R worker exited unexpectedly (cranshed)", eof)
-        }
+        } catch handleException
       }
     }
   }
```
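One behavioral detail of this simplification: the removed `catch` here was specific to `EOFException`, while the shared `handleException` matches any `Exception`. Since `EOFException` extends `IOException`, which extends `Exception`, the old case is subsumed, and fatal JVM `Error`s still propagate unhandled. A small hypothetical check of that coverage (the object and names are mine, not from the patch):

```scala
object HandlerCoverage extends App {
  // Same guard as handleException: matches Exception, not every Throwable.
  val handlesIt: PartialFunction[Throwable, Unit] = { case _: Exception => () }

  assert(handlesIt.isDefinedAt(new java.io.EOFException("eof"))) // subsumed
  assert(!handlesIt.isDefinedAt(new OutOfMemoryError("fatal")))  // falls through
  println("coverage checks passed")
}
```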

sql/core/src/main/scala/org/apache/spark/sql/execution/r/ArrowRRunner.scala

Lines changed: 1 addition & 5 deletions
```diff
@@ -191,11 +191,7 @@ class ArrowRRunner(
             null
           }
         }
-      } catch {
-        case eof: EOFException =>
-          throw new SparkException(
-            "R worker exited unexpectedly (crashed)\n " + errThread.getLines(), eof)
-      }
+      } catch handleException
     }
   }
 }
```
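With this hunk, both concrete runners delegate stderr formatting to `handleException`, which in turn reads `errThread.getLines()`. For context, here is a simplified, hypothetical analogue of such a stderr-collecting thread; the actual helper behind `errThread` in Spark may differ in naming, buffering, and lifecycle.

```scala
import java.io.{BufferedReader, InputStreamReader}
import scala.collection.mutable

// Daemon thread that drains a child process's stderr into a bounded buffer
// so the last lines can be attached to an error message on failure.
class StderrBufferThread(proc: Process, keepLast: Int = 100) extends Thread {
  private val lines = mutable.Queue.empty[String]
  setDaemon(true)

  override def run(): Unit = {
    val reader = new BufferedReader(new InputStreamReader(proc.getErrorStream))
    var line = reader.readLine()
    while (line != null) {
      lines.synchronized {
        lines.enqueue(line)
        if (lines.size > keepLast) lines.dequeue() // keep only the tail
      }
      line = reader.readLine()
    }
  }

  // Mirrors the role of errThread.getLines(): everything buffered so far.
  def getLines(): String = lines.synchronized { lines.mkString("\n") }
}
```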
