Skip to content

Commit 36822bb

Browse files
t-bastpm47
andauthored
Don't scan the blockchain for spent external channels (#3226)
When an external channel is spent, we don't immediately remove it from our network graph in case the spending transaction is a splice (see lightning/bolts#1270 for more details). A side-effect of this change, introduced in #2936, is that when we start watching a channel after receiving its `channel_announcement`, we will scan the blockchain if it is actually already spent. This can be expensive if peers send us `channel_announcement`s for channels that have been spent a long time ago since `bitcoind` doesn't provide an index for spending transactions. It is also misleading, because if we give up after scanning X blocks of the blockchain, we will create a log line saying that funds are at risk: they're never at risk since those are not our channels. This commit fixes this issue by only checking whether the channel is already spent by a confirmed transaction or not when setting the watch (which is an inexpensive and efficient RPC call to `bitcoind`), without scanning the blockchain to find the spending transaction. If it is already spent, we immediately remove it from our network graph, even if the spending transaction was actually a splice. This is fine, since that channel will be re-added to our graph whenever we receive the `channel_announcement` for the splice. In the worst case, we will simply not route through an actually available channels for a few blocks while its splice transaction is confirming. Co-authored-by: pm47 <pm.padiou@gmail.com>
1 parent 5aea514 commit 36822bb

File tree

8 files changed

+105
-82
lines changed

8 files changed

+105
-82
lines changed

