Skip to content

Commit f3b7dee

Browse files
carsonwangJustin Uang
authored andcommitted
set correct execution Id for broadcast query stage (apache-spark-on-k8s#50)
1 parent fff789a commit f3b7dee

File tree

1 file changed

+7
-2
lines changed

1 file changed

+7
-2
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStage.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,16 +53,21 @@ abstract class QueryStage extends UnaryExecNode {
5353
* blocking on one child stage.
5454
*/
5555
def executeChildStages(): Unit = {
56+
val executionId = sqlContext.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
57+
5658
// Handle broadcast stages
5759
val broadcastQueryStages: Seq[BroadcastQueryStage] = child.collect {
5860
case bqs: BroadcastQueryStageInput => bqs.childStage
5961
}
6062
val broadcastFutures = broadcastQueryStages.map { queryStage =>
61-
Future { queryStage.prepareBroadcast() }(QueryStage.executionContext)
63+
Future {
64+
SQLExecution.withExecutionId(sqlContext.sparkContext, executionId) {
65+
queryStage.prepareBroadcast()
66+
}
67+
}(QueryStage.executionContext)
6268
}
6369

6470
// Submit shuffle stages
65-
val executionId = sqlContext.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
6671
val shuffleQueryStages: Seq[ShuffleQueryStage] = child.collect {
6772
case sqs: ShuffleQueryStageInput => sqs.childStage
6873
}

0 commit comments

Comments
 (0)