Skip to content

Commit 8b214a6

Browse files
Refactor community room info polling (#1473)
1 parent 1d02d27 commit 8b214a6

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+717
-724
lines changed

app/src/main/java/org/session/libsession/database/StorageProtocol.kt

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import org.session.libsession.messaging.messages.visible.Attachment
1414
import org.session.libsession.messaging.messages.visible.Profile
1515
import org.session.libsession.messaging.messages.visible.Reaction
1616
import org.session.libsession.messaging.messages.visible.VisibleMessage
17-
import org.session.libsession.messaging.open_groups.OpenGroup
1817
import org.session.libsession.messaging.sending_receiving.attachments.AttachmentId
1918
import org.session.libsession.messaging.sending_receiving.attachments.DatabaseAttachment
2019
import org.session.libsession.messaging.sending_receiving.data_extraction.DataExtractionNotificationInfoMessage
@@ -67,12 +66,8 @@ interface StorageProtocol {
6766
fun getServerCapabilities(server: String): List<String>?
6867

6968
// Open Groups
70-
fun getAllOpenGroups(): Map<Long, OpenGroup>
71-
fun getOpenGroup(threadId: Long): OpenGroup?
72-
fun getOpenGroup(address: Address): OpenGroup?
7369
suspend fun addOpenGroup(urlAsString: String)
7470
fun setOpenGroupServerMessageID(messageID: MessageId, serverID: Long, threadID: Long)
75-
fun getOpenGroup(room: String, server: String): OpenGroup?
7671

7772
// Open Group Public Keys
7873
fun getOpenGroupPublicKey(server: String): String?
@@ -82,7 +77,6 @@ interface StorageProtocol {
8277
fun updateProfilePicture(groupID: String, newValue: ByteArray)
8378
fun removeProfilePicture(groupID: String)
8479
fun hasDownloadedProfilePicture(groupID: String): Boolean
85-
fun setUserCount(room: String, server: String, newValue: Int)
8680

8781
// Last Message Server ID
8882
fun getLastMessageServerID(room: String, server: String): Long?
@@ -179,7 +173,14 @@ interface StorageProtocol {
179173
/**
180174
* Returns the ID of the `TSIncomingMessage` that was constructed.
181175
*/
182-
fun persist(message: VisibleMessage, quotes: QuoteModel?, linkPreview: List<LinkPreview?>, fromGroup: Address.GroupLike?, attachments: List<Attachment>, runThreadUpdate: Boolean): MessageId?
176+
fun persist(
177+
threadRecipient: Recipient,
178+
message: VisibleMessage,
179+
quotes: QuoteModel?,
180+
linkPreview: List<LinkPreview?>,
181+
attachments: List<Attachment>,
182+
runThreadUpdate: Boolean
183+
): MessageId?
183184
fun markConversationAsRead(threadId: Long, lastSeenTime: Long, force: Boolean = false)
184185
fun markConversationAsUnread(threadId: Long)
185186
fun getLastSeen(threadId: Long): Long

app/src/main/java/org/session/libsession/messaging/jobs/AttachmentDownloadJob.kt

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import org.session.libsession.messaging.sending_receiving.attachments.DatabaseAt
1515
import org.session.libsession.messaging.utilities.Data
1616
import org.session.libsession.snode.OnionRequestAPI
1717
import org.session.libsession.snode.utilities.await
18+
import org.session.libsession.utilities.Address
1819
import org.session.libsession.utilities.DecodedAudio
1920
import org.session.libsession.utilities.DownloadUtilities
2021
import org.session.libsession.utilities.InputStreamMediaDataSource
@@ -66,11 +67,12 @@ class AttachmentDownloadJob @AssistedInject constructor(
6667
* of whether the download is allowed, it does not check if the download has already taken
6768
* place.
6869
*/
69-
fun eligibleForDownload(threadID: Long,
70-
storage: StorageProtocol,
71-
recipientRepository: RecipientRepository,
72-
messageDataProvider: MessageDataProvider,
73-
mmsId: Long): Boolean {
70+
fun eligibleForDownload(
71+
threadID: Long,
72+
storage: StorageProtocol,
73+
messageDataProvider: MessageDataProvider,
74+
mmsId: Long
75+
): Boolean {
7476
val threadRecipient = storage.getRecipientForThread(threadID) ?: return false
7577

7678
// if we are the sender we are always eligible
@@ -138,14 +140,15 @@ class AttachmentDownloadJob @AssistedInject constructor(
138140
if (!eligibleForDownload(
139141
threadID = threadID,
140142
storage = storage,
141-
recipientRepository = recipientRepository,
142143
messageDataProvider = messageDataProvider,
143144
mmsId = mmsMessageId
144145
)) {
145146
handleFailure(Error.NoSender, null)
146147
return
147148
}
148149

150+
val threadRecipient = storage.getRecipientForThread(threadID)
151+
149152
var tempFile: File? = null
150153
var attachment: DatabaseAttachment? = null
151154

@@ -157,15 +160,14 @@ class AttachmentDownloadJob @AssistedInject constructor(
157160
return
158161
}
159162
messageDataProvider.setAttachmentState(AttachmentState.DOWNLOADING, attachment.attachmentId, this.mmsMessageId)
160-
val openGroup = storage.getOpenGroup(threadID)
161-
val downloadedData = if (openGroup == null) {
163+
val downloadedData = if (threadRecipient?.address !is Address.Community) {
162164
Log.d("AttachmentDownloadJob", "downloading normal attachment")
163165
DownloadUtilities.downloadFromFileServer(attachment.url).body
164166
} else {
165167
Log.d("AttachmentDownloadJob", "downloading open group attachment")
166168
val url = attachment.url.toHttpUrlOrNull()!!
167169
val fileID = url.pathSegments.last()
168-
OpenGroupApi.download(fileID, openGroup.room, openGroup.server).await()
170+
OpenGroupApi.download(fileID, room = threadRecipient.address.room, server = threadRecipient.address.serverUrl).await()
169171
}
170172

171173
tempFile = createTempFile().also { file ->

app/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import org.session.libsession.messaging.open_groups.OpenGroupApi
1818
import org.session.libsession.messaging.sending_receiving.MessageSender
1919
import org.session.libsession.messaging.utilities.Data
2020
import org.session.libsession.snode.utilities.await
21+
import org.session.libsession.utilities.Address
2122
import org.session.libsession.utilities.DecodedAudio
2223
import org.session.libsession.utilities.InputStreamMediaDataSource
2324
import org.session.libsession.utilities.UploadResult
@@ -30,6 +31,7 @@ import org.session.libsignal.streams.PlaintextOutputStreamFactory
3031
import org.session.libsignal.utilities.Log
3132
import org.session.libsignal.utilities.PushAttachmentData
3233
import org.session.libsignal.utilities.Util
34+
import org.thoughtcrime.securesms.database.ThreadDatabase
3335

3436
class AttachmentUploadJob @AssistedInject constructor(
3537
@Assisted val attachmentID: Long,
@@ -39,6 +41,7 @@ class AttachmentUploadJob @AssistedInject constructor(
3941
private val storage: StorageProtocol,
4042
private val messageDataProvider: MessageDataProvider,
4143
private val messageSendJobFactory: MessageSendJob.Factory,
44+
private val threadDatabase: ThreadDatabase,
4245
) : Job {
4346
override var delegate: JobDelegate? = null
4447
override var id: String? = null
@@ -67,11 +70,13 @@ class AttachmentUploadJob @AssistedInject constructor(
6770
try {
6871
val attachment = messageDataProvider.getScaledSignalAttachmentStream(attachmentID)
6972
?: return handleFailure(dispatcherName, Error.NoAttachment)
70-
val openGroup = storage.getOpenGroup(threadID.toLong())
7173

72-
if (openGroup != null) {
73-
val keyAndResult = upload(attachment, openGroup.server, false) {
74-
OpenGroupApi.upload(it, openGroup.room, openGroup.server)
74+
val threadAddress = threadDatabase.getRecipientForThreadId(threadID.toLong()) ?: return handlePermanentFailure(dispatcherName,
75+
RuntimeException("Thread doesn't exist"))
76+
77+
if (threadAddress is Address.Community) {
78+
val keyAndResult = upload(attachment, threadAddress.serverUrl, false) {
79+
OpenGroupApi.upload(it, threadAddress.room, threadAddress.serverUrl)
7580
}
7681
handleSuccess(dispatcherName, attachment, keyAndResult.first, keyAndResult.second)
7782
} else {

app/src/main/java/org/session/libsession/messaging/jobs/BatchMessageReceiveJob.kt

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ import org.session.libsession.utilities.Address.Companion.toAddress
3636
import org.session.libsession.utilities.ConfigFactoryProtocol
3737
import org.session.libsession.utilities.UserConfigType
3838
import org.session.libsignal.protos.UtilProtos
39-
import org.session.libsignal.utilities.AccountId
4039
import org.session.libsignal.utilities.Log
40+
import org.thoughtcrime.securesms.database.RecipientRepository
4141
import org.thoughtcrime.securesms.database.ThreadDatabase
4242
import org.thoughtcrime.securesms.database.model.MessageId
4343
import org.thoughtcrime.securesms.database.model.ReactionRecord
@@ -61,6 +61,7 @@ class BatchMessageReceiveJob @AssistedInject constructor(
6161
private val visibleMessageHandlerContextFactory: VisibleMessageHandlerContext.Factory,
6262
private val messageNotifier: MessageNotifier,
6363
private val threadDatabase: ThreadDatabase,
64+
private val recipientRepository: RecipientRepository,
6465
) : Job {
6566

6667
override var delegate: JobDelegate? = null
@@ -102,7 +103,8 @@ class BatchMessageReceiveJob @AssistedInject constructor(
102103
visibleMessageHandlerContextFactory = visibleMessageHandlerContextFactory,
103104
messageNotifier = messageNotifier,
104105
fromCommunity = fromCommunity,
105-
threadDatabase = threadDatabase
106+
threadDatabase = threadDatabase,
107+
recipientRepository = recipientRepository,
106108
)
107109
}
108110

@@ -146,7 +148,7 @@ class BatchMessageReceiveJob @AssistedInject constructor(
146148
}
147149

148150
suspend fun executeAsync(dispatcherName: String) {
149-
val threadMap = mutableMapOf<Long, MutableList<ParsedMessage>>()
151+
val threadMap = mutableMapOf<Long, Pair<Address.Conversable, MutableList<ParsedMessage>>>()
150152
val localUserPublicKey = storage.getUserPublicKey()
151153
val serverPublicKey = fromCommunity?.let { storage.getOpenGroupPublicKey(it.serverUrl) }
152154
val currentClosedGroups = storage.getAllActiveClosedGroupPublicKeys()
@@ -171,14 +173,14 @@ class BatchMessageReceiveJob @AssistedInject constructor(
171173
fromCommunity != null -> fromCommunity
172174
message.groupPublicKey != null -> message.groupPublicKey!!.toAddress()
173175
else -> message.senderOrSync.toAddress()
174-
}
176+
} as Address.Conversable
175177

176178
val threadID = if (shouldCreateThread(parsedParams)) {
177179
threadDatabase.getOrCreateThreadIdFor(threadAddress)
178180
} else {
179181
threadDatabase.getThreadIdIfExistsFor(threadAddress)
180182
}
181-
threadMap.getOrPut(threadID) { mutableListOf() } += parsedParams
183+
threadMap.getOrPut(threadID) { threadAddress to mutableListOf() }.second += parsedParams
182184
} catch (e: Exception) {
183185
when (e) {
184186
is MessageReceiver.Error.DuplicateMessage, MessageReceiver.Error.SelfSend -> {
@@ -202,13 +204,14 @@ class BatchMessageReceiveJob @AssistedInject constructor(
202204
}
203205

204206
// iterate over threads and persist them (persistence is the longest constant in the batch process operation)
205-
suspend fun processMessages(threadId: Long, messages: List<ParsedMessage>) {
207+
suspend fun processMessages(threadId: Long, threadAddress: Address.Conversable, messages: List<ParsedMessage>) {
206208
// The LinkedHashMap should preserve insertion order
207209
val messageIds = linkedMapOf<MessageId, Pair<Boolean, Boolean>>()
208210
val myLastSeen = storage.getLastSeen(threadId)
209211
var updatedLastSeen = myLastSeen.takeUnless { it == -1L } ?: 0
210212
val handlerContext = visibleMessageHandlerContextFactory.create(
211213
threadId = threadId,
214+
threadAddress,
212215
)
213216

214217
val communityReactions = mutableMapOf<MessageId, MutableList<ReactionRecord>>()
@@ -262,8 +265,7 @@ class BatchMessageReceiveJob @AssistedInject constructor(
262265
message = message,
263266
proto = proto,
264267
threadId = threadId,
265-
groupv2Id = parameters.closedGroup?.publicKey?.let(::AccountId),
266-
fromCommunity = fromCommunity,
268+
threadAddress = threadAddress
267269
)
268270
}
269271
} catch (e: Exception) {
@@ -297,18 +299,23 @@ class BatchMessageReceiveJob @AssistedInject constructor(
297299

298300
coroutineScope {
299301
val withoutDefault = threadMap.entries.filter { it.key != NO_THREAD_MAPPING }
300-
val deferredThreadMap = withoutDefault.map { (threadId, messages) ->
302+
val deferredThreadMap = withoutDefault.map { (threadId, data) ->
303+
val (threadAddress, messages) = data
301304
async(Dispatchers.Default) {
302-
processMessages(threadId, messages)
305+
processMessages(
306+
threadId = threadId,
307+
threadAddress = threadAddress,
308+
messages = messages
309+
)
303310
}
304311
}
305312
// await all thread processing
306313
deferredThreadMap.awaitAll()
307314
}
308315

309-
val noThreadMessages = threadMap[NO_THREAD_MAPPING] ?: listOf()
310-
if (noThreadMessages.isNotEmpty()) {
311-
processMessages(NO_THREAD_MAPPING, noThreadMessages)
316+
val noThreadMessages = threadMap[NO_THREAD_MAPPING]
317+
if (noThreadMessages != null && noThreadMessages.second.isNotEmpty()) {
318+
processMessages(NO_THREAD_MAPPING, noThreadMessages.first, noThreadMessages.second)
312319
}
313320

314321
if (failures.isEmpty()) {

app/src/main/java/org/session/libsession/messaging/messages/Destination.kt

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,23 +43,19 @@ sealed class Destination {
4343
LegacyClosedGroup(address.groupPublicKeyHex)
4444
}
4545
is Address.Community -> {
46-
val storage = MessagingModuleConfiguration.shared.storage
47-
val threadID = storage.getThreadId(address)!!
48-
storage.getOpenGroup(threadID)?.let {
49-
OpenGroup(roomToken = it.room, server = it.server, fileIds = fileIds)
50-
} ?: throw Exception("Missing open group for thread with ID: $threadID.")
46+
OpenGroup(roomToken = address.room, server = address.serverUrl, fileIds = fileIds)
5147
}
5248
is Address.CommunityBlindedId -> {
5349
val serverPublicKey = MessagingModuleConfiguration.shared.configFactory
5450
.withUserConfigs { configs ->
5551
configs.userGroups.allCommunityInfo()
56-
.first { it.community.baseUrl == address.serverUrl.toString() }
52+
.first { it.community.baseUrl == address.serverUrl }
5753
.community
5854
.pubKeyHex
5955
}
6056

6157
OpenGroupInbox(
62-
server = address.serverUrl.toString(),
58+
server = address.serverUrl,
6359
serverPublicKey = serverPublicKey,
6460
blindedPublicKey = address.blindedId.blindedId.hexString,
6561
)

app/src/main/java/org/session/libsession/messaging/open_groups/OpenGroup.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import org.session.libsession.messaging.open_groups.OpenGroup.Companion.toAddres
99
import org.session.libsession.utilities.Address
1010
import org.session.libsession.utilities.GroupUtil
1111

12+
@Deprecated("This class is no longer used except in migration. Use RoomInfo instead")
1213
@Serializable
1314
data class OpenGroup(
1415
val server: String,

0 commit comments

Comments
 (0)