Skip to content

Commit 14a29cc

Browse files
authored
Merge pull request #474 from synonymdev/chore/move-closed-channels
Move closed channels tracking to LightningRepo
2 parents 23f55e0 + 545b812 commit 14a29cc

File tree

2 files changed

+96
-86
lines changed

2 files changed

+96
-86
lines changed

app/src/main/java/to/bitkit/repositories/LightningRepo.kt

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package to.bitkit.repositories
22

33
import com.google.firebase.messaging.FirebaseMessaging
4+
import com.synonym.bitkitcore.ClosedChannelDetails
45
import com.synonym.bitkitcore.FeeRates
56
import com.synonym.bitkitcore.LightningInvoice
67
import com.synonym.bitkitcore.Scanner
@@ -27,6 +28,7 @@ import org.lightningdevkit.ldknode.BalanceDetails
2728
import org.lightningdevkit.ldknode.BestBlock
2829
import org.lightningdevkit.ldknode.ChannelConfig
2930
import org.lightningdevkit.ldknode.ChannelDetails
31+
import org.lightningdevkit.ldknode.Event
3032
import org.lightningdevkit.ldknode.NodeStatus
3133
import org.lightningdevkit.ldknode.PaymentDetails
3234
import org.lightningdevkit.ldknode.PaymentId
@@ -57,6 +59,7 @@ import to.bitkit.services.NodeEventHandler
5759
import to.bitkit.utils.AppError
5860
import to.bitkit.utils.Logger
5961
import to.bitkit.utils.ServiceError
62+
import java.util.concurrent.ConcurrentHashMap
6063
import javax.inject.Inject
6164
import javax.inject.Singleton
6265
import kotlin.time.Duration
@@ -85,6 +88,8 @@ class LightningRepo @Inject constructor(
8588
private val _isRecoveryMode = MutableStateFlow(false)
8689
val isRecoveryMode = _isRecoveryMode.asStateFlow()
8790

91+
private val channelCache = ConcurrentHashMap<String, ChannelDetails>()
92+
8893
/**
8994
* Executes the provided operation only if the node is running.
9095
* If the node is not running, waits for it to be running for a specified timeout.
@@ -203,12 +208,16 @@ class LightningRepo @Inject constructor(
203208
if (getStatus()?.isRunning == true) {
204209
Logger.info("LDK node already running", context = TAG)
205210
_lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Running) }
206-
lightningService.listenForEvents(onEvent = eventHandler)
211+
lightningService.listenForEvents(onEvent = { event ->
212+
handleLdkEvent(event)
213+
eventHandler?.invoke(event)
214+
})
207215
return@withContext Result.success(Unit)
208216
}
209217

210218
// Start the node service
211219
lightningService.start(timeout) { event ->
220+
handleLdkEvent(event)
212221
eventHandler?.invoke(event)
213222
ldkNodeEventBus.emit(event)
214223
}
@@ -220,6 +229,7 @@ class LightningRepo @Inject constructor(
220229
// Initial state sync
221230
syncState()
222231
updateGeoBlockState()
232+
refreshChannelCache()
223233

224234
// Perform post-startup tasks
225235
connectToTrustedPeers().onFailure { e ->
@@ -296,12 +306,96 @@ class LightningRepo @Inject constructor(
296306

297307
_lightningState.update { it.copy(isSyncingWallet = true) }
298308
lightningService.sync()
309+
refreshChannelCache()
299310
syncState()
300311
_lightningState.update { it.copy(isSyncingWallet = false) }
301312

302313
Result.success(Unit)
303314
}
304315

316+
private suspend fun refreshChannelCache() = withContext(bgDispatcher) {
317+
val channels = lightningService.channels ?: return@withContext
318+
channels.forEach { channel ->
319+
channelCache[channel.channelId] = channel
320+
}
321+
}
322+
323+
private fun handleLdkEvent(event: Event) {
324+
when (event) {
325+
is Event.ChannelPending -> {
326+
scope.launch {
327+
refreshChannelCache()
328+
}
329+
}
330+
is Event.ChannelReady -> {
331+
scope.launch {
332+
refreshChannelCache()
333+
}
334+
}
335+
is Event.ChannelClosed -> {
336+
val channelId = event.channelId
337+
val reason = event.reason?.toString() ?: ""
338+
scope.launch {
339+
registerClosedChannel(channelId, reason)
340+
}
341+
}
342+
else -> {
343+
// Other events don't need special handling
344+
}
345+
}
346+
}
347+
348+
private suspend fun registerClosedChannel(channelId: String, reason: String?) = withContext(bgDispatcher) {
349+
try {
350+
val channel = channelCache[channelId] ?: run {
351+
Logger.error(
352+
"Could not find channel details for closed channel: channelId=$channelId",
353+
context = TAG
354+
)
355+
return@withContext
356+
}
357+
358+
val fundingTxo = channel.fundingTxo
359+
if (fundingTxo == null) {
360+
Logger.error(
361+
"Channel has no funding transaction, cannot persist closed channel: channelId=$channelId",
362+
context = TAG
363+
)
364+
return@withContext
365+
}
366+
367+
val channelName = channel.inboundScidAlias?.toString()
368+
?: channel.channelId.take(CHANNEL_ID_PREVIEW_LENGTH) + ""
369+
370+
val closedAt = (System.currentTimeMillis() / 1000L).toULong()
371+
372+
val closedChannel = ClosedChannelDetails(
373+
channelId = channel.channelId,
374+
counterpartyNodeId = channel.counterpartyNodeId,
375+
fundingTxoTxid = fundingTxo.txid,
376+
fundingTxoIndex = fundingTxo.vout,
377+
channelValueSats = channel.channelValueSats,
378+
closedAt = closedAt,
379+
outboundCapacityMsat = channel.outboundCapacityMsat,
380+
inboundCapacityMsat = channel.inboundCapacityMsat,
381+
counterpartyUnspendablePunishmentReserve = channel.counterpartyUnspendablePunishmentReserve,
382+
unspendablePunishmentReserve = channel.unspendablePunishmentReserve ?: 0u,
383+
forwardingFeeProportionalMillionths = channel.config.forwardingFeeProportionalMillionths,
384+
forwardingFeeBaseMsat = channel.config.forwardingFeeBaseMsat,
385+
channelName = channelName,
386+
channelClosureReason = reason.orEmpty()
387+
)
388+
389+
coreService.activity.upsertClosedChannelList(listOf(closedChannel))
390+
391+
channelCache.remove(channelId)
392+
393+
Logger.info("Registered closed channel: ${channel.userChannelId}", context = TAG)
394+
} catch (e: Throwable) {
395+
Logger.error("Failed to register closed channel: $e", e, context = TAG)
396+
}
397+
}
398+
305399
suspend fun wipeStorage(walletIndex: Int): Result<Unit> = withContext(bgDispatcher) {
306400
Logger.debug("wipeStorage called, stopping node first", context = TAG)
307401
stop().onSuccess {
@@ -867,6 +961,7 @@ class LightningRepo @Inject constructor(
867961
companion object {
868962
private const val TAG = "LightningRepo"
869963
private const val SYNC_TIMEOUT_MS = 20_000L
964+
private const val CHANNEL_ID_PREVIEW_LENGTH = 10
870965
}
871966
}
872967

app/src/main/java/to/bitkit/services/LightningService.kt

Lines changed: 0 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package to.bitkit.services
22

3-
import com.synonym.bitkitcore.ClosedChannelDetails
4-
import com.synonym.bitkitcore.upsertClosedChannel
53
import kotlinx.coroutines.CoroutineDispatcher
64
import kotlinx.coroutines.currentCoroutineContext
75
import kotlinx.coroutines.delay
@@ -51,7 +49,6 @@ import to.bitkit.utils.LdkError
5149
import to.bitkit.utils.LdkLogWriter
5250
import to.bitkit.utils.Logger
5351
import to.bitkit.utils.ServiceError
54-
import java.util.concurrent.ConcurrentHashMap
5552
import javax.inject.Inject
5653
import javax.inject.Singleton
5754
import kotlin.io.path.Path
@@ -74,8 +71,6 @@ class LightningService @Inject constructor(
7471

7572
private lateinit var trustedPeers: List<PeerDetails>
7673

77-
private val channelCache = ConcurrentHashMap<String, ChannelDetails>()
78-
7974
suspend fun setup(
8075
walletIndex: Int,
8176
customServerUrl: String? = null,
@@ -196,7 +191,6 @@ class LightningService @Inject constructor(
196191
}
197192
}
198193

199-
refreshChannelCache()
200194
Logger.info("Node started")
201195
}
202196

@@ -232,79 +226,10 @@ class LightningService @Inject constructor(
232226
node.syncWallets()
233227
// launch { setMaxDustHtlcExposureForCurrentChannels() }
234228
}
235-
refreshChannelCache()
236229

237230
Logger.debug("LDK synced")
238231
}
239232

240-
private suspend fun refreshChannelCache() {
241-
val node = this.node ?: return
242-
243-
ServiceQueue.LDK.background {
244-
val channels = node.listChannels()
245-
channels.forEach { channel ->
246-
channelCache[channel.channelId] = channel
247-
}
248-
}
249-
}
250-
251-
private suspend fun registerClosedChannel(channelId: String, reason: String?) {
252-
try {
253-
val channel = ServiceQueue.LDK.background {
254-
channelCache[channelId]
255-
} ?: run {
256-
Logger.error(
257-
"Could not find channel details for closed channel: channelId=$channelId",
258-
context = TAG
259-
)
260-
return@registerClosedChannel
261-
}
262-
263-
val fundingTxo = channel.fundingTxo
264-
if (fundingTxo == null) {
265-
Logger.error(
266-
"Channel has no funding transaction, cannot persist closed channel: channelId=$channelId",
267-
context = TAG
268-
)
269-
return@registerClosedChannel
270-
}
271-
272-
val channelName = channel.inboundScidAlias?.toString()
273-
?: channel.channelId.take(CHANNEL_ID_PREVIEW_LENGTH) + ""
274-
275-
val closedAt = (System.currentTimeMillis() / 1000L).toULong()
276-
277-
val closedChannel = ClosedChannelDetails(
278-
channelId = channel.channelId,
279-
counterpartyNodeId = channel.counterpartyNodeId,
280-
fundingTxoTxid = fundingTxo.txid,
281-
fundingTxoIndex = fundingTxo.vout,
282-
channelValueSats = channel.channelValueSats,
283-
closedAt = closedAt,
284-
outboundCapacityMsat = channel.outboundCapacityMsat,
285-
inboundCapacityMsat = channel.inboundCapacityMsat,
286-
counterpartyUnspendablePunishmentReserve = channel.counterpartyUnspendablePunishmentReserve,
287-
unspendablePunishmentReserve = channel.unspendablePunishmentReserve ?: 0u,
288-
forwardingFeeProportionalMillionths = channel.config.forwardingFeeProportionalMillionths,
289-
forwardingFeeBaseMsat = channel.config.forwardingFeeBaseMsat,
290-
channelName = channelName,
291-
channelClosureReason = reason.orEmpty()
292-
)
293-
294-
ServiceQueue.CORE.background {
295-
upsertClosedChannel(closedChannel)
296-
}
297-
298-
ServiceQueue.LDK.background {
299-
channelCache.remove(channelId)
300-
}
301-
302-
Logger.info("Registered closed channel: ${channel.userChannelId}", context = TAG)
303-
} catch (e: Exception) {
304-
Logger.error("Failed to register closed channel: $e", e, context = TAG)
305-
}
306-
}
307-
308233
// private fun setMaxDustHtlcExposureForCurrentChannels() {
309234
// if (Env.network != Network.REGTEST) {
310235
// Logger.debug("Not updating channel config for non-regtest network")
@@ -815,9 +740,6 @@ class LightningService @Inject constructor(
815740
Logger.info(
816741
"⏳ Channel pending: channelId: $channelId userChannelId: $userChannelId formerTemporaryChannelId: $formerTemporaryChannelId counterpartyNodeId: $counterpartyNodeId fundingTxo: $fundingTxo"
817742
)
818-
launch {
819-
refreshChannelCache()
820-
}
821743
}
822744

823745
is Event.ChannelReady -> {
@@ -827,9 +749,6 @@ class LightningService @Inject constructor(
827749
Logger.info(
828750
"👐 Channel ready: channelId: $channelId userChannelId: $userChannelId counterpartyNodeId: $counterpartyNodeId"
829751
)
830-
launch {
831-
refreshChannelCache()
832-
}
833752
}
834753

835754
is Event.ChannelClosed -> {
@@ -840,9 +759,6 @@ class LightningService @Inject constructor(
840759
Logger.info(
841760
"⛔ Channel closed: channelId: $channelId userChannelId: $userChannelId counterpartyNodeId: $counterpartyNodeId reason: $reason"
842761
)
843-
launch {
844-
registerClosedChannel(channelId, reason)
845-
}
846762
}
847763
}
848764
}
@@ -867,7 +783,6 @@ class LightningService @Inject constructor(
867783

868784
companion object {
869785
private const val TAG = "LightningService"
870-
private const val CHANNEL_ID_PREVIEW_LENGTH = 10
871786
}
872787
}
873788

0 commit comments

Comments
 (0)