@@ -18,16 +18,17 @@ package io.getstream.chat.android.client.utils.observable
1818
1919import io.getstream.chat.android.client.ChatEventListener
2020import io.getstream.chat.android.client.events.ChatEvent
21- import org.amshove.kluent.internal.assertEquals
2221import org.junit.Test
22+ import org.junit.jupiter.api.Assertions.assertEquals
2323import org.junit.jupiter.api.Assertions.assertTrue
2424import org.junit.jupiter.api.assertThrows
2525import org.mockito.kotlin.any
2626import org.mockito.kotlin.mock
2727import org.mockito.kotlin.never
2828import org.mockito.kotlin.verify
2929import org.mockito.kotlin.whenever
30- import java.util.concurrent.CountDownLatch
30+ import java.util.concurrent.CompletableFuture
31+ import java.util.concurrent.TimeUnit
3132
3233internal class SubscriptionImplTest {
3334
@@ -143,33 +144,40 @@ internal class SubscriptionImplTest {
143144
144145 @Test
145146 fun `onNext should not call listener if disposed concurrently` () {
146- val latch = CountDownLatch (1 )
147+ val gate = CompletableFuture <Unit >() // blocks the filter
148+ val filterEntered = CompletableFuture <Unit >() // signals we are inside the filter
149+
147150 val mockListener = mock<ChatEventListener <ChatEvent >>()
148- val subscription = SubscriptionImpl (filter = {
149- latch.await() // Introduce a pause in the filter
150- true
151- }, listener = mockListener)
151+ val subscription = SubscriptionImpl (
152+ filter = {
153+ filterEntered.complete(Unit ) // tell the test we are here
154+ gate.get() // wait until the test lets us go
155+ true
156+ },
157+ listener = mockListener,
158+ )
152159
153160 val event = mock<ChatEvent >()
154161 val exceptions = mutableListOf<Throwable >()
155162
156- val onNextThread = Thread {
163+ val onNextFuture = CompletableFuture .runAsync {
157164 try {
158165 subscription.onNext(event)
159166 } catch (e: Throwable ) {
160167 exceptions.add(e)
161168 }
162169 }
163- val disposerThread = Thread {
164- subscription.dispose()
165- latch.countDown() // Release the latch to allow the filter to continue
166- }
167- onNextThread.start()
168- disposerThread.start()
169- onNextThread.join()
170- disposerThread.join()
171170
172- assertEquals(" Expected no exceptions" , 0 , exceptions.size)
171+ // Wait until the filter is entered (ensures onNext is truly paused)
172+ filterEntered.get(1 , TimeUnit .SECONDS )
173+
174+ subscription.dispose() // Dispose from the test thread – this is the concurrent part
175+
176+ gate.complete(Unit ) // Unblock the filter so onNext can finish its execution
177+
178+ // Verify the outcome (no exception, listener never called)
179+ onNextFuture.get(1 , TimeUnit .SECONDS )
180+ assertEquals(0 , exceptions.size)
173181 verify(mockListener, never()).onEvent(event)
174182 }
175183}
0 commit comments