@@ -8,20 +8,23 @@ import arrow.core.right
88import arrow.core.valueOr
99import com.hoc.flowmvi.core.Mapper
1010import com.hoc.flowmvi.core.dispatchers.CoroutineDispatchers
11- import com.hoc.flowmvi.core.retrySuspend
1211import com.hoc.flowmvi.data.remote.UserApiService
1312import com.hoc.flowmvi.data.remote.UserBody
1413import com.hoc.flowmvi.data.remote.UserResponse
1514import com.hoc.flowmvi.domain.model.User
1615import com.hoc.flowmvi.domain.model.UserError
1716import com.hoc.flowmvi.domain.model.UserValidationError
1817import com.hoc.flowmvi.domain.repository.UserRepository
18+ import com.hoc081098.flowext.retryWithExponentialBackoff
1919import kotlinx.coroutines.ExperimentalCoroutinesApi
20+ import kotlinx.coroutines.FlowPreview
2021import kotlinx.coroutines.delay
22+ import kotlinx.coroutines.flow.Flow
2123import kotlinx.coroutines.flow.MutableSharedFlow
24+ import kotlinx.coroutines.flow.asFlow
2225import kotlinx.coroutines.flow.catch
23- import kotlinx.coroutines.flow.emitAll
24- import kotlinx.coroutines.flow.flow
26+ import kotlinx.coroutines.flow.first
27+ import kotlinx.coroutines.flow.flatMapConcat
2528import kotlinx.coroutines.flow.map
2629import kotlinx.coroutines.flow.onEach
2730import kotlinx.coroutines.flow.scan
@@ -32,6 +35,7 @@ import kotlin.time.Duration.Companion.milliseconds
3235import kotlin.time.ExperimentalTime
3336import arrow.core.Either.Companion.catch as catchEither
3437
38+ @FlowPreview
3539@ExperimentalTime
3640@ExperimentalCoroutinesApi
3741internal class UserRepositoryImpl (
@@ -65,44 +69,38 @@ internal class UserRepositoryImpl(
6569 @Suppress(" NOTHING_TO_INLINE" )
6670 private inline fun logError (t : Throwable , message : String ) = Timber .tag(TAG ).e(t, message)
6771
68- private suspend fun getUsersFromRemote (): List <User > {
69- return withContext(dispatchers.io) {
70- retrySuspend(
71- times = 3 ,
72- initialDelay = 500 .milliseconds,
73- factor = 2.0 ,
74- shouldRetry = { it is IOException }
75- ) { times ->
76- Timber .d(" [USER_REPO] Retry times=$times " )
77- userApiService
78- .getUsers()
79- .map(responseToDomainThrows)
80- }
81- }
82- }
83-
84- override fun getUsers () = flow {
85- val initial = getUsersFromRemote()
86-
87- changesFlow
88- .onEach { Timber .d(" [USER_REPO] Change=$it " ) }
89- .scan(initial) { acc, change ->
90- when (change) {
91- is Change .Removed -> acc.filter { it.id != change.removed.id }
92- is Change .Refreshed -> change.user
93- is Change .Added -> acc + change.user
72+ private fun getUsersFromRemote (): Flow <List <User >> = suspend {
73+ Timber .d(" [USER_REPO] getUsersFromRemote ..." )
74+ userApiService
75+ .getUsers()
76+ .map(responseToDomainThrows)
77+ }.asFlow()
78+ .retryWithExponentialBackoff(
79+ maxAttempt = 3 ,
80+ initialDelay = 500 .milliseconds,
81+ factor = 2.0 ,
82+ ) { it is IOException }
83+
84+ override fun getUsers () = getUsersFromRemote()
85+ .flatMapConcat { initial ->
86+ changesFlow
87+ .onEach { Timber .d(" [USER_REPO] Change=$it " ) }
88+ .scan(initial) { acc, change ->
89+ when (change) {
90+ is Change .Removed -> acc.filter { it.id != change.removed.id }
91+ is Change .Refreshed -> change.user
92+ is Change .Added -> acc + change.user
93+ }
9494 }
95- }
96- .onEach { Timber .d(" [USER_REPO] Emit users.size=${it.size} " ) }
97- .let { emitAll(it) }
98- }
95+ }
96+ .onEach { Timber .d(" [USER_REPO] Emit users.size=${it.size} " ) }
9997 .map { it.right().leftWiden<UserError , Nothing , List <User >>() }
10098 .catch {
10199 logError(it, " getUsers" )
102100 emit(errorMapper(it).left())
103101 }
104102
105- override suspend fun refresh () = catchEither { getUsersFromRemote() }
103+ override suspend fun refresh () = catchEither { getUsersFromRemote().first() }
106104 .tap { sendChange(Change .Refreshed (it)) }
107105 .map { }
108106 .tapLeft { logError(it, " refresh" ) }
0 commit comments