Skip to content

Commit 461d8c8

Browse files
authored
Gossip: direct peers handling fix (#398)
1 parent 1cde874 commit 461d8c8

File tree

2 files changed

+164
-43
lines changed

2 files changed

+164
-43
lines changed

libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt

Lines changed: 60 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,9 @@ open class GossipRouter(
146146
return currentTimeSupplier() < expire - (params.pruneBackoff + params.graftFloodThreshold).toMillis()
147147
}
148148

149-
private fun getDirectPeers() = peers.filter(::isDirect)
149+
private fun getDirectPeers(topic: Topic): List<PeerHandler> {
150+
return getTopicPeers(topic).filter(::isDirect)
151+
}
150152
private fun isDirect(peer: PeerHandler) = scoreParams.peerScoreParams.isDirect(peer.peerId)
151153
private fun isConnected(peerId: PeerId) = peers.any { it.peerId == peerId }
152154

@@ -387,12 +389,18 @@ open class GossipRouter(
387389

388390
override fun broadcastInbound(msgs: List<PubsubMessage>, receivedFrom: PeerHandler) {
389391
msgs.forEach { pubMsg ->
390-
pubMsg.topics
392+
val topics = pubMsg.topics
391393
.asSequence()
394+
395+
val peersFromMesh = topics
392396
.mapNotNull { mesh[it] }
393397
.flatten()
398+
399+
val peersFromDirectPeers = topics.flatMap { getDirectPeers(it) }
400+
401+
peersFromDirectPeers
402+
.plus(peersFromMesh)
394403
.distinct()
395-
.plus(getDirectPeers())
396404
.minus(receivedFrom)
397405
.filterNot { peerDoesNotWantMessage(it, pubMsg.messageId) }
398406
.forEach { submitPublishMessage(it, pubMsg) }
@@ -408,43 +416,9 @@ open class GossipRouter(
408416

409417
val peers =
410418
if (floodPublish) {
411-
msg.topics
412-
.flatMap { getTopicPeers(it) }
413-
.filter { score.score(it.peerId) >= scoreParams.publishThreshold }
414-
.plus(getDirectPeers())
419+
selectPeersForOutboundBroadcastingInFloodPublish(msg)
415420
} else {
416-
msg.topics
417-
.map { topic ->
418-
val topicMeshPeers = mesh[topic]
419-
if (topicMeshPeers != null) {
420-
// we are subscribed to the topic
421-
if (topicMeshPeers.size < params.D) {
422-
// we need extra non-mesh peers for more reliable publishing
423-
val nonMeshTopicPeers = getTopicPeers(topic) - topicMeshPeers
424-
val (nonMeshTopicPeersAbovePublishThreshold, nonMeshTopicPeersBelowPublishThreshold) =
425-
nonMeshTopicPeers.partition { score.score(it.peerId) >= scoreParams.publishThreshold }
426-
// this deviates from the original spec but we want at least D peers for publishing
427-
// prioritizing mesh peers, then non-mesh peers with acceptable score,
428-
// and then underscored non-mesh peers as a last resort
429-
listOf(
430-
topicMeshPeers,
431-
nonMeshTopicPeersAbovePublishThreshold.shuffled(random),
432-
nonMeshTopicPeersBelowPublishThreshold.shuffled(random)
433-
)
434-
.flatten()
435-
.take(params.D)
436-
} else {
437-
topicMeshPeers
438-
}
439-
} else {
440-
// we are not subscribed to the topic
441-
fanout[topic] ?: getTopicPeers(topic).shuffled(random).take(params.D)
442-
.also {
443-
if (it.isNotEmpty()) fanout[topic] = it.toMutableSet()
444-
}
445-
}
446-
}
447-
.flatten()
421+
selectPeersForOutboundBroadcasting(msg)
448422
}
449423

450424
mCache += msg
@@ -468,6 +442,53 @@ open class GossipRouter(
468442
}
469443
}
470444

445+
private fun selectPeersForOutboundBroadcastingInFloodPublish(msg: PubsubMessage): List<PeerHandler> {
446+
return msg.topics
447+
.flatMap { getTopicPeers(it) }
448+
.filter { isDirect(it) || score.score(it.peerId) >= scoreParams.publishThreshold }
449+
}
450+
451+
private fun selectPeersForOutboundBroadcasting(msg: PubsubMessage): List<PeerHandler> {
452+
val fromMesh = msg.topics
453+
.map { topic ->
454+
val topicMeshPeers = mesh[topic]
455+
if (topicMeshPeers != null) {
456+
// we are subscribed to the topic
457+
if (topicMeshPeers.size < params.D) {
458+
// we need extra non-mesh peers for more reliable publishing
459+
val nonMeshTopicPeers = getTopicPeers(topic) - topicMeshPeers
460+
val (nonMeshTopicPeersAbovePublishThreshold, nonMeshTopicPeersBelowPublishThreshold) =
461+
nonMeshTopicPeers.partition { score.score(it.peerId) >= scoreParams.publishThreshold }
462+
// this deviates from the original spec but we want at least D peers for publishing
463+
// prioritizing mesh peers, then non-mesh peers with acceptable score,
464+
// and then underscored non-mesh peers as a last resort
465+
listOf(
466+
topicMeshPeers,
467+
nonMeshTopicPeersAbovePublishThreshold.shuffled(random),
468+
nonMeshTopicPeersBelowPublishThreshold.shuffled(random)
469+
)
470+
.flatten()
471+
.take(params.D)
472+
} else {
473+
topicMeshPeers
474+
}
475+
} else {
476+
// we are not subscribed to the topic
477+
fanout[topic] ?: getTopicPeers(topic).shuffled(random).take(params.D)
478+
.also {
479+
if (it.isNotEmpty()) fanout[topic] = it.toMutableSet()
480+
}
481+
}
482+
}
483+
.flatten()
484+
485+
val fromDirectPeers = msg.topics.flatMap { getDirectPeers(it) }
486+
487+
return fromMesh
488+
.plus(fromDirectPeers)
489+
.distinct()
490+
}
491+
471492
override fun subscribe(topic: Topic) {
472493
super.subscribe(topic)
473494
val fanoutPeers = (fanout[topic] ?: mutableSetOf())

libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt

Lines changed: 104 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ class GossipV1_1Tests : GossipTestsBase() {
9191

9292
val api = createPubsubApi(test.gossipRouter)
9393
val apiMessages = mutableListOf<MessageApi>()
94-
api.subscribe(Subscriber { apiMessages += it }, io.libp2p.core.pubsub.Topic("topic2"))
94+
api.subscribe(Subscriber { apiMessages += it }, Topic("topic2"))
9595

9696
val msg1 = Rpc.RPC.newBuilder()
9797
.addPublish(newProtoMessage("topic2", 0L, "Hello-1".toByteArray()))
@@ -131,7 +131,7 @@ class GossipV1_1Tests : GossipTestsBase() {
131131

132132
val api = createPubsubApi(test.gossipRouter)
133133
val apiMessages = mutableListOf<MessageApi>()
134-
api.subscribe(Subscriber { apiMessages += it }, io.libp2p.core.pubsub.Topic("topic1"))
134+
api.subscribe(Subscriber { apiMessages += it }, Topic("topic1"))
135135

136136
val msg1 = Rpc.RPC.newBuilder()
137137
.addPublish(newProtoMessage("topic1", 0L, "Hello-1".toByteArray()))
@@ -653,7 +653,7 @@ class GossipV1_1Tests : GossipTestsBase() {
653653
3,
654654
3,
655655
DLazy = 3,
656-
floodPublishMaxMessageSizeThreshold = 0,
656+
floodPublishMaxMessageSizeThreshold = NEVER_FLOOD_PUBLISH,
657657
gossipFactor = 0.5
658658
)
659659
val peerScoreParams = GossipPeerScoreParams(
@@ -936,7 +936,7 @@ class GossipV1_1Tests : GossipTestsBase() {
936936
receivedMessages += it
937937
validationResult
938938
}
939-
api.subscribe(slowValidator, io.libp2p.core.pubsub.Topic("topic1"))
939+
api.subscribe(slowValidator, Topic("topic1"))
940940
test.mockRouters.forEach { it.subscribe("topic1") }
941941

942942
val gossiper = test.mockRouters[0]
@@ -1307,6 +1307,106 @@ class GossipV1_1Tests : GossipTestsBase() {
13071307
assertTrue(peersReceivedMessage.containsAll(goodScoredPeers))
13081308
}
13091309

1310+
@Test
1311+
fun `should always flood publish to subscribed direct peers`() {
1312+
val message = newMessage("topic1", 0L, "Hello-0".toByteArray())
1313+
val appScore = mutableMapOf<PeerId, Double>().withDefault { 0.0 }
1314+
val directPeers = mutableSetOf<PeerId>()
1315+
val coreParams = GossipParams(3, 3, 3, floodPublishMaxMessageSizeThreshold = ALWAYS_FLOOD_PUBLISH)
1316+
val peerScoreParams = GossipPeerScoreParams(
1317+
appSpecificScore = { appScore.getValue(it) },
1318+
appSpecificWeight = 1.0,
1319+
isDirect = { directPeers.contains(it) }
1320+
)
1321+
val scoreParams = GossipScoreParams(
1322+
peerScoreParams = peerScoreParams,
1323+
graylistThreshold = -15.0,
1324+
publishThreshold = -10.0,
1325+
)
1326+
val test = ManyRoutersTest(mockRouterCount = 10, params = coreParams, scoreParams = scoreParams)
1327+
test.connectAll()
1328+
1329+
test.gossipRouter.subscribe("topic1")
1330+
test.routers.slice(0..5).forEach {
1331+
it.router.subscribe("topic1")
1332+
}
1333+
1334+
test.routers.slice(1..6).forEach {
1335+
directPeers.add(it.peerId)
1336+
}
1337+
1338+
// now only peers from 1 to 5 are direct peers subscribed to the topic
1339+
1340+
test.fuzz.timeController.addTime(2.seconds)
1341+
1342+
// let's down score all peers
1343+
test.routers.forEach {
1344+
appScore[it.peerId] = -20.0
1345+
}
1346+
test.gossipRouter.publish(message)
1347+
1348+
test.fuzz.timeController.addTime(50.millis)
1349+
1350+
val publishedCount = test.mockRouters.flatMap { it.inboundMessages }.count { it.publishCount > 0 }
1351+
1352+
// only subscribed direct peers should receive the message
1353+
assertEquals(5, publishedCount)
1354+
}
1355+
1356+
@Test
1357+
fun `should always publish to subscribed direct peers`() {
1358+
val message = newMessage("topic1", 0L, "Hello-0".toByteArray())
1359+
val appScore = mutableMapOf<PeerId, Double>().withDefault { 0.0 }
1360+
val directPeers = mutableSetOf<PeerId>()
1361+
val coreParams = GossipParams(3, 3, 3, floodPublishMaxMessageSizeThreshold = NEVER_FLOOD_PUBLISH)
1362+
val peerScoreParams = GossipPeerScoreParams(
1363+
appSpecificScore = { appScore.getValue(it) },
1364+
appSpecificWeight = 1.0,
1365+
isDirect = { directPeers.contains(it) }
1366+
)
1367+
val scoreParams = GossipScoreParams(
1368+
peerScoreParams = peerScoreParams,
1369+
graylistThreshold = -15.0,
1370+
publishThreshold = -10.0,
1371+
)
1372+
val test = ManyRoutersTest(mockRouterCount = 10, params = coreParams, scoreParams = scoreParams)
1373+
test.connectAll()
1374+
1375+
test.gossipRouter.subscribe("topic1")
1376+
1377+
test.routers.slice(0..5).forEach {
1378+
it.router.subscribe("topic1")
1379+
}
1380+
test.routers.slice(1..6).forEach {
1381+
directPeers.add(it.peerId)
1382+
}
1383+
1384+
// now only peers from 1 to 5 are direct peers subscribed to the topic
1385+
val subscribedDirectPeers = test.routers.slice(1..5).map { it.peerId }
1386+
1387+
test.fuzz.timeController.addTime(2.seconds)
1388+
1389+
// let's down score all direct peers
1390+
directPeers.forEach {
1391+
appScore[it] = -20.0
1392+
}
1393+
1394+
val topicMeshRouters = test.gossipRouter.mesh["topic1"]!!.toList()
1395+
1396+
// the mesh is strictly smaller than the number of subscribed direct peers
1397+
assertTrue(topicMeshRouters.size < subscribedDirectPeers.size)
1398+
1399+
val expectedPublishedCount = topicMeshRouters.map { it.peerId }.plus(subscribedDirectPeers).distinct().size
1400+
1401+
test.gossipRouter.publish(message)
1402+
1403+
test.fuzz.timeController.addTime(50.millis)
1404+
1405+
val publishedCount = test.mockRouters.flatMap { it.inboundMessages }.count { it.publishCount > 0 }
1406+
1407+
assertEquals(expectedPublishedCount, publishedCount)
1408+
}
1409+
13101410
private fun createGraftMessage(topic: String): Rpc.RPC {
13111411
return Rpc.RPC.newBuilder().setControl(
13121412
Rpc.ControlMessage.newBuilder().addGraft(

0 commit comments

Comments
 (0)