Skip to content

Commit a04f8d2

Browse files
committed
make maxDurationInNonPriorityQueue configurable and none by default
1 parent 5d9478b commit a04f8d2

File tree

3 files changed

+20
-12
lines changed

3 files changed

+20
-12
lines changed

libp2p/protocols/pubsub/gossipsub.nim

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ proc init*(_: type[GossipSubParams]): GossipSubParams =
8383
enablePX: false,
8484
bandwidthEstimatebps: 100_000_000, # 100 Mbps or 12.5 MBps
8585
overheadRateLimit: Opt.none(tuple[bytes: int, interval: Duration]),
86-
disconnectPeerAboveRateLimit: false
86+
disconnectPeerAboveRateLimit: false,
87+
maxDurationInNonPriorityQueue: Opt.none(Duration),
8788
)
8889

8990
proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] =
@@ -751,4 +752,5 @@ method getOrCreatePeer*(
751752
let peer = procCall PubSub(g).getOrCreatePeer(peerId, protos)
752753
g.parameters.overheadRateLimit.withValue(overheadRateLimit):
753754
peer.overheadRateLimitOpt = Opt.some(TokenBucket.new(overheadRateLimit.bytes, overheadRateLimit.interval))
755+
peer.rpcmessagequeue.maxDurationInNonPriorityQueue = g.parameters.maxDurationInNonPriorityQueue
754756
return peer

libp2p/protocols/pubsub/gossipsub/types.nim

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,10 @@ type
147147
overheadRateLimit*: Opt[tuple[bytes: int, interval: Duration]]
148148
disconnectPeerAboveRateLimit*: bool
149149

150+
# The maximum duration a message can stay in the non-priority queue. If it exceeds this duration, it will be discarded
151+
# as soon as it is dequeued, instead of being sent to the remote peer. The default value is none, i.e., no maximum duration.
152+
maxDurationInNonPriorityQueue*: Opt[Duration]
153+
150154
BackoffTable* = Table[string, Table[PeerId, Moment]]
151155
ValidationSeenTable* = Table[MessageId, HashSet[PubSubPeer]]
152156

libp2p/protocols/pubsub/pubsubpeer.nim

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ type
6767
# Task for processing non-priority message queue.
6868
sendNonPriorityTask: Future[void]
6969
# The max duration a message to be relayed can wait to be sent before it is dropped. The default is 500ms.
70-
maxDurationInNonPriorityQueue: Duration
70+
maxDurationInNonPriorityQueue*: Opt[Duration]
7171

7272
PubSubPeer* = ref object of RootObj
7373
getConn*: GetConn # callback to establish a new send connection
@@ -90,7 +90,7 @@ type
9090
behaviourPenalty*: float64 # the eventual penalty score
9191
overheadRateLimitOpt*: Opt[TokenBucket]
9292

93-
rpcmessagequeue: RpcMessageQueue
93+
rpcmessagequeue*: RpcMessageQueue
9494

9595
RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void]
9696
{.gcsafe, raises: [].}
@@ -385,7 +385,7 @@ proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
385385
proc sendNonPriorityTask(p: PubSubPeer) {.async.} =
386386
while true:
387387
# we send non-priority messages only if there are no pending priority messages
388-
let msg = await p.rpcmessagequeue.nonPriorityQueue.popFirst()
388+
let queuedMsg = await p.rpcmessagequeue.nonPriorityQueue.popFirst()
389389
while p.rpcmessagequeue.sendPriorityQueue.len > 0:
390390
p.clearSendPriorityQueue()
391391
# this minimizes the number of times we have to wait for something (each wait = performance cost)
@@ -395,11 +395,12 @@ proc sendNonPriorityTask(p: PubSubPeer) {.async.} =
395395
await p.rpcmessagequeue.sendPriorityQueue[^1]
396396
when defined(libp2p_expensive_metrics):
397397
libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId])
398-
if Moment.now() - ttlMsg.addedAt >= p.rpcmessagequeue.maxDurationInNonPriorityQueue:
399-
when defined(libp2p_expensive_metrics):
400-
libp2p_gossipsub_non_priority_msgs_dropped.inc(labelValues = [$p.peerId])
401-
continue
402-
await p.sendMsg(ttlMsg.msg)
398+
p.rpcmessagequeue.maxDurationInNonPriorityQueue.withValue(maxDurationInNonPriorityQueue):
399+
if Moment.now() - queuedMsg.addedAt >= maxDurationInNonPriorityQueue:
400+
when defined(libp2p_expensive_metrics):
401+
libp2p_gossipsub_non_priority_msgs_dropped.inc(labelValues = [$p.peerId])
402+
continue
403+
await p.sendMsg(queuedMsg.msg)
403404

404405
proc startSendNonPriorityTask(p: PubSubPeer) =
405406
debug "starting sendNonPriorityTask", p
@@ -417,7 +418,7 @@ proc stopSendNonPriorityTask*(p: PubSubPeer) =
417418
libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
418419
libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
419420

420-
proc new(T: typedesc[RpcMessageQueue], maxDurationInNonPriorityQueue = 1.seconds): T =
421+
proc new(T: typedesc[RpcMessageQueue], maxDurationInNonPriorityQueue = Opt.none(Duration)): T =
421422
return T(
422423
sendPriorityQueue: initDeque[Future[void]](),
423424
nonPriorityQueue: newAsyncQueue[QueuedMessage](),
@@ -431,7 +432,8 @@ proc new*(
431432
onEvent: OnEvent,
432433
codec: string,
433434
maxMessageSize: int,
434-
overheadRateLimitOpt: Opt[TokenBucket] = Opt.none(TokenBucket)): T =
435+
overheadRateLimitOpt: Opt[TokenBucket] = Opt.none(TokenBucket),
436+
maxDurationInNonPriorityQueue = Opt.none(Duration)): T =
435437

436438
result = T(
437439
getConn: getConn,
@@ -441,7 +443,7 @@ proc new*(
441443
connectedFut: newFuture[void](),
442444
maxMessageSize: maxMessageSize,
443445
overheadRateLimitOpt: overheadRateLimitOpt,
444-
rpcmessagequeue: RpcMessageQueue.new(),
446+
rpcmessagequeue: RpcMessageQueue.new(maxDurationInNonPriorityQueue),
445447
)
446448
result.sentIHaves.addFirst(default(HashSet[MessageId]))
447449
result.heDontWants.addFirst(default(HashSet[MessageId]))

0 commit comments

Comments
 (0)