Skip to content

Commit beb717f

Browse files
Brad Kaisercloud-fan
authored andcommitted
[SPARK-22618][CORE] Catch exception in removeRDD to stop jobs from dying
## What changes were proposed in this pull request? I propose that BlockManagerMasterEndpoint.removeRdd() should catch and log any IOExceptions it receives. As it is now, the exception can bubble up to the main thread and kill user applications when called from RDD.unpersist(). I think this change is a better experience for the end user. I chose to catch the exception in BlockManagerMasterEndpoint.removeRdd() instead of RDD.unpersist() because this way the RDD.unpersist() blocking option will still work correctly. Otherwise, blocking will get short circuited by the first error. ## How was this patch tested? This patch was tested with a job that shows the job killing behavior mentioned above. rxin, it looks like you originally wrote this method, I would appreciate it if you took a look. Thanks. This contribution is my original work and is licensed under the project's open source license. Author: Brad Kaiser <[email protected]> Closes #19836 from brad-kaiser/catch-unpersist-exception.
1 parent 2be4482 commit beb717f

File tree

1 file changed

+11
-5
lines changed

1 file changed

+11
-5
lines changed

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.storage
1919

20+
import java.io.IOException
2021
import java.util.{HashMap => JHashMap}
2122

2223
import scala.collection.JavaConverters._
@@ -159,11 +160,16 @@ class BlockManagerMasterEndpoint(
159160
// Ask the slaves to remove the RDD, and put the result in a sequence of Futures.
160161
// The dispatcher is used as an implicit argument into the Future sequence construction.
161162
val removeMsg = RemoveRdd(rddId)
162-
Future.sequence(
163-
blockManagerInfo.values.map { bm =>
164-
bm.slaveEndpoint.ask[Int](removeMsg)
165-
}.toSeq
166-
)
163+
164+
val futures = blockManagerInfo.values.map { bm =>
165+
bm.slaveEndpoint.ask[Int](removeMsg).recover {
166+
case e: IOException =>
167+
logWarning(s"Error trying to remove RDD $rddId", e)
168+
0 // zero blocks were removed
169+
}
170+
}.toSeq
171+
172+
Future.sequence(futures)
167173
}
168174

169175
private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {

0 commit comments

Comments
 (0)