Skip to content

Commit a51c29a

Browse files
committed
Retrieve onchain feerate when we connect to an electrum server
When we connect to an electrum sever we perform a "handshake" that includes exchanging protocol version messages and retrieving the server's current tip, and now we also retrieve onchain fees. We also add an extra, optional CoroutineExceptionHandler to ElectrumClient's constructor, which can be used for testing or to specify a different behaviour to the one that is currently hard-coded. Since this is done during the connection handshake, errors will be caught by the corouting exception handler that we use in the client and will not crash the application.
1 parent 80195ed commit a51c29a

File tree

4 files changed

+132
-49
lines changed

4 files changed

+132
-49
lines changed

src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClient.kt

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package fr.acinq.lightning.blockchain.electrum
22

33
import fr.acinq.bitcoin.*
4+
import fr.acinq.lightning.blockchain.fee.FeeratePerByte
5+
import fr.acinq.lightning.blockchain.fee.FeeratePerKw
6+
import fr.acinq.lightning.blockchain.fee.OnChainFeerates
47
import fr.acinq.lightning.io.TcpSocket
5-
import fr.acinq.lightning.io.linesFlow
68
import fr.acinq.lightning.io.send
79
import fr.acinq.lightning.utils.*
810
import kotlinx.coroutines.*
@@ -23,14 +25,14 @@ sealed interface ElectrumClientCommand {
2325
sealed interface ElectrumConnectionStatus {
2426
data class Closed(val reason: TcpSocket.IOException?) : ElectrumConnectionStatus
2527
object Connecting : ElectrumConnectionStatus
26-
data class Connected(val version: ServerVersionResponse, val height: Int, val header: BlockHeader) : ElectrumConnectionStatus
28+
data class Connected(val version: ServerVersionResponse, val height: Int, val header: BlockHeader, val onchainFeeRates: OnChainFeerates) : ElectrumConnectionStatus
2729
}
2830

29-
@OptIn(ExperimentalCoroutinesApi::class)
3031
class ElectrumClient(
3132
socketBuilder: TcpSocket.Builder?,
3233
scope: CoroutineScope,
33-
private val loggerFactory: LoggerFactory
34+
private val loggerFactory: LoggerFactory,
35+
defaultExceptionHandler: CoroutineExceptionHandler? = null
3436
) : CoroutineScope by scope, IElectrumClient {
3537

3638
private val logger = loggerFactory.newLogger(this::class)
@@ -99,9 +101,11 @@ class ElectrumClient(
99101
}
100102
}
101103

102-
private fun establishConnection(serverAddress: ServerAddress) = launch(CoroutineExceptionHandler { _, exception ->
104+
val exceptionHandler = defaultExceptionHandler ?: CoroutineExceptionHandler { _, exception ->
103105
logger.error(exception) { "error starting electrum client" }
104-
}) {
106+
}
107+
108+
private fun establishConnection(serverAddress: ServerAddress) = launch(exceptionHandler) {
105109
_connectionStatus.value = ElectrumConnectionStatus.Connecting
106110
val socket: TcpSocket = try {
107111
val (host, port, tls) = serverAddress
@@ -139,22 +143,41 @@ class ElectrumClient(
139143
}
140144

141145
val flow = socket.linesFlow().map { json.decodeFromString(ElectrumResponseDeserializer, it) }
142-
val version = ServerVersion()
143-
sendRequest(version, 0)
144146
val rpcFlow = flow.filterIsInstance<Either.Right<Nothing, JsonRPCResponse>>().map { it.value }
147+
var requestId = 0
148+
149+
val version = ServerVersion()
150+
sendRequest(version, requestId++)
145151
val theirVersion = parseJsonResponse(version, rpcFlow.first())
146152
require(theirVersion is ServerVersionResponse) { "invalid server version response $theirVersion" }
147153
logger.info { "server version $theirVersion" }
148-
sendRequest(HeaderSubscription, 0)
154+
155+
sendRequest(HeaderSubscription, requestId++)
149156
val header = parseJsonResponse(HeaderSubscription, rpcFlow.first())
150157
require(header is HeaderSubscriptionResponse) { "invalid header subscription response $header" }
158+
159+
suspend fun estimateFee(confirmations: Int): EstimateFeeResponse {
160+
val request = EstimateFees(confirmations)
161+
sendRequest(request, requestId++)
162+
val response = parseJsonResponse(request, rpcFlow.first())
163+
require(response is EstimateFeeResponse) { "invalid estimatefee response $response" }
164+
return response
165+
}
166+
167+
val fees = listOf(estimateFee(2), estimateFee(6), estimateFee(18), estimateFee(144))
168+
logger.info { "onchain fees $fees" }
169+
val feeRates = OnChainFeerates(
170+
fundingFeerate = fees[3].feerate ?: FeeratePerKw(FeeratePerByte(2.sat)),
171+
mutualCloseFeerate = fees[2].feerate ?: FeeratePerKw(FeeratePerByte(10.sat)),
172+
claimMainFeerate = fees[1].feerate ?: FeeratePerKw(FeeratePerByte(20.sat)),
173+
fastFeerate = fees[0].feerate ?: FeeratePerKw(FeeratePerByte(50.sat))
174+
)
151175
_notifications.emit(header)
152-
_connectionStatus.value = ElectrumConnectionStatus.Connected(theirVersion, header.blockHeight, header.header)
176+
_connectionStatus.value = ElectrumConnectionStatus.Connected(theirVersion, header.blockHeight, header.header, feeRates)
153177
logger.info { "server tip $header" }
154178

155179
// pending requests map
156180
val requestMap = mutableMapOf<Int, Pair<ElectrumRequest, CompletableDeferred<ElectrumResponse>>>()
157-
var requestId = 0
158181

159182
// reset mailbox
160183
mailbox.cancel(CancellationException("connection in progress"))

src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -193,11 +193,11 @@ class Peer(
193193
}
194194
}
195195
launch {
196-
watcher.client.connectionState.filter { it == Connection.ESTABLISHED }.collect {
197-
// onchain fees are retrieved punctually, when electrum status moves to Connection.ESTABLISHED
198-
// since the application is not running most of the time, and when it is, it will be only for a few minutes, this is good enough.
199-
// (for a node that is online most of the time things would be different and we would need to re-evaluate onchain fee estimates on a regular basis)
200-
updateEstimateFees()
196+
watcher.client.connectionStatus.filterIsInstance<ElectrumConnectionStatus.Connected>().collect {
197+
// Onchain fees are retrieved once when we establish a connection to an electrum server.
198+
// It is acceptable since the application will typically not be running more than a few minutes at a time.
199+
// (for a node that is online most of the time things would be different, and we would need to re-evaluate onchain fee estimates on a regular basis)
200+
onChainFeeratesFlow.value = it.onchainFeeRates
201201
}
202202
}
203203
launch {
@@ -255,24 +255,6 @@ class Peer(
255255
}
256256
}
257257

258-
private suspend fun updateEstimateFees() {
259-
watcher.client.connectionState.filter { it == Connection.ESTABLISHED }.first()
260-
val sortedFees = listOf(
261-
watcher.client.estimateFees(2),
262-
watcher.client.estimateFees(6),
263-
watcher.client.estimateFees(18),
264-
watcher.client.estimateFees(144),
265-
)
266-
logger.info { "on-chain fees: $sortedFees" }
267-
// TODO: If some feerates are null, we may implement a retry
268-
onChainFeeratesFlow.value = OnChainFeerates(
269-
fundingFeerate = sortedFees[3].feerate ?: FeeratePerKw(FeeratePerByte(2.sat)),
270-
mutualCloseFeerate = sortedFees[2].feerate ?: FeeratePerKw(FeeratePerByte(10.sat)),
271-
claimMainFeerate = sortedFees[1].feerate ?: FeeratePerKw(FeeratePerByte(20.sat)),
272-
fastFeerate = sortedFees[0].feerate ?: FeeratePerKw(FeeratePerByte(50.sat))
273-
)
274-
}
275-
276258
fun connect() {
277259
if (connectionState.value is Connection.CLOSED) establishConnection()
278260
else logger.warning { "Peer is already connecting / connected" }

src/commonMain/kotlin/fr/acinq/lightning/io/TcpSocket.kt

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,18 @@ interface TcpSocket {
2121
suspend fun receiveFully(buffer: ByteArray, offset: Int, length: Int)
2222
suspend fun receiveAvailable(buffer: ByteArray, offset: Int, length: Int): Int
2323

24+
fun linesFlow(): Flow<String> {
25+
return flow {
26+
val buffer = ByteArray(8192)
27+
while (true) {
28+
val size = receiveAvailable(buffer)
29+
emit(buffer.subArray(size))
30+
}
31+
}
32+
.decodeToString()
33+
.splitByLines()
34+
}
35+
2436
suspend fun startTls(tls: TLS): TcpSocket
2537

2638
fun close()
@@ -71,14 +83,3 @@ internal expect object PlatformSocketBuilder : TcpSocket.Builder
7183

7284
suspend fun TcpSocket.receiveFully(size: Int): ByteArray =
7385
ByteArray(size).also { receiveFully(it) }
74-
75-
fun TcpSocket.linesFlow(): Flow<String> =
76-
flow {
77-
val buffer = ByteArray(8192)
78-
while (true) {
79-
val size = receiveAvailable(buffer)
80-
emit(buffer.subArray(size))
81-
}
82-
}
83-
.decodeToString()
84-
.splitByLines()

src/commonTest/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClientTest.kt

Lines changed: 81 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,20 @@ package fr.acinq.lightning.blockchain.electrum
22

33
import fr.acinq.bitcoin.*
44
import fr.acinq.lightning.blockchain.fee.FeeratePerKw
5+
import fr.acinq.lightning.io.TcpSocket
56
import fr.acinq.lightning.tests.utils.LightningTestSuite
67
import fr.acinq.lightning.tests.utils.runSuspendTest
78
import fr.acinq.lightning.utils.Connection
9+
import fr.acinq.lightning.utils.ServerAddress
810
import fr.acinq.lightning.utils.toByteVector32
911
import fr.acinq.secp256k1.Hex
10-
import kotlinx.coroutines.CoroutineScope
11-
import kotlinx.coroutines.flow.first
12-
import kotlinx.coroutines.joinAll
13-
import kotlinx.coroutines.launch
12+
import kotlinx.coroutines.*
13+
import kotlinx.coroutines.flow.*
14+
import kotlinx.serialization.json.Json
15+
import kotlinx.serialization.json.jsonObject
16+
import kotlinx.serialization.json.jsonPrimitive
17+
import org.kodein.log.LoggerFactory
18+
import org.kodein.log.newLogger
1419
import kotlin.test.*
1520
import kotlin.time.Duration.Companion.seconds
1621

@@ -177,4 +182,76 @@ class ElectrumClientTest : LightningTestSuite() {
177182

178183
client.stop()
179184
}
185+
186+
@OptIn(DelicateCoroutinesApi::class)
187+
@Test
188+
fun `catch coroutine errors`() {
189+
val myCustomError = "this is a test error"
190+
191+
class MyTcpSocket() : TcpSocket {
192+
val output = MutableSharedFlow<String>()
193+
override suspend fun send(bytes: ByteArray?, offset: Int, length: Int, flush: Boolean) {
194+
if (bytes != null) {
195+
CoroutineScope(Dispatchers.IO).launch {
196+
val encoded = bytes.decodeToString(offset, offset + length)
197+
val request = Json.parseToJsonElement(encoded)
198+
val response = when (request.jsonObject["method"]!!.jsonPrimitive.content) {
199+
"server.version" -> """{"jsonrpc": "2.0", "result": ["ElectrumX 1.15.0", "1.4"], "id": 0}"""
200+
"blockchain.headers.subscribe" -> """{"jsonrpc": "2.0", "result": {"hex": "000080209a35ef4422bc37b0e1c3df9d32cfaaef6a6d31047c0202000000000000000000b9f14c32922d305844c739829ef13df9d188953e74a392720c02eeadd93acbf9ae22a464be8e05174bc5c367", "height": 797144}, "id": 1}"""
201+
"blockchain.estimatefee" -> """{"jsonrpc": "2.0", "error": {"code": 42, "message": "$myCustomError"}, "id": 2}""" // we return an error, as if estimatefee had failed
202+
else -> """{"jsonrpc": "2.0", "error": {"code": 43, "message": "unhandled request"}, "id": 2}"""
203+
}
204+
output.emit(response)
205+
}
206+
}
207+
}
208+
209+
override suspend fun receiveFully(buffer: ByteArray, offset: Int, length: Int) = TODO("Not yet implemented")
210+
override suspend fun receiveAvailable(buffer: ByteArray, offset: Int, length: Int): Int = TODO("Not yet implemented")
211+
override suspend fun startTls(tls: TcpSocket.TLS): TcpSocket = TODO("Not yet implemented")
212+
override fun close() {}
213+
override fun linesFlow(): Flow<String> = output.asSharedFlow()
214+
}
215+
216+
class MyBuilder() : TcpSocket.Builder {
217+
override suspend fun connect(host: String, port: Int, tls: TcpSocket.TLS, loggerFactory: LoggerFactory): TcpSocket {
218+
return MyTcpSocket()
219+
}
220+
}
221+
222+
val errorFlow = MutableStateFlow<Throwable?>(null)
223+
val loggerFactory = LoggerFactory.default
224+
val logger = loggerFactory.newLogger(this::class)
225+
val myErrorHandler = CoroutineExceptionHandler { _, e ->
226+
logger.error(e) { "error caught in custom exception handler" }
227+
errorFlow.value = e
228+
}
229+
230+
runBlocking(Dispatchers.IO) {
231+
withTimeout(15.seconds) {
232+
val builder = MyBuilder()
233+
// from Kotlin's documentation:
234+
// all children coroutines (coroutines created in the context of another Job) delegate handling of their exceptions to their parent coroutine, which
235+
// also delegates to the parent, and so on until the root, so the CoroutineExceptionHandler installed in their context is never used
236+
// => here we need to create a new root scope (or we could use GlobalScope) otherwise our exception handler will not be used
237+
val client = ElectrumClient(builder, GlobalScope, LoggerFactory.default, myErrorHandler)
238+
client.connect(ServerAddress("my-test-node", 50002, TcpSocket.TLS.DISABLED)) // address and port do not matter, but we cannot use TLS (not implemented, see above)
239+
errorFlow.filterNotNull().first { it.message!!.contains(myCustomError) }
240+
client.stop()
241+
}
242+
243+
// if we use runBlocking's scope, our exception handler will not be used
244+
errorFlow.value = null
245+
val error = assertFails {
246+
withTimeout(15.seconds) {
247+
val builder = MyBuilder()
248+
val client = ElectrumClient(builder, this, LoggerFactory.default, myErrorHandler)
249+
client.connect(ServerAddress("my-test-node", 50002, TcpSocket.TLS.DISABLED)) // address and port do not matter, but we cannot use TLS (not implemented, see above)
250+
errorFlow.filterNotNull().first { it.message!!.contains(myCustomError) }
251+
client.stop()
252+
}
253+
}
254+
assertTrue(error.message!!.contains(myCustomError))
255+
}
256+
}
180257
}

0 commit comments

Comments
 (0)