@@ -25,6 +25,7 @@ import kotlinx.coroutines.flow.update
 import kotlinx.coroutines.isActive
 import kotlinx.coroutines.launch
 import kotlinx.coroutines.sync.Mutex
+import kotlinx.coroutines.sync.withLock
 import kotlinx.coroutines.tasks.await
 import kotlinx.coroutines.withContext
 import kotlinx.coroutines.withTimeoutOrNull
@@ -45,6 +46,7 @@ import org.lightningdevkit.ldknode.SpendableUtxo
 import org.lightningdevkit.ldknode.Txid
 import to.bitkit.data.CacheStore
 import to.bitkit.data.SettingsStore
+import to.bitkit.data.backup.VssBackupClient
 import to.bitkit.data.keychain.Keychain
 import to.bitkit.di.BgDispatcher
 import to.bitkit.env.Env
@@ -67,6 +69,7 @@ import to.bitkit.services.NodeEventHandler
 import to.bitkit.utils.AppError
 import to.bitkit.utils.Logger
 import to.bitkit.utils.ServiceError
+import java.io.File
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.atomic.AtomicReference
@@ -91,6 +94,7 @@ class LightningRepo @Inject constructor(
     private val cacheStore: CacheStore,
     private val preActivityMetadataRepo: PreActivityMetadataRepo,
     private val connectivityRepo: ConnectivityRepo,
+    private val vssBackupClient: VssBackupClient,
 ) {
     private val _lightningState = MutableStateFlow(LightningState())
     val lightningState = _lightningState.asStateFlow()
@@ -109,6 +113,7 @@ class LightningRepo @Inject constructor(
     private val syncMutex = Mutex()
     private val syncPending = AtomicBoolean(false)
     private val syncRetryJob = AtomicReference<Job?>(null)
+    private val lifecycleMutex = Mutex()

     init {
         observeConnectivityForSyncRetry()
@@ -263,95 +268,140 @@ class LightningRepo @Inject constructor(
         customRgsServerUrl: String? = null,
         eventHandler: NodeEventHandler? = null,
         channelMigration: ChannelDataMigration? = null,
+        shouldValidateGraph: Boolean = true,
     ): Result<Unit> = withContext(bgDispatcher) {
         if (_isRecoveryMode.value) {
             return@withContext Result.failure(RecoveryModeError())
         }

         eventHandler?.let { _eventHandlers.add(it) }

-        val initialLifecycleState = _lightningState.value.nodeLifecycleState
-        if (initialLifecycleState.isRunningOrStarting()) {
-            Logger.info("LDK node start skipped, lifecycle state: $initialLifecycleState", context = TAG)
-            return@withContext Result.success(Unit)
-        }
+        // Track retry state outside mutex to avoid deadlock (Mutex is non-reentrant)
+        var shouldRetryStart = false
+        var shouldRestartForGraphReset = false
+        var initialLifecycleState: NodeLifecycleState
+
+        val result = lifecycleMutex.withLock {
+            initialLifecycleState = _lightningState.value.nodeLifecycleState
+            if (initialLifecycleState.isRunningOrStarting()) {
+                Logger.info("LDK node start skipped, lifecycle state: $initialLifecycleState", context = TAG)
+                lightningService.startEventListener(::onEvent)
+                return@withLock Result.success(Unit)
+            }

-        runCatching {
-            _lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Starting) }
-
-            // Setup if needed
-            if (lightningService.node == null) {
-                val setupResult = setup(walletIndex, customServerUrl, customRgsServerUrl, channelMigration)
-                if (setupResult.isFailure) {
-                    _lightningState.update {
-                        it.copy(
-                            nodeLifecycleState = NodeLifecycleState.ErrorStarting(
-                                setupResult.exceptionOrNull() ?: NodeSetupError()
+            runCatching {
+                _lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Starting) }
+
+                // Setup if needed
+                if (lightningService.node == null) {
+                    val setupResult = setup(walletIndex, customServerUrl, customRgsServerUrl, channelMigration)
+                    if (setupResult.isFailure) {
+                        _lightningState.update {
+                            it.copy(
+                                nodeLifecycleState = NodeLifecycleState.ErrorStarting(
+                                    setupResult.exceptionOrNull() ?: NodeSetupError()
+                                )
                             )
-                        )
+                        }
+                        return@withLock setupResult
                     }
-                    return@withContext setupResult
                 }
-            }

-            if (getStatus()?.isRunning == true) {
-                Logger.info("LDK node already running", context = TAG)
-                _lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Running) }
-                lightningService.startEventListener(::onEvent).onFailure {
-                    Logger.warn("Failed to start event listener", it, context = TAG)
-                    return@withContext Result.failure(it)
+                if (getStatus()?.isRunning == true) {
+                    Logger.info("LDK node already running", context = TAG)
+                    _lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Running) }
+                    lightningService.startEventListener(::onEvent).onFailure {
+                        Logger.warn("Failed to start event listener", it, context = TAG)
+                        return@withLock Result.failure(it)
+                    }
+                    return@withLock Result.success(Unit)
                 }
-                return@withContext Result.success(Unit)
-            }

-            // Start node
-            lightningService.start(timeout, ::onEvent)
+                lightningService.start(timeout, ::onEvent)

-            _lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Running) }
+                _lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Running) }

-            // Initial state sync
-            syncState()
-            updateGeoBlockState()
-            refreshChannelCache()
+                // Initial state sync
+                syncState()
+                updateGeoBlockState()
+                refreshChannelCache()

-            // Post-startup tasks (non-blocking)
-            connectToTrustedPeers().onFailure {
-                Logger.error("Failed to connect to trusted peers", it, context = TAG)
-            }
+                // Validate network graph has trusted peers (RGS cache can become stale)
+                if (shouldValidateGraph && !lightningService.validateNetworkGraph()) {
+                    Logger.warn("Network graph is stale, resetting and restarting...", context = TAG)
+                    lightningService.stop()
+                    lightningService.resetNetworkGraph(walletIndex)
+                    // Also clear stale graph from VSS to prevent fallback restoration
+                    runCatching {
+                        vssBackupClient.setup(walletIndex).getOrThrow()
+                        vssBackupClient.deleteObject("network_graph").getOrThrow()
+                        Logger.info("Cleared stale network graph from VSS", context = TAG)
+                    }.onFailure {
+                        Logger.warn("Failed to clear graph from VSS", it, context = TAG)
+                    }
+                    _lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Stopped) }
+                    shouldRestartForGraphReset = true
+                    return@withLock Result.success(Unit)
+                }

-            sync().onFailure { e ->
-                Logger.warn("Initial sync failed, event-driven sync will retry", e, context = TAG)
-            }
-            scope.launch { registerForNotifications() }
-            Unit
-        }.onFailure { e ->
-            val currentLifecycleState = _lightningState.value.nodeLifecycleState
-            if (currentLifecycleState.isRunning()) {
-                Logger.warn("Start error occurred but node is $currentLifecycleState, skipping retry", e, context = TAG)
-                return@withContext Result.success(Unit)
-            }
+                // Post-startup tasks (non-blocking)
+                connectToTrustedPeers().onFailure {
+                    Logger.error("Failed to connect to trusted peers", it, context = TAG)
+                }

-            if (shouldRetry) {
-                val retryDelay = 2.seconds
-                Logger.warn("Start error, retrying after $retryDelay...", e, context = TAG)
-                _lightningState.update { it.copy(nodeLifecycleState = initialLifecycleState) }
-
-                delay(retryDelay)
-                return@withContext start(
-                    walletIndex = walletIndex,
-                    timeout = timeout,
-                    shouldRetry = false,
-                    customServerUrl = customServerUrl,
-                    customRgsServerUrl = customRgsServerUrl,
-                    channelMigration = channelMigration,
-                )
-            } else {
-                _lightningState.update {
-                    it.copy(nodeLifecycleState = NodeLifecycleState.ErrorStarting(e))
+                sync().onFailure { e ->
+                    Logger.warn("Initial sync failed, event-driven sync will retry", e, context = TAG)
+                }
+                scope.launch { registerForNotifications() }
+                Result.success(Unit)
+            }.getOrElse { e ->
+                val currentState = _lightningState.value.nodeLifecycleState
+                if (currentState.isRunning()) {
+                    Logger.warn("Start error but node is $currentState, skipping retry", e, context = TAG)
+                    return@withLock Result.success(Unit)
+                }
+
+                if (shouldRetry) {
+                    Logger.warn("Start error, will retry...", e, context = TAG)
+                    _lightningState.update { it.copy(nodeLifecycleState = initialLifecycleState) }
+                    shouldRetryStart = true
+                    Result.failure(e)
+                } else {
+                    _lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.ErrorStarting(e)) }
+                    Result.failure(e)
                 }
-                return@withContext Result.failure(e)
             }
         }
+
+        // Retry OUTSIDE the mutex to avoid deadlock (Kotlin Mutex is non-reentrant)
+        if (shouldRetryStart) {
+            delay(2.seconds)
+            return@withContext start(
+                walletIndex = walletIndex,
+                timeout = timeout,
+                shouldRetry = false,
+                customServerUrl = customServerUrl,
+                customRgsServerUrl = customRgsServerUrl,
+                channelMigration = channelMigration,
+                shouldValidateGraph = shouldValidateGraph,
+            )
+        }
+
+        // Restart after graph reset OUTSIDE the mutex to avoid deadlock
+        if (shouldRestartForGraphReset) {
+            return@withContext start(
+                walletIndex = walletIndex,
+                timeout = timeout,
+                shouldRetry = shouldRetry,
+                customServerUrl = customServerUrl,
+                customRgsServerUrl = customRgsServerUrl,
+                eventHandler = eventHandler,
+                channelMigration = channelMigration,
+                shouldValidateGraph = false, // Prevent infinite loop
+            )
+        }
+
+        result
     }

     private suspend fun onEvent(event: Event) {
@@ -375,16 +425,27 @@ class LightningRepo @Inject constructor(
     }

     suspend fun stop(): Result<Unit> = withContext(bgDispatcher) {
-        if (_lightningState.value.nodeLifecycleState.isStoppedOrStopping()) {
-            return@withContext Result.success(Unit)
-        }
+        lifecycleMutex.withLock {
+            if (_lightningState.value.nodeLifecycleState.isStoppedOrStopping()) {
+                return@withLock Result.success(Unit)
+            }

-        runCatching {
-            _lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Stopping) }
-            lightningService.stop()
-            _lightningState.update { LightningState(nodeLifecycleState = NodeLifecycleState.Stopped) }
-        }.onFailure {
-            Logger.error("Node stop error", it, context = TAG)
+            runCatching {
+                _lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Stopping) }
+                lightningService.stop()
+                _lightningState.update { LightningState(nodeLifecycleState = NodeLifecycleState.Stopped) }
+            }.onFailure {
+                Logger.error("Node stop error", it, context = TAG)
+                // On failure, check actual node state and update accordingly
+                // If node is still running, revert to Running state to allow retry
+                if (lightningService.node != null && lightningService.status?.isRunning == true) {
+                    Logger.warn("Stop failed but node is still running, reverting to Running state", context = TAG)
+                    _lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Running) }
+                } else {
+                    // Node appears stopped, update state
+                    _lightningState.update { LightningState(nodeLifecycleState = NodeLifecycleState.Stopped) }
+                }
+            }
         }
     }

@@ -1030,7 +1091,7 @@ class LightningRepo @Inject constructor(
     // region debug
     fun getNetworkGraphInfo() = lightningService.getNetworkGraphInfo()

-    suspend fun exportNetworkGraphToFile(outputDir: String): Result<java.io.File> =
+    suspend fun exportNetworkGraphToFile(outputDir: String): Result<File> =
         executeWhenNodeRunning("exportNetworkGraphToFile") {
            lightningService.exportNetworkGraphToFile(outputDir)
         }
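
A side note on the locking pattern in `start()` above: both the retry and the post-graph-reset restart are deliberately performed after `lifecycleMutex.withLock { ... }` has returned, because `kotlinx.coroutines.sync.Mutex` is not reentrant. The sketch below is a minimal, self-contained illustration of that constraint, not code from this change; the names (`start`, `lifecycleMutex`, `retry`) are hypothetical.

```kotlin
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical, simplified shape of the pattern used in the diff:
// record the need to retry inside the lock, act on it outside the lock.
val lifecycleMutex = Mutex()

suspend fun start(retry: Boolean): Result<Unit> {
    var shouldRetry = false
    val result = lifecycleMutex.withLock {
        // ...attempt to start; on failure only record that a retry is needed.
        // Calling start() directly here would try to lock lifecycleMutex again
        // on the same coroutine and suspend forever (Mutex is non-reentrant).
        if (retry) shouldRetry = true
        Result.failure<Unit>(IllegalStateException("start failed"))
    }
    // Safe point for the recursive call: the mutex has already been released.
    if (shouldRetry) return start(retry = false)
    return result
}

fun main() = runBlocking {
    println(start(retry = true)) // completes without deadlocking
}
```

The same reasoning applies to the `shouldRestartForGraphReset` branch: the restart re-enters `start()`, so it must happen after the lock is released, with `shouldValidateGraph = false` so the reset cannot loop.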