Skip to content

Commit d0a0589

Browse files
committed
Store remote features in PeersDb
We only store information about our peers when we succeed in making an outgoing connection to them. The only informaiton we stored was the address that we used when connecting. We now also store the features supported by our peer when we last connected to them. It's important to note that this means that we currently don't store peers that always connect to us (e.g. mobile wallet users that never have a stable address): we will address that in the next commit.
1 parent db93cbe commit d0a0589

File tree

9 files changed

+275
-140
lines changed

9 files changed

+275
-140
lines changed

eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import fr.acinq.eclair.payment._
1212
import fr.acinq.eclair.payment.relay.OnTheFlyFunding
1313
import fr.acinq.eclair.payment.relay.Relayer.RelayFees
1414
import fr.acinq.eclair.router.Router
15-
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAddress, NodeAnnouncement}
15+
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement, NodeInfo}
1616
import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, Paginated, RealShortChannelId, ShortChannelId, TimestampMilli, TimestampSecond}
1717
import grizzled.slf4j.Logging
1818
import scodec.bits.ByteVector
@@ -264,22 +264,22 @@ case class DualPeersDb(primary: PeersDb, secondary: PeersDb) extends PeersDb {
264264

265265
private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("db-peers").build()))
266266

267-
override def addOrUpdatePeer(nodeId: Crypto.PublicKey, address: NodeAddress): Unit = {
268-
runAsync(secondary.addOrUpdatePeer(nodeId, address))
269-
primary.addOrUpdatePeer(nodeId, address)
267+
override def addOrUpdatePeer(nodeId: Crypto.PublicKey, nodeInfo: NodeInfo): Unit = {
268+
runAsync(secondary.addOrUpdatePeer(nodeId, nodeInfo))
269+
primary.addOrUpdatePeer(nodeId, nodeInfo)
270270
}
271271

272272
override def removePeer(nodeId: Crypto.PublicKey): Unit = {
273273
runAsync(secondary.removePeer(nodeId))
274274
primary.removePeer(nodeId)
275275
}
276276

277-
override def getPeer(nodeId: Crypto.PublicKey): Option[NodeAddress] = {
277+
override def getPeer(nodeId: Crypto.PublicKey): Option[NodeInfo] = {
278278
runAsync(secondary.getPeer(nodeId))
279279
primary.getPeer(nodeId)
280280
}
281281

282-
override def listPeers(): Map[Crypto.PublicKey, NodeAddress] = {
282+
override def listPeers(): Map[Crypto.PublicKey, NodeInfo] = {
283283
runAsync(secondary.listPeers())
284284
primary.listPeers()
285285
}

eclair-core/src/main/scala/fr/acinq/eclair/db/PeersDb.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,21 @@
1717
package fr.acinq.eclair.db
1818

1919
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
20-
import fr.acinq.eclair.TimestampSecond
20+
import fr.acinq.eclair.{Features, InitFeature, TimestampSecond}
2121
import fr.acinq.eclair.payment.relay.Relayer.RelayFees
22-
import fr.acinq.eclair.wire.protocol.NodeAddress
22+
import fr.acinq.eclair.wire.protocol.{NodeAddress, NodeInfo}
2323
import scodec.bits.ByteVector
2424

25+
/** The PeersDb contains information about our direct peers, with whom we have or had channels. */
2526
trait PeersDb {
2627

27-
def addOrUpdatePeer(nodeId: PublicKey, address: NodeAddress): Unit
28+
def addOrUpdatePeer(nodeId: PublicKey, nodeInfo: NodeInfo): Unit
2829

2930
def removePeer(nodeId: PublicKey): Unit
3031

31-
def getPeer(nodeId: PublicKey): Option[NodeAddress]
32+
def getPeer(nodeId: PublicKey): Option[NodeInfo]
3233

33-
def listPeers(): Map[PublicKey, NodeAddress]
34+
def listPeers(): Map[PublicKey, NodeInfo]
3435

3536
def addOrUpdateRelayFees(nodeId: PublicKey, fees: RelayFees): Unit
3637

eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPeersDb.scala

Lines changed: 59 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,22 @@ package fr.acinq.eclair.db.pg
1818

1919
import fr.acinq.bitcoin.scalacompat.Crypto
2020
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
21-
import fr.acinq.eclair.{MilliSatoshi, TimestampSecond}
2221
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
2322
import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
2423
import fr.acinq.eclair.db.PeersDb
2524
import fr.acinq.eclair.db.pg.PgUtils.PgLock
2625
import fr.acinq.eclair.payment.relay.Relayer.RelayFees
2726
import fr.acinq.eclair.wire.protocol._
27+
import fr.acinq.eclair.{Features, MilliSatoshi, TimestampSecond}
2828
import grizzled.slf4j.Logging
29-
import scodec.bits.{BitVector, ByteVector}
29+
import scodec.bits.ByteVector
3030

3131
import java.sql.Statement
3232
import javax.sql.DataSource
3333

3434
object PgPeersDb {
3535
val DB_NAME = "peers"
36-
val CURRENT_VERSION = 4
36+
val CURRENT_VERSION = 5
3737
}
3838

3939
class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logging {
@@ -59,16 +59,22 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg
5959
statement.executeUpdate("CREATE INDEX removed_peer_at_idx ON local.peer_storage(removed_peer_at)")
6060
}
6161

62+
def migration45(statement: Statement): Unit = {
63+
statement.executeUpdate("ALTER TABLE local.peers RENAME COLUMN data TO node_address")
64+
statement.executeUpdate("ALTER TABLE local.peers ALTER COLUMN node_address DROP NOT NULL")
65+
statement.executeUpdate("ALTER TABLE local.peers ADD COLUMN node_features BYTEA")
66+
}
67+
6268
using(pg.createStatement()) { statement =>
6369
getVersion(statement, DB_NAME) match {
6470
case None =>
6571
statement.executeUpdate("CREATE SCHEMA IF NOT EXISTS local")
66-
statement.executeUpdate("CREATE TABLE local.peers (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL)")
72+
statement.executeUpdate("CREATE TABLE local.peers (node_id TEXT NOT NULL PRIMARY KEY, node_address BYTEA, node_features BYTEA)")
6773
statement.executeUpdate("CREATE TABLE local.relay_fees (node_id TEXT NOT NULL PRIMARY KEY, fee_base_msat BIGINT NOT NULL, fee_proportional_millionths BIGINT NOT NULL)")
6874
statement.executeUpdate("CREATE TABLE local.peer_storage (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL, last_updated_at TIMESTAMP WITH TIME ZONE NOT NULL, removed_peer_at TIMESTAMP WITH TIME ZONE)")
6975

7076
statement.executeUpdate("CREATE INDEX removed_peer_at_idx ON local.peer_storage(removed_peer_at)")
71-
case Some(v@(1 | 2 | 3)) =>
77+
case Some(v@(1 | 2 | 3 | 4)) =>
7278
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
7379
if (v < 2) {
7480
migration12(statement)
@@ -79,26 +85,47 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg
7985
if (v < 4) {
8086
migration34(statement)
8187
}
88+
if (v < 5) {
89+
migration45(statement)
90+
}
8291
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
8392
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
8493
}
8594
setVersion(statement, DB_NAME, CURRENT_VERSION)
8695
}
8796
}
8897

89-
override def addOrUpdatePeer(nodeId: Crypto.PublicKey, nodeaddress: NodeAddress): Unit = withMetrics("peers/add-or-update", DbBackends.Postgres) {
98+
override def addOrUpdatePeer(nodeId: Crypto.PublicKey, nodeInfo: NodeInfo): Unit = withMetrics("peers/add-or-update", DbBackends.Postgres) {
9099
withLock { pg =>
91-
val data = CommonCodecs.nodeaddress.encode(nodeaddress).require.toByteArray
92-
using(pg.prepareStatement(
93-
"""
94-
| INSERT INTO local.peers (node_id, data)
95-
| VALUES (?, ?)
96-
| ON CONFLICT (node_id)
97-
| DO UPDATE SET data = EXCLUDED.data ;
98-
| """.stripMargin)) { statement =>
99-
statement.setString(1, nodeId.value.toHex)
100-
statement.setBytes(2, data)
101-
statement.executeUpdate()
100+
nodeInfo.address_opt match {
101+
case Some(address) =>
102+
val encodedAddress = CommonCodecs.nodeaddress.encode(address).require.toByteArray
103+
val encodedFeatures = CommonCodecs.initFeaturesCodec.encode(nodeInfo.features).require.toByteArray
104+
using(pg.prepareStatement(
105+
"""
106+
| INSERT INTO local.peers (node_id, node_address, node_features)
107+
| VALUES (?, ?, ?)
108+
| ON CONFLICT (node_id)
109+
| DO UPDATE SET node_address = EXCLUDED.node_address, node_features = EXCLUDED.node_features
110+
|""".stripMargin)) { statement =>
111+
statement.setString(1, nodeId.value.toHex)
112+
statement.setBytes(2, encodedAddress)
113+
statement.setBytes(3, encodedFeatures)
114+
statement.executeUpdate()
115+
}
116+
case None =>
117+
val encodedFeatures = CommonCodecs.initFeaturesCodec.encode(nodeInfo.features).require.toByteArray
118+
using(pg.prepareStatement(
119+
"""
120+
| INSERT INTO local.peers (node_id, node_address, node_features)
121+
| VALUES (?, NULL, ?)
122+
| ON CONFLICT (node_id)
123+
| DO UPDATE SET node_features = EXCLUDED.node_features
124+
|""".stripMargin)) { statement =>
125+
statement.setString(1, nodeId.value.toHex)
126+
statement.setBytes(2, encodedFeatures)
127+
statement.executeUpdate()
128+
}
102129
}
103130
}
104131
}
@@ -126,27 +153,29 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg
126153
}
127154
}
128155

129-
override def getPeer(nodeId: PublicKey): Option[NodeAddress] = withMetrics("peers/get", DbBackends.Postgres) {
156+
override def getPeer(nodeId: PublicKey): Option[NodeInfo] = withMetrics("peers/get", DbBackends.Postgres) {
130157
withLock { pg =>
131-
using(pg.prepareStatement("SELECT data FROM local.peers WHERE node_id=?")) { statement =>
158+
using(pg.prepareStatement("SELECT node_address, node_features FROM local.peers WHERE node_id=?")) { statement =>
132159
statement.setString(1, nodeId.value.toHex)
133-
statement.executeQuery()
134-
.mapCodec(CommonCodecs.nodeaddress)
135-
.headOption
160+
statement.executeQuery().map { rs =>
161+
val nodeAddress_opt = rs.getBitVectorOpt("node_address").map(CommonCodecs.nodeaddress.decode(_).require.value)
162+
val nodeFeatures_opt = rs.getBitVectorOpt("node_features").map(CommonCodecs.initFeaturesCodec.decode(_).require.value)
163+
NodeInfo(nodeFeatures_opt.getOrElse(Features.empty), nodeAddress_opt)
164+
}.headOption
136165
}
137166
}
138167
}
139168

140-
override def listPeers(): Map[PublicKey, NodeAddress] = withMetrics("peers/list", DbBackends.Postgres) {
169+
override def listPeers(): Map[PublicKey, NodeInfo] = withMetrics("peers/list", DbBackends.Postgres) {
141170
withLock { pg =>
142-
using(pg.createStatement()) { statement =>
143-
statement.executeQuery("SELECT node_id, data FROM local.peers")
171+
using(pg.prepareStatement("SELECT node_id, node_address, node_features FROM local.peers")) { statement =>
172+
statement.executeQuery()
144173
.map { rs =>
145-
val nodeid = PublicKey(rs.getByteVectorFromHex("node_id"))
146-
val nodeaddress = CommonCodecs.nodeaddress.decode(BitVector(rs.getBytes("data"))).require.value
147-
nodeid -> nodeaddress
148-
}
149-
.toMap
174+
val nodeId = PublicKey(rs.getByteVectorFromHex("node_id"))
175+
val nodeAddress_opt = rs.getBitVectorOpt("node_address").map(CommonCodecs.nodeaddress.decode(_).require.value)
176+
val nodeFeatures_opt = rs.getBitVectorOpt("node_features").map(CommonCodecs.initFeaturesCodec.decode(_).require.value)
177+
nodeId -> NodeInfo(nodeFeatures_opt.getOrElse(Features.empty), nodeAddress_opt)
178+
}.toMap
150179
}
151180
}
152181
}

0 commit comments

Comments
 (0)