Skip to content

Commit 4ec7f63

Browse files
committed
[SPARK-27404][CORE][SQL][STREAMING][YARN] Fix build warnings for 3.0: postfixOps edition
## What changes were proposed in this pull request? Fix build warnings -- see some details below. But mostly, remove use of postfix syntax where it causes warnings without the `scala.language.postfixOps` import. This is mostly in expressions like "120000 milliseconds". Which, I'd like to simplify to things like "2.minutes" anyway. ## How was this patch tested? Existing tests. Closes apache#24314 from srowen/SPARK-27404. Authored-by: Sean Owen <[email protected]> Signed-off-by: Sean Owen <[email protected]>
1 parent 43da473 commit 4ec7f63

File tree

71 files changed

+458
-459
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+458
-459
lines changed

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,9 @@
4646
*/
4747
public class RetryingBlockFetcherSuite {
4848

49-
ManagedBuffer block0 = new NioManagedBuffer(ByteBuffer.wrap(new byte[13]));
50-
ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
51-
ManagedBuffer block2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19]));
49+
private final ManagedBuffer block0 = new NioManagedBuffer(ByteBuffer.wrap(new byte[13]));
50+
private final ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
51+
private final ManagedBuffer block2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19]));
5252

5353
@Test
5454
public void testNoFailures() throws IOException, InterruptedException {
@@ -291,7 +291,7 @@ private static void performInteractions(List<? extends Map<String, Object>> inte
291291
}
292292

293293
assertNotNull(stub);
294-
stub.when(fetchStarter).createAndStart(any(), anyObject());
294+
stub.when(fetchStarter).createAndStart(any(), any());
295295
String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]);
296296
new RetryingBlockFetcher(conf, fetchStarter, blockIdArray, listener).start();
297297
}

core/src/main/scala/org/apache/spark/BarrierTaskContext.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package org.apache.spark
2020
import java.util.{Properties, Timer, TimerTask}
2121

2222
import scala.concurrent.duration._
23-
import scala.language.postfixOps
2423

2524
import org.apache.spark.annotation.{Experimental, Since}
2625
import org.apache.spark.executor.TaskMetrics
@@ -122,7 +121,7 @@ class BarrierTaskContext private[spark] (
122121
barrierEpoch),
123122
// Set a fixed timeout for RPC here, so users shall get a SparkException thrown by
124123
// BarrierCoordinator on timeout, instead of RPCTimeoutException from the RPC framework.
125-
timeout = new RpcTimeout(31536000 /* = 3600 * 24 * 365 */ seconds, "barrierTimeout"))
124+
timeout = new RpcTimeout(365.days, "barrierTimeout"))
126125
barrierEpoch += 1
127126
logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) finished " +
128127
"global sync successfully, waited for " +

core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import scala.collection.mutable.ListBuffer
2626
import scala.concurrent.{Future, Promise}
2727
import scala.concurrent.ExecutionContext.Implicits.global
2828
import scala.concurrent.duration._
29-
import scala.language.postfixOps
3029
import scala.sys.process._
3130

