Skip to content

Commit 00fe7a3

Browse files
Only sync with top peers (#2983)
By default we sync with every peer when reconnecting, which can be a lot. We now only sync on reconnection with our top peers (by capacity of our shared channels).
1 parent 06eb44a commit 00fe7a3

File tree

11 files changed

+147
-59
lines changed

11 files changed

+147
-59
lines changed

docs/release-notes/eclair-vnext.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,11 @@ There are many organisations that package Java runtimes and development kits, fo
2424

2525
### Miscellaneous improvements and bug fixes
2626

27-
<insert changes>
27+
#### Gossip sync limits
28+
29+
On reconnection, eclair now only synchronizes its routing table with a small number of top peers instead of synchronizing with every peer.
30+
If you already use `sync-whitelist`, the default behavior has been modified and you must set `router.sync.peer-limit = 0` to keep preventing any synchronization with other nodes.
31+
You must also use `router.sync.whitelist` instead of `sync-whitelist`.
2832

2933
## Verifying signatures
3034

eclair-core/src/main/resources/reference.conf

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@ eclair {
9696
# features { }
9797
# }
9898
]
99-
sync-whitelist = [] // a list of public keys; if non-empty, we will only do the initial sync with those peers
10099

101100
channel {
102101
channel-flags {
@@ -411,6 +410,8 @@ eclair {
411410
request-node-announcements = true // if true we will ask for node announcements when we receive channel ids that we don't know
412411
channel-range-chunk-size = 1500 // max number of short_channel_ids (+ timestamps + checksums) in reply_channel_range *do not change this unless you know what you are doing*
413412
channel-query-chunk-size = 100 // max number of short_channel_ids in query_short_channel_ids *do not change this unless you know what you are doing*
413+
peer-limit = 5 // number of peers to do the initial sync with. We limit the initial sync to the peers that have the largest capacity with us when starting the node.
414+
whitelist = [] // a list of public keys to do the initial sync with, in addition to the top peers by capacity
414415
}
415416

416417
message-path-finding {

eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import fr.acinq.eclair.payment.relay.Relayer.{AsyncPaymentsParams, RelayFees, Re
3535
import fr.acinq.eclair.router.Announcements.AddressException
3636
import fr.acinq.eclair.router.Graph.{HeuristicsConstants, PaymentWeightRatios}
3737
import fr.acinq.eclair.router.Router._
38-
import fr.acinq.eclair.router.{Graph, PathFindingExperimentConf}
38+
import fr.acinq.eclair.router.{Graph, PathFindingExperimentConf, Router}
3939
import fr.acinq.eclair.tor.Socks5ProxyParams
4040
import fr.acinq.eclair.transactions.Transactions
4141
import fr.acinq.eclair.wire.protocol._
@@ -66,7 +66,6 @@ case class NodeParams(nodeKeyManager: NodeKeyManager,
6666
torAddress_opt: Option[NodeAddress],
6767
features: Features[Feature],
6868
private val overrideInitFeatures: Map[PublicKey, Features[InitFeature]],
69-
syncWhitelist: Set[PublicKey],
7069
pluginParams: Seq[PluginParams],
7170
channelConf: ChannelConf,
7271
onChainFeeConf: OnChainFeeConf,
@@ -319,6 +318,7 @@ object NodeParams extends Logging {
319318
"on-chain-fees.target-blocks" -> "on-chain-fees.confirmation-priority",
320319
// v0.12.0
321320
"channel.mindepth-blocks" -> "channel.min-depth-funding-blocks",
321+
"sync-whitelist" -> "router.sync.whitelist",
322322
)
323323
deprecatedKeyPaths.foreach {
324324
case (old, new_) => require(!config.hasPath(old), s"configuration key '$old' has been replaced by '$new_'")
@@ -413,8 +413,6 @@ object NodeParams extends Logging {
413413
p -> (f.copy(unknown = f.unknown ++ pluginMessageParams.map(_.pluginFeature)): Features[InitFeature])
414414
}.toMap
415415

416-
val syncWhitelist: Set[PublicKey] = config.getStringList("sync-whitelist").asScala.map(s => PublicKey(ByteVector.fromValidHex(s))).toSet
417-
418416
val socksProxy_opt = parseSocks5ProxyParams(config)
419417

420418
val publicTorAddress_opt = if (config.getBoolean("tor.publish-onion-address")) torAddress_opt else None
@@ -561,7 +559,6 @@ object NodeParams extends Logging {
561559
features = coreAndPluginFeatures,
562560
pluginParams = pluginParams,
563561
overrideInitFeatures = overrideInitFeatures,
564-
syncWhitelist = syncWhitelist,
565562
channelConf = ChannelConf(
566563
channelFlags = channelFlags,
567564
dustLimit = dustLimitSatoshis,
@@ -659,10 +656,14 @@ object NodeParams extends Logging {
659656
watchSpentWindow = watchSpentWindow,
660657
channelExcludeDuration = FiniteDuration(config.getDuration("router.channel-exclude-duration").getSeconds, TimeUnit.SECONDS),
661658
routerBroadcastInterval = FiniteDuration(config.getDuration("router.broadcast-interval").getSeconds, TimeUnit.SECONDS),
662-
requestNodeAnnouncements = config.getBoolean("router.sync.request-node-announcements"),
663-
encodingType = EncodingType.UNCOMPRESSED,
664-
channelRangeChunkSize = config.getInt("router.sync.channel-range-chunk-size"),
665-
channelQueryChunkSize = config.getInt("router.sync.channel-query-chunk-size"),
659+
syncConf = Router.SyncConf(
660+
requestNodeAnnouncements = config.getBoolean("router.sync.request-node-announcements"),
661+
encodingType = EncodingType.UNCOMPRESSED,
662+
channelRangeChunkSize = config.getInt("router.sync.channel-range-chunk-size"),
663+
channelQueryChunkSize = config.getInt("router.sync.channel-query-chunk-size"),
664+
peerLimit = config.getInt("router.sync.peer-limit"),
665+
whitelist = config.getStringList("router.sync.whitelist").asScala.map(s => PublicKey(ByteVector.fromValidHex(s))).toSet
666+
),
666667
pathFindingExperimentConf = getPathFindingExperimentConf(config.getConfig("router.path-finding.experiments")),
667668
messageRouteParams = getMessageRouteParams(config.getConfig("router.message-path-finding")),
668669
balanceEstimateHalfLife = FiniteDuration(config.getDuration("router.balance-estimate-half-life").getSeconds, TimeUnit.SECONDS),

eclair-core/src/main/scala/fr/acinq/eclair/io/IncomingConnectionsTracker.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ private class IncomingConnectionsTracker(nodeParams: NodeParams, switchboard: Ac
6060
Metrics.IncomingConnectionsNoChannels.withoutTags().update(incomingConnections.size)
6161
Behaviors.receiveMessage {
6262
case TrackIncomingConnection(remoteNodeId) =>
63-
if (nodeParams.syncWhitelist.contains(remoteNodeId)) {
63+
if (nodeParams.routerConf.syncConf.whitelist.contains(remoteNodeId)) {
6464
Behaviors.same
6565
} else {
6666
if (incomingConnections.size >= nodeParams.peerConnectionConf.maxNoChannels) {

eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
2020
import akka.actor.typed.scaladsl.Behaviors
2121
import akka.actor.typed.scaladsl.adapter.{ClassicActorContextOps, ClassicActorRefOps, ClassicActorSystemOps, TypedActorRefOps}
2222
import akka.actor.{Actor, ActorContext, ActorLogging, ActorRef, OneForOneStrategy, Props, Stash, Status, SupervisorStrategy, typed}
23-
import fr.acinq.bitcoin.scalacompat.ByteVector32
23+
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi}
2424
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
2525
import fr.acinq.eclair.blockchain.OnchainPubkeyCache
2626
import fr.acinq.eclair.channel.Helpers.Closing
@@ -67,14 +67,19 @@ class Switchboard(nodeParams: NodeParams, peerFactory: Switchboard.PeerFactory)
6767
(peersWithOnTheFlyFunding -- peersWithChannels.keySet).foreach {
6868
case (remoteNodeId, pending) => createOrGetPeer(remoteNodeId, Set.empty, pending)
6969
}
70+
val peerCapacities = channels.map {
71+
case channelData: ChannelDataWithoutCommitments => (channelData.remoteNodeId, 0L)
72+
case channelData: ChannelDataWithCommitments => (channelData.remoteNodeId, channelData.commitments.capacity.toLong)
73+
}.groupMapReduce[PublicKey, Long](_._1)(_._2)(_ + _)
74+
val topCapacityPeers = peerCapacities.toSeq.sortBy(_._2).takeRight(nodeParams.routerConf.syncConf.peerLimit).map(_._1).toSet
7075
log.info("restoring {} peer(s) with {} channel(s) and {} peers with pending on-the-fly funding", peersWithChannels.size, channels.size, (peersWithOnTheFlyFunding.keySet -- peersWithChannels.keySet).size)
7176
unstashAll()
72-
context.become(normal(peersWithChannels.keySet))
77+
context.become(normal(peersWithChannels.keySet, topCapacityPeers))
7378
case _ =>
7479
stash()
7580
}
7681

77-
def normal(peersWithChannels: Set[PublicKey]): Receive = {
82+
def normal(peersWithChannels: Set[PublicKey], peersToSyncWith: Set[PublicKey]): Receive = {
7883

7984
case Peer.Connect(publicKey, _, _, _) if publicKey == nodeParams.nodeId =>
8085
sender() ! Status.Failure(new RuntimeException("cannot open connection with oneself"))
@@ -110,16 +115,17 @@ class Switchboard(nodeParams: NodeParams, peerFactory: Switchboard.PeerFactory)
110115
val peer = createOrGetPeer(authenticated.remoteNodeId, offlineChannels = Set.empty, pendingOnTheFlyFunding = Map.empty)
111116
val features = nodeParams.initFeaturesFor(authenticated.remoteNodeId)
112117
val hasChannels = peersWithChannels.contains(authenticated.remoteNodeId)
113-
// if the peer is whitelisted, we sync with them, otherwise we only sync with peers with whom we have at least one channel
114-
val doSync = nodeParams.syncWhitelist.contains(authenticated.remoteNodeId) || (nodeParams.syncWhitelist.isEmpty && hasChannels)
118+
val doSync = peersToSyncWith.contains(authenticated.remoteNodeId) || nodeParams.routerConf.syncConf.whitelist.contains(authenticated.remoteNodeId)
115119
authenticated.peerConnection ! PeerConnection.InitializeConnection(peer, nodeParams.chainHash, features, doSync, nodeParams.liquidityAdsConfig.rates_opt)
116120
if (!hasChannels && !authenticated.outgoing) {
117121
incomingConnectionsTracker ! TrackIncomingConnection(authenticated.remoteNodeId)
118122
}
119123

120-
case ChannelIdAssigned(_, remoteNodeId, _, _) => context.become(normal(peersWithChannels + remoteNodeId))
124+
case ChannelIdAssigned(_, remoteNodeId, _, _) =>
125+
val peersToSyncWith1 = if (peersToSyncWith.size < nodeParams.routerConf.syncConf.peerLimit) peersToSyncWith + remoteNodeId else peersToSyncWith
126+
context.become(normal(peersWithChannels + remoteNodeId, peersToSyncWith1))
121127

122-
case LastChannelClosed(_, remoteNodeId) => context.become(normal(peersWithChannels - remoteNodeId))
128+
case LastChannelClosed(_, remoteNodeId) => context.become(normal(peersWithChannels - remoteNodeId, peersToSyncWith - remoteNodeId))
123129

124130
case GetPeers => sender() ! context.children.filterNot(_ == incomingConnectionsTracker.toClassic)
125131

eclair-core/src/main/scala/fr/acinq/eclair/remote/EclairInternalsSerializer.scala

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -97,16 +97,21 @@ object EclairInternalsSerializer {
9797
("ageFactor" | double) ::
9898
("capacityFactor" | double)).as[Graph.MessageWeightRatios]).as[MessageRouteParams]
9999

100-
val routerConfCodec: Codec[RouterConf] = (
101-
("watchSpentWindow" | finiteDurationCodec) ::
102-
("channelExcludeDuration" | finiteDurationCodec) ::
103-
("routerBroadcastInterval" | finiteDurationCodec) ::
104-
("requestNodeAnnouncements" | bool(8)) ::
100+
val syncConfCodec: Codec[Router.SyncConf] = (
101+
("requestNodeAnnouncements" | bool(8)) ::
105102
("encodingType" | discriminated[EncodingType].by(uint8)
106103
.typecase(0, provide(EncodingType.UNCOMPRESSED))
107104
.typecase(1, provide(EncodingType.COMPRESSED_ZLIB))) ::
108105
("channelRangeChunkSize" | int32) ::
109106
("channelQueryChunkSize" | int32) ::
107+
("peerLimit" | int32) ::
108+
("whitelist" | listOfN(uint16, publicKey).xmap[Set[PublicKey]](_.toSet, _.toList))).as[Router.SyncConf]
109+
110+
val routerConfCodec: Codec[RouterConf] = (
111+
("watchSpentWindow" | finiteDurationCodec) ::
112+
("channelExcludeDuration" | finiteDurationCodec) ::
113+
("routerBroadcastInterval" | finiteDurationCodec) ::
114+
("syncConf" | syncConfCodec) ::
110115
("pathFindingExperimentConf" | pathFindingExperimentConfCodec) ::
111116
("messageRouteParams" | messageRouteParamsCodec) ::
112117
("balanceEstimateHalfLife" | finiteDurationCodec)).as[RouterConf]

eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -368,19 +368,23 @@ object Router {
368368
)
369369
}
370370

371+
case class SyncConf(requestNodeAnnouncements: Boolean,
372+
encodingType: EncodingType,
373+
channelRangeChunkSize: Int,
374+
channelQueryChunkSize: Int,
375+
peerLimit: Int,
376+
whitelist: Set[PublicKey]) {
377+
require(channelRangeChunkSize <= Sync.MAXIMUM_CHUNK_SIZE, "channel range chunk size exceeds the size of a lightning message")
378+
require(channelQueryChunkSize <= Sync.MAXIMUM_CHUNK_SIZE, "channel query chunk size exceeds the size of a lightning message")
379+
}
380+
371381
case class RouterConf(watchSpentWindow: FiniteDuration,
372382
channelExcludeDuration: FiniteDuration,
373383
routerBroadcastInterval: FiniteDuration,
374-
requestNodeAnnouncements: Boolean,
375-
encodingType: EncodingType,
376-
channelRangeChunkSize: Int,
377-
channelQueryChunkSize: Int,
384+
syncConf: SyncConf,
378385
pathFindingExperimentConf: PathFindingExperimentConf,
379386
messageRouteParams: MessageRouteParams,
380-
balanceEstimateHalfLife: FiniteDuration) {
381-
require(channelRangeChunkSize <= Sync.MAXIMUM_CHUNK_SIZE, "channel range chunk size exceeds the size of a lightning message")
382-
require(channelQueryChunkSize <= Sync.MAXIMUM_CHUNK_SIZE, "channel query chunk size exceeds the size of a lightning message")
383-
}
387+
balanceEstimateHalfLife: FiniteDuration)
384388

385389
// @formatter:off
386390
case class ChannelDesc private(shortChannelId: ShortChannelId, a: PublicKey, b: PublicKey){

eclair-core/src/main/scala/fr/acinq/eclair/router/Sync.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,11 @@ object Sync {
7777
// keep channel ids that are in [firstBlockNum, firstBlockNum + numberOfBlocks]
7878
val shortChannelIds: SortedSet[RealShortChannelId] = channels.keySet.filter(keep(q.firstBlock, q.numberOfBlocks, _))
7979
log.info("replying with {} items for range=({}, {})", shortChannelIds.size, q.firstBlock, q.numberOfBlocks)
80-
val chunks = split(shortChannelIds, q.firstBlock, q.numberOfBlocks, routerConf.channelRangeChunkSize)
80+
val chunks = split(shortChannelIds, q.firstBlock, q.numberOfBlocks, routerConf.syncConf.channelRangeChunkSize)
8181
Metrics.QueryChannelRange.Replies.withoutTags().record(chunks.size)
8282
chunks.zipWithIndex.foreach { case (chunk, i) =>
8383
val syncComplete = i == chunks.size - 1
84-
val reply = buildReplyChannelRange(chunk, syncComplete, q.chainHash, routerConf.encodingType, q.queryFlags_opt, channels)
84+
val reply = buildReplyChannelRange(chunk, syncComplete, q.chainHash, routerConf.syncConf.encodingType, q.queryFlags_opt, channels)
8585
origin.peerConnection ! reply
8686
Metrics.ReplyChannelRange.Blocks.withTag(Tags.Direction, Tags.Directions.Outgoing).record(reply.numberOfBlocks)
8787
Metrics.ReplyChannelRange.ShortChannelIds.withTag(Tags.Direction, Tags.Directions.Outgoing).record(reply.shortChannelIds.array.size)
@@ -111,7 +111,7 @@ object Sync {
111111
ids match {
112112
case Nil => acc.reverse
113113
case head :: tail =>
114-
val flag = computeFlag(d.channels)(head, timestamps.headOption, checksums.headOption, routerConf.requestNodeAnnouncements)
114+
val flag = computeFlag(d.channels)(head, timestamps.headOption, checksums.headOption, routerConf.syncConf.requestNodeAnnouncements)
115115
// 0 means nothing to query, just don't include it
116116
val acc1 = if (flag != 0) ShortChannelIdAndFlag(head, flag) :: acc else acc
117117
loop(tail, timestamps.drop(1), checksums.drop(1), acc1)
@@ -144,7 +144,7 @@ object Sync {
144144

145145
// we update our sync data to this node (there may be multiple channel range responses and we can only query one set of ids at a time)
146146
val replies = shortChannelIdAndFlags
147-
.grouped(routerConf.channelQueryChunkSize)
147+
.grouped(routerConf.syncConf.channelQueryChunkSize)
148148
.map(buildQuery)
149149
.toList
150150

eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import fr.acinq.eclair.message.OnionMessages.OnionMessageConfig
2929
import fr.acinq.eclair.payment.relay.OnTheFlyFunding
3030
import fr.acinq.eclair.payment.relay.Relayer.{AsyncPaymentsParams, RelayFees, RelayParams}
3131
import fr.acinq.eclair.router.Graph.{MessageWeightRatios, PaymentWeightRatios}
32-
import fr.acinq.eclair.router.PathFindingExperimentConf
32+
import fr.acinq.eclair.router.{PathFindingExperimentConf, Router}
3333
import fr.acinq.eclair.router.Router._
3434
import fr.acinq.eclair.wire.protocol._
3535
import org.scalatest.Tag
@@ -114,7 +114,6 @@ object TestConstants {
114114
),
115115
pluginParams = List(pluginParams),
116116
overrideInitFeatures = Map.empty,
117-
syncWhitelist = Set.empty,
118117
channelConf = ChannelConf(
119118
dustLimit = 1100 sat,
120119
maxRemoteDustLimit = 1500 sat,
@@ -197,10 +196,14 @@ object TestConstants {
197196
watchSpentWindow = 1 second,
198197
channelExcludeDuration = 60 seconds,
199198
routerBroadcastInterval = 1 day, // "disables" rebroadcast
200-
requestNodeAnnouncements = true,
201-
encodingType = EncodingType.COMPRESSED_ZLIB,
202-
channelRangeChunkSize = 20,
203-
channelQueryChunkSize = 5,
199+
syncConf = Router.SyncConf(
200+
requestNodeAnnouncements = true,
201+
encodingType = EncodingType.COMPRESSED_ZLIB,
202+
channelRangeChunkSize = 20,
203+
channelQueryChunkSize = 5,
204+
peerLimit = 10,
205+
whitelist = Set.empty
206+
),
204207
pathFindingExperimentConf = PathFindingExperimentConf(Map("alice-test-experiment" -> PathFindingConf(
205208
randomize = false,
206209
boundaries = SearchBoundaries(
@@ -293,7 +296,6 @@ object TestConstants {
293296
),
294297
pluginParams = Nil,
295298
overrideInitFeatures = Map.empty,
296-
syncWhitelist = Set.empty,
297299
channelConf = ChannelConf(
298300
dustLimit = 1000 sat,
299301
maxRemoteDustLimit = 1500 sat,
@@ -376,10 +378,14 @@ object TestConstants {
376378
watchSpentWindow = 1 second,
377379
channelExcludeDuration = 60 seconds,
378380
routerBroadcastInterval = 1 day, // "disables" rebroadcast
379-
requestNodeAnnouncements = true,
380-
encodingType = EncodingType.UNCOMPRESSED,
381-
channelRangeChunkSize = 20,
382-
channelQueryChunkSize = 5,
381+
syncConf = Router.SyncConf(
382+
requestNodeAnnouncements = true,
383+
encodingType = EncodingType.UNCOMPRESSED,
384+
channelRangeChunkSize = 20,
385+
channelQueryChunkSize = 5,
386+
peerLimit = 20,
387+
whitelist = Set.empty
388+
),
383389
pathFindingExperimentConf = PathFindingExperimentConf(Map("bob-test-experiment" -> PathFindingConf(
384390
randomize = false,
385391
boundaries = SearchBoundaries(

0 commit comments

Comments
 (0)