
Commit f477a24

Author: Justin Uang (committed)
Propagate all local properties
1 parent 563706e commit f477a24


1 file changed: +16 -5 lines


sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStage.scala

Lines changed: 16 additions & 5 deletions

@@ -17,11 +17,12 @@
 
 package org.apache.spark.sql.execution.adaptive
 
+import java.util.Properties
+
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration.Duration
 
-import org.apache.spark.MapOutputStatistics
-import org.apache.spark.broadcast
+import org.apache.spark.{broadcast, MapOutputStatistics, SparkContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -48,20 +49,30 @@ abstract class QueryStage extends UnaryExecNode {
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
+  def withLocalProperties[T](sc: SparkContext, properties: Properties)(body: => T): T = {
+    val oldProperties = sc.getLocalProperties
+    try {
+      sc.setLocalProperties(properties)
+      body
+    } finally {
+      sc.setLocalProperties(oldProperties)
+    }
+  }
+
   /**
    * Execute childStages and wait until all stages are completed. Use a thread pool to avoid
    * blocking on one child stage.
    */
   def executeChildStages(): Unit = {
-    val executionId = sqlContext.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    val localProperties = sqlContext.sparkContext.getLocalProperties
 
     // Handle broadcast stages
     val broadcastQueryStages: Seq[BroadcastQueryStage] = child.collect {
      case bqs: BroadcastQueryStageInput => bqs.childStage
     }
     val broadcastFutures = broadcastQueryStages.map { queryStage =>
       Future {
-        SQLExecution.withExecutionId(sqlContext.sparkSession, executionId) {
+        withLocalProperties(sqlContext.sparkContext, localProperties) {
           queryStage.prepareBroadcast()
         }
       }(QueryStage.executionContext)
@@ -73,7 +84,7 @@ abstract class QueryStage extends UnaryExecNode {
     }
     val shuffleStageFutures = shuffleQueryStages.map { queryStage =>
       Future {
-        SQLExecution.withExecutionId(sqlContext.sparkSession, executionId) {
+        withLocalProperties(sqlContext.sparkContext, localProperties) {
           queryStage.execute()
         }
       }(QueryStage.executionContext)
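
The patch replaces SQLExecution.withExecutionId, which propagates only the SQL execution ID, with a helper that snapshots all of the submitting thread's local properties and reinstates (then restores) them around each Future run on QueryStage.executionContext. Below is a minimal standalone sketch of the same save/set/restore pattern. It uses SparkContext's public per-key API (getLocalProperty / setLocalProperty) because the bulk getLocalProperties / setLocalProperties accessors used in the patch are internal to Spark; the object, method names, and usage here are illustrative, not part of the commit.

    import scala.concurrent.{ExecutionContext, Future}

    import org.apache.spark.SparkContext

    // Illustrative names; not part of the commit above.
    object LocalPropertyPropagation {

      // Run `body` with `key` set to `value`, then restore the previous value.
      // Mirrors the withLocalProperties helper in the diff, but per key via the
      // public API; setLocalProperty(key, null) clears the key.
      def withLocalProperty[T](sc: SparkContext, key: String, value: String)(body: => T): T = {
        val previous = sc.getLocalProperty(key) // may be null if the key was never set
        try {
          sc.setLocalProperty(key, value)
          body
        } finally {
          sc.setLocalProperty(key, previous)
        }
      }

      // Capture a property on the submitting thread and re-apply it inside the Future,
      // so jobs launched from the pool thread keep the caller's value (for example a
      // scheduler pool or the SQL execution ID). Local properties are thread-local, so
      // without this step a shared thread pool would see whatever was last set on its threads.
      def submitWithProperty[T](sc: SparkContext, key: String)(body: => T)
          (implicit ec: ExecutionContext): Future[T] = {
        val captured = sc.getLocalProperty(key) // read on the caller's thread
        Future {
          withLocalProperty(sc, key, captured)(body)
        }
      }
    }

As in the commit, restoring the old value in the finally block keeps a pool thread from leaking one query's properties into the next task scheduled on it.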
