Skip to content

Commit 6dfdb31

Browse files
committed
fixup! Update unit tests
1 parent a2942af commit 6dfdb31

File tree

2 files changed

+14
-2
lines changed

2 files changed

+14
-2
lines changed

core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,8 @@ final class ShuffleBlockFetcherIterator(
153153
* Thread pool reading from fallback storage, first creating FallbackStorageRequest from
154154
* block id and map index, then materializing requests to SuccessFetchResult.
155155
*/
156-
private[this] val fallbackStorageReadPool: ThreadPoolExecutor =
156+
// This is visible for testing
157+
private[storage] val fallbackStorageReadPool: ThreadPoolExecutor =
157158
ThreadUtils.newDaemonFixedThreadPool(fallbackStorageReadThreads, "fallback-storage-read")
158159
private[this] val fallbackStorageReadContext: ExecutionContextExecutor =
159160
ExecutionContext.fromExecutor(fallbackStorageReadPool)

core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ import org.mockito.invocation.InvocationOnMock
3939
import org.mockito.stubbing.Answer
4040
import org.roaringbitmap.RoaringBitmap
4141
import org.scalatest.PrivateMethodTester
42+
import org.scalatest.concurrent.Eventually
43+
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
4244

4345
import org.apache.spark.{MapOutputTracker, SparkFunSuite, TaskContext}
4446
import org.apache.spark.MapOutputTracker.SHUFFLE_PUSH_MAP_ID
@@ -52,7 +54,8 @@ import org.apache.spark.storage.ShuffleBlockFetcherIterator._
5254
import org.apache.spark.util.Utils
5355

5456

55-
class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodTester {
57+
class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with Eventually
58+
with PrivateMethodTester {
5659

5760
private var transfer: BlockTransferService = _
5861
private var mapOutputTracker: MapOutputTracker = _
@@ -361,6 +364,10 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
361364
verify(blockManager, times(3)).getLocalBlockData(any())
362365

363366
// 2 fallback storage blocks fetched in initialization
367+
// initialize creates futures that eventually call into getFallbackStorageBlockData
368+
eventually(timeout(1.seconds), interval(10.millis)) {
369+
assert(iterator.fallbackStorageReadPool.getCompletedTaskCount >= 2)
370+
}
364371
verify(blockManager, times(2)).getFallbackStorageBlockData(any())
365372
// SPARK-55469: but buffer data have never been materialized
366373
fallbackBlocks.values.foreach { mockBuf =>
@@ -552,6 +559,10 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
552559
verify(blockManager, times(1)).getLocalBlockData(any())
553560

554561
// 1 fallback merge block fetched in initialization
562+
// initialize creates futures that eventually call into getFallbackStorageBlockData
563+
eventually(timeout(1.seconds), interval(10.millis)) {
564+
assert(iterator.fallbackStorageReadPool.getCompletedTaskCount >= 1)
565+
}
555566
verify(blockManager, times(1)).getFallbackStorageBlockData(any())
556567
// SPARK-55469: but buffer data have never been materialized
557568
mergedFallbackBlocks.values.foreach { mockBuf =>

0 commit comments

Comments
 (0)