Skip to content

Commit 5dafe49

Browse files
make crawler protocol messenger configurable (#1128)
1 parent 944883e commit 5dafe49

File tree

2 files changed

+26
-7
lines changed

2 files changed

+26
-7
lines changed

crawler/crawler.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,13 @@ func NewDefaultCrawler(host host.Host, opts ...Option) (*DefaultCrawler, error)
5252
}
5353
}
5454

55-
pm, err := pb.NewProtocolMessenger(&messageSender{h: host, protocols: o.protocols, timeout: o.perMsgTimeout})
55+
var err error
56+
var pm *pb.ProtocolMessenger
57+
if o.msgSenderBuilder != nil {
58+
pm, err = pb.NewProtocolMessenger(o.msgSenderBuilder(host, o.protocols))
59+
} else {
60+
pm, err = pb.NewProtocolMessenger(&messageSender{h: host, protocols: o.protocols, timeout: o.perMsgTimeout})
61+
}
5662
if err != nil {
5763
return nil, err
5864
}
@@ -192,7 +198,7 @@ func (c *DefaultCrawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo
192198
// Start worker goroutines
193199
var wg sync.WaitGroup
194200
wg.Add(c.parallelism)
195-
for i := 0; i < c.parallelism; i++ {
201+
for range c.parallelism {
196202
go func() {
197203
defer wg.Done()
198204
for p := range jobs {

crawler/options.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,24 @@
11
package crawler
22

33
import (
4+
"slices"
45
"time"
56

67
"github.com/libp2p/go-libp2p-kad-dht/amino"
8+
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
9+
"github.com/libp2p/go-libp2p/core/host"
710
"github.com/libp2p/go-libp2p/core/protocol"
811
)
912

1013
// Option DHT Crawler option type.
1114
type Option func(*options) error
1215

1316
type options struct {
14-
protocols []protocol.ID
15-
parallelism int
16-
connectTimeout time.Duration
17-
perMsgTimeout time.Duration
17+
protocols []protocol.ID
18+
parallelism int
19+
connectTimeout time.Duration
20+
perMsgTimeout time.Duration
21+
msgSenderBuilder func(h host.Host, protos []protocol.ID) pb.MessageSenderWithDisconnect
1822
}
1923

2024
// defaults are the default crawler options. This option will be automatically
@@ -31,7 +35,7 @@ var defaults = func(o *options) error {
3135
// WithProtocols defines the ordered set of protocols the crawler will use to talk to other nodes
3236
func WithProtocols(protocols []protocol.ID) Option {
3337
return func(o *options) error {
34-
o.protocols = append([]protocol.ID{}, protocols...)
38+
o.protocols = slices.Clone(protocols)
3539
return nil
3640
}
3741
}
@@ -59,3 +63,12 @@ func WithConnectTimeout(timeout time.Duration) Option {
5963
return nil
6064
}
6165
}
66+
67+
// WithCustomMessageSender configures the pb.MessageSender of the IpfsDHT to use the
68+
// custom implementation of the pb.MessageSender
69+
func WithCustomMessageSender(messageSenderBuilder func(h host.Host, protos []protocol.ID) pb.MessageSenderWithDisconnect) Option {
70+
return func(o *options) error {
71+
o.msgSenderBuilder = messageSenderBuilder
72+
return nil
73+
}
74+
}

0 commit comments

Comments
 (0)