44 "context"
55 "fmt"
66 "io"
7- "sync"
87 "time"
98
109 logging "github.com/ipfs/go-log/v2"
@@ -13,7 +12,6 @@ import (
1312 "github.com/libp2p/go-libp2p-core/network"
1413 "github.com/libp2p/go-libp2p-core/peer"
1514 "github.com/libp2p/go-libp2p-core/protocol"
16- ma "github.com/multiformats/go-multiaddr"
1715 "golang.org/x/xerrors"
1816
1917 datatransfer "github.com/filecoin-project/go-data-transfer"
@@ -53,8 +51,7 @@ type Option func(*libp2pDataTransferNetwork)
5351// DataTransferProtocols OVERWRITES the default libp2p protocols we use for data transfer with the given protocols.
5452func DataTransferProtocols (protocols []protocol.ID ) Option {
5553 return func (impl * libp2pDataTransferNetwork ) {
56- impl .dtProtocols = nil
57- impl .dtProtocols = append (impl .dtProtocols , protocols ... )
54+ impl .setDataTransferProtocols (protocols )
5855 }
5956}
6057
@@ -87,17 +84,13 @@ func NewFromLibp2pHost(host host.Host, options ...Option) DataTransferNetwork {
8784 minAttemptDuration : defaultMinAttemptDuration ,
8885 maxAttemptDuration : defaultMaxAttemptDuration ,
8986 backoffFactor : defaultBackoffFactor ,
90- dtProtocols : defaultDataTransferProtocols ,
91- peerProtocols : make (map [peer.ID ]protocol.ID ),
9287 }
88+ dataTransferNetwork .setDataTransferProtocols (defaultDataTransferProtocols )
9389
9490 for _ , option := range options {
9591 option (& dataTransferNetwork )
9692 }
9793
98- // Listen to network notifications
99- host .Network ().Notify (& dataTransferNetwork )
100-
10194 return & dataTransferNetwork
10295}
10396
@@ -114,10 +107,8 @@ type libp2pDataTransferNetwork struct {
114107 minAttemptDuration time.Duration
115108 maxAttemptDuration time.Duration
116109 dtProtocols []protocol.ID
110+ dtProtocolStrings []string
117111 backoffFactor float64
118-
119- pplk sync.RWMutex
120- peerProtocols map [peer.ID ]protocol.ID
121112}
122113
123114func (impl * libp2pDataTransferNetwork ) openStream (ctx context.Context , id peer.ID , protocols ... protocol.ID ) (network.Stream , error ) {
@@ -143,9 +134,6 @@ func (impl *libp2pDataTransferNetwork) openStream(ctx context.Context, id peer.I
143134 id , nAttempts , impl .maxStreamOpenAttempts , time .Since (start ))
144135 }
145136
146- // Cache the peer's protocol version
147- impl .setPeerProtocol (id , s .Protocol ())
148-
149137 return s , err
150138 }
151139
@@ -229,10 +217,7 @@ func (dtnet *libp2pDataTransferNetwork) handleNewStream(s network.Stream) {
229217 return
230218 }
231219
232- // Cache the peer's protocol version
233220 p := s .Conn ().RemotePeer ()
234- dtnet .setPeerProtocol (p , s .Protocol ())
235-
236221 for {
237222 var received datatransfer.Message
238223 var err error
@@ -321,12 +306,13 @@ func (dtnet *libp2pDataTransferNetwork) msgToStream(ctx context.Context, s netwo
321306
322307func (impl * libp2pDataTransferNetwork ) Protocol (ctx context.Context , id peer.ID ) (protocol.ID , error ) {
323308 // Check the cache for the peer's protocol version
324- impl .pplk .RLock ()
325- proto , ok := impl .peerProtocols [id ]
326- impl .pplk .RUnlock ()
309+ firstProto , err := impl .host .Peerstore ().FirstSupportedProtocol (id , impl .dtProtocolStrings ... )
310+ if err != nil {
311+ return "" , err
312+ }
327313
328- if ok {
329- return proto , nil
314+ if firstProto != "" {
315+ return protocol . ID ( firstProto ) , nil
330316 }
331317
332318 // The peer's protocol version is not in the cache, so connect to the peer.
@@ -341,27 +327,12 @@ func (impl *libp2pDataTransferNetwork) Protocol(ctx context.Context, id peer.ID)
341327 return s .Protocol (), nil
342328}
343329
344- func (impl * libp2pDataTransferNetwork ) setPeerProtocol (p peer.ID , proto protocol.ID ) {
345- impl .pplk .Lock ()
346- defer impl .pplk .Unlock ()
330+ func (impl * libp2pDataTransferNetwork ) setDataTransferProtocols (protocols []protocol.ID ) {
331+ impl .dtProtocols = append ([]protocol.ID {}, protocols ... )
347332
348- impl .peerProtocols [p ] = proto
349- }
350-
351- func (impl * libp2pDataTransferNetwork ) clearPeerProtocol (p peer.ID ) {
352- impl .pplk .Lock ()
353- defer impl .pplk .Unlock ()
354-
355- delete (impl .peerProtocols , p )
356- }
357-
358- // Clear the peer protocol version cache for the peer when the peer disconnects
359- func (impl * libp2pDataTransferNetwork ) Disconnected (n network.Network , conn network.Conn ) {
360- impl .clearPeerProtocol (conn .RemotePeer ())
333+ // Keep a string version of the protocols for performance reasons
334+ impl .dtProtocolStrings = make ([]string , 0 , len (impl .dtProtocols ))
335+ for _ , proto := range impl .dtProtocols {
336+ impl .dtProtocolStrings = append (impl .dtProtocolStrings , string (proto ))
337+ }
361338}
362-
363- func (impl * libp2pDataTransferNetwork ) Listen (n network.Network , multiaddr ma.Multiaddr ) {}
364- func (impl * libp2pDataTransferNetwork ) ListenClose (n network.Network , multiaddr ma.Multiaddr ) {}
365- func (impl * libp2pDataTransferNetwork ) Connected (n network.Network , conn network.Conn ) {}
366- func (impl * libp2pDataTransferNetwork ) OpenedStream (n network.Network , stream network.Stream ) {}
367- func (impl * libp2pDataTransferNetwork ) ClosedStream (n network.Network , stream network.Stream ) {}
0 commit comments