Skip to content

Commit cf22194

Browse files
authored
Merge pull request #740 from synonymdev/fix/node-stopping-bg-payments
fix: node lifecycle race condition after background payment
2 parents 3bc7c3d + 2fe3e64 commit cf22194

File tree

5 files changed

+241
-83
lines changed

5 files changed

+241
-83
lines changed

app/src/main/java/to/bitkit/repositories/LightningRepo.kt

Lines changed: 138 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import kotlinx.coroutines.flow.update
2525
import kotlinx.coroutines.isActive
2626
import kotlinx.coroutines.launch
2727
import kotlinx.coroutines.sync.Mutex
28+
import kotlinx.coroutines.sync.withLock
2829
import kotlinx.coroutines.tasks.await
2930
import kotlinx.coroutines.withContext
3031
import kotlinx.coroutines.withTimeoutOrNull
@@ -45,6 +46,7 @@ import org.lightningdevkit.ldknode.SpendableUtxo
4546
import org.lightningdevkit.ldknode.Txid
4647
import to.bitkit.data.CacheStore
4748
import to.bitkit.data.SettingsStore
49+
import to.bitkit.data.backup.VssBackupClient
4850
import to.bitkit.data.keychain.Keychain
4951
import to.bitkit.di.BgDispatcher
5052
import to.bitkit.env.Env
@@ -67,6 +69,7 @@ import to.bitkit.services.NodeEventHandler
6769
import to.bitkit.utils.AppError
6870
import to.bitkit.utils.Logger
6971
import to.bitkit.utils.ServiceError
72+
import java.io.File
7073
import java.util.concurrent.ConcurrentHashMap
7174
import java.util.concurrent.atomic.AtomicBoolean
7275
import java.util.concurrent.atomic.AtomicReference
@@ -91,6 +94,7 @@ class LightningRepo @Inject constructor(
9194
private val cacheStore: CacheStore,
9295
private val preActivityMetadataRepo: PreActivityMetadataRepo,
9396
private val connectivityRepo: ConnectivityRepo,
97+
private val vssBackupClient: VssBackupClient,
9498
) {
9599
private val _lightningState = MutableStateFlow(LightningState())
96100
val lightningState = _lightningState.asStateFlow()
@@ -109,6 +113,7 @@ class LightningRepo @Inject constructor(
109113
private val syncMutex = Mutex()
110114
private val syncPending = AtomicBoolean(false)
111115
private val syncRetryJob = AtomicReference<Job?>(null)
116+
private val lifecycleMutex = Mutex()
112117

113118
init {
114119
observeConnectivityForSyncRetry()
@@ -263,95 +268,140 @@ class LightningRepo @Inject constructor(
263268
customRgsServerUrl: String? = null,
264269
eventHandler: NodeEventHandler? = null,
265270
channelMigration: ChannelDataMigration? = null,
271+
shouldValidateGraph: Boolean = true,
266272
): Result<Unit> = withContext(bgDispatcher) {
267273
if (_isRecoveryMode.value) {
268274
return@withContext Result.failure(RecoveryModeError())
269275
}
270276

271277
eventHandler?.let { _eventHandlers.add(it) }
272278

273-
val initialLifecycleState = _lightningState.value.nodeLifecycleState
274-
if (initialLifecycleState.isRunningOrStarting()) {
275-
Logger.info("LDK node start skipped, lifecycle state: $initialLifecycleState", context = TAG)
276-
return@withContext Result.success(Unit)
277-
}
279+
// Track retry state outside mutex to avoid deadlock (Mutex is non-reentrant)
280+
var shouldRetryStart = false
281+
var shouldRestartForGraphReset = false
282+
var initialLifecycleState: NodeLifecycleState
283+
284+
val result = lifecycleMutex.withLock {
285+
initialLifecycleState = _lightningState.value.nodeLifecycleState
286+
if (initialLifecycleState.isRunningOrStarting()) {
287+
Logger.info("LDK node start skipped, lifecycle state: $initialLifecycleState", context = TAG)
288+
lightningService.startEventListener(::onEvent)
289+
return@withLock Result.success(Unit)
290+
}
278291

279-
runCatching {
280-
_lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Starting) }
281-
282-
// Setup if needed
283-
if (lightningService.node == null) {
284-
val setupResult = setup(walletIndex, customServerUrl, customRgsServerUrl, channelMigration)
285-
if (setupResult.isFailure) {
286-
_lightningState.update {
287-
it.copy(
288-
nodeLifecycleState = NodeLifecycleState.ErrorStarting(
289-
setupResult.exceptionOrNull() ?: NodeSetupError()
292+
runCatching {
293+
_lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Starting) }
294+
295+
// Setup if needed
296+
if (lightningService.node == null) {
297+
val setupResult = setup(walletIndex, customServerUrl, customRgsServerUrl, channelMigration)
298+
if (setupResult.isFailure) {
299+
_lightningState.update {
300+
it.copy(
301+
nodeLifecycleState = NodeLifecycleState.ErrorStarting(
302+
setupResult.exceptionOrNull() ?: NodeSetupError()
303+
)
290304
)
291-
)
305+
}
306+
return@withLock setupResult
292307
}
293-
return@withContext setupResult
294308
}
295-
}
296309

297-
if (getStatus()?.isRunning == true) {
298-
Logger.info("LDK node already running", context = TAG)
299-
_lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Running) }
300-
lightningService.startEventListener(::onEvent).onFailure {
301-
Logger.warn("Failed to start event listener", it, context = TAG)
302-
return@withContext Result.failure(it)
310+
if (getStatus()?.isRunning == true) {
311+
Logger.info("LDK node already running", context = TAG)
312+
_lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Running) }
313+
lightningService.startEventListener(::onEvent).onFailure {
314+
Logger.warn("Failed to start event listener", it, context = TAG)
315+
return@withLock Result.failure(it)
316+
}
317+
return@withLock Result.success(Unit)
303318
}
304-
return@withContext Result.success(Unit)
305-
}
306319

307-
// Start node
308-
lightningService.start(timeout, ::onEvent)
320+
lightningService.start(timeout, ::onEvent)
309321

310-
_lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Running) }
322+
_lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Running) }
311323

