Commit 3bf8055

Merge pull request apache-spark-on-k8s#353 from palantir/rk/upstream-bump
Small upstream bump
2 parents 6da0b82 + 33658e4 commit 3bf8055

49 files changed: +1240 additions, -645 deletions

Large commits have some content hidden by default; only a subset of the 49 changed files is shown below.

core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala

Lines changed: 8 additions & 5 deletions
@@ -33,18 +33,21 @@ private[deploy] object DependencyUtils {
       packagesExclusions: String,
       packages: String,
       repositories: String,
-      ivyRepoPath: String): String = {
+      ivyRepoPath: String,
+      ivySettingsPath: Option[String]): String = {
     val exclusions: Seq[String] =
       if (!StringUtils.isBlank(packagesExclusions)) {
        packagesExclusions.split(",")
       } else {
        Nil
       }
     // Create the IvySettings, either load from file or build defaults
-    val ivySettings = sys.props.get("spark.jars.ivySettings").map { ivySettingsFile =>
-      SparkSubmitUtils.loadIvySettings(ivySettingsFile, Option(repositories), Option(ivyRepoPath))
-    }.getOrElse {
-      SparkSubmitUtils.buildIvySettings(Option(repositories), Option(ivyRepoPath))
+    val ivySettings = ivySettingsPath match {
+      case Some(path) =>
+        SparkSubmitUtils.loadIvySettings(path, Option(repositories), Option(ivyRepoPath))
+
+      case None =>
+        SparkSubmitUtils.buildIvySettings(Option(repositories), Option(ivyRepoPath))
     }

     SparkSubmitUtils.resolveMavenCoordinates(packages, ivySettings, exclusions = exclusions)
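For reference, a minimal sketch of how a caller would use the new signature; the coordinates and settings path below are placeholder values, not defaults introduced by this commit:

```scala
// Hedged sketch: invoking the updated resolver with an explicit Ivy settings
// file instead of relying on the old sys.props lookup.
val resolved: String = DependencyUtils.resolveMavenDependencies(
  packagesExclusions = "org.scala-lang:scala-library",   // hypothetical exclusion
  packages = "com.example:example-lib:1.0.0",            // hypothetical coordinate
  repositories = null,                                   // fall back to defaults
  ivyRepoPath = null,
  ivySettingsPath = Some("/etc/spark/ivysettings.xml"))  // takes the loadIvySettings branch
// With ivySettingsPath = None, buildIvySettings is used instead.
```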

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 2 additions & 1 deletion
@@ -359,7 +359,8 @@ object SparkSubmit extends CommandLineUtils with Logging {
     // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
     // too for packages that include Python code
     val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
-      args.packagesExclusions, args.packages, args.repositories, args.ivyRepoPath)
+      args.packagesExclusions, args.packages, args.repositories, args.ivyRepoPath,
+      args.ivySettingsPath)

     if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
       args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 2 additions & 0 deletions
@@ -63,6 +63,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   var packages: String = null
   var repositories: String = null
   var ivyRepoPath: String = null
+  var ivySettingsPath: Option[String] = None
   var packagesExclusions: String = null
   var verbose: Boolean = false
   var isPython: Boolean = false

@@ -184,6 +185,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     jars = Option(jars).orElse(sparkProperties.get("spark.jars")).orNull
     files = Option(files).orElse(sparkProperties.get("spark.files")).orNull
     ivyRepoPath = sparkProperties.get("spark.jars.ivy").orNull
+    ivySettingsPath = sparkProperties.get("spark.jars.ivySettings")
     packages = Option(packages).orElse(sparkProperties.get("spark.jars.packages")).orNull
     packagesExclusions = Option(packagesExclusions)
       .orElse(sparkProperties.get("spark.jars.excludes")).orNull

core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala

Lines changed: 9 additions & 4 deletions
@@ -79,12 +79,17 @@ object DriverWrapper extends Logging {
     val secMgr = new SecurityManager(sparkConf)
     val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf)

-    val Seq(packagesExclusions, packages, repositories, ivyRepoPath) =
-      Seq("spark.jars.excludes", "spark.jars.packages", "spark.jars.repositories", "spark.jars.ivy")
-        .map(sys.props.get(_).orNull)
+    val Seq(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath) =
+      Seq(
+        "spark.jars.excludes",
+        "spark.jars.packages",
+        "spark.jars.repositories",
+        "spark.jars.ivy",
+        "spark.jars.ivySettings"
+      ).map(sys.props.get(_).orNull)

     val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(packagesExclusions,
-      packages, repositories, ivyRepoPath)
+      packages, repositories, ivyRepoPath, Option(ivySettingsPath))
     val jars = {
       val jarsProp = sys.props.get("spark.jars").orNull
       if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
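A note on the `Option(ivySettingsPath)` wrapping above, as a tiny standalone sketch (not part of the diff): the property lookup uses `.orNull`, so a missing `spark.jars.ivySettings` arrives as `null`, and `Option(...)` turns it back into the `Option[String]` the resolver now expects.

```scala
// Option(null) collapses to None; Option(nonNull) becomes Some(nonNull).
val absent: String = null
assert(Option(absent) == None)
assert(Option("/etc/spark/ivysettings.xml") == Some("/etc/spark/ivysettings.xml"))
```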

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

Lines changed: 4 additions & 2 deletions
@@ -164,7 +164,8 @@ class BlockManagerMasterEndpoint(
     val futures = blockManagerInfo.values.map { bm =>
       bm.slaveEndpoint.ask[Int](removeMsg).recover {
         case e: IOException =>
-          logWarning(s"Error trying to remove RDD $rddId", e)
+          logWarning(s"Error trying to remove RDD $rddId from block manager ${bm.blockManagerId}",
+            e)
           0 // zero blocks were removed
       }
     }.toSeq

@@ -195,7 +196,8 @@ class BlockManagerMasterEndpoint(
     val futures = requiredBlockManagers.map { bm =>
       bm.slaveEndpoint.ask[Int](removeMsg).recover {
         case e: IOException =>
-          logWarning(s"Error trying to remove broadcast $broadcastId", e)
+          logWarning(s"Error trying to remove broadcast $broadcastId from block manager " +
+            s"${bm.blockManagerId}", e)
           0 // zero blocks were removed
       }
     }.toSeq

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

Lines changed: 6 additions & 3 deletions
@@ -106,6 +106,9 @@ class SparkSubmitSuite
   // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
   implicit val defaultSignaler: Signaler = ThreadSignaler

+  private val emptyIvySettings = File.createTempFile("ivy", ".xml")
+  FileUtils.write(emptyIvySettings, "<ivysettings />", StandardCharsets.UTF_8)
+
   override def beforeEach() {
     super.beforeEach()
   }

@@ -520,6 +523,7 @@ class SparkSubmitSuite
         "--repositories", repo,
         "--conf", "spark.ui.enabled=false",
         "--conf", "spark.master.rest.enabled=false",
+        "--conf", s"spark.jars.ivySettings=${emptyIvySettings.getAbsolutePath()}",
         unusedJar.toString,
         "my.great.lib.MyLib", "my.great.dep.MyLib")
       runSparkSubmit(args)

@@ -530,7 +534,6 @@ class SparkSubmitSuite
     val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
     val main = MavenCoordinate("my.great.lib", "mylib", "0.1")
     val dep = MavenCoordinate("my.great.dep", "mylib", "0.1")
-    // Test using "spark.jars.packages" and "spark.jars.repositories" configurations.
     IvyTestUtils.withRepository(main, Some(dep.toString), None) { repo =>
       val args = Seq(
         "--class", JarCreationTest.getClass.getName.stripSuffix("$"),

@@ -540,6 +543,7 @@ class SparkSubmitSuite
         "--conf", s"spark.jars.repositories=$repo",
         "--conf", "spark.ui.enabled=false",
         "--conf", "spark.master.rest.enabled=false",
+        "--conf", s"spark.jars.ivySettings=${emptyIvySettings.getAbsolutePath()}",
         unusedJar.toString,
         "my.great.lib.MyLib", "my.great.dep.MyLib")
       runSparkSubmit(args)

@@ -550,7 +554,6 @@ class SparkSubmitSuite
   // See https://gist.github.com/shivaram/3a2fecce60768a603dac for a error log
   ignore("correctly builds R packages included in a jar with --packages") {
     assume(RUtils.isRInstalled, "R isn't installed on this machine.")
-    // Check if the SparkR package is installed
     assume(RUtils.isSparkRInstalled, "SparkR is not installed in this build.")
     val main = MavenCoordinate("my.great.lib", "mylib", "0.1")
     val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))

@@ -563,6 +566,7 @@ class SparkSubmitSuite
       "--master", "local-cluster[2,1,1024]",
       "--packages", main.toString,
       "--repositories", repo,
+      "--conf", s"spark.jars.ivySettings=${emptyIvySettings.getAbsolutePath()}",
       "--verbose",
       "--conf", "spark.ui.enabled=false",
       rScriptDir)

@@ -573,7 +577,6 @@ class SparkSubmitSuite
   test("include an external JAR in SparkR") {
     assume(RUtils.isRInstalled, "R isn't installed on this machine.")
     val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
-    // Check if the SparkR package is installed
     assume(RUtils.isSparkRInstalled, "SparkR is not installed in this build.")
     val rScriptDir =
       Seq(sparkHome, "R", "pkg", "tests", "fulltests", "jarTest.R").mkString(File.separator)

docs/ml-guide.md

Lines changed: 1 addition & 1 deletion
@@ -111,7 +111,7 @@ and the migration guide below will explain all changes between releases.
 * The class and trait hierarchy for logistic regression model summaries was changed to be cleaner
 and better accommodate the addition of the multi-class summary. This is a breaking change for user
 code that casts a `LogisticRegressionTrainingSummary` to a
-` BinaryLogisticRegressionTrainingSummary`. Users should instead use the `model.binarySummary`
+`BinaryLogisticRegressionTrainingSummary`. Users should instead use the `model.binarySummary`
 method. See [SPARK-17139](https://issues.apache.org/jira/browse/SPARK-17139) for more detail
 (_note_ this is an `Experimental` API). This _does not_ affect the Python `summary` method, which
 will still work correctly for both multinomial and binary cases.
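A hedged illustration of the migration described in that note (Spark 2.3-era `spark.ml` API; `training` below is an assumed DataFrame of labeled data, not something defined in this commit):

```scala
import org.apache.spark.ml.classification.LogisticRegression

val model = new LogisticRegression().fit(training)   // `training` assumed in scope

// Before: user code cast the generic summary, which the hierarchy change breaks.
// val binary = model.summary.asInstanceOf[BinaryLogisticRegressionTrainingSummary]

// After: the dedicated accessor (fails if the model is not a binary classifier).
val binarySummary = model.binarySummary
println(binarySummary.areaUnderROC)
```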

docs/mllib-feature-extraction.md

Lines changed: 2 additions & 2 deletions
@@ -278,8 +278,8 @@ for details on the API.
 multiplication. In other words, it scales each column of the dataset by a scalar multiplier. This
 represents the [Hadamard product](https://en.wikipedia.org/wiki/Hadamard_product_%28matrices%29)
 between the input vector, `v` and transforming vector, `scalingVec`, to yield a result vector.
-Qu8T948*1#
-Denoting the `scalingVec` as "`w`," this transformation may be written as:
+
+Denoting the `scalingVec` as "`w`", this transformation may be written as:

 `\[ \begin{pmatrix}
 v_1 \\
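This part of the feature-extraction guide documents `ElementwiseProduct`; a minimal hedged usage sketch of that RDD-based API (assumes a `SparkContext` named `sc` is in scope, with arbitrary example vectors):

```scala
import org.apache.spark.mllib.feature.ElementwiseProduct
import org.apache.spark.mllib.linalg.Vectors

// Two example input vectors and a transforming vector (the `w`/`scalingVec` above).
val data = sc.parallelize(Seq(Vectors.dense(1.0, 2.0, 3.0), Vectors.dense(4.0, 5.0, 6.0)))
val scalingVec = Vectors.dense(0.0, 1.0, 2.0)

val transformer = new ElementwiseProduct(scalingVec)

// Hadamard (component-wise) product of each vector with scalingVec:
// (1,2,3) -> (0,2,6) and (4,5,6) -> (0,5,12).
val transformedData = transformer.transform(data)
```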

docs/mllib-pmml-model-export.md

Lines changed: 2 additions & 2 deletions
@@ -7,15 +7,15 @@ displayTitle: PMML model export - RDD-based API
 * Table of contents
 {:toc}

-## `spark.mllib` supported models
+## spark.mllib supported models

 `spark.mllib` supports model export to Predictive Model Markup Language ([PMML](http://en.wikipedia.org/wiki/Predictive_Model_Markup_Language)).

 The table below outlines the `spark.mllib` models that can be exported to PMML and their equivalent PMML model.

 <table class="table">
   <thead>
-    <tr><th>`spark.mllib` model</th><th>PMML model</th></tr>
+    <tr><th>spark.mllib model</th><th>PMML model</th></tr>
   </thead>
   <tbody>
     <tr>
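Since this page covers the RDD-based PMML export, here is a brief hedged sketch of the API it documents (assumes a `SparkContext` named `sc`; k-means is just one of the exportable model types listed in the table, and the output path is a placeholder):

```scala
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val data = sc.parallelize(Seq(
  Vectors.dense(0.0, 0.0), Vectors.dense(1.0, 1.0),
  Vectors.dense(9.0, 8.0), Vectors.dense(8.0, 9.0)))

val model = KMeans.train(data, k = 2, maxIterations = 20)

// Models mixing in PMMLExportable can be written out as PMML XML.
model.toPMML("/tmp/kmeans.xml")   // write to a local path
println(model.toPMML())           // or render to a String
```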

docs/running-on-kubernetes.md

Lines changed: 12 additions & 3 deletions
@@ -549,14 +549,23 @@ specific to Spark on Kubernetes.
   <td><code>spark.kubernetes.driver.limit.cores</code></td>
   <td>(none)</td>
   <td>
-    Specify the hard CPU [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) for the driver pod.
+    Specify a hard cpu [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) for the driver pod.
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.executor.request.cores</code></td>
+  <td>(none)</td>
+  <td>
+    Specify the cpu request for each executor pod. Values conform to the Kubernetes [convention](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#meaning-of-cpu).
+    Example values include 0.1, 500m, 1.5, 5, etc., with the definition of cpu units documented in [CPU units](https://kubernetes.io/docs/tasks/configure-pod-container/assign-cpu-resource/#cpu-units).
+    This is distinct from <code>spark.executor.cores</code>: it is only used and takes precedence over <code>spark.executor.cores</code> for specifying the executor pod cpu request if set. Task
+    parallelism, e.g., number of tasks an executor can run concurrently is not affected by this.
+</tr>
 <tr>
   <td><code>spark.kubernetes.executor.limit.cores</code></td>
   <td>(none)</td>
   <td>
-    Specify the hard CPU [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) for each executor pod launched for the Spark Application.
+    Specify a hard cpu [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) for each executor pod launched for the Spark Application.
   </td>
 </tr>
 <tr>

@@ -593,4 +602,4 @@ specific to Spark on Kubernetes.
   <code>spark.kubernetes.executor.secrets.spark-secret=/etc/secrets</code>.
 </td>
 </tr>
-</table>
+</table>
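A hedged illustration of how the new executor cpu-request setting sits alongside the existing options (the values are arbitrary examples and the master URL is a placeholder, not guidance from this commit):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("k8s://https://kubernetes.example.com:6443")   // placeholder API server
  .set("spark.executor.cores", "1")                         // task parallelism, unaffected by the request
  .set("spark.kubernetes.executor.request.cores", "500m")   // new: executor pod cpu request
  .set("spark.kubernetes.executor.limit.cores", "1")        // existing: hard executor pod cpu limit
```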

0 commit comments
