diff --git a/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt b/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt index ac58377af..453fe9955 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt @@ -97,6 +97,9 @@ class Peer( val db: Databases, socketBuilder: TcpSocket.Builder?, scope: CoroutineScope, + val currentTipFlow: StateFlow>, + val onChainFeeratesFlow: StateFlow, + val swapInFeeratesFlow: StateFlow, private val isMigrationFromLegacyApp: Boolean = false, private val initTlvStream: TlvStream = TlvStream.empty() ) : CoroutineScope by scope { @@ -157,17 +160,13 @@ class Peer( private val ourInit = Init(features.initFeatures(), initTlvStream) private var theirInit: Init? = null - val currentTipFlow = MutableStateFlow?>(null) - val onChainFeeratesFlow = MutableStateFlow(null) - val swapInFeeratesFlow = MutableStateFlow(null) - private val _channelLogger = nodeParams.loggerFactory.newLogger(ChannelState::class) private suspend fun ChannelState.process(cmd: ChannelCommand): Pair> { val state = this val ctx = ChannelContext( StaticParams(nodeParams, remoteNodeId), - currentTipFlow.filterNotNull().first().first, - onChainFeeratesFlow.filterNotNull().first(), + currentTipFlow.value.first, + onChainFeeratesFlow.value, logger = MDCLogger( logger = _channelLogger, staticMdc = mapOf("remoteNodeId" to remoteNodeId) + state.mdc() @@ -188,20 +187,6 @@ class Peer( val swapInAddress: String = nodeParams.keyManager.swapInOnChainWallet.address.also { swapInWallet.addAddress(it) } init { - launch { - watcher.client.notifications.filterIsInstance() - .collect { msg -> - currentTipFlow.value = msg.blockHeight to msg.header - } - } - launch { - watcher.client.connectionState.filter { it == Connection.ESTABLISHED }.collect { - // onchain fees are retrieved punctually, when electrum status moves to Connection.ESTABLISHED - // since the application is not running most of the time, and when it is, it will be only for a few minutes, this is good enough. - // (for a node that is online most of the time things would be different and we would need to re-evaluate onchain fee estimates on a regular basis) - updateEstimateFees() - } - } launch { watcher.openWatchNotificationsFlow().collect { logger.debug { "notification: $it" } @@ -233,8 +218,6 @@ class Peer( processSwapInCommands(swapInManager) } launch { - // wait to have a swap-in feerate available - swapInFeeratesFlow.filterNotNull().first() watchSwapInWallet() } launch { @@ -257,24 +240,6 @@ class Peer( } } - private suspend fun updateEstimateFees() { - watcher.client.connectionState.filter { it == Connection.ESTABLISHED }.first() - val sortedFees = listOf( - watcher.client.estimateFees(2), - watcher.client.estimateFees(6), - watcher.client.estimateFees(18), - watcher.client.estimateFees(144), - ) - logger.info { "on-chain fees: $sortedFees" } - // TODO: If some feerates are null, we may implement a retry - onChainFeeratesFlow.value = OnChainFeerates( - fundingFeerate = sortedFees[3].feerate ?: FeeratePerKw(FeeratePerByte(2.sat)), - mutualCloseFeerate = sortedFees[2].feerate ?: FeeratePerKw(FeeratePerByte(10.sat)), - claimMainFeerate = sortedFees[1].feerate ?: FeeratePerKw(FeeratePerByte(20.sat)), - fastFeerate = sortedFees[0].feerate ?: FeeratePerKw(FeeratePerByte(50.sat)) - ) - } - fun connect() { if (connectionState.value is Connection.CLOSED) establishConnection() else logger.warning { "Peer is already connecting / connected" } @@ -393,7 +358,7 @@ class Peer( swapInWallet.walletStateFlow .filter { it.consistent } .collect { - val currentBlockHeight = currentTipFlow.filterNotNull().first().first + val currentBlockHeight = currentTipFlow.value.first swapInCommands.send(SwapInCommand.TrySwapIn(currentBlockHeight, it, walletParams.swapInConfirmations, isMigrationFromLegacyApp)) } } @@ -532,7 +497,7 @@ class Peer( } is ChannelAction.ProcessCmdRes.AddSettledFail -> { - val currentTip = currentTipFlow.filterNotNull().first() + val currentTip = currentTipFlow.value when (val result = outgoingPaymentHandler.processAddSettled(actualChannelId, action, _channels, currentTip.first)) { is OutgoingPaymentHandler.Progress -> { _eventsFlow.emit(PaymentProgress(result.request, result.fees)) @@ -639,7 +604,7 @@ class Peer( } private suspend fun processIncomingPayment(item: Either) { - val currentBlockHeight = currentTipFlow.filterNotNull().first().first + val currentBlockHeight = currentTipFlow.value.first val result = when (item) { is Either.Right -> incomingPaymentHandler.process(item.value, currentBlockHeight) is Either.Left -> incomingPaymentHandler.process(item.value, currentBlockHeight) @@ -937,7 +902,7 @@ class Peer( is RequestChannelOpen -> { when (val channel = channels.values.firstOrNull { it is Normal }) { is ChannelStateWithCommitments -> { - val targetFeerate = swapInFeeratesFlow.filterNotNull().first() + val targetFeerate = swapInFeeratesFlow.value val weight = FundingContributions.computeWeightPaid(isInitiator = true, commitment = channel.commitments.active.first(), walletInputs = cmd.walletInputs, localOutputs = emptyList()) val (feerate, fee) = watcher.client.computeSpliceCpfpFeerate(channel.commitments, targetFeerate, spliceWeight = weight, logger) @@ -1034,7 +999,7 @@ class Peer( sendToPeer(cmd.payToOpenResponse) } is SendPayment -> { - val currentTip = currentTipFlow.filterNotNull().first() + val currentTip = currentTipFlow.value when (val result = outgoingPaymentHandler.sendPayment(cmd, _channels, currentTip.first)) { is OutgoingPaymentHandler.Progress -> { _eventsFlow.emit(PaymentProgress(result.request, result.fees)) diff --git a/src/commonTest/kotlin/fr/acinq/lightning/tests/io/peer/builders.kt b/src/commonTest/kotlin/fr/acinq/lightning/tests/io/peer/builders.kt index 1afc055db..482a60cf8 100644 --- a/src/commonTest/kotlin/fr/acinq/lightning/tests/io/peer/builders.kt +++ b/src/commonTest/kotlin/fr/acinq/lightning/tests/io/peer/builders.kt @@ -27,9 +27,7 @@ import fr.acinq.lightning.wire.ChannelReestablish import fr.acinq.lightning.wire.Init import fr.acinq.lightning.wire.LightningMessage import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.first -import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.* import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import org.kodein.log.LoggerFactory @@ -169,13 +167,17 @@ fun buildPeer( ): Peer { val electrum = ElectrumClient(TcpSocket.Builder(), scope, LoggerFactory.default) val watcher = ElectrumWatcher(electrum, scope, LoggerFactory.default) - val peer = Peer(nodeParams, walletParams, watcher, databases, TcpSocket.Builder(), scope) - peer.currentTipFlow.value = currentTip - peer.onChainFeeratesFlow.value = OnChainFeerates( - fundingFeerate = FeeratePerKw(FeeratePerByte(5.sat)), - mutualCloseFeerate = FeeratePerKw(FeeratePerByte(10.sat)), - claimMainFeerate = FeeratePerKw(FeeratePerByte(20.sat)), - fastFeerate = FeeratePerKw(FeeratePerByte(50.sat)) + val peer = Peer(nodeParams, walletParams, watcher, databases, TcpSocket.Builder(), scope, + currentTipFlow = MutableStateFlow(currentTip), + onChainFeeratesFlow = MutableStateFlow( + OnChainFeerates( + fundingFeerate = FeeratePerKw(FeeratePerByte(5.sat)), + mutualCloseFeerate = FeeratePerKw(FeeratePerByte(10.sat)), + claimMainFeerate = FeeratePerKw(FeeratePerByte(20.sat)), + fastFeerate = FeeratePerKw(FeeratePerByte(50.sat)) + ) + ), + swapInFeeratesFlow = MutableStateFlow(FeeratePerKw(FeeratePerByte(5.sat))) ) return peer