Skip to content

Commit 2c53be0

Browse files
feat: detekt rule for flow consumer in daos [WPB-8645] (#3825)
* chore: ensure flowOn is consistently applied in DAO * chore: ensure flowOn is consistently applied in DAO * compile error * fix: removing mentions validation (WPB-20317) (#3824) * chore: ensure flowOn is consistently applied in DAO --------- Co-authored-by: Sergey Bakhtiarov <sbakhtiarov@gmail.com>
1 parent ae71a9a commit 2c53be0

File tree

22 files changed

+94
-57
lines changed

22 files changed

+94
-57
lines changed

buildSrc/src/main/kotlin/scripts/detekt.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ dependencies {
3131
detekt("io.gitlab.arturbosch.detekt:detekt-cli:$detektVersion")
3232
detektPlugins("io.gitlab.arturbosch.detekt:detekt-formatting:$detektVersion")
3333
detektPlugins("io.gitlab.arturbosch.detekt:detekt-rules-libraries:$detektVersion")
34-
detektPlugins("com.wire:detekt-rules:1.0.0-1.23.6") {
34+
detektPlugins("com.wire:detekt-rules:20260128-162246") {
3535
isChanging = true
3636
}
3737
}

data/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/dao/ConnectionDAOImpl.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,9 @@ class ConnectionDAOImpl(
117117
override suspend fun getConnections(): Flow<List<ConnectionEntity>> {
118118
return connectionsQueries.getConnections()
119119
.asFlow()
120-
.flowOn(readDispatcher.value)
121120
.mapToList()
122121
.map { it.map(connectionMapper::toModel) }
122+
.flowOn(readDispatcher.value)
123123
}
124124

125125
override suspend fun getConnection(conversationId: QualifiedIDEntity): ConnectionEntity? = withContext(readDispatcher.value) {
@@ -129,8 +129,8 @@ class ConnectionDAOImpl(
129129
override suspend fun getConnectionRequests(): Flow<List<ConnectionEntity>> {
130130
return connectionsQueries.selectConnectionRequests(connectionMapper::toModel)
131131
.asFlow()
132-
.flowOn(readDispatcher.value)
133132
.mapToList()
133+
.flowOn(readDispatcher.value)
134134
}
135135

136136
override suspend fun insertConnection(connectionEntity: ConnectionEntity) {
@@ -182,8 +182,8 @@ class ConnectionDAOImpl(
182182
override suspend fun getConnectionRequestsForNotification(): Flow<List<ConnectionEntity>> {
183183
return connectionsQueries.selectConnectionsForNotification(connectionMapper::toModel)
184184
.asFlow()
185-
.flowOn(readDispatcher.value)
186185
.mapToList()
186+
.flowOn(readDispatcher.value)
187187
}
188188

189189
override suspend fun updateNotificationFlag(flag: Boolean, userId: QualifiedIDEntity) {

data/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/dao/MetadataDAOImpl.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ package com.wire.kalium.persistence.dao
2121
import app.cash.sqldelight.coroutines.asFlow
2222
import com.wire.kalium.persistence.MetadataQueries
2323
import com.wire.kalium.persistence.cache.FlowCache
24+
import com.wire.kalium.persistence.db.ReadDispatcher
2425
import com.wire.kalium.persistence.db.WriteDispatcher
2526
import com.wire.kalium.persistence.util.JsonSerializer
2627
import com.wire.kalium.persistence.util.mapToOneOrNull
@@ -29,6 +30,7 @@ import kotlinx.coroutines.flow.Flow
2930
import kotlinx.coroutines.flow.SharingStarted
3031
import kotlinx.coroutines.flow.distinctUntilChanged
3132
import kotlinx.coroutines.flow.first
33+
import kotlinx.coroutines.flow.flowOn
3234
import kotlinx.coroutines.flow.map
3335
import kotlinx.coroutines.flow.shareIn
3436
import kotlinx.coroutines.withContext
@@ -39,6 +41,7 @@ class MetadataDAOImpl internal constructor(
3941
private val metadataCache: FlowCache<String, String?>,
4042
private val databaseScope: CoroutineScope,
4143
private val writeDispatcher: WriteDispatcher,
44+
private val readDispatcher: ReadDispatcher,
4245
) : MetadataDAO {
4346

4447
override suspend fun insertValue(value: String, key: String) {
@@ -59,6 +62,7 @@ class MetadataDAOImpl internal constructor(
5962
metadataQueries.selectValueByKey(key)
6063
.asFlow()
6164
.mapToOneOrNull()
65+
.flowOn(readDispatcher.value)
6266
}
6367

6468
override suspend fun valueByKey(key: String): String? = valueByKeyFlow(key).first()

data/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/dao/ServiceDAO.kt

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,16 +88,22 @@ internal class ServiceDAOImpl(
8888
override suspend fun observeIsServiceMember(id: BotIdEntity, conversationId: ConversationIDEntity): Flow<QualifiedIDEntity?> =
8989
serviceQueries.getUserIdFromMember(conversationId, id)
9090
.asFlow()
91-
.flowOn(readDispatcher.value)
9291
.mapToOneOrNull(readDispatcher.value)
92+
.flowOn(readDispatcher.value)
9393

9494
override suspend fun getAllServices(): Flow<List<ServiceEntity>> =
95-
serviceQueries.allServices(mapper = ::mapToServiceEntity).asFlow().flowOn(readDispatcher.value).mapToList()
95+
serviceQueries.allServices(mapper = ::mapToServiceEntity)
96+
.asFlow()
97+
.mapToList()
98+
.flowOn(readDispatcher.value)
9699

97100
override suspend fun searchServicesByName(
98101
query: String
99102
): Flow<List<ServiceEntity>> =
100-
serviceQueries.searchByName(query, mapper = ::mapToServiceEntity).asFlow().flowOn(readDispatcher.value).mapToList()
103+
serviceQueries.searchByName(query, mapper = ::mapToServiceEntity)
104+
.asFlow()
105+
.mapToList()
106+
.flowOn(readDispatcher.value)
101107

102108
override suspend fun insert(service: ServiceEntity) {
103109
withContext(writeDispatcher.value) {

data/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/dao/TeamDAOImpl.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,9 @@ class TeamDAOImpl(
7070

7171
override suspend fun getTeamById(teamId: String) = queries.selectTeamById(id = teamId)
7272
.asFlow()
73-
.flowOn(readDispatcher.value)
7473
.mapToOneOrNull()
7574
.map { it?.let { mapper.toModel(team = it) } }
75+
.flowOn(readDispatcher.value)
7676

7777
override suspend fun updateTeam(team: TeamEntity) {
7878
withContext(writeDispatcher.value) {

data/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/dao/UserDAOImpl.kt

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -278,22 +278,24 @@ class UserDAOImpl internal constructor(
278278

279279
override suspend fun getAllUsersDetails(): Flow<List<UserDetailsEntity>> = userQueries.selectAllUsers()
280280
.asFlow()
281-
.flowOn(readDispatcher.value)
282281
.mapToList()
283282
.map { entryList -> entryList.map(mapper::toDetailsModel) }
283+
.flowOn(readDispatcher.value)
284284

285285
override suspend fun observeUserDetailsByQualifiedID(qualifiedID: QualifiedIDEntity): Flow<UserDetailsEntity?> =
286286
userCache.get(qualifiedID) {
287287
userQueries.selectDetailsByQualifiedId(listOf(qualifiedID))
288288
.asFlow()
289289
.mapToOneOrNull()
290290
.map { it?.let { mapper.toDetailsModel(it) } }
291+
.flowOn(readDispatcher.value)
291292
}
292293

293294
override suspend fun getUserDetailsWithTeamByQualifiedID(qualifiedID: QualifiedIDEntity): Flow<Pair<UserDetailsEntity, TeamEntity?>?> =
294295
userQueries.selectWithTeamByQualifiedId(listOf(qualifiedID), mapper::toUserAndTeamPairModel)
295296
.asFlow()
296297
.mapToOneOrNull()
298+
.flowOn(readDispatcher.value)
297299

298300
override suspend fun getUserMinimizedByQualifiedID(qualifiedID: QualifiedIDEntity): UserEntityMinimized? =
299301
withContext(readDispatcher.value) {
@@ -328,18 +330,18 @@ class UserDAOImpl internal constructor(
328330
connectionStates: List<ConnectionEntity.State>
329331
): Flow<List<UserDetailsEntity>> = userQueries.selectByNameOrHandleOrEmailAndConnectionState(searchQuery, connectionStates)
330332
.asFlow()
331-
.flowOn(readDispatcher.value)
332333
.mapToList()
333334
.map { it.map(mapper::toDetailsModel) }
335+
.flowOn(readDispatcher.value)
334336

335337
override suspend fun getUserDetailsByHandleAndConnectionStates(
336338
handle: String,
337339
connectionStates: List<ConnectionEntity.State>
338340
) = userQueries.selectByHandleAndConnectionState(handle, connectionStates)
339341
.asFlow()
340-
.flowOn(readDispatcher.value)
341342
.mapToList()
342343
.map { it.map(mapper::toDetailsModel) }
344+
.flowOn(readDispatcher.value)
343345

344346
override suspend fun getUsersWithOneOnOneConversation(): List<UserEntity> = withContext(readDispatcher.value) {
345347
userQueries.selectUsersWithOneOnOne().executeAsList().map(mapper::toModel)
@@ -406,29 +408,29 @@ class UserDAOImpl internal constructor(
406408
override fun observeUsersDetailsNotInConversation(conversationId: QualifiedIDEntity): Flow<List<UserDetailsEntity>> =
407409
userQueries.getUsersNotPartOfTheConversation(conversationId)
408410
.asFlow()
409-
.flowOn(readDispatcher.value)
410411
.mapToList()
411412
.map { it.map(mapper::toDetailsModel) }
413+
.flowOn(readDispatcher.value)
412414

413415
override suspend fun getUsersDetailsNotInConversationByNameOrHandleOrEmail(
414416
conversationId: QualifiedIDEntity,
415417
searchQuery: String
416418
): Flow<List<UserDetailsEntity>> =
417419
userQueries.getUsersNotInConversationByNameOrHandleOrEmail(conversationId, searchQuery)
418420
.asFlow()
419-
.flowOn(readDispatcher.value)
420421
.mapToList()
421422
.map { it.map(mapper::toDetailsModel) }
423+
.flowOn(readDispatcher.value)
422424

423425
override suspend fun getUsersDetailsNotInConversationByHandle(
424426
conversationId: QualifiedIDEntity,
425427
handle: String
426428
): Flow<List<UserDetailsEntity>> =
427429
userQueries.getUsersNotInConversationByHandle(conversationId, handle)
428430
.asFlow()
429-
.flowOn(readDispatcher.value)
430431
.mapToList()
431432
.map { it.map(mapper::toDetailsModel) }
433+
.flowOn(readDispatcher.value)
432434

433435
override suspend fun insertOrIgnoreIncompleteUsers(userIds: List<QualifiedIDEntity>) =
434436
withContext(writeDispatcher.value) {
@@ -448,9 +450,9 @@ class UserDAOImpl internal constructor(
448450
override suspend fun observeAllUsersDetailsByConnectionStatus(connectionState: ConnectionEntity.State): Flow<List<UserDetailsEntity>> =
449451
userQueries.selectAllUsersWithConnectionStatus(connectionState)
450452
.asFlow()
451-
.flowOn(readDispatcher.value)
452453
.mapToList()
453454
.map { it.map(mapper::toDetailsModel) }
455+
.flowOn(readDispatcher.value)
454456

455457
override suspend fun getAllUsersDetailsByTeam(teamId: String): List<UserDetailsEntity> = withContext(readDispatcher.value) {
456458
userQueries.selectUsersByTeam(teamId)

data/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/dao/asset/AssetDAOImpl.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ class AssetDAOImpl internal constructor(
8383
override suspend fun getAssetByKey(assetKey: String): Flow<AssetEntity?> {
8484
return queries.selectByKey(assetKey, mapper::fromAssets)
8585
.asFlow()
86-
.flowOn(readDispatcher.value)
8786
.mapToOneOrNull()
87+
.flowOn(readDispatcher.value)
8888
}
8989

9090
override suspend fun updateAsset(assetEntity: AssetEntity) {

data/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/dao/call/CallDAOImpl.kt

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -88,26 +88,26 @@ internal class CallDAOImpl(
8888
override suspend fun observeCalls(): Flow<List<CallEntity>> =
8989
callsQueries.selectAllCalls(mapper = mapper::fromCalls)
9090
.asFlow()
91-
.flowOn(readDispatcher.value)
9291
.mapToList()
92+
.flowOn(readDispatcher.value)
9393

9494
override suspend fun observeIncomingCalls(): Flow<List<CallEntity>> =
9595
callsQueries.selectIncomingCalls(mapper = mapper::fromCalls)
9696
.asFlow()
97-
.flowOn(readDispatcher.value)
9897
.mapToList()
98+
.flowOn(readDispatcher.value)
9999

100100
override suspend fun observeOutgoingCalls(): Flow<List<CallEntity>> =
101101
callsQueries.selectOutgoingCalls(mapper = mapper::fromCalls)
102102
.asFlow()
103-
.flowOn(readDispatcher.value)
104103
.mapToList()
104+
.flowOn(readDispatcher.value)
105105

106106
override suspend fun observeEstablishedCalls(): Flow<List<CallEntity>> =
107107
callsQueries.selectEstablishedCalls(mapper = mapper::fromCalls)
108108
.asFlow()
109-
.flowOn(readDispatcher.value)
110109
.mapToList()
110+
.flowOn(readDispatcher.value)
111111

112112
override suspend fun getEstablishedCall(): CallEntity = withContext(readDispatcher.value) {
113113
callsQueries.selectEstablishedCalls(mapper = mapper::fromCalls).executeAsOne()
@@ -116,8 +116,8 @@ internal class CallDAOImpl(
116116
override suspend fun observeOngoingCalls(): Flow<List<CallEntity>> =
117117
callsQueries.selectOngoingCalls(mapper = mapper::fromCalls)
118118
.asFlow()
119-
.flowOn(readDispatcher.value)
120119
.mapToList()
120+
.flowOn(readDispatcher.value)
121121

122122
override suspend fun updateLastCallStatusByConversationId(status: CallEntity.Status, conversationId: QualifiedIDEntity) {
123123
withContext(writeDispatcher.value) {
@@ -140,8 +140,8 @@ internal class CallDAOImpl(
140140
override suspend fun getLastClosedCallByConversationId(conversationId: QualifiedIDEntity): Flow<String?> =
141141
callsQueries.selectLastClosedCallCreationTimeConversationId(conversationId)
142142
.asFlow()
143-
.flowOn(readDispatcher.value)
144143
.mapToOneOrNull()
144+
.flowOn(readDispatcher.value)
145145

146146
override suspend fun getLastCallConversationTypeByConversationId(
147147
conversationId: QualifiedIDEntity
@@ -159,6 +159,6 @@ internal class CallDAOImpl(
159159
override fun observeLastActiveCallByConversationId(conversationId: QualifiedIDEntity): Flow<CallEntity?> =
160160
callsQueries.selectLastActiveCallByConversationId(conversationId, mapper = mapper::fromCalls)
161161
.asFlow()
162-
.flowOn(readDispatcher.value)
163162
.mapToOneOrNull()
163+
.flowOn(readDispatcher.value)
164164
}

data/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/dao/client/ClientDAOImpl.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,8 +191,8 @@ internal class ClientDAOImpl internal constructor(
191191
override suspend fun getClientsOfUserByQualifiedIDFlow(qualifiedID: QualifiedIDEntity): Flow<List<Client>> =
192192
clientsQueries.selectAllClientsByUserId(qualifiedID, mapper::fromClient)
193193
.asFlow()
194-
.flowOn(readDispatcher.value)
195194
.mapToList()
195+
.flowOn(readDispatcher.value)
196196

197197
override suspend fun getClientsOfUserByQualifiedID(qualifiedID: QualifiedIDEntity): List<Client> = withContext(readDispatcher.value) {
198198
clientsQueries.selectAllClientsByUserId(qualifiedID, mapper = mapper::fromClient)
@@ -202,8 +202,8 @@ internal class ClientDAOImpl internal constructor(
202202
override suspend fun observeClientsByUserId(qualifiedID: QualifiedIDEntity): Flow<List<Client>> = withContext(readDispatcher.value) {
203203
clientsQueries.selectAllClientsByUserId(qualifiedID, mapper = mapper::fromClient)
204204
.asFlow()
205-
.flowOn(readDispatcher.value)
206205
.mapToList()
206+
.flowOn(readDispatcher.value)
207207
}
208208

209209
override suspend fun getClientsOfUsersByQualifiedIDs(

data/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/dao/conversation/ConversationDAOImpl.kt

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ internal class ConversationDAOImpl internal constructor(
7676
conversationCache.get(qualifiedID) {
7777
conversationQueries.selectConversationByQualifiedId(qualifiedID, conversationMapper::fromViewToModel)
7878
.asFlow()
79-
.flowOn(readDispatcher.value)
8079
.mapToOneOrNull()
80+
.flowOn(readDispatcher.value)
8181
}
8282

8383
override suspend fun getConversationById(
@@ -89,8 +89,8 @@ internal class ConversationDAOImpl internal constructor(
8989
): Flow<ConversationViewEntity?> = conversationDetailsCache.get(conversationId) {
9090
conversationDetailsQueries.selectConversationDetailsByQualifiedId(conversationId, conversationMapper::fromViewToModel)
9191
.asFlow()
92-
.flowOn(readDispatcher.value)
9392
.mapToOneOrNull()
93+
.flowOn(readDispatcher.value)
9494
}
9595

9696
override suspend fun getConversationDetailsById(
@@ -474,9 +474,9 @@ internal class ConversationDAOImpl internal constructor(
474474
override suspend fun getProposalTimers(): Flow<List<ProposalTimerEntity>> {
475475
return conversationQueries.selectProposalTimers()
476476
.asFlow()
477-
.flowOn(readDispatcher.value)
478477
.mapToList()
479478
.map { list -> list.map { ProposalTimerEntity(it.mls_group_id, Instant.parse(it.mls_proposal_timer)) } }
479+
.flowOn(readDispatcher.value)
480480
}
481481

482482
override suspend fun whoDeletedMeInConversation(conversationId: QualifiedIDEntity, selfUserIdString: String): UserIDEntity? =
@@ -590,9 +590,10 @@ internal class ConversationDAOImpl internal constructor(
590590
}
591591

592592
override suspend fun observeUnreadArchivedConversationsCount(): Flow<Long> =
593-
unreadEventsQueries.getUnreadArchivedConversationsCount().asFlow()
594-
.flowOn(readDispatcher.value)
593+
unreadEventsQueries.getUnreadArchivedConversationsCount()
594+
.asFlow()
595595
.mapToOne()
596+
.flowOn(readDispatcher.value)
596597

597598
override suspend fun updateLegalHoldStatus(
598599
conversationId: QualifiedIDEntity,

0 commit comments

Comments
 (0)