Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class RoomListDataSource(
observeDateTimeChanges()
}

private val _allRooms = MutableSharedFlow<ImmutableList<RoomListRoomSummary>>(replay = 1)
private val _allRooms = MutableSharedFlow<ImmutableList<RoomListRoomSummary>>(replay = 1, extraBufferCapacity = 5)

private val lock = Mutex()
private val diffCache = MutableListDiffCache<RoomListRoomSummary>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class RustMatrixClient(
override val sessionId: UserId = UserId(innerClient.userId())
override val deviceId: DeviceId = DeviceId(innerClient.deviceId())
override val sessionCoroutineScope = appCoroutineScope.childScope(dispatchers.main, "Session-$sessionId")
private val sessionDispatcher = dispatchers.io.limitedParallelism(64)
private val sessionDispatcher = dispatchers.computation.limitedParallelism(64)

private val innerRoomListService = innerSyncService.roomListService()
private val innerSpaceService = innerClient.spaceService()
Expand Down Expand Up @@ -176,6 +176,7 @@ class RustMatrixClient(
roomListFactory = RoomListFactory(
innerRoomListService = innerRoomListService,
sessionCoroutineScope = sessionCoroutineScope,
coroutineDispatchers = dispatchers,
),
roomSyncSubscriber = roomSyncSubscriber,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,33 @@
package io.element.android.libraries.matrix.impl.roomlist

import io.element.android.libraries.architecture.coverage.ExcludeFromCoverage
import org.matrix.rustcomponents.sdk.Room
import org.matrix.rustcomponents.sdk.RoomListEntriesUpdate

@Suppress("unused")
@ExcludeFromCoverage
internal fun RoomListEntriesUpdate.describe(): String {
internal fun RoomListEntriesUpdate.describe(includeRoomNames: Boolean = false): String {
return when (this) {
is RoomListEntriesUpdate.Set -> {
"Set #$index to '${value.displayName()}'"
"Set #$index to ${roomDescription(value, includeRoomNames)}"
}
is RoomListEntriesUpdate.Append -> {
"Append ${values.map { "'" + it.displayName() + "'" }}"
"Append ${values.map { roomDescription(it, includeRoomNames) }}"
}
is RoomListEntriesUpdate.PushBack -> {
"PushBack '${value.displayName()}'"
"PushBack ${roomDescription(value, includeRoomNames)}"
}
is RoomListEntriesUpdate.PushFront -> {
"PushFront '${value.displayName()}'"
"PushFront ${roomDescription(value, includeRoomNames)}"
}
is RoomListEntriesUpdate.Insert -> {
"Insert at #$index: '${value.displayName()}'"
"Insert at #$index: ${roomDescription(value, includeRoomNames)}"
}
is RoomListEntriesUpdate.Remove -> {
"Remove #$index"
}
is RoomListEntriesUpdate.Reset -> {
"Reset all to ${values.map { "'" + it.displayName() + "'" }}"
"Reset all to ${values.map { roomDescription(it, includeRoomNames) }}"
}
RoomListEntriesUpdate.PopBack -> {
"PopBack"
Expand All @@ -49,3 +50,11 @@ internal fun RoomListEntriesUpdate.describe(): String {
}
}
}

private fun roomDescription(room: Room, includeRoomNames: Boolean): String {
return if (includeRoomNames) {
"'${room.displayName()}' - ${room.id()}"
} else {
room.id()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package io.element.android.libraries.matrix.impl.roomlist

import io.element.android.libraries.core.coroutine.CoroutineDispatchers
import io.element.android.libraries.matrix.api.roomlist.DynamicRoomList
import io.element.android.libraries.matrix.api.roomlist.RoomList
import io.element.android.libraries.matrix.api.roomlist.RoomListFilter
Expand Down Expand Up @@ -35,6 +36,7 @@ private val ROOM_LIST_RUST_FILTERS = listOf(
internal class RoomListFactory(
private val innerRoomListService: RoomListService,
private val sessionCoroutineScope: CoroutineScope,
private val coroutineDispatchers: CoroutineDispatchers,
) {
private val roomSummaryDetailsFactory: RoomSummaryFactory = RoomSummaryFactory()

Expand All @@ -49,9 +51,15 @@ internal class RoomListFactory(
innerProvider: suspend () -> InnerRoomList
): DynamicRoomList {
val loadingStateFlow: MutableStateFlow<RoomList.LoadingState> = MutableStateFlow(RoomList.LoadingState.NotLoaded)
val filteredSummariesFlow = MutableSharedFlow<List<RoomSummary>>(replay = 1, extraBufferCapacity = 1)
val summariesFlow = MutableSharedFlow<List<RoomSummary>>(replay = 1, extraBufferCapacity = 1)
val processor = RoomSummaryListProcessor(summariesFlow, innerRoomListService, coroutineContext, roomSummaryDetailsFactory)
val summariesFlow = MutableSharedFlow<List<RoomSummary>>(replay = 1, extraBufferCapacity = 5)
val filteredSummariesFlow = MutableSharedFlow<List<RoomSummary>>(replay = 1, extraBufferCapacity = 5)
val processor = RoomSummaryListProcessor(
roomSummaries = summariesFlow,
roomListService = innerRoomListService,
coroutineContext = coroutineContext,
roomSummaryDetailsFactory = roomSummaryDetailsFactory,
coroutineDispatchers = coroutineDispatchers,
)
// Makes sure we don't miss any events
val dynamicEvents = MutableSharedFlow<RoomListDynamicEvents>(replay = 100)
val currentFilter = MutableStateFlow(initialFilter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,38 +7,75 @@

package io.element.android.libraries.matrix.impl.roomlist

import io.element.android.libraries.core.coroutine.CoroutineDispatchers
import io.element.android.libraries.matrix.api.roomlist.RoomSummary
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
import org.matrix.rustcomponents.sdk.Room
import org.matrix.rustcomponents.sdk.RoomListEntriesUpdate
import org.matrix.rustcomponents.sdk.RoomListServiceInterface
import org.matrix.rustcomponents.sdk.use
import timber.log.Timber
import java.util.LinkedList
import kotlin.coroutines.CoroutineContext
import kotlin.time.measureTime

class RoomSummaryListProcessor(
private val roomSummaries: MutableSharedFlow<List<RoomSummary>>,
private val roomListService: RoomListServiceInterface,
private val coroutineContext: CoroutineContext,
private val roomSummaryDetailsFactory: RoomSummaryFactory = RoomSummaryFactory(),
coroutineDispatchers: CoroutineDispatchers,
) {
private val mutex = Mutex()
private val modifyPendingJobsMutex = Mutex()
private val updateSummariesMutex = Mutex()

private val coroutineScope = CoroutineScope(coroutineContext + coroutineDispatchers.computation)
private val pendingUpdateJobs = LinkedList<Job>()

suspend fun postUpdate(updates: List<RoomListEntriesUpdate>) {
updateRoomSummaries {
Timber.v("Update rooms from postUpdates (with ${updates.size} items) on ${Thread.currentThread()}")
updates.forEach { update ->
applyUpdate(update)
val first = updates.firstOrNull()

modifyPendingJobsMutex.withLock {
// We can cancel any pending updates if we receive a Reset or Clear
if (first is RoomListEntriesUpdate.Reset || first is RoomListEntriesUpdate.Clear) {
while (pendingUpdateJobs.isNotEmpty()) {
pendingUpdateJobs.removeFirst().cancel()
}
}

// TODO remove once https://github.com/element-hq/element-x-android/issues/5031 has been confirmed as fixed
val duplicates = groupingBy { it.roomId }.eachCount().filter { it.value > 1 }
if (duplicates.isNotEmpty()) {
Timber.e("Found duplicates in room summaries after a list update from the SDK: $duplicates. Updates: $updates")
val job = coroutineScope.launch {
updateRoomSummaries {
Timber.v("Update rooms from postUpdates (with ${updates.size} items) on ${Thread.currentThread()}")
val elapsed = measureTime {
for (update in updates) {
applyUpdate(update)
}

// TODO remove once https://github.com/element-hq/element-x-android/issues/5031 has been confirmed as fixed
val duplicates = groupingBy { it.roomId }.eachCount().filter { it.value > 1 }
if (duplicates.isNotEmpty()) {
Timber.e(
"Found duplicates in room summaries after a list update from the SDK: $duplicates. Updates: ${updates.map { it.describe() }}"
)
}
}
Timber.d("Time to apply all updates: $elapsed")

modifyPendingJobsMutex.withLock {
// Remove the current job from the pending ones (done at the end so it can be cancelled)
if (pendingUpdateJobs.isNotEmpty()) {
pendingUpdateJobs.removeFirst()
}
}
}
}

pendingUpdateJobs.add(job)
}
}

Expand All @@ -55,7 +92,7 @@ class RoomSummaryListProcessor(

private suspend fun MutableList<RoomSummary>.applyUpdate(update: RoomListEntriesUpdate) {
// Remove this comment to debug changes in the room list
// Timber.d("Apply room list update: ${update.describe()}")
// Timber.d("Apply room list update: ${update.describe(includeRoomNames = true)}")
when (update) {
is RoomListEntriesUpdate.Append -> {
val roomSummaries = update.values.map {
Expand Down Expand Up @@ -112,9 +149,9 @@ class RoomSummaryListProcessor(
}

private suspend fun updateRoomSummaries(block: suspend MutableList<RoomSummary>.() -> Unit) = withContext(coroutineContext) {
mutex.withLock {
updateSummariesMutex.withLock {
val current = roomSummaries.replayCache.lastOrNull()
val mutableRoomSummaries = current.orEmpty().toMutableList()
val mutableRoomSummaries = current?.toMutableList() ?: mutableListOf()
block(mutableRoomSummaries)
roomSummaries.emit(mutableRoomSummaries)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package io.element.android.libraries.matrix.impl.roomlist

import io.element.android.libraries.matrix.impl.fixtures.fakes.FakeFfiRoomList
import io.element.android.libraries.matrix.impl.fixtures.fakes.FakeFfiRoomListService
import io.element.android.tests.testutils.testCoroutineDispatchers
import kotlinx.coroutines.test.runTest
import org.junit.Test
import kotlin.coroutines.EmptyCoroutineContext
Expand All @@ -19,6 +20,7 @@ class RoomListFactoryTest {
val sut = RoomListFactory(
innerRoomListService = FakeFfiRoomListService(),
sessionCoroutineScope = backgroundScope,
coroutineDispatchers = testCoroutineDispatchers(),
)
sut.createRoomList(
pageSize = 10,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@ import io.element.android.libraries.matrix.test.A_ROOM_ID
import io.element.android.libraries.matrix.test.A_ROOM_ID_2
import io.element.android.libraries.matrix.test.A_ROOM_ID_3
import io.element.android.libraries.matrix.test.room.aRoomSummary
import io.element.android.tests.testutils.testCoroutineDispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest
import org.junit.Test
import org.matrix.rustcomponents.sdk.RoomListEntriesUpdate

class RoomSummaryListProcessorTest {
@OptIn(ExperimentalCoroutinesApi::class) class RoomSummaryListProcessorTest {
private val summaries = MutableStateFlow<List<RoomSummary>>(emptyList())

@Test
Expand All @@ -34,6 +37,8 @@ class RoomSummaryListProcessorTest {
val newEntry = aRustRoom(A_ROOM_ID_2)
processor.postUpdate(listOf(RoomListEntriesUpdate.Append(listOf(newEntry, newEntry, newEntry))))

advanceUntilIdle()

assertThat(summaries.value.count()).isEqualTo(4)
assertThat(summaries.value.subList(1, 4).all { it.roomId == A_ROOM_ID_2 }).isTrue()
}
Expand All @@ -44,6 +49,8 @@ class RoomSummaryListProcessorTest {
val processor = createProcessor()
processor.postUpdate(listOf(RoomListEntriesUpdate.PushBack(aRustRoom(A_ROOM_ID_2))))

advanceUntilIdle()

assertThat(summaries.value.count()).isEqualTo(2)
assertThat(summaries.value.last().roomId).isEqualTo(A_ROOM_ID_2)
}
Expand All @@ -54,6 +61,8 @@ class RoomSummaryListProcessorTest {
val processor = createProcessor()
processor.postUpdate(listOf(RoomListEntriesUpdate.PushFront(aRustRoom(A_ROOM_ID_2))))

advanceUntilIdle()

assertThat(summaries.value.count()).isEqualTo(2)
assertThat(summaries.value.first().roomId).isEqualTo(A_ROOM_ID_2)
}
Expand All @@ -66,6 +75,8 @@ class RoomSummaryListProcessorTest {

processor.postUpdate(listOf(RoomListEntriesUpdate.Set(index.toUInt(), aRustRoom(A_ROOM_ID_2))))

advanceUntilIdle()

assertThat(summaries.value.count()).isEqualTo(1)
assertThat(summaries.value[index].roomId).isEqualTo(A_ROOM_ID_2)
}
Expand All @@ -78,6 +89,8 @@ class RoomSummaryListProcessorTest {

processor.postUpdate(listOf(RoomListEntriesUpdate.Insert(index.toUInt(), aRustRoom(A_ROOM_ID_2))))

advanceUntilIdle()

assertThat(summaries.value.count()).isEqualTo(2)
assertThat(summaries.value[index].roomId).isEqualTo(A_ROOM_ID_2)
}
Expand All @@ -93,6 +106,8 @@ class RoomSummaryListProcessorTest {

processor.postUpdate(listOf(RoomListEntriesUpdate.Remove(index.toUInt())))

advanceUntilIdle()

assertThat(summaries.value.count()).isEqualTo(1)
assertThat(summaries.value[index].roomId).isEqualTo(A_ROOM_ID_2)
}
Expand All @@ -108,6 +123,8 @@ class RoomSummaryListProcessorTest {

processor.postUpdate(listOf(RoomListEntriesUpdate.PopBack))

advanceUntilIdle()

assertThat(summaries.value.count()).isEqualTo(1)
assertThat(summaries.value[index].roomId).isEqualTo(A_ROOM_ID)
}
Expand All @@ -123,6 +140,8 @@ class RoomSummaryListProcessorTest {

processor.postUpdate(listOf(RoomListEntriesUpdate.PopFront))

advanceUntilIdle()

assertThat(summaries.value.count()).isEqualTo(1)
assertThat(summaries.value[index].roomId).isEqualTo(A_ROOM_ID_2)
}
Expand All @@ -137,6 +156,8 @@ class RoomSummaryListProcessorTest {

processor.postUpdate(listOf(RoomListEntriesUpdate.Clear))

advanceUntilIdle()

assertThat(summaries.value).isEmpty()
}

Expand All @@ -151,6 +172,8 @@ class RoomSummaryListProcessorTest {

processor.postUpdate(listOf(RoomListEntriesUpdate.Truncate(1u)))

advanceUntilIdle()

assertThat(summaries.value.count()).isEqualTo(1)
assertThat(summaries.value[index].roomId).isEqualTo(A_ROOM_ID)
}
Expand All @@ -166,6 +189,8 @@ class RoomSummaryListProcessorTest {

processor.postUpdate(listOf(RoomListEntriesUpdate.Reset(listOf(aRustRoom(A_ROOM_ID_3)))))

advanceUntilIdle()

assertThat(summaries.value.count()).isEqualTo(1)
assertThat(summaries.value[index].roomId).isEqualTo(A_ROOM_ID_3)
}
Expand All @@ -180,5 +205,6 @@ class RoomSummaryListProcessorTest {
FakeFfiRoomListService(),
coroutineContext = StandardTestDispatcher(testScheduler),
roomSummaryDetailsFactory = RoomSummaryFactory(),
coroutineDispatchers = testCoroutineDispatchers(),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ private fun TestScope.createRustRoomListService(
roomListFactory = RoomListFactory(
innerRoomListService = roomListService,
sessionCoroutineScope = backgroundScope,
coroutineDispatchers = testCoroutineDispatchers(),
),
roomSyncSubscriber = RoomSyncSubscriber(
roomListService = roomListService,
Expand Down
Loading