312-
// Initial state sync
313-
syncState()
314-
updateGeoBlockState()
315-
refreshChannelCache()
324+
// Initial state sync
325+
syncState()
326+
updateGeoBlockState()
327+
refreshChannelCache()
316328

317-
// Post-startup tasks (non-blocking)
318-
connectToTrustedPeers().onFailure {
319-
Logger.error("Failed to connect to trusted peers", it, context = TAG)
320-
}
329+
// Validate network graph has trusted peers (RGS cache can become stale)
330+
if (shouldValidateGraph && !lightningService.validateNetworkGraph()) {
331+
Logger.warn("Network graph is stale, resetting and restarting...", context = TAG)
332+
lightningService.stop()
333+
lightningService.resetNetworkGraph(walletIndex)
334+
// Also clear stale graph from VSS to prevent fallback restoration
335+
runCatching {
336+
vssBackupClient.setup(walletIndex).getOrThrow()
337+
vssBackupClient.deleteObject("network_graph").getOrThrow()
338+
Logger.info("Cleared stale network graph from VSS", context = TAG)
339+
}.onFailure {
340+
Logger.warn("Failed to clear graph from VSS", it, context = TAG)
341+
}
342+
_lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Stopped) }
343+
shouldRestartForGraphReset = true
344+
return@withLock Result.success(Unit)
345+
}
321346

