diff --git a/app/build.gradle.kts b/app/build.gradle.kts index 81d6fc93c1..1d296eb680 100644 --- a/app/build.gradle.kts +++ b/app/build.gradle.kts @@ -407,6 +407,7 @@ dependencies { implementation(libs.opencsv) implementation(libs.androidx.work.runtime.ktx) implementation(libs.rxbinding) + implementation(libs.retrofit) if (hasIncludedLibSessionUtilProject) { implementation( diff --git a/app/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt b/app/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt index 8be647e12c..dd9c0df504 100644 --- a/app/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt +++ b/app/src/main/java/org/session/libsession/messaging/sending_receiving/MessageSender.kt @@ -9,6 +9,7 @@ import kotlinx.coroutines.supervisorScope import network.loki.messenger.libsession_util.ConfigBase.Companion.PRIORITY_HIDDEN import network.loki.messenger.libsession_util.ConfigBase.Companion.PRIORITY_VISIBLE import network.loki.messenger.libsession_util.Namespace +import network.loki.messenger.libsession_util.protocol.SessionProtocol import network.loki.messenger.libsession_util.util.BlindKeyAPI import network.loki.messenger.libsession_util.util.ExpiryMode import nl.komponents.kovenant.Promise @@ -28,7 +29,6 @@ import org.session.libsession.messaging.messages.visible.VisibleMessage import org.session.libsession.messaging.open_groups.OpenGroupApi import org.session.libsession.messaging.open_groups.OpenGroupApi.Capability import org.session.libsession.messaging.open_groups.OpenGroupMessage -import org.session.libsession.messaging.utilities.MessageWrapper import org.session.libsession.snode.SnodeAPI import org.session.libsession.snode.SnodeAPI.nowWithOffset import org.session.libsession.snode.SnodeMessage @@ -36,7 +36,6 @@ import org.session.libsession.snode.utilities.asyncPromise import org.session.libsession.utilities.Address import org.session.libsession.utilities.SSKEnvironment import org.session.libsignal.crypto.PushTransportDetails -import org.session.libsignal.protos.SignalServiceProtos import org.session.libsignal.utilities.AccountId import org.session.libsignal.utilities.Base64 import org.session.libsignal.utilities.Hex @@ -44,7 +43,6 @@ import org.session.libsignal.utilities.IdPrefix import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.defaultRequiresAuth import org.session.libsignal.utilities.hasNamespaces -import org.session.libsignal.utilities.hexEncodedPublicKey import java.util.concurrent.TimeUnit import org.session.libsession.messaging.sending_receiving.link_preview.LinkPreview as SignalLinkPreview import org.session.libsession.messaging.sending_receiving.quotes.QuoteModel as SignalQuote @@ -137,54 +135,44 @@ object MessageSender { // Set the timestamp on the content so it can be verified against envelope timestamp proto.setSigTimestampMs(message.sentTimestamp!!) - // Serialize the protobuf - val plaintext = PushTransportDetails.getPaddedMessageBody(proto.build().toByteArray()) + val plainText = PushTransportDetails.getPaddedMessageBody(proto.build().toByteArray()) - // Envelope information - val kind: SignalServiceProtos.Envelope.Type - val senderPublicKey: String - when (destination) { + val wrappedMessage = when (destination) { is Destination.Contact -> { - kind = SignalServiceProtos.Envelope.Type.SESSION_MESSAGE - senderPublicKey = "" - } - is Destination.LegacyClosedGroup -> { - kind = SignalServiceProtos.Envelope.Type.CLOSED_GROUP_MESSAGE - senderPublicKey = destination.groupPublicKey - } - is Destination.ClosedGroup -> { - kind = SignalServiceProtos.Envelope.Type.CLOSED_GROUP_MESSAGE - senderPublicKey = destination.publicKey + if (isSyncMessage) { + SessionProtocol.encryptFor1o1( + plaintext = plainText, + myEd25519PrivKey = storage.getUserED25519KeyPair()!!.secretKey.data, + timestampMs = message.sentTimestamp!!, + recipientPubKey = AccountId(userPublicKey!!).prefixedBytes, + proRotatingEd25519PrivKey = null, + ) + } else { + SessionProtocol.encryptFor1o1( + plaintext = plainText, + myEd25519PrivKey = storage.getUserED25519KeyPair()!!.secretKey.data, + timestampMs = message.sentTimestamp!!, + recipientPubKey = AccountId(destination.publicKey).prefixedBytes, + proRotatingEd25519PrivKey = null, + ) + } } - else -> throw IllegalStateException("Destination should not be open group.") - } - // Encrypt the serialized protobuf - val ciphertext = when (destination) { - is Destination.Contact -> MessageEncrypter.encrypt(plaintext, destination.publicKey) - is Destination.LegacyClosedGroup -> { - val encryptionKeyPair = - MessagingModuleConfiguration.shared.storage.getLatestClosedGroupEncryptionKeyPair( - destination.groupPublicKey - )!! - MessageEncrypter.encrypt(plaintext, encryptionKeyPair.hexEncodedPublicKey) - } is Destination.ClosedGroup -> { - val envelope = MessageWrapper.createEnvelope(kind, message.sentTimestamp!!, senderPublicKey, proto.build().toByteArray()) - configFactory.withGroupConfigs(AccountId(destination.publicKey)) { - it.groupKeys.encrypt(envelope.toByteArray()) - } + val groupId = AccountId(destination.publicKey) + SessionProtocol.encryptForGroup( + plaintext = plainText, + myEd25519PrivKey = storage.getUserED25519KeyPair()!!.secretKey.data, + timestampMs = message.sentTimestamp!!, + groupEd25519PublicKey = groupId.prefixedBytes, + groupEd25519PrivateKey = configFactory.withGroupConfigs(groupId) { it.groupKeys.groupEncKey() }, + proRotatingEd25519PrivKey = null, + ) } + else -> throw IllegalStateException("Destination should not be open group.") } - // Wrap the result using envelope information - val wrappedMessage = when (destination) { - is Destination.ClosedGroup -> { - // encrypted bytes from the above closed group encryption and envelope steps - ciphertext - } - else -> MessageWrapper.wrap(kind, message.sentTimestamp!!, senderPublicKey, ciphertext) - } + val base64EncodedData = Base64.encodeBytes(wrappedMessage) // Send the result return SnodeMessage( @@ -365,8 +353,8 @@ object MessageSender { if (message !is VisibleMessage || !message.isValid()) { throw Error.InvalidMessage } - val messageBody = content.toByteArray() - val plaintext = PushTransportDetails.getPaddedMessageBody(messageBody) + + val plaintext = content.toByteArray() val openGroupMessage = OpenGroupMessage( sender = message.sender, sentTimestamp = message.sentTimestamp!!, @@ -386,14 +374,17 @@ object MessageSender { if (message !is VisibleMessage || !message.isValid()) { throw Error.InvalidMessage } - val messageBody = content.toByteArray() - val plaintext = PushTransportDetails.getPaddedMessageBody(messageBody) - val ciphertext = MessageEncrypter.encryptBlinded( - plaintext, - destination.blindedPublicKey, - destination.serverPublicKey + val contentBytes = content.toByteArray() + val cipherText = SessionProtocol.encryptForCommunityInbox( + plaintext = contentBytes, + myEd25519PrivKey = userEdKeyPair.secretKey.data, + timestampMs = message.sentTimestamp!!, + recipientPubKey = AccountId(destination.blindedPublicKey).prefixedBytes, + communityServerPubKey = Hex.fromStringCondensed(destination.serverPublicKey), + proRotatingEd25519PrivKey = null ) - val base64EncodedData = Base64.encodeBytes(ciphertext) + + val base64EncodedData = Base64.encodeBytes(cipherText) OpenGroupApi.sendDirectMessage(base64EncodedData, destination.blindedPublicKey, destination.server).success { message.openGroupServerMessageID = it.id handleSuccessfulMessageSend(message, destination, openGroupSentTimestamp = TimeUnit.SECONDS.toMillis(it.postedAt)) diff --git a/app/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageManager.kt b/app/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageManager.kt new file mode 100644 index 0000000000..2ca9a4f5b0 --- /dev/null +++ b/app/src/main/java/org/session/libsession/messaging/sending_receiving/ReceivedMessageManager.kt @@ -0,0 +1,34 @@ +package org.session.libsession.messaging.sending_receiving + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.launch +import org.session.libsession.utilities.Address +import org.thoughtcrime.securesms.database.ReceivedMessageDatabase +import org.thoughtcrime.securesms.dependencies.ManagerScope +import org.thoughtcrime.securesms.dependencies.OnAppStartupComponent +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +class ReceivedMessageManager @Inject constructor( + private val handler: ReceivedMessageHandler, + @param:ManagerScope private val scope: CoroutineScope, +): OnAppStartupComponent { + override fun onPostAppStarted() { + scope.launch { +// database.getSwarmMessagesSorted() + } + } + + private fun launchMessageProcessing( + address: Address, + messages: ReceiveChannel + ) { + scope.launch { + for (msg in messages) { + + } + } + } +} \ No newline at end of file diff --git a/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt b/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt index 13dfeea206..b9578fd743 100644 --- a/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt +++ b/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt @@ -25,26 +25,31 @@ import kotlinx.coroutines.flow.stateIn import kotlinx.coroutines.launch import kotlinx.coroutines.supervisorScope import network.loki.messenger.libsession_util.Namespace +import network.loki.messenger.libsession_util.protocol.DecryptEnvelopeKey +import network.loki.messenger.libsession_util.protocol.SessionProtocol import org.session.libsession.database.StorageProtocol import org.session.libsession.database.userAuth -import org.session.libsession.messaging.MessagingModuleConfiguration -import org.session.libsession.messaging.jobs.BatchMessageReceiveJob -import org.session.libsession.messaging.jobs.JobQueue -import org.session.libsession.messaging.jobs.MessageReceiveParameters -import org.session.libsession.snode.RawResponse import org.session.libsession.snode.SnodeAPI +import org.session.libsession.snode.SnodeClock +import org.session.libsession.snode.StorageRPCService +import org.session.libsession.snode.endpoint.Authenticated +import org.session.libsession.snode.endpoint.ExtendTtl import org.session.libsession.snode.utilities.await import org.session.libsession.utilities.ConfigFactoryProtocol import org.session.libsession.utilities.ConfigMessage import org.session.libsession.utilities.TextSecurePreferences import org.session.libsession.utilities.UserConfigType import org.session.libsignal.database.LokiAPIDatabaseProtocol -import org.session.libsignal.utilities.Base64 +import org.session.libsignal.protos.SignalServiceProtos +import org.session.libsignal.utilities.AccountId +import org.session.libsignal.utilities.IdPrefix import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.Snode +import org.thoughtcrime.securesms.database.ReceivedMessageHashDatabase import org.thoughtcrime.securesms.util.AppVisibilityManager +import org.thoughtcrime.securesms.util.DateUtils.Companion.toEpochSeconds import org.thoughtcrime.securesms.util.NetworkConnectivity -import kotlin.time.Duration.Companion.days +import java.time.Duration private const val TAG = "Poller" @@ -57,12 +62,21 @@ class Poller @AssistedInject constructor( private val preferences: TextSecurePreferences, private val appVisibilityManager: AppVisibilityManager, private val networkConnectivity: NetworkConnectivity, - private val batchMessageReceiveJobFactory: BatchMessageReceiveJob.Factory, + private val storageRPCService: StorageRPCService, + private val snodeClock: SnodeClock, + private val receivedMessageHashDatabase: ReceivedMessageHashDatabase, + fetcherFactory: SnodeMessageFetcher.Factory, @Assisted scope: CoroutineScope ) { private val userPublicKey: String get() = storage.getUserPublicKey().orEmpty() + private val fetcher by lazy { + fetcherFactory.create( + swarmAuthProvider = { requireNotNull(storage.userAuth) } + ) + } + private val manualRequestTokens: SendChannel val pollState: StateFlow @@ -119,7 +133,7 @@ class Poller @AssistedInject constructor( // To migrate to multi part config, we'll need to fetch all the config messages so we // get the chance to process those multipart messages again... lokiApiDatabase.clearLastMessageHashesByNamespaces(*allConfigNamespaces) - lokiApiDatabase.clearReceivedMessageHashValuesByNamespaces(*allConfigNamespaces) + receivedMessageHashDatabase.removeHashesByNamespaces(allConfigNamespaces.asIterable()) preferences.migratedToMultiPartConfig = true } @@ -208,79 +222,64 @@ class Poller @AssistedInject constructor( } } - private fun processPersonalMessages(snode: Snode, rawMessages: RawResponse) { - val messages = SnodeAPI.parseRawMessagesResponse(rawMessages, snode, userPublicKey) - val parameters = messages.map { (envelope, serverHash) -> - MessageReceiveParameters(envelope.toByteArray(), serverHash = serverHash) - } - parameters.chunked(BatchMessageReceiveJob.BATCH_DEFAULT_NUMBER).forEach { chunk -> - JobQueue.shared.add(batchMessageReceiveJobFactory.create( - messages = chunk, - fromCommunity = null - )) - } - } + private suspend fun processPersonalMessages(fetchResult: SnodeMessageFetcher.FetchResult) { + val privateKey = DecryptEnvelopeKey.Regular( + ed25519PrivKey = storage.getUserED25519KeyPair()!!.secretKey.data + ) + + fetchResult.processMessagesInBatches { batch -> + //TODO: + Log.d(TAG, "Processing batch of ${batch.size} personal messages") + for (msg in batch) { + val envelope = SessionProtocol.decryptEnvelope( + key = privateKey, + payload = msg.dataDecoded, + nowEpochSeconds = snodeClock.currentTime().toEpochSeconds(), + proBackendPubKey = ByteArray(32) + ) - private fun processConfig(snode: Snode, rawMessages: RawResponse, forConfig: UserConfigType) { - Log.d(TAG, "Received ${rawMessages.size} messages for $forConfig") - val messages = rawMessages["messages"] as? List<*> - val namespace = forConfig.namespace - val processed = if (!messages.isNullOrEmpty()) { - SnodeAPI.updateLastMessageHashValueIfPossible(snode, userPublicKey, messages, namespace) - SnodeAPI.removeDuplicates( - publicKey = userPublicKey, - messages = messages, - messageHashGetter = { (it as? Map<*, *>)?.get("hash") as? String }, - namespace = namespace, - updateStoredHashes = true - ).mapNotNull { rawMessageAsJSON -> - rawMessageAsJSON as Map<*, *> // removeDuplicates should have ensured this is always a map - val hashValue = rawMessageAsJSON["hash"] as? String ?: return@mapNotNull null - val b64EncodedBody = rawMessageAsJSON["data"] as? String ?: return@mapNotNull null - val timestamp = rawMessageAsJSON["t"] as? Long ?: SnodeAPI.nowWithOffset - val body = Base64.decode(b64EncodedBody) - ConfigMessage(data = body, hash = hashValue, timestamp = timestamp) - } - } else emptyList() + val senderAccountId = AccountId(IdPrefix.STANDARD, envelope.senderEd25519PubKey.data) + val content = SignalServiceProtos.Content.parseFrom(envelope.contentPlainText.data) + val timestamp = envelope.timestamp - Log.d(TAG, "About to process ${processed.size} messages for $forConfig") + Log.d(TAG, "From $senderAccountId, Time = $timestamp: $content") - if (processed.isEmpty()) return - try { - configFactory.mergeUserConfigs( - userConfigType = forConfig, - messages = processed, - ) - } catch (e: Exception) { - Log.e(TAG, "Error while merging user configs", e) + } } + } + + private suspend fun processConfig(fetchResult: SnodeMessageFetcher.FetchResult, forConfig: UserConfigType) { + fetchResult.processMessages { messages -> + Log.d(TAG, "Processing batch of ${messages.size} messages for $forConfig") - Log.d(TAG, "Completed processing messages for $forConfig") + if (messages.isNotEmpty()) { + try { + configFactory.mergeUserConfigs( + userConfigType = forConfig, + messages = messages.map { msg -> + ConfigMessage(data = msg.dataDecoded, hash = msg.hash, timestamp = msg.timestamp.toEpochMilli()) + }, + ) + } catch (e: Exception) { + Log.e(TAG, "Error while merging user configs", e) + } + } + } } private suspend fun poll(snode: Snode, pollOnlyUserProfileConfig: Boolean) = supervisorScope { - val userAuth = requireNotNull(MessagingModuleConfiguration.shared.storage.userAuth) + val userAuth = requireNotNull(storage.userAuth) // Get messages call wrapped in an async val fetchMessageTask = if (!pollOnlyUserProfileConfig) { - val request = SnodeAPI.buildAuthenticatedRetrieveBatchRequest( - lastHash = lokiApiDatabase.getLastMessageHashValue( - snode = snode, - publicKey = userAuth.accountId.hexString, - namespace = Namespace.DEFAULT() - ), - auth = userAuth, - maxSize = -2) - this.async { runCatching { - SnodeAPI.sendBatchRequest( + fetcher.fetchLatestMessages( snode = snode, - publicKey = userPublicKey, - request = request, - responseType = Map::class.java + namespace = Namespace.DEFAULT(), + maxSize = -2 ) } } @@ -301,20 +300,14 @@ class Poller @AssistedInject constructor( .map { type -> val config = configs.getConfig(type) hashesToExtend += config.activeHashes() - val request = SnodeAPI.buildAuthenticatedRetrieveBatchRequest( - lastHash = lokiApiDatabase.getLastMessageHashValue( - snode = snode, - publicKey = userAuth.accountId.hexString, - namespace = type.namespace - ), - auth = userAuth, - namespace = type.namespace, - maxSize = -8 - ) this.async { type to runCatching { - SnodeAPI.sendBatchRequest(snode, userPublicKey, request, Map::class.java) + fetcher.fetchLatestMessages( + snode = snode, + namespace = type.namespace, + maxSize = -8 + ) } } } @@ -323,14 +316,16 @@ class Poller @AssistedInject constructor( if (hashesToExtend.isNotEmpty()) { launch { try { - SnodeAPI.sendBatchRequest( + storageRPCService.call( snode, - userPublicKey, - SnodeAPI.buildAuthenticatedAlterTtlBatchRequest( + endpoint = Authenticated( + clock = snodeClock, + realEndpoint = ExtendTtl, + swarmAuth = userAuth + ), + req = ExtendTtl.Request( + newExpiry = snodeClock.currentTime() + Duration.ofDays(14), messageHashes = hashesToExtend.toList(), - auth = userAuth, - newExpiry = SnodeAPI.nowWithOffset + 14.days.inWholeMilliseconds, - extend = true ) ) } catch (e: Exception) { @@ -350,7 +345,7 @@ class Poller @AssistedInject constructor( continue } - processConfig(snode, result.getOrThrow(), configType) + processConfig(result.getOrThrow(), configType) } // Process the messages if we requested them @@ -359,7 +354,7 @@ class Poller @AssistedInject constructor( if (result.isFailure) { Log.e(TAG, "Error while fetching messages", result.exceptionOrNull()) } else { - processPersonalMessages(snode, result.getOrThrow()) + processPersonalMessages(result.getOrThrow()) } } } diff --git a/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/SnodeMessageFetcher.kt b/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/SnodeMessageFetcher.kt new file mode 100644 index 0000000000..989580006d --- /dev/null +++ b/app/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/SnodeMessageFetcher.kt @@ -0,0 +1,136 @@ +package org.session.libsession.messaging.sending_receiving.pollers + +import dagger.assisted.Assisted +import dagger.assisted.AssistedFactory +import dagger.assisted.AssistedInject +import org.session.libsession.snode.SnodeClock +import org.session.libsession.snode.StorageRPCService +import org.session.libsession.snode.SwarmAuth +import org.session.libsession.snode.endpoint.Authenticated +import org.session.libsession.snode.endpoint.Retrieve +import org.session.libsession.utilities.Address.Companion.toAddress +import org.session.libsignal.utilities.AccountId +import org.session.libsignal.utilities.Snode +import org.thoughtcrime.securesms.database.LokiAPIDatabase +import org.thoughtcrime.securesms.database.ReceivedMessageHashDatabase + +class SnodeMessageFetcher @AssistedInject constructor( + val lokiAPIDatabase: LokiAPIDatabase, + private val storageRPCService: StorageRPCService, + private val snodeClock: SnodeClock, + private val receivedMessageHashDatabase: ReceivedMessageHashDatabase, + @Assisted private val swarmAuthProvider: () -> SwarmAuth, +) { + + inner class FetchResult( + private val allMessages: List, + private val swarmAccountId: AccountId, + private val newMessages: List, + private val snode: Snode, + private val namespace: Int + ) { + suspend fun processMessagesInBatches( + chunkSize: Int = 50, + processor: suspend (List) -> T + ): List { + if (newMessages.isEmpty()) { + val result = processor(newMessages) + + if (allMessages.isNotEmpty()) { + lokiAPIDatabase.setLastMessageHashValue( + snode = snode, + publicKey = swarmAccountId.hexString, + namespace = namespace, + newValue = allMessages.maxBy { it.timestamp }.hash + ) + } + + return listOf(result) + } + + return buildList { + for (batch in newMessages.asSequence().chunked(chunkSize)) { + add(processor(batch)) + + // For each batch's completion, update the last processed message hash and + // store the processed message hashes to avoid reprocessing. + if (batch.isNotEmpty()) { + lokiAPIDatabase.setLastMessageHashValue( + snode = snode, + publicKey = swarmAccountId.hexString, + namespace = namespace, + newValue = batch.maxBy { it.timestamp }.hash + ) + + receivedMessageHashDatabase.addNewMessageHashes( + hashes = batch.asSequence().map { it.hash }, + repositoryAddress = swarmAccountId.toAddress(), + namespace = namespace + ) + } + } + } + } + + suspend fun processMessages(processor: suspend (List) -> T): T { + return processMessagesInBatches( + chunkSize = newMessages.size, + processor = processor + ).first() + } + } + + + suspend fun fetchLatestMessages( + snode: Snode, + namespace: Int, + maxSize: Int?, + ): FetchResult { + val swarmAuth = swarmAuthProvider() + val swarmAccountId = swarmAuth.accountId + + val request = Retrieve.Request( + namespace = namespace.takeIf { it != 0 }, + lastHash = lokiAPIDatabase.getLastMessageHashValue( + snode = snode, + publicKey = swarmAccountId.hexString, + namespace = namespace + ).orEmpty(), + maxSize = maxSize + ) + + val messages = storageRPCService.call( + snode = snode, + endpoint = Authenticated( + clock = snodeClock, + realEndpoint = Retrieve, + swarmAuth = swarmAuth, + ), + req = request, + ).messages + .sortedBy { it.timestamp } + + val newMessages = receivedMessageHashDatabase.dedupMessages( + messages = messages, + hash = Retrieve.Message::hash, + repositoryAddress = swarmAccountId.toAddress(), + namespace = namespace + ).toList() + + return FetchResult( + swarmAccountId = swarmAccountId, + newMessages = newMessages, + allMessages = messages, + snode = snode, + namespace = namespace + ) + } + + + @AssistedFactory + interface Factory { + fun create( + swarmAuthProvider: () -> SwarmAuth + ): SnodeMessageFetcher + } +} \ No newline at end of file diff --git a/app/src/main/java/org/session/libsession/snode/AutoBatchRPCService.kt b/app/src/main/java/org/session/libsession/snode/AutoBatchRPCService.kt new file mode 100644 index 0000000000..63ff787dab --- /dev/null +++ b/app/src/main/java/org/session/libsession/snode/AutoBatchRPCService.kt @@ -0,0 +1,156 @@ +package org.session.libsession.snode + +import android.os.SystemClock +import dagger.assisted.Assisted +import dagger.assisted.AssistedFactory +import dagger.assisted.AssistedInject +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.SendChannel +import kotlinx.coroutines.launch +import kotlinx.coroutines.selects.onTimeout +import kotlinx.coroutines.selects.select +import kotlinx.serialization.json.Json +import org.session.libsession.snode.endpoint.Batch +import org.session.libsession.snode.endpoint.Endpoint +import org.session.libsignal.utilities.Snode +import org.thoughtcrime.securesms.dependencies.ManagerScope + +@OptIn(ExperimentalCoroutinesApi::class) +class AutoBatchRPCService @AssistedInject constructor( + private val json: Json, + @Assisted private val realService: StorageRPCService, + @ManagerScope scope: CoroutineScope, +) : StorageRPCService { + + private data class BatchKey(val snodeAddress: String, val batchKey: String) + + private data class BatchRequestInfo( + val key: BatchKey, + val snode: Snode, + val endpoint: Endpoint, + val request: T, + val resultCallback: SendChannel>, + val requestTime: Long = SystemClock.elapsedRealtime(), + ) + + private val batchChannel: SendChannel> + + init { + val channel = Channel>() + batchChannel = channel + + scope.launch { + val batches = hashMapOf>>() + + while (true) { + val batch = select>?> { + // If we receive a request, add it to the batch + channel.onReceive { req -> + batches.getOrPut(req.key) { mutableListOf() }.add(req) + null + } + + // If we have anything in the batch, look for the one that is about to expire + // and wait for it to expire, remove it from the batches and send it for + // processing. + if (batches.isNotEmpty()) { + val earliestBatch = batches.minBy { it.value.first().requestTime } + val deadline = earliestBatch.value.first().requestTime + BATCH_WINDOW_MILLS + onTimeout( + timeMillis = (deadline - SystemClock.elapsedRealtime()).coerceAtLeast(0) + ) { + batches.remove(earliestBatch.key) + } + } + } + + if (batch != null) { + launch batch@{ + val snode = batch.first().snode + @Suppress("UNCHECKED_CAST") val responses = try { + realService.call(snode, Batch(), Batch.Request(batch.map { + Batch.Request.Item( + method = it.endpoint.methodName, + params = (it.endpoint as Endpoint).serializeRequest(json, it.request) ) + })) + } catch (e: Exception) { + for (req in batch) { + runCatching { + req.resultCallback.send(Result.failure(e)) + } + } + return@batch + } + + // For each response, parse the result, match it with the request then send + // back through the request's callback. + for ((req, resp) in batch.zip(responses.results)) { + val result = runCatching { + if (!resp.isSuccessful) { + throw Batch.Response.Error(resp) + } + + req.endpoint.deserializeResponse(json, resp.body) + } + + runCatching { + @Suppress("UNCHECKED_CAST") + (req.resultCallback as SendChannel).send(result) + } + } + + // Close all channels in the requests just in case we don't have paired up + // responses. + for (req in batch) { + req.resultCallback.close() + } + } + } + } + + } + } + + override suspend fun call( + snode: Snode, + endpoint: Endpoint, + req: Req + ): Res { + if (endpoint is Batch) { + return realService.call(snode, endpoint, req) + } + + val batchKey = endpoint.batchKey(req) + + if (batchKey == null) { + // No batch key provided, we won't know how to group the requests together, + // so just call this request directly + return realService.call(snode, endpoint, req) + } + + val callback = Channel>() + + batchChannel.send( + BatchRequestInfo( + snode = snode, + endpoint = endpoint, + request = req, + resultCallback = callback, + key = BatchKey(snode.address, batchKey) + ) + ) + + return callback.receive().getOrThrow() + } + + @AssistedFactory + interface Factory { + fun create(realService: StorageRPCService): AutoBatchRPCService + } + + companion object { + const val BATCH_WINDOW_MILLS = 100L + } +} \ No newline at end of file diff --git a/app/src/main/java/org/session/libsession/snode/OnionRequestAPI.kt b/app/src/main/java/org/session/libsession/snode/OnionRequestAPI.kt index 9315ed424d..1b5194277d 100644 --- a/app/src/main/java/org/session/libsession/snode/OnionRequestAPI.kt +++ b/app/src/main/java/org/session/libsession/snode/OnionRequestAPI.kt @@ -110,7 +110,7 @@ object OnionRequestAPI { val destinationSymmetricKey: ByteArray ) - internal sealed class Destination(val description: String) { + sealed class Destination(val description: String) { class Snode(val snode: org.session.libsignal.utilities.Snode) : Destination("Service node ${snode.ip}:${snode.port}") class Server(val host: String, val target: String, val x25519PublicKey: String, val scheme: String, val port: Int) : Destination("$host") } @@ -327,7 +327,7 @@ object OnionRequestAPI { /** * Sends an onion request to `destination`. Builds new paths as needed. */ - private fun sendOnionRequest( + fun sendOnionRequest( destination: Destination, payload: ByteArray, version: Version @@ -429,7 +429,7 @@ object OnionRequestAPI { /** * Sends an onion request to `snode`. Builds new paths as needed. */ - internal fun sendOnionRequest( + fun sendOnionRequest( method: Snode.Method, parameters: Map<*, *>, snode: Snode, diff --git a/app/src/main/java/org/session/libsession/snode/OnionRoutedRPCService.kt b/app/src/main/java/org/session/libsession/snode/OnionRoutedRPCService.kt new file mode 100644 index 0000000000..6130266dc4 --- /dev/null +++ b/app/src/main/java/org/session/libsession/snode/OnionRoutedRPCService.kt @@ -0,0 +1,49 @@ +package org.session.libsession.snode + +import kotlinx.serialization.ExperimentalSerializationApi +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.JsonElement +import kotlinx.serialization.json.decodeFromStream +import org.session.libsession.snode.endpoint.Endpoint +import org.session.libsession.snode.model.OnionSnodeRequest +import org.session.libsession.snode.utilities.await +import org.session.libsignal.utilities.Snode +import java.io.ByteArrayInputStream +import javax.inject.Inject + +class OnionRoutedRPCService @Inject constructor( + private val json: Json, +) : StorageRPCService { + @OptIn(ExperimentalSerializationApi::class) + override suspend fun call( + snode: Snode, + endpoint: Endpoint, + req: Req + ): Res { + val request = OnionSnodeRequest( + method = endpoint.methodName, + params = endpoint.serializeRequest(json, req) + ) + + val resp = OnionRequestAPI.sendOnionRequest( + destination = OnionRequestAPI.Destination.Snode(snode), + payload = json.encodeToString(request).toByteArray(), + version = Version.V3 + ).await() + + if (resp.code == 200 || resp.code == null) { + val body = json.decodeFromStream( + ByteArrayInputStream( + resp.body!!.data, + resp.body.offset, + resp.body.len + ) + ) + + return endpoint.deserializeResponse(json, body) + } else { + //TODO: Exception translation + throw Exception("Onion RPC call failed with code ${resp.code}") + } + } +} \ No newline at end of file diff --git a/app/src/main/java/org/session/libsession/snode/SnodeAPI.kt b/app/src/main/java/org/session/libsession/snode/SnodeAPI.kt index 7b471a216c..ef466dd81e 100644 --- a/app/src/main/java/org/session/libsession/snode/SnodeAPI.kt +++ b/app/src/main/java/org/session/libsession/snode/SnodeAPI.kt @@ -12,7 +12,6 @@ import kotlinx.coroutines.async import kotlinx.coroutines.awaitAll import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.SendChannel -import kotlinx.coroutines.joinAll import kotlinx.coroutines.launch import kotlinx.coroutines.selects.onTimeout import kotlinx.coroutines.selects.select @@ -20,10 +19,8 @@ import network.loki.messenger.libsession_util.ED25519 import network.loki.messenger.libsession_util.Hash import network.loki.messenger.libsession_util.SessionEncrypt import nl.komponents.kovenant.Promise -import nl.komponents.kovenant.all import nl.komponents.kovenant.functional.bind import nl.komponents.kovenant.functional.map -import nl.komponents.kovenant.unwrap import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.utilities.MessageWrapper import org.session.libsession.snode.model.BatchResponse @@ -40,14 +37,12 @@ import org.session.libsignal.database.LokiAPIDatabaseProtocol import org.session.libsignal.protos.SignalServiceProtos import org.session.libsignal.utilities.AccountId import org.session.libsignal.utilities.Base64 -import org.session.libsignal.utilities.Broadcaster import org.session.libsignal.utilities.HTTP import org.session.libsignal.utilities.Hex import org.session.libsignal.utilities.JsonUtil import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.Snode import org.session.libsignal.utilities.prettifiedDescription -import org.session.libsignal.utilities.retryIfNeeded import org.session.libsignal.utilities.retryWithUniformInterval import java.util.Locale import kotlin.collections.component1 @@ -790,12 +785,6 @@ object SnodeAPI { } } - fun getMessages(auth: SwarmAuth): MessageListPromise = scope.retrySuspendAsPromise(maxRetryCount) { - val snode = getSingleTargetSnode(auth.accountId.hexString).await() - val resp = getRawMessages(snode, auth).await() - parseRawMessagesResponse(resp, snode, auth.accountId.hexString) - } - fun getNetworkTime(snode: Snode): Promise, Exception> = invoke(Snode.Method.Info, snode, emptyMap()).map { rawResponse -> val timestamp = rawResponse["timestamp"] as? Long ?: -1 @@ -959,18 +948,6 @@ object SnodeAPI { ) } - fun parseRawMessagesResponse(rawResponse: RawResponse, snode: Snode, publicKey: String, namespace: Int = 0, updateLatestHash: Boolean = true, updateStoredHashes: Boolean = true, decrypt: ((ByteArray) -> Pair?)? = null): List> = - (rawResponse["messages"] as? List<*>)?.let { messages -> - if (updateLatestHash) updateLastMessageHashValueIfPossible(snode, publicKey, messages, namespace) - removeDuplicates( - publicKey = publicKey, - messages = parseEnvelopes(messages, decrypt), - messageHashGetter = { it.second }, - namespace = namespace, - updateStoredHashes = updateStoredHashes - ) - } ?: listOf() - fun updateLastMessageHashValueIfPossible(snode: Snode, publicKey: String, rawMessages: List<*>, namespace: Int) { val lastMessageAsJSON = rawMessages.lastOrNull() as? Map<*, *> val hashValue = lastMessageAsJSON?.get("hash") as? String @@ -980,44 +957,6 @@ object SnodeAPI { } } - /** - * - * - * TODO Use a db transaction, synchronizing is sufficient for now because - * database#setReceivedMessageHashValues is only called here. - */ - @Synchronized - fun removeDuplicates( - publicKey: String, - messages: List, - messageHashGetter: (M) -> String?, - namespace: Int, - updateStoredHashes: Boolean - ): List { - val hashValues = database.getReceivedMessageHashValues(publicKey, namespace)?.toMutableSet() ?: mutableSetOf() - return messages - .filter { message -> - val hash = messageHashGetter(message) - if (hash == null) { - Log.d("Loki", "Missing hash value for message: ${message?.prettifiedDescription()}.") - return@filter false - } - - val isNew = hashValues.add(hash) - - if (!isNew) { - Log.d("Loki", "Duplicate message hash: $hash.") - } - - isNew - } - .also { - if (updateStoredHashes && it.isNotEmpty()) { - database.setReceivedMessageHashValues(publicKey, hashValues, namespace) - } - } - } - private fun parseEnvelopes(rawMessages: List<*>, decrypt: ((ByteArray)->Pair?)?): List> { return rawMessages.mapNotNull { rawMessage -> val rawMessageAsJSON = rawMessage as? Map<*, *> diff --git a/app/src/main/java/org/session/libsession/snode/SnodeClock.kt b/app/src/main/java/org/session/libsession/snode/SnodeClock.kt index 71382140cf..fc2e4939fe 100644 --- a/app/src/main/java/org/session/libsession/snode/SnodeClock.kt +++ b/app/src/main/java/org/session/libsession/snode/SnodeClock.kt @@ -87,6 +87,10 @@ class SnodeClock @Inject constructor( return currentTimeMills() / 1000 } + fun currentTime(): java.time.Instant { + return java.time.Instant.ofEpochMilli(currentTimeMills()) + } + private class Instant( val systemUptime: Long, val networkTime: Long, diff --git a/app/src/main/java/org/session/libsession/snode/SnodeHiltModule.kt b/app/src/main/java/org/session/libsession/snode/SnodeHiltModule.kt new file mode 100644 index 0000000000..fc94346711 --- /dev/null +++ b/app/src/main/java/org/session/libsession/snode/SnodeHiltModule.kt @@ -0,0 +1,20 @@ +package org.session.libsession.snode + +import dagger.Module +import dagger.Provides +import dagger.hilt.InstallIn +import dagger.hilt.components.SingletonComponent +import javax.inject.Singleton + +@Module +@InstallIn(SingletonComponent::class) +class SnodeHiltModule { + @Provides + @Singleton + fun provideStorageRPCService( + onionRPCService: OnionRoutedRPCService, + factory: AutoBatchRPCService.Factory, + ): StorageRPCService { + return factory.create(onionRPCService) + } +} \ No newline at end of file diff --git a/app/src/main/java/org/session/libsession/snode/SnodeModule.kt b/app/src/main/java/org/session/libsession/snode/SnodeModule.kt index 993c73d0b6..5cac8c9bae 100644 --- a/app/src/main/java/org/session/libsession/snode/SnodeModule.kt +++ b/app/src/main/java/org/session/libsession/snode/SnodeModule.kt @@ -1,11 +1,9 @@ package org.session.libsession.snode -import android.app.Application import dagger.Lazy import org.session.libsession.utilities.Environment import org.session.libsession.utilities.TextSecurePreferences import org.session.libsignal.database.LokiAPIDatabaseProtocol -import org.session.libsignal.utilities.Broadcaster import javax.inject.Inject import javax.inject.Singleton diff --git a/app/src/main/java/org/session/libsession/snode/SnodeService.kt b/app/src/main/java/org/session/libsession/snode/SnodeService.kt new file mode 100644 index 0000000000..89ffd06c08 --- /dev/null +++ b/app/src/main/java/org/session/libsession/snode/SnodeService.kt @@ -0,0 +1,38 @@ +package org.session.libsession.snode + +import kotlinx.coroutines.CancellationException +import org.session.libsession.snode.endpoint.Endpoint +import org.session.libsignal.utilities.AccountId +import org.session.libsignal.utilities.Log +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +class SnodeService @Inject constructor( + private val storageRPCService: StorageRPCService, + private val swarmManager: SwarmManager, +) { + suspend fun invokeOnSwarm( + swarmAddress: AccountId, + endpoint: Endpoint, + req: Req + ): Res { + val node = swarmManager.getRandomSwarmNode(swarmAddress) + Log.d(TAG, "Invoking ${endpoint.methodName} on ${node.address}") + try { + return storageRPCService.call(node, endpoint, req) + } catch (e: Exception) { + if (e !is CancellationException) { + Log.e(TAG, "Error invoking ${endpoint.methodName} on ${node.address}", e) + + swarmManager.handleExceptionOnNodeRPC(swarmAddress, node, e) + } + + throw e + } + } + + companion object { + private const val TAG = "SnodeService" + } +} \ No newline at end of file diff --git a/app/src/main/java/org/session/libsession/snode/StorageRPCService.kt b/app/src/main/java/org/session/libsession/snode/StorageRPCService.kt new file mode 100644 index 0000000000..feb333cfb7 --- /dev/null +++ b/app/src/main/java/org/session/libsession/snode/StorageRPCService.kt @@ -0,0 +1,8 @@ +package org.session.libsession.snode + +import org.session.libsession.snode.endpoint.Endpoint +import org.session.libsignal.utilities.Snode + +interface StorageRPCService { + suspend fun call(snode: Snode, endpoint: Endpoint, req: Req): Res +} \ No newline at end of file diff --git a/app/src/main/java/org/session/libsession/snode/SwarmManager.kt b/app/src/main/java/org/session/libsession/snode/SwarmManager.kt new file mode 100644 index 0000000000..76e79b53cd --- /dev/null +++ b/app/src/main/java/org/session/libsession/snode/SwarmManager.kt @@ -0,0 +1,20 @@ +package org.session.libsession.snode + +import org.session.libsignal.utilities.AccountId +import org.session.libsignal.utilities.Snode +import javax.inject.Singleton + +@Singleton +class SwarmManager { + suspend fun getRandomSwarmNode(swarmAddress: AccountId): Snode { + TODO() + } + + suspend fun handleExceptionOnNodeRPC( + swarmAddress: AccountId, + snode: Snode, + exception: Exception + ) { + TODO() + } +} \ No newline at end of file diff --git a/app/src/main/java/org/session/libsession/snode/endpoint/Authenticated.kt b/app/src/main/java/org/session/libsession/snode/endpoint/Authenticated.kt new file mode 100644 index 0000000000..c24b1387a9 --- /dev/null +++ b/app/src/main/java/org/session/libsession/snode/endpoint/Authenticated.kt @@ -0,0 +1,39 @@ +package org.session.libsession.snode.endpoint + +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.JsonObject +import kotlinx.serialization.json.JsonPrimitive +import org.session.libsession.snode.SnodeClock +import org.session.libsession.snode.SwarmAuth + +class Authenticated( + private val clock: SnodeClock, + private val realEndpoint: Endpoint, + private val swarmAuth: SwarmAuth, +) : Endpoint by realEndpoint { + override fun serializeRequest( + json: Json, + request: Req + ): JsonObject { + val element = (realEndpoint.serializeRequest(json, request) as JsonObject).toMutableMap() + val timestampMs = clock.currentTimeMills() + val data = request.getSigningData(timestampMs) + + for ((key, value) in swarmAuth.sign(data)) { + element[key] = JsonPrimitive(value) + } + + element["timestamp"] = JsonPrimitive(timestampMs) + element["pubkey"] = JsonPrimitive(swarmAuth.accountId.hexString) + swarmAuth.ed25519PublicKeyHex?.let { + element["pubkey_ed25519"] = JsonPrimitive(it) + } + + return JsonObject(element) + } + + override fun batchKey(request: Req): String? { + return "${realEndpoint.batchKey(request)}-$${swarmAuth.accountId.hexString}" + } + +} \ No newline at end of file diff --git a/app/src/main/java/org/session/libsession/snode/endpoint/Batch.kt b/app/src/main/java/org/session/libsession/snode/endpoint/Batch.kt new file mode 100644 index 0000000000..66c08d9f67 --- /dev/null +++ b/app/src/main/java/org/session/libsession/snode/endpoint/Batch.kt @@ -0,0 +1,44 @@ +package org.session.libsession.snode.endpoint + +import kotlinx.serialization.DeserializationStrategy +import kotlinx.serialization.Serializable +import kotlinx.serialization.SerializationStrategy +import kotlinx.serialization.json.JsonElement + +class Batch : SimpleJsonEndpoint() { + override val requestSerializer: SerializationStrategy + get() = Request.serializer() + + override val responseDeserializer: DeserializationStrategy + get() = Response.serializer() + + override val methodName: String + get() = "batch" + + override fun batchKey(request: Request): String? = null + + @Serializable + class Request(val requests: List) { + + @Serializable + class Item(val method: String, val params: JsonElement) + } + + + @Serializable + class Response(val results: List) { + @Serializable + class Result(val code: Int, val body: JsonElement) { + val isSuccessful: Boolean + get() = code in 200..299 + + val isServerError: Boolean + get() = code in 500..599 + + val isSnodeNoLongerPartOfSwarm: Boolean + get() = code == 421 + } + + class Error(val result: Result): Exception("Batch request failed with code ${result.code}, body = ${result.body}") + } +} \ No newline at end of file diff --git a/app/src/main/java/org/session/libsession/snode/endpoint/Endpoint.kt b/app/src/main/java/org/session/libsession/snode/endpoint/Endpoint.kt new file mode 100644 index 0000000000..25465a0796 --- /dev/null +++ b/app/src/main/java/org/session/libsession/snode/endpoint/Endpoint.kt @@ -0,0 +1,26 @@ +package org.session.libsession.snode.endpoint + +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.JsonElement + +sealed interface Endpoint { + fun serializeRequest( + json: Json, + request: Request + ): JsonElement + + fun deserializeResponse( + json: Json, + response: JsonElement + ): Response + + val methodName: String + + fun batchKey(request: Request): String? + + companion object { + // A convenient constant for endpoints that are always + // batchable among each other + const val BATCH_KEY_ALWAYS = "endpoint" + } +} \ No newline at end of file diff --git a/app/src/main/java/org/session/libsession/snode/endpoint/ExtendTtl.kt b/app/src/main/java/org/session/libsession/snode/endpoint/ExtendTtl.kt new file mode 100644 index 0000000000..bdf27fd660 --- /dev/null +++ b/app/src/main/java/org/session/libsession/snode/endpoint/ExtendTtl.kt @@ -0,0 +1,46 @@ +package org.session.libsession.snode.endpoint + +import kotlinx.serialization.DeserializationStrategy +import kotlinx.serialization.EncodeDefault +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable +import kotlinx.serialization.SerializationStrategy +import org.session.libsession.utilities.serializable.InstantAsMillisSerializer +import java.time.Instant + +object ExtendTtl : SimpleJsonEndpoint() { + override val requestSerializer: SerializationStrategy + get() = Request.serializer() + + override val responseDeserializer: DeserializationStrategy + get() = Response.serializer() + + override val methodName: String + get() = "expire" + + override fun batchKey(request: Request): String? = Endpoint.BATCH_KEY_ALWAYS + + @Serializable + class Request( + @SerialName("expiry") + @Serializable(with = InstantAsMillisSerializer::class) + val newExpiry: Instant, + + @SerialName("messages") + val messageHashes: List, + + val extend: Boolean = true + ) : WithSigningData { + + override fun getSigningData(timestampMs: Long): ByteArray { + return buildString { + append("expireextend") + append(newExpiry.toEpochMilli()) + messageHashes.forEach(this::append) + }.toByteArray() + } + } + + @Serializable + class Response +} \ No newline at end of file diff --git a/app/src/main/java/org/session/libsession/snode/endpoint/Retrieve.kt b/app/src/main/java/org/session/libsession/snode/endpoint/Retrieve.kt new file mode 100644 index 0000000000..28d34dc34d --- /dev/null +++ b/app/src/main/java/org/session/libsession/snode/endpoint/Retrieve.kt @@ -0,0 +1,55 @@ +package org.session.libsession.snode.endpoint + +import kotlinx.serialization.DeserializationStrategy +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable +import kotlinx.serialization.SerializationStrategy +import org.session.libsession.utilities.serializable.InstantAsMillisSerializer +import org.session.libsignal.utilities.Base64 +import java.time.Instant +import javax.annotation.concurrent.ThreadSafe + +object Retrieve : SimpleJsonEndpoint() { + override val requestSerializer: SerializationStrategy + get() = Request.serializer() + override val responseDeserializer: DeserializationStrategy + get() = Response.serializer() + override val methodName: String + get() = "retrieve" + + override fun batchKey(request: Request): String? = Endpoint.BATCH_KEY_ALWAYS + + @Serializable + class Request( + val namespace: Int?, + + @SerialName("last_hash") + val lastHash: String, + + @SerialName("max_size") + val maxSize: Int?, + ) : WithSigningData { + override fun getSigningData(timestampMs: Long): ByteArray { + return if (namespace == null || namespace == 0) { + "retrieve$timestampMs" + } else { + "retrieve$namespace$timestampMs" + }.toByteArray() + } + } + + @Serializable + class Response(val messages: List) + + @Serializable + class Message( + val data: String, + val hash: String, + @Serializable(with = InstantAsMillisSerializer::class) + val timestamp: Instant, + ) { + val dataDecoded by lazy(LazyThreadSafetyMode.NONE) { + Base64.decode(data) + } + } +} \ No newline at end of file diff --git a/app/src/main/java/org/session/libsession/snode/endpoint/SimpleJsonEndpoint.kt b/app/src/main/java/org/session/libsession/snode/endpoint/SimpleJsonEndpoint.kt new file mode 100644 index 0000000000..6894db6ab3 --- /dev/null +++ b/app/src/main/java/org/session/libsession/snode/endpoint/SimpleJsonEndpoint.kt @@ -0,0 +1,21 @@ +package org.session.libsession.snode.endpoint + +import kotlinx.serialization.DeserializationStrategy +import kotlinx.serialization.SerializationStrategy +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.JsonElement + +abstract class SimpleJsonEndpoint : Endpoint { + abstract val requestSerializer: SerializationStrategy + abstract val responseDeserializer: DeserializationStrategy + + override fun serializeRequest( + json: Json, + request: Req + ) = json.encodeToJsonElement(requestSerializer, request) + + override fun deserializeResponse( + json: Json, + response: JsonElement + ) = json.decodeFromJsonElement(responseDeserializer, response) +} \ No newline at end of file diff --git a/app/src/main/java/org/session/libsession/snode/endpoint/WithSigningData.kt b/app/src/main/java/org/session/libsession/snode/endpoint/WithSigningData.kt new file mode 100644 index 0000000000..8ac0f3c1cf --- /dev/null +++ b/app/src/main/java/org/session/libsession/snode/endpoint/WithSigningData.kt @@ -0,0 +1,5 @@ +package org.session.libsession.snode.endpoint + +interface WithSigningData { + fun getSigningData(timestampMs: Long): ByteArray +} \ No newline at end of file diff --git a/app/src/main/java/org/session/libsession/snode/model/OnionSnodeRequest.kt b/app/src/main/java/org/session/libsession/snode/model/OnionSnodeRequest.kt new file mode 100644 index 0000000000..82ef48ad13 --- /dev/null +++ b/app/src/main/java/org/session/libsession/snode/model/OnionSnodeRequest.kt @@ -0,0 +1,10 @@ +package org.session.libsession.snode.model + +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.JsonElement + +@Serializable +class OnionSnodeRequest( + val method: String, + val params: JsonElement, +) \ No newline at end of file diff --git a/app/src/main/java/org/session/libsignal/database/LokiAPIDatabaseProtocol.kt b/app/src/main/java/org/session/libsignal/database/LokiAPIDatabaseProtocol.kt index 9d0d4a6241..c2bf90306d 100644 --- a/app/src/main/java/org/session/libsignal/database/LokiAPIDatabaseProtocol.kt +++ b/app/src/main/java/org/session/libsignal/database/LokiAPIDatabaseProtocol.kt @@ -20,11 +20,7 @@ interface LokiAPIDatabaseProtocol { fun clearLastMessageHashes(publicKey: String) fun clearLastMessageHashesByNamespaces(vararg namespaces: Int) fun clearAllLastMessageHashes() - fun getReceivedMessageHashValues(publicKey: String, namespace: Int): Set? - fun setReceivedMessageHashValues(publicKey: String, newValue: Set, namespace: Int) - fun clearReceivedMessageHashValues(publicKey: String) - fun clearReceivedMessageHashValues() - fun clearReceivedMessageHashValuesByNamespaces(vararg namespaces: Int) + fun getAuthToken(server: String): String? fun setAuthToken(server: String, newValue: String?) fun getLastMessageServerID(room: String, server: String): Long? diff --git a/app/src/main/java/org/session/libsignal/utilities/AccountId.kt b/app/src/main/java/org/session/libsignal/utilities/AccountId.kt index adce91c77b..aa01f6a760 100644 --- a/app/src/main/java/org/session/libsignal/utilities/AccountId.kt +++ b/app/src/main/java/org/session/libsignal/utilities/AccountId.kt @@ -28,6 +28,13 @@ data class AccountId( Hex.fromStringCondensed(hexString.drop(2)) } + /** + * A 33 bytes prefixed pub key bytes + */ + val prefixedBytes: ByteArray by lazy { + byteArrayOf(prefix!!.binaryValue) + pubKeyBytes + } + override fun toString(): String { return truncatedForDisplay() } diff --git a/app/src/main/java/org/session/libsignal/utilities/IdPrefix.kt b/app/src/main/java/org/session/libsignal/utilities/IdPrefix.kt index 8c375b46e2..2652b50ac8 100644 --- a/app/src/main/java/org/session/libsignal/utilities/IdPrefix.kt +++ b/app/src/main/java/org/session/libsignal/utilities/IdPrefix.kt @@ -1,7 +1,7 @@ package org.session.libsignal.utilities -enum class IdPrefix(val value: String) { - STANDARD("05"), BLINDED("15"), UN_BLINDED("00"), GROUP("03"), BLINDEDV2("25"); +enum class IdPrefix(val value: String, val binaryValue: Byte) { + STANDARD("05", 0x05), BLINDED("15", 0x15), UN_BLINDED("00", 0x00), GROUP("03", 0x03), BLINDEDV2("25", 0x25); fun isBlinded() = value == BLINDED.value || value == BLINDEDV2.value diff --git a/app/src/main/java/org/thoughtcrime/securesms/configs/ConfigToDatabaseSync.kt b/app/src/main/java/org/thoughtcrime/securesms/configs/ConfigToDatabaseSync.kt index 15d3a7fede..09cb1b9327 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/configs/ConfigToDatabaseSync.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/configs/ConfigToDatabaseSync.kt @@ -46,6 +46,7 @@ import org.thoughtcrime.securesms.database.LokiAPIDatabase import org.thoughtcrime.securesms.database.LokiMessageDatabase import org.thoughtcrime.securesms.database.MmsDatabase import org.thoughtcrime.securesms.database.MmsSmsDatabase +import org.thoughtcrime.securesms.database.ReceivedMessageHashDatabase import org.thoughtcrime.securesms.database.RecipientSettingsDatabase import org.thoughtcrime.securesms.database.SmsDatabase import org.thoughtcrime.securesms.database.ThreadDatabase @@ -87,6 +88,7 @@ class ConfigToDatabaseSync @Inject constructor( private val messageNotifier: MessageNotifier, private val recipientSettingsDatabase: RecipientSettingsDatabase, private val avatarCacheCleaner: AvatarCacheCleaner, + private val receivedMessageHashDatabase: ReceivedMessageHashDatabase, @param:ManagerScope private val scope: CoroutineScope, ) : OnAppStartupComponent { init { @@ -190,7 +192,7 @@ class ConfigToDatabaseSync @Inject constructor( private fun deleteGroupData(address: Address.Group) { lokiAPIDatabase.clearLastMessageHashes(address.accountId.hexString) - lokiAPIDatabase.clearReceivedMessageHashValues(address.accountId.hexString) + receivedMessageHashDatabase.removeHashesByRepo(address) } private fun onLegacyGroupAdded( diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/LokiAPIDatabase.kt b/app/src/main/java/org/thoughtcrime/securesms/database/LokiAPIDatabase.kt index 6a16e68a7d..5e3cd26d4c 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/LokiAPIDatabase.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/LokiAPIDatabase.kt @@ -2,6 +2,11 @@ package org.thoughtcrime.securesms.database import android.content.ContentValues import android.content.Context +import androidx.sqlite.db.transaction +import dagger.hilt.android.qualifiers.ApplicationContext +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.JsonArray +import kotlinx.serialization.json.JsonPrimitive import org.session.libsession.utilities.TextSecurePreferences import org.session.libsignal.crypto.ecc.DjbECPrivateKey import org.session.libsignal.crypto.ecc.DjbECPublicKey @@ -16,10 +21,17 @@ import org.session.libsignal.utilities.removingIdPrefixIfNeeded import org.session.libsignal.utilities.toHexString import org.thoughtcrime.securesms.crypto.IdentityKeyUtil import org.thoughtcrime.securesms.database.helpers.SQLCipherOpenHelper +import org.thoughtcrime.securesms.util.asSequence import java.util.Date +import javax.inject.Inject import javax.inject.Provider +import javax.inject.Singleton -class LokiAPIDatabase(context: Context, helper: Provider) : Database(context, helper), LokiAPIDatabaseProtocol { +@Singleton +class LokiAPIDatabase @Inject constructor( + @ApplicationContext context: Context, + helper: Provider, +) : Database(context, helper), LokiAPIDatabaseProtocol { companion object { // Shared @@ -49,6 +61,7 @@ class LokiAPIDatabase(context: Context, helper: Provider) : = "CREATE TABLE $legacyLastMessageHashValueTable2 ($snode TEXT, $publicKey TEXT, $lastMessageHashValue TEXT, PRIMARY KEY ($snode, $publicKey));" // Received message hash values private const val legacyReceivedMessageHashValuesTable3 = "received_message_hash_values_table_3" + @Deprecated("This table is now deleted. It's only here for migration purpose") private const val receivedMessageHashValuesTable = "session_received_message_hash_values_table" private const val receivedMessageHashValues = "received_message_hash_values" private const val receivedMessageHashNamespace = "received_message_namespace" @@ -311,43 +324,6 @@ class LokiAPIDatabase(context: Context, helper: Provider) : database.delete(lastMessageHashValueTable2, null, null) } - override fun getReceivedMessageHashValues(publicKey: String, namespace: Int): Set? { - val database = readableDatabase - val query = "${Companion.publicKey} = ? AND ${Companion.receivedMessageHashNamespace} = ?" - return database.get(receivedMessageHashValuesTable, query, arrayOf( publicKey, namespace.toString() )) { cursor -> - val receivedMessageHashValuesAsString = cursor.getString(cursor.getColumnIndexOrThrow(Companion.receivedMessageHashValues)) - receivedMessageHashValuesAsString.split("-").toSet() - } - } - - override fun setReceivedMessageHashValues(publicKey: String, newValue: Set, namespace: Int) { - val database = writableDatabase - val receivedMessageHashValuesAsString = newValue.joinToString("-") - val row = wrap(mapOf( - Companion.publicKey to publicKey, - Companion.receivedMessageHashValues to receivedMessageHashValuesAsString, - Companion.receivedMessageHashNamespace to namespace.toString() - )) - val query = "${Companion.publicKey} = ? AND $receivedMessageHashNamespace = ?" - database.insertOrUpdate(receivedMessageHashValuesTable, row, query, arrayOf( publicKey, namespace.toString() )) - } - - override fun clearReceivedMessageHashValues(publicKey: String) { - writableDatabase - .delete(receivedMessageHashValuesTable, "${Companion.publicKey} = ?", arrayOf(publicKey)) - } - - override fun clearReceivedMessageHashValues() { - val database = writableDatabase - database.delete(receivedMessageHashValuesTable, null, null) - } - - override fun clearReceivedMessageHashValuesByNamespaces(vararg namespaces: Int) { - // Note that we don't use SQL parameter as the given namespaces are integer anyway so there's little chance of SQL injection - writableDatabase - .delete(receivedMessageHashValuesTable, "$receivedMessageHashNamespace IN (${namespaces.joinToString(",")})", null) - } - override fun getAuthToken(server: String): String? { val database = readableDatabase return database.get(openGroupAuthTokenTable, "${Companion.server} = ?", wrap(server)) { cursor -> diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/ReceivedMessageDatabase.kt b/app/src/main/java/org/thoughtcrime/securesms/database/ReceivedMessageDatabase.kt new file mode 100644 index 0000000000..9103ba8096 --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/database/ReceivedMessageDatabase.kt @@ -0,0 +1,154 @@ +package org.thoughtcrime.securesms.database + +import android.content.Context +import androidx.sqlite.db.transaction +import dagger.hilt.android.qualifiers.ApplicationContext +import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.SharedFlow +import org.session.libsession.snode.endpoint.Retrieve +import org.session.libsession.utilities.Address +import org.session.libsession.utilities.Address.Companion.toAddress +import org.session.libsignal.protos.SignalServiceProtos +import org.session.libsignal.utilities.AccountId +import org.thoughtcrime.securesms.database.helpers.SQLCipherOpenHelper +import org.thoughtcrime.securesms.util.asSequence +import java.time.Instant +import javax.inject.Provider +import javax.inject.Singleton + +@Singleton +class ReceivedMessageDatabase( + @ApplicationContext context: Context, + openHelper: Provider, +) : Database(context, openHelper) { + private val mutableChangeNotification = MutableSharedFlow
( + extraBufferCapacity = 25, + onBufferOverflow = BufferOverflow.DROP_OLDEST + ) + + val changeNotification: SharedFlow
get() = mutableChangeNotification + + fun saveSwarmMessages( + swarmAddress: AccountId, + namespace: Int, + messages: Sequence + ) { + val changed = writableDatabase.transaction { + val stmt = compileStatement( + """ + INSERT OR IGNORE INTO received_messages + (repository_address, namespace, server_id, data, timestamp_ms) + VALUES (?, ?, ?, ?, ?) + """.trimIndent() + ) + + var insertedSomething = false + + for (msg in messages) { + stmt.clearBindings() + stmt.bindString(1, swarmAddress.hexString) + stmt.bindLong(2, namespace.toLong()) + stmt.bindString(3, msg.hash) + stmt.bindString(4, msg.data) + stmt.bindLong(5, msg.timestamp.toEpochMilli()) + + insertedSomething = insertedSomething || stmt.executeUpdateDelete() > 0 + } + + insertedSomething + } + + if (changed) { + mutableChangeNotification.tryEmit(swarmAddress.toAddress()) + } + } + + /** + * Get all received pending messages sorted by timestamp ascending (oldest first). + */ + fun getMessagesSorted(limit: Int): List { + return readableDatabase.rawQuery( + """ + SELECT namespace, server_id, data, timestamp_ms, repository_address + FROM received_messages + ORDER BY timestamp_ms ASC + LIMIT ? + """.trimIndent(), limit + ).use { cursor -> + cursor.asSequence() + .map { + Message( + id = MessageId( + repositoryAddress = cursor.getString(5), + namespace = cursor.getInt(1), + serverId = cursor.getString(2) + ), + Retrieve.Message( + data = cursor.getString(3), + hash = cursor.getString(2), + timestamp = Instant.ofEpochMilli(cursor.getLong(4)), + ) + ) + } + .toList() + } + } + + fun removeMessages(ids: Sequence) { + val changed = writableDatabase.transaction { + val stmt = compileStatement( + """ + DELETE FROM received_messages + WHERE repository_address = ? AND namespace = ? AND server_id = ? + """.trimIndent() + ) + + buildSet { + for (id in ids) { + stmt.clearBindings() + stmt.bindString(1, id.repositoryAddress) + stmt.bindLong(2, id.namespace.toLong()) + stmt.bindString(3, id.serverId) + + if (stmt.executeUpdateDelete() > 0) { + add(id.repositoryAddress.toAddress()) + } + } + } + } + + for (addr in changed) { + mutableChangeNotification.tryEmit(addr) + } + } + + data class MessageId(val repositoryAddress: String, val namespace: Int, val serverId: String) + data class Message(val id: MessageId, val message: Retrieve.Message) + + companion object { + @JvmStatic + val CREATE_TABLE = arrayOf( + """ + CREATE TABLE received_messages( + --Where this message belongs to : either the user 's pub key, group' s key or a community address + repository_address TEXT NOT NULL, + + --The namespace this message belongs to (only relevant for swarm messages, should be 0 for other type of repositories + namespace INTEGER NOT NULL DEFAULT 0, + + --The value that can be used to identify this message on the server: hash for swarm messages and id for communities + server_id TEXT NOT NULL, + + --The raw data that belongs to this repository (note: this can have different formats depending on the repository type) + data TEXT NOT NULL, + + timestamp_ms INTEGER NOT NULL, + + PRIMARY KEY (repository_address, namespace, server_id) + ) WITHOUT ROWID + """, + "CREATE INDEX idx_received_messages_repository_ts ON received_messages(repository_address, timestamp_ms)" + ) + } +} \ No newline at end of file diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/ReceivedMessageHashDatabase.kt b/app/src/main/java/org/thoughtcrime/securesms/database/ReceivedMessageHashDatabase.kt new file mode 100644 index 0000000000..e85dad9b1f --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/database/ReceivedMessageHashDatabase.kt @@ -0,0 +1,119 @@ +package org.thoughtcrime.securesms.database + +import android.content.Context +import androidx.sqlite.db.transaction +import dagger.hilt.android.qualifiers.ApplicationContext +import kotlinx.serialization.json.JsonArray +import kotlinx.serialization.json.JsonPrimitive +import net.zetetic.database.sqlcipher.SQLiteDatabase +import org.session.libsession.utilities.Address +import org.session.libsignal.utilities.Log +import org.thoughtcrime.securesms.database.helpers.SQLCipherOpenHelper +import org.thoughtcrime.securesms.util.asSequence +import javax.inject.Inject +import javax.inject.Provider +import javax.inject.Singleton + +@Singleton +class ReceivedMessageHashDatabase @Inject constructor( + @ApplicationContext context: Context, + helper: Provider, +): Database(context, helper) { + + fun dedupMessages(messages: Collection, hash: (M) -> String, repositoryAddress: Address, namespace: Int): Sequence { + if (messages.isEmpty()) { + return emptySequence() + } + + val newHashes = readableDatabase.rawQuery(""" + SELECT value FROM json_each(?) + WHERE NOT EXISTS ( + SELECT 1 FROM received_message_hashes + WHERE repository_address = ? + AND namespace = ? + AND hash = value + ) + """, + JsonArray(messages.map { JsonPrimitive(hash(it)) }).toString(), + repositoryAddress.address, + namespace, + ).use { cursor -> + cursor.asSequence() + .mapTo(hashSetOf()) { it.getString(0) } + } + + return messages.asSequence().filter { hash(it) in newHashes } + } + + fun removeHashesByRepo(repositoryAddress: Address) { + writableDatabase.rawExecSQL(""" + DELETE FROM received_message_hashes + WHERE repository_address = ? + """, repositoryAddress.address) + } + + fun removeHashesByNamespaces(namespaces: Iterable) { + writableDatabase.rawExecSQL(""" + DELETE FROM received_message_hashes + WHERE namespace IN (SELECT value FROM json_each(?)) + """, JsonArray(namespaces.map { JsonPrimitive(it) }).toString()) + } + + fun removeAll() { + writableDatabase.rawExecSQL("DELETE FROM received_message_hashes WHERE 1") + } + + fun addNewMessageHashes(hashes: Sequence, repositoryAddress: Address, namespace: Int) { + writableDatabase.transaction { + val stmt = compileStatement(""" + INSERT OR IGNORE INTO received_message_hashes(repository_address, namespace, hash) + VALUES (?, ?, ?) + """) + + for (hash in hashes) { + stmt.clearBindings() + stmt.bindString(1, repositoryAddress.address) + stmt.bindLong(2, namespace.toLong()) + stmt.bindString(3, hash) + if (stmt.executeUpdateDelete() > 0) { + Log.d(TAG, "Added new received message hash for $repositoryAddress/$namespace: $hash") + } + } + } + } + + companion object { + private const val TAG = "ReceivedMessageHashDatabase" + + const val CREATE_TABLE = """ + CREATE TABLE received_message_hashes( + -- Where this message is stored, can be 05/03 or community addresses + repository_address TEXT NOT NULL, + namespace INTEGER NOT NULL DEFAULT 0, + hash TEXT NOT NULL, + + PRIMARY KEY(repository_address, namespace, hash) + ) WITHOUT ROWID + """ + + fun migrateFromOldTable(db: SQLiteDatabase) { + db.rawQuery(""" + SELECT public_key, received_message_namespace, received_message_hash_values + FROM session_received_message_hash_values_table + """).use { cursor -> + while (cursor.moveToNext()) { + val publicKey = cursor.getString(0) + val namespace = cursor.getInt(1) + for (hash in cursor.getString(2).splitToSequence("-")) { + db.rawExecSQL(""" + INSERT OR IGNORE INTO received_message_hashes(repository_address, namespace, hash) + VALUES (?, ?, ?) + """, publicKey, namespace, hash) + } + } + } + + db.rawExecSQL("DROP TABLE session_received_message_hash_values_table") + } + } +} \ No newline at end of file diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/helpers/SQLCipherOpenHelper.java b/app/src/main/java/org/thoughtcrime/securesms/database/helpers/SQLCipherOpenHelper.java index ae56d16c0f..1198aa2964 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/helpers/SQLCipherOpenHelper.java +++ b/app/src/main/java/org/thoughtcrime/securesms/database/helpers/SQLCipherOpenHelper.java @@ -32,6 +32,7 @@ import org.thoughtcrime.securesms.database.MmsSmsDatabase; import org.thoughtcrime.securesms.database.PushDatabase; import org.thoughtcrime.securesms.database.ReactionDatabase; +import org.thoughtcrime.securesms.database.ReceivedMessageHashDatabase; import org.thoughtcrime.securesms.database.RecipientDatabase; import org.thoughtcrime.securesms.database.RecipientSettingsDatabase; import org.thoughtcrime.securesms.database.SearchDatabase; @@ -99,9 +100,10 @@ public class SQLCipherOpenHelper extends SQLiteOpenHelper { private static final int lokiV51 = 72; private static final int lokiV52 = 73; private static final int lokiV53 = 74; + private static final int lokiV54 = 75; // Loki - onUpgrade(...) must be updated to use Loki version numbers if Signal makes any database changes - private static final int DATABASE_VERSION = lokiV53; + private static final int DATABASE_VERSION = lokiV54; private static final int MIN_DATABASE_VERSION = lokiV7; public static final String DATABASE_NAME = "session.db"; @@ -143,8 +145,6 @@ public void postKey(SQLiteConnection connection) { ); this.jsonProvider = jsonProvider; - - Log.d(TAG, "SQLCipherOpenHelper created with database secret: " + databaseSecret.asString()); } @Override @@ -258,6 +258,8 @@ public void onCreate(SQLiteDatabase db) { db.execSQL(CommunityDatabase.MIGRATE_CREATE_TABLE); executeStatements(db, CommunityDatabase.Companion.getMIGRATE_DROP_OLD_TABLES()); + + db.execSQL(ReceivedMessageHashDatabase.CREATE_TABLE); } @Override @@ -587,6 +589,11 @@ public void onUpgrade(SQLiteDatabase db, int oldVersion, int newVersion) { MmsSmsDatabase.migrateLegacyCommunityAddresses2(db); } + if (oldVersion < lokiV54) { + db.execSQL(ReceivedMessageHashDatabase.CREATE_TABLE); + ReceivedMessageHashDatabase.Companion.migrateFromOldTable(db); + } + db.setTransactionSuccessful(); } finally { db.endTransaction(); diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/AppModule.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/AppModule.kt index 13591b71ee..79e0b28048 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/AppModule.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/AppModule.kt @@ -37,6 +37,9 @@ class AppModule { return Json { ignoreUnknownKeys = true isLenient = true + + // enabled so that default values are present in the json + encodeDefaults = true serializersModule += SerializersModule { modules.forEach { include(it) } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/DatabaseModule.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/DatabaseModule.kt index d7bb41a857..fe36a0f126 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/DatabaseModule.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/DatabaseModule.kt @@ -93,10 +93,6 @@ object DatabaseModule { @Singleton fun searchDatabase(@ApplicationContext context: Context, openHelper: Provider) = SearchDatabase(context,openHelper) - @Provides - @Singleton - fun provideLokiApiDatabase(@ApplicationContext context: Context, openHelper: Provider) = LokiAPIDatabase(context,openHelper) - @Provides @Singleton fun provideLokiMessageDatabase(@ApplicationContext context: Context, openHelper: Provider) = LokiMessageDatabase(context,openHelper) diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/OnAppStartupComponents.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/OnAppStartupComponents.kt index e1730e784e..f71fcbd2ef 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/OnAppStartupComponents.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/OnAppStartupComponents.kt @@ -1,6 +1,7 @@ package org.thoughtcrime.securesms.dependencies import org.session.libsession.messaging.notifications.TokenFetcher +import org.session.libsession.messaging.sending_receiving.ReceivedMessageManager import org.session.libsession.messaging.sending_receiving.pollers.OpenGroupPollerManager import org.session.libsession.messaging.sending_receiving.pollers.PollerManager import org.session.libsession.snode.SnodeClock @@ -70,6 +71,7 @@ class OnAppStartupComponents private constructor( avatarUploadManager: AvatarUploadManager, configToDatabaseSync: ConfigToDatabaseSync, subscriptionManagers: Set<@JvmSuppressWildcards SubscriptionManager>, + receivedMessageManager: ReceivedMessageManager, ): this( components = listOf( configUploader, @@ -101,6 +103,7 @@ class OnAppStartupComponents private constructor( subscriptionCoordinator, avatarUploadManager, configToDatabaseSync, + receivedMessageManager, ) + subscriptionManagers ) } diff --git a/app/src/main/java/org/thoughtcrime/securesms/groups/GroupManagerV2Impl.kt b/app/src/main/java/org/thoughtcrime/securesms/groups/GroupManagerV2Impl.kt index 30f49b1978..898313a8d9 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/groups/GroupManagerV2Impl.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/groups/GroupManagerV2Impl.kt @@ -44,6 +44,7 @@ import org.session.libsession.snode.SnodeMessage import org.session.libsession.snode.model.BatchResponse import org.session.libsession.snode.utilities.await import org.session.libsession.utilities.Address +import org.session.libsession.utilities.Address.Companion.toAddress import org.session.libsession.utilities.StringSubstitutionConstants.GROUP_NAME_KEY import org.session.libsession.utilities.getGroup import org.session.libsession.utilities.recipients.RecipientData @@ -62,6 +63,7 @@ import org.thoughtcrime.securesms.configs.ConfigUploader import org.thoughtcrime.securesms.database.LokiAPIDatabase import org.thoughtcrime.securesms.database.LokiMessageDatabase import org.thoughtcrime.securesms.database.MmsSmsDatabase +import org.thoughtcrime.securesms.database.ReceivedMessageHashDatabase import org.thoughtcrime.securesms.database.RecipientRepository import org.thoughtcrime.securesms.database.ThreadDatabase import org.thoughtcrime.securesms.dependencies.ConfigFactory @@ -87,6 +89,7 @@ class GroupManagerV2Impl @Inject constructor( private val scope: GroupScope, private val groupPollerManager: GroupPollerManager, private val recipientRepository: RecipientRepository, + private val receivedMessageHashDatabase: ReceivedMessageHashDatabase, ) : GroupManagerV2 { private val dispatcher = Dispatchers.Default @@ -880,7 +883,7 @@ class GroupManagerV2Impl @Inject constructor( // Clear all polling states lokiAPIDatabase.clearLastMessageHashes(groupId.hexString) - lokiAPIDatabase.clearReceivedMessageHashValues(groupId.hexString) + receivedMessageHashDatabase.removeHashesByRepo(groupId.toAddress()) SessionMetaProtocol.clearReceivedMessages() configFactory.deleteGroupConfigs(groupId) diff --git a/app/src/main/java/org/thoughtcrime/securesms/groups/GroupPoller.kt b/app/src/main/java/org/thoughtcrime/securesms/groups/GroupPoller.kt index 7db34f8cb4..d6dbf8023c 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/groups/GroupPoller.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/groups/GroupPoller.kt @@ -17,19 +17,14 @@ import kotlinx.coroutines.flow.stateIn import kotlinx.coroutines.launch import kotlinx.coroutines.supervisorScope import network.loki.messenger.libsession_util.Namespace -import org.session.libsession.messaging.jobs.BatchMessageReceiveJob -import org.session.libsession.messaging.jobs.JobQueue -import org.session.libsession.messaging.jobs.MessageReceiveParameters -import org.session.libsession.messaging.messages.Destination -import org.session.libsession.snode.RawResponse +import org.session.libsession.messaging.sending_receiving.pollers.SnodeMessageFetcher import org.session.libsession.snode.SnodeAPI import org.session.libsession.snode.SnodeClock -import org.session.libsession.snode.model.BatchResponse -import org.session.libsession.snode.model.RetrieveMessageResponse +import org.session.libsession.snode.endpoint.Batch +import org.session.libsession.snode.endpoint.Retrieve import org.session.libsession.utilities.ConfigFactoryProtocol import org.session.libsession.utilities.ConfigMessage import org.session.libsession.utilities.getGroup -import org.session.libsignal.database.LokiAPIDatabaseProtocol import org.session.libsignal.exceptions.NonRetryableException import org.session.libsignal.utilities.AccountId import org.session.libsignal.utilities.Log @@ -44,11 +39,10 @@ class GroupPoller @AssistedInject constructor( @Assisted scope: CoroutineScope, @Assisted private val groupId: AccountId, private val configFactoryProtocol: ConfigFactoryProtocol, - private val lokiApiDatabase: LokiAPIDatabaseProtocol, private val clock: SnodeClock, private val appVisibilityManager: AppVisibilityManager, private val groupRevokedMessageHandler: GroupRevokedMessageHandler, - private val batchMessageReceiveJobFactory: BatchMessageReceiveJob.Factory, + fetcherFactory: SnodeMessageFetcher.Factory ) { companion object { private const val POLL_INTERVAL = 3_000L @@ -87,6 +81,12 @@ class GroupPoller @AssistedInject constructor( } } + private val fetcher by lazy { + fetcherFactory.create( + swarmAuthProvider = { configFactoryProtocol.getGroupAuth(groupId)!! } + ) + } + // A channel to send tokens to trigger a poll private val pollOnceTokens = Channel() @@ -226,21 +226,11 @@ class GroupPoller @AssistedInject constructor( val pollingTasks = mutableListOf>>() val receiveRevokeMessage = async { - SnodeAPI.sendBatchRequest( - snode, - groupId.hexString, - SnodeAPI.buildAuthenticatedRetrieveBatchRequest( - lastHash = lokiApiDatabase.getLastMessageHashValue( - snode, - groupId.hexString, - Namespace.REVOKED_GROUP_MESSAGES() - ).orEmpty(), - auth = groupAuth, - namespace = Namespace.REVOKED_GROUP_MESSAGES(), - maxSize = null, - ), - RetrieveMessageResponse::class.java - ).messages.filterNotNull() + fetcher.fetchLatestMessages( + snode = snode, + namespace = Namespace.REVOKED_GROUP_MESSAGES(), + maxSize = null + ) } if (configHashesToExtends.isNotEmpty() && adminKey != null) { @@ -259,24 +249,10 @@ class GroupPoller @AssistedInject constructor( } val groupMessageRetrieval = async { - val lastHash = lokiApiDatabase.getLastMessageHashValue( - snode, - groupId.hexString, - Namespace.GROUP_MESSAGES() - ).orEmpty() - - Log.v(TAG, "Retrieving group($groupId) message since lastHash = $lastHash, snode = ${snode.publicKeySet}") - - SnodeAPI.sendBatchRequest( + fetcher.fetchLatestMessages( snode = snode, - publicKey = groupId.hexString, - request = SnodeAPI.buildAuthenticatedRetrieveBatchRequest( - lastHash = lastHash, - auth = groupAuth, - namespace = Namespace.GROUP_MESSAGES(), - maxSize = null, - ), - responseType = Map::class.java + namespace = Namespace.GROUP_MESSAGES(), + maxSize = null ) } @@ -286,21 +262,11 @@ class GroupPoller @AssistedInject constructor( Namespace.GROUP_MEMBERS() ).map { ns -> async { - SnodeAPI.sendBatchRequest( + fetcher.fetchLatestMessages( snode = snode, - publicKey = groupId.hexString, - request = SnodeAPI.buildAuthenticatedRetrieveBatchRequest( - lastHash = lokiApiDatabase.getLastMessageHashValue( - snode, - groupId.hexString, - ns - ).orEmpty(), - auth = groupAuth, - namespace = ns, - maxSize = null, - ), - responseType = RetrieveMessageResponse::class.java - ).messages.filterNotNull() + namespace = ns, + maxSize = null + ) } } @@ -311,22 +277,16 @@ class GroupPoller @AssistedInject constructor( val result = runCatching { val (keysMessage, infoMessage, membersMessage) = groupConfigRetrieval.map { it.await() } handleGroupConfigMessages(keysMessage, infoMessage, membersMessage) - saveLastMessageHash(snode, keysMessage, Namespace.GROUP_KEYS()) - saveLastMessageHash(snode, infoMessage, Namespace.GROUP_INFO()) - saveLastMessageHash(snode, membersMessage, Namespace.GROUP_MEMBERS()) groupExpired = configFactoryProtocol.withGroupConfigs(groupId) { it.groupKeys.size() == 0 } - val regularMessages = groupMessageRetrieval.await() - handleMessages(regularMessages, snode) + handleMessages(groupMessageRetrieval.await()) } // Revoke message must be handled regardless, and at the end - val revokedMessages = receiveRevokeMessage.await() - handleRevoked(revokedMessages) - saveLastMessageHash(snode, revokedMessages, Namespace.REVOKED_GROUP_MESSAGES()) + handleRevoked(receiveRevokeMessage.await()) // Propagate any prior exceptions result.getOrThrow() @@ -360,12 +320,12 @@ class GroupPoller @AssistedInject constructor( if (error != null && currentSnode != null) { val badResponse = (sequenceOf(error) + error.suppressedExceptions.asSequence()) .firstOrNull { err -> - err.getRootCause()?.item?.let { it.isServerError || it.isSnodeNoLongerPartOfSwarm } == true + err.getRootCause()?.result?.let { it.isServerError || it.isSnodeNoLongerPartOfSwarm } == true } if (badResponse != null) { Log.e(TAG, "Group polling failed due to a server error", badResponse) - pollState.swarmNodes -= currentSnode!! + pollState.swarmNodes -= currentSnode } } } @@ -380,85 +340,86 @@ class GroupPoller @AssistedInject constructor( return pollResult } - private fun RetrieveMessageResponse.Message.toConfigMessage(): ConfigMessage { - return ConfigMessage(hash, data, timestamp ?: clock.currentTimeMills()) - } - - private fun saveLastMessageHash( - snode: Snode, - messages: List, - namespace: Int - ) { - if (messages.isNotEmpty()) { - lokiApiDatabase.setLastMessageHashValue( - snode = snode, - publicKey = groupId.hexString, - newValue = messages.last().hash, - namespace = namespace - ) - } + private fun Retrieve.Message.toConfigMessage(): ConfigMessage { + return ConfigMessage( + hash = hash, + data = dataDecoded, + timestamp = timestamp.toEpochMilli() + ) } - private suspend fun handleRevoked(messages: List) { - groupRevokedMessageHandler.handleRevokeMessage(groupId, messages.map { it.data }) - } - private fun handleGroupConfigMessages( - keysResponse: List, - infoResponse: List, - membersResponse: List - ) { - if (keysResponse.isEmpty() && infoResponse.isEmpty() && membersResponse.isEmpty()) { - return + private suspend fun handleRevoked(fetchResult: SnodeMessageFetcher.FetchResult) { + fetchResult.processMessages { batch -> + Log.d(TAG, "Handling ${batch.size} group revoked messages") + groupRevokedMessageHandler.handleRevokeMessage(groupId, batch.map { it.dataDecoded }) } - Log.d( - TAG, "Handling group config messages(" + - "info = ${infoResponse.size}, " + - "keys = ${keysResponse.size}, " + - "members = ${membersResponse.size})" - ) - - configFactoryProtocol.mergeGroupConfigMessages( - groupId = groupId, - keys = keysResponse.map { it.toConfigMessage() }, - info = infoResponse.map { it.toConfigMessage() }, - members = membersResponse.map { it.toConfigMessage() }, - ) } - private fun handleMessages(body: RawResponse, snode: Snode) { - val messages = configFactoryProtocol.withGroupConfigs(groupId) { - SnodeAPI.parseRawMessagesResponse( - rawResponse = body, - snode = snode, - publicKey = groupId.hexString, - decrypt = { data -> - val (decrypted, sender) = it.groupKeys.decrypt(data) ?: return@parseRawMessagesResponse null - decrypted to AccountId(sender) - }, - namespace = Namespace.GROUP_MESSAGES(), - ) - } + private suspend fun handleGroupConfigMessages( + keysResponse: SnodeMessageFetcher.FetchResult, + infoResponse: SnodeMessageFetcher.FetchResult, + membersResponse: SnodeMessageFetcher.FetchResult + ) { + keysResponse.processMessages { keys -> + infoResponse.processMessages { info -> + membersResponse.processMessages { members -> + Log.d( + TAG, "Handling group config messages(" + + "info = ${info.size}, " + + "keys = ${keys.size}, " + + "members = ${members.size})" + ) - val parameters = messages.map { (envelope, serverHash) -> - MessageReceiveParameters( - envelope.toByteArray(), - serverHash = serverHash, - closedGroup = Destination.ClosedGroup(groupId.hexString) - ) + configFactoryProtocol.mergeGroupConfigMessages( + groupId = groupId, + keys = keys.map { it.toConfigMessage() }, + info = info.map { it.toConfigMessage() }, + members = members.map { it.toConfigMessage() }, + ) + } + } } + } - parameters.chunked(BatchMessageReceiveJob.BATCH_DEFAULT_NUMBER).forEach { chunk -> - JobQueue.shared.add(batchMessageReceiveJobFactory.create( - messages = chunk, - fromCommunity = null - )) + private suspend fun handleMessages(fetchResult: SnodeMessageFetcher.FetchResult) { + fetchResult.processMessagesInBatches { batch -> + Log.d(TAG, "Handling ${batch.size} group messages") + //TODO: Handle group messages } - if (messages.isNotEmpty()) { - Log.d(TAG, "Received and handled ${messages.size} group messages") - } +// val messages = configFactoryProtocol.withGroupConfigs(groupId) { +// SnodeAPI.parseRawMessagesResponse( +// rawResponse = body, +// snode = snode, +// publicKey = groupId.hexString, +// decrypt = { data -> +// val (decrypted, sender) = it.groupKeys.decrypt(data) ?: return@parseRawMessagesResponse null +// decrypted to AccountId(sender) +// }, +// namespace = Namespace.GROUP_MESSAGES(), +// ) +// } +// +// val parameters = messages.map { (envelope, serverHash) -> +// MessageReceiveParameters( +// envelope.toByteArray(), +// serverHash = serverHash, +// closedGroup = Destination.ClosedGroup(groupId.hexString) +// ) +// } +// +// parameters.chunked(BatchMessageReceiveJob.BATCH_DEFAULT_NUMBER).forEach { chunk -> +// JobQueue.shared.add(batchMessageReceiveJobFactory.create( +// messages = chunk, +// fromCommunity = null +// )) +// } +// +// if (messages.isNotEmpty()) { +// Log.d(TAG, "Received and handled ${messages.size} group messages") +// } } /** diff --git a/app/src/main/java/org/thoughtcrime/securesms/notifications/PushReceiver.kt b/app/src/main/java/org/thoughtcrime/securesms/notifications/PushReceiver.kt index 8c34519359..1ae591aa3c 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/notifications/PushReceiver.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/notifications/PushReceiver.kt @@ -160,10 +160,10 @@ class PushReceiver @Inject constructor( } if (params != null) { - JobQueue.shared.add(batchJobFactory.create( - messages = listOf(params), - fromCommunity = null - )) +// JobQueue.shared.add(batchJobFactory.create( +// messages = listOf(params), +// fromCommunity = null +// )) } } catch (e: Exception) { Log.d(TAG, "Failed to unwrap data for message due to error.", e) diff --git a/app/src/main/java/org/thoughtcrime/securesms/onboarding/manager/CreateAccountManager.kt b/app/src/main/java/org/thoughtcrime/securesms/onboarding/manager/CreateAccountManager.kt index 5602ccb9d7..6c9f34166a 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/onboarding/manager/CreateAccountManager.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/onboarding/manager/CreateAccountManager.kt @@ -9,6 +9,7 @@ import org.session.libsignal.database.LokiAPIDatabaseProtocol import org.session.libsignal.utilities.KeyHelper import org.session.libsignal.utilities.hexEncodedPublicKey import org.thoughtcrime.securesms.crypto.KeyPairUtilities +import org.thoughtcrime.securesms.database.ReceivedMessageHashDatabase import org.thoughtcrime.securesms.util.VersionDataFetcher import javax.inject.Inject import javax.inject.Singleton @@ -18,7 +19,8 @@ class CreateAccountManager @Inject constructor( private val application: Application, private val prefs: TextSecurePreferences, private val versionDataFetcher: VersionDataFetcher, - private val configFactory: ConfigFactoryProtocol + private val configFactory: ConfigFactoryProtocol, + private val receivedMessageHashDatabase: ReceivedMessageHashDatabase, ) { private val database: LokiAPIDatabaseProtocol get() = SnodeModule.shared.storage @@ -27,7 +29,7 @@ class CreateAccountManager @Inject constructor( // This is here to resolve a case where the app restarts before a user completes onboarding // which can result in an invalid database state database.clearAllLastMessageHashes() - database.clearReceivedMessageHashValues() + receivedMessageHashDatabase.removeAll() val keyPairGenerationResult = KeyPairUtilities.generate() val seed = keyPairGenerationResult.seed diff --git a/app/src/main/java/org/thoughtcrime/securesms/onboarding/manager/LoadAccountManager.kt b/app/src/main/java/org/thoughtcrime/securesms/onboarding/manager/LoadAccountManager.kt index 489742a9a2..9a2cd459b9 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/onboarding/manager/LoadAccountManager.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/onboarding/manager/LoadAccountManager.kt @@ -1,29 +1,29 @@ package org.thoughtcrime.securesms.onboarding.manager +import android.app.Application import android.content.Context +import dagger.hilt.android.qualifiers.ApplicationContext import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.launch -import org.session.libsession.snode.SnodeModule import org.session.libsession.utilities.TextSecurePreferences import org.session.libsignal.database.LokiAPIDatabaseProtocol import org.session.libsignal.utilities.hexEncodedPublicKey -import org.thoughtcrime.securesms.ApplicationContext import org.thoughtcrime.securesms.crypto.KeyPairUtilities -import org.thoughtcrime.securesms.dependencies.ConfigFactory +import org.thoughtcrime.securesms.database.ReceivedMessageHashDatabase import org.thoughtcrime.securesms.util.VersionDataFetcher import javax.inject.Inject import javax.inject.Singleton @Singleton class LoadAccountManager @Inject constructor( - @dagger.hilt.android.qualifiers.ApplicationContext private val context: Context, + @param:ApplicationContext private val context: Context, private val prefs: TextSecurePreferences, - private val versionDataFetcher: VersionDataFetcher + private val versionDataFetcher: VersionDataFetcher, + private val database: LokiAPIDatabaseProtocol, + private val receivedMessageHashDatabase: ReceivedMessageHashDatabase, ) { - private val database: LokiAPIDatabaseProtocol - get() = SnodeModule.shared.storage private var restoreJob: Job? = null @@ -37,7 +37,7 @@ class LoadAccountManager @Inject constructor( // This is here to resolve a case where the app restarts before a user completes onboarding // which can result in an invalid database state database.clearAllLastMessageHashes() - database.clearReceivedMessageHashValues() + receivedMessageHashDatabase.removeAll() // RestoreActivity handles seed this way val keyPairGenerationResult = KeyPairUtilities.generate(seed) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 7ceafa1fab..6f984fa6db 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -29,7 +29,7 @@ kotlinVersion = "2.2.20" kryoVersion = "5.6.2" kspVersion = "2.2.10-2.0.2" legacySupportV13Version = "1.0.0" -libsessionUtilAndroidVersion = "1.0.8-1-g27817b4" +libsessionUtilAndroidVersion = "1.0.8-8-g1c10785" media3ExoplayerVersion = "1.8.0" mockitoCoreVersion = "5.20.0" navVersion = "2.9.4" @@ -174,6 +174,7 @@ coil-compose = { module = "io.coil-kt.coil3:coil-compose", version.ref = "coilVe coil-gif = { module = "io.coil-kt.coil3:coil-gif", version.ref = "coilVersion" } android-billing = { module = "com.android.billingclient:billing", version.ref = "billingVersion" } android-billing-ktx = { module = "com.android.billingclient:billing-ktx", version.ref = "billingVersion" } +retrofit = { module = "com.squareup.retrofit2:retrofit", version = "3.0.0" } [plugins] android-application = { id = "com.android.application", version.ref = "gradlePluginVersion" }