Skip to content

Commit e7dde5e

Browse files
authored
Merge pull request #1064 from lla-dane/floodsub
WIP: Improve Floodsub in PubSub module
2 parents 70d83f7 + 941b0e5 commit e7dde5e

File tree

7 files changed

+450
-45
lines changed

7 files changed

+450
-45
lines changed

examples/pubsub/floodsub.py

Lines changed: 276 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,276 @@
1+
import argparse
2+
import logging
3+
4+
import base58
5+
import multiaddr
6+
import trio
7+
8+
from libp2p import (
9+
new_host,
10+
)
11+
from libp2p.crypto.rsa import (
12+
create_new_key_pair,
13+
)
14+
from libp2p.custom_types import (
15+
TProtocol,
16+
)
17+
from libp2p.peer.peerinfo import (
18+
info_from_p2p_addr,
19+
)
20+
from libp2p.pubsub.floodsub import FloodSub
21+
from libp2p.pubsub.pubsub import (
22+
Pubsub,
23+
)
24+
from libp2p.stream_muxer.mplex.mplex import (
25+
MPLEX_PROTOCOL_ID,
26+
Mplex,
27+
)
28+
from libp2p.tools.async_service.trio_service import (
29+
background_trio_service,
30+
)
31+
from libp2p.utils.address_validation import (
32+
find_free_port,
33+
)
34+
35+
# Configure logging
36+
logging.basicConfig(
37+
level=logging.INFO, # Set default to DEBUG for more verbose output
38+
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
39+
)
40+
logger = logging.getLogger("floodsub-demo")
41+
CHAT_TOPIC = "floodsub-chat"
42+
FLOODSUB_PROTOCOL_ID = TProtocol("/floodsub/1.0.0")
43+
44+
45+
# Generate a key pair for the node
46+
key_pair = create_new_key_pair()
47+
48+
49+
async def receive_loop(subscription, termination_event):
50+
logger.debug("Starting receive loop")
51+
while not termination_event.is_set():
52+
try:
53+
message = await subscription.get()
54+
logger.info(f"From peer: {base58.b58encode(message.from_id).decode()}")
55+
print(f"Received message: {message.data.decode('utf-8')}")
56+
except Exception:
57+
logger.exception("Error in receive loop")
58+
await trio.sleep(1)
59+
60+
61+
async def publish_loop(pubsub, topic, termination_event):
62+
"""Continuously read input from user and publish to the topic."""
63+
logger.debug("Starting publish loop...")
64+
print("Type messages to send (press Enter to send):")
65+
while not termination_event.is_set():
66+
try:
67+
# Use trio's run_sync_in_worker_thread to avoid blocking the event loop
68+
message = await trio.to_thread.run_sync(input)
69+
if message.lower() == "quit":
70+
termination_event.set() # Signal termination
71+
break
72+
if message:
73+
logger.debug(f"Publishing message: {message}")
74+
await pubsub.publish(topic, message.encode())
75+
print(f"Published: {message}")
76+
except Exception:
77+
logger.exception("Error in publish loop")
78+
await trio.sleep(1) # Avoid tight loop on error
79+
80+
81+
async def monitor_peer_topics(pubsub, nursery, termination_event):
82+
"""
83+
Monitor for new topics that peers are subscribed to and
84+
automatically subscribe the server to those topics.
85+
"""
86+
# Keep track of topics we've already subscribed to
87+
subscribed_topics = set()
88+
89+
while not termination_event.is_set():
90+
# Check for new topics in peer_topics
91+
for topic in pubsub.peer_topics.keys():
92+
if topic not in subscribed_topics:
93+
logger.info(f"Auto-subscribing to new topic: {topic}")
94+
subscription = await pubsub.subscribe(topic)
95+
subscribed_topics.add(topic)
96+
# Start a receive loop for this topic
97+
nursery.start_soon(receive_loop, subscription, termination_event)
98+
99+
# Check every 2 seconds for new topics
100+
await trio.sleep(2)
101+
102+
103+
async def run(topic: str, destination: str | None, port: int | None) -> None:
104+
from libp2p.utils.address_validation import (
105+
get_available_interfaces,
106+
get_optimal_binding_address,
107+
)
108+
109+
if port is None or port == 0:
110+
port = find_free_port()
111+
logger.info(f"Using random available port: {port}")
112+
113+
listen_addrs = get_available_interfaces(port)
114+
115+
# Create a new libp2p host
116+
host = new_host(
117+
key_pair=key_pair,
118+
muxer_opt={MPLEX_PROTOCOL_ID: Mplex},
119+
)
120+
# Log available protocols
121+
logger.debug(f"Host ID: {host.get_id()}")
122+
logger.debug(
123+
f"Host multiselect protocols: "
124+
f"{host.get_mux().get_protocols() if hasattr(host, 'get_mux') else 'N/A'}"
125+
)
126+
# Create and start floodsub
127+
floodsub = FloodSub(
128+
protocols=[FLOODSUB_PROTOCOL_ID],
129+
)
130+
131+
# pubsub = Pubsub(host, gossipsub)
132+
pubsub = Pubsub(host, floodsub)
133+
termination_event = trio.Event() # Event to signal termination
134+
async with host.run(listen_addrs=listen_addrs), trio.open_nursery() as nursery:
135+
# Start the peer-store cleanup task
136+
nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)
137+
138+
logger.info(f"Node started with peer ID: {host.get_id()}")
139+
logger.info("Initializing PubSub and GossipSub...")
140+
async with background_trio_service(pubsub):
141+
# async with background_trio_service(gossipsub):
142+
logger.info("Pubsub and Floodsub services started.")
143+
await pubsub.wait_until_ready()
144+
# logger.info("Pubsub ready.")
145+
146+
# Subscribe to the topic
147+
subscription = await pubsub.subscribe(topic)
148+
logger.info(f"Subscribed to topic: {topic}")
149+
150+
if not destination:
151+
# Server mode
152+
# Get all available addresses with peer ID
153+
all_addrs = host.get_addrs()
154+
155+
logger.info("Listener ready, listening on:")
156+
for addr in all_addrs:
157+
logger.info(f"{addr}")
158+
159+
# Use optimal address for the client command
160+
optimal_addr = get_optimal_binding_address(port)
161+
optimal_addr_with_peer = (
162+
f"{optimal_addr}/p2p/{host.get_id().to_string()}"
163+
)
164+
logger.info(
165+
f"\nRun this from the same folder in another console:\n\n"
166+
f" floodsub-demo -d {optimal_addr_with_peer}\n"
167+
)
168+
logger.info("Waiting for peers...")
169+
170+
# Start topic monitoring to auto-subscribe to client topics
171+
nursery.start_soon(
172+
monitor_peer_topics, pubsub, nursery, termination_event
173+
)
174+
175+
# Start message publish and receive loops
176+
nursery.start_soon(receive_loop, subscription, termination_event)
177+
nursery.start_soon(publish_loop, pubsub, topic, termination_event)
178+
else:
179+
# Client mode
180+
maddr = multiaddr.Multiaddr(destination)
181+
protocols_in_maddr = maddr.protocols()
182+
info = info_from_p2p_addr(maddr)
183+
logger.debug(f"Multiaddr protocols: {protocols_in_maddr}")
184+
logger.info(
185+
f"Connecting to peer: {info.peer_id} "
186+
f"using protocols: {protocols_in_maddr}"
187+
)
188+
try:
189+
await host.connect(info)
190+
logger.info(f"Connected to peer: {info.peer_id}")
191+
if logger.isEnabledFor(logging.DEBUG):
192+
await trio.sleep(1)
193+
logger.debug(f"After connection, pubsub.peers: {pubsub.peers}")
194+
peer_protocols = [
195+
floodsub.peer_protocol.get(p) for p in pubsub.peers.keys()
196+
]
197+
logger.debug(f"Peer protocols: {peer_protocols}")
198+
199+
# Start the loops
200+
nursery.start_soon(receive_loop, subscription, termination_event)
201+
nursery.start_soon(publish_loop, pubsub, topic, termination_event)
202+
except Exception:
203+
logger.exception(f"Failed to connect to peer: {info.peer_id}")
204+
return
205+
206+
await termination_event.wait() # Wait for termination signal
207+
208+
# Ensure all tasks are completed before exiting
209+
nursery.cancel_scope.cancel()
210+
211+
print("Application shutdown complete") # Print shutdown message
212+
213+
214+
def main() -> None:
215+
description = """
216+
This program demonstrates a pubsub p2p chat application using libp2p with
217+
the gossipsub protocol as the pubsub router.
218+
To use it, first run 'python pubsub.py -p <PORT> -t <TOPIC>',
219+
where <PORT> is the port number,
220+
and <TOPIC> is the name of the topic you want to subscribe to.
221+
Then, run another instance with 'python pubsub.py -p <ANOTHER_PORT> -t <TOPIC>
222+
-d <DESTINATION>', where <DESTINATION> is the multiaddress of the previous
223+
listener host. Messages typed in either terminal will be received by all peers
224+
subscribed to the same topic.
225+
"""
226+
227+
parser = argparse.ArgumentParser(description=description)
228+
parser.add_argument(
229+
"-t",
230+
"--topic",
231+
type=str,
232+
help="topic name to subscribe",
233+
default=CHAT_TOPIC,
234+
)
235+
236+
parser.add_argument(
237+
"-d",
238+
"--destination",
239+
type=str,
240+
help="Address of peer to connect to",
241+
default=None,
242+
)
243+
244+
parser.add_argument(
245+
"-p",
246+
"--port",
247+
type=int,
248+
help="Port to listen on",
249+
default=None,
250+
)
251+
252+
parser.add_argument(
253+
"-v",
254+
"--verbose",
255+
action="store_true",
256+
help="Enable debug logging",
257+
)
258+
259+
args = parser.parse_args()
260+
261+
# Set debug level if verbose flag is provided
262+
if args.verbose:
263+
logger.setLevel(logging.DEBUG)
264+
logger.debug("Debug logging enabled")
265+
266+
logger.info("Running pubsub chat example...")
267+
logger.info(f"Your selected topic is: {args.topic}")
268+
269+
try:
270+
trio.run(run, *(args.topic, args.destination, args.port))
271+
except KeyboardInterrupt:
272+
logger.info("Application terminated by user")
273+
274+
275+
if __name__ == "__main__":
276+
main()

0 commit comments

Comments
 (0)