eclair-core/src/main/scala/fr/acinq/eclair/blockchain/Monitoring.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ object Monitoring {
2424

2525
object Metrics {
2626
val NewBlockCheckConfirmedDuration: Metric.Timer = Kamon.timer("bitcoin.watcher.newblock.checkconfirmed")
27+
val Watches: Metric.Gauge = Kamon.gauge("bitcoin.watcher.watches")
28+
val WatchedUtxos: Metric.Gauge = Kamon.gauge("bitcoin.watcher.utxos")
2729
val RpcBasicInvokeCount: Metric.Counter = Kamon.counter("bitcoin.rpc.basic.invoke.count")
2830
val RpcBasicInvokeDuration: Metric.Timer = Kamon.timer("bitcoin.rpc.basic.invoke.duration")
2931
val RpcBatchInvokeDuration: Metric.Timer = Kamon.timer("bitcoin.rpc.batch.invoke.duration")
@@ -45,6 +47,7 @@ object Monitoring {
4547
val Wallet = "wallet"
4648
val Priority = "priority"
4749
val Provider = "provider"
50+
val WatchType = "watch-type"
4851

4952
object Priorities {
5053
val Minimum = "0-minimum"

eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcher.scala

Lines changed: 57 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -121,13 +121,10 @@ object ZmqWatcher {
121121
}
122122

123123
/** This event is sent when a [[WatchSpent]] condition is met. */
124-
sealed trait WatchSpentTriggered extends WatchTriggered {
125-
/** Transaction spending the watched outpoint. */
126-
def spendingTx: Transaction
127-
}
124+
sealed trait WatchSpentTriggered extends WatchTriggered
128125

129126
case class WatchExternalChannelSpent(replyTo: ActorRef[WatchExternalChannelSpentTriggered], txId: TxId, outputIndex: Int, shortChannelId: RealShortChannelId) extends WatchSpent[WatchExternalChannelSpentTriggered] { override def hints: Set[TxId] = Set.empty }
130-
case class WatchExternalChannelSpentTriggered(shortChannelId: RealShortChannelId, spendingTx: Transaction) extends WatchSpentTriggered
127+
case class WatchExternalChannelSpentTriggered(shortChannelId: RealShortChannelId, spendingTx_opt: Option[Transaction]) extends WatchSpentTriggered
131128
case class UnwatchExternalChannelSpent(txId: TxId, outputIndex: Int) extends Command
132129

133130
case class WatchFundingSpent(replyTo: ActorRef[WatchFundingSpentTriggered], txId: TxId, outputIndex: Int, hints: Set[TxId]) extends WatchSpent[WatchFundingSpentTriggered]
@@ -231,7 +228,7 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
231228
.flatMap(watchedUtxos.get)
232229
.flatten
233230
.foreach {
234-
case w: WatchExternalChannelSpent => context.self ! TriggerEvent(w.replyTo, w, WatchExternalChannelSpentTriggered(w.shortChannelId, tx))
231+
case w: WatchExternalChannelSpent => context.self ! TriggerEvent(w.replyTo, w, WatchExternalChannelSpentTriggered(w.shortChannelId, Some(tx)))
235232
case w: WatchFundingSpent => context.self ! TriggerEvent(w.replyTo, w, WatchFundingSpentTriggered(tx))
236233
case w: WatchOutputSpent => context.self ! TriggerEvent(w.replyTo, w, WatchOutputSpentTriggered(w.amount, tx))
237234
case _: WatchPublished => // nothing to do
@@ -322,12 +319,14 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
322319
blockHeight.set(currentHeight.toLong)
323320
context.system.eventStream ! EventStream.Publish(CurrentBlockHeight(currentHeight))
324321
// TODO: should we try to mitigate the herd effect and not check all watches immediately?
325-
val watchExternalChannelCount = watches.keySet.count(_.isInstanceOf[WatchExternalChannelSpent])
326-
val watchFundingSpentCount = watches.keySet.count(_.isInstanceOf[WatchFundingSpent])
327-
val watchOutputSpentCount = watches.keySet.count(_.isInstanceOf[WatchOutputSpent])
328-
val watchPublishedCount = watches.keySet.count(_.isInstanceOf[WatchPublished])
329-
val watchConfirmedCount = watches.keySet.count(_.isInstanceOf[WatchConfirmed[_]])
330-
log.info("{} watched utxos: external-channels={}, funding-spent={}, output-spent={}, tx-published={}, tx-confirmed={}", watchedUtxos.size, watchExternalChannelCount, watchFundingSpentCount, watchOutputSpentCount, watchPublishedCount, watchConfirmedCount)
322+
Metrics.WatchedUtxos.withoutTags().update(watchedUtxos.size)
323+
log.info("currently watching {} utxos with {} watches", watchedUtxos.size, watches.size)
324+
watches.keys
325+
.groupBy(_.getClass.getSimpleName)
326+
.foreach { case (t, list) =>
327+
Metrics.Watches.withTag(Monitoring.Tags.WatchType, t).update(list.size)
328+
log.info("we have {} {} currently registered", list.size, t)
329+
}
331330
KamonExt.timeFuture(Metrics.NewBlockCheckConfirmedDuration.withoutTags()) {
332331
Future.sequence(watches.collect {
333332
case (w: WatchPublished, _) => checkPublished(w)
@@ -429,41 +428,54 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
429428
}
430429

431430
private def checkSpent(w: WatchSpent[_ <: WatchSpentTriggered]): Future[Unit] = {
432-
// first let's see if the parent tx was published or not
431+
// First let's see if the parent tx was published or not before checking whether it has been spent.
433432
client.getTxConfirmations(w.txId).collect {
434-
case Some(_) =>
435-
// parent tx was published, we need to make sure this particular output has not been spent
436-
client.isTransactionOutputSpendable(w.txId, w.outputIndex, includeMempool = true).collect {
437-
case false =>
438-
// the output has been spent, let's find the spending tx
439-
// if we know some potential spending txs, we try to fetch them directly
440-
Future.sequence(w.hints.map(txid => client.getTransaction(txid).map(Some(_)).recover { case _ => None }))
441-
.map(_.flatten) // filter out errors and hint transactions that can't be found
442-
.map(hintTxs => {
443-
hintTxs.find(tx => tx.txIn.exists(i => i.outPoint.txid == w.txId && i.outPoint.index == w.outputIndex)) match {
444-
case Some(spendingTx) =>
445-
log.info(s"${w.txId}:${w.outputIndex} has already been spent by a tx provided in hints: txid=${spendingTx.txid}")
446-
context.self ! ProcessNewTransaction(spendingTx)
447-
case None =>
448-
// The hints didn't help us, let's search for the spending transaction.
449-
log.info(s"${w.txId}:${w.outputIndex} has already been spent, looking for the spending tx in the mempool")
450-
client.lookForMempoolSpendingTx(w.txId, w.outputIndex).map(Some(_)).recover { case _ => None }.map {
451-
case Some(spendingTx) =>
452-
log.info(s"found tx spending ${w.txId}:${w.outputIndex} in the mempool: txid=${spendingTx.txid}")
453-
context.self ! ProcessNewTransaction(spendingTx)
454-
case None =>
455-
// no luck, we have to do it the hard way...
456-
log.warn(s"${w.txId}:${w.outputIndex} has already been spent, spending tx not in the mempool, looking in the blockchain...")
457-
client.lookForSpendingTx(None, w.txId, w.outputIndex, nodeParams.channelConf.maxChannelSpentRescanBlocks).map { spendingTx =>
458-
log.warn(s"found the spending tx of ${w.txId}:${w.outputIndex} in the blockchain: txid=${spendingTx.txid}")
433+
case Some(_) => w match {
434+
case w: WatchExternalChannelSpent =>
435+
// This is an external channels: funds are not at risk, so we don't need to scan the blockchain to find the
436+
// spending transaction, it is costly and unnecessary. We simply check whether the output has already been
437+
// spent by a confirmed transaction.
438+
client.isTransactionOutputSpent(w.txId, w.outputIndex).collect {
439+
case true =>
440+
// The output has been spent, so we trigger the watch without including the spending transaction.
441+
context.self ! TriggerEvent(w.replyTo, w, WatchExternalChannelSpentTriggered(w.shortChannelId, None))
442+
}
443+
case _ =>
444+
// The parent tx was published, we need to make sure this particular output has not been spent.
445+
client.isTransactionOutputSpendable(w.txId, w.outputIndex, includeMempool = true).collect {
446+
case false =>
447+
// The output has been spent, let's find the spending tx.
448+
// If we know some potential spending txs, we try to fetch them directly.
449+
Future.sequence(w.hints.map(txid => client.getTransaction(txid).map(Some(_)).recover { case _ => None }))
450+
.map(_.flatten) // filter out errors and hint transactions that can't be found
451+
.map(hintTxs => {
452+
hintTxs.find(tx => tx.txIn.exists(i => i.outPoint.txid == w.txId && i.outPoint.index == w.outputIndex)) match {
453+
case Some(spendingTx) =>
454+
log.info("{}:{} has already been spent by a tx provided in hints: txid={}", w.txId, w.outputIndex, spendingTx.txid)
455+
context.self ! ProcessNewTransaction(spendingTx)
456+
case None =>
457+
// The hints didn't help us, let's search for the spending transaction in the mempool.
458+
log.info("{}:{} has already been spent, looking for the spending tx in the mempool", w.txId, w.outputIndex)
459+
client.lookForMempoolSpendingTx(w.txId, w.outputIndex).map(Some(_)).recover { case _ => None }.map {
460+
case Some(spendingTx) =>
461+
log.info("found tx spending {}:{} in the mempool: txid={}", w.txId, w.outputIndex, spendingTx.txid)
459462
context.self ! ProcessNewTransaction(spendingTx)
460-
}.recover {
461-
case _ => log.warn(s"could not find the spending tx of ${w.txId}:${w.outputIndex} in the blockchain, funds are at risk")
462-
}
463-
}
464-
}
465-
})
466-
}
463+
case None =>
464+
// The spending transaction isn't in the mempool, so it must be a transaction that confirmed
465+
// before we set the watch. We have to scan the blockchain to find it, which is expensive
466+
// since bitcoind doesn't provide indexes for this scenario.
467+
log.warn("{}:{} has already been spent, spending tx not in the mempool, looking in the blockchain...", w.txId, w.outputIndex)
468+
client.lookForSpendingTx(None, w.txId, w.outputIndex, nodeParams.channelConf.maxChannelSpentRescanBlocks).map { spendingTx =>
469+
log.warn("found the spending tx of {}:{} in the blockchain: txid={}", w.txId, w.outputIndex, spendingTx.txid)
470+
context.self ! ProcessNewTransaction(spendingTx)
471+
}.recover {
472+
case _ => log.warn("could not find the spending tx of {}:{} in the blockchain, funds are at risk", w.txId, w.outputIndex)
473+
}
474+
}
475+
}
476+
})
477+
}
478+
}
467479
}
468480
}
469481

eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -263,15 +263,24 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
263263
case Event(r: ValidateResult, d) =>
264264
stay() using Validation.handleChannelValidationResponse(d, nodeParams, watcher, r)
265265

266-
case Event(WatchExternalChannelSpentTriggered(shortChannelId, spendingTx), d) if d.channels.contains(shortChannelId) || d.prunedChannels.contains(shortChannelId) =>
266+
case Event(WatchExternalChannelSpentTriggered(shortChannelId, spendingTx_opt), d) if d.channels.contains(shortChannelId) || d.prunedChannels.contains(shortChannelId) =>
267267
val fundingTxId = d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId)).get.fundingTxId
268-
log.info("funding tx txId={} of channelId={} has been spent by txId={}: waiting for the spending tx to have enough confirmations before removing the channel from the graph", fundingTxId, shortChannelId, spendingTx.txid)
269-
watcher ! WatchTxConfirmed(self, spendingTx.txid, nodeParams.routerConf.channelSpentSpliceDelay)
270-
stay() using d.copy(spentChannels = d.spentChannels.updated(spendingTx.txid, d.spentChannels.getOrElse(spendingTx.txid, Set.empty) + shortChannelId))
268+
spendingTx_opt match {
269+
case Some(spendingTx) =>
270+
log.info("funding tx txId={} of channelId={} has been spent by txId={}: waiting for the spending tx to have enough confirmations before removing the channel from the graph", fundingTxId, shortChannelId, spendingTx.txid)
271+
watcher ! WatchTxConfirmed(self, spendingTx.txid, nodeParams.routerConf.channelSpentSpliceDelay)
272+
stay() using d.copy(spentChannels = d.spentChannels.updated(spendingTx.txid, d.spentChannels.getOrElse(spendingTx.txid, Set.empty) + shortChannelId))
273+
case None =>
274+
// If the channel was spent by a transaction that is already confirmed, it would be very inefficient to scan
275+
// the blockchain for the spending transaction (which could have confirmed a long time ago), just to forget
276+
// that channel. We skip scanning the blockchain and immediately forget the channel.
277+
log.info("funding tx txId={} of channelId={} has already been spent by a confirmed transaction: removing the channel from the graph immediately", fundingTxId, shortChannelId)
278+
stay() using Validation.handleChannelSpent(d, watcher, nodeParams.db.network, None, Set(shortChannelId))
279+
}
271280

272281
case Event(WatchTxConfirmedTriggered(_, _, spendingTx), d) =>
273282
d.spentChannels.get(spendingTx.txid) match {
274-
case Some(shortChannelIds) => stay() using Validation.handleChannelSpent(d, watcher, nodeParams.db.network, spendingTx.txid, shortChannelIds)
283+
case Some(shortChannelIds) => stay() using Validation.handleChannelSpent(d, watcher, nodeParams.db.network, Some(spendingTx.txid), shortChannelIds)
275284
case None => stay()
276285
}
277286

eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ object Validation {
268268
} else d1
269269
}
270270

271-
def handleChannelSpent(d: Data, watcher: typed.ActorRef[ZmqWatcher.Command], db: NetworkDb, spendingTxId: TxId, shortChannelIds: Set[RealShortChannelId])(implicit ctx: ActorContext, log: LoggingAdapter): Data = {
271+
def handleChannelSpent(d: Data, watcher: typed.ActorRef[ZmqWatcher.Command], db: NetworkDb, spendingTxId_opt: Option[TxId], shortChannelIds: Set[RealShortChannelId])(implicit ctx: ActorContext, log: LoggingAdapter): Data = {
272272
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
273273
val lostChannels = shortChannelIds.flatMap(shortChannelId => d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId)))
274274
log.info("funding tx for channelIds={} was spent", shortChannelIds.mkString(","))
@@ -277,7 +277,7 @@ object Validation {
277277
val prunedChannels1 = d.prunedChannels -- shortChannelIds
278278
val lostNodes = lostChannels.flatMap(lostChannel => Seq(lostChannel.nodeId1, lostChannel.nodeId2).filterNot(nodeId => hasChannels(nodeId, channels1.values)))
279279
// let's clean the db and send the events
280-
log.info("pruning shortChannelIds={} (spent)", shortChannelIds.mkString(","))
280+
log.info("pruning shortChannelIds={} (spent)", shortChannelIds.mkString(","))
281281
shortChannelIds.foreach(db.removeChannel(_)) // NB: this also removes channel updates
282282
// we also need to remove updates from the graph
283283
val graphWithBalances1 = lostChannels.foldLeft(d.graphWithBalances) { (graph, lostChannel) =>
@@ -298,13 +298,12 @@ object Validation {
298298
// we will re-add a spliced channel as a new channel later when we receive the announcement
299299
watcher ! UnwatchExternalChannelSpent(lostChannel.fundingTxId, outputIndex(lostChannel.ann.shortChannelId))
300300
}
301-
302301
// We may have received RBF candidates for this splice: we can find them by looking at transactions that spend one
303302
// of the channels we're removing (note that they may spend a slightly different set of channels).
304303
// Those transactions cannot confirm anymore (they have been double-spent by the current one), so we should stop
305304
// watching them.
306305
val spendingTxs = d.spentChannels.filter(_._2.intersect(shortChannelIds).nonEmpty).keySet
307-
(spendingTxs - spendingTxId).foreach(txId => watcher ! UnwatchTxConfirmed(txId))
306+
(spendingTxs -- spendingTxId_opt.toSet).foreach(txId => watcher ! UnwatchTxConfirmed(txId))
308307
val spentChannels1 = d.spentChannels -- spendingTxs
309308
d.copy(nodes = d.nodes -- lostNodes, channels = channels1, prunedChannels = prunedChannels1, graphWithBalances = graphWithBalances1, spentChannels = spentChannels1)
310309
}

eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -264,15 +264,15 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind
264264
bitcoinClient.publishTransaction(tx1)
265265
// tx and tx1 aren't confirmed yet, but we trigger the WatchSpentTriggered event when we see tx1 in the mempool.
266266
probe.expectMsgAllOf(
267-
WatchExternalChannelSpentTriggered(RealShortChannelId(5), tx1),
267+
WatchExternalChannelSpentTriggered(RealShortChannelId(5), Some(tx1)),
268268
WatchFundingSpentTriggered(tx1)
269269
)
270270
// Let's confirm tx and tx1: seeing tx1 in a block should trigger both WatchSpentTriggered events again.
271271
bitcoinClient.getBlockHeight().pipeTo(probe.ref)
272272
val initialBlockHeight = probe.expectMsgType[BlockHeight]
273273
generateBlocks(1)
274274
probe.expectMsgAllOf(
275-
WatchExternalChannelSpentTriggered(RealShortChannelId(5), tx1),
275+
WatchExternalChannelSpentTriggered(RealShortChannelId(5), Some(tx1)),
276276
WatchFundingSpentTriggered(tx1)
277277
)
278278
probe.expectNoMessage(100 millis)
@@ -311,8 +311,10 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind
311311
probe.fishForMessage() { case m: WatchOutputSpentTriggered => m.spendingTx.txid == tx1.txid }
312312
watcher ! StopWatching(probe.ref)
313313

314+
// If we watch after being spent by a confirmed transaction, we immediately trigger the watch without fetching
315+
// the spending transaction.
314316
watcher ! WatchExternalChannelSpent(probe.ref, tx1.txid, 0, RealShortChannelId(1))
315-
probe.expectMsg(WatchExternalChannelSpentTriggered(RealShortChannelId(1), tx2))
317+
probe.expectMsg(WatchExternalChannelSpentTriggered(RealShortChannelId(1), None))
316318
watcher ! StopWatching(probe.ref)
317319
watcher ! WatchFundingSpent(probe.ref, tx1.txid, 0, Set.empty)
318320
probe.expectMsg(WatchFundingSpentTriggered(tx2))
@@ -340,7 +342,7 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind
340342
// When publishing the transaction, the watch triggers immediately.
341343
bitcoinClient.publishTransaction(spendingTx1).pipeTo(sender.ref)
342344
sender.expectMsg(spendingTx1.txid)
343-
probe.expectMsg(WatchExternalChannelSpentTriggered(RealShortChannelId(3), spendingTx1))
345+
probe.expectMsg(WatchExternalChannelSpentTriggered(RealShortChannelId(3), Some(spendingTx1)))
344346
probe.expectNoMessage(100 millis)
345347

346348
// If we unwatch the transaction, we will ignore when it's published.
@@ -351,7 +353,7 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind
351353

352354
// If we watch again, this will trigger immediately because the transaction is in the mempool.
353355
watcher ! WatchExternalChannelSpent(probe.ref, tx2.txid, outputIndex2, RealShortChannelId(5))
354-
probe.expectMsg(WatchExternalChannelSpentTriggered(RealShortChannelId(5), spendingTx2))
356+
probe.expectMsg(WatchExternalChannelSpentTriggered(RealShortChannelId(5), Some(spendingTx2)))
355357
probe.expectNoMessage(100 millis)
356358

357359
// We make the transactions confirm while we're not watching.
@@ -365,7 +367,7 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind
365367

366368
// If we watch again after confirmation, the watch instantly triggers.
367369
watcher ! WatchExternalChannelSpent(probe.ref, tx1.txid, outputIndex1, RealShortChannelId(3))
368-
probe.expectMsg(WatchExternalChannelSpentTriggered(RealShortChannelId(3), spendingTx1))
370+
probe.expectMsg(WatchExternalChannelSpentTriggered(RealShortChannelId(3), None))
369371
probe.expectNoMessage(100 millis)
370372
})
371373
}

0 commit comments

Comments
 (0)