Skip to content

Commit 5df3734

Browse files
committed
Store incoming peers with channels in PeersDb
Once we have a channel with a peer that connected to us, we store their details in our DB. We don't store the address they're connecting from, because we don't know if we will be able to connect to them using this address, but we store their features.
1 parent 32af0de commit 5df3734

File tree

3 files changed

+133
-31
lines changed

3 files changed

+133
-31
lines changed

eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,9 @@ class Peer(val nodeParams: NodeParams,
9292
} else {
9393
None
9494
}
95-
goto(DISCONNECTED) using DisconnectedData(channels, activeChannels = Set.empty, PeerStorage(peerStorageData, written = true)) // when we restart, we will attempt to reconnect right away, but then we'll wait
95+
// When we restart, we will attempt to reconnect right away, but then we'll wait.
96+
// We don't fetch our peer's features from the DB: if the connection succeeds, we will get them from their init message, which saves a DB call.
97+
goto(DISCONNECTED) using DisconnectedData(channels, activeChannels = Set.empty, PeerStorage(peerStorageData, written = true), remoteFeatures_opt = None)
9698
}
9799

98100
when(DISCONNECTED) {
@@ -154,7 +156,14 @@ class Peer(val nodeParams: NodeParams,
154156
if (!d.peerStorage.written && !isTimerActive(WritePeerStorageTimerKey)) {
155157
startSingleTimer(WritePeerStorageTimerKey, WritePeerStorage, nodeParams.peerStorageConfig.writeDelay)
156158
}
157-
stay() using d.copy(activeChannels = d.activeChannels + e.channelId)
159+
val remoteFeatures_opt = d.remoteFeatures_opt match {
160+
case Some(remoteFeatures) if !remoteFeatures.written =>
161+
// We have a channel, so we can write to the DB without any DoS risk.
162+
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, NodeInfo(remoteFeatures.features, None))
163+
Some(remoteFeatures.copy(written = true))
164+
case _ => d.remoteFeatures_opt
165+
}
166+
stay() using d.copy(activeChannels = d.activeChannels + e.channelId, remoteFeatures_opt = remoteFeatures_opt)
158167

159168
case Event(e: LocalChannelDown, d: DisconnectedData) =>
160169
stay() using d.copy(activeChannels = d.activeChannels - e.channelId)
@@ -452,7 +461,11 @@ class Peer(val nodeParams: NodeParams,
452461
if (!d.peerStorage.written && !isTimerActive(WritePeerStorageTimerKey)) {
453462
startSingleTimer(WritePeerStorageTimerKey, WritePeerStorage, nodeParams.peerStorageConfig.writeDelay)
454463
}
455-
stay() using d.copy(activeChannels = d.activeChannels + e.channelId)
464+
if (!d.remoteFeaturesWritten) {
465+
// We have a channel, so we can write to the DB without any DoS risk.
466+
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, NodeInfo(d.remoteFeatures, None))
467+
}
468+
stay() using d.copy(activeChannels = d.activeChannels + e.channelId, remoteFeaturesWritten = true)
456469

457470
case Event(e: LocalChannelDown, d: ConnectedData) =>
458471
stay() using d.copy(activeChannels = d.activeChannels - e.channelId)
@@ -497,7 +510,8 @@ class Peer(val nodeParams: NodeParams,
497510
stopPeer(d.peerStorage)
498511
} else {
499512
d.channels.values.toSet[ActorRef].foreach(_ ! INPUT_DISCONNECTED) // we deduplicate with toSet because there might be two entries per channel (tmp id and final id)
500-
goto(DISCONNECTED) using DisconnectedData(d.channels.collect { case (k: FinalChannelId, v) => (k, v) }, d.activeChannels, d.peerStorage)
513+
val lastRemoteFeatures = LastRemoteFeatures(d.remoteFeatures, d.remoteFeaturesWritten)
514+
goto(DISCONNECTED) using DisconnectedData(d.channels.collect { case (k: FinalChannelId, v) => (k, v) }, d.activeChannels, d.peerStorage, Some(lastRemoteFeatures))
501515
}
502516

503517
case Event(ChannelTerminated(actor), d: ConnectedData) =>
@@ -600,12 +614,22 @@ class Peer(val nodeParams: NodeParams,
600614

601615
case Event(r: GetPeerInfo, d) =>
602616
val replyTo = r.replyTo.getOrElse(sender().toTyped)
603-
val peerInfo = d match {
604-
case c: ConnectedData => PeerInfo(self, remoteNodeId, stateName, Some(c.remoteFeatures), Some(c.address), c.channels.values.toSet)
605-
case _ => PeerInfo(self, remoteNodeId, stateName, None, None, d.channels.values.toSet)
617+
d match {
618+
case c: ConnectedData =>
619+
replyTo ! PeerInfo(self, remoteNodeId, stateName, Some(c.remoteFeatures), Some(c.address), c.channels.values.toSet)
620+
stay()
621+
case d: DisconnectedData =>
622+
// If we haven't reconnected since our last restart, we fetch the latest remote features from our DB.
623+
val remoteFeatures_opt = d.remoteFeatures_opt match {
624+
case Some(remoteFeatures) => Some(remoteFeatures)
625+
case None => nodeParams.db.peers.getPeer(remoteNodeId).map(nodeInfo => LastRemoteFeatures(nodeInfo.features, written = true))
626+
}
627+
replyTo ! PeerInfo(self, remoteNodeId, stateName, remoteFeatures_opt.map(_.features), None, d.channels.values.toSet)
628+
stay() using d.copy(remoteFeatures_opt = remoteFeatures_opt)
629+
case _ =>
630+
replyTo ! PeerInfo(self, remoteNodeId, stateName, None, None, d.channels.values.toSet)
631+
stay()
606632
}
607-
replyTo ! peerInfo
608-
stay()
609633

610634
case Event(r: GetPeerChannels, d) =>
611635
if (d.channels.isEmpty) {
@@ -813,7 +837,13 @@ class Peer(val nodeParams: NodeParams,
813837
// We store the node address and features upon successful outgoing connection, so we can reconnect later.
814838
// The previous address is overwritten: we don't need it since the current one works.
815839
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, NodeInfo(connectionReady.remoteInit.features, Some(connectionReady.address)))
840+
} else if (channels.nonEmpty) {
841+
// If this is an incoming connection, we only store the peer details in our DB if we have channels with them.
842+
// Otherwise nodes could DoS by simply connecting to us to force us to store data in our DB.
843+
// We don't update the remote address, we don't know if we would successfully connect using the current one.
844+
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, NodeInfo(connectionReady.remoteInit.features, None))
816845
}
846+
val remoteFeaturesWritten = connectionReady.outgoing || channels.nonEmpty
817847

818848
// If we have some data stored from our peer, we send it to them before doing anything else.
819849
peerStorage.data.foreach(connectionReady.peerConnection ! PeerStorageRetrieval(_))
@@ -835,7 +865,7 @@ class Peer(val nodeParams: NodeParams,
835865
connectionReady.peerConnection ! CurrentFeeCredit(nodeParams.chainHash, feeCredit.getOrElse(0 msat))
836866
}
837867

838-
goto(CONNECTED) using ConnectedData(connectionReady.address, connectionReady.peerConnection, connectionReady.localInit, connectionReady.remoteInit, channels, activeChannels, feerates, None, peerStorage)
868+
goto(CONNECTED) using ConnectedData(connectionReady.address, connectionReady.peerConnection, connectionReady.localInit, connectionReady.remoteInit, channels, activeChannels, feerates, None, peerStorage, remoteFeaturesWritten)
839869
}
840870

841871
/**
@@ -976,6 +1006,8 @@ object Peer {
9761006

9771007
case class PeerStorage(data: Option[ByteVector], written: Boolean)
9781008

1009+
case class LastRemoteFeatures(features: Features[InitFeature], written: Boolean)
1010+
9791011
sealed trait Data {
9801012
def channels: Map[_ <: ChannelId, ActorRef] // will be overridden by Map[FinalChannelId, ActorRef] or Map[ChannelId, ActorRef]
9811013
def activeChannels: Set[ByteVector32] // channels that are available to process payments
@@ -986,8 +1018,8 @@ object Peer {
9861018
override def activeChannels: Set[ByteVector32] = Set.empty
9871019
override def peerStorage: PeerStorage = PeerStorage(None, written = true)
9881020
}
989-
case class DisconnectedData(channels: Map[FinalChannelId, ActorRef], activeChannels: Set[ByteVector32], peerStorage: PeerStorage) extends Data
990-
case class ConnectedData(address: NodeAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, channels: Map[ChannelId, ActorRef], activeChannels: Set[ByteVector32], currentFeerates: RecommendedFeerates, previousFeerates_opt: Option[RecommendedFeerates], peerStorage: PeerStorage) extends Data {
1021+
case class DisconnectedData(channels: Map[FinalChannelId, ActorRef], activeChannels: Set[ByteVector32], peerStorage: PeerStorage, remoteFeatures_opt: Option[LastRemoteFeatures]) extends Data
1022+
case class ConnectedData(address: NodeAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, channels: Map[ChannelId, ActorRef], activeChannels: Set[ByteVector32], currentFeerates: RecommendedFeerates, previousFeerates_opt: Option[RecommendedFeerates], peerStorage: PeerStorage, remoteFeaturesWritten: Boolean) extends Data {
9911023
val connectionInfo: ConnectionInfo = ConnectionInfo(address, peerConnection, localInit, remoteInit)
9921024
def localFeatures: Features[InitFeature] = localInit.features
9931025
def remoteFeatures: Features[InitFeature] = remoteInit.features

eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala

Lines changed: 77 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -768,7 +768,7 @@ class PeerSpec extends FixtureSpec {
768768
channel.expectMsg(open)
769769
}
770770

771-
test("peer storage") { f =>
771+
test("store remote peer storage once we have channels") { f =>
772772
import f._
773773

774774
// We connect with a previous backup.
@@ -781,7 +781,6 @@ class PeerSpec extends FixtureSpec {
781781
peerConnection1.send(peer, PeerStorageStore(hex"0123456789"))
782782

783783
// We disconnect and reconnect, sending the last backup we received.
784-
peer ! Peer.Disconnect(f.remoteNodeId)
785784
val peerConnection2 = TestProbe()
786785
connect(remoteNodeId, peer, peerConnection2, switchboard, channels = Set(ChannelCodecsSpec.normal), initializePeer = false, peerStorage = Some(hex"0123456789"))
787786
peerConnection2.send(peer, PeerStorageStore(hex"1111"))
@@ -801,6 +800,82 @@ class PeerSpec extends FixtureSpec {
801800
assert(nodeParams.db.peers.getStorage(remoteNodeId).contains(hex"1111"))
802801
}
803802

803+
test("store remote features when channel confirms") { f =>
804+
import f._
805+
806+
// When we make an outgoing connection, we store the peer details in our DB.
807+
assert(nodeParams.db.peers.getPeer(remoteNodeId).isEmpty)
808+
connect(remoteNodeId, peer, peerConnection, switchboard)
809+
val Some(nodeInfo1) = nodeParams.db.peers.getPeer(remoteNodeId)
810+
assert(nodeInfo1.features == TestConstants.Bob.nodeParams.features.initFeatures())
811+
assert(nodeInfo1.address_opt.contains(fakeIPAddress))
812+
813+
// We disconnect and our peer connects to us: we don't have any channel, so we don't update the DB entry.
814+
val peerConnection2 = TestProbe()
815+
val address2 = Tor3("of7husrflx7sforh3fw6yqlpwstee3wg5imvvmkp4bz6rbjxtg5nljad", 9735)
816+
val remoteFeatures2 = Features(Features.ChannelType -> FeatureSupport.Mandatory).initFeatures()
817+
switchboard.send(peer, PeerConnection.ConnectionReady(peerConnection2.ref, remoteNodeId, address2, outgoing = false, protocol.Init(Features.empty), protocol.Init(remoteFeatures2)))
818+
val probe = TestProbe()
819+
probe.send(peer, Peer.GetPeerInfo(Some(probe.ref.toTyped)))
820+
assert(probe.expectMsgType[Peer.PeerInfo].address.contains(address2))
821+
assert(nodeParams.db.peers.getPeer(remoteNodeId).contains(nodeInfo1))
822+
823+
// A channel is created, so we update the remote features in our DB.
824+
// We don't update the address because this was an incoming connection.
825+
peer ! ChannelReadyForPayments(ActorRef.noSender, remoteNodeId, randomBytes32(), 0)
826+
probe.send(peer, Peer.GetPeerInfo(Some(probe.ref.toTyped)))
827+
assert(probe.expectMsgType[Peer.PeerInfo].features.contains(remoteFeatures2))
828+
assert(nodeParams.db.peers.getPeer(remoteNodeId).contains(nodeInfo1.copy(features = remoteFeatures2)))
829+
}
830+
831+
test("store remote features when channel confirms while disconnected") { f =>
832+
import f._
833+
834+
// When we receive an incoming connection, we don't store the peer details in our DB.
835+
assert(nodeParams.db.peers.getPeer(remoteNodeId).isEmpty)
836+
switchboard.send(peer, Peer.Init(Set.empty, Map.empty))
837+
val localInit = protocol.Init(peer.underlyingActor.nodeParams.features.initFeatures())
838+
val remoteInit = protocol.Init(TestConstants.Bob.nodeParams.features.initFeatures())
839+
switchboard.send(peer, PeerConnection.ConnectionReady(peerConnection.ref, remoteNodeId, fakeIPAddress, outgoing = false, localInit, remoteInit))
840+
val probe = TestProbe()
841+
probe.send(peer, Peer.GetPeerInfo(Some(probe.ref.toTyped)))
842+
assert(probe.expectMsgType[Peer.PeerInfo].state == Peer.CONNECTED)
843+
assert(nodeParams.db.peers.getPeer(remoteNodeId).isEmpty)
844+
845+
// Our peer wants to open a channel to us, but we disconnect before we have a confirmed channel.
846+
peer ! SpawnChannelNonInitiator(Left(createOpenChannelMessage()), ChannelConfig.standard, ChannelTypes.Standard(), None, localParams, peerConnection.ref)
847+
peer ! Peer.ConnectionDown(peerConnection.ref)
848+
probe.send(peer, Peer.GetPeerInfo(Some(probe.ref.toTyped)))
849+
assert(probe.expectMsgType[Peer.PeerInfo].state == Peer.DISCONNECTED)
850+
assert(nodeParams.db.peers.getPeer(remoteNodeId).isEmpty)
851+
852+
// The channel confirms, so we store the remote features in our DB.
853+
// We don't store the remote address because this was an incoming connection.
854+
peer ! ChannelReadyForPayments(ActorRef.noSender, remoteNodeId, randomBytes32(), 0)
855+
probe.send(peer, Peer.GetPeerInfo(Some(probe.ref.toTyped)))
856+
assert(probe.expectMsgType[Peer.PeerInfo].state == Peer.DISCONNECTED)
857+
assert(nodeParams.db.peers.getPeer(remoteNodeId).contains(NodeInfo(remoteInit.features, None)))
858+
}
859+
860+
test("get remote features from DB") { f =>
861+
import f._
862+
863+
// We have information about one of our peers in our DB.
864+
val nodeInfo = NodeInfo(TestConstants.Bob.nodeParams.features.initFeatures(), None)
865+
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, nodeInfo)
866+
867+
// We initialize ourselves after a restart, but our peer doesn't reconnect immediately to us.
868+
switchboard.send(peer, Peer.Init(Set(ChannelCodecsSpec.normal), Map.empty))
869+
// When we request information about the peer, we will fetch it from the DB.
870+
val probe = TestProbe()
871+
probe.send(peer, Peer.GetPeerInfo(Some(probe.ref.toTyped)))
872+
val peerInfo = probe.expectMsgType[Peer.PeerInfo]
873+
assert(peerInfo.state == Peer.DISCONNECTED)
874+
assert(peerInfo.address.isEmpty)
875+
assert(peerInfo.features.contains(nodeInfo.features))
876+
assert(nodeParams.db.peers.getPeer(remoteNodeId).contains(nodeInfo))
877+
}
878+
804879
}
805880

806881
object PeerSpec {

eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
3838
private val recommendedFeerates = RecommendedFeerates(Block.RegtestGenesisBlock.hash, TestConstants.feeratePerKw, TestConstants.anchorOutputsFeeratePerKw)
3939

4040
private val PeerNothingData = Peer.Nothing
41-
private val PeerDisconnectedData = Peer.DisconnectedData(channels, activeChannels = Set.empty, PeerStorage(None, written = true))
42-
private val PeerConnectedData = Peer.ConnectedData(fakeIPAddress, system.deadLetters, null, null, channels.map { case (k: ChannelId, v) => (k, v) }, activeChannels = Set.empty, recommendedFeerates, None, PeerStorage(None, written = true))
41+
private val PeerDisconnectedData = Peer.DisconnectedData(channels, activeChannels = Set.empty, PeerStorage(None, written = true), remoteFeatures_opt = None)
42+
private val PeerConnectedData = Peer.ConnectedData(fakeIPAddress, system.deadLetters, null, null, channels.map { case (k: ChannelId, v) => (k, v) }, activeChannels = Set.empty, recommendedFeerates, None, PeerStorage(None, written = true), remoteFeaturesWritten = true)
4343

4444
case class FixtureParam(nodeParams: NodeParams, remoteNodeId: PublicKey, reconnectionTask: TestFSMRef[ReconnectionTask.State, ReconnectionTask.Data, ReconnectionTask], monitor: TestProbe)
4545

@@ -82,7 +82,7 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
8282
import f._
8383

8484
val peer = TestProbe()
85-
peer.send(reconnectionTask, Peer.Transition(PeerNothingData, Peer.DisconnectedData(Map.empty, activeChannels = Set.empty, PeerStorage(None, written = true))))
85+
peer.send(reconnectionTask, Peer.Transition(PeerNothingData, Peer.DisconnectedData(Map.empty, activeChannels = Set.empty, PeerStorage(None, written = true), None)))
8686
monitor.expectNoMessage()
8787
}
8888

@@ -205,7 +205,6 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
205205
peer.send(reconnectionTask, Peer.Transition(PeerDisconnectedData, PeerConnectedData))
206206
// we cancel the reconnection and go to idle state
207207
val TransitionWithData(ReconnectionTask.WAITING, ReconnectionTask.IDLE, _, _) = monitor.expectMsgType[TransitionWithData]
208-
209208
}
210209

211210
test("reconnect using the address from node_announcement") { f =>
@@ -232,39 +231,35 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
232231
val tor = NodeAddress.fromParts("iq7zhmhck54vcax2vlrdcavq2m32wao7ekh6jyeglmnuuvv3js57r4id.onion", 9735).get
233232

234233
// NB: we don't test randomization here, but it makes tests unnecessary more complex for little value
235-
236234
{
237235
// tor not supported: always return clearnet addresses
238236
nodeParams.socksProxy_opt returns None
239-
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(clearnet)) == Some(clearnet))
240-
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(tor)) == None)
241-
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(clearnet, tor)) == Some(clearnet))
237+
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(clearnet)).contains(clearnet))
238+
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(tor)).isEmpty)
239+
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(clearnet, tor)).contains(clearnet))
242240
}
243-
244241
{
245242
// tor supported but not enabled for clearnet addresses: return clearnet addresses when available
246243
val socksParams = mock[Socks5ProxyParams]
247244
socksParams.useForTor returns true
248245
socksParams.useForIPv4 returns false
249246
socksParams.useForIPv6 returns false
250247
nodeParams.socksProxy_opt returns Some(socksParams)
251-
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(clearnet)) == Some(clearnet))
252-
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(tor)) == Some(tor))
253-
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(clearnet, tor)) == Some(clearnet))
248+
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(clearnet)).contains(clearnet))
249+
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(tor)).contains(tor))
250+
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(clearnet, tor)).contains(clearnet))
254251
}
255-
256252
{
257253
// tor supported and enabled for clearnet addresses: return tor addresses when available
258254
val socksParams = mock[Socks5ProxyParams]
259255
socksParams.useForTor returns true
260256
socksParams.useForIPv4 returns true
261257
socksParams.useForIPv6 returns true
262258
nodeParams.socksProxy_opt returns Some(socksParams)
263-
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(clearnet)) == Some(clearnet))
264-
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(tor)) == Some(tor))
265-
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(clearnet, tor)) == Some(tor))
259+
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(clearnet)).contains(clearnet))
260+
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(tor)).contains(tor))
261+
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(clearnet, tor)).contains(tor))
266262
}
267-
268263
}
269264

270265
}

0 commit comments

Comments
 (0)