Skip to content

Commit 211e951

Browse files
fix: improve async validator handling in Pubsub class (#705)
Signed-off-by: varun-r-mallya <[email protected]>
1 parent ef16f3c commit 211e951

File tree

2 files changed

+5
-5
lines changed

2 files changed

+5
-5
lines changed

libp2p/pubsub/pubsub.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -682,19 +682,18 @@ async def validate_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None:
682682
# TODO: Implement throttle on async validators
683683

684684
if len(async_topic_validators) > 0:
685-
# TODO: Use a better pattern
686-
final_result: bool = True
685+
# Appends to lists are thread safe in CPython
686+
results = []
687687

688688
async def run_async_validator(func: AsyncValidatorFn) -> None:
689-
nonlocal final_result
690689
result = await func(msg_forwarder, msg)
691-
final_result = final_result and result
690+
results.append(result)
692691

693692
async with trio.open_nursery() as nursery:
694693
for async_validator in async_topic_validators:
695694
nursery.start_soon(run_async_validator, async_validator)
696695

697-
if not final_result:
696+
if not all(results):
698697
raise ValidationError(f"Validation failed for msg={msg}")
699698

700699
async def push_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None:

newsfragments/702.bugfix.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fixed an issue in `Pubsub` where async validators were not handled reliably under concurrency. Now uses a safe aggregator list for consistent behavior.

0 commit comments

Comments
 (0)