Skip to content

Commit e51cfcc

Browse files
committed
fix: implements io dispatcher for parallelism
1 parent a2e077a commit e51cfcc

File tree

1 file changed

+28
-23
lines changed

1 file changed

+28
-23
lines changed

app/src/main/java/to/bitkit/services/CoreService.kt

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,11 @@ import com.synonym.bitkitcore.upsertActivity
4343
import io.ktor.client.HttpClient
4444
import io.ktor.client.request.get
4545
import io.ktor.http.HttpStatusCode
46+
import kotlinx.coroutines.Dispatchers
4647
import kotlinx.coroutines.async
4748
import kotlinx.coroutines.awaitAll
4849
import kotlinx.coroutines.flow.first
50+
import kotlinx.coroutines.withContext
4951
import org.lightningdevkit.ldknode.ConfirmationStatus
5052
import org.lightningdevkit.ldknode.Network
5153
import org.lightningdevkit.ldknode.PaymentDetails
@@ -156,6 +158,7 @@ class CoreService @Inject constructor(
156158

157159
// region Activity
158160
private const val CHUNCK_SIZE = 50
161+
159162
class ActivityService(
160163
private val coreService: CoreService,
161164
private val cacheStore: CacheStore,
@@ -253,34 +256,36 @@ class ActivityService(
253256

254257
suspend fun syncLdkNodePayments(payments: List<PaymentDetails>, forceUpdate: Boolean = false) {
255258
ServiceQueue.CORE.background {
256-
val allResults = mutableListOf<Result<String>>()
257-
258-
payments.chunked(CHUNCK_SIZE).forEach { chunk ->
259-
val results = chunk.map { payment ->
260-
async {
261-
try {
262-
processSinglePayment(payment, forceUpdate)
263-
Result.success(payment.id)
264-
} catch (e: Throwable) {
265-
Logger.error("Error syncing payment ${payment.id}:", e, context = "CoreService")
266-
Result.failure<String>(e)
259+
withContext(Dispatchers.IO) {
260+
val allResults = mutableListOf<Result<String>>()
261+
262+
payments.chunked(CHUNCK_SIZE).forEach { chunk ->
263+
val results = chunk.map { payment ->
264+
async {
265+
try {
266+
processSinglePayment(payment, forceUpdate)
267+
Result.success(payment.id)
268+
} catch (e: Throwable) {
269+
Logger.error("Error syncing payment ${payment.id}:", e, context = "CoreService")
270+
Result.failure<String>(e)
271+
}
267272
}
268-
}
269-
}.awaitAll()
273+
}.awaitAll()
270274

271-
allResults.addAll(results)
272-
}
275+
allResults.addAll(results)
276+
}
273277

274-
val (successful, failed) = allResults.partition { it.isSuccess }
278+
val (successful, failed) = allResults.partition { it.isSuccess }
275279

276-
Logger.info(
277-
"Synced ${successful.size} payments successfully, ${failed.size} failed",
278-
context = "CoreService"
279-
)
280+
Logger.info(
281+
"Synced ${successful.size} payments successfully, ${failed.size} failed",
282+
context = "CoreService"
283+
)
280284

281-
// Throw if too many failed
282-
if (failed.size > payments.size * 0.5) {
283-
throw Exception("Too many payment sync failures: ${failed.size}/${payments.size}")
285+
// Throw if too many failed
286+
if (failed.size > payments.size * 0.5) {
287+
throw Exception("Too many payment sync failures: ${failed.size}/${payments.size}")
288+
}
284289
}
285290
}
286291
}

0 commit comments

Comments
 (0)