@@ -14,7 +14,7 @@ import kotlin.test.*
14
14
class FlatMapStressTest : TestBase () {
15
15
16
16
private val iterations = 2000 * stressTestMultiplier
17
- private val expectedSum = iterations * (iterations + 1 ) / 2
17
+ private val expectedSum = iterations.toLong() * (iterations + 1 ) / 2
18
18
19
19
@Test
20
20
fun testConcurrencyLevel () = runTest {
@@ -35,7 +35,7 @@ class FlatMapStressTest : TestBase() {
35
35
val bufferSize = 5
36
36
withContext(Dispatchers .Default ) {
37
37
val inFlightElements = AtomicLong (0L )
38
- var result = 0
38
+ var result = 0L
39
39
(1 .. iterations step 4 ).asFlow().flatMapMerge(bufferSize = bufferSize) { value ->
40
40
unsafeFlow {
41
41
repeat(4 ) {
@@ -59,11 +59,11 @@ class FlatMapStressTest : TestBase() {
59
59
@Test
60
60
fun testDelivery () = runTest {
61
61
withContext(Dispatchers .Default ) {
62
- val result = (1 .. iterations step 4 ).asFlow().flatMapMerge { value ->
62
+ val result = (1L .. iterations step 4 ).asFlow().flatMapMerge { value ->
63
63
unsafeFlow {
64
64
repeat(4 ) { emit(value + it) }
65
65
}
66
- }.sum ()
66
+ }.longSum ()
67
67
assertEquals(expectedSum, result)
68
68
}
69
69
}
@@ -72,12 +72,12 @@ class FlatMapStressTest : TestBase() {
72
72
fun testIndependentShortBursts () = runTest {
73
73
withContext(Dispatchers .Default ) {
74
74
repeat(iterations) {
75
- val result = (1 .. 4 ).asFlow().flatMapMerge { value ->
75
+ val result = (1L .. 4L ).asFlow().flatMapMerge { value ->
76
76
unsafeFlow {
77
77
emit(value)
78
78
emit(value)
79
79
}
80
- }.sum ()
80
+ }.longSum ()
81
81
assertEquals(20 , result)
82
82
}
83
83
}
@@ -86,14 +86,14 @@ class FlatMapStressTest : TestBase() {
86
86
private suspend fun testConcurrencyLevel (maxConcurrency : Int ) {
87
87
assumeTrue(maxConcurrency <= CORE_POOL_SIZE )
88
88
val concurrency = AtomicLong ()
89
- val result = (1 .. iterations).asFlow().flatMapMerge(concurrency = maxConcurrency) { value ->
89
+ val result = (1L .. iterations).asFlow().flatMapMerge(concurrency = maxConcurrency) { value ->
90
90
unsafeFlow {
91
91
val current = concurrency.incrementAndGet()
92
92
assertTrue(current in 1 .. maxConcurrency)
93
93
emit(value)
94
94
concurrency.decrementAndGet()
95
95
}
96
- }.sum ()
96
+ }.longSum ()
97
97
98
98
assertEquals(0 , concurrency.get())
99
99
assertEquals(expectedSum, result)
0 commit comments