@@ -6,10 +6,12 @@ import com.synonym.bitkitcore.ActivityFilter
66import com.synonym.bitkitcore.PaymentType
77import com.synonym.bitkitcore.SortDirection
88import kotlinx.coroutines.CoroutineDispatcher
9- import kotlinx.coroutines.delay
9+ import kotlinx.coroutines.TimeoutCancellationException
10+ import kotlinx.coroutines.flow.MutableStateFlow
1011import kotlinx.coroutines.flow.first
1112import kotlinx.coroutines.flow.map
1213import kotlinx.coroutines.withContext
14+ import kotlinx.coroutines.withTimeout
1315import org.lightningdevkit.ldknode.PaymentDetails
1416import to.bitkit.data.CacheStore
1517import to.bitkit.data.dto.InProgressTransfer
@@ -22,7 +24,8 @@ import to.bitkit.services.CoreService
2224import to.bitkit.utils.Logger
2325import javax.inject.Inject
2426import javax.inject.Singleton
25- import kotlin.time.Duration.Companion.seconds
27+
28+ private const val SYNC_TIMEOUT_MS = 40_000L
2629
2730@Singleton
2831class ActivityRepo @Inject constructor(
@@ -31,39 +34,49 @@ class ActivityRepo @Inject constructor(
3134 private val lightningRepo : LightningRepo ,
3235 private val cacheStore : CacheStore ,
3336) {
34-
35- var isSyncingLdkNodePayments = false
37+ var isSyncingLdkNodePayments = MutableStateFlow ( false )
38+ private set
3639
3740 val inProgressTransfers = cacheStore.data.map { it.inProgressTransfers }
3841
3942 suspend fun syncActivities (): Result <Unit > = withContext(bgDispatcher) {
4043 Logger .debug(" syncActivities called" , context = TAG )
4144
4245 return @withContext runCatching {
43- if (isSyncingLdkNodePayments ) {
44- Logger .warn( " LDK-node payments are already being synced, skipping " , context = TAG )
45- return @withContext Result .failure( Exception ())
46+ withTimeout( SYNC_TIMEOUT_MS ) {
47+ Logger .debug( " isSyncingLdkNodePayments = ${isSyncingLdkNodePayments.value} " , context = TAG )
48+ isSyncingLdkNodePayments.first { ! it }
4649 }
4750
48- deletePendingActivities( )
51+ isSyncingLdkNodePayments = MutableStateFlow ( true )
4952
50- isSyncingLdkNodePayments = true
53+ deletePendingActivities()
5154 return @withContext lightningRepo.getPayments()
5255 .onSuccess { payments ->
5356 Logger .debug(" Got payments with success, syncing activities" , context = TAG )
5457 syncLdkNodePayments(payments = payments)
5558 updateActivitiesMetadata()
5659 boostPendingActivities()
5760 updateInProgressTransfers()
58- isSyncingLdkNodePayments = false
61+ isSyncingLdkNodePayments = MutableStateFlow ( false )
5962 return @withContext Result .success(Unit )
6063 }.onFailure { e ->
6164 Logger .error(" Failed to sync ldk-node payments" , e, context = TAG )
62- isSyncingLdkNodePayments = false
65+ isSyncingLdkNodePayments = MutableStateFlow ( false )
6366 return @withContext Result .failure(e)
6467 }.map { Unit }
6568 }.onFailure { e ->
66- Logger .error(" syncLdkNodePayments error" , e, context = TAG )
69+ when (e) {
70+ is TimeoutCancellationException -> {
71+ isSyncingLdkNodePayments = MutableStateFlow (false )
72+ Logger .error(" Timeout waiting for sync to complete, forcing reset" , e, context = TAG )
73+ }
74+
75+ else -> {
76+ isSyncingLdkNodePayments = MutableStateFlow (false )
77+ Logger .error(" syncActivities error" , e, context = TAG )
78+ }
79+ }
6780 }
6881 }
6982
@@ -127,9 +140,6 @@ class ActivityRepo @Inject constructor(
127140 " activity with paymentHashOrTxId:$paymentHashOrTxId not found, trying again after sync" ,
128141 context = TAG
129142 )
130- Logger .debug(" 5 seconds delay" , context = TAG )
131- delay(5 .seconds)
132- Logger .debug(" Syncing LN node called" , context = TAG )
133143
134144 lightningRepo.sync().onSuccess {
135145 Logger .debug(" Syncing LN node SUCCESS" , context = TAG )
0 commit comments