Skip to content

Commit 0e36a0a

Browse files
feat(nomad devices): batched fetch messages per convo - pagination logic for request of older msgs - pt2 (WPB-24278) (#3991)
* feat: batched fetch add new endpoint and requests/response mapping * feat: add paging coordinator for in mem state management and fetch more calls * fix: compute correctly last page boundary and more logs for debug * feat: change approach, ui triggered end item refill call * remove leftovers * extract interface for usecase * detekt issues fixes * use constant for query param * address pr comment, query table directly * fix: nomadMessagePagingCoordinator property as getter to get fresh instance Refactor nomadMessagePagingCoordinator to use a getter instead of lazy initialization. --------- Co-authored-by: Mohamad Jaara <9083456+MohamadJaara@users.noreply.github.com>
1 parent 425ff19 commit 0e36a0a

File tree

12 files changed

+355
-9
lines changed

12 files changed

+355
-9
lines changed

data/persistence/src/commonMain/db_user/com/wire/kalium/persistence/Messages.sq

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,9 @@ SELECT * FROM MessageDetailsView WHERE id = ? AND conversationId = ?;
440440
countByConversationIdAndVisibility:
441441
SELECT count(*) FROM Message WHERE conversation_id = ? AND visibility IN ?;
442442

443+
selectOldestVisibleMessageTimestampByConversationId:
444+
SELECT MIN(creation_date) FROM Message WHERE conversation_id = ? AND visibility = "VISIBLE";
445+
443446
selectByConversationIdAndVisibility:
444447
SELECT * FROM MessageDetailsView WHERE conversationId = :conversationId AND visibility IN :visibility ORDER BY date DESC LIMIT :limit OFFSET :offset;
445448

data/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/dao/message/MessageDAO.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ interface MessageDAO {
7474
suspend fun updateMessagesStatus(status: MessageEntity.Status, id: List<String>, conversationId: QualifiedIDEntity)
7575
suspend fun getMessageById(id: String, conversationId: QualifiedIDEntity): MessageEntity?
7676
suspend fun observeMessageById(id: String, conversationId: QualifiedIDEntity): Flow<MessageEntity?>
77+
suspend fun getOldestVisibleMessageTimestampByConversationId(conversationId: ConversationIDEntity): Long?
7778
suspend fun getMessagesByConversationAndVisibility(
7879
conversationId: QualifiedIDEntity,
7980
limit: Int,

data/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/dao/message/MessageDAOImpl.kt

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ import kotlinx.coroutines.flow.map
5555
import kotlinx.coroutines.withContext
5656
import kotlinx.datetime.Instant
5757

58-
@Suppress("TooManyFunctions", "LongParameterList")
58+
@Suppress("TooManyFunctions", "LongParameterList", "LargeClass")
5959
internal class MessageDAOImpl internal constructor(
6060
private val queries: MessagesQueries,
6161
private val attachmentsQueries: MessageAttachmentsQueries,
@@ -289,6 +289,14 @@ internal class MessageDAOImpl internal constructor(
289289
.distinctUntilChanged()
290290
.flowOn(readDispatcher.value)
291291

292+
override suspend fun getOldestVisibleMessageTimestampByConversationId(
293+
conversationId: ConversationIDEntity
294+
): Long? = withContext(readDispatcher.value) {
295+
queries.selectOldestVisibleMessageTimestampByConversationId(conversationId)
296+
.executeAsOneOrNull()
297+
?.MIN?.toEpochMilliseconds()
298+
}
299+
292300
override suspend fun getImageMessageAssets(
293301
conversationId: QualifiedIDEntity,
294302
mimeTypes: Set<String>,

domain/nomaddevice/src/commonMain/kotlin/com/wire/kalium/nomaddevice/NomadAllMessagesMapper.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,15 @@ import com.wire.kalium.protobuf.nomaddevice.NomadDeviceQualifiedId
3737
import kotlinx.datetime.Instant
3838
import kotlin.io.encoding.Base64
3939

40-
internal data class NomadMappedMessages(
40+
public data class NomadMappedMessages(
4141
val totalMessages: Int,
4242
val messages: List<NomadMessageToInsert>,
4343
val skippedMessages: Int,
4444
)
4545

46-
internal class NomadAllMessagesMapper {
46+
public class NomadAllMessagesMapper {
4747

48-
fun map(
48+
public fun map(
4949
response: NomadAllMessagesResponse,
5050
): NomadMappedMessages {
5151
var skipped = 0
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Wire
3+
* Copyright (C) 2026 Wire Swiss GmbH
4+
*
5+
* This program is free software: you can redistribute it and/or modify
6+
* it under the terms of the GNU General Public License as published by
7+
* the Free Software Foundation, either version 3 of the License, or
8+
* (at your option) any later version.
9+
*
10+
* This program is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
* GNU General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU General Public License
16+
* along with this program. If not, see http://www.gnu.org/licenses/.
17+
*/
18+
19+
package com.wire.kalium.logic.feature.message
20+
21+
import com.wire.kalium.logic.data.id.ConversationId
22+
import com.wire.kalium.logic.data.message.MessageRepository
23+
import com.wire.kalium.util.KaliumDispatcher
24+
import com.wire.kalium.util.KaliumDispatcherImpl
25+
import kotlinx.coroutines.withContext
26+
27+
public interface FetchOlderNomadMessagesByConversationUseCase {
28+
/**
29+
* Fetches older messages for a given conversation in a remote data source and stores them in the local database.
30+
* This is typically used when the user scrolls up in the message list we want to load more messages from the past.
31+
*/
32+
public suspend operator fun invoke(
33+
conversationId: ConversationId,
34+
pageSize: Int = DEFAULT_PAGE_SIZE,
35+
)
36+
37+
private companion object {
38+
const val DEFAULT_PAGE_SIZE = 50
39+
}
40+
}
41+
42+
internal class FetchOlderNomadMessagesByConversationUseCaseImpl(
43+
private val dispatcher: KaliumDispatcher = KaliumDispatcherImpl,
44+
private val messageRepository: MessageRepository,
45+
) : FetchOlderNomadMessagesByConversationUseCase {
46+
47+
override suspend operator fun invoke(
48+
conversationId: ConversationId,
49+
pageSize: Int,
50+
): Unit = withContext(dispatcher.default) {
51+
messageRepository.extensions.fetchOlderNomadMessagesByConversationId(
52+
conversationId = conversationId,
53+
pageSize = pageSize,
54+
)
55+
}
56+
}

logic/src/androidMain/kotlin/com/wire/kalium/logic/feature/message/MessageScopeExtensions.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,6 @@ public val MessageScope.getPaginatedFlowOfAssetMessageByConversationId: GetPagin
3333

3434
public val MessageScope.observePaginatedImageAssetMessageByConversationId: ObservePaginatedAssetImageMessages
3535
get() = ObservePaginatedAssetImageMessages(dispatcher, messageRepository)
36+
37+
public val MessageScope.fetchOlderMessagesByConversationId: FetchOlderNomadMessagesByConversationUseCase
38+
get() = FetchOlderNomadMessagesByConversationUseCaseImpl(dispatcher, messageRepository)

logic/src/commonMain/kotlin/com/wire/kalium/logic/data/message/MessageRepository.kt

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ import com.wire.kalium.logic.data.id.toDao
4848
import com.wire.kalium.logic.data.id.toModel
4949
import com.wire.kalium.logic.data.message.linkpreview.LinkPreviewMapper
5050
import com.wire.kalium.logic.data.message.mention.MessageMentionMapper
51+
import com.wire.kalium.logic.data.message.paging.NomadMessagePagingCoordinator
5152
import com.wire.kalium.logic.data.notification.LocalNotification
5253
import com.wire.kalium.logic.data.notification.LocalNotificationMessageMapper
5354
import com.wire.kalium.logic.data.notification.LocalNotificationMessageMapperImpl
@@ -330,6 +331,7 @@ internal class MessageDataSource internal constructor(
330331
private val messageApi: MessageApi,
331332
private val mlsMessageApi: MLSMessageApi,
332333
private val messageDAO: MessageDAO,
334+
nomadMessagePagingCoordinator: NomadMessagePagingCoordinator? = null,
333335
private val sendMessageFailureMapper: SendMessageFailureMapper = MapperProvider.sendMessageFailureMapper(),
334336
private val messageMapper: MessageMapper = MapperProvider.messageMapper(selfUserId),
335337
private val linkPreviewMapper: LinkPreviewMapper = MapperProvider.linkPreviewMapper(),
@@ -339,7 +341,11 @@ internal class MessageDataSource internal constructor(
339341
private val notificationMapper: LocalNotificationMessageMapper = LocalNotificationMessageMapperImpl()
340342
) : MessageRepository {
341343

342-
override val extensions: MessageRepositoryExtensions = MessageRepositoryExtensionsImpl(messageDAO, messageMapper)
344+
override val extensions: MessageRepositoryExtensions = MessageRepositoryExtensionsImpl(
345+
messageDAO,
346+
messageMapper,
347+
nomadMessagePagingCoordinator,
348+
)
343349

344350
override suspend fun getMessagesByConversationIdAndVisibility(
345351
conversationId: ConversationId,

logic/src/commonMain/kotlin/com/wire/kalium/logic/data/message/MessageRepositoryExtensions.kt

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@ import com.wire.kalium.logic.data.asset.AssetMessage
2525
import com.wire.kalium.logic.data.asset.SUPPORTED_IMAGE_ASSET_MIME_TYPES
2626
import com.wire.kalium.logic.data.id.ConversationId
2727
import com.wire.kalium.logic.data.id.toDao
28+
import com.wire.kalium.logic.data.message.paging.NomadMessagePagingCoordinator
2829
import com.wire.kalium.persistence.dao.asset.AssetMessageEntity
2930
import com.wire.kalium.persistence.dao.message.KaliumPager
3031
import com.wire.kalium.persistence.dao.message.MessageDAO
3132
import com.wire.kalium.persistence.dao.message.MessageEntity
3233
import kotlinx.coroutines.flow.Flow
3334
import kotlinx.coroutines.flow.map
35+
import kotlinx.datetime.Clock
3436

3537
internal interface MessageRepositoryExtensions {
3638
suspend fun getPaginatedMessagesByConversationIdAndVisibility(
@@ -58,11 +60,17 @@ internal interface MessageRepositoryExtensions {
5860
pagingConfig: PagingConfig,
5961
startingOffset: Long
6062
): Flow<PagingData<AssetMessage>>
63+
64+
suspend fun fetchOlderNomadMessagesByConversationId(
65+
conversationId: ConversationId,
66+
pageSize: Int,
67+
)
6168
}
6269

6370
internal class MessageRepositoryExtensionsImpl internal constructor(
6471
private val messageDAO: MessageDAO,
6572
private val messageMapper: MessageMapper,
73+
private val nomadMessagePagingCoordinator: NomadMessagePagingCoordinator? = null,
6674
) : MessageRepositoryExtensions {
6775

6876
override suspend fun getPaginatedMessagesByConversationIdAndVisibility(
@@ -75,7 +83,7 @@ internal class MessageRepositoryExtensionsImpl internal constructor(
7583
conversationId.toDao(),
7684
visibility.map { it.toEntityVisibility() },
7785
pagingConfig,
78-
startingOffset
86+
startingOffset,
7987
)
8088

8189
return pager.pagingDataFlow.map {
@@ -134,4 +142,20 @@ internal class MessageRepositoryExtensionsImpl internal constructor(
134142
it.map { messageEntity -> messageMapper.fromAssetEntityToAssetMessage(messageEntity) }
135143
}
136144
}
145+
146+
override suspend fun fetchOlderNomadMessagesByConversationId(
147+
conversationId: ConversationId,
148+
pageSize: Int,
149+
) {
150+
val coordinator = nomadMessagePagingCoordinator ?: return
151+
val beforeTimestampMs = messageDAO.getOldestVisibleMessageTimestampByConversationId(
152+
conversationId.toDao()
153+
) ?: Clock.System.now().toEpochMilliseconds()
154+
coordinator.fetchOlderMessagesIfNeeded(
155+
conversationId = conversationId,
156+
pageSize = pageSize,
157+
beforeTimestampMs = beforeTimestampMs,
158+
onInvalidate = {}
159+
)
160+
}
137161
}

0 commit comments

Comments
 (0)