Skip to content

Commit 1dafe4a

Browse files
authored
chore: optimize EventBus performance under high-frequency publishing (#572)
1 parent b76f810 commit 1dafe4a

File tree

2 files changed

+112
-2
lines changed

2 files changed

+112
-2
lines changed

core/src/main/kotlin/io/customer/sdk/communication/EventBus.kt

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ import kotlin.reflect.KClass
66
import kotlin.reflect.safeCast
77
import kotlinx.coroutines.CoroutineScope
88
import kotlinx.coroutines.Job
9+
import kotlinx.coroutines.channels.Channel
910
import kotlinx.coroutines.flow.MutableSharedFlow
1011
import kotlinx.coroutines.flow.SharedFlow
12+
import kotlinx.coroutines.flow.consumeAsFlow
1113
import kotlinx.coroutines.flow.mapNotNull
1214
import kotlinx.coroutines.launch
1315

@@ -40,12 +42,20 @@ class EventBusImpl(
4042

4143
val scope: CoroutineScope = SDKComponent.scopeProvider.eventBusScope
4244

43-
override fun publish(event: Event) {
45+
private val eventChannel = Channel<Event>(capacity = Channel.UNLIMITED)
46+
47+
init {
4448
scope.launch {
45-
sharedFlow.emit(event)
49+
eventChannel.consumeAsFlow().collect { event ->
50+
sharedFlow.tryEmit(event)
51+
}
4652
}
4753
}
4854

55+
override fun publish(event: Event) {
56+
eventChannel.trySend(event)
57+
}
58+
4959
inline fun <reified T : Event> EventBus.subscribe(noinline action: suspend (T) -> Unit) = subscribe(T::class, action)
5060

5161
override fun removeAllSubscriptions() {

core/src/test/java/io/customer/sdk/communication/EventBusTest.kt

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,15 @@ import io.customer.commontest.util.ScopeProviderStub
77
import io.customer.sdk.core.di.SDKComponent
88
import io.customer.sdk.core.util.ScopeProvider
99
import io.customer.sdk.events.Metric
10+
import kotlinx.coroutines.async
11+
import kotlinx.coroutines.awaitAll
1012
import kotlinx.coroutines.delay
1113
import kotlinx.coroutines.runBlocking
1214
import org.amshove.kluent.internal.assertEquals
1315
import org.amshove.kluent.shouldBe
1416
import org.amshove.kluent.shouldBeEqualTo
1517
import org.amshove.kluent.shouldBeInstanceOf
18+
import org.amshove.kluent.shouldBeLessThan
1619
import org.amshove.kluent.shouldHaveSingleItem
1720
import org.junit.jupiter.api.Test
1821

@@ -201,4 +204,101 @@ class EventBusTest : JUnit5Test() {
201204

202205
job.cancel()
203206
}
207+
208+
@Test
209+
fun givenConcurrentPublishingExpectCorrectOrdering() = runBlocking {
210+
val events = mutableListOf<Event>()
211+
val job = eventBus.subscribe<Event.TrackPushMetricEvent> { event ->
212+
events.add(event)
213+
}
214+
215+
// Publish 100 events concurrently to test ordering behavior
216+
val publishJobs = (1..100).map { index ->
217+
async {
218+
eventBus.publish(Event.TrackPushMetricEvent("deliveryId$index", Metric.Delivered, "deviceToken$index"))
219+
}
220+
}
221+
publishJobs.awaitAll()
222+
223+
delay(200) // Give time for all events to be processed
224+
225+
// Should receive all 100 events
226+
events.size shouldBeEqualTo 100
227+
228+
// Events should be TrackPushMetricEvent instances
229+
events.forEach { event ->
230+
event.shouldBeInstanceOf<Event.TrackPushMetricEvent>()
231+
}
232+
233+
// Verify all unique events were received (no duplicates/losses)
234+
val deliveryIds = events.map { (it as Event.TrackPushMetricEvent).deliveryId }.toSet()
235+
deliveryIds.size shouldBeEqualTo 100
236+
237+
job.cancel()
238+
}
239+
240+
@Test
241+
fun givenHighFrequencyPublishingExpectAllEventsReceived() = runBlocking {
242+
val events = mutableListOf<Event>()
243+
val job = eventBus.subscribe<Event.ScreenViewedEvent> { event ->
244+
events.add(event)
245+
}
246+
247+
val startTime = System.currentTimeMillis()
248+
249+
// Rapid publishing (simulates ViewPager or rapid navigation)
250+
repeat(1000) { index ->
251+
eventBus.publish(Event.ScreenViewedEvent("screen$index"))
252+
}
253+
254+
delay(500) // Allow processing time
255+
256+
val duration = System.currentTimeMillis() - startTime
257+
258+
// Should receive all 1000 events
259+
events.size shouldBeEqualTo 1000
260+
261+
// Should complete reasonably quickly (less than 1 second total)
262+
duration shouldBeLessThan 1000
263+
264+
// Verify events are correct type and have expected content
265+
events.forEachIndexed { index, event ->
266+
event.shouldBeInstanceOf<Event.ScreenViewedEvent>()
267+
(event as Event.ScreenViewedEvent).name shouldBeEqualTo "screen$index"
268+
}
269+
270+
job.cancel()
271+
}
272+
273+
@Test
274+
fun givenMoreThan100EventsExpectReplayBufferLimited() = runBlocking {
275+
// Publish 150 events without any subscribers to test replay buffer limit
276+
repeat(150) { index ->
277+
eventBus.publish(Event.TrackInAppMetricEvent("deliveryId$index", Metric.Delivered, params = mapOf("index" to index.toString())))
278+
}
279+
280+
delay(100) // Allow events to be buffered
281+
282+
val events = mutableListOf<Event>()
283+
val job = eventBus.subscribe<Event.TrackInAppMetricEvent> { event ->
284+
events.add(event)
285+
}
286+
287+
delay(100) // Give time for replay events to be delivered
288+
289+
// Should only receive the last 100 events due to replay buffer limit
290+
events.size shouldBeEqualTo 100
291+
292+
// First received event should be from index 50 (events 0-49 should be dropped)
293+
val firstEvent = events.first() as Event.TrackInAppMetricEvent
294+
firstEvent.deliveryID shouldBeEqualTo "deliveryId50"
295+
firstEvent.params["index"] shouldBeEqualTo "50"
296+
297+
// Last received event should be from index 149
298+
val lastEvent = events.last() as Event.TrackInAppMetricEvent
299+
lastEvent.deliveryID shouldBeEqualTo "deliveryId149"
300+
lastEvent.params["index"] shouldBeEqualTo "149"
301+
302+
job.cancel()
303+
}
204304
}

0 commit comments

Comments
 (0)