Skip to content

Commit cfdff90

Browse files
Don't throw NoPeersForOutboundMessageException if peers DONTWANT message (#385)
1 parent a32f486 commit cfdff90

File tree

1 file changed

+12
-7
lines changed

1 file changed

+12
-7
lines changed

libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -416,15 +416,20 @@ open class GossipRouter(
416416
}
417417
.flatten()
418418
}
419-
val list = peers
420-
.filterNot { peerDoesNotWantMessage(it, msg.messageId) }
421-
.map { submitPublishMessage(it, msg) }
422419

423420
mCache += msg
424-
flushAllPending()
425421

426-
return if (list.isNotEmpty()) {
427-
anyComplete(list)
422+
return if (peers.isNotEmpty()) {
423+
val publishedMessages = peers
424+
.filterNot { peerDoesNotWantMessage(it, msg.messageId) }
425+
.map { submitPublishMessage(it, msg) }
426+
if (publishedMessages.isEmpty()) {
427+
// all peers have sent IDONTWANT for this message id
428+
CompletableFuture.completedFuture(Unit)
429+
} else {
430+
flushAllPending()
431+
anyComplete(publishedMessages)
432+
}
428433
} else {
429434
completedExceptionally(
430435
NoPeersForOutboundMessageException("No peers for message topics ${msg.topics} found")
@@ -614,7 +619,7 @@ open class GossipRouter(
614619
.flatten()
615620
.distinct()
616621
.minus(receivedFrom)
617-
.forEach { peer -> sendIdontwant(peer, msg.messageId) }
622+
.forEach { sendIdontwant(it, msg.messageId) }
618623
}
619624

620625
private fun enqueuePrune(peer: PeerHandler, topic: Topic) {

0 commit comments

Comments
 (0)