322-
sync().onFailure { e ->
323-
Logger.warn("Initial sync failed, event-driven sync will retry", e, context = TAG)
324-
}
325-
scope.launch { registerForNotifications() }
326-
Unit
327-
}.onFailure { e ->
328-
val currentLifecycleState = _lightningState.value.nodeLifecycleState
329-
if (currentLifecycleState.isRunning()) {
330-
Logger.warn("Start error occurred but node is $currentLifecycleState, skipping retry", e, context = TAG)
331-
return@withContext Result.success(Unit)
332-
}
347+
// Post-startup tasks (non-blocking)
348+
connectToTrustedPeers().onFailure {
349+
Logger.error("Failed to connect to trusted peers", it, context = TAG)
350+
}
333351

334-
if (shouldRetry) {
335-
val retryDelay = 2.seconds
336-
Logger.warn("Start error, retrying after $retryDelay...", e, context = TAG)
337-
_lightningState.update { it.copy(nodeLifecycleState = initialLifecycleState) }
338-
339-
delay(retryDelay)
340-
return@withContext start(
341-
walletIndex = walletIndex,
342-
timeout = timeout,
343-
shouldRetry = false,
344-
customServerUrl = customServerUrl,
345-
customRgsServerUrl = customRgsServerUrl,
346-
channelMigration = channelMigration,
347-
)
348-
} else {
349-
_lightningState.update {
350-
it.copy(nodeLifecycleState = NodeLifecycleState.ErrorStarting(e))
352+
sync().onFailure { e ->
353+
Logger.warn("Initial sync failed, event-driven sync will retry", e, context = TAG)
354+
}
355+
scope.launch { registerForNotifications() }
356+
Result.success(Unit)
357+
}.getOrElse { e ->
358+
val currentState = _lightningState.value.nodeLifecycleState
359+
if (currentState.isRunning()) {
360+
Logger.warn("Start error but node is $currentState, skipping retry", e, context = TAG)
361+
return@withLock Result.success(Unit)
362+
}
363+
364+
if (shouldRetry) {
365+
Logger.warn("Start error, will retry...", e, context = TAG)
366+
_lightningState.update { it.copy(nodeLifecycleState = initialLifecycleState) }
367+
shouldRetryStart = true
368+
Result.failure(e)
369+
} else {
370+
_lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.ErrorStarting(e)) }
371+
Result.failure(e)
351372
}
352-
return@withContext Result.failure(e)
353373
}
354374
}
375+
376+
// Retry OUTSIDE the mutex to avoid deadlock (Kotlin Mutex is non-reentrant)
377+
if (shouldRetryStart) {
378+
delay(2.seconds)
379+
return@withContext start(
380+
walletIndex = walletIndex,
381+
timeout = timeout,
382+
shouldRetry = false,
383+
customServerUrl = customServerUrl,
384+
customRgsServerUrl = customRgsServerUrl,
385+
channelMigration = channelMigration,
386+
shouldValidateGraph = shouldValidateGraph,
387+
)
388+
}
389+
390+
// Restart after graph reset OUTSIDE the mutex to avoid deadlock
391+
if (shouldRestartForGraphReset) {
392+
return@withContext start(
393+
walletIndex = walletIndex,
394+
timeout = timeout,
395+
shouldRetry = shouldRetry,
396+
customServerUrl = customServerUrl,
397+
customRgsServerUrl = customRgsServerUrl,
398+
eventHandler = eventHandler,
399+
channelMigration = channelMigration,
400+
shouldValidateGraph = false, // Prevent infinite loop
401+
)
402+
}
403+
404+
result
355405
}
356406

357407
private suspend fun onEvent(event: Event) {
@@ -375,16 +425,27 @@ class LightningRepo @Inject constructor(
375425
}
376426

377427
suspend fun stop(): Result<Unit> = withContext(bgDispatcher) {
378-
if (_lightningState.value.nodeLifecycleState.isStoppedOrStopping()) {
379-
return@withContext Result.success(Unit)
380-
}
428+
lifecycleMutex.withLock {
429+
if (_lightningState.value.nodeLifecycleState.isStoppedOrStopping()) {
430+
return@withLock Result.success(Unit)
431+
}
381432

382-
runCatching {
383-
_lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Stopping) }
384-
lightningService.stop()
385-
_lightningState.update { LightningState(nodeLifecycleState = NodeLifecycleState.Stopped) }
386-
}.onFailure {
387-
Logger.error("Node stop error", it, context = TAG)
433+
runCatching {
434+
_lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Stopping) }
435+
lightningService.stop()
436+
_lightningState.update { LightningState(nodeLifecycleState = NodeLifecycleState.Stopped) }
437+
}.onFailure {
438+
Logger.error("Node stop error", it, context = TAG)
439+
// On failure, check actual node state and update accordingly
440+
// If node is still running, revert to Running state to allow retry
441+
if (lightningService.node != null && lightningService.status?.isRunning == true) {
442+
Logger.warn("Stop failed but node is still running, reverting to Running state", context = TAG)
443+
_lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Running) }
444+
} else {
445+
// Node appears stopped, update state
446+
_lightningState.update { LightningState(nodeLifecycleState = NodeLifecycleState.Stopped) }
447+
}
448+
}
388449
}
389450
}
390451

