Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,9 @@ SELECT * FROM MessageDetailsView WHERE id = ? AND conversationId = ?;
countByConversationIdAndVisibility:
SELECT count(*) FROM Message WHERE conversation_id = ? AND visibility IN ?;

selectOldestVisibleMessageTimestampByConversationId:
SELECT MIN(creation_date) FROM Message WHERE conversation_id = ? AND visibility = "VISIBLE";

selectByConversationIdAndVisibility:
SELECT * FROM MessageDetailsView WHERE conversationId = :conversationId AND visibility IN :visibility ORDER BY date DESC LIMIT :limit OFFSET :offset;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ interface MessageDAO {
suspend fun updateMessagesStatus(status: MessageEntity.Status, id: List<String>, conversationId: QualifiedIDEntity)
suspend fun getMessageById(id: String, conversationId: QualifiedIDEntity): MessageEntity?
suspend fun observeMessageById(id: String, conversationId: QualifiedIDEntity): Flow<MessageEntity?>
suspend fun getOldestVisibleMessageTimestampByConversationId(conversationId: ConversationIDEntity): Long?
suspend fun getMessagesByConversationAndVisibility(
conversationId: QualifiedIDEntity,
limit: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ import kotlinx.coroutines.flow.map
import kotlinx.coroutines.withContext
import kotlinx.datetime.Instant

@Suppress("TooManyFunctions", "LongParameterList")
@Suppress("TooManyFunctions", "LongParameterList", "LargeClass")
internal class MessageDAOImpl internal constructor(
private val queries: MessagesQueries,
private val attachmentsQueries: MessageAttachmentsQueries,
Expand Down Expand Up @@ -289,6 +289,14 @@ internal class MessageDAOImpl internal constructor(
.distinctUntilChanged()
.flowOn(readDispatcher.value)

override suspend fun getOldestVisibleMessageTimestampByConversationId(
conversationId: ConversationIDEntity
): Long? = withContext(readDispatcher.value) {
queries.selectOldestVisibleMessageTimestampByConversationId(conversationId)
.executeAsOneOrNull()
?.MIN?.toEpochMilliseconds()
}

override suspend fun getImageMessageAssets(
conversationId: QualifiedIDEntity,
mimeTypes: Set<String>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ import com.wire.kalium.protobuf.nomaddevice.NomadDeviceQualifiedId
import kotlinx.datetime.Instant
import kotlin.io.encoding.Base64

internal data class NomadMappedMessages(
public data class NomadMappedMessages(
val totalMessages: Int,
val messages: List<NomadMessageToInsert>,
val skippedMessages: Int,
)

internal class NomadAllMessagesMapper {
public class NomadAllMessagesMapper {

fun map(
public fun map(
response: NomadAllMessagesResponse,
): NomadMappedMessages {
var skipped = 0
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Wire
* Copyright (C) 2026 Wire Swiss GmbH
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/

package com.wire.kalium.logic.feature.message

import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.message.MessageRepository
import com.wire.kalium.util.KaliumDispatcher
import com.wire.kalium.util.KaliumDispatcherImpl
import kotlinx.coroutines.withContext

public interface FetchOlderNomadMessagesByConversationUseCase {
/**
* Fetches older messages for a given conversation in a remote data source and stores them in the local database.
* This is typically used when the user scrolls up in the message list we want to load more messages from the past.
*/
public suspend operator fun invoke(
conversationId: ConversationId,
pageSize: Int = DEFAULT_PAGE_SIZE,
)

private companion object {
const val DEFAULT_PAGE_SIZE = 50
}
}

internal class FetchOlderNomadMessagesByConversationUseCaseImpl(
private val dispatcher: KaliumDispatcher = KaliumDispatcherImpl,
private val messageRepository: MessageRepository,
) : FetchOlderNomadMessagesByConversationUseCase {

override suspend operator fun invoke(
conversationId: ConversationId,
pageSize: Int,
): Unit = withContext(dispatcher.default) {
messageRepository.extensions.fetchOlderNomadMessagesByConversationId(
conversationId = conversationId,
pageSize = pageSize,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,6 @@ public val MessageScope.getPaginatedFlowOfAssetMessageByConversationId: GetPagin

public val MessageScope.observePaginatedImageAssetMessageByConversationId: ObservePaginatedAssetImageMessages
get() = ObservePaginatedAssetImageMessages(dispatcher, messageRepository)

public val MessageScope.fetchOlderMessagesByConversationId: FetchOlderNomadMessagesByConversationUseCase
get() = FetchOlderNomadMessagesByConversationUseCaseImpl(dispatcher, messageRepository)
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import com.wire.kalium.logic.data.id.toDao
import com.wire.kalium.logic.data.id.toModel
import com.wire.kalium.logic.data.message.linkpreview.LinkPreviewMapper
import com.wire.kalium.logic.data.message.mention.MessageMentionMapper
import com.wire.kalium.logic.data.message.paging.NomadMessagePagingCoordinator
import com.wire.kalium.logic.data.notification.LocalNotification
import com.wire.kalium.logic.data.notification.LocalNotificationMessageMapper
import com.wire.kalium.logic.data.notification.LocalNotificationMessageMapperImpl
Expand Down Expand Up @@ -330,6 +331,7 @@ internal class MessageDataSource internal constructor(
private val messageApi: MessageApi,
private val mlsMessageApi: MLSMessageApi,
private val messageDAO: MessageDAO,
nomadMessagePagingCoordinator: NomadMessagePagingCoordinator? = null,
private val sendMessageFailureMapper: SendMessageFailureMapper = MapperProvider.sendMessageFailureMapper(),
private val messageMapper: MessageMapper = MapperProvider.messageMapper(selfUserId),
private val linkPreviewMapper: LinkPreviewMapper = MapperProvider.linkPreviewMapper(),
Expand All @@ -339,7 +341,11 @@ internal class MessageDataSource internal constructor(
private val notificationMapper: LocalNotificationMessageMapper = LocalNotificationMessageMapperImpl()
) : MessageRepository {

override val extensions: MessageRepositoryExtensions = MessageRepositoryExtensionsImpl(messageDAO, messageMapper)
override val extensions: MessageRepositoryExtensions = MessageRepositoryExtensionsImpl(
messageDAO,
messageMapper,
nomadMessagePagingCoordinator,
)

override suspend fun getMessagesByConversationIdAndVisibility(
conversationId: ConversationId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ import com.wire.kalium.logic.data.asset.AssetMessage
import com.wire.kalium.logic.data.asset.SUPPORTED_IMAGE_ASSET_MIME_TYPES
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.id.toDao
import com.wire.kalium.logic.data.message.paging.NomadMessagePagingCoordinator
import com.wire.kalium.persistence.dao.asset.AssetMessageEntity
import com.wire.kalium.persistence.dao.message.KaliumPager
import com.wire.kalium.persistence.dao.message.MessageDAO
import com.wire.kalium.persistence.dao.message.MessageEntity
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.datetime.Clock

internal interface MessageRepositoryExtensions {
suspend fun getPaginatedMessagesByConversationIdAndVisibility(
Expand Down Expand Up @@ -58,11 +60,17 @@ internal interface MessageRepositoryExtensions {
pagingConfig: PagingConfig,
startingOffset: Long
): Flow<PagingData<AssetMessage>>

suspend fun fetchOlderNomadMessagesByConversationId(
conversationId: ConversationId,
pageSize: Int,
)
}

internal class MessageRepositoryExtensionsImpl internal constructor(
private val messageDAO: MessageDAO,
private val messageMapper: MessageMapper,
private val nomadMessagePagingCoordinator: NomadMessagePagingCoordinator? = null,
) : MessageRepositoryExtensions {

override suspend fun getPaginatedMessagesByConversationIdAndVisibility(
Expand All @@ -75,7 +83,7 @@ internal class MessageRepositoryExtensionsImpl internal constructor(
conversationId.toDao(),
visibility.map { it.toEntityVisibility() },
pagingConfig,
startingOffset
startingOffset,
)

return pager.pagingDataFlow.map {
Expand Down Expand Up @@ -134,4 +142,20 @@ internal class MessageRepositoryExtensionsImpl internal constructor(
it.map { messageEntity -> messageMapper.fromAssetEntityToAssetMessage(messageEntity) }
}
}

override suspend fun fetchOlderNomadMessagesByConversationId(
conversationId: ConversationId,
pageSize: Int,
) {
val coordinator = nomadMessagePagingCoordinator ?: return
val beforeTimestampMs = messageDAO.getOldestVisibleMessageTimestampByConversationId(
conversationId.toDao()
) ?: Clock.System.now().toEpochMilliseconds()
coordinator.fetchOlderMessagesIfNeeded(
conversationId = conversationId,
pageSize = pageSize,
beforeTimestampMs = beforeTimestampMs,
onInvalidate = {}
)
}
}
Loading
Loading