diff --git a/features/home/impl/src/main/kotlin/io/element/android/features/home/impl/datasource/RoomListDataSource.kt b/features/home/impl/src/main/kotlin/io/element/android/features/home/impl/datasource/RoomListDataSource.kt index b9e8d27bf7d..8e676e7b908 100644 --- a/features/home/impl/src/main/kotlin/io/element/android/features/home/impl/datasource/RoomListDataSource.kt +++ b/features/home/impl/src/main/kotlin/io/element/android/features/home/impl/datasource/RoomListDataSource.kt @@ -48,7 +48,7 @@ class RoomListDataSource( observeDateTimeChanges() } - private val _allRooms = MutableSharedFlow>(replay = 1) + private val _allRooms = MutableSharedFlow>(replay = 1, extraBufferCapacity = 5) private val lock = Mutex() private val diffCache = MutableListDiffCache() diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClient.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClient.kt index c646e2ee184..9cc3d34cdb6 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClient.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClient.kt @@ -133,7 +133,7 @@ class RustMatrixClient( override val sessionId: UserId = UserId(innerClient.userId()) override val deviceId: DeviceId = DeviceId(innerClient.deviceId()) override val sessionCoroutineScope = appCoroutineScope.childScope(dispatchers.main, "Session-$sessionId") - private val sessionDispatcher = dispatchers.io.limitedParallelism(64) + private val sessionDispatcher = dispatchers.computation.limitedParallelism(64) private val innerRoomListService = innerSyncService.roomListService() private val innerSpaceService = innerClient.spaceService() @@ -176,6 +176,7 @@ class RustMatrixClient( roomListFactory = RoomListFactory( innerRoomListService = innerRoomListService, sessionCoroutineScope = sessionCoroutineScope, + coroutineDispatchers = dispatchers, ), roomSyncSubscriber = roomSyncSubscriber, ) diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/roomlist/RoomListEntriesUpdateExt.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/roomlist/RoomListEntriesUpdateExt.kt index 99d1646c72c..d9e2e826ba1 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/roomlist/RoomListEntriesUpdateExt.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/roomlist/RoomListEntriesUpdateExt.kt @@ -8,32 +8,33 @@ package io.element.android.libraries.matrix.impl.roomlist import io.element.android.libraries.architecture.coverage.ExcludeFromCoverage +import org.matrix.rustcomponents.sdk.Room import org.matrix.rustcomponents.sdk.RoomListEntriesUpdate @Suppress("unused") @ExcludeFromCoverage -internal fun RoomListEntriesUpdate.describe(): String { +internal fun RoomListEntriesUpdate.describe(includeRoomNames: Boolean = false): String { return when (this) { is RoomListEntriesUpdate.Set -> { - "Set #$index to '${value.displayName()}'" + "Set #$index to ${roomDescription(value, includeRoomNames)}" } is RoomListEntriesUpdate.Append -> { - "Append ${values.map { "'" + it.displayName() + "'" }}" + "Append ${values.map { roomDescription(it, includeRoomNames) }}" } is RoomListEntriesUpdate.PushBack -> { - "PushBack '${value.displayName()}'" + "PushBack ${roomDescription(value, includeRoomNames)}" } is RoomListEntriesUpdate.PushFront -> { - "PushFront '${value.displayName()}'" + "PushFront ${roomDescription(value, includeRoomNames)}" } is RoomListEntriesUpdate.Insert -> { - "Insert at #$index: '${value.displayName()}'" + "Insert at #$index: ${roomDescription(value, includeRoomNames)}" } is RoomListEntriesUpdate.Remove -> { "Remove #$index" } is RoomListEntriesUpdate.Reset -> { - "Reset all to ${values.map { "'" + it.displayName() + "'" }}" + "Reset all to ${values.map { roomDescription(it, includeRoomNames) }}" } RoomListEntriesUpdate.PopBack -> { "PopBack" @@ -49,3 +50,11 @@ internal fun RoomListEntriesUpdate.describe(): String { } } } + +private fun roomDescription(room: Room, includeRoomNames: Boolean): String { + return if (includeRoomNames) { + "'${room.displayName()}' - ${room.id()}" + } else { + room.id() + } +} diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/roomlist/RoomListFactory.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/roomlist/RoomListFactory.kt index 53b8127ab84..92014f65ee1 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/roomlist/RoomListFactory.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/roomlist/RoomListFactory.kt @@ -7,6 +7,7 @@ package io.element.android.libraries.matrix.impl.roomlist +import io.element.android.libraries.core.coroutine.CoroutineDispatchers import io.element.android.libraries.matrix.api.roomlist.DynamicRoomList import io.element.android.libraries.matrix.api.roomlist.RoomList import io.element.android.libraries.matrix.api.roomlist.RoomListFilter @@ -35,6 +36,7 @@ private val ROOM_LIST_RUST_FILTERS = listOf( internal class RoomListFactory( private val innerRoomListService: RoomListService, private val sessionCoroutineScope: CoroutineScope, + private val coroutineDispatchers: CoroutineDispatchers, ) { private val roomSummaryDetailsFactory: RoomSummaryFactory = RoomSummaryFactory() @@ -49,9 +51,15 @@ internal class RoomListFactory( innerProvider: suspend () -> InnerRoomList ): DynamicRoomList { val loadingStateFlow: MutableStateFlow = MutableStateFlow(RoomList.LoadingState.NotLoaded) - val filteredSummariesFlow = MutableSharedFlow>(replay = 1, extraBufferCapacity = 1) - val summariesFlow = MutableSharedFlow>(replay = 1, extraBufferCapacity = 1) - val processor = RoomSummaryListProcessor(summariesFlow, innerRoomListService, coroutineContext, roomSummaryDetailsFactory) + val summariesFlow = MutableSharedFlow>(replay = 1, extraBufferCapacity = 5) + val filteredSummariesFlow = MutableSharedFlow>(replay = 1, extraBufferCapacity = 5) + val processor = RoomSummaryListProcessor( + roomSummaries = summariesFlow, + roomListService = innerRoomListService, + coroutineContext = coroutineContext, + roomSummaryDetailsFactory = roomSummaryDetailsFactory, + coroutineDispatchers = coroutineDispatchers, + ) // Makes sure we don't miss any events val dynamicEvents = MutableSharedFlow(replay = 100) val currentFilter = MutableStateFlow(initialFilter) diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/roomlist/RoomSummaryListProcessor.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/roomlist/RoomSummaryListProcessor.kt index e26cea1c94f..8689b3778de 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/roomlist/RoomSummaryListProcessor.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/roomlist/RoomSummaryListProcessor.kt @@ -7,38 +7,75 @@ package io.element.android.libraries.matrix.impl.roomlist +import io.element.android.libraries.core.coroutine.CoroutineDispatchers import io.element.android.libraries.matrix.api.roomlist.RoomSummary +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withContext import org.matrix.rustcomponents.sdk.Room import org.matrix.rustcomponents.sdk.RoomListEntriesUpdate import org.matrix.rustcomponents.sdk.RoomListServiceInterface -import org.matrix.rustcomponents.sdk.use import timber.log.Timber +import java.util.LinkedList import kotlin.coroutines.CoroutineContext +import kotlin.time.measureTime class RoomSummaryListProcessor( private val roomSummaries: MutableSharedFlow>, private val roomListService: RoomListServiceInterface, private val coroutineContext: CoroutineContext, private val roomSummaryDetailsFactory: RoomSummaryFactory = RoomSummaryFactory(), + coroutineDispatchers: CoroutineDispatchers, ) { - private val mutex = Mutex() + private val modifyPendingJobsMutex = Mutex() + private val updateSummariesMutex = Mutex() + + private val coroutineScope = CoroutineScope(coroutineContext + coroutineDispatchers.computation) + private val pendingUpdateJobs = LinkedList() suspend fun postUpdate(updates: List) { - updateRoomSummaries { - Timber.v("Update rooms from postUpdates (with ${updates.size} items) on ${Thread.currentThread()}") - updates.forEach { update -> - applyUpdate(update) + val first = updates.firstOrNull() + + modifyPendingJobsMutex.withLock { + // We can cancel any pending updates if we receive a Reset or Clear + if (first is RoomListEntriesUpdate.Reset || first is RoomListEntriesUpdate.Clear) { + while (pendingUpdateJobs.isNotEmpty()) { + pendingUpdateJobs.removeFirst().cancel() + } } - // TODO remove once https://github.com/element-hq/element-x-android/issues/5031 has been confirmed as fixed - val duplicates = groupingBy { it.roomId }.eachCount().filter { it.value > 1 } - if (duplicates.isNotEmpty()) { - Timber.e("Found duplicates in room summaries after a list update from the SDK: $duplicates. Updates: $updates") + val job = coroutineScope.launch { + updateRoomSummaries { + Timber.v("Update rooms from postUpdates (with ${updates.size} items) on ${Thread.currentThread()}") + val elapsed = measureTime { + for (update in updates) { + applyUpdate(update) + } + + // TODO remove once https://github.com/element-hq/element-x-android/issues/5031 has been confirmed as fixed + val duplicates = groupingBy { it.roomId }.eachCount().filter { it.value > 1 } + if (duplicates.isNotEmpty()) { + Timber.e( + "Found duplicates in room summaries after a list update from the SDK: $duplicates. Updates: ${updates.map { it.describe() }}" + ) + } + } + Timber.d("Time to apply all updates: $elapsed") + + modifyPendingJobsMutex.withLock { + // Remove the current job from the pending ones (done at the end so it can be cancelled) + if (pendingUpdateJobs.isNotEmpty()) { + pendingUpdateJobs.removeFirst() + } + } + } } + + pendingUpdateJobs.add(job) } } @@ -55,7 +92,7 @@ class RoomSummaryListProcessor( private suspend fun MutableList.applyUpdate(update: RoomListEntriesUpdate) { // Remove this comment to debug changes in the room list - // Timber.d("Apply room list update: ${update.describe()}") + // Timber.d("Apply room list update: ${update.describe(includeRoomNames = true)}") when (update) { is RoomListEntriesUpdate.Append -> { val roomSummaries = update.values.map { @@ -112,9 +149,9 @@ class RoomSummaryListProcessor( } private suspend fun updateRoomSummaries(block: suspend MutableList.() -> Unit) = withContext(coroutineContext) { - mutex.withLock { + updateSummariesMutex.withLock { val current = roomSummaries.replayCache.lastOrNull() - val mutableRoomSummaries = current.orEmpty().toMutableList() + val mutableRoomSummaries = current?.toMutableList() ?: mutableListOf() block(mutableRoomSummaries) roomSummaries.emit(mutableRoomSummaries) } diff --git a/libraries/matrix/impl/src/test/kotlin/io/element/android/libraries/matrix/impl/roomlist/RoomListFactoryTest.kt b/libraries/matrix/impl/src/test/kotlin/io/element/android/libraries/matrix/impl/roomlist/RoomListFactoryTest.kt index 2d97f2c5895..9a5e4881907 100644 --- a/libraries/matrix/impl/src/test/kotlin/io/element/android/libraries/matrix/impl/roomlist/RoomListFactoryTest.kt +++ b/libraries/matrix/impl/src/test/kotlin/io/element/android/libraries/matrix/impl/roomlist/RoomListFactoryTest.kt @@ -9,6 +9,7 @@ package io.element.android.libraries.matrix.impl.roomlist import io.element.android.libraries.matrix.impl.fixtures.fakes.FakeFfiRoomList import io.element.android.libraries.matrix.impl.fixtures.fakes.FakeFfiRoomListService +import io.element.android.tests.testutils.testCoroutineDispatchers import kotlinx.coroutines.test.runTest import org.junit.Test import kotlin.coroutines.EmptyCoroutineContext @@ -19,6 +20,7 @@ class RoomListFactoryTest { val sut = RoomListFactory( innerRoomListService = FakeFfiRoomListService(), sessionCoroutineScope = backgroundScope, + coroutineDispatchers = testCoroutineDispatchers(), ) sut.createRoomList( pageSize = 10, diff --git a/libraries/matrix/impl/src/test/kotlin/io/element/android/libraries/matrix/impl/roomlist/RoomSummaryListProcessorTest.kt b/libraries/matrix/impl/src/test/kotlin/io/element/android/libraries/matrix/impl/roomlist/RoomSummaryListProcessorTest.kt index 2b54463deab..750f625f504 100644 --- a/libraries/matrix/impl/src/test/kotlin/io/element/android/libraries/matrix/impl/roomlist/RoomSummaryListProcessorTest.kt +++ b/libraries/matrix/impl/src/test/kotlin/io/element/android/libraries/matrix/impl/roomlist/RoomSummaryListProcessorTest.kt @@ -16,14 +16,17 @@ import io.element.android.libraries.matrix.test.A_ROOM_ID import io.element.android.libraries.matrix.test.A_ROOM_ID_2 import io.element.android.libraries.matrix.test.A_ROOM_ID_3 import io.element.android.libraries.matrix.test.room.aRoomSummary +import io.element.android.tests.testutils.testCoroutineDispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.test.StandardTestDispatcher import kotlinx.coroutines.test.TestScope +import kotlinx.coroutines.test.advanceUntilIdle import kotlinx.coroutines.test.runTest import org.junit.Test import org.matrix.rustcomponents.sdk.RoomListEntriesUpdate -class RoomSummaryListProcessorTest { +@OptIn(ExperimentalCoroutinesApi::class) class RoomSummaryListProcessorTest { private val summaries = MutableStateFlow>(emptyList()) @Test @@ -34,6 +37,8 @@ class RoomSummaryListProcessorTest { val newEntry = aRustRoom(A_ROOM_ID_2) processor.postUpdate(listOf(RoomListEntriesUpdate.Append(listOf(newEntry, newEntry, newEntry)))) + advanceUntilIdle() + assertThat(summaries.value.count()).isEqualTo(4) assertThat(summaries.value.subList(1, 4).all { it.roomId == A_ROOM_ID_2 }).isTrue() } @@ -44,6 +49,8 @@ class RoomSummaryListProcessorTest { val processor = createProcessor() processor.postUpdate(listOf(RoomListEntriesUpdate.PushBack(aRustRoom(A_ROOM_ID_2)))) + advanceUntilIdle() + assertThat(summaries.value.count()).isEqualTo(2) assertThat(summaries.value.last().roomId).isEqualTo(A_ROOM_ID_2) } @@ -54,6 +61,8 @@ class RoomSummaryListProcessorTest { val processor = createProcessor() processor.postUpdate(listOf(RoomListEntriesUpdate.PushFront(aRustRoom(A_ROOM_ID_2)))) + advanceUntilIdle() + assertThat(summaries.value.count()).isEqualTo(2) assertThat(summaries.value.first().roomId).isEqualTo(A_ROOM_ID_2) } @@ -66,6 +75,8 @@ class RoomSummaryListProcessorTest { processor.postUpdate(listOf(RoomListEntriesUpdate.Set(index.toUInt(), aRustRoom(A_ROOM_ID_2)))) + advanceUntilIdle() + assertThat(summaries.value.count()).isEqualTo(1) assertThat(summaries.value[index].roomId).isEqualTo(A_ROOM_ID_2) } @@ -78,6 +89,8 @@ class RoomSummaryListProcessorTest { processor.postUpdate(listOf(RoomListEntriesUpdate.Insert(index.toUInt(), aRustRoom(A_ROOM_ID_2)))) + advanceUntilIdle() + assertThat(summaries.value.count()).isEqualTo(2) assertThat(summaries.value[index].roomId).isEqualTo(A_ROOM_ID_2) } @@ -93,6 +106,8 @@ class RoomSummaryListProcessorTest { processor.postUpdate(listOf(RoomListEntriesUpdate.Remove(index.toUInt()))) + advanceUntilIdle() + assertThat(summaries.value.count()).isEqualTo(1) assertThat(summaries.value[index].roomId).isEqualTo(A_ROOM_ID_2) } @@ -108,6 +123,8 @@ class RoomSummaryListProcessorTest { processor.postUpdate(listOf(RoomListEntriesUpdate.PopBack)) + advanceUntilIdle() + assertThat(summaries.value.count()).isEqualTo(1) assertThat(summaries.value[index].roomId).isEqualTo(A_ROOM_ID) } @@ -123,6 +140,8 @@ class RoomSummaryListProcessorTest { processor.postUpdate(listOf(RoomListEntriesUpdate.PopFront)) + advanceUntilIdle() + assertThat(summaries.value.count()).isEqualTo(1) assertThat(summaries.value[index].roomId).isEqualTo(A_ROOM_ID_2) } @@ -137,6 +156,8 @@ class RoomSummaryListProcessorTest { processor.postUpdate(listOf(RoomListEntriesUpdate.Clear)) + advanceUntilIdle() + assertThat(summaries.value).isEmpty() } @@ -151,6 +172,8 @@ class RoomSummaryListProcessorTest { processor.postUpdate(listOf(RoomListEntriesUpdate.Truncate(1u))) + advanceUntilIdle() + assertThat(summaries.value.count()).isEqualTo(1) assertThat(summaries.value[index].roomId).isEqualTo(A_ROOM_ID) } @@ -166,6 +189,8 @@ class RoomSummaryListProcessorTest { processor.postUpdate(listOf(RoomListEntriesUpdate.Reset(listOf(aRustRoom(A_ROOM_ID_3))))) + advanceUntilIdle() + assertThat(summaries.value.count()).isEqualTo(1) assertThat(summaries.value[index].roomId).isEqualTo(A_ROOM_ID_3) } @@ -180,5 +205,6 @@ class RoomSummaryListProcessorTest { FakeFfiRoomListService(), coroutineContext = StandardTestDispatcher(testScheduler), roomSummaryDetailsFactory = RoomSummaryFactory(), + coroutineDispatchers = testCoroutineDispatchers(), ) } diff --git a/libraries/matrix/impl/src/test/kotlin/io/element/android/libraries/matrix/impl/roomlist/RustBaseRoomListServiceTest.kt b/libraries/matrix/impl/src/test/kotlin/io/element/android/libraries/matrix/impl/roomlist/RustBaseRoomListServiceTest.kt index 561c1f245f4..ea72f523a66 100644 --- a/libraries/matrix/impl/src/test/kotlin/io/element/android/libraries/matrix/impl/roomlist/RustBaseRoomListServiceTest.kt +++ b/libraries/matrix/impl/src/test/kotlin/io/element/android/libraries/matrix/impl/roomlist/RustBaseRoomListServiceTest.kt @@ -49,6 +49,7 @@ private fun TestScope.createRustRoomListService( roomListFactory = RoomListFactory( innerRoomListService = roomListService, sessionCoroutineScope = backgroundScope, + coroutineDispatchers = testCoroutineDispatchers(), ), roomSyncSubscriber = RoomSyncSubscriber( roomListService = roomListService,