3231
import org.json4s._
@@ -112,7 +111,7 @@ private object FaultToleranceTest extends App with Logging {
112111
assertValidClusterState()
113112

114113
killLeader()
115-
delay(30 seconds)
114+
delay(30.seconds)
116115
assertValidClusterState()
117116
createClient()
118117
assertValidClusterState()
@@ -126,12 +125,12 @@ private object FaultToleranceTest extends App with Logging {
126125

127126
killLeader()
128127
addMasters(1)
129-
delay(30 seconds)
128+
delay(30.seconds)
130129
assertValidClusterState()
131130

132131
killLeader()
133132
addMasters(1)
134-
delay(30 seconds)
133+
delay(30.seconds)
135134
assertValidClusterState()
136135
}
137136

@@ -156,7 +155,7 @@ private object FaultToleranceTest extends App with Logging {
156155
killLeader()
157156
workers.foreach(_.kill())
158157
workers.clear()
159-
delay(30 seconds)
158+
delay(30.seconds)
160159
addWorkers(2)
161160
assertValidClusterState()
162161
}
@@ -174,7 +173,7 @@ private object FaultToleranceTest extends App with Logging {
174173

175174
(1 to 3).foreach { _ =>
176175
killLeader()
177-
delay(30 seconds)
176+
delay(30.seconds)
178177
assertValidClusterState()
179178
assertTrue(getLeader == masters.head)
180179
addMasters(1)
@@ -264,7 +263,7 @@ private object FaultToleranceTest extends App with Logging {
264263
}
265264

266265
// Avoid waiting indefinitely (e.g., we could register but get no executors).
267-
assertTrue(ThreadUtils.awaitResult(f, 120 seconds))
266+
assertTrue(ThreadUtils.awaitResult(f, 2.minutes))
268267
}
269268

270269
/**
@@ -317,7 +316,7 @@ private object FaultToleranceTest extends App with Logging {
317316
}
318317

319318
try {
320-
assertTrue(ThreadUtils.awaitResult(f, 120 seconds))
319+
assertTrue(ThreadUtils.awaitResult(f, 2.minutes))
321320
} catch {
322321
case e: TimeoutException =>
323322
logError("Master states: " + masters.map(_.state))
@@ -421,7 +420,7 @@ private object SparkDocker {
421420
}
422421

423422
dockerCmd.run(ProcessLogger(findIpAndLog _))
424-
val ip = ThreadUtils.awaitResult(ipPromise.future, 30 seconds)
423+
val ip = ThreadUtils.awaitResult(ipPromise.future, 30.seconds)
425424
val dockerId = Docker.getLastProcessId
426425
(ip, dockerId, outFile)
427426
}

core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ private[spark] class RpcTimeout(val duration: FiniteDuration, val timeoutProp: S
5252
*
5353
* @note This can be used in the recover callback of a Future to add to a TimeoutException
5454
* Example:
55-
* val timeout = new RpcTimeout(5 millis, "short timeout")
55+
* val timeout = new RpcTimeout(5.milliseconds, "short timeout")
5656
* Future(throw new TimeoutException).recover(timeout.addMessageIfTimeout)
5757
*/
5858
def addMessageIfTimeout[T]: PartialFunction[Throwable, T] = {

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import scala.collection.Map
2727
import scala.collection.mutable.{ArrayStack, HashMap, HashSet}
2828
import scala.concurrent.duration._
2929
import scala.language.existentials
30-
import scala.language.postfixOps
3130
import scala.util.control.NonFatal
3231

3332
import org.apache.commons.lang3.SerializationUtils
@@ -270,7 +269,7 @@ private[spark] class DAGScheduler(
270269
listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates,
271270
Some(executorUpdates)))
272271
blockManagerMaster.driverEndpoint.askSync[Boolean](
273-
BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
272+
BlockManagerHeartbeat(blockManagerId), new RpcTimeout(10.minutes, "BlockManagerHeartbeat"))
274273
}
275274

276275
/**

core/src/main/scala/org/apache/spark/ui/UIUtils.scala

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,12 @@ private[spark] object UIUtils extends Logging {
109109
}
110110
}
111111
// if time is more than a year
112-
return s"$yearString $weekString $dayString"
112+
s"$yearString $weekString $dayString"
113113
} catch {
114114
case e: Exception =>
115115
logError("Error converting time to string", e)
116116
// if there is some error, return blank string
117-
return ""
117+
""
118118
}
119119
}
120120

@@ -336,7 +336,7 @@ private[spark] object UIUtils extends Logging {
336336
def getHeaderContent(header: String): Seq[Node] = {
337337
if (newlinesInHeader) {
338338
<ul class="unstyled">
339-
{ header.split("\n").map { case t => <li> {t} </li> } }
339+
{ header.split("\n").map(t => <li> {t} </li>) }
340340
</ul>
341341
} else {
342342
Text(header)
@@ -446,7 +446,7 @@ private[spark] object UIUtils extends Logging {
446446
* the whole string will rendered as a simple escaped text.
447447
*
448448
* Note: In terms of security, only anchor tags with root relative links are supported. So any
449-
* attempts to embed links outside Spark UI, or other tags like {@code <script>} will cause in
449+
* attempts to embed links outside Spark UI, or other tags like &lt;script&gt; will cause in
450450
* the whole description to be treated as plain text.
451451
*
452452
* @param desc the original job or stage description string, which may contain html tags.
@@ -458,7 +458,6 @@ private[spark] object UIUtils extends Logging {
458458
* is true, and an Elem otherwise.
459459
*/
460460
def makeDescription(desc: String, basePathUri: String, plainText: Boolean = false): NodeSeq = {
461-
import scala.language.postfixOps
462461

463462
// If the description can be parsed as HTML and has only relative links, then render
464463
// as HTML, otherwise render as escaped string
@@ -468,9 +467,7 @@ private[spark] object UIUtils extends Logging {
468467

469468
// Verify that this has only anchors and span (we are wrapping in span)
470469
val allowedNodeLabels = Set("a", "span", "br")
471-
val illegalNodes = xml \\ "_" filterNot { case node: Node =>
472-
allowedNodeLabels.contains(node.label)
473-
}
470+
val illegalNodes = (xml \\ "_").filterNot(node => allowedNodeLabels.contains(node.label))
474471
if (illegalNodes.nonEmpty) {
475472
throw new IllegalArgumentException(
476473
"Only HTML anchors allowed in job descriptions\n" +
@@ -491,8 +488,8 @@ private[spark] object UIUtils extends Logging {
491488
new RewriteRule() {
492489
override def transform(n: Node): Seq[Node] = {
493490
n match {
494-
case e: Elem if e.child isEmpty => Text(e.text)
495-
case e: Elem if e.child nonEmpty => Text(e.child.flatMap(transform).text)
491+
case e: Elem if e.child.isEmpty => Text(e.text)
492+
case e: Elem => Text(e.child.flatMap(transform).text)
496493
case _ => n
497494
}
498495
}
@@ -503,7 +500,7 @@ private[spark] object UIUtils extends Logging {
503500
new RewriteRule() {
504501
override def transform(n: Node): Seq[Node] = {
505502
n match {
506-
case e: Elem if e \ "@href" nonEmpty =>
503+
case e: Elem if (e \ "@href").nonEmpty =>
507504
val relativePath = e.attribute("href").get.toString
508505
val fullUri = s"${basePathUri.stripSuffix("/")}/${relativePath.stripPrefix("/")}"
509506
e % Attribute(null, "href", fullUri, Null)

core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.spark
1919

2020
import scala.concurrent.duration._
21-
import scala.language.postfixOps
2221

2322
import org.apache.spark.internal.config._
2423
import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
@@ -52,7 +51,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
5251
)
5352

5453
val error = intercept[SparkException] {
55-
ThreadUtils.awaitResult(futureAction, 5 seconds)
54+
ThreadUtils.awaitResult(futureAction, 5.seconds)
5655
}.getCause.getMessage
5756
assert(error.contains(message))
5857
}

core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ import org.apache.spark.storage._
4343
abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[SortShuffleManager])
4444
extends SparkFunSuite with BeforeAndAfter with LocalSparkContext
4545
{
46-
implicit val defaultTimeout = timeout(10000 millis)
46+
implicit val defaultTimeout = timeout(10.seconds)
4747
val conf = new SparkConf()
4848
.setMaster("local[2]")
4949
.setAppName("ContextCleanerSuite")
@@ -159,7 +159,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
159159
val preGCTester = new CleanerTester(sc, rddIds = Seq(rdd.id))
160160
runGC()
161161
intercept[Exception] {
162-
preGCTester.assertCleanup()(timeout(1000 millis))
162+
preGCTester.assertCleanup()(timeout(1.second))
163163
}
164164

165165
// Test that GC causes RDD cleanup after dereferencing the RDD
@@ -178,7 +178,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
178178
val preGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
179179
runGC()
180180
intercept[Exception] {
181-
preGCTester.assertCleanup()(timeout(1000 millis))
181+
preGCTester.assertCleanup()(timeout(1.second))
182182
}
183183
rdd.count() // Defeat early collection by the JVM
184184

@@ -196,7 +196,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
196196
val preGCTester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id))
197197
runGC()
198198
intercept[Exception] {
199-
preGCTester.assertCleanup()(timeout(1000 millis))
199+
preGCTester.assertCleanup()(timeout(1.second))
200200
}
201201

202202
// Test that GC causes broadcast cleanup after dereferencing the broadcast variable
@@ -272,7 +272,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
272272
val preGCTester = new CleanerTester(sc, rddIds = Seq(rdd.id))
273273
runGC()
274274
intercept[Exception] {
275-
preGCTester.assertCleanup()(timeout(1000 millis))
275+
preGCTester.assertCleanup()(timeout(1.second))
276276
}
277277

278278
// Test that RDD going out of scope does cause the checkpoint blocks to be cleaned up
@@ -294,7 +294,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
294294
val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
295295
runGC()
296296
intercept[Exception] {
297-
preGCTester.assertCleanup()(timeout(1000 millis))
297+
preGCTester.assertCleanup()(timeout(1.second))
298298
}
299299

300300
// Test that GC triggers the cleanup of all variables after the dereferencing them
@@ -334,7 +334,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
334334
val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
335335
runGC()
336336
intercept[Exception] {
337-
preGCTester.assertCleanup()(timeout(1000 millis))
337+
preGCTester.assertCleanup()(timeout(1.second))
338338
}
339339

340340
// Test that GC triggers the cleanup of all variables after the dereferencing them
@@ -408,7 +408,7 @@ class CleanerTester(
408408
/** Assert that all the stuff has been cleaned up */
409409
def assertCleanup()(implicit waitTimeout: PatienceConfiguration.Timeout) {
410410
try {
411-
eventually(waitTimeout, interval(100 millis)) {
411+
eventually(waitTimeout, interval(100.milliseconds)) {
412412
assert(isAllCleanedUp,
413413
"The following resources were not cleaned up:\n" + uncleanedResourcesToString)
414414
}

core/src/test/scala/org/apache/spark/DriverSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,12 @@ class DriverSuite extends SparkFunSuite with TimeLimits {
3333
ignore("driver should exit after finishing without cleanup (SPARK-530)") {
3434
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
3535
val masters = Table("master", "local", "local-cluster[2,1,1024]")
36-
forAll(masters) { (master: String) =>
36+
forAll(masters) { master =>
3737
val process = Utils.executeCommand(
3838
Seq(s"$sparkHome/bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master),
3939
new File(sparkHome),
4040
Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
41-
failAfter(60 seconds) { process.waitFor() }
41+
failAfter(1.minute) { process.waitFor() }
4242
// Ensure we still kill the process in case it timed out
4343
process.destroy()
4444
}

core/src/test/scala/org/apache/spark/JobCancellationSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
250250
assert(e.getMessage contains "cancel")
251251

252252
// Once A is cancelled, job B should finish fairly quickly.
253-
assert(ThreadUtils.awaitResult(jobB, 60.seconds) === 100)
253+
assert(ThreadUtils.awaitResult(jobB, 1.minute) === 100)
254254
}
255255

256256
test("task reaper will not kill JVM if spark.task.killTimeout == -1") {
@@ -290,7 +290,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
290290
assert(e.getMessage contains "cancel")
291291

292292
// Once A is cancelled, job B should finish fairly quickly.
293-
assert(ThreadUtils.awaitResult(jobB, 60.seconds) === 100)
293+
assert(ThreadUtils.awaitResult(jobB, 1.minute) === 100)
294294
}
295295

296296
test("two jobs sharing the same stage") {

0 commit comments

Comments
 (0)