Commit 796d09d

Merge pull request apache-spark-on-k8s#400 from palantir/cherry-pick-spark-23243
Merge from upstream
2 parents: 50e3640 + c33e1fa

101 files changed (+1847 -1371 lines)

R/README.md

Lines changed: 1 addition & 1 deletion

@@ -17,7 +17,7 @@ export R_HOME=/home/username/R
 
 #### Build Spark
 
-Build Spark with [Maven](http://spark.apache.org/docs/latest/building-spark.html#building-with-buildmvn) and include the `-Psparkr` profile to build the R package. For example to use the default Hadoop versions you can run
+Build Spark with [Maven](http://spark.apache.org/docs/latest/building-spark.html#buildmvn) and include the `-Psparkr` profile to build the R package. For example to use the default Hadoop versions you can run
 
 ```bash
 build/mvn -DskipTests -Psparkr package

R/WINDOWS.md

Lines changed: 1 addition & 1 deletion

@@ -14,7 +14,7 @@ directory in Maven in `PATH`.
 
 4. Set `MAVEN_OPTS` as described in [Building Spark](http://spark.apache.org/docs/latest/building-spark.html).
 
-5. Open a command shell (`cmd`) in the Spark directory and build Spark with [Maven](http://spark.apache.org/docs/latest/building-spark.html#building-with-buildmvn) and include the `-Psparkr` profile to build the R package. For example to use the default Hadoop versions you can run
+5. Open a command shell (`cmd`) in the Spark directory and build Spark with [Maven](http://spark.apache.org/docs/latest/building-spark.html#buildmvn) and include the `-Psparkr` profile to build the R package. For example to use the default Hadoop versions you can run
 
 ```bash
 mvn.cmd -DskipTests -Psparkr package

README.md

Lines changed: 1 addition & 1 deletion

@@ -108,7 +108,7 @@ storage systems. Because the protocols have changed in different versions of
 Hadoop, you must build Spark against the same version that your cluster runs.
 
 Please refer to the build documentation at
-["Specifying the Hadoop Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version)
+["Specifying the Hadoop Version and Enabling YARN"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version-and-enabling-yarn)
 for detailed guidance on building for a particular distribution of Hadoop, including
 building for particular Hive and Hive Thriftserver distributions.

build/mvn

Lines changed: 19 additions & 10 deletions

@@ -60,6 +60,9 @@ install_app() {
   fi
 }
 
+# See simple version normalization: http://stackoverflow.com/questions/16989598/bash-comparing-version-numbers
+function version { echo "$@" | awk -F. '{ printf("%03d%03d%03d\n", $1,$2,$3); }'; }
+
 # Determine the Maven version from the root pom.xml file and
 # install maven under the build/ folder if needed.
 install_mvn() {
@@ -68,8 +71,6 @@ install_mvn() {
   if [ "$MVN_BIN" ]; then
     local MVN_DETECTED_VERSION="$(mvn --version | head -n1 | awk '{print $3}')"
   fi
-  # See simple version normalization: http://stackoverflow.com/questions/16989598/bash-comparing-version-numbers
-  function version { echo "$@" | awk -F. '{ printf("%03d%03d%03d\n", $1,$2,$3); }'; }
   if [ $(version $MVN_DETECTED_VERSION) -lt $(version $MVN_VERSION) ]; then
     local APACHE_MIRROR=${APACHE_MIRROR:-'https://www.apache.org/dyn/closer.lua?action=download&filename='}
 
@@ -87,15 +88,23 @@ install_mvn() {
 
 # Install zinc under the build/ folder
 install_zinc() {
-  local zinc_path="zinc-0.3.15/bin/zinc"
-  [ ! -f "${_DIR}/${zinc_path}" ] && ZINC_INSTALL_FLAG=1
-  local TYPESAFE_MIRROR=${TYPESAFE_MIRROR:-https://downloads.lightbend.com}
+  local ZINC_VERSION=0.3.15
+  ZINC_BIN="$(command -v zinc)"
+  if [ "$ZINC_BIN" ]; then
+    local ZINC_DETECTED_VERSION="$(zinc -version | head -n1 | awk '{print $5}')"
+  fi
 
-  install_app \
-    "${TYPESAFE_MIRROR}/zinc/0.3.15" \
-    "zinc-0.3.15.tgz" \
-    "${zinc_path}"
-  ZINC_BIN="${_DIR}/${zinc_path}"
+  if [ $(version $ZINC_DETECTED_VERSION) -lt $(version $ZINC_VERSION) ]; then
+    local zinc_path="zinc-${ZINC_VERSION}/bin/zinc"
+    [ ! -f "${_DIR}/${zinc_path}" ] && ZINC_INSTALL_FLAG=1
+    local TYPESAFE_MIRROR=${TYPESAFE_MIRROR:-https://downloads.lightbend.com}
+
+    install_app \
+      "${TYPESAFE_MIRROR}/zinc/${ZINC_VERSION}" \
+      "zinc-${ZINC_VERSION}.tgz" \
+      "${zinc_path}"
+    ZINC_BIN="${_DIR}/${zinc_path}"
+  fi
 }
 
 # Determine the Scala version from the root pom.xml file, set the Scala URL,
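The change hoists the `version` helper out of `install_mvn` so `install_zinc` can reuse it: each dotted component is zero-padded so versions compare as plain numbers, and a download is only triggered when the detected tool is older than the required one. A quick Scala rendering of the same normalization idea, as a hedged sketch (object and method names are illustrative, not part of this commit):

```scala
object VersionSketch {
  // "3.5.4" -> 3005004L: pad each dotted component to three digits,
  // the same trick as the awk one-liner in build/mvn.
  def normalize(v: String): Long =
    v.split('.').take(3).padTo(3, "0")
      .map(_.takeWhile(_.isDigit))
      .map(s => if (s.isEmpty) 0L else s.toLong)
      .foldLeft(0L)((acc, part) => acc * 1000 + part)

  def main(args: Array[String]): Unit = {
    println(normalize("3.3.9") < normalize("3.5.4"))   // true  -> an older Maven would be replaced
    println(normalize("0.3.15") < normalize("0.3.15")) // false -> a matching system zinc is reused
  }
}
```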

core/src/main/scala/org/apache/spark/BarrierTaskContext.scala

Lines changed: 93 additions & 21 deletions

@@ -24,25 +24,22 @@ import scala.language.postfixOps
 
 import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.internal.Logging
 import org.apache.spark.memory.TaskMemoryManager
-import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.metrics.source.Source
 import org.apache.spark.rpc.{RpcEndpointRef, RpcTimeout}
-import org.apache.spark.util.{RpcUtils, Utils}
-
-/** A [[TaskContext]] with extra info and tooling for a barrier stage. */
-class BarrierTaskContext(
-    override val stageId: Int,
-    override val stageAttemptNumber: Int,
-    override val partitionId: Int,
-    override val taskAttemptId: Long,
-    override val attemptNumber: Int,
-    override val taskMemoryManager: TaskMemoryManager,
-    localProperties: Properties,
-    @transient private val metricsSystem: MetricsSystem,
-    // The default value is only used in tests.
-    override val taskMetrics: TaskMetrics = TaskMetrics.empty)
-  extends TaskContextImpl(stageId, stageAttemptNumber, partitionId, taskAttemptId, attemptNumber,
-    taskMemoryManager, localProperties, metricsSystem, taskMetrics) {
+import org.apache.spark.shuffle.FetchFailedException
+import org.apache.spark.util._
+
+/**
+ * :: Experimental ::
+ * A [[TaskContext]] with extra contextual info and tooling for tasks in a barrier stage.
+ * Use [[BarrierTaskContext#get]] to obtain the barrier context for a running barrier task.
+ */
+@Experimental
+@Since("2.4.0")
+class BarrierTaskContext private[spark] (
+    taskContext: TaskContext) extends TaskContext with Logging {
 
   // Find the driver side RPCEndpointRef of the coordinator that handles all the barrier() calls.
   private val barrierCoordinator: RpcEndpointRef = {
@@ -68,7 +65,7 @@ class BarrierTaskContext(
    *
    * CAUTION! In a barrier stage, each task must have the same number of barrier() calls, in all
    * possible code branches. Otherwise, you may get the job hanging or a SparkException after
-   * timeout. Some examples of misuses listed below:
+   * timeout. Some examples of '''misuses''' are listed below:
    * 1. Only call barrier() function on a subset of all the tasks in the same barrier stage, it
    * shall lead to timeout of the function call.
    * {{{
@@ -146,20 +143,95 @@ class BarrierTaskContext(
 
   /**
    * :: Experimental ::
-   * Returns the all task infos in this barrier stage, the task infos are ordered by partitionId.
+   * Returns [[BarrierTaskInfo]] for all tasks in this barrier stage, ordered by partition ID.
    */
   @Experimental
   @Since("2.4.0")
   def getTaskInfos(): Array[BarrierTaskInfo] = {
-    val addressesStr = localProperties.getProperty("addresses", "")
+    val addressesStr = Option(taskContext.getLocalProperty("addresses")).getOrElse("")
     addressesStr.split(",").map(_.trim()).map(new BarrierTaskInfo(_))
   }
+
+  // delegate methods
+
+  override def isCompleted(): Boolean = taskContext.isCompleted()
+
+  override def isInterrupted(): Boolean = taskContext.isInterrupted()
+
+  override def isRunningLocally(): Boolean = taskContext.isRunningLocally()
+
+  override def addTaskCompletionListener(listener: TaskCompletionListener): this.type = {
+    taskContext.addTaskCompletionListener(listener)
+    this
+  }
+
+  override def addTaskFailureListener(listener: TaskFailureListener): this.type = {
+    taskContext.addTaskFailureListener(listener)
+    this
+  }
+
+  override def stageId(): Int = taskContext.stageId()
+
+  override def stageAttemptNumber(): Int = taskContext.stageAttemptNumber()
+
+  override def partitionId(): Int = taskContext.partitionId()
+
+  override def attemptNumber(): Int = taskContext.attemptNumber()
+
+  override def taskAttemptId(): Long = taskContext.taskAttemptId()
+
+  override def getLocalProperty(key: String): String = taskContext.getLocalProperty(key)
+
+  override def taskMetrics(): TaskMetrics = taskContext.taskMetrics()
+
+  override def getMetricsSources(sourceName: String): Seq[Source] = {
+    taskContext.getMetricsSources(sourceName)
+  }
+
+  override private[spark] def killTaskIfInterrupted(): Unit = taskContext.killTaskIfInterrupted()
+
+  override private[spark] def getKillReason(): Option[String] = taskContext.getKillReason()
+
+  override private[spark] def taskMemoryManager(): TaskMemoryManager = {
+    taskContext.taskMemoryManager()
+  }
+
+  override private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = {
+    taskContext.registerAccumulator(a)
+  }
+
+  override private[spark] def setFetchFailed(fetchFailed: FetchFailedException): Unit = {
+    taskContext.setFetchFailed(fetchFailed)
+  }
+
+  override private[spark] def markInterrupted(reason: String): Unit = {
+    taskContext.markInterrupted(reason)
+  }
+
+  override private[spark] def markTaskFailed(error: Throwable): Unit = {
+    taskContext.markTaskFailed(error)
+  }
+
+  override private[spark] def markTaskCompleted(error: Option[Throwable]): Unit = {
+    taskContext.markTaskCompleted(error)
+  }
+
+  override private[spark] def fetchFailed: Option[FetchFailedException] = {
+    taskContext.fetchFailed
+  }
+
+  override private[spark] def getLocalProperties: Properties = taskContext.getLocalProperties
 }
 
+@Experimental
+@Since("2.4.0")
 object BarrierTaskContext {
   /**
-   * Return the currently active BarrierTaskContext. This can be called inside of user functions to
+   * :: Experimental ::
+   * Returns the currently active BarrierTaskContext. This can be called inside of user functions to
    * access contextual information about running barrier tasks.
    */
+  @Experimental
+  @Since("2.4.0")
  def get(): BarrierTaskContext = TaskContext.get().asInstanceOf[BarrierTaskContext]
 }
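For orientation, here is a minimal sketch of how user code exercises the API this file refactors (the refactoring itself is internal: delegation to a wrapped `TaskContext` instead of inheritance from `TaskContextImpl`). The app name, master URL, and partition count below are illustrative, not part of the commit:

```scala
import org.apache.spark.{BarrierTaskContext, SparkConf, SparkContext}

object BarrierSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative local setup; barrier mode needs at least as many slots as tasks.
    val sc = new SparkContext(new SparkConf().setAppName("barrier-sketch").setMaster("local[4]"))

    val result = sc.parallelize(1 to 100, 4)
      .barrier()                                        // mark the stage so all 4 tasks launch together
      .mapPartitions { iter =>
        val ctx = BarrierTaskContext.get()              // obtain the barrier context for this task
        val peers = ctx.getTaskInfos().map(_.address)   // one BarrierTaskInfo per task, by partition ID
        ctx.barrier()                                   // global sync point: every task must reach it
        Iterator.single((ctx.partitionId(), peers.length))
      }
      .collect()

    result.foreach { case (pid, n) => println(s"partition $pid sees $n barrier tasks") }
    sc.stop()
  }
}
```

Because the stage either launches all tasks together or none, `barrier()` acts as a stage-wide rendezvous; the user-facing pattern above is unchanged by this commit.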

core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala

Lines changed: 1 addition & 1 deletion

@@ -28,4 +28,4 @@ import org.apache.spark.annotation.{Experimental, Since}
  */
 @Experimental
 @Since("2.4.0")
-class BarrierTaskInfo(val address: String)
+class BarrierTaskInfo private[spark] (val address: String)

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 2 additions & 2 deletions

@@ -25,7 +25,7 @@ import scala.util.control.{ControlThrowable, NonFatal}
 
 import com.codahale.metrics.{Gauge, MetricRegistry}
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.scheduler._
@@ -212,7 +212,7 @@ private[spark] class ExecutorAllocationManager(
     }
     // Require external shuffle service for dynamic allocation
     // Otherwise, we may lose shuffle files when killing executors
-    if (!conf.getBoolean("spark.shuffle.service.enabled", false) && !testing) {
+    if (!conf.get(config.SHUFFLE_SERVICE_ENABLED) && !testing) {
      throw new SparkException("Dynamic allocation of executors requires the external " +
        "shuffle service. You may enable this through spark.shuffle.service.enabled.")
    }
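The second hunk swaps a raw string lookup for the typed `config.SHUFFLE_SERVICE_ENABLED` entry. As a rough, self-contained illustration of why typed entries help (this is not Spark's internal `ConfigEntry` API, just the shape of the idea, with made-up names):

```scala
// Toy stand-in for a typed config entry: the key and its default travel together,
// so call sites cannot disagree about either.
final case class BoolEntry(key: String, default: Boolean)

object TypedConfSketch {
  val ShuffleServiceEnabled = BoolEntry("spark.shuffle.service.enabled", default = false)

  def get(settings: Map[String, String], entry: BoolEntry): Boolean =
    settings.get(entry.key).map(_.toBoolean).getOrElse(entry.default)

  def main(args: Array[String]): Unit = {
    val conf = Map("spark.dynamicAllocation.enabled" -> "true")
    // A single definition point replaces scattered conf.getBoolean("spark.shuffle.service.enabled", false) calls.
    println(get(conf, ShuffleServiceEnabled)) // prints: false
  }
}
```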

core/src/main/scala/org/apache/spark/Partitioner.scala

Lines changed: 3 additions & 0 deletions

@@ -33,6 +33,9 @@ import org.apache.spark.util.random.SamplingUtils
 /**
  * An object that defines how the elements in a key-value pair RDD are partitioned by key.
  * Maps each key to a partition ID, from 0 to `numPartitions - 1`.
+ *
+ * Note that, partitioner must be deterministic, i.e. it must return the same partition id given
+ * the same partition key.
  */
 abstract class Partitioner extends Serializable {
   def numPartitions: Int
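The new scaladoc only states the determinism requirement. As a hedged sketch of what a conforming custom partitioner looks like (the class name and modulo scheme are illustrative, not from this commit):

```scala
import org.apache.spark.Partitioner

// Deterministic: the partition depends only on the key, so a retried task or a
// recomputed RDD maps every key to the same partition ID.
class HashModPartitioner(override val numPartitions: Int) extends Partitioner {
  require(numPartitions > 0, "numPartitions must be positive")

  override def getPartition(key: Any): Int = key match {
    case null => 0
    // Wrap negative hash codes into [0, numPartitions).
    case k => ((k.hashCode % numPartitions) + numPartitions) % numPartitions
  }

  // Equal partitioners let Spark skip a shuffle when two RDDs are already co-partitioned.
  override def equals(other: Any): Boolean = other match {
    case p: HashModPartitioner => p.numPartitions == numPartitions
    case _ => false
  }

  override def hashCode(): Int = numPartitions
}
```

Using random numbers or mutable state inside `getPartition` would violate this contract, because a retried task could then send the same key to a different partition than the original attempt did.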

core/src/main/scala/org/apache/spark/TaskContext.scala

Lines changed: 14 additions & 0 deletions

@@ -221,4 +221,18 @@ abstract class TaskContext extends Serializable {
   */
  private[spark] def setFetchFailed(fetchFailed: FetchFailedException): Unit
 
+  /** Marks the task for interruption, i.e. cancellation. */
+  private[spark] def markInterrupted(reason: String): Unit
+
+  /** Marks the task as failed and triggers the failure listeners. */
+  private[spark] def markTaskFailed(error: Throwable): Unit
+
+  /** Marks the task as completed and triggers the completion listeners. */
+  private[spark] def markTaskCompleted(error: Option[Throwable]): Unit
+
+  /** Optionally returns the stored fetch failure in the task. */
+  private[spark] def fetchFailed: Option[FetchFailedException]
+
+  /** Gets local properties set upstream in the driver. */
+  private[spark] def getLocalProperties: Properties
 }
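Most of these new members are `private[spark]` plumbing so `BarrierTaskContext` can delegate to a wrapped context, but the last one mirrors the public `getLocalProperty` accessor. A small sketch of the driver-to-task property flow it refers to (the property key and value below are made up for the example):

```scala
import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object LocalPropertySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("local-prop-sketch").setMaster("local[2]"))

    // Set on the driver; shipped with every task of jobs submitted afterwards.
    sc.setLocalProperty("job.owner", "data-team")

    val owners = sc.parallelize(1 to 4, 2)
      .map(_ => TaskContext.get().getLocalProperty("job.owner")) // read back inside the task
      .distinct()
      .collect()

    println(owners.mkString(", ")) // data-team
    sc.stop()
  }
}
```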

core/src/main/scala/org/apache/spark/TaskContextImpl.scala

Lines changed: 6 additions & 9 deletions

@@ -30,6 +30,7 @@ import org.apache.spark.metrics.source.Source
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.util._
 
+
 /**
  * A [[TaskContext]] implementation.
  *
@@ -98,9 +99,8 @@ private[spark] class TaskContextImpl(
     this
   }
 
-  /** Marks the task as failed and triggers the failure listeners. */
   @GuardedBy("this")
-  private[spark] def markTaskFailed(error: Throwable): Unit = synchronized {
+  private[spark] override def markTaskFailed(error: Throwable): Unit = synchronized {
     if (failed) return
     failed = true
     failure = error
@@ -109,9 +109,8 @@ private[spark] class TaskContextImpl(
     }
   }
 
-  /** Marks the task as completed and triggers the completion listeners. */
   @GuardedBy("this")
-  private[spark] def markTaskCompleted(error: Option[Throwable]): Unit = synchronized {
+  private[spark] override def markTaskCompleted(error: Option[Throwable]): Unit = synchronized {
     if (completed) return
     completed = true
     invokeListeners(onCompleteCallbacks, "TaskCompletionListener", error) {
@@ -140,8 +139,7 @@ private[spark] class TaskContextImpl(
     }
   }
 
-  /** Marks the task for interruption, i.e. cancellation. */
-  private[spark] def markInterrupted(reason: String): Unit = {
+  private[spark] override def markInterrupted(reason: String): Unit = {
     reasonIfKilled = Some(reason)
   }
 
@@ -176,8 +174,7 @@ private[spark] class TaskContextImpl(
     this._fetchFailedException = Option(fetchFailed)
   }
 
-  private[spark] def fetchFailed: Option[FetchFailedException] = _fetchFailedException
+  private[spark] override def fetchFailed: Option[FetchFailedException] = _fetchFailedException
 
-  // TODO: shall we publish it and define it in `TaskContext`?
-  private[spark] def getLocalProperties(): Properties = localProperties
+  private[spark] override def getLocalProperties(): Properties = localProperties
 }
