@@ -31,6 +31,7 @@ import kotlinx.coroutines.Deferred
3131import kotlinx.coroutines.Dispatchers
3232import kotlinx.coroutines.ExperimentalCoroutinesApi
3333import kotlinx.coroutines.SupervisorJob
34+ import kotlinx.coroutines.asCoroutineDispatcher
3435import kotlinx.coroutines.async
3536import kotlinx.coroutines.channels.ClosedSendChannelException
3637import kotlinx.coroutines.delay
@@ -41,6 +42,7 @@ import kotlinx.coroutines.test.runTest
4142import org.junit.Assert.assertEquals
4243import org.junit.Assert.assertTrue
4344import org.junit.Test
45+ import java.util.concurrent.Executors
4446
4547@OptIn(ExperimentalCoroutinesApi ::class )
4648class StreamSingleFlightProcessorImplTest {
@@ -418,41 +420,43 @@ class StreamSingleFlightProcessorImplTest {
418420
419421 @Test
420422 fun `run with racing callers where loser joins existing flight` () = runTest {
421- val map = RecordingMap <Any , Deferred <Result <* >>>()
422- val sf =
423- StreamSingleFlightProcessorImpl (
424- scope = CoroutineScope (SupervisorJob () + Dispatchers .Default ),
423+ val pool = Executors .newFixedThreadPool(4 ).asCoroutineDispatcher()
424+ try {
425+ val map = RecordingMap <Any , Deferred <Result <* >>>()
426+ val sf = StreamSingleFlightProcessorImpl (
427+ scope = CoroutineScope (SupervisorJob () + pool),
425428 flights = map,
426429 )
427- val key = " k" .asStreamTypedKey<Int >()
430+ val key = " k" .asStreamTypedKey<Int >()
428431
429- repeat(30 ) {
430- val gate = java.util.concurrent.CountDownLatch (1 )
431- val a =
432- async(Dispatchers .Default ) {
432+ repeat(200 ) { // more attempts to guarantee at least one race
433+ val gate = java.util.concurrent.CountDownLatch (1 )
434+ val a = async(pool) {
433435 gate.await()
434436 sf.run (key) {
435- delay(10 )
437+ delay(100 ) // keep the winner running long enough
436438 1
437439 }
438440 }
439- val b =
440- async(Dispatchers .Default ) {
441+ val b = async(pool) {
441442 gate.await()
442443 sf.run (key) {
443- delay(10 )
444+ delay(100 )
444445 1
445446 }
446447 }
447- gate.countDown()
448- a.await()
449- b.await()
450- }
448+ gate.countDown()
449+ a.await()
450+ b.await()
451+ }
451452
452- assertTrue(
453- " Expected putIfAbsent to return existing at least once" ,
454- map.installedNonNull.get(),
455- )
453+ assertTrue(
454+ " Expected putIfAbsent to return existing at least once" ,
455+ map.installedNonNull.get(),
456+ )
457+ } finally {
458+ pool.close() // or pool.executor.shutdown()
459+ }
456460 }
457461
458462 @Test
0 commit comments