Skip to content

Commit f4783eb

Browse files
authored
Merge pull request #391 from tbenr/floodPublishMaxMessageSizeThreshold
[GOSSIP][BREAKING] Replace `floodPublish` param with `floodPublishMaxMessageSizeThreshold`
2 parents a00728e + 88fe8ae commit f4783eb

File tree

10 files changed

+43
-32
lines changed

10 files changed

+43
-32
lines changed

libp2p/src/main/kotlin/io/libp2p/pubsub/PubsubRouter.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ interface PubsubMessage {
2222
val topics: List<Topic>
2323
get() = protobufMessage.topicIDsList
2424

25+
val size: Int
26+
get() = protobufMessage.data.size()
27+
2528
fun messageSha256() = sha256(protobufMessage.toByteArray())
2629

2730
override fun equals(other: Any?): Boolean

libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipParams.kt

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ fun defaultDLazy(D: Int) = D
2222
fun defaultDScore(D: Int) = D * 2 / 3
2323
fun defaultDOut(D: Int, DLow: Int) = min(D / 2, max(DLow - 1, 0))
2424

25+
// floodPublishMaxMessageSizeThreshold shortcuts
26+
const val NEVER_FLOOD_PUBLISH = 0
27+
const val ALWAYS_FLOOD_PUBLISH = Int.MAX_VALUE
28+
2529
/**
2630
* Parameters of Gossip 1.1 router
2731
*/
@@ -112,11 +116,16 @@ data class GossipParams(
112116
val seenTTL: Duration = 2.minutes,
113117

114118
/**
115-
* [floodPublish] is a gossipsub router option that enables flood publishing.
116-
* When this is enabled, published messages are forwarded to all peers with score >=
117-
* to publishThreshold
119+
* [floodPublishMaxMessageSizeThreshold] controls the maximum size (in bytes) a message will be
120+
* published using flood publishing mode.
121+
* When a message size is <= [floodPublishMaxMessageSizeThreshold], published messages are forwarded
122+
* to all peers with score >= to [GossipScoreParams.publishThreshold]
123+
*
124+
* [NEVER_FLOOD_PUBLISH] and [ALWAYS_FLOOD_PUBLISH] can be used as shortcuts.
125+
*
126+
* The default is [NEVER_FLOOD_PUBLISH] (0 KiB).
118127
*/
119-
val floodPublish: Boolean = false,
128+
val floodPublishMaxMessageSizeThreshold: Int = NEVER_FLOOD_PUBLISH,
120129

121130
/**
122131
* [gossipFactor] affects how many peers we will emit gossip to at each heartbeat.
@@ -240,7 +249,7 @@ data class GossipParams(
240249

241250
/**
242251
* [iDontWantMinMessageSizeThreshold] controls the minimum size (in bytes) that an incoming message needs to be so that an IDONTWANT message is sent to mesh peers.
243-
* The default is 16 KB.
252+
* The default is 16 KiB.
244253
*/
245254
val iDontWantMinMessageSizeThreshold: Int = 16384,
246255

@@ -260,6 +269,8 @@ data class GossipParams(
260269
check(DLow <= D, "DLow should be <= D")
261270
check(DHigh >= D, "DHigh should be >= D")
262271
check(gossipFactor in 0.0..1.0, "gossipFactor should be in range [0.0, 1.0]")
272+
check(floodPublishMaxMessageSizeThreshold >= 0, "floodPublishMaxMessageSizeThreshold should be >= 0")
273+
check(iDontWantMinMessageSizeThreshold >= 0, "iDontWantMinMessageSizeThreshold should be >= 0")
263274
}
264275

265276
companion object {

libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -400,8 +400,10 @@ open class GossipRouter(
400400
override fun broadcastOutbound(msg: PubsubMessage): CompletableFuture<Unit> {
401401
msg.topics.forEach { lastPublished[it] = currentTimeSupplier() }
402402

403+
val floodPublish = msg.size <= params.floodPublishMaxMessageSizeThreshold
404+
403405
val peers =
404-
if (params.floodPublish) {
406+
if (floodPublish) {
405407
msg.topics
406408
.flatMap { getTopicPeers(it) }
407409
.filter { score.score(it.peerId) >= scoreParams.publishThreshold }
@@ -613,7 +615,7 @@ open class GossipRouter(
613615

614616
private fun iDontWant(msg: PubsubMessage, receivedFrom: PeerHandler? = null) {
615617
if (!this.protocol.supportsIDontWant()) return
616-
if (msg.protobufMessage.data.size() < params.iDontWantMinMessageSizeThreshold) return
618+
if (msg.size < params.iDontWantMinMessageSizeThreshold) return
617619
// we need to send IDONTWANT messages to mesh peers immediately in order for them to have an effect
618620
msg.topics
619621
.mapNotNull { mesh[it] }

libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/builders/GossipParamsBuilder.kt

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,6 @@ class GossipParamsBuilder {
4040

4141
private var pruneBackoff: Duration? = null
4242

43-
private var floodPublish: Boolean? = null
44-
4543
private var gossipFactor: Double? = null
4644

4745
private var opportunisticGraftPeers: Int? = null
@@ -76,6 +74,8 @@ class GossipParamsBuilder {
7674

7775
private var iDontWantMinMessageSizeThreshold: Int? = null
7876

77+
private var floodPublishMaxMessageSizeThreshold: Int? = null
78+
7979
private var iDontWantTTL: Duration? = null
8080

8181
init {
@@ -90,7 +90,7 @@ class GossipParamsBuilder {
9090
this.maxPeersSentInPruneMsg = source.maxPeersSentInPruneMsg
9191
this.maxPeersAcceptedInPruneMsg = source.maxPeersAcceptedInPruneMsg
9292
this.pruneBackoff = source.pruneBackoff
93-
this.floodPublish = source.floodPublish
93+
this.floodPublishMaxMessageSizeThreshold = source.floodPublishMaxMessageSizeThreshold
9494
this.gossipFactor = source.gossipFactor
9595
this.opportunisticGraftPeers = source.opportunisticGraftPeers
9696
this.opportunisticGraftTicks = source.opportunisticGraftTicks
@@ -141,8 +141,6 @@ class GossipParamsBuilder {
141141

142142
fun pruneBackoff(value: Duration): GossipParamsBuilder = apply { pruneBackoff = value }
143143

144-
fun floodPublish(value: Boolean): GossipParamsBuilder = apply { floodPublish = value }
145-
146144
fun gossipFactor(value: Double): GossipParamsBuilder = apply { gossipFactor = value }
147145

148146
fun opportunisticGraftPeers(value: Int): GossipParamsBuilder = apply {
@@ -185,6 +183,8 @@ class GossipParamsBuilder {
185183

186184
fun iDontWantMinMessageSizeThreshold(value: Int): GossipParamsBuilder = apply { iDontWantMinMessageSizeThreshold = value }
187185

186+
fun floodPublishMaxMessageSizeThreshold(value: Int): GossipParamsBuilder = apply { floodPublishMaxMessageSizeThreshold = value }
187+
188188
fun iDontWantTTL(value: Duration): GossipParamsBuilder = apply { iDontWantTTL = value }
189189

190190
fun build(): GossipParams {
@@ -203,7 +203,7 @@ class GossipParamsBuilder {
203203
gossipHistoryLength = gossipHistoryLength!!,
204204
heartbeatInterval = heartbeatInterval!!,
205205
seenTTL = seenTTL!!,
206-
floodPublish = floodPublish!!,
206+
floodPublishMaxMessageSizeThreshold = floodPublishMaxMessageSizeThreshold!!,
207207
gossipFactor = gossipFactor!!,
208208
opportunisticGraftPeers = opportunisticGraftPeers!!,
209209
opportunisticGraftTicks = opportunisticGraftTicks!!,
@@ -252,7 +252,7 @@ class GossipParamsBuilder {
252252
check(seenTTL != null, { "seenTTL must not be null" })
253253
check(maxPeersSentInPruneMsg != null, { "maxPeersSentInPruneMsg must not be null" })
254254
check(pruneBackoff != null, { "pruneBackoff must not be null" })
255-
check(floodPublish != null, { "floodPublish must not be null" })
255+
check(floodPublishMaxMessageSizeThreshold != null, { "floodPublishMaxMessageSizeThreshold must not be null" })
256256
check(gossipFactor != null, { "gossipFactor must not be null" })
257257
check(opportunisticGraftPeers != null, { "opportunisticGraftPeers must not be null" })
258258
check(opportunisticGraftTicks != null, { "opportunisticGraftTicks must not be null" })

libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipPubsubRouterTest.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
2222

2323
class GossipPubsubRouterTest : PubsubRouterTest(
2424
createGossipFuzzRouterFactory {
25-
GossipRouterBuilder(params = GossipParams(3, 3, 100, floodPublish = false))
25+
GossipRouterBuilder(params = GossipParams(3, 3, 100, floodPublishMaxMessageSizeThreshold = NEVER_FLOOD_PUBLISH))
2626
}
2727
) {
2828

@@ -59,7 +59,7 @@ class GossipPubsubRouterTest : PubsubRouterTest(
5959
// this is to test ihave/iwant
6060
fuzz.timeController.addTime(Duration.ofMillis(1))
6161

62-
val r = { GossipRouterBuilder(params = GossipParams(3, 3, 3, DOut = 0, DLazy = 1000, floodPublish = false)) }
62+
val r = { GossipRouterBuilder(params = GossipParams(3, 3, 3, DOut = 0, DLazy = 1000, floodPublishMaxMessageSizeThreshold = NEVER_FLOOD_PUBLISH)) }
6363
val routerCenter = fuzz.createTestGossipRouter(r)
6464
allRouters.add(0, routerCenter)
6565

libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipV1_1Tests.kt

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -530,8 +530,9 @@ class GossipV1_1Tests : GossipTestsBase() {
530530

531531
@Test
532532
fun testNotFloodPublish() {
533+
val message = newMessage("topic1", 0L, "Hello-0".toByteArray())
533534
val appScore = mutableMapOf<PeerId, Double>().withDefault { 0.0 }
534-
val coreParams = GossipParams(3, 3, 3, floodPublish = false)
535+
val coreParams = GossipParams(3, 3, 3, floodPublishMaxMessageSizeThreshold = message.size - 1)
535536
val peerScoreParams = GossipPeerScoreParams(appSpecificScore = { appScore.getValue(it) })
536537
val scoreParams = GossipScoreParams(peerScoreParams = peerScoreParams)
537538
val test = ManyRoutersTest(params = coreParams, scoreParams = scoreParams)
@@ -545,7 +546,7 @@ class GossipV1_1Tests : GossipTestsBase() {
545546
val topicMesh = test.gossipRouter.mesh["topic1"]!!
546547
assertTrue(topicMesh.size > 0 && topicMesh.size < test.routers.size)
547548

548-
test.gossipRouter.publish(newMessage("topic1", 0L, "Hello-0".toByteArray()))
549+
test.gossipRouter.publish(message)
549550

550551
test.fuzz.timeController.addTime(50.millis)
551552

@@ -557,8 +558,9 @@ class GossipV1_1Tests : GossipTestsBase() {
557558

558559
@Test
559560
fun testFloodPublish() {
561+
val message = newMessage("topic1", 0L, "Hello-0".toByteArray())
560562
val appScore = mutableMapOf<PeerId, Double>().withDefault { 0.0 }
561-
val coreParams = GossipParams(3, 3, 3, floodPublish = true)
563+
val coreParams = GossipParams(3, 3, 3, floodPublishMaxMessageSizeThreshold = message.size)
562564
val peerScoreParams = GossipPeerScoreParams(
563565
appSpecificScore = { appScore.getValue(it) },
564566
appSpecificWeight = 1.0
@@ -580,7 +582,7 @@ class GossipV1_1Tests : GossipTestsBase() {
580582
val topicMesh = test.gossipRouter.mesh["topic1"]!!.map { it.peerId }
581583
assertTrue(topicMesh.size > 0 && topicMesh.size < test.routers.size)
582584

583-
test.gossipRouter.publish(newMessage("topic1", 0L, "Hello-0".toByteArray()))
585+
test.gossipRouter.publish(message)
584586

585587
test.fuzz.timeController.addTime(50.millis)
586588

@@ -650,7 +652,7 @@ class GossipV1_1Tests : GossipTestsBase() {
650652
3,
651653
3,
652654
DLazy = 3,
653-
floodPublish = false,
655+
floodPublishMaxMessageSizeThreshold = 0,
654656
gossipFactor = 0.5
655657
)
656658
val peerScoreParams = GossipPeerScoreParams(
@@ -714,7 +716,7 @@ class GossipV1_1Tests : GossipTestsBase() {
714716
@Test
715717
fun testOutboundMeshQuotas1() {
716718
val appScore = mutableMapOf<PeerId, Double>().withDefault { 0.0 }
717-
val coreParams = GossipParams(3, 3, 3, DLazy = 3, DOut = 1, floodPublish = false)
719+
val coreParams = GossipParams(3, 3, 3, DLazy = 3, DOut = 1, floodPublishMaxMessageSizeThreshold = NEVER_FLOOD_PUBLISH)
718720
val peerScoreParams = GossipPeerScoreParams(appSpecificScore = { appScore.getValue(it) })
719721
val scoreParams = GossipScoreParams(peerScoreParams = peerScoreParams)
720722
val test = ManyRoutersTest(params = coreParams, scoreParams = scoreParams)

libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/SubscriptionsLimitTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import org.junit.jupiter.api.Test
1010
import org.junit.jupiter.api.assertDoesNotThrow
1111

1212
class SubscriptionsLimitTest : TwoGossipHostTestBase() {
13-
override val params = GossipParams(maxSubscriptions = 5, floodPublish = true)
13+
override val params = GossipParams(maxSubscriptions = 5, floodPublishMaxMessageSizeThreshold = ALWAYS_FLOOD_PUBLISH)
1414

1515
@Test
1616
fun `new peer subscribed to many topics`() {

libp2p/src/testFixtures/kotlin/io/libp2p/pubsub/gossip/Eth2GossipParams.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ val Eth2DefaultGossipParams = GossipParams(
2020
DLazy = 8,
2121

2222
pruneBackoff = 1.minutes,
23-
floodPublish = true,
23+
floodPublishMaxMessageSizeThreshold = 16384,
2424
gossipFactor = 0.25,
2525
DScore = 4,
2626
DOut = 2,

tools/simulator/src/main/kotlin/io/libp2p/simulate/gossip/Eth2GossipParams.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ val Eth2DefaultGossipParams = GossipParams(
2525
DLazy = 8,
2626

2727
pruneBackoff = 1.minutes,
28-
floodPublish = true,
28+
floodPublishMaxMessageSizeThreshold = 16384,
2929
gossipFactor = 0.25,
3030
DScore = 4,
3131
DOut = 2,

tools/simulator/src/main/kotlin/io/libp2p/simulate/main/BlobDecouplingSimulation.kt

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@ class BlobDecouplingSimulation(
4141
val randomSeed: Long = 3L,
4242
val rnd: Random = Random(randomSeed),
4343

44-
val floodPublish: Boolean = true,
45-
4644
val sendingPeerBand: Bandwidth = Bandwidth.mbitsPerSec(100),
4745

4846
val peerBands: Iterator<Bandwidth> = iterator {
@@ -83,10 +81,6 @@ class BlobDecouplingSimulation(
8381
)
8482

8583
val gossipParams = Eth2DefaultGossipParams
86-
.copy(
87-
// heartbeatInterval = 1.minutes
88-
floodPublish = floodPublish
89-
)
9084
val gossipScoreParams = Eth2DefaultScoreParams
9185
val gossipRouterCtor = { _: Int ->
9286
SimGossipRouterBuilder().also {
@@ -294,7 +288,6 @@ fun main() {
294288
// logger = {},
295289
nodeCount = 1000,
296290
peerBands = band,
297-
floodPublish = false,
298291
// randomSeed = 2
299292
)
300293

0 commit comments

Comments
 (0)