Skip to content

Commit 7ffddbd

Browse files
committed
feat: gossipsub px support
1 parent 93db700 commit 7ffddbd

File tree

3 files changed

+387
-32
lines changed

3 files changed

+387
-32
lines changed

libp2p/crawler.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,10 @@ type Crawler struct {
4646
cfg *CrawlerConfig
4747
host Host
4848
pm *pb.ProtocolMessenger
49+
psTopics map[string]struct{}
4950
crawledPeers int
5051
client *kubo.Client
52+
stateChan chan string
5153
}
5254

5355
var _ core.Worker[PeerInfo, core.CrawlResult[PeerInfo]] = (*Crawler)(nil)
@@ -61,6 +63,11 @@ func (c *Crawler) Work(ctx context.Context, task PeerInfo) (core.CrawlResult[Pee
6163
logEntry.Debugln("Crawling peer")
6264
defer logEntry.Debugln("Crawled peer")
6365

66+
if c.cfg.GossipSubPX {
67+
c.stateChan <- "busy"
68+
defer func() { c.stateChan <- "idle" }()
69+
}
70+
6471
// adhere to the addr-dial-type command line flag and only work with
6572
// private/public addresses if the user asked for it.
6673
var filterFn func(info peer.AddrInfo) (peer.AddrInfo, peer.AddrInfo)

libp2p/crawler_p2p.go

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,11 @@ type P2PResult struct {
6262
Transport string
6363
}
6464

65+
// crawlP2P establishes a connection and crawls neighbor info from a peer.
66+
// It returns a channel that streams the crawling results asynchronously.
67+
// The method retrieves routing table, listen addresses, protocols, and agent.
68+
// Connection attempts and errors are tracked for debugging or analysis.
69+
// It supports context cancellation for graceful operation termination.
6570
func (c *Crawler) crawlP2P(ctx context.Context, pi PeerInfo) <-chan P2PResult {
6671
resultCh := make(chan P2PResult)
6772

@@ -85,7 +90,6 @@ func (c *Crawler) crawlP2P(ctx context.Context, pi PeerInfo) <-chan P2PResult {
8590

8691
// If we could successfully connect to the peer we actually crawl it.
8792
if result.ConnectError == nil {
88-
8993
// keep track of the multi address over which we successfully connected
9094
result.ConnectMaddr = conn.RemoteMultiaddr()
9195

@@ -128,6 +132,68 @@ func (c *Crawler) crawlP2P(ctx context.Context, pi PeerInfo) <-chan P2PResult {
128132

129133
// Extract listen addresses
130134
result.ListenMaddrs = ps.Addrs(pi.ID())
135+
136+
if c.cfg.GossipSubPX {
137+
// give the other peer a chance to open a stream to us and prune us
138+
streams := openInboundGossipSubStreams(c.host, pi.ID())
139+
140+
// The maximum time to wait for the gossipsub px to complete
141+
maxGossipSubWait := c.cfg.DialTimeout
142+
143+
// the minimum time to wait for the gossipsub px to start
144+
minGossipSubWait := 2 * time.Second
145+
146+
// the time since we're connected to the peer
147+
elapsed := time.Since(result.ConnectEndTime)
148+
149+
// the remaining time until the maximum wait time is reached
150+
remainingWait := maxGossipSubWait - elapsed
151+
152+
// the interval to check the open gossipsub streams
153+
interval := 250 * time.Millisecond
154+
155+
// if 1) we are supposed to wait a little longer for the
156+
// gossipsub exchange (remainingWait > 0) and 2) we either have
157+
// at least one open gossipsub stream or haven't waited long enough
158+
// for such a stream to be there yet then we will enter the for
159+
// loop that waits the calculated remaining time before exiting
160+
// or until we don't have any open gossipsub streams anymore by
161+
// checking every 250ms if there are still any.
162+
163+
if remainingWait > 0 && (streams != 0 || elapsed < minGossipSubWait) {
164+
165+
// if we don't have an open stream yet and the check
166+
// interval is way below the minimum wait time we increase
167+
// the initial ticker delay
168+
initialTickerDelay := interval
169+
if streams == 0 && minGossipSubWait-elapsed > interval {
170+
initialTickerDelay = minGossipSubWait - elapsed
171+
}
172+
173+
timer := time.NewTimer(remainingWait)
174+
ticker := time.NewTicker(initialTickerDelay)
175+
176+
defer timer.Stop()
177+
defer ticker.Stop()
178+
179+
for {
180+
select {
181+
case <-ctx.Done():
182+
// exit for loop because the context was cancelled
183+
case <-timer.C:
184+
// exit for loop because the maximum wait time was reached
185+
case <-ticker.C:
186+
ticker.Reset(interval)
187+
if openInboundGossipSubStreams(c.host, pi.ID()) != 0 {
188+
continue
189+
}
190+
// exit for loop because we don't have any open
191+
// streams despite waiting minGossipSubWait
192+
}
193+
break
194+
}
195+
}
196+
}
131197
} else {
132198
// if there was a connection error, parse it to a known one
133199
result.ConnectErrorStr = db.NetError(result.ConnectError)
@@ -182,7 +248,10 @@ func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) (network.Conn,
182248

183249
switch true {
184250
case strings.Contains(err.Error(), db.ErrorStr[pgmodels.NetErrorConnectionRefused]):
185-
// Might be transient because the remote doesn't want us to connect. Try again!
251+
// Might be transient because the remote doesn't want us to connect.
252+
// Try again, but reduce the maximum elapsed time because it's still
253+
// unlikely to succeed.
254+
bo.MaxElapsedTime = 2 * c.cfg.DialTimeout
186255
case strings.Contains(err.Error(), db.ErrorStr[pgmodels.NetErrorConnectionGated]):
187256
// Hints at a configuration issue and should not happen, but if it
188257
// does it could be transient. Try again anyway, but at least log a warning.

0 commit comments

Comments
 (0)