@@ -1029,7 +1090,7 @@ class LightningRepo @Inject constructor(
10291090
// region debug
10301091
fun getNetworkGraphInfo() = lightningService.getNetworkGraphInfo()
10311092

1032-
suspend fun exportNetworkGraphToFile(outputDir: String): Result<java.io.File> =
1093+
suspend fun exportNetworkGraphToFile(outputDir: String): Result<File> =
10331094
executeWhenNodeRunning("exportNetworkGraphToFile") {
10341095
lightningService.exportNetworkGraphToFile(outputDir)
10351096
}

app/src/main/java/to/bitkit/services/LightningService.kt

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,63 @@ class LightningService @Inject constructor(
262262
Logger.info("LDK storage wiped", context = TAG)
263263
}
264264

265+
/**
 * Deletes the on-disk network graph cache file (`network_graph_cache`) located in the
 * LDK storage directory of the given wallet.
 *
 * Per the original author's note, this forces a full RGS sync on the next startup and is
 * useful when the cached graph is stale or missing nodes.
 *
 * The outcome is only logged: a missing cache file and a failed deletion are both
 * non-fatal, but a failed deletion is reported as a warning instead of being logged as
 * a success.
 *
 * @param walletIndex wallet index used to resolve the LDK storage path.
 * @throws ServiceError.NodeStillRunning if [node] is still set; the node must be stopped first.
 */
fun resetNetworkGraph(walletIndex: Int) {
    if (node != null) throw ServiceError.NodeStillRunning()
    Logger.warn("Resetting network graph cache…", context = TAG)
    val ldkPath = Path(Env.ldkStoragePath(walletIndex)).toFile()
    val graphFile = ldkPath.resolve("network_graph_cache")
    when {
        !graphFile.exists() -> Logger.info("No network graph cache found", context = TAG)
        // File.delete() signals failure through its return value, not an exception,
        // so the result must be checked before claiming the cache was removed.
        graphFile.delete() -> Logger.info("Network graph cache deleted", context = TAG)
        else -> Logger.warn("Failed to delete network graph cache", context = TAG)
    }
}
282+
283+
/**
 * Checks whether the network graph looks usable by searching it for the trusted peers.
 *
 * Returns `true` (graph accepted) when:
 * - the node is not set up, so there is nothing to validate;
 * - the graph is empty and the node status has no RGS snapshot timestamp;
 * - no trusted peers are configured, so there is nothing to look for;
 * - at least one trusted peer is present in the graph.
 *
 * Returns `false` (graph considered stale) when:
 * - the graph is empty although an RGS snapshot timestamp is recorded;
 * - trusted peers are configured and none of them is present in the graph.
 */
fun validateNetworkGraph(): Boolean {
    val node = this.node ?: return true
    val graph = node.networkGraph()
    val graphNodes = graph.listNodes().toSet()
    if (graphNodes.isEmpty()) {
        val rgsTimestamp = node.status().latestRgsSnapshotTimestamp
        if (rgsTimestamp != null) {
            Logger.warn("Network graph is empty despite RGS timestamp $rgsTimestamp", context = TAG)
            return false
        }
        Logger.debug("Network graph is empty, skipping validation", context = TAG)
        return true
    }

    // With zero trusted peers the "all peers missing" comparison below would be 0 == 0
    // and flag a healthy graph as stale, so bail out before reaching it.
    if (trustedPeers.isEmpty()) {
        Logger.debug("No trusted peers configured, skipping validation", context = TAG)
        return true
    }

    val missingPeers = trustedPeers.filter { it.nodeId !in graphNodes }
    if (missingPeers.size == trustedPeers.size) {
        Logger.warn(
            "Network graph missing all ${trustedPeers.size} trusted peers",
            context = TAG,
        )
        return false
    }
    if (missingPeers.isNotEmpty()) {
        Logger.debug(
            "Network graph missing ${missingPeers.size}/${trustedPeers.size} trusted peers",
            context = TAG,
        )
    }
    val presentCount = trustedPeers.size - missingPeers.size
    Logger.debug(
        "Network graph validated: $presentCount/${trustedPeers.size} trusted peers present",
        context = TAG,
    )
    return true
}
321+
265322
suspend fun sync() {
266323
val node = this.node ?: throw ServiceError.NodeNotSetup()
267324

@@ -755,8 +812,9 @@ class LightningService @Inject constructor(
755812
// region events
756813
private var shouldListenForEvents = true
757814

758-
fun startEventListener(onEvent: NodeEventHandler? = null): Result<Unit> = runCatching {
815+
suspend fun startEventListener(onEvent: NodeEventHandler? = null): Result<Unit> = runCatching {
759816
val node = this.node ?: throw ServiceError.NodeNotSetup()
817+
listenerJob?.cancelAndJoin()
760818
shouldListenForEvents = true
761819
listenerJob = launch {
762820
runCatching {

app/src/test/java/to/bitkit/androidServices/LightningNodeServiceTest.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ class LightningNodeServiceTest : BaseUnitTest() {
101101
anyOrNull(),
102102
anyOrNull(),
103103
anyOrNull(),
104+
any(),
104105
)
105106
} doAnswer {
106107
capturedHandler = it.getArgument(5) as? NodeEventHandler

0 commit comments

Comments
 (0)