Skip to content

Commit b0ba942

Browse files
authored
Merge pull request #349 from synonymdev/refactor/parallelize-ldk-payments-syncing
Parallelize ldk payments syncing
2 parents d9ebf69 + f0e9780 commit b0ba942

File tree

2 files changed

+159
-117
lines changed

2 files changed

+159
-117
lines changed

app/src/main/java/to/bitkit/repositories/ActivityRepo.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -363,15 +363,15 @@ class ActivityRepo @Inject constructor(
363363
when (activity.v1.txType) {
364364
PaymentType.RECEIVED -> {
365365
// TODO Temporary solution while whe ldk-node doesn't return the address directly
366-
Logger.debug("Fetching data for txId: ${activity.v1.txId}", context = TAG)
366+
Logger.verbose("Fetching data for txId: ${activity.v1.txId}", context = TAG)
367367
runCatching {
368368
addressChecker.getTransaction(activity.v1.txId)
369369
}.onSuccess { txDetails ->
370-
Logger.debug("Tx detail fetched with success: $txDetails", context = TAG)
370+
Logger.verbose("Tx detail fetched with success: $txDetails", context = TAG)
371371
txDetails.vout.map { vOut ->
372372
async {
373373
vOut.scriptpubkey_address?.let {
374-
Logger.debug("Extracted address: $it", context = TAG)
374+
Logger.verbose("Extracted address: $it", context = TAG)
375375
db.tagMetadataDao().searchByAddress(it)
376376
}?.let { tagMetadata ->
377377
Logger.debug("Tags metadata found! $tagMetadata", context = TAG)

app/src/main/java/to/bitkit/services/CoreService.kt

Lines changed: 156 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,11 @@ import com.synonym.bitkitcore.upsertActivity
4343
import io.ktor.client.HttpClient
4444
import io.ktor.client.request.get
4545
import io.ktor.http.HttpStatusCode
46+
import kotlinx.coroutines.Dispatchers
47+
import kotlinx.coroutines.async
48+
import kotlinx.coroutines.awaitAll
4649
import kotlinx.coroutines.flow.first
50+
import kotlinx.coroutines.withContext
4751
import org.lightningdevkit.ldknode.ConfirmationStatus
4852
import org.lightningdevkit.ldknode.Network
4953
import org.lightningdevkit.ldknode.PaymentDetails
@@ -151,6 +155,7 @@ class CoreService @Inject constructor(
151155
// endregion
152156

153157
// region Activity
158+
private const val CHUNCK_SIZE = 50
154159

155160
class ActivityService(
156161
private val coreService: CoreService,
@@ -249,132 +254,169 @@ class ActivityService(
249254

250255
suspend fun syncLdkNodePayments(payments: List<PaymentDetails>, forceUpdate: Boolean = false) {
251256
ServiceQueue.CORE.background {
252-
for (payment in payments) {
253-
try {
254-
val state = when (payment.status) {
255-
PaymentStatus.FAILED -> PaymentState.FAILED
256-
PaymentStatus.PENDING -> PaymentState.PENDING
257-
PaymentStatus.SUCCEEDED -> PaymentState.SUCCEEDED
258-
}
259-
260-
when (val kind = payment.kind) {
261-
is PaymentKind.Onchain -> {
262-
var isConfirmed = false
263-
var confirmedTimestamp: ULong? = null
264-
265-
val status = kind.status
266-
if (status is ConfirmationStatus.Confirmed) {
267-
isConfirmed = true
268-
confirmedTimestamp = status.timestamp
257+
withContext(Dispatchers.IO) {
258+
val allResults = mutableListOf<Result<String>>()
259+
260+
payments.chunked(CHUNCK_SIZE).forEach { chunk ->
261+
val results = chunk.map { payment ->
262+
async {
263+
runCatching {
264+
processSinglePayment(payment, forceUpdate)
265+
payment.id
266+
}.onFailure { e ->
267+
Logger.error("Error syncing payment ${payment.id}:", e, context = "CoreService")
269268
}
269+
}
270+
}.awaitAll()
270271

271-
// Ensure confirmTimestamp is at least equal to timestamp when confirmed
272-
val timestamp = payment.latestUpdateTimestamp
272+
allResults.addAll(results)
273+
}
273274

274-
if (isConfirmed && confirmedTimestamp != null && confirmedTimestamp < timestamp) {
275-
confirmedTimestamp = timestamp
276-
}
275+
val (successful, failed) = allResults.partition { it.isSuccess }
277276

278-
val existingActivity = getActivityById(payment.id)
279-
if (existingActivity != null &&
280-
existingActivity is Activity.Onchain &&
281-
(existingActivity.v1.updatedAt ?: 0u) > payment.latestUpdateTimestamp
282-
) {
283-
continue
284-
}
277+
Logger.info(
278+
"Synced ${successful.size} payments successfully, ${failed.size} failed",
279+
context = "CoreService"
280+
)
281+
}
282+
}
283+
}
285284

286-
val onChain = if (existingActivity is Activity.Onchain) {
287-
existingActivity.v1.copy(
288-
confirmed = isConfirmed,
289-
confirmTimestamp = confirmedTimestamp,
290-
updatedAt = timestamp,
291-
)
292-
} else {
293-
OnchainActivity(
294-
id = payment.id,
295-
txType = payment.direction.toPaymentType(),
296-
txId = kind.txid,
297-
value = payment.amountSats ?: 0u,
298-
fee = (payment.feePaidMsat ?: 0u) / 1000u,
299-
feeRate = 1u, // TODO: get from somewhere
300-
address = "Loading...", // TODO: find address
301-
confirmed = isConfirmed,
302-
timestamp = timestamp,
303-
isBoosted = false,
304-
isTransfer = false, // TODO: handle when paying for order
305-
doesExist = true,
306-
confirmTimestamp = confirmedTimestamp,
307-
channelId = null, // TODO: get from linked order
308-
transferTxId = null, // TODO: get from linked order
309-
createdAt = timestamp,
310-
updatedAt = timestamp,
311-
)
312-
}
285+
private suspend fun processSinglePayment(payment: PaymentDetails, forceUpdate: Boolean) {
286+
val state = when (payment.status) {
287+
PaymentStatus.FAILED -> PaymentState.FAILED
288+
PaymentStatus.PENDING -> PaymentState.PENDING
289+
PaymentStatus.SUCCEEDED -> PaymentState.SUCCEEDED
290+
}
313291

314-
if (onChain.id in cacheStore.data.first().deletedActivities && !forceUpdate) {
315-
Logger.debug("Activity ${onChain.id} was already deleted, skipping", context = TAG)
316-
continue
317-
}
292+
when (val kind = payment.kind) {
293+
is PaymentKind.Onchain -> {
294+
processOnchainPayment(kind = kind, payment = payment, forceUpdate = forceUpdate)
295+
}
318296

319-
if (existingActivity != null) {
320-
updateActivity(payment.id, Activity.Onchain(onChain))
321-
} else {
322-
upsertActivity(Activity.Onchain(onChain))
323-
}
324-
}
297+
is PaymentKind.Bolt11 -> {
298+
processBolt11(kind = kind, payment = payment, state = state)
299+
}
325300

326-
is PaymentKind.Bolt11 -> {
327-
// Skip pending inbound payments, just means they created an invoice
328-
if (
329-
payment.status == PaymentStatus.PENDING &&
330-
payment.direction == PaymentDirection.INBOUND
331-
) {
332-
continue
333-
}
301+
else -> Unit // Handle spontaneous payments if needed
302+
}
303+
}
334304

335-
val existingActivity = getActivityById(payment.id)
336-
if (
337-
existingActivity as? Activity.Lightning != null &&
338-
(existingActivity.v1.updatedAt ?: 0u) > payment.latestUpdateTimestamp
339-
) {
340-
continue
341-
}
305+
private suspend fun processBolt11(
306+
kind: PaymentKind.Bolt11,
307+
payment: PaymentDetails,
308+
state: PaymentState,
309+
) {
310+
// Skip pending inbound payments, just means an invoice was created
311+
if (
312+
payment.status == PaymentStatus.PENDING &&
313+
payment.direction == PaymentDirection.INBOUND
314+
) {
315+
return
316+
}
342317

343-
val ln = if (existingActivity is Activity.Lightning) {
344-
existingActivity.v1.copy(
345-
updatedAt = payment.latestUpdateTimestamp,
346-
status = state
347-
)
348-
} else {
349-
LightningActivity(
350-
id = payment.id,
351-
txType = payment.direction.toPaymentType(),
352-
status = state,
353-
value = payment.amountSats ?: 0u,
354-
fee = (payment.feePaidMsat ?: 0u) / 1000u,
355-
invoice = kind.bolt11 ?: "Loading...",
356-
message = kind.description.orEmpty(),
357-
timestamp = payment.latestUpdateTimestamp,
358-
preimage = kind.preimage,
359-
createdAt = payment.latestUpdateTimestamp,
360-
updatedAt = payment.latestUpdateTimestamp,
361-
)
362-
}
318+
val existingActivity = getActivityById(payment.id)
319+
if (
320+
existingActivity as? Activity.Lightning != null &&
321+
(existingActivity.v1.updatedAt ?: 0u) > payment.latestUpdateTimestamp
322+
) {
323+
return
324+
}
363325

364-
if (getActivityById(payment.id) != null) {
365-
updateActivity(payment.id, Activity.Lightning(ln))
366-
} else {
367-
upsertActivity(Activity.Lightning(ln))
368-
}
369-
}
326+
val ln = if (existingActivity is Activity.Lightning) {
327+
existingActivity.v1.copy(
328+
updatedAt = payment.latestUpdateTimestamp,
329+
status = state
330+
)
331+
} else {
332+
LightningActivity(
333+
id = payment.id,
334+
txType = payment.direction.toPaymentType(),
335+
status = state,
336+
value = payment.amountSats ?: 0u,
337+
fee = (payment.feePaidMsat ?: 0u) / 1000u,
338+
invoice = kind.bolt11 ?: "Loading...",
339+
message = kind.description.orEmpty(),
340+
timestamp = payment.latestUpdateTimestamp,
341+
preimage = kind.preimage,
342+
createdAt = payment.latestUpdateTimestamp,
343+
updatedAt = payment.latestUpdateTimestamp,
344+
)
345+
}
370346

371-
else -> Unit // Handle spontaneous payments if needed
372-
}
373-
} catch (e: Throwable) {
374-
Logger.error("Error syncing LDK payment:", e, context = "CoreService")
375-
throw e
376-
}
377-
}
347+
if (getActivityById(payment.id) != null) {
348+
updateActivity(payment.id, Activity.Lightning(ln))
349+
} else {
350+
upsertActivity(Activity.Lightning(ln))
351+
}
352+
}
353+
354+
private suspend fun processOnchainPayment(
355+
kind: PaymentKind.Onchain,
356+
payment: PaymentDetails,
357+
forceUpdate: Boolean,
358+
) {
359+
var isConfirmed = false
360+
var confirmedTimestamp: ULong? = null
361+
362+
val status = kind.status
363+
if (status is ConfirmationStatus.Confirmed) {
364+
isConfirmed = true
365+
confirmedTimestamp = status.timestamp
366+
}
367+
368+
// Ensure confirmTimestamp is at least equal to timestamp when confirmed
369+
val timestamp = payment.latestUpdateTimestamp
370+
371+
if (isConfirmed && confirmedTimestamp != null && confirmedTimestamp < timestamp) {
372+
confirmedTimestamp = timestamp
373+
}
374+
375+
val existingActivity = getActivityById(payment.id)
376+
if (existingActivity != null &&
377+
existingActivity is Activity.Onchain &&
378+
(existingActivity.v1.updatedAt ?: 0u) > payment.latestUpdateTimestamp
379+
) {
380+
return
381+
}
382+
383+
val onChain = if (existingActivity is Activity.Onchain) {
384+
existingActivity.v1.copy(
385+
confirmed = isConfirmed,
386+
confirmTimestamp = confirmedTimestamp,
387+
updatedAt = timestamp,
388+
)
389+
} else {
390+
OnchainActivity(
391+
id = payment.id,
392+
txType = payment.direction.toPaymentType(),
393+
txId = kind.txid,
394+
value = payment.amountSats ?: 0u,
395+
fee = (payment.feePaidMsat ?: 0u) / 1000u,
396+
feeRate = 1u,
397+
address = "Loading...",
398+
confirmed = isConfirmed,
399+
timestamp = timestamp,
400+
isBoosted = false,
401+
isTransfer = false,
402+
doesExist = true,
403+
confirmTimestamp = confirmedTimestamp,
404+
channelId = null,
405+
transferTxId = null,
406+
createdAt = timestamp,
407+
updatedAt = timestamp,
408+
)
409+
}
410+
411+
if (onChain.id in cacheStore.data.first().deletedActivities && !forceUpdate) {
412+
Logger.debug("Activity ${onChain.id} was already deleted, skipping", context = TAG)
413+
return
414+
}
415+
416+
if (existingActivity != null) {
417+
updateActivity(payment.id, Activity.Onchain(onChain))
418+
} else {
419+
upsertActivity(Activity.Onchain(onChain))
378420
}
379421
}
380422

0 commit comments

Comments
 (0)