Skip to content

Commit 1cde874

Browse files
authored
Gossip: more reliable publishing (#387)
* When publishing take extra nodes out of mesh when mesh is not large enough * Add tests for publish extra nodes * When publishing prioritize back up non-mesh peers with acceptable score
1 parent 269af48 commit 1cde874

File tree

2 files changed

+224
-22
lines changed

2 files changed

+224
-22
lines changed

libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -281,17 +281,21 @@ open class GossipRouter(
281281
when {
282282
isDirect(peer) ->
283283
prune(peer, topic)
284+
284285
isBackOff(peer, topic) -> {
285286
notifyRouterMisbehavior(peer, 1)
286287
if (isBackOffFlood(peer, topic)) {
287288
notifyRouterMisbehavior(peer, 1)
288289
}
289290
prune(peer, topic)
290291
}
292+
291293
score.score(peer.peerId) < 0 ->
292294
prune(peer, topic)
295+
293296
meshPeers.size >= params.DHigh && !peer.isOutbound() ->
294297
prune(peer, topic)
298+
295299
peer !in meshPeers ->
296300
graft(peer, topic)
297301
}
@@ -410,11 +414,35 @@ open class GossipRouter(
410414
.plus(getDirectPeers())
411415
} else {
412416
msg.topics
413-
.mapNotNull { topic ->
414-
mesh[topic] ?: fanout[topic] ?: getTopicPeers(topic).shuffled(random).take(params.D)
415-
.also {
416-
if (it.isNotEmpty()) fanout[topic] = it.toMutableSet()
417+
.map { topic ->
418+
val topicMeshPeers = mesh[topic]
419+
if (topicMeshPeers != null) {
420+
// we are subscribed to the topic
421+
if (topicMeshPeers.size < params.D) {
422+
// we need extra non-mesh peers for more reliable publishing
423+
val nonMeshTopicPeers = getTopicPeers(topic) - topicMeshPeers
424+
val (nonMeshTopicPeersAbovePublishThreshold, nonMeshTopicPeersBelowPublishThreshold) =
425+
nonMeshTopicPeers.partition { score.score(it.peerId) >= scoreParams.publishThreshold }
426+
// this deviates from the original spec but we want at least D peers for publishing
427+
// prioritizing mesh peers, then non-mesh peers with acceptable score,
428+
// and then underscored non-mesh peers as a last resort
429+
listOf(
430+
topicMeshPeers,
431+
nonMeshTopicPeersAbovePublishThreshold.shuffled(random),
432+
nonMeshTopicPeersBelowPublishThreshold.shuffled(random)
433+
)
434+
.flatten()
435+
.take(params.D)
436+
} else {
437+
topicMeshPeers
417438
}
439+
} else {
440+
// we are not subscribed to the topic
441+
fanout[topic] ?: getTopicPeers(topic).shuffled(random).take(params.D)
442+
.also {
443+
if (it.isNotEmpty()) fanout[topic] = it.toMutableSet()
444+
}
445+
}
418446
}
419447
.flatten()
420448
}

libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt

Lines changed: 192 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ class GossipV1_1Tests : GossipTestsBase() {
125125
super.initChannelWithHandler(streamHandler, handler)
126126
}
127127
}
128+
128129
val test = TwoRoutersTest(mockRouterFactory = { exec, _, _ -> MalformedMockRouter(exec) })
129130
val mockRouter = test.router2.router as MalformedMockRouter
130131

@@ -1128,34 +1129,207 @@ class GossipV1_1Tests : GossipTestsBase() {
11281129
// 2 heartbeats - the topic should be GRAFTed
11291130
test.fuzz.timeController.addTime(2.seconds)
11301131

1131-
fun createPruneMessage(peersCount: Int): Rpc.RPC {
1132-
val peerInfos = List(peersCount) {
1133-
Rpc.PeerInfo.newBuilder()
1134-
.setPeerID(PeerId.random().bytes.toProtobuf())
1135-
.setSignedPeerRecord(ByteString.EMPTY)
1136-
.build()
1137-
}
1138-
return Rpc.RPC.newBuilder().setControl(
1139-
Rpc.ControlMessage.newBuilder().addPrune(
1140-
Rpc.ControlPrune.newBuilder()
1141-
.setTopicID(topic)
1142-
.addAllPeers(peerInfos)
1143-
)
1144-
).build()
1145-
}
1146-
11471132
test.mockRouter.sendToSingle(
1148-
createPruneMessage(test.gossipRouter.params.maxPeersAcceptedInPruneMsg + 1)
1133+
createPruneMessage(topic, test.gossipRouter.params.maxPeersAcceptedInPruneMsg + 1)
11491134
)
11501135

11511136
// prune message should be dropped because too many peers
11521137
assertEquals(1, test.gossipRouter.mesh[topic]!!.size)
11531138

11541139
test.mockRouter.sendToSingle(
1155-
createPruneMessage(test.gossipRouter.params.maxPeersAcceptedInPruneMsg)
1140+
createPruneMessage(topic, test.gossipRouter.params.maxPeersAcceptedInPruneMsg)
11561141
)
11571142

11581143
// prune message should now be processed
11591144
assertEquals(0, test.gossipRouter.mesh[topic]!!.size)
11601145
}
1146+
1147+
@Test
1148+
fun `when a peer leaves the mesh it should still be considered for publishing`() {
1149+
val test = TwoRoutersTest()
1150+
val topic = "topic1"
1151+
1152+
test.mockRouter.subscribe(topic)
1153+
test.gossipRouter.subscribe(topic)
1154+
1155+
// 2 heartbeats - the topic should be GRAFTed
1156+
test.fuzz.timeController.addTime(2.seconds)
1157+
1158+
assertTrue((test.gossipRouter.mesh[topic]?.size ?: 0) == 1)
1159+
1160+
// remote peer leaves the mesh
1161+
test.mockRouter.sendToSingle(createPruneMessage(topic))
1162+
test.fuzz.timeController.addTime(1.seconds)
1163+
1164+
assertTrue((test.gossipRouter.mesh[topic]?.size ?: 0) == 0)
1165+
1166+
val message1 = newMessage(topic, 0L, "Hello-0".toByteArray())
1167+
test.gossipRouter.publish(message1)
1168+
1169+
test.mockRouter.waitForMessage { it.publishCount > 0 }
1170+
}
1171+
1172+
@Test
1173+
fun `should publish to all mesh peers when mesh exceeds D`() {
1174+
val gossipParams = GossipParams(D = 6, DHigh = 10)
1175+
val test = ManyRoutersTest(params = gossipParams, mockRouterCount = gossipParams.DHigh)
1176+
val topic = "topic1"
1177+
test.connectAll()
1178+
1179+
test.mockRouters.forEach {
1180+
it.subscribe(topic)
1181+
}
1182+
test.gossipRouter.subscribe(topic)
1183+
1184+
// 2 heartbeats - the topic should be GRAFTed
1185+
test.fuzz.timeController.addTime(2.seconds)
1186+
1187+
assertTrue((test.gossipRouter.mesh[topic]?.size ?: 0) == gossipParams.D)
1188+
1189+
test.mockRouters.forEach {
1190+
it.sendToSingle(createGraftMessage(topic))
1191+
}
1192+
1193+
test.fuzz.timeController.addTime(2.seconds)
1194+
1195+
assertTrue((test.gossipRouter.mesh[topic]?.size ?: 0) == gossipParams.DHigh)
1196+
1197+
// remote peer leaves the mesh
1198+
val message1 = newMessage(topic, 0L, "Hello-0".toByteArray())
1199+
test.gossipRouter.publish(message1)
1200+
1201+
val routerReceivedMessageCount =
1202+
test.mockRouters.count { mockRouter ->
1203+
mockRouter.inboundMessages.any { msg ->
1204+
msg.publishCount > 0
1205+
}
1206+
}
1207+
1208+
assertTrue(routerReceivedMessageCount == gossipParams.DHigh)
1209+
}
1210+
1211+
@Test
1212+
fun `publishing should collect at least D peers if mesh is smaller`() {
1213+
val params = GossipParams()
1214+
1215+
val test = ManyRoutersTest(params = params, mockRouterCount = params.D)
1216+
val topic = "topic1"
1217+
test.connectAll()
1218+
1219+
test.mockRouters.forEach { it.subscribe(topic) }
1220+
test.gossipRouter.subscribe(topic)
1221+
1222+
// 2 heartbeats - the topic should be GRAFTed
1223+
test.fuzz.timeController.addTime(2.seconds)
1224+
1225+
val topicMeshRouters = test.gossipRouter.mesh[topic]!!
1226+
assertTrue((topicMeshRouters.size) >= params.DLow)
1227+
1228+
// leave just 2 peers in the mesh
1229+
topicMeshRouters.drop(2)
1230+
.forEach {
1231+
test.getMockRouter(it.peerId).sendToSingle(createPruneMessage(topic))
1232+
}
1233+
test.fuzz.timeController.addTime(1.seconds)
1234+
1235+
assertTrue((test.gossipRouter.mesh[topic]?.size ?: 0) == 2)
1236+
1237+
val message1 = newMessage(topic, 0L, "Hello-0".toByteArray())
1238+
test.gossipRouter.publish(message1)
1239+
1240+
val routerReceivedMessageCount =
1241+
test.mockRouters.count { mockRouter ->
1242+
mockRouter.inboundMessages.any { msg ->
1243+
msg.publishCount > 0
1244+
}
1245+
}
1246+
1247+
assertTrue(routerReceivedMessageCount >= params.D)
1248+
}
1249+
1250+
@Test
1251+
fun `publishing should collect at least D peers if mesh is smaller and prefer well scored peers`() {
1252+
val params = GossipParams()
1253+
val peerAppScores = mutableMapOf<PeerId, Int>()
1254+
val gossipScoreParams = GossipScoreParams(
1255+
peerScoreParams = GossipPeerScoreParams(
1256+
appSpecificScore = {
1257+
peerAppScores[it]?.toDouble() ?: 0.0
1258+
},
1259+
appSpecificWeight = 1.0
1260+
)
1261+
)
1262+
1263+
val test = ManyRoutersTest(params = params, scoreParams = gossipScoreParams, mockRouterCount = 10)
1264+
val topic = "topic1"
1265+
test.connectAll()
1266+
1267+
test.mockRouters.forEach { it.subscribe(topic) }
1268+
test.gossipRouter.subscribe(topic)
1269+
1270+
// 2 heartbeats - the topic should be GRAFTed
1271+
test.fuzz.timeController.addTime(2.seconds)
1272+
1273+
val topicMeshRouters = test.gossipRouter.mesh[topic]!!.toList()
1274+
assertTrue((topicMeshRouters.size) == params.D)
1275+
1276+
// leave just 2 peers in the mesh
1277+
topicMeshRouters.drop(2)
1278+
.forEach {
1279+
test.getMockRouter(it.peerId).sendToSingle(createPruneMessage(topic))
1280+
}
1281+
// downscore all peers except 5
1282+
val goodScoredPeers = topicMeshRouters.take(5).map { it.peerId }.toSet()
1283+
test.routers
1284+
.map { it.peerId }
1285+
.filter { it !in goodScoredPeers }
1286+
.forEach { peerAppScores[it] = -gossipScoreParams.publishThreshold.toInt() - 1 }
1287+
1288+
// for D = 6: 2 peers in the mesh + 3 peers outside of mesh + others are significantly downscored
1289+
test.fuzz.timeController.addTime(1.seconds)
1290+
1291+
assertTrue((test.gossipRouter.mesh[topic]?.size ?: 0) == 2)
1292+
1293+
val message1 = newMessage(topic, 0L, "Hello-0".toByteArray())
1294+
test.gossipRouter.publish(message1)
1295+
1296+
// router should take 2 mesh peers, 3 well scored peers and 1 peer scored below publishThreshold
1297+
val peersReceivedMessage = test.routers
1298+
.filter {
1299+
val mockRouter = it.router as MockRouter
1300+
mockRouter.inboundMessages.any { msg ->
1301+
msg.publishCount > 0
1302+
}
1303+
}
1304+
.map { it.peerId }
1305+
1306+
assertTrue(peersReceivedMessage.size == params.D)
1307+
assertTrue(peersReceivedMessage.containsAll(goodScoredPeers))
1308+
}
1309+
1310+
private fun createGraftMessage(topic: String): Rpc.RPC {
1311+
return Rpc.RPC.newBuilder().setControl(
1312+
Rpc.ControlMessage.newBuilder().addGraft(
1313+
Rpc.ControlGraft.newBuilder()
1314+
.setTopicID(topic)
1315+
)
1316+
).build()
1317+
}
1318+
1319+
private fun createPruneMessage(topic: String, pxPeersCount: Int = 0): Rpc.RPC {
1320+
val peerInfos = List(pxPeersCount) {
1321+
Rpc.PeerInfo.newBuilder()
1322+
.setPeerID(PeerId.random().bytes.toProtobuf())
1323+
.setSignedPeerRecord(ByteString.EMPTY)
1324+
.build()
1325+
}
1326+
return Rpc.RPC.newBuilder().setControl(
1327+
Rpc.ControlMessage.newBuilder().addPrune(
1328+
Rpc.ControlPrune.newBuilder()
1329+
.setTopicID(topic)
1330+
.setBackoff(10)
1331+
.addAllPeers(peerInfos)
1332+
)
1333+
).build()
1334+
}
11611335
}

0 commit comments

Comments
 (0)