Commit 0c0fd94

[SPARK-53247][CORE][SQL][MLLIB][TESTS] Use createArray for large test array creations
### What changes were proposed in this pull request?

This PR aims to use `createArray` for simple large test array creations like the following.

```scala
- Array.fill[Long](4000000)(0)
+ createArray(4000000, 0L)
```

```scala
scala> spark.time(Array.fill[Long](4000000)(0).size)
Time taken: 19 ms
val res0: Int = 4000000

scala> spark.time(org.apache.spark.util.collection.Utils.createArray(4000000, 0L).size)
Time taken: 6 ms
val res1: Int = 4000000
```

For example, the following test is repeated three times by `OrcV1QuerySuite`, `OrcV2QuerySuite`, and `HiveOrcQuerySuite`. In addition, the test case itself repeats `Array.fill[Byte](5 * 1024 * 1024)('X')` 2048 (= 2 x 1024) times. So, in total, this PR improves the array creation **6144 times** via this single change alone.

https://github.com/apache/spark/blob/8d5e60279b10afcaad96abafb11c1c9950029b3d/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala#L866-L877

### Why are the changes needed?

To reduce the test time for [SPARK-48094 Reduce GitHub Action usage according to ASF project allowance](https://issues.apache.org/jira/browse/SPARK-48094).

> The average number of minutes a project uses per calendar week MUST NOT exceed the equivalent of 25 full-time runners (250,000 minutes, or 4,200 hours).

- https://infra-reports.apache.org/#ghactions&project=spark&hours=168

### Does this PR introduce _any_ user-facing change?

No, this is a test-only change.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51976 from dongjoon-hyun/SPARK-53247.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
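As a rough illustration of why this helps, below is a minimal sketch of a `createArray`-style helper. The `CreateArraySketch` object and its implementation are illustrative assumptions, not the actual `org.apache.spark.util.collection.Utils` code: the idea is to allocate the array once and write the same constant into every slot, whereas `Array.fill` evaluates its by-name element argument once per element.

```scala
import scala.reflect.ClassTag

// Illustrative stand-in for org.apache.spark.util.collection.Utils.createArray;
// the object name and implementation are assumptions, not Spark's actual code.
object CreateArraySketch {
  // Allocate once and fill with a constant value. Array.fill evaluates its
  // by-name element argument for every slot, which adds up for
  // multi-million-element test arrays.
  def createArray[T: ClassTag](length: Int, value: T): Array[T] = {
    val array = new Array[T](length)
    var i = 0
    while (i < length) {
      array(i) = value
      i += 1
    }
    array
  }

  def main(args: Array[String]): Unit = {
    // Same result as Array.fill[Long](4000000)(0), built with a plain loop.
    val sizes = createArray(4000000, 0L)
    println(sizes.length) // 4000000
  }
}
```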
1 parent 66ff752 commit 0c0fd94

File tree: 20 files changed (+102 −76 lines)


core/src/test/scala/org/apache/spark/FileSuite.scala
Lines changed: 7 additions & 4 deletions

@@ -39,6 +39,7 @@ import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
+import org.apache.spark.util.collection.Utils.createArray
 
 class FileSuite extends SparkFunSuite with LocalSparkContext {
   var tempDir: File = _
@@ -86,11 +87,12 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
 
     val normalFile = new File(normalDir, "part-00000")
     val normalContent = sc.textFile(normalDir).collect()
-    assert(normalContent === Array.fill(10000)("a"))
+    val expected = createArray(10000, "a")
+    assert(normalContent === expected)
 
     val compressedFile = new File(compressedOutputDir, "part-00000" + codec.getDefaultExtension)
     val compressedContent = sc.textFile(compressedOutputDir).collect()
-    assert(compressedContent === Array.fill(10000)("a"))
+    assert(compressedContent === expected)
 
     assert(compressedFile.length < normalFile.length)
   }
@@ -125,11 +127,12 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
 
     val normalFile = new File(normalDir, "part-00000")
     val normalContent = sc.sequenceFile[String, String](normalDir).collect()
-    assert(normalContent === Array.fill(100)(("abc", "abc")))
+    val expected = createArray(100, ("abc", "abc"))
+    assert(normalContent === expected)
 
     val compressedFile = new File(compressedOutputDir, "part-00000" + codec.getDefaultExtension)
     val compressedContent = sc.sequenceFile[String, String](compressedOutputDir).collect()
-    assert(compressedContent === Array.fill(100)(("abc", "abc")))
+    assert(compressedContent === expected)
 
     assert(compressedFile.length < normalFile.length)
   }

core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
Lines changed: 5 additions & 4 deletions

@@ -38,6 +38,7 @@ import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpoint, RpcEndpoin
 import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus, MapStatus, MergeStatus}
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{BlockManagerId, BlockManagerMasterEndpoint, ShuffleBlockId, ShuffleMergedBlockId}
+import org.apache.spark.util.collection.Utils.createArray
 
 class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
   private val conf = new SparkConf
@@ -193,7 +194,7 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
     // Message size should be ~123B, and no exception should be thrown
     masterTracker.registerShuffle(10, 1, MergeStatus.SHUFFLE_PUSH_DUMMY_NUM_REDUCES)
     masterTracker.registerMapOutput(10, 0, MapStatus(
-      BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0), 5))
+      BlockManagerId("88", "mph", 1000), createArray(10, 0L), 5))
     val senderAddress = RpcAddress("localhost", 12345)
     val rpcCallContext = mock(classOf[RpcCallContext])
     when(rpcCallContext.senderAddress).thenReturn(senderAddress)
@@ -271,7 +272,7 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
     masterTracker.registerShuffle(20, 100, MergeStatus.SHUFFLE_PUSH_DUMMY_NUM_REDUCES)
     (0 until 100).foreach { i =>
       masterTracker.registerMapOutput(20, i, new CompressedMapStatus(
-        BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0), 5))
+        BlockManagerId("999", "mps", 1000), createArray(4000000, 0L), 5))
     }
     val senderAddress = RpcAddress("localhost", 12345)
     val rpcCallContext = mock(classOf[RpcCallContext])
@@ -578,7 +579,7 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
     masterTracker.registerShuffle(20, 100, MergeStatus.SHUFFLE_PUSH_DUMMY_NUM_REDUCES)
     (0 until 100).foreach { i =>
       masterTracker.registerMapOutput(20, i, new CompressedMapStatus(
-        BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0), 5))
+        BlockManagerId("999", "mps", 1000), createArray(4000000, 0L), 5))
     }
 
     val mapWorkerRpcEnv = createRpcEnv("spark-worker", "localhost", 0, new SecurityManager(conf))
@@ -625,7 +626,7 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
     masterTracker.registerShuffle(20, 100, MergeStatus.SHUFFLE_PUSH_DUMMY_NUM_REDUCES)
     (0 until 100).foreach { i =>
       masterTracker.registerMapOutput(20, i, new CompressedMapStatus(
-        BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0), 5))
+        BlockManagerId("999", "mps", 1000), createArray(4000000, 0L), 5))
     }
     masterTracker.registerMergeResult(20, 0, MergeStatus(BlockManagerId("999", "mps", 1000), 0,
       bitmap1, 1000L))

core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
Lines changed: 2 additions & 1 deletion

@@ -24,6 +24,7 @@ import org.apache.spark.{SharedSparkContext, SparkFunSuite}
 import org.apache.spark.internal.config
 import org.apache.spark.network.util.ByteArrayWritableChannel
 import org.apache.spark.util.Utils
+import org.apache.spark.util.collection.Utils.createArray
 import org.apache.spark.util.io.ChunkedByteBuffer
 
 class ChunkedByteBufferSuite extends SparkFunSuite with SharedSparkContext {
@@ -128,7 +129,7 @@ class ChunkedByteBufferSuite extends SparkFunSuite with SharedSparkContext {
   test("toArray() throws UnsupportedOperationException if size exceeds 2GB") {
     val fourMegabyteBuffer = ByteBuffer.allocate(1024 * 1024 * 4)
     fourMegabyteBuffer.limit(fourMegabyteBuffer.capacity())
-    val chunkedByteBuffer = new ChunkedByteBuffer(Array.fill(1024)(fourMegabyteBuffer))
+    val chunkedByteBuffer = new ChunkedByteBuffer(createArray(1024, fourMegabyteBuffer))
     assert(chunkedByteBuffer.size === (1024L * 1024L * 1024L * 4L))
     intercept[UnsupportedOperationException] {
       chunkedByteBuffer.toArray

core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
Lines changed: 6 additions & 4 deletions

@@ -38,6 +38,7 @@ import org.apache.spark.rdd.RDDSuiteUtils._
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.util.{ThreadUtils, Utils}
 import org.apache.spark.util.ArrayImplicits._
+import org.apache.spark.util.collection.Utils.createArray
 
 class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually {
   var tempDir: File = _
@@ -365,7 +366,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually {
 
   test("repartitioned RDDs perform load balancing") {
     // Coalesce partitions
-    val input = Array.fill(1000)(1)
+    val input = createArray(1000, 1)
     val initialPartitions = 10
     val data = sc.parallelize(input.toImmutableArraySeq, initialPartitions)
 
@@ -393,9 +394,10 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually {
       }
    }
 
-    testSplitPartitions(Array.fill(100)(1).toImmutableArraySeq, 10, 20)
-    testSplitPartitions((Array.fill(10000)(1) ++ Array.fill(10000)(2)).toImmutableArraySeq, 20, 100)
-    testSplitPartitions(Array.fill(1000)(1).toImmutableArraySeq, 250, 128)
+    testSplitPartitions(createArray(100, 1).toImmutableArraySeq, 10, 20)
+    testSplitPartitions(
+      (createArray(10000, 1) ++ createArray(10000, 2)).toImmutableArraySeq, 20, 100)
+    testSplitPartitions(createArray(1000, 1).toImmutableArraySeq, 250, 128)
   }
 
   test("coalesced RDDs") {

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
Lines changed: 14 additions & 10 deletions

@@ -50,6 +50,7 @@ import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedExcept
 import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerId, BlockManagerMaster}
 import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, Clock, LongAccumulator, SystemClock, ThreadUtils, Utils}
 import org.apache.spark.util.ArrayImplicits._
+import org.apache.spark.util.collection.Utils.createArray
 
 class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
   extends DAGSchedulerEventProcessLoop(dagScheduler) {
@@ -679,23 +680,24 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
     submit(reduceRdd, Array(0))
     // map stage1 completes successfully, with one task on each executor
+    val uncompressedSizes = createArray(1, 2L)
     complete(taskSets(0), Seq(
       (Success,
         MapStatus(
-          BlockManagerId("hostA-exec1", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 5)),
+          BlockManagerId("hostA-exec1", "hostA", 12345), uncompressedSizes, mapTaskId = 5)),
       (Success,
         MapStatus(
-          BlockManagerId("hostA-exec2", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 6)),
+          BlockManagerId("hostA-exec2", "hostA", 12345), uncompressedSizes, mapTaskId = 6)),
       (Success, makeMapStatus("hostB", 1, mapTaskId = 7))
     ))
     // map stage2 completes successfully, with one task on each executor
     complete(taskSets(1), Seq(
       (Success,
         MapStatus(
-          BlockManagerId("hostA-exec1", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 8)),
+          BlockManagerId("hostA-exec1", "hostA", 12345), uncompressedSizes, mapTaskId = 8)),
      (Success,
        MapStatus(
-          BlockManagerId("hostA-exec2", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 9)),
+          BlockManagerId("hostA-exec2", "hostA", 12345), uncompressedSizes, mapTaskId = 9)),
       (Success, makeMapStatus("hostB", 1, mapTaskId = 10))
     ))
     // make sure our test setup is correct
@@ -4948,6 +4950,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     }
 
     // stage2`s task0 Fetch failed
+    val uncompressedSizes = createArray(2, 2L)
     runEvent(makeCompletionEvent(
       taskSets(1).tasks(0),
       FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0L, 0, 0,
@@ -4957,11 +4960,11 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     // long running task complete
     runEvent(makeCompletionEvent(taskSets(1).tasks(1), Success,
       result = MapStatus(BlockManagerId("hostC-exec1", "hostC", 44399),
-        Array.fill[Long](2)(2), mapTaskId = taskIdCount),
+        uncompressedSizes, mapTaskId = taskIdCount),
       Seq.empty, Array.empty, createTaskInfo(false)))
     runEvent(makeCompletionEvent(taskSets(1).tasks(0), Success,
       result = MapStatus(BlockManagerId("hostD-exec1", "hostD", 44400),
-        Array.fill[Long](2)(2), mapTaskId = taskIdCount),
+        uncompressedSizes, mapTaskId = taskIdCount),
       Seq.empty, Array.empty, createTaskInfo(true)))
 
 
@@ -4984,7 +4987,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assert(stage0Retry.size === 1)
     runEvent(makeCompletionEvent(stage0Retry.head.tasks(0), Success,
       result = MapStatus(BlockManagerId("hostE-exec1", "hostE", 44401),
-        Array.fill[Long](2)(2), mapTaskId = taskIdCount)))
+        uncompressedSizes, mapTaskId = taskIdCount)))
 
     // wait stage2 resubmit
     sc.listenerBus.waitUntilEmpty()
@@ -5062,15 +5065,16 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
       normalTask))
 
     // Make the speculative task succeed after initial task has failed
+    val uncompressedSizes = createArray(2, 2L)
     runEvent(makeCompletionEvent(taskSets(1).tasks(0), Success,
       result = MapStatus(BlockManagerId("hostD-exec1", "hostD", 34512),
-        Array.fill[Long](2)(2), mapTaskId = speculativeTask.taskId),
+        uncompressedSizes, mapTaskId = speculativeTask.taskId),
       taskInfo = speculativeTask))
 
     // The second task, for partition 1 succeeds as well.
     runEvent(makeCompletionEvent(taskSets(1).tasks(1), Success,
       result = MapStatus(BlockManagerId("hostE-exec2", "hostE", 23456),
-        Array.fill[Long](2)(2), mapTaskId = taskIdCount)))
+        createArray(2, 2L), mapTaskId = taskIdCount)))
     taskIdCount += 1
 
     sc.listenerBus.waitUntilEmpty()
@@ -5096,7 +5100,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     // make the original task succeed
     runEvent(makeCompletionEvent(stage0Retry.head.tasks(fetchFailParentPartition), Success,
       result = MapStatus(BlockManagerId("hostF-exec1", "hostF", 12345),
-        Array.fill[Long](2)(2), mapTaskId = taskIdCount)))
+        createArray(2, 2L), mapTaskId = taskIdCount)))
     Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2)
     dagEventProcessLoopTester.runEvents()
 

core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
Lines changed: 4 additions & 3 deletions

@@ -30,6 +30,7 @@ import org.apache.spark.internal.config
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.Utils
+import org.apache.spark.util.collection.Utils.createArray
 
 class MapStatusSuite extends SparkFunSuite {
   private def doReturn(value: Any) = org.mockito.Mockito.doReturn(value, Seq.empty: _*)
@@ -75,7 +76,7 @@ class MapStatusSuite extends SparkFunSuite {
   }
 
   test("large tasks should use " + classOf[HighlyCompressedMapStatus].getName) {
-    val sizes = Array.fill[Long](2001)(150L)
+    val sizes = createArray(2001, 150L)
     val status = MapStatus(null, sizes, -1)
     assert(status.isInstanceOf[HighlyCompressedMapStatus])
     assert(status.getSizeForBlock(10) === 150L)
@@ -208,7 +209,7 @@ class MapStatusSuite extends SparkFunSuite {
     doReturn(conf).when(env).conf
     SparkEnv.set(env)
 
-    val emptyBlocks = Array.fill[Long](emptyBlocksLength)(0L)
+    val emptyBlocks = createArray(emptyBlocksLength, 0L)
     val smallAndUntrackedBlocks = Array.tabulate[Long](smallAndUntrackedBlocksLength)(i => i)
     val trackedSkewedBlocks =
       Array.tabulate[Long](trackedSkewedBlocksLength)(i => i + 350 * 1024)
@@ -252,7 +253,7 @@ class MapStatusSuite extends SparkFunSuite {
     doReturn(conf).when(env).conf
     SparkEnv.set(env)
 
-    val emptyBlocks = Array.fill[Long](emptyBlocksLength)(0L)
+    val emptyBlocks = createArray(emptyBlocksLength, 0L)
     val smallBlockSizes = Array.tabulate[Long](smallBlocksLength)(i => i + 1)
     val untrackedSkewedBlocksSizes =
       Array.tabulate[Long](untrackedSkewedBlocksLength)(i => i + 3500 * 1024)
