
Commit c4f20b9

Merge pull request apache-spark-on-k8s#392 from palantir/rk/upstream
2 parents: df34e2d + 8eb984a

150 files changed: 4128 additions, 724 deletions

.circleci/config.yml

Lines changed: 5 additions & 5 deletions
@@ -167,18 +167,18 @@ jobs:
   run-style-tests:
     # depends only on build-maven
     <<: *test-defaults
-    resource_class: medium
+    resource_class: medium+
     steps:
       - *checkout-code
       # Need maven dependency cache, otherwise checkstyle tests fail as such:
       # Failed to execute goal on project spark-assembly_2.11: Could not resolve dependencies for project org.apache.spark:spark-assembly_2.11:pom:2.4.0-SNAPSHOT
       - restore_cache:
           key: maven-dependency-cache-{{ checksum "pom.xml" }}
       - *restore-build-binaries-cache
-      - run: dev/run-style-tests.py | tee /tmp/run-style-tests.log
-      - store_artifacts:
-          path: /tmp/run-style-tests.log
-          destination: run-style-tests.log
+      - run:
+          name: Run style tests
+          command: dev/run-style-tests.py
+          no_output_timeout: 15m

   run-build-tests:
     # depends only on build-maven

appveyor.yml

Lines changed: 1 addition & 1 deletion
@@ -48,7 +48,7 @@ install:
   - cmd: R -e "packageVersion('knitr'); packageVersion('rmarkdown'); packageVersion('testthat'); packageVersion('e1071'); packageVersion('survival')"

 build_script:
-  - cmd: mvn -DskipTests -Psparkr -Phive -Phive-thriftserver package
+  - cmd: mvn -DskipTests -Psparkr -Phive package

 environment:
   NOT_CRAN: true

assembly/pom.xml

Lines changed: 5 additions & 0 deletions
@@ -74,6 +74,11 @@
       <artifactId>spark-repl_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-avro_2.11</artifactId>
+      <version>${project.version}</version>
+    </dependency>

     <!--
       Because we don't shade dependencies anymore, we need to restore Guava to compile scope so

common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java

Lines changed: 7 additions & 3 deletions
@@ -135,13 +135,14 @@ static class EncryptedMessage extends AbstractFileRegion {
     private final boolean isByteBuf;
     private final ByteBuf buf;
     private final FileRegion region;
+    private final int maxOutboundBlockSize;

     /**
      * A channel used to buffer input data for encryption. The channel has an upper size bound
      * so that if the input is larger than the allowed buffer, it will be broken into multiple
-     * chunks.
+     * chunks. Made non-final to enable lazy initialization, which saves memory.
      */
-    private final ByteArrayWritableChannel byteChannel;
+    private ByteArrayWritableChannel byteChannel;

     private ByteBuf currentHeader;
     private ByteBuffer currentChunk;
@@ -157,7 +158,7 @@ static class EncryptedMessage extends AbstractFileRegion {
       this.isByteBuf = msg instanceof ByteBuf;
       this.buf = isByteBuf ? (ByteBuf) msg : null;
       this.region = isByteBuf ? null : (FileRegion) msg;
-      this.byteChannel = new ByteArrayWritableChannel(maxOutboundBlockSize);
+      this.maxOutboundBlockSize = maxOutboundBlockSize;
     }

     /**
@@ -292,6 +293,9 @@ public long transferTo(final WritableByteChannel target, final long position)
     }

     private void nextChunk() throws IOException {
+      if (byteChannel == null) {
+        byteChannel = new ByteArrayWritableChannel(maxOutboundBlockSize);
+      }
       byteChannel.reset();
       if (isByteBuf) {
         int copied = byteChannel.write(buf.nioBuffer());
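The hunk above defers allocating the per-message encryption buffer until nextChunk() first needs it, so an EncryptedMessage that never produces a chunk no longer pins a full maxOutboundBlockSize buffer. A minimal Scala sketch of the same lazy-initialization pattern; ChunkBuffer and Message are illustrative stand-ins, not classes from this commit:

// Sketch only: illustrates the lazy-initialization pattern applied to byteChannel above.
// ChunkBuffer and Message are hypothetical names, not part of this commit.
final class ChunkBuffer(size: Int) {
  private val data = new Array[Byte](size)
  def reset(): Unit = java.util.Arrays.fill(data, 0.toByte)
}

final class Message(maxOutboundBlockSize: Int) {
  // Allocated on first use instead of in the constructor.
  private var buffer: ChunkBuffer = _

  def nextChunk(): Unit = {
    if (buffer == null) {
      buffer = new ChunkBuffer(maxOutboundBlockSize)
    }
    buffer.reset()
    // ... encrypt and emit the next chunk from `buffer` ...
  }
}

object LazyBufferSketch {
  def main(args: Array[String]): Unit = {
    val msg = new Message(maxOutboundBlockSize = 64 * 1024)
    msg.nextChunk()  // buffer allocated here, on first use
    msg.nextChunk()  // reused afterwards
  }
}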
core/src/main/scala/org/apache/spark/BarrierTaskContext.scala

Lines changed: 42 additions & 0 deletions

@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.annotation.{Experimental, Since}
+
+/** A [[TaskContext]] with extra info and tooling for a barrier stage. */
+trait BarrierTaskContext extends TaskContext {
+
+  /**
+   * :: Experimental ::
+   * Sets a global barrier and waits until all tasks in this stage hit this barrier. Similar to
+   * the MPI_Barrier function in MPI, the barrier() call blocks until all tasks in the same
+   * stage have reached this routine.
+   */
+  @Experimental
+  @Since("2.4.0")
+  def barrier(): Unit
+
+  /**
+   * :: Experimental ::
+   * Returns all the task infos in this barrier stage, ordered by partitionId.
+   */
+  @Experimental
+  @Since("2.4.0")
+  def getTaskInfos(): Array[BarrierTaskInfo]
+}
core/src/main/scala/org/apache/spark/BarrierTaskContextImpl.scala

Lines changed: 49 additions & 0 deletions

@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.Properties
+
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.metrics.MetricsSystem
+
+/** A [[BarrierTaskContext]] implementation. */
+private[spark] class BarrierTaskContextImpl(
+    override val stageId: Int,
+    override val stageAttemptNumber: Int,
+    override val partitionId: Int,
+    override val taskAttemptId: Long,
+    override val attemptNumber: Int,
+    override val taskMemoryManager: TaskMemoryManager,
+    localProperties: Properties,
+    @transient private val metricsSystem: MetricsSystem,
+    // The default value is only used in tests.
+    override val taskMetrics: TaskMetrics = TaskMetrics.empty)
+  extends TaskContextImpl(stageId, stageAttemptNumber, partitionId, taskAttemptId, attemptNumber,
+      taskMemoryManager, localProperties, metricsSystem, taskMetrics)
+    with BarrierTaskContext {
+
+  // TODO SPARK-24817 implement global barrier.
+  override def barrier(): Unit = {}
+
+  override def getTaskInfos(): Array[BarrierTaskInfo] = {
+    val addressesStr = localProperties.getProperty("addresses", "")
+    addressesStr.split(",").map(_.trim()).map(new BarrierTaskInfo(_))
+  }
+}
core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala

Lines changed: 31 additions & 0 deletions

@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.annotation.{Experimental, Since}
+
+
+/**
+ * :: Experimental ::
+ * Carries all task infos of a barrier task.
+ *
+ * @param address the IPv4 address (host:port) of the executor that a barrier task is running on
+ */
+@Experimental
+@Since("2.4.0")
+class BarrierTaskInfo(val address: String)
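getTaskInfos() in BarrierTaskContextImpl above simply splits the comma-separated "addresses" local property into one BarrierTaskInfo per task. A self-contained sketch of that parsing; the TaskInfo case class and the sample property value are illustrative, not taken from this commit:

// Sketch only: mirrors the parsing in BarrierTaskContextImpl.getTaskInfos().
// TaskInfo stands in for BarrierTaskInfo; the sample property value is made up.
import java.util.Properties

object AddressesParsingSketch {
  final case class TaskInfo(address: String)

  def main(args: Array[String]): Unit = {
    val localProperties = new Properties()
    localProperties.setProperty("addresses", "10.0.0.1:7077, 10.0.0.2:7077, 10.0.0.3:7077")

    val infos: Array[TaskInfo] = localProperties
      .getProperty("addresses", "")
      .split(",")
      .map(_.trim())
      .map(TaskInfo(_))

    // One entry per barrier task, ordered by partitionId.
    infos.foreach(info => println(info.address))
  }
}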

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 12 additions & 0 deletions
@@ -434,6 +434,18 @@ private[spark] class MapOutputTrackerMaster(
     }
   }

+  /** Unregister all map output information of the given shuffle. */
+  def unregisterAllMapOutput(shuffleId: Int) {
+    shuffleStatuses.get(shuffleId) match {
+      case Some(shuffleStatus) =>
+        shuffleStatus.removeOutputsByFilter(x => true)
+        incrementEpoch()
+      case None =>
+        throw new SparkException(
+          s"unregisterAllMapOutput called for nonexistent shuffle ID $shuffleId.")
+    }
+  }
+
   /** Unregister shuffle data */
   def unregisterShuffle(shuffleId: Int) {
     shuffleStatuses.remove(shuffleId).foreach { shuffleStatus =>
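The catch-all predicate x => true reuses the existing removeOutputsByFilter machinery to drop every map output of one shuffle while keeping the shuffle itself registered. A hedged sketch of the same pattern against a toy status map; ShuffleState and the surrounding object are invented stand-ins, not Spark classes:

// Sketch only: shows the "remove everything via a filter" pattern used above.
// ShuffleState is a hypothetical stand-in for Spark's ShuffleStatus bookkeeping.
import scala.collection.concurrent.TrieMap

object UnregisterAllSketch {
  final class ShuffleState {
    private val outputs = TrieMap[Int, String]()   // mapId -> executor location
    def register(mapId: Int, location: String): Unit = outputs.put(mapId, location)
    def removeOutputsByFilter(f: Int => Boolean): Unit =
      outputs.keys.filter(f).foreach(outputs.remove)
    def size: Int = outputs.size
  }

  def main(args: Array[String]): Unit = {
    val statuses = TrieMap[Int, ShuffleState]()
    statuses.put(0, new ShuffleState)
    statuses(0).register(0, "exec-1")
    statuses(0).register(1, "exec-2")

    // Equivalent of unregisterAllMapOutput(0): keep the shuffle entry, drop all its outputs.
    statuses.get(0) match {
      case Some(state) => state.removeOutputsByFilter(_ => true)
      case None        => throw new IllegalArgumentException("nonexistent shuffle ID 0")
    }
    println(statuses(0).size)  // 0
  }
}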

core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala

Lines changed: 14 additions & 1 deletion
@@ -23,11 +23,21 @@ import org.apache.spark.{Partition, TaskContext}

 /**
  * An RDD that applies the provided function to every partition of the parent RDD.
+ *
+ * @param prev the parent RDD.
+ * @param f The function used to map a tuple of (TaskContext, partition index, input iterator) to
+ *          an output iterator.
+ * @param preservesPartitioning Whether the input function preserves the partitioner, which should
+ *                              be `false` unless `prev` is a pair RDD and the input function
+ *                              doesn't modify the keys.
+ * @param isFromBarrier Indicates whether this RDD is transformed from an RDDBarrier; a stage
+ *                      containing at least one RDDBarrier shall be turned into a barrier stage.
  */
 private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
     var prev: RDD[T],
     f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
-    preservesPartitioning: Boolean = false)
+    preservesPartitioning: Boolean = false,
+    isFromBarrier: Boolean = false)
   extends RDD[U](prev) {

   override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
@@ -41,4 +51,7 @@ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
     super.clearDependencies()
     prev = null
   }
+
+  @transient protected lazy override val isBarrier_ : Boolean =
+    isFromBarrier || dependencies.exists(_.rdd.isBarrier())
 }
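The isBarrier_ override above is a cached recursive property: a MapPartitionsRDD is barrier if it came from an RDDBarrier or if any parent RDD is already barrier, and the transient lazy val avoids re-walking a long lineage on every call. A toy sketch of that pattern outside Spark; Node is an invented stand-in for RDD:

// Sketch only: caching a recursive "is barrier" property with a transient lazy val,
// as MapPartitionsRDD does above. Node is an invented stand-in for RDD.
class Node(val parents: Seq[Node], fromBarrier: Boolean = false) {
  def isBarrier(): Boolean = isBarrier_

  // Computed once per node, then cached; @transient so it is recomputed
  // after deserialization rather than shipped with the object.
  @transient protected lazy val isBarrier_ : Boolean =
    fromBarrier || parents.exists(_.isBarrier())
}

object BarrierPropagationSketch {
  def main(args: Array[String]): Unit = {
    val root    = new Node(Nil)
    val barrier = new Node(Seq(root), fromBarrier = true)
    val mapped  = new Node(Seq(barrier))   // inherits barrier-ness from its parent
    println((root.isBarrier(), barrier.isBarrier(), mapped.isBarrier()))  // (false,true,true)
  }
}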

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 26 additions & 1 deletion
@@ -33,7 +33,7 @@ import org.apache.hadoop.mapred.TextOutputFormat

 import org.apache.spark._
 import org.apache.spark.Partitioner._
-import org.apache.spark.annotation.{DeveloperApi, Since}
+import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.Logging
 import org.apache.spark.partial.BoundedDouble
@@ -1647,6 +1647,14 @@ abstract class RDD[T: ClassTag](
     }
   }

+  /**
+   * :: Experimental ::
+   * Indicates that Spark must launch the tasks together for the current stage.
+   */
+  @Experimental
+  @Since("2.4.0")
+  def barrier(): RDDBarrier[T] = withScope(new RDDBarrier[T](this))
+
   // =======================================================================
   // Other internal methods and fields
   // =======================================================================
@@ -1839,6 +1847,23 @@ abstract class RDD[T: ClassTag](
   def toJavaRDD() : JavaRDD[T] = {
     new JavaRDD(this)(elementClassTag)
   }
+
+  /**
+   * Whether the RDD is in a barrier stage. Spark must launch all the tasks at the same time for a
+   * barrier stage.
+   *
+   * An RDD is in a barrier stage if at least one of its parent RDD(s), or itself, is mapped from
+   * an [[RDDBarrier]]. This function always returns false for a [[ShuffledRDD]], since a
+   * [[ShuffledRDD]] indicates the start of a new stage.
+   *
+   * A [[MapPartitionsRDD]] can be transformed from an [[RDDBarrier]], in which case the
+   * [[MapPartitionsRDD]] shall be marked as barrier.
+   */
+  private[spark] def isBarrier(): Boolean = isBarrier_
+
+  // For performance, cache the value to avoid repeatedly computing `isBarrier()` on a
+  // long RDD chain.
+  @transient protected lazy val isBarrier_ : Boolean = dependencies.exists(_.rdd.isBarrier())
 }

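Taken together, these additions expose RDD.barrier(), which wraps an RDD in an RDDBarrier, and mark the resulting MapPartitionsRDDs so the scheduler can launch all tasks of the stage together. A hedged usage sketch, assuming RDDBarrier offers a mapPartitions transformation and that a task in a barrier stage can obtain its BarrierTaskContext via TaskContext.get(); neither API is shown in this excerpt:

// Sketch only: how the barrier API introduced above might be used from a Spark job.
// Assumes RDDBarrier.mapPartitions exists and that the task context inside a barrier
// stage can be cast to BarrierTaskContext; neither is shown in this excerpt.
import org.apache.spark.{BarrierTaskContext, SparkConf, SparkContext, TaskContext}

object BarrierUsageSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("barrier-sketch").setMaster("local[4]"))
    val rdd = sc.parallelize(1 to 100, numSlices = 4)

    val result = rdd.barrier().mapPartitions { iter =>
      val ctx = TaskContext.get().asInstanceOf[BarrierTaskContext]
      ctx.barrier()                               // wait for all 4 tasks in the stage
      val peers = ctx.getTaskInfos().map(_.address)
      iter.map(x => (x, peers.length))            // every task sees every peer's address
    }

    result.collect()
    sc.stop()
  }
}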