@@ -54,7 +54,7 @@ import io.element.android.libraries.matrix.impl.verification.RustSessionVerifica
5454import io.element.android.libraries.sessionstorage.api.SessionStore
5555import io.element.android.services.toolbox.api.systemclock.SystemClock
5656import kotlinx.coroutines.CoroutineScope
57- import kotlinx.coroutines.Dispatchers
57+ import kotlinx.coroutines.ExperimentalCoroutinesApi
5858import kotlinx.coroutines.cancel
5959import kotlinx.coroutines.flow.filter
6060import kotlinx.coroutines.flow.first
@@ -73,6 +73,7 @@ import org.matrix.rustcomponents.sdk.CreateRoomParameters as RustCreateRoomParam
7373import org.matrix.rustcomponents.sdk.RoomPreset as RustRoomPreset
7474import org.matrix.rustcomponents.sdk.RoomVisibility as RustRoomVisibility
7575
76+ @OptIn(ExperimentalCoroutinesApi ::class )
7677class RustMatrixClient constructor(
7778 private val client : Client ,
7879 private val sessionStore : SessionStore ,
@@ -85,13 +86,15 @@ class RustMatrixClient constructor(
8586
8687 override val sessionId: UserId = UserId (client.userId())
8788 private val roomListService = client.roomListServiceWithEncryption()
89+ private val sessionDispatcher = dispatchers.io.limitedParallelism(64 )
8890 private val sessionCoroutineScope = appCoroutineScope.childScope(dispatchers.main, " Session-${sessionId} " )
8991 private val verificationService = RustSessionVerificationService ()
9092 private val syncService = RustSyncService (roomListService, sessionCoroutineScope)
9193 private val pushersService = RustPushersService (
9294 client = client,
9395 dispatchers = dispatchers,
9496 )
97+
9598 private val notificationService = RustNotificationService (client)
9699
97100 private val clientDelegate = object : ClientDelegate {
@@ -105,7 +108,7 @@ class RustMatrixClient constructor(
105108 RustRoomSummaryDataSource (
106109 roomListService = roomListService,
107110 sessionCoroutineScope = sessionCoroutineScope,
108- coroutineDispatchers = dispatchers ,
111+ dispatcher = sessionDispatcher ,
109112 )
110113
111114 override val roomSummaryDataSource: RoomSummaryDataSource
@@ -150,7 +153,7 @@ class RustMatrixClient constructor(
150153 )
151154 }
152155
153- private suspend fun pairOfRoom (roomId : RoomId ): Pair <RoomListItem , Room >? = withContext(dispatchers.io ) {
156+ private suspend fun pairOfRoom (roomId : RoomId ): Pair <RoomListItem , Room >? = withContext(sessionDispatcher ) {
154157 val cachedRoomListItem = roomListService.roomOrNull(roomId.value)
155158 val fullRoom = cachedRoomListItem?.fullRoom()
156159 if (cachedRoomListItem == null || fullRoom == null ) {
@@ -165,19 +168,19 @@ class RustMatrixClient constructor(
165168 return roomId?.let { getRoom(it) }
166169 }
167170
168- override suspend fun ignoreUser (userId : UserId ): Result <Unit > = withContext(dispatchers.io ) {
171+ override suspend fun ignoreUser (userId : UserId ): Result <Unit > = withContext(sessionDispatcher ) {
169172 runCatching {
170173 client.ignoreUser(userId.value)
171174 }
172175 }
173176
174- override suspend fun unignoreUser (userId : UserId ): Result <Unit > = withContext(dispatchers.io ) {
177+ override suspend fun unignoreUser (userId : UserId ): Result <Unit > = withContext(sessionDispatcher ) {
175178 runCatching {
176179 client.unignoreUser(userId.value)
177180 }
178181 }
179182
180- override suspend fun createRoom (createRoomParams : CreateRoomParameters ): Result <RoomId > = withContext(dispatchers.io ) {
183+ override suspend fun createRoom (createRoomParams : CreateRoomParameters ): Result <RoomId > = withContext(sessionDispatcher ) {
181184 runCatching {
182185 val rustParams = RustCreateRoomParameters (
183186 name = createRoomParams.name,
@@ -221,14 +224,14 @@ class RustMatrixClient constructor(
221224 return createRoom(createRoomParams)
222225 }
223226
224- override suspend fun getProfile (userId : UserId ): Result <MatrixUser > = withContext(Dispatchers . IO ) {
227+ override suspend fun getProfile (userId : UserId ): Result <MatrixUser > = withContext(sessionDispatcher ) {
225228 runCatching {
226229 client.getProfile(userId.value).let (UserProfileMapper ::map)
227230 }
228231 }
229232
230233 override suspend fun searchUsers (searchTerm : String , limit : Long ): Result <MatrixSearchUserResults > =
231- withContext(dispatchers.io ) {
234+ withContext(sessionDispatcher ) {
232235 runCatching {
233236 client.searchUsers(searchTerm, limit.toULong()).let (UserSearchResultMapper ::map)
234237 }
@@ -260,7 +263,7 @@ class RustMatrixClient constructor(
260263 baseDirectory.deleteSessionDirectory(userID = sessionId.value, deleteCryptoDb = false )
261264 }
262265
263- override suspend fun logout () = withContext(dispatchers.io ) {
266+ override suspend fun logout () = withContext(sessionDispatcher ) {
264267 try {
265268 client.logout()
266269 } catch (failure: Throwable ) {
@@ -271,20 +274,20 @@ class RustMatrixClient constructor(
271274 sessionStore.removeSession(sessionId.value)
272275 }
273276
274- override suspend fun loadUserDisplayName (): Result <String > = withContext(dispatchers.io ) {
277+ override suspend fun loadUserDisplayName (): Result <String > = withContext(sessionDispatcher ) {
275278 runCatching {
276279 client.displayName()
277280 }
278281 }
279282
280- override suspend fun loadUserAvatarURLString (): Result <String ?> = withContext(dispatchers.io ) {
283+ override suspend fun loadUserAvatarURLString (): Result <String ?> = withContext(sessionDispatcher ) {
281284 runCatching {
282285 client.avatarUrl()
283286 }
284287 }
285288
286289 @OptIn(ExperimentalUnsignedTypes ::class )
287- override suspend fun uploadMedia (mimeType : String , data : ByteArray , progressCallback : ProgressCallback ? ): Result <String > = withContext(dispatchers.io ) {
290+ override suspend fun uploadMedia (mimeType : String , data : ByteArray , progressCallback : ProgressCallback ? ): Result <String > = withContext(sessionDispatcher ) {
288291 runCatching {
289292 client.uploadMedia(mimeType, data.toUByteArray().toList(), progressCallback?.toProgressWatcher())
290293 }
@@ -305,7 +308,7 @@ class RustMatrixClient constructor(
305308 private suspend fun File.getCacheSize (
306309 userID : String ,
307310 includeCryptoDb : Boolean = false,
308- ): Long = withContext(dispatchers.io ) {
311+ ): Long = withContext(sessionDispatcher ) {
309312 // Rust sanitises the user ID replacing invalid characters with an _
310313 val sanitisedUserID = userID.replace(" :" , " _" )
311314 val sessionDirectory = File (this @getCacheSize, sanitisedUserID)
@@ -327,7 +330,7 @@ class RustMatrixClient constructor(
327330 private suspend fun File.deleteSessionDirectory (
328331 userID : String ,
329332 deleteCryptoDb : Boolean = false,
330- ): Boolean = withContext(dispatchers.io ) {
333+ ): Boolean = withContext(sessionDispatcher ) {
331334 // Rust sanitises the user ID replacing invalid characters with an _
332335 val sanitisedUserID = userID.replace(" :" , " _" )
333336 val sessionDirectory = File (this @deleteSessionDirectory, sanitisedUserID)
0 commit comments