Skip to content

Commit 0c9b78c

Browse files
committed
Store incoming peers with channels in PeersDb
Once we have a channel with a peer that connected to us, we store their details in our DB. We don't store the address they're connecting from, because we don't know if we will be able to connect to them using this address, but we store their features.
1 parent d0a0589 commit 0c9b78c

File tree

3 files changed

+133
-31
lines changed

3 files changed

+133
-31
lines changed

eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,9 @@ class Peer(val nodeParams: NodeParams,
9292
} else {
9393
None
9494
}
95-
goto(DISCONNECTED) using DisconnectedData(channels, activeChannels = Set.empty, PeerStorage(peerStorageData, written = true)) // when we restart, we will attempt to reconnect right away, but then we'll wait
95+
// When we restart, we will attempt to reconnect right away, but then we'll wait.
96+
// We don't fetch our peer's features from the DB: if the connection succeeds, we will get them from their init message, which saves a DB call.
97+
goto(DISCONNECTED) using DisconnectedData(channels, activeChannels = Set.empty, PeerStorage(peerStorageData, written = true), remoteFeatures_opt = None)
9698
}
9799

98100
when(DISCONNECTED) {
@@ -150,7 +152,14 @@ class Peer(val nodeParams: NodeParams,
150152
if (!d.peerStorage.written && !isTimerActive(WritePeerStorageTimerKey)) {
151153
startSingleTimer(WritePeerStorageTimerKey, WritePeerStorage, nodeParams.peerStorageConfig.writeDelay)
152154
}
153-
stay() using d.copy(activeChannels = d.activeChannels + e.channelId)
155+
val remoteFeatures_opt = d.remoteFeatures_opt match {
156+
case Some(remoteFeatures) if !remoteFeatures.written =>
157+
// We have a channel, so we can write to the DB without any DoS risk.
158+
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, NodeInfo(remoteFeatures.features, None))
159+
Some(remoteFeatures.copy(written = true))
160+
case _ => d.remoteFeatures_opt
161+
}
162+
stay() using d.copy(activeChannels = d.activeChannels + e.channelId, remoteFeatures_opt = remoteFeatures_opt)
154163

155164
case Event(e: LocalChannelDown, d: DisconnectedData) =>
156165
stay() using d.copy(activeChannels = d.activeChannels - e.channelId)
@@ -447,7 +456,11 @@ class Peer(val nodeParams: NodeParams,
447456
if (!d.peerStorage.written && !isTimerActive(WritePeerStorageTimerKey)) {
448457
startSingleTimer(WritePeerStorageTimerKey, WritePeerStorage, nodeParams.peerStorageConfig.writeDelay)
449458
}
450-
stay() using d.copy(activeChannels = d.activeChannels + e.channelId)
459+
if (!d.remoteFeaturesWritten) {
460+
// We have a channel, so we can write to the DB without any DoS risk.
461+
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, NodeInfo(d.remoteFeatures, None))
462+
}
463+
stay() using d.copy(activeChannels = d.activeChannels + e.channelId, remoteFeaturesWritten = true)
451464

452465
case Event(e: LocalChannelDown, d: ConnectedData) =>
453466
stay() using d.copy(activeChannels = d.activeChannels - e.channelId)
@@ -492,7 +505,8 @@ class Peer(val nodeParams: NodeParams,
492505
stopPeer(d.peerStorage)
493506
} else {
494507
d.channels.values.toSet[ActorRef].foreach(_ ! INPUT_DISCONNECTED) // we deduplicate with toSet because there might be two entries per channel (tmp id and final id)
495-
goto(DISCONNECTED) using DisconnectedData(d.channels.collect { case (k: FinalChannelId, v) => (k, v) }, d.activeChannels, d.peerStorage)
508+
val lastRemoteFeatures = LastRemoteFeatures(d.remoteFeatures, d.remoteFeaturesWritten)
509+
goto(DISCONNECTED) using DisconnectedData(d.channels.collect { case (k: FinalChannelId, v) => (k, v) }, d.activeChannels, d.peerStorage, Some(lastRemoteFeatures))
496510
}
497511

498512
case Event(Terminated(actor), d: ConnectedData) if d.channels.values.toSet.contains(actor) =>
@@ -587,12 +601,22 @@ class Peer(val nodeParams: NodeParams,
587601

588602
case Event(r: GetPeerInfo, d) =>
589603
val replyTo = r.replyTo.getOrElse(sender().toTyped)
590-
val peerInfo = d match {
591-
case c: ConnectedData => PeerInfo(self, remoteNodeId, stateName, Some(c.remoteFeatures), Some(c.address), c.channels.values.toSet)
592-
case _ => PeerInfo(self, remoteNodeId, stateName, None, None, d.channels.values.toSet)
604+
d match {
605+
case c: ConnectedData =>
606+
replyTo ! PeerInfo(self, remoteNodeId, stateName, Some(c.remoteFeatures), Some(c.address), c.channels.values.toSet)
607+
stay()
608+
case d: DisconnectedData =>
609+
// If we haven't reconnected since our last restart, we fetch the latest remote features from our DB.
610+
val remoteFeatures_opt = d.remoteFeatures_opt match {
611+
case Some(remoteFeatures) => Some(remoteFeatures)
612+
case None => nodeParams.db.peers.getPeer(remoteNodeId).map(nodeInfo => LastRemoteFeatures(nodeInfo.features, written = true))
613+
}
614+
replyTo ! PeerInfo(self, remoteNodeId, stateName, remoteFeatures_opt.map(_.features), None, d.channels.values.toSet)
615+
stay() using d.copy(remoteFeatures_opt = remoteFeatures_opt)
616+
case _ =>
617+
replyTo ! PeerInfo(self, remoteNodeId, stateName, None, None, d.channels.values.toSet)
618+
stay()
593619
}
594-
replyTo ! peerInfo
595-
stay()
596620

597621
case Event(r: GetPeerChannels, d) =>
598622
if (d.channels.isEmpty) {
@@ -804,7 +828,13 @@ class Peer(val nodeParams: NodeParams,
804828
// We store the node address and features upon successful outgoing connection, so we can reconnect later.
805829
// The previous address is overwritten: we don't need it since the current one works.
806830
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, NodeInfo(connectionReady.remoteInit.features, Some(connectionReady.address)))
831+
} else if (channels.nonEmpty) {
832+
// If this is an incoming connection, we only store the peer details in our DB if we have channels with them.
833+
// Otherwise nodes could DoS by simply connecting to us to force us to store data in our DB.
834+
// We don't update the remote address, we don't know if we would successfully connect using the current one.
835+
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, NodeInfo(connectionReady.remoteInit.features, None))
807836
}
837+
val remoteFeaturesWritten = connectionReady.outgoing || channels.nonEmpty
808838

809839
// If we have some data stored from our peer, we send it to them before doing anything else.
810840
peerStorage.data.foreach(connectionReady.peerConnection ! PeerStorageRetrieval(_))
@@ -826,7 +856,7 @@ class Peer(val nodeParams: NodeParams,
826856
connectionReady.peerConnection ! CurrentFeeCredit(nodeParams.chainHash, feeCredit.getOrElse(0 msat))
827857
}
828858

829-
goto(CONNECTED) using ConnectedData(connectionReady.address, connectionReady.peerConnection, connectionReady.localInit, connectionReady.remoteInit, channels, activeChannels, feerates, None, peerStorage)
859+
goto(CONNECTED) using ConnectedData(connectionReady.address, connectionReady.peerConnection, connectionReady.localInit, connectionReady.remoteInit, channels, activeChannels, feerates, None, peerStorage, remoteFeaturesWritten)
830860
}
831861

832862
/**
@@ -967,6 +997,8 @@ object Peer {
967997

968998
case class PeerStorage(data: Option[ByteVector], written: Boolean)
969999

1000+
case class LastRemoteFeatures(features: Features[InitFeature], written: Boolean)
1001+
9701002
sealed trait Data {
9711003
def channels: Map[_ <: ChannelId, ActorRef] // will be overridden by Map[FinalChannelId, ActorRef] or Map[ChannelId, ActorRef]
9721004
def activeChannels: Set[ByteVector32] // channels that are available to process payments
@@ -977,8 +1009,8 @@ object Peer {
9771009
override def activeChannels: Set[ByteVector32] = Set.empty
9781010
override def peerStorage: PeerStorage = PeerStorage(None, written = true)
9791011
}
980-
case class DisconnectedData(channels: Map[FinalChannelId, ActorRef], activeChannels: Set[ByteVector32], peerStorage: PeerStorage) extends Data
981-
case class ConnectedData(address: NodeAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, channels: Map[ChannelId, ActorRef], activeChannels: Set[ByteVector32], currentFeerates: RecommendedFeerates, previousFeerates_opt: Option[RecommendedFeerates], peerStorage: PeerStorage) extends Data {
1012+
case class DisconnectedData(channels: Map[FinalChannelId, ActorRef], activeChannels: Set[ByteVector32], peerStorage: PeerStorage, remoteFeatures_opt: Option[LastRemoteFeatures]) extends Data
1013+
case class ConnectedData(address: NodeAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, channels: Map[ChannelId, ActorRef], activeChannels: Set[ByteVector32], currentFeerates: RecommendedFeerates, previousFeerates_opt: Option[RecommendedFeerates], peerStorage: PeerStorage, remoteFeaturesWritten: Boolean) extends Data {
9821014
val connectionInfo: ConnectionInfo = ConnectionInfo(address, peerConnection, localInit, remoteInit)
9831015
def localFeatures: Features[InitFeature] = localInit.features
9841016
def remoteFeatures: Features[InitFeature] = remoteInit.features

eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala

Lines changed: 77 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -755,7 +755,7 @@ class PeerSpec extends FixtureSpec {
755755
channel.expectMsg(open)
756756
}
757757

758-
test("peer storage") { f =>
758+
test("store remote peer storage once we have channels") { f =>
759759
import f._
760760

761761
// We connect with a previous backup.
@@ -768,7 +768,6 @@ class PeerSpec extends FixtureSpec {
768768
peerConnection1.send(peer, PeerStorageStore(hex"0123456789"))
769769

770770
// We disconnect and reconnect, sending the last backup we received.
771-
peer ! Peer.Disconnect(f.remoteNodeId)
772771
val peerConnection2 = TestProbe()
773772
connect(remoteNodeId, peer, peerConnection2, switchboard, channels = Set(ChannelCodecsSpec.normal), initializePeer = false, peerStorage = Some(hex"0123456789"))
774773
peerConnection2.send(peer, PeerStorageStore(hex"1111"))
@@ -788,6 +787,82 @@ class PeerSpec extends FixtureSpec {
788787
assert(nodeParams.db.peers.getStorage(remoteNodeId).contains(hex"1111"))
789788
}
790789

790+
test("store remote features when channel confirms") { f =>
791+
import f._
792+
793+
// When we make an outgoing connection, we store the peer details in our DB.
794+
assert(nodeParams.db.peers.getPeer(remoteNodeId).isEmpty)
795+
connect(remoteNodeId, peer, peerConnection, switchboard)
796+
val Some(nodeInfo1) = nodeParams.db.peers.getPeer(remoteNodeId)
797+
assert(nodeInfo1.features == TestConstants.Bob.nodeParams.features.initFeatures())
798+
assert(nodeInfo1.address_opt.contains(fakeIPAddress))
799+
800+
// We disconnect and our peer connects to us: we don't have any channel, so we don't update the DB entry.
801+
val peerConnection2 = TestProbe()
802+
val address2 = Tor3("of7husrflx7sforh3fw6yqlpwstee3wg5imvvmkp4bz6rbjxtg5nljad", 9735)
803+
val remoteFeatures2 = Features(Features.ChannelType -> FeatureSupport.Mandatory).initFeatures()
804+
switchboard.send(peer, PeerConnection.ConnectionReady(peerConnection2.ref, remoteNodeId, address2, outgoing = false, protocol.Init(Features.empty), protocol.Init(remoteFeatures2)))
805+
val probe = TestProbe()
806+
probe.send(peer, Peer.GetPeerInfo(Some(probe.ref.toTyped)))
807+
assert(probe.expectMsgType[Peer.PeerInfo].address.contains(address2))
808+
assert(nodeParams.db.peers.getPeer(remoteNodeId).contains(nodeInfo1))
809+
810+
// A channel is created, so we update the remote features in our DB.
811+
// We don't update the address because this was an incoming connection.
812+
peer ! ChannelReadyForPayments(ActorRef.noSender, remoteNodeId, randomBytes32(), 0)
813+
probe.send(peer, Peer.GetPeerInfo(Some(probe.ref.toTyped)))
814+
assert(probe.expectMsgType[Peer.PeerInfo].features.contains(remoteFeatures2))
815+
assert(nodeParams.db.peers.getPeer(remoteNodeId).contains(nodeInfo1.copy(features = remoteFeatures2)))
816+
}
817+
818+
test("store remote features when channel confirms while disconnected") { f =>
819+
import f._
820+
821+
// When we receive an incoming connection, we don't store the peer details in our DB.
822+
assert(nodeParams.db.peers.getPeer(remoteNodeId).isEmpty)
823+
switchboard.send(peer, Peer.Init(Set.empty, Map.empty))
824+
val localInit = protocol.Init(peer.underlyingActor.nodeParams.features.initFeatures())
825+
val remoteInit = protocol.Init(TestConstants.Bob.nodeParams.features.initFeatures())
826+
switchboard.send(peer, PeerConnection.ConnectionReady(peerConnection.ref, remoteNodeId, fakeIPAddress, outgoing = false, localInit, remoteInit))
827+
val probe = TestProbe()
828+
probe.send(peer, Peer.GetPeerInfo(Some(probe.ref.toTyped)))
829+
assert(probe.expectMsgType[Peer.PeerInfo].state == Peer.CONNECTED)
830+
assert(nodeParams.db.peers.getPeer(remoteNodeId).isEmpty)
831+
832+
// Our peer wants to open a channel to us, but we disconnect before we have a confirmed channel.
833+
peer ! SpawnChannelNonInitiator(Left(createOpenChannelMessage()), ChannelConfig.standard, ChannelTypes.Standard(), None, localParams, peerConnection.ref)
834+
peer ! Peer.ConnectionDown(peerConnection.ref)
835+
probe.send(peer, Peer.GetPeerInfo(Some(probe.ref.toTyped)))
836+
assert(probe.expectMsgType[Peer.PeerInfo].state == Peer.DISCONNECTED)
837+
assert(nodeParams.db.peers.getPeer(remoteNodeId).isEmpty)
838+
839+
// The channel confirms, so we store the remote features in our DB.
840+
// We don't store the remote address because this was an incoming connection.
841+
peer ! ChannelReadyForPayments(ActorRef.noSender, remoteNodeId, randomBytes32(), 0)
842+
probe.send(peer, Peer.GetPeerInfo(Some(probe.ref.toTyped)))
843+
assert(probe.expectMsgType[Peer.PeerInfo].state == Peer.DISCONNECTED)
844+
assert(nodeParams.db.peers.getPeer(remoteNodeId).contains(NodeInfo(remoteInit.features, None)))
845+
}
846+
847+
test("get remote features from DB") { f =>
848+
import f._
849+
850+
// We have information about one of our peers in our DB.
851+
val nodeInfo = NodeInfo(TestConstants.Bob.nodeParams.features.initFeatures(), None)
852+
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, nodeInfo)
853+
854+
// We initialize ourselves after a restart, but our peer doesn't reconnect immediately to us.
855+
switchboard.send(peer, Peer.Init(Set(ChannelCodecsSpec.normal), Map.empty))
856+
// When we request information about the peer, we will fetch it from the DB.
857+
val probe = TestProbe()
858+
probe.send(peer, Peer.GetPeerInfo(Some(probe.ref.toTyped)))
859+
val peerInfo = probe.expectMsgType[Peer.PeerInfo]
860+
assert(peerInfo.state == Peer.DISCONNECTED)
861+
assert(peerInfo.address.isEmpty)
862+
assert(peerInfo.features.contains(nodeInfo.features))
863+
assert(nodeParams.db.peers.getPeer(remoteNodeId).contains(nodeInfo))
864+
}
865+
791866
}
792867

793868
object PeerSpec {

eclair-core/src/test/scala/fr/acinq/eclair/io/ReconnectionTaskSpec.scala

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
3838
private val recommendedFeerates = RecommendedFeerates(Block.RegtestGenesisBlock.hash, TestConstants.feeratePerKw, TestConstants.anchorOutputsFeeratePerKw)
3939

4040
private val PeerNothingData = Peer.Nothing
41-
private val PeerDisconnectedData = Peer.DisconnectedData(channels, activeChannels = Set.empty, PeerStorage(None, written = true))
42-
private val PeerConnectedData = Peer.ConnectedData(fakeIPAddress, system.deadLetters, null, null, channels.map { case (k: ChannelId, v) => (k, v) }, activeChannels = Set.empty, recommendedFeerates, None, PeerStorage(None, written = true))
41+
private val PeerDisconnectedData = Peer.DisconnectedData(channels, activeChannels = Set.empty, PeerStorage(None, written = true), remoteFeatures_opt = None)
42+
private val PeerConnectedData = Peer.ConnectedData(fakeIPAddress, system.deadLetters, null, null, channels.map { case (k: ChannelId, v) => (k, v) }, activeChannels = Set.empty, recommendedFeerates, None, PeerStorage(None, written = true), remoteFeaturesWritten = true)
4343

4444
case class FixtureParam(nodeParams: NodeParams, remoteNodeId: PublicKey, reconnectionTask: TestFSMRef[ReconnectionTask.State, ReconnectionTask.Data, ReconnectionTask], monitor: TestProbe)
4545

@@ -82,7 +82,7 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
8282
import f._
8383

8484
val peer = TestProbe()
85-
peer.send(reconnectionTask, Peer.Transition(PeerNothingData, Peer.DisconnectedData(Map.empty, activeChannels = Set.empty, PeerStorage(None, written = true))))
85+
peer.send(reconnectionTask, Peer.Transition(PeerNothingData, Peer.DisconnectedData(Map.empty, activeChannels = Set.empty, PeerStorage(None, written = true), None)))
8686
monitor.expectNoMessage()
8787
}
8888

@@ -205,7 +205,6 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
205205
peer.send(reconnectionTask, Peer.Transition(PeerDisconnectedData, PeerConnectedData))
206206
// we cancel the reconnection and go to idle state
207207
val TransitionWithData(ReconnectionTask.WAITING, ReconnectionTask.IDLE, _, _) = monitor.expectMsgType[TransitionWithData]
208-
209208
}
210209

211210
test("reconnect using the address from node_announcement") { f =>
@@ -232,39 +231,35 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
232231
val tor = NodeAddress.fromParts("iq7zhmhck54vcax2vlrdcavq2m32wao7ekh6jyeglmnuuvv3js57r4id.onion", 9735).get
233232

234233
// NB: we don't test randomization here, but it makes tests unnecessary more complex for little value
235-
236234
{
237235
// tor not supported: always return clearnet addresses
238236
nodeParams.socksProxy_opt returns None
239-
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(clearnet)) == Some(clearnet))
240-
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(tor)) == None)
241-
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(clearnet, tor)) == Some(clearnet))
237+
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(clearnet)).contains(clearnet))
238+
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(tor)).isEmpty)
239+
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(clearnet, tor)).contains(clearnet))
242240
}
243-
244241
{
245242
// tor supported but not enabled for clearnet addresses: return clearnet addresses when available
246243
val socksParams = mock[Socks5ProxyParams]
247244
socksParams.useForTor returns true
248245
socksParams.useForIPv4 returns false
249246
socksParams.useForIPv6 returns false
250247
nodeParams.socksProxy_opt returns Some(socksParams)
251-
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(clearnet)) == Some(clearnet))
252-
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(tor)) == Some(tor))
253-
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(clearnet, tor)) == Some(clearnet))
248+
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(clearnet)).contains(clearnet))
249+
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(tor)).contains(tor))
250+
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(clearnet, tor)).contains(clearnet))
254251
}
255-
256252
{
257253
// tor supported and enabled for clearnet addresses: return tor addresses when available
258254
val socksParams = mock[Socks5ProxyParams]
259255
socksParams.useForTor returns true
260256
socksParams.useForIPv4 returns true
261257
socksParams.useForIPv6 returns true
262258
nodeParams.socksProxy_opt returns Some(socksParams)
263-
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(clearnet)) == Some(clearnet))
264-
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(tor)) == Some(tor))
265-
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(clearnet, tor)) == Some(tor))
259+
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(clearnet)).contains(clearnet))
260+
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(tor)).contains(tor))
261+
assert(ReconnectionTask.selectNodeAddress(nodeParams, List(clearnet, tor)).contains(tor))
266262
}
267-
268263
}
269264

270265
}

0 commit comments

Comments
 (0)