Commit 641aec6

Thomas Graves authored and cloud-fan committed
[SPARK-23806] Broadcast.unpersist can cause fatal exception when used with dynamic allocation

## What changes were proposed in this pull request?

Ignore errors when waiting for a broadcast.unpersist. This handles it the same way as rdd.unpersist in https://issues.apache.org/jira/browse/SPARK-22618.

## How was this patch tested?

The patch was tested manually against a couple of jobs that exhibit this behavior. With the change, the application no longer dies because of this and just prints the warning.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Thomas Graves <[email protected]>

Closes apache#20924 from tgravescs/SPARK-23806.
1 parent ea2fdc0 commit 641aec6

1 file changed, with 9 additions and 5 deletions

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

Lines changed: 9 additions & 5 deletions
@@ -192,11 +192,15 @@ class BlockManagerMasterEndpoint(
     val requiredBlockManagers = blockManagerInfo.values.filter { info =>
       removeFromDriver || !info.blockManagerId.isDriver
     }
-    Future.sequence(
-      requiredBlockManagers.map { bm =>
-        bm.slaveEndpoint.ask[Int](removeMsg)
-      }.toSeq
-    )
+    val futures = requiredBlockManagers.map { bm =>
+      bm.slaveEndpoint.ask[Int](removeMsg).recover {
+        case e: IOException =>
+          logWarning(s"Error trying to remove broadcast $broadcastId", e)
+          0 // zero blocks were removed
+      }
+    }.toSeq
+
+    Future.sequence(futures)
   }
 
   private def removeBlockManager(blockManagerId: BlockManagerId) {
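
The effect of the change can be reproduced outside Spark with plain Scala futures. The following is a minimal sketch, not Spark code: `RecoverSketch`, `askRemove`, the executor names, and the block count of 3 are all hypothetical stand-ins for `bm.slaveEndpoint.ask[Int](removeMsg)` and its results. It shows that `Future.sequence` fails as a whole when any single future fails, and that recovering each future from `IOException` first lets the combined future complete.

```scala
import java.io.IOException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object RecoverSketch {
  // Hypothetical stand-in for bm.slaveEndpoint.ask[Int](removeMsg):
  // "exec-2" behaves like an executor that has already gone away.
  def askRemove(executor: String): Future[Int] =
    if (executor == "exec-2") {
      Future.failed(new IOException(s"connection to $executor closed"))
    } else {
      Future.successful(3) // pretend three blocks were removed
    }

  val executors: Seq[String] = Seq("exec-1", "exec-2", "exec-3")

  // Shape of the old code: one failed ask fails the whole sequence.
  def removeStrict(): Future[Seq[Int]] =
    Future.sequence(executors.map(askRemove))

  // Shape of the new code: an IOException is logged and mapped to 0,
  // so the combined future still completes successfully.
  def removeLenient(): Future[Seq[Int]] = {
    val futures = executors.map { executor =>
      askRemove(executor).recover {
        case e: IOException =>
          Console.err.println(s"Error trying to remove broadcast from $executor: ${e.getMessage}")
          0 // zero blocks were removed
      }
    }
    Future.sequence(futures)
  }

  def main(args: Array[String]): Unit = {
    println(Try(Await.result(removeStrict(), 5.seconds)))
    println(Await.result(removeLenient(), 5.seconds))
  }
}
```

Note that `recover` is applied to each individual future before `Future.sequence`, as in the patch. Recovering only the combined future would also avoid the failure, but it would discard the counts from the executors that did respond.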
