Skip to content

Commit c9c826d

Browse files
committed
Retrieve onchain feerate when we connect to an electrum server
When we connect to an electrum sever we perform a "handshake" that includes exchanging protocol version messages and retrieving the server's current tip, and now we also retrieve onchain fees. Since this is done during the connection handshake, errors will be caught by the corouting exception handler that we use in the client and will not crash the application.
1 parent b70fb15 commit c9c826d

File tree

5 files changed

+98
-49
lines changed

5 files changed

+98
-49
lines changed

build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ plugins {
1111

1212
allprojects {
1313
group = "fr.acinq.lightning"
14-
version = "1.5.0"
14+
version = "1.5.1-SNAPSHOT"
1515

1616
repositories {
1717
// using the local maven repository with Kotlin Multi Platform can lead to build errors that are hard to diagnose.

src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClient.kt

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package fr.acinq.lightning.blockchain.electrum
22

33
import fr.acinq.bitcoin.*
4+
import fr.acinq.lightning.blockchain.fee.FeeratePerByte
5+
import fr.acinq.lightning.blockchain.fee.FeeratePerKw
6+
import fr.acinq.lightning.blockchain.fee.OnChainFeerates
47
import fr.acinq.lightning.io.TcpSocket
5-
import fr.acinq.lightning.io.linesFlow
68
import fr.acinq.lightning.io.send
79
import fr.acinq.lightning.utils.*
810
import kotlinx.coroutines.*
@@ -23,14 +25,15 @@ sealed interface ElectrumClientCommand {
2325
sealed interface ElectrumConnectionStatus {
2426
data class Closed(val reason: TcpSocket.IOException?) : ElectrumConnectionStatus
2527
object Connecting : ElectrumConnectionStatus
26-
data class Connected(val version: ServerVersionResponse, val height: Int, val header: BlockHeader) : ElectrumConnectionStatus
28+
data class Connected(val version: ServerVersionResponse, val height: Int, val header: BlockHeader, val onchainFeeRates: OnChainFeerates) : ElectrumConnectionStatus
2729
}
2830

2931
@OptIn(ExperimentalCoroutinesApi::class)
3032
class ElectrumClient(
3133
socketBuilder: TcpSocket.Builder?,
3234
scope: CoroutineScope,
33-
private val loggerFactory: LoggerFactory
35+
private val loggerFactory: LoggerFactory,
36+
exceptionHandler_opt: CoroutineExceptionHandler? = null
3437
) : CoroutineScope by scope, IElectrumClient {
3538

3639
private val logger = loggerFactory.newLogger(this::class)
@@ -99,9 +102,11 @@ class ElectrumClient(
99102
}
100103
}
101104

102-
private fun establishConnection(serverAddress: ServerAddress) = launch(CoroutineExceptionHandler { _, exception ->
105+
val exceptionHandler = exceptionHandler_opt ?: CoroutineExceptionHandler { _, exception ->
103106
logger.error(exception) { "error starting electrum client" }
104-
}) {
107+
}
108+
109+
private fun establishConnection(serverAddress: ServerAddress) = launch(exceptionHandler) {
105110
_connectionStatus.value = ElectrumConnectionStatus.Connecting
106111
val socket: TcpSocket = try {
107112
val (host, port, tls) = serverAddress
@@ -139,22 +144,41 @@ class ElectrumClient(
139144
}
140145

141146
val flow = socket.linesFlow().map { json.decodeFromString(ElectrumResponseDeserializer, it) }
142-
val version = ServerVersion()
143-
sendRequest(version, 0)
144147
val rpcFlow = flow.filterIsInstance<Either.Right<Nothing, JsonRPCResponse>>().map { it.value }
148+
var requestId = 0
149+
150+
val version = ServerVersion()
151+
sendRequest(version, requestId++)
145152
val theirVersion = parseJsonResponse(version, rpcFlow.first())
146153
require(theirVersion is ServerVersionResponse) { "invalid server version response $theirVersion" }
147154
logger.info { "server version $theirVersion" }
148-
sendRequest(HeaderSubscription, 0)
155+
156+
sendRequest(HeaderSubscription, requestId++)
149157
val header = parseJsonResponse(HeaderSubscription, rpcFlow.first())
150158
require(header is HeaderSubscriptionResponse) { "invalid header subscription response $header" }
159+
160+
suspend fun estimateFee(confirmations: Int): EstimateFeeResponse {
161+
val request = EstimateFees(confirmations)
162+
sendRequest(request, requestId++)
163+
val response = parseJsonResponse(request, rpcFlow.first())
164+
require(response is EstimateFeeResponse) { "invalid estimatefee response $response" }
165+
return response
166+
}
167+
168+
val fees = listOf(estimateFee(2), estimateFee(6), estimateFee(18), estimateFee(144))
169+
logger.info { "onchain fees $fees" }
170+
val feeRates = OnChainFeerates(
171+
fundingFeerate = fees[3].feerate ?: FeeratePerKw(FeeratePerByte(2.sat)),
172+
mutualCloseFeerate = fees[2].feerate ?: FeeratePerKw(FeeratePerByte(10.sat)),
173+
claimMainFeerate = fees[1].feerate ?: FeeratePerKw(FeeratePerByte(20.sat)),
174+
fastFeerate = fees[0].feerate ?: FeeratePerKw(FeeratePerByte(50.sat))
175+
)
151176
_notifications.emit(header)
152-
_connectionStatus.value = ElectrumConnectionStatus.Connected(theirVersion, header.blockHeight, header.header)
177+
_connectionStatus.value = ElectrumConnectionStatus.Connected(theirVersion, header.blockHeight, header.header, feeRates)
153178
logger.info { "server tip $header" }
154179

155180
// pending requests map
156181
val requestMap = mutableMapOf<Int, Pair<ElectrumRequest, CompletableDeferred<ElectrumResponse>>>()
157-
var requestId = 0
158182

159183
// reset mailbox
160184
mailbox.cancel(CancellationException("connection in progress"))

src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -195,11 +195,11 @@ class Peer(
195195
}
196196
}
197197
launch {
198-
watcher.client.connectionState.filter { it == Connection.ESTABLISHED }.collect {
199-
// onchain fees are retrieved punctually, when electrum status moves to Connection.ESTABLISHED
200-
// since the application is not running most of the time, and when it is, it will be only for a few minutes, this is good enough.
201-
// (for a node that is online most of the time things would be different and we would need to re-evaluate onchain fee estimates on a regular basis)
202-
updateEstimateFees()
198+
watcher.client.connectionStatus.filterIsInstance<ElectrumConnectionStatus.Connected>().collect {
199+
// Onchain fees are retrieved once when we establish a connection to an electrum server.
200+
// It is acceptable since the application will typically not be running more than a few minutes at a time.
201+
// (for a node that is online most of the time things would be different, and we would need to re-evaluate onchain fee estimates on a regular basis)
202+
onChainFeeratesFlow.value = it.onchainFeeRates
203203
}
204204
}
205205
launch {
@@ -257,24 +257,6 @@ class Peer(
257257
}
258258
}
259259

260-
private suspend fun updateEstimateFees() {
261-
watcher.client.connectionState.filter { it == Connection.ESTABLISHED }.first()
262-
val sortedFees = listOf(
263-
watcher.client.estimateFees(2),
264-
watcher.client.estimateFees(6),
265-
watcher.client.estimateFees(18),
266-
watcher.client.estimateFees(144),
267-
)
268-
logger.info { "on-chain fees: $sortedFees" }
269-
// TODO: If some feerates are null, we may implement a retry
270-
onChainFeeratesFlow.value = OnChainFeerates(
271-
fundingFeerate = sortedFees[3].feerate ?: FeeratePerKw(FeeratePerByte(2.sat)),
272-
mutualCloseFeerate = sortedFees[2].feerate ?: FeeratePerKw(FeeratePerByte(10.sat)),
273-
claimMainFeerate = sortedFees[1].feerate ?: FeeratePerKw(FeeratePerByte(20.sat)),
274-
fastFeerate = sortedFees[0].feerate ?: FeeratePerKw(FeeratePerByte(50.sat))
275-
)
276-
}
277-
278260
fun connect() {
279261
if (connectionState.value is Connection.CLOSED) establishConnection()
280262
else logger.warning { "Peer is already connecting / connected" }

src/commonMain/kotlin/fr/acinq/lightning/io/TcpSocket.kt

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,18 @@ interface TcpSocket {
2121
suspend fun receiveFully(buffer: ByteArray, offset: Int, length: Int)
2222
suspend fun receiveAvailable(buffer: ByteArray, offset: Int, length: Int): Int
2323

24+
fun linesFlow(): Flow<String> {
25+
return flow {
26+
val buffer = ByteArray(8192)
27+
while (true) {
28+
val size = receiveAvailable(buffer)
29+
emit(buffer.subArray(size))
30+
}
31+
}
32+
.decodeToString()
33+
.splitByLines()
34+
}
35+
2436
suspend fun startTls(tls: TLS): TcpSocket
2537

2638
fun close()
@@ -71,14 +83,3 @@ internal expect object PlatformSocketBuilder : TcpSocket.Builder
7183

7284
suspend fun TcpSocket.receiveFully(size: Int): ByteArray =
7385
ByteArray(size).also { receiveFully(it) }
74-
75-
fun TcpSocket.linesFlow(): Flow<String> =
76-
flow {
77-
val buffer = ByteArray(8192)
78-
while (true) {
79-
val size = receiveAvailable(buffer)
80-
emit(buffer.subArray(size))
81-
}
82-
}
83-
.decodeToString()
84-
.splitByLines()

src/commonTest/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClientTest.kt

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,16 @@ package fr.acinq.lightning.blockchain.electrum
22

33
import fr.acinq.bitcoin.*
44
import fr.acinq.lightning.blockchain.fee.FeeratePerKw
5+
import fr.acinq.lightning.io.TcpSocket
56
import fr.acinq.lightning.tests.utils.LightningTestSuite
67
import fr.acinq.lightning.tests.utils.runSuspendTest
78
import fr.acinq.lightning.utils.Connection
9+
import fr.acinq.lightning.utils.ServerAddress
810
import fr.acinq.lightning.utils.toByteVector32
911
import fr.acinq.secp256k1.Hex
10-
import kotlinx.coroutines.CoroutineScope
11-
import kotlinx.coroutines.flow.first
12-
import kotlinx.coroutines.joinAll
13-
import kotlinx.coroutines.launch
12+
import kotlinx.coroutines.*
13+
import kotlinx.coroutines.flow.*
14+
import org.kodein.log.LoggerFactory
1415
import kotlin.test.*
1516
import kotlin.time.Duration.Companion.seconds
1617

@@ -177,4 +178,45 @@ class ElectrumClientTest : LightningTestSuite() {
177178

178179
client.stop()
179180
}
181+
182+
@OptIn(DelicateCoroutinesApi::class)
183+
@Test
184+
fun `catch coroutine errors`() {
185+
val myCustomError = "this is a test error"
186+
187+
class MyTcpSopcket(val socket: TcpSocket) : TcpSocket by socket {
188+
override fun linesFlow(): Flow<String> {
189+
return super.linesFlow().map {
190+
// during the handshake with the electrum server we first ask for the server version, then headers, fee rates
191+
// so id == 2 means we're asking for fee rates, and here we return an error
192+
val sendError = it.contains("\"id\": 2")
193+
if (sendError) {
194+
"""{"jsonrpc": "2.0", "error": {"code": 42, "message": "$myCustomError"}, "id": 2}"""
195+
} else {
196+
it
197+
}
198+
}
199+
}
200+
}
201+
202+
class MyBuilder(val builder: TcpSocket.Builder) : TcpSocket.Builder {
203+
override suspend fun connect(host: String, port: Int, tls: TcpSocket.TLS, loggerFactory: LoggerFactory): TcpSocket {
204+
val socket = builder.connect(host, port, tls, loggerFactory)
205+
return MyTcpSopcket(socket)
206+
}
207+
}
208+
209+
runBlocking {
210+
val builder = MyBuilder(TcpSocket.Builder())
211+
val errorFlow = MutableStateFlow<Throwable?>(null)
212+
val myErrorHandler = CoroutineExceptionHandler { _, e -> errorFlow.value = e }
213+
val client = ElectrumClient(builder, GlobalScope, LoggerFactory.default, myErrorHandler)
214+
client.connect(ServerAddress("electrum.acinq.co", 50002, TcpSocket.TLS.UNSAFE_CERTIFICATES))
215+
client.connectionState.first { it is Connection.CLOSED }
216+
client.connectionState.first { it is Connection.ESTABLISHING }
217+
val error = errorFlow.filterNotNull().first()
218+
assertTrue(error.message!!.contains(myCustomError))
219+
client.stop()
220+
}
221+
}
180222
}

0 commit comments

Comments
 (0)