
Commit 8bc304f

[SPARK-26132][BUILD][CORE] Remove support for Scala 2.11 in Spark 3.0.0
## What changes were proposed in this pull request?

Remove Scala 2.11 support in build files and docs, and in various parts of code that accommodated 2.11. See some targeted comments below.

## How was this patch tested?

Existing tests.

Closes apache#23098 from srowen/SPARK-26132.

Authored-by: Sean Owen <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
1 parent b8a0f98 commit 8bc304f

28 files changed: 163 additions and 784 deletions


R/pkg/R/sparkR.R

Lines changed: 1 addition & 1 deletion
@@ -269,7 +269,7 @@ sparkR.sparkContext <- function(
 #' sparkR.session("yarn-client", "SparkR", "/home/spark",
 #'   list(spark.executor.memory="4g"),
 #'   c("one.jar", "two.jar", "three.jar"),
-#'   c("com.databricks:spark-avro_2.11:2.0.1"))
+#'   c("com.databricks:spark-avro_2.12:2.0.1"))
 #' sparkR.session(spark.master = "yarn-client", spark.executor.memory = "4g")
 #'}
 #' @note sparkR.session since 2.0.0

R/pkg/tests/fulltests/test_client.R

Lines changed: 2 additions & 2 deletions
@@ -37,7 +37,7 @@ test_that("multiple packages don't produce a warning", {
 
 test_that("sparkJars sparkPackages as character vectors", {
   args <- generateSparkSubmitArgs("", "", c("one.jar", "two.jar", "three.jar"), "",
-    c("com.databricks:spark-avro_2.11:2.0.1"))
+    c("com.databricks:spark-avro_2.12:2.0.1"))
   expect_match(args, "--jars one.jar,two.jar,three.jar")
-  expect_match(args, "--packages com.databricks:spark-avro_2.11:2.0.1")
+  expect_match(args, "--packages com.databricks:spark-avro_2.12:2.0.1")
 })

R/pkg/vignettes/sparkr-vignettes.Rmd

Lines changed: 1 addition & 1 deletion
@@ -219,7 +219,7 @@ SparkR supports operating on a variety of data sources through the `SparkDataFra
 The general method for creating `SparkDataFrame` from data sources is `read.df`. This method takes in the path for the file to load and the type of data source, and the currently active Spark Session will be used automatically. SparkR supports reading CSV, JSON and Parquet files natively and through Spark Packages you can find data source connectors for popular file formats like Avro. These packages can be added with `sparkPackages` parameter when initializing SparkSession using `sparkR.session`.
 
 ```{r, eval=FALSE}
-sparkR.session(sparkPackages = "com.databricks:spark-avro_2.11:3.0.0")
+sparkR.session(sparkPackages = "com.databricks:spark-avro_2.12:3.0.0")
 ```
 
 We can see how to use data sources using an example CSV input file. For more information please refer to SparkR [read.df](https://spark.apache.org/docs/latest/api/R/read.df.html) API documentation.
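
Editorial note, not part of this commit: with the 2.11 builds gone, every Spark Package coordinate has to carry the `_2.12` binary-version suffix, whether it is passed through `sparkPackages` as above or through `spark-submit --packages` as in the R test earlier. A hedged sketch of keeping that suffix in one place on the Scala side; the helper name and example values are made up for illustration:

```scala
// Hedged sketch (not from this patch): centralize the Scala binary version so
// every package coordinate picks up the _2.12 suffix Spark 3.0.0 is built with.
object Coordinates {
  val scalaBinaryVersion = "2.12" // Spark 3.0.0 ships only Scala 2.12 builds

  def sparkPackage(group: String, artifact: String, version: String): String =
    s"$group:${artifact}_$scalaBinaryVersion:$version"
}

// Coordinates.sparkPackage("com.databricks", "spark-avro", "3.0.0") yields
// "com.databricks:spark-avro_2.12:3.0.0", the value handed to sparkPackages
// above or to spark-submit --packages.
```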

bin/load-spark-env.cmd

Lines changed: 27 additions & 22 deletions
@@ -21,37 +21,42 @@ rem This script loads spark-env.cmd if it exists, and ensures it is only loaded
 rem spark-env.cmd is loaded from SPARK_CONF_DIR if set, or within the current directory's
 rem conf\ subdirectory.
 
+set SPARK_ENV_CMD=spark-env.cmd
 if [%SPARK_ENV_LOADED%] == [] (
   set SPARK_ENV_LOADED=1
 
   if [%SPARK_CONF_DIR%] == [] (
     set SPARK_CONF_DIR=%~dp0..\conf
   )
 
-  call :LoadSparkEnv
+  set SPARK_ENV_CMD=%SPARK_CONF_DIR%\%SPARK_ENV_CMD%
+  if exist %SPARK_ENV_CMD% (
+    call %SPARK_ENV_CMD%
+  )
 )
 
 rem Setting SPARK_SCALA_VERSION if not already set.
 
-set ASSEMBLY_DIR2="%SPARK_HOME%\assembly\target\scala-2.11"
-set ASSEMBLY_DIR1="%SPARK_HOME%\assembly\target\scala-2.12"
-
-if [%SPARK_SCALA_VERSION%] == [] (
-
-  if exist %ASSEMBLY_DIR2% if exist %ASSEMBLY_DIR1% (
-    echo "Presence of build for multiple Scala versions detected."
-    echo "Either clean one of them or, set SPARK_SCALA_VERSION in spark-env.cmd."
-    exit 1
-  )
-  if exist %ASSEMBLY_DIR2% (
-    set SPARK_SCALA_VERSION=2.11
-  ) else (
-    set SPARK_SCALA_VERSION=2.12
-  )
-)
+rem TODO: revisit for Scala 2.13 support
+set SPARK_SCALA_VERSION=2.12
+rem if [%SPARK_SCALA_VERSION%] == [] (
+rem   set SCALA_VERSION_1=2.12
+rem   set SCALA_VERSION_2=2.11
+rem
+rem   set ASSEMBLY_DIR1=%SPARK_HOME%\assembly\target\scala-%SCALA_VERSION_1%
+rem   set ASSEMBLY_DIR2=%SPARK_HOME%\assembly\target\scala-%SCALA_VERSION_2%
+rem   set ENV_VARIABLE_DOC=https://spark.apache.org/docs/latest/configuration.html#environment-variables
+rem   if exist %ASSEMBLY_DIR2% if exist %ASSEMBLY_DIR1% (
+rem     echo "Presence of build for multiple Scala versions detected (%ASSEMBLY_DIR1% and %ASSEMBLY_DIR2%)."
+rem     echo "Remove one of them or, set SPARK_SCALA_VERSION=%SCALA_VERSION_1% in %SPARK_ENV_CMD%."
+rem     echo "Visit %ENV_VARIABLE_DOC% for more details about setting environment variables in spark-env.cmd."
+rem     echo "Either clean one of them or, set SPARK_SCALA_VERSION in spark-env.cmd."
+rem     exit 1
+rem   )
+rem   if exist %ASSEMBLY_DIR1% (
+rem     set SPARK_SCALA_VERSION=%SCALA_VERSION_1%
+rem   ) else (
+rem     set SPARK_SCALA_VERSION=%SCALA_VERSION_2%
+rem   )
+rem )
 exit /b 0
-
-:LoadSparkEnv
-if exist "%SPARK_CONF_DIR%\spark-env.cmd" (
-  call "%SPARK_CONF_DIR%\spark-env.cmd"
-)

bin/load-spark-env.sh

Lines changed: 22 additions & 20 deletions
@@ -43,23 +43,25 @@ fi
 
 # Setting SPARK_SCALA_VERSION if not already set.
 
-if [ -z "$SPARK_SCALA_VERSION" ]; then
-  SCALA_VERSION_1=2.12
-  SCALA_VERSION_2=2.11
-
-  ASSEMBLY_DIR_1="${SPARK_HOME}/assembly/target/scala-${SCALA_VERSION_1}"
-  ASSEMBLY_DIR_2="${SPARK_HOME}/assembly/target/scala-${SCALA_VERSION_2}"
-  ENV_VARIABLE_DOC="https://spark.apache.org/docs/latest/configuration.html#environment-variables"
-  if [[ -d "$ASSEMBLY_DIR_1" && -d "$ASSEMBLY_DIR_2" ]]; then
-    echo "Presence of build for multiple Scala versions detected ($ASSEMBLY_DIR_1 and $ASSEMBLY_DIR_2)." 1>&2
-    echo "Remove one of them or, export SPARK_SCALA_VERSION=$SCALA_VERSION_1 in ${SPARK_ENV_SH}." 1>&2
-    echo "Visit ${ENV_VARIABLE_DOC} for more details about setting environment variables in spark-env.sh." 1>&2
-    exit 1
-  fi
-
-  if [[ -d "$ASSEMBLY_DIR_1" ]]; then
-    export SPARK_SCALA_VERSION=${SCALA_VERSION_1}
-  else
-    export SPARK_SCALA_VERSION=${SCALA_VERSION_2}
-  fi
-fi
+# TODO: revisit for Scala 2.13 support
+export SPARK_SCALA_VERSION=2.12
+#if [ -z "$SPARK_SCALA_VERSION" ]; then
+#  SCALA_VERSION_1=2.12
+#  SCALA_VERSION_2=2.11
+#
+#  ASSEMBLY_DIR_1="${SPARK_HOME}/assembly/target/scala-${SCALA_VERSION_1}"
+#  ASSEMBLY_DIR_2="${SPARK_HOME}/assembly/target/scala-${SCALA_VERSION_2}"
+#  ENV_VARIABLE_DOC="https://spark.apache.org/docs/latest/configuration.html#environment-variables"
+#  if [[ -d "$ASSEMBLY_DIR_1" && -d "$ASSEMBLY_DIR_2" ]]; then
+#    echo "Presence of build for multiple Scala versions detected ($ASSEMBLY_DIR_1 and $ASSEMBLY_DIR_2)." 1>&2
+#    echo "Remove one of them or, export SPARK_SCALA_VERSION=$SCALA_VERSION_1 in ${SPARK_ENV_SH}." 1>&2
+#    echo "Visit ${ENV_VARIABLE_DOC} for more details about setting environment variables in spark-env.sh." 1>&2
+#    exit 1
+#  fi
+#
+#  if [[ -d "$ASSEMBLY_DIR_1" ]]; then
+#    export SPARK_SCALA_VERSION=${SCALA_VERSION_1}
+#  else
+#    export SPARK_SCALA_VERSION=${SCALA_VERSION_2}
+#  fi
+#fi

core/src/main/scala/org/apache/spark/FutureAction.scala

Lines changed: 4 additions & 59 deletions
@@ -89,18 +89,6 @@ trait FutureAction[T] extends Future[T] {
    */
   override def value: Option[Try[T]]
 
-  // These two methods must be implemented in Scala 2.12. They're implemented as a no-op here
-  // and then filled in with a real implementation in the two subclasses below. The no-op exists
-  // here so that those implementations can declare "override", necessary in 2.12, while working
-  // in 2.11, where the method doesn't exist in the superclass.
-  // After 2.11 support goes away, remove these two:
-
-  def transform[S](f: (Try[T]) => Try[S])(implicit executor: ExecutionContext): Future[S] =
-    throw new UnsupportedOperationException()
-
-  def transformWith[S](f: (Try[T]) => Future[S])(implicit executor: ExecutionContext): Future[S] =
-    throw new UnsupportedOperationException()
-
   /**
    * Blocks and returns the result of this job.
    */
@@ -117,43 +105,6 @@ trait FutureAction[T] extends Future[T] {
 
 }
 
-/**
- * Scala 2.12 defines the two new transform/transformWith methods mentioned above. Impementing
- * these for 2.12 in the Spark class here requires delegating to these same methods in an
- * underlying Future object. But that only exists in 2.12. But these methods are only called
- * in 2.12. So define helper shims to access these methods on a Future by reflection.
- */
-private[spark] object FutureAction {
-
-  private val transformTryMethod =
-    try {
-      classOf[Future[_]].getMethod("transform", classOf[(_) => _], classOf[ExecutionContext])
-    } catch {
-      case _: NoSuchMethodException => null // Would fail later in 2.11, but not called in 2.11
-    }
-
-  private val transformWithTryMethod =
-    try {
-      classOf[Future[_]].getMethod("transformWith", classOf[(_) => _], classOf[ExecutionContext])
-    } catch {
-      case _: NoSuchMethodException => null // Would fail later in 2.11, but not called in 2.11
-    }
-
-  private[spark] def transform[T, S](
-      future: Future[T],
-      f: (Try[T]) => Try[S],
-      executor: ExecutionContext): Future[S] =
-    transformTryMethod.invoke(future, f, executor).asInstanceOf[Future[S]]
-
-  private[spark] def transformWith[T, S](
-      future: Future[T],
-      f: (Try[T]) => Future[S],
-      executor: ExecutionContext): Future[S] =
-    transformWithTryMethod.invoke(future, f, executor).asInstanceOf[Future[S]]
-
-}
-
-
 /**
  * A [[FutureAction]] holding the result of an action that triggers a single job. Examples include
  * count, collect, reduce.
@@ -195,16 +146,10 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
   def jobIds: Seq[Int] = Seq(jobWaiter.jobId)
 
   override def transform[S](f: (Try[T]) => Try[S])(implicit e: ExecutionContext): Future[S] =
-    FutureAction.transform(
-      jobWaiter.completionFuture,
-      (u: Try[Unit]) => f(u.map(_ => resultFunc)),
-      e)
+    jobWaiter.completionFuture.transform((u: Try[Unit]) => f(u.map(_ => resultFunc)))
 
   override def transformWith[S](f: (Try[T]) => Future[S])(implicit e: ExecutionContext): Future[S] =
-    FutureAction.transformWith(
-      jobWaiter.completionFuture,
-      (u: Try[Unit]) => f(u.map(_ => resultFunc)),
-      e)
+    jobWaiter.completionFuture.transformWith((u: Try[Unit]) => f(u.map(_ => resultFunc)))
 }
 
 
@@ -299,10 +244,10 @@ class ComplexFutureAction[T](run : JobSubmitter => Future[T])
   def jobIds: Seq[Int] = subActions.flatMap(_.jobIds)
 
   override def transform[S](f: (Try[T]) => Try[S])(implicit e: ExecutionContext): Future[S] =
-    FutureAction.transform(p.future, f, e)
+    p.future.transform(f)
 
   override def transformWith[S](f: (Try[T]) => Future[S])(implicit e: ExecutionContext): Future[S] =
-    FutureAction.transformWith(p.future, f, e)
+    p.future.transformWith(f)
 }
 
 
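Editorial note on why the reflection shims above could simply be deleted: `transform` and `transformWith` are part of the standard `scala.concurrent.Future` trait as of Scala 2.12, so the overrides can delegate to them directly. A minimal standalone sketch (not Spark code) of the two methods the replacement relies on:

```scala
// Minimal sketch, independent of Spark: Scala 2.12's Future exposes transform
// and transformWith directly, which is what the rewritten overrides call.
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success, Try}

object TransformSketch {
  val base: Future[Int] = Future(21)

  // transform: map over the completed Try, whether success or failure
  val doubled: Future[Int] = base.transform((t: Try[Int]) => t.map(_ * 2))

  // transformWith: chain another Future off the completed Try
  val described: Future[String] = base.transformWith {
    case Success(v) => Future.successful(s"value=$v")
    case Failure(e) => Future.failed(e)
  }
}
```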

core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala

Lines changed: 2 additions & 3 deletions
@@ -21,13 +21,12 @@ import java.io.{IOException, ObjectOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.parallel.ForkJoinTaskSupport
-import scala.concurrent.forkjoin.ForkJoinPool
 import scala.reflect.ClassTag
 
 import org.apache.spark.{Dependency, Partition, RangeDependency, SparkContext, TaskContext}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
  * Partition for UnionRDD.
@@ -61,7 +60,7 @@ private[spark] class UnionPartition[T: ClassTag](
 
 object UnionRDD {
   private[spark] lazy val partitionEvalTaskSupport =
-    new ForkJoinTaskSupport(new ForkJoinPool(8))
+    new ForkJoinTaskSupport(ThreadUtils.newForkJoinPool("partition-eval-task-support", 8))
 }
 
 @DeveloperApi

core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala

Lines changed: 0 additions & 5 deletions
@@ -34,8 +34,6 @@ import org.apache.spark.internal.Logging
  */
 private[spark] object ClosureCleaner extends Logging {
 
-  private val isScala2_11 = scala.util.Properties.versionString.contains("2.11")
-
   // Get an ASM class reader for a given class from the JAR that loaded it
   private[util] def getClassReader(cls: Class[_]): ClassReader = {
     // Copy data over, before delegating to ClassReader - else we can run out of open file handles.
@@ -168,9 +166,6 @@ private[spark] object ClosureCleaner extends Logging {
   * @param closure the closure to check.
   */
  private def getSerializedLambda(closure: AnyRef): Option[SerializedLambda] = {
-    if (isScala2_11) {
-      return None
-    }
    val isClosureCandidate =
      closure.getClass.isSynthetic &&
        closure

core/src/main/scala/org/apache/spark/util/ThreadUtils.scala

Lines changed: 6 additions & 7 deletions
@@ -26,7 +26,6 @@ import scala.language.higherKinds
 import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
 import scala.concurrent.{Awaitable, ExecutionContext, ExecutionContextExecutor, Future}
 import scala.concurrent.duration.{Duration, FiniteDuration}
-import scala.concurrent.forkjoin.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread}
 import scala.util.control.NonFatal
 
 import org.apache.spark.SparkException
@@ -181,17 +180,17 @@ private[spark] object ThreadUtils {
   }
 
   /**
-   * Construct a new Scala ForkJoinPool with a specified max parallelism and name prefix.
+   * Construct a new ForkJoinPool with a specified max parallelism and name prefix.
    */
-  def newForkJoinPool(prefix: String, maxThreadNumber: Int): SForkJoinPool = {
+  def newForkJoinPool(prefix: String, maxThreadNumber: Int): ForkJoinPool = {
     // Custom factory to set thread names
-    val factory = new SForkJoinPool.ForkJoinWorkerThreadFactory {
-      override def newThread(pool: SForkJoinPool) =
-        new SForkJoinWorkerThread(pool) {
+    val factory = new ForkJoinPool.ForkJoinWorkerThreadFactory {
+      override def newThread(pool: ForkJoinPool) =
+        new ForkJoinWorkerThread(pool) {
          setName(prefix + "-" + super.getName)
        }
     }
-    new SForkJoinPool(maxThreadNumber, factory,
+    new ForkJoinPool(maxThreadNumber, factory,
      null, // handler
      false // asyncMode
    )
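
Editorial note: the removed `SForkJoinPool`/`SForkJoinWorkerThread` aliases came from `scala.concurrent.forkjoin`, which in Scala 2.12 is only a deprecated re-export of the `java.util.concurrent` classes, so the factory can target the JDK types directly. A hedged usage sketch (names and values are illustrative, not from the patch) of driving a parallel collection with such a pool, in the same way `UnionRDD.partitionEvalTaskSupport` now does:

```scala
// Hedged sketch: a JDK ForkJoinPool with named worker threads feeding a Scala
// parallel collection, mirroring ThreadUtils.newForkJoinPool + UnionRDD above.
// The prefix, parallelism, and workload below are made up for illustration.
import java.util.concurrent.{ForkJoinPool, ForkJoinWorkerThread}
import scala.collection.parallel.ForkJoinTaskSupport

object ParallelListingSketch {
  def namedForkJoinPool(prefix: String, maxThreads: Int): ForkJoinPool = {
    val factory = new ForkJoinPool.ForkJoinWorkerThreadFactory {
      override def newThread(pool: ForkJoinPool): ForkJoinWorkerThread =
        new ForkJoinWorkerThread(pool) {
          setName(prefix + "-" + super.getName) // give workers a recognizable name
        }
    }
    new ForkJoinPool(maxThreads, factory, null /* handler */, false /* asyncMode */)
  }

  def main(args: Array[String]): Unit = {
    val par = (1 to 100).par
    par.tasksupport = new ForkJoinTaskSupport(namedForkJoinPool("partition-eval", 8))
    println(par.map(i => i * i).sum) // work is spread across the named workers
  }
}
```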

core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala

Lines changed: 4 additions & 4 deletions
@@ -94,8 +94,8 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
 
   test("add dependencies works correctly") {
     val md = SparkSubmitUtils.getModuleDescriptor
-    val artifacts = SparkSubmitUtils.extractMavenCoordinates("com.databricks:spark-csv_2.11:0.1," +
-      "com.databricks:spark-avro_2.11:0.1")
+    val artifacts = SparkSubmitUtils.extractMavenCoordinates("com.databricks:spark-csv_2.12:0.1," +
+      "com.databricks:spark-avro_2.12:0.1")
 
     SparkSubmitUtils.addDependenciesToIvy(md, artifacts, "default")
     assert(md.getDependencies.length === 2)
@@ -189,15 +189,15 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
 
   test("neglects Spark and Spark's dependencies") {
     val coordinates = SparkSubmitUtils.IVY_DEFAULT_EXCLUDES
-      .map(comp => s"org.apache.spark:spark-${comp}2.11:2.1.1")
+      .map(comp => s"org.apache.spark:spark-${comp}2.12:2.4.0")
       .mkString(",") + ",org.apache.spark:spark-core_fake:1.2.0"
 
     val path = SparkSubmitUtils.resolveMavenCoordinates(
       coordinates,
       SparkSubmitUtils.buildIvySettings(None, None),
       isTest = true)
     assert(path === "", "should return empty path")
-    val main = MavenCoordinate("org.apache.spark", "spark-streaming-kafka-assembly_2.11", "1.2.0")
+    val main = MavenCoordinate("org.apache.spark", "spark-streaming-kafka-assembly_2.12", "1.2.0")
     IvyTestUtils.withRepository(main, None, None) { repo =>
       val files = SparkSubmitUtils.resolveMavenCoordinates(
         coordinates + "," + main.toString,