
Commit d4420b4

srowen authored and dongjoon-hyun committed
[SPARK-27323][CORE][SQL][STREAMING] Use Single-Abstract-Method support in Scala 2.12 to simplify code
## What changes were proposed in this pull request?

Use Single Abstract Method syntax where possible (and minor related cleanup). Comments below. No logic should change here.

## How was this patch tested?

Existing tests.

Closes apache#24241 from srowen/SPARK-27323.

Authored-by: Sean Owen <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent d575a45 commit d4420b4
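
For readers unfamiliar with the language feature this commit leans on: Scala 2.12 can convert a function literal into an instance of any type that declares exactly one abstract method (a SAM type), which is what lets the anonymous `new Runnable { ... }` blocks below collapse into lambdas. A minimal standalone sketch of the before/after pattern, using only `java.lang.Thread` and `Runnable` (no Spark types):

```scala
object SamBasics {
  def main(args: Array[String]): Unit = {
    // Pre-2.12 style: an explicit anonymous class implementing java.lang.Runnable.
    val before = new Thread(new Runnable {
      override def run(): Unit = println("anonymous class")
    })

    // Scala 2.12 style: the function literal is SAM-converted to Runnable because
    // the expected type has exactly one abstract method.
    val after = new Thread(() => println("lambda via SAM conversion"))

    before.start(); before.join()
    after.start(); after.join()
  }
}
```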

78 files changed (+543, -849 lines)


core/src/main/scala/org/apache/spark/BarrierCoordinator.scala

Lines changed: 3 additions & 5 deletions
@@ -19,7 +19,7 @@ package org.apache.spark
 
 import java.util.{Timer, TimerTask}
 import java.util.concurrent.ConcurrentHashMap
-import java.util.function.{Consumer, Function}
+import java.util.function.Consumer
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -202,10 +202,8 @@ private[spark] class BarrierCoordinator(
     case request @ RequestToSync(numTasks, stageId, stageAttemptId, _, _) =>
       // Get or init the ContextBarrierState correspond to the stage attempt.
       val barrierId = ContextBarrierId(stageId, stageAttemptId)
-      states.computeIfAbsent(barrierId, new Function[ContextBarrierId, ContextBarrierState] {
-        override def apply(key: ContextBarrierId): ContextBarrierState =
-          new ContextBarrierState(key, numTasks)
-      })
+      states.computeIfAbsent(barrierId,
+        (key: ContextBarrierId) => new ContextBarrierState(key, numTasks))
       val barrierState = states.get(barrierId)
 
       barrierState.handleRequest(context, request)
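
The `computeIfAbsent` change above works because the lambda is SAM-converted to `java.util.function.Function`, making the explicit `new Function[...] { override def apply ... }` wrapper unnecessary. A self-contained sketch of the same pattern, using hypothetical stand-ins for `ContextBarrierId` and `ContextBarrierState` rather than the real Spark types:

```scala
import java.util.concurrent.ConcurrentHashMap

object ComputeIfAbsentSam {
  // Simplified, hypothetical stand-ins for Spark's ContextBarrierId / ContextBarrierState.
  final case class BarrierId(stageId: Int, stageAttemptId: Int)
  final class BarrierState(val id: BarrierId, val numTasks: Int)

  def main(args: Array[String]): Unit = {
    val states = new ConcurrentHashMap[BarrierId, BarrierState]()
    val barrierId = BarrierId(stageId = 0, stageAttemptId = 0)

    // The lambda is SAM-converted to java.util.function.Function[BarrierId, BarrierState].
    val state = states.computeIfAbsent(barrierId,
      (key: BarrierId) => new BarrierState(key, numTasks = 4))

    println(s"${state.id} has ${state.numTasks} tasks")
  }
}
```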

core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 2 additions & 3 deletions
@@ -123,9 +123,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     cleaningThread.setDaemon(true)
     cleaningThread.setName("Spark Context Cleaner")
     cleaningThread.start()
-    periodicGCService.scheduleAtFixedRate(new Runnable {
-      override def run(): Unit = System.gc()
-    }, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
+    periodicGCService.scheduleAtFixedRate(() => System.gc(),
+      periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
   }
 
   /**

core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 3 additions & 5 deletions
@@ -98,11 +98,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
   private val killExecutorThread = ThreadUtils.newDaemonSingleThreadExecutor("kill-executor-thread")
 
   override def onStart(): Unit = {
-    timeoutCheckingTask = eventLoopThread.scheduleAtFixedRate(new Runnable {
-      override def run(): Unit = Utils.tryLogNonFatalError {
-        Option(self).foreach(_.ask[Boolean](ExpireDeadHosts))
-      }
-    }, 0, checkTimeoutIntervalMs, TimeUnit.MILLISECONDS)
+    timeoutCheckingTask = eventLoopThread.scheduleAtFixedRate(
+      () => Utils.tryLogNonFatalError { Option(self).foreach(_.ask[Boolean](ExpireDeadHosts)) },
+      0, checkTimeoutIntervalMs, TimeUnit.MILLISECONDS)
   }
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 2 additions & 4 deletions
@@ -62,9 +62,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
 
   @transient private lazy val reader: ConfigReader = {
     val _reader = new ConfigReader(new SparkConfigProvider(settings))
-    _reader.bindEnv(new ConfigProvider {
-      override def get(key: String): Option[String] = Option(getenv(key))
-    })
+    _reader.bindEnv((key: String) => Option(getenv(key)))
     _reader
   }
 
@@ -392,7 +390,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
 
   /** Get an optional value, applying variable substitution. */
   private[spark] def getWithSubstitution(key: String): Option[String] = {
-    getOption(key).map(reader.substitute(_))
+    getOption(key).map(reader.substitute)
   }
 
   /** Get all parameters as a list of pairs */
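
Note that the `bindEnv` change above targets Spark's own `ConfigProvider` trait, not a Java interface: in Scala 2.12, SAM conversion also applies to Scala traits and abstract classes with a single abstract method. A small sketch under that assumption, using an illustrative `EnvProvider` trait (not the real Spark type):

```scala
object TraitSamExample {
  // Illustrative single-abstract-method trait, similar in shape to Spark's ConfigProvider.
  trait EnvProvider {
    def get(key: String): Option[String]
  }

  def resolve(key: String, provider: EnvProvider): Option[String] = provider.get(key)

  def main(args: Array[String]): Unit = {
    // The lambda is SAM-converted to EnvProvider; no anonymous class is needed.
    val fromEnv: EnvProvider = (key: String) => Option(System.getenv(key))
    println(resolve("PATH", fromEnv))
  }
}
```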

core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala

Lines changed: 1 addition & 5 deletions
@@ -60,11 +60,7 @@ object PythonRunner {
       .javaAddress(localhost)
       .callbackClient(py4j.GatewayServer.DEFAULT_PYTHON_PORT, localhost, secret)
       .build()
-    val thread = new Thread(new Runnable() {
-      override def run(): Unit = Utils.logUncaughtExceptions {
-        gatewayServer.start()
-      }
-    })
+    val thread = new Thread(() => Utils.logUncaughtExceptions { gatewayServer.start() })
     thread.setName("py4j-gateway-init")
     thread.setDaemon(true)
     thread.start()

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 4 additions & 7 deletions
@@ -20,7 +20,7 @@ package org.apache.spark.deploy
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, File, IOException}
 import java.security.PrivilegedExceptionAction
 import java.text.DateFormat
-import java.util.{Arrays, Comparator, Date, Locale}
+import java.util.{Arrays, Date, Locale}
 
 import scala.collection.JavaConverters._
 import scala.collection.immutable.Map
@@ -270,11 +270,8 @@ private[spark] class SparkHadoopUtil extends Logging {
           name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
         }
       })
-      Arrays.sort(fileStatuses, new Comparator[FileStatus] {
-        override def compare(o1: FileStatus, o2: FileStatus): Int = {
-          Longs.compare(o1.getModificationTime, o2.getModificationTime)
-        }
-      })
+      Arrays.sort(fileStatuses, (o1: FileStatus, o2: FileStatus) =>
+        Longs.compare(o1.getModificationTime, o2.getModificationTime))
       fileStatuses
     } catch {
       case NonFatal(e) =>
@@ -465,7 +462,7 @@ private[spark] object SparkHadoopUtil {
   // scalastyle:on line.size.limit
   def createNonECFile(fs: FileSystem, path: Path): FSDataOutputStream = {
     try {
-      // Use reflection as this uses apis only avialable in hadoop 3
+      // Use reflection as this uses APIs only available in Hadoop 3
       val builderMethod = fs.getClass().getMethod("createFile", classOf[Path])
       // the builder api does not resolve relative paths, nor does it create parent dirs, while
       // the old api does.

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 17 additions & 27 deletions
@@ -186,13 +186,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    * Return a runnable that performs the given operation on the event logs.
    * This operation is expected to be executed periodically.
    */
-  private def getRunner(operateFun: () => Unit): Runnable = {
-    new Runnable() {
-      override def run(): Unit = Utils.tryOrExit {
-        operateFun()
-      }
-    }
-  }
+  private def getRunner(operateFun: () => Unit): Runnable =
+    () => Utils.tryOrExit { operateFun() }
 
   /**
    * Fixed size thread pool to fetch and parse log files.
@@ -221,29 +216,25 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     // Cannot probe anything while the FS is in safe mode, so spawn a new thread that will wait
     // for the FS to leave safe mode before enabling polling. This allows the main history server
     // UI to be shown (so that the user can see the HDFS status).
-    val initThread = new Thread(new Runnable() {
-      override def run(): Unit = {
-        try {
-          while (isFsInSafeMode()) {
-            logInfo("HDFS is still in safe mode. Waiting...")
-            val deadline = clock.getTimeMillis() +
-              TimeUnit.SECONDS.toMillis(SAFEMODE_CHECK_INTERVAL_S)
-            clock.waitTillTime(deadline)
-          }
-          startPolling()
-        } catch {
-          case _: InterruptedException =>
+    val initThread = new Thread(() => {
+      try {
+        while (isFsInSafeMode()) {
+          logInfo("HDFS is still in safe mode. Waiting...")
+          val deadline = clock.getTimeMillis() +
+            TimeUnit.SECONDS.toMillis(SAFEMODE_CHECK_INTERVAL_S)
+          clock.waitTillTime(deadline)
         }
+        startPolling()
+      } catch {
+        case _: InterruptedException =>
       }
     })
     initThread.setDaemon(true)
     initThread.setName(s"${getClass().getSimpleName()}-init")
     initThread.setUncaughtExceptionHandler(errorHandler.getOrElse(
-      new Thread.UncaughtExceptionHandler() {
-        override def uncaughtException(t: Thread, e: Throwable): Unit = {
-          logError("Error initializing FsHistoryProvider.", e)
-          System.exit(1)
-        }
+      (_: Thread, e: Throwable) => {
+        logError("Error initializing FsHistoryProvider.", e)
+        System.exit(1)
       }))
     initThread.start()
     initThread
@@ -517,9 +508,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
     val tasks = updated.flatMap { entry =>
      try {
-        val task: Future[Unit] = replayExecutor.submit(new Runnable {
-          override def run(): Unit = mergeApplicationListing(entry, newLastScanTime, true)
-        }, Unit)
+        val task: Future[Unit] = replayExecutor.submit(
+          () => mergeApplicationListing(entry, newLastScanTime, true))
        Some(task -> entry.getPath)
      } catch {
        // let the iteration over the updated entries break, since an exception on
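
The uncaught-exception-handler change above also shows that SAM conversion is not limited to zero-argument lambdas: `Thread.UncaughtExceptionHandler` declares the two-parameter method `uncaughtException(Thread, Throwable)`, and a two-parameter lambda converts to it directly. A minimal sketch, independent of any Spark class:

```scala
object UncaughtHandlerSam {
  def main(args: Array[String]): Unit = {
    val worker = new Thread(() => throw new IllegalStateException("init failed"))

    // A two-parameter lambda is SAM-converted to Thread.UncaughtExceptionHandler,
    // replacing the `new Thread.UncaughtExceptionHandler() { ... }` wrapper.
    worker.setUncaughtExceptionHandler((_: Thread, e: Throwable) =>
      println(s"worker failed: ${e.getMessage}"))

    worker.start()
    worker.join()
  }
}
```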

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 3 additions & 5 deletions
@@ -150,11 +150,9 @@ private[deploy] class Master(
       logInfo(s"Spark Master is acting as a reverse proxy. Master, Workers and " +
         s"Applications UIs are available at $masterWebUiUrl")
     }
-    checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
-      override def run(): Unit = Utils.tryLogNonFatalError {
-        self.send(CheckForWorkerTimeOut)
-      }
-    }, 0, workerTimeoutMs, TimeUnit.MILLISECONDS)
+    checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(
+      () => Utils.tryLogNonFatalError { self.send(CheckForWorkerTimeOut) },
+      0, workerTimeoutMs, TimeUnit.MILLISECONDS)
 
     if (restServerEnabled) {
       val port = conf.get(MASTER_REST_SERVER_PORT)

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 14 additions & 24 deletions
@@ -325,11 +325,9 @@ private[deploy] class Worker(
     if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) {
       registrationRetryTimer.foreach(_.cancel(true))
       registrationRetryTimer = Some(
-        forwardMessageScheduler.scheduleAtFixedRate(new Runnable {
-          override def run(): Unit = Utils.tryLogNonFatalError {
-            self.send(ReregisterWithMaster)
-          }
-        }, PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
+        forwardMessageScheduler.scheduleAtFixedRate(
+          () => Utils.tryLogNonFatalError { self.send(ReregisterWithMaster) },
+          PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
           PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
           TimeUnit.SECONDS))
     }
@@ -341,7 +339,7 @@ private[deploy] class Worker(
   }
 
   /**
-   * Cancel last registeration retry, or do nothing if no retry
+   * Cancel last registration retry, or do nothing if no retry
   */
   private def cancelLastRegistrationRetry(): Unit = {
     if (registerMasterFutures != null) {
@@ -361,11 +359,7 @@
     registerMasterFutures = tryRegisterAllMasters()
     connectionAttemptCount = 0
     registrationRetryTimer = Some(forwardMessageScheduler.scheduleAtFixedRate(
-      new Runnable {
-        override def run(): Unit = Utils.tryLogNonFatalError {
-          Option(self).foreach(_.send(ReregisterWithMaster))
-        }
-      },
+      () => Utils.tryLogNonFatalError { Option(self).foreach(_.send(ReregisterWithMaster)) },
       INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
       INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
       TimeUnit.SECONDS))
@@ -407,19 +401,15 @@
       }
       registered = true
       changeMaster(masterRef, masterWebUiUrl, masterAddress)
-      forwardMessageScheduler.scheduleAtFixedRate(new Runnable {
-        override def run(): Unit = Utils.tryLogNonFatalError {
-          self.send(SendHeartbeat)
-        }
-      }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
+      forwardMessageScheduler.scheduleAtFixedRate(
+        () => Utils.tryLogNonFatalError { self.send(SendHeartbeat) },
+        0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
       if (CLEANUP_ENABLED) {
         logInfo(
           s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
-        forwardMessageScheduler.scheduleAtFixedRate(new Runnable {
-          override def run(): Unit = Utils.tryLogNonFatalError {
-            self.send(WorkDirCleanup)
-          }
-        }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
+        forwardMessageScheduler.scheduleAtFixedRate(
+          () => Utils.tryLogNonFatalError { self.send(WorkDirCleanup) },
+          CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
       }
 
       val execs = executors.values.map { e =>
@@ -568,7 +558,7 @@
         }
       }
 
-    case executorStateChanged @ ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
+    case executorStateChanged: ExecutorStateChanged =>
       handleExecutorStateChanged(executorStateChanged)
 
     case KillExecutor(masterUrl, appId, execId) =>
@@ -632,7 +622,7 @@
 
   override def onDisconnected(remoteAddress: RpcAddress): Unit = {
     if (master.exists(_.address == remoteAddress) ||
-        masterAddressToConnect.exists(_ == remoteAddress)) {
+        masterAddressToConnect.contains(remoteAddress)) {
       logInfo(s"$remoteAddress Disassociated !")
       masterDisconnected()
     }
@@ -815,7 +805,7 @@ private[deploy] object Worker extends Logging {
     val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
     val securityMgr = new SecurityManager(conf)
     val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
-    val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
+    val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL)
     rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
       masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
     rpcEnv

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 4 additions & 7 deletions
@@ -89,17 +89,14 @@ private[spark] class Executor(
   }
 
   // Start worker thread pool
+  // Use UninterruptibleThread to run tasks so that we can allow running codes without being
+  // interrupted by `Thread.interrupt()`. Some issues, such as KAFKA-1894, HADOOP-10622,
+  // will hang forever if some methods are interrupted.
   private val threadPool = {
     val threadFactory = new ThreadFactoryBuilder()
       .setDaemon(true)
       .setNameFormat("Executor task launch worker-%d")
-      .setThreadFactory(new ThreadFactory {
-        override def newThread(r: Runnable): Thread =
-          // Use UninterruptibleThread to run tasks so that we can allow running codes without being
-          // interrupted by `Thread.interrupt()`. Some issues, such as KAFKA-1894, HADOOP-10622,
-          // will hang forever if some methods are interrupted.
-          new UninterruptibleThread(r, "unused") // thread name will be set by ThreadFactoryBuilder
-      })
+      .setThreadFactory((r: Runnable) => new UninterruptibleThread(r, "unused"))
       .build()
     Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
   }
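
The `setThreadFactory` change above relies on `java.util.concurrent.ThreadFactory` being a SAM type (`newThread(Runnable)`). A standalone sketch of the pattern that uses a plain named `Thread` in place of Spark's internal `UninterruptibleThread`; the thread name below is illustrative only:

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

object ThreadFactorySam {
  def main(args: Array[String]): Unit = {
    // The lambda is SAM-converted to ThreadFactory, so no anonymous class is needed.
    val factory: ThreadFactory = (r: Runnable) => new Thread(r, "sketch-worker")

    val pool = Executors.newCachedThreadPool(factory)
    pool.execute(() => println(s"running on ${Thread.currentThread().getName}"))
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```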
