@@ -8,6 +8,10 @@ import (
88 "time"
99
1010 vole "github.com/ipfs-shipyard/vole/lib"
11+ bsmsg "github.com/ipfs/boxo/bitswap/message"
12+ "github.com/ipfs/boxo/bitswap/message/pb"
13+ "github.com/ipfs/boxo/bitswap/network"
14+ "github.com/ipfs/boxo/bitswap/network/httpnet"
1115 "github.com/ipfs/boxo/ipns"
1216 "github.com/ipfs/boxo/routing/http/client"
1317 "github.com/ipfs/boxo/routing/http/contentrouter"
@@ -19,6 +23,7 @@ import (
1923 record "github.com/libp2p/go-libp2p-record"
2024 "github.com/libp2p/go-libp2p/core/host"
2125 "github.com/libp2p/go-libp2p/core/peer"
26+ "github.com/libp2p/go-libp2p/core/peerstore"
2227 "github.com/libp2p/go-libp2p/core/routing"
2328 "github.com/libp2p/go-libp2p/p2p/net/connmgr"
2429 "github.com/multiformats/go-multiaddr"
@@ -140,17 +145,23 @@ type providerOutput struct {
140145 Addrs []string
141146 ConnectionMaddrs []string
142147 DataAvailableOverBitswap BitswapCheckOutput
148+ DataAvailableOverHTTP HTTPCheckOutput
143149 Source string
144150}
145151
146152// runCidCheck finds providers of a given CID, using the DHT and IPNI
147153// concurrently. A check of connectivity and Bitswap availability is performed
148154// for each provider found.
149- func (d * daemon ) runCidCheck (ctx context.Context , cidKey cid.Cid , ipniURL string ) (cidCheckOutput , error ) {
155+ func (d * daemon ) runCidCheck (ctx context.Context , cidKey cid.Cid , ipniURL string , httpRetrieval bool ) (cidCheckOutput , error ) {
156+ protocols := defaultProtocolFilter
157+ if httpRetrieval {
158+ protocols = append (protocols , "transport-ipfs-gateway-http" )
159+ }
160+
150161 crClient , err := client .New (ipniURL ,
151- client .WithStreamResultsRequired (), // // https://specs.ipfs.tech/routing/http-routing-v1/#streaming
152- client .WithProtocolFilter (defaultProtocolFilter ), // IPIP-484
153- client .WithDisabledLocalFiltering (false ), // force local filtering in case remote server does not support IPIP-484
162+ client .WithStreamResultsRequired (), // // https://specs.ipfs.tech/routing/http-routing-v1/#streaming
163+ client .WithProtocolFilter (protocols ), // IPIP-484
164+ client .WithDisabledLocalFiltering (false ), // force local filtering in case remote server does not support IPIP-484
154165 )
155166 if err != nil {
156167 return nil , fmt .Errorf ("failed to create content router client: %w" , err )
@@ -210,6 +221,52 @@ func (d *daemon) runCidCheck(ctx context.Context, cidKey cid.Cid, ipniURL string
210221 wg .Add (1 )
211222 go func (provider peer.AddrInfo , src string ) {
212223 defer wg .Done ()
224+ // Get http retrieval out of the way if this is such
225+ // provider.
226+ httpInfo , otherInfo := network .SplitHTTPAddrs (provider )
227+ if len (httpInfo .Addrs ) > 0 && httpRetrieval {
228+
229+ provOutput := providerOutput {
230+ ID : provider .ID .String (),
231+ DataAvailableOverBitswap : BitswapCheckOutput {
232+ Error : "not a bitswap endpoint" ,
233+ },
234+ DataAvailableOverHTTP : HTTPCheckOutput {},
235+ Source : src ,
236+ }
237+ for _ , ma := range httpInfo .Addrs {
238+ provOutput .Addrs = append (provOutput .Addrs , ma .String ())
239+ }
240+
241+ testHost , err := d .createTestHost ()
242+ if err != nil {
243+ log .Printf ("Error creating test host: %v\n " , err )
244+ return
245+ }
246+ defer testHost .Close ()
247+ httpCheck := checkHTTPRetrieval (ctx , testHost , cidKey , httpInfo )
248+ provOutput .DataAvailableOverHTTP = httpCheck
249+ if ! httpCheck .Connected {
250+ provOutput .ConnectionError = httpCheck .Error
251+ }
252+ for _ , ma := range httpCheck .Endpoints {
253+ provOutput .ConnectionMaddrs = append (provOutput .ConnectionMaddrs , ma .String ())
254+ }
255+
256+ mu .Lock ()
257+ out = append (out , provOutput )
258+ mu .Unlock ()
259+
260+ // Do not continue processing if there are no
261+ // other addresses as we would trigger dht
262+ // lookups etc.
263+ if len (otherInfo .Addrs ) == 0 {
264+ return
265+ }
266+ }
267+
268+ // process non-http providers addresses.
269+ provider = otherInfo
213270
214271 outputAddrs := []string {}
215272 if len (provider .Addrs ) > 0 {
@@ -236,7 +293,10 @@ func (d *daemon) runCidCheck(ctx context.Context, cidKey cid.Cid, ipniURL string
236293 ID : provider .ID .String (),
237294 Addrs : outputAddrs ,
238295 DataAvailableOverBitswap : BitswapCheckOutput {},
239- Source : src ,
296+ DataAvailableOverHTTP : HTTPCheckOutput {
297+ Error : "not an HTTP endpoint" ,
298+ },
299+ Source : src ,
240300 }
241301
242302 testHost , err := d .createTestHost ()
@@ -286,10 +346,17 @@ type peerCheckOutput struct {
286346 ProviderRecordFromPeerInIPNI bool
287347 ConnectionMaddrs []string
288348 DataAvailableOverBitswap BitswapCheckOutput
349+ DataAvailableOverHTTP HTTPCheckOutput
289350}
290351
291352// runPeerCheck checks the connectivity and Bitswap availability of a CID from a given peer (either with just peer ID or specific multiaddr)
292- func (d * daemon ) runPeerCheck (ctx context.Context , ma multiaddr.Multiaddr , ai * peer.AddrInfo , c cid.Cid , ipniURL string ) (* peerCheckOutput , error ) {
353+ func (d * daemon ) runPeerCheck (ctx context.Context , ma multiaddr.Multiaddr , ai peer.AddrInfo , c cid.Cid , ipniURL string , httpRetrieval bool ) (* peerCheckOutput , error ) {
354+ testHost , err := d .createTestHost ()
355+ if err != nil {
356+ return nil , fmt .Errorf ("server error: %w" , err )
357+ }
358+ defer testHost .Close ()
359+
293360 addrMap , peerAddrDHTErr := peerAddrsInDHT (ctx , d .dht , d .dhtMessenger , ai .ID )
294361
295362 var inDHT , inIPNI bool
@@ -303,6 +370,7 @@ func (d *daemon) runPeerCheck(ctx context.Context, ma multiaddr.Multiaddr, ai *p
303370 inIPNI = providerRecordFromPeerInIPNI (ctx , ipniURL , c , ai .ID )
304371 wg .Done ()
305372 }()
373+
306374 wg .Wait ()
307375
308376 out := & peerCheckOutput {
@@ -311,38 +379,63 @@ func (d *daemon) runPeerCheck(ctx context.Context, ma multiaddr.Multiaddr, ai *p
311379 PeerFoundInDHT : addrMap ,
312380 }
313381
382+ // FIXME: without AcceleratedDHT client, we usually timeout the full
383+ // operation context in the DHT steps. Provide early exit in that
384+ // case.
385+ if err := ctx .Err (); err != nil {
386+ return out , err
387+ }
388+
389+ httpInfo , otherInfo := network .SplitHTTPAddrs (ai )
390+
391+ // If they provided an http address and enabled retrieval, try that.
392+ if len (httpInfo .Addrs ) > 0 && httpRetrieval {
393+ httpCheck := checkHTTPRetrieval (ctx , testHost , c , httpInfo )
394+ out .DataAvailableOverHTTP = httpCheck
395+ if ! httpCheck .Connected {
396+ out .ConnectionError = httpCheck .Error
397+ }
398+ for _ , ma := range httpCheck .Endpoints {
399+ out .ConnectionMaddrs = append (out .ConnectionMaddrs , ma .String ())
400+ }
401+ out .DataAvailableOverBitswap = BitswapCheckOutput {
402+ Error : "peer multiaddress is an HTTP endpoint" ,
403+ }
404+ return out , nil
405+ }
406+
407+ out .DataAvailableOverHTTP = HTTPCheckOutput {
408+ Error : "peer multiaddress is not an HTTP endpoint or HTTP check not enabled" ,
409+ }
410+
411+ // non-http peers. Try to connect via p2p etc.
314412 var connectionFailed bool
315413
316- // If peerID given,but no addresses check the DHT
317- if len (ai .Addrs ) == 0 {
414+ // If this is a non-HTTP peer, try with DHT addresses
415+ if len (otherInfo .Addrs ) == 0 {
318416 if peerAddrDHTErr != nil {
319417 // PeerID is not resolvable via the DHT
320418 connectionFailed = true
321419 out .ConnectionError = peerAddrDHTErr .Error ()
322420 }
421+
323422 for a := range addrMap {
324423 ma , err := multiaddr .NewMultiaddr (a )
325424 if err != nil {
326425 log .Println (fmt .Errorf ("error parsing multiaddr %s: %w" , a , err ))
327426 continue
328427 }
329- ai .Addrs = append (ai .Addrs , ma )
428+ otherInfo .Addrs = append (otherInfo .Addrs , ma )
330429 }
331430 }
332431
333- testHost , err := d .createTestHost ()
334- if err != nil {
335- return nil , fmt .Errorf ("server error: %w" , err )
336- }
337- defer testHost .Close ()
338-
339432 if ! connectionFailed {
340433 // Test Is the target connectable
341434 dialCtx , dialCancel := context .WithTimeout (ctx , time .Second * 120 )
342435
343- _ = testHost .Connect (dialCtx , * ai )
436+ _ = testHost .Connect (dialCtx , otherInfo )
344437 // Call NewStream to force NAT hole punching. see https://github.com/libp2p/go-libp2p/issues/2714
345- _ , connErr := testHost .NewStream (dialCtx , ai .ID , "/ipfs/bitswap/1.2.0" , "/ipfs/bitswap/1.1.0" , "/ipfs/bitswap/1.0.0" , "/ipfs/bitswap" )
438+ _ , connErr := testHost .NewStream (dialCtx , otherInfo .ID , "/ipfs/bitswap/1.2.0" , "/ipfs/bitswap/1.1.0" , "/ipfs/bitswap/1.0.0" , "/ipfs/bitswap" )
346439 dialCancel ()
347440 if connErr != nil {
348441 out .ConnectionError = connErr .Error ()
@@ -389,12 +482,118 @@ func checkBitswapCID(ctx context.Context, host host.Host, c cid.Cid, ma multiadd
389482 return out
390483}
391484
485+ type HTTPCheckOutput struct {
486+ Enabled bool
487+ Duration time.Duration
488+ Endpoints []multiaddr.Multiaddr
489+ Connected bool
490+ Requested bool
491+ Found bool
492+ Error string
493+ }
494+
495+ type httpReceiver struct {
496+ msgCh chan bsmsg.BitSwapMessage
497+ errorCh chan error
498+ }
499+
500+ func (recv * httpReceiver ) ReceiveMessage (ctx context.Context , sender peer.ID , incoming bsmsg.BitSwapMessage ) {
501+ recv .msgCh <- incoming
502+ }
503+
504+ func (recv * httpReceiver ) ReceiveError (err error ) {
505+ recv .errorCh <- err
506+ }
507+
508+ func (recv * httpReceiver ) PeerConnected (p peer.ID ) { // nop
509+ }
510+
511+ func (recv * httpReceiver ) PeerDisconnected (p peer.ID ) { // nop
512+ }
513+
514+ // FIXME: could expose this directly in Boxo.
515+ func supportsHEAD (pstore peerstore.Peerstore , p peer.ID ) bool {
516+ v , err := pstore .Get (p , "http-retrieval-head-support" )
517+ if err != nil {
518+ return false
519+ }
520+
521+ b , ok := v .(bool )
522+ return ok && b
523+ }
524+
525+ func checkHTTPRetrieval (ctx context.Context , host host.Host , c cid.Cid , pinfo peer.AddrInfo ) HTTPCheckOutput {
526+ log .Printf ("Start of HTTP check for cid %s by attempting to connect to %s" , c , pinfo )
527+
528+ out := HTTPCheckOutput {
529+ Enabled : true ,
530+ }
531+
532+ htnet := httpnet .New (host ,
533+ httpnet .WithUserAgent (userAgent ),
534+ httpnet .WithResponseHeaderTimeout (5 * time .Second ), // default: 10
535+ httpnet .WithHTTPWorkers (1 ),
536+ )
537+ defer htnet .Stop ()
538+
539+ recv := httpReceiver {
540+ msgCh : make (chan bsmsg.BitSwapMessage ),
541+ errorCh : make (chan error ),
542+ }
543+ htnet .Start (& recv )
544+
545+ pid := pinfo .ID
546+ err := htnet .Connect (ctx , pinfo )
547+ defer htnet .DisconnectFrom (ctx , pid )
548+ if err != nil {
549+ log .Printf ("End of HTTP check for %s: %s" , c , err )
550+ out .Error = err .Error ()
551+ return out
552+ }
553+ out .Connected = true
554+ out .Endpoints = host .Peerstore ().Addrs (pid )
555+
556+ if ! supportsHEAD (host .Peerstore (), pid ) {
557+ log .Printf ("End of HTTP check for %s at %s: no support for HEAD requests" , c , pinfo )
558+ out .Error = "HTTP endpoint does not support HEAD requests"
559+ return out
560+ }
561+
562+ // Now we are in a position of sending a HEAD request.
563+ msg := bsmsg .New (true )
564+ msg .AddEntry (c , 0 , pb .Message_Wantlist_Have , true )
565+ start := time .Now ()
566+ err = htnet .SendMessage (ctx , pid , msg )
567+ out .Requested = true
568+ if err != nil {
569+ log .Printf ("End of HTTP check for %s at . Connected: true. Error: %s" , c , err )
570+ out .Error = err .Error ()
571+ return out
572+ }
573+
574+ waitCtx , cancel := context .WithTimeout (ctx , 5 * time .Second )
575+ defer cancel ()
576+ select {
577+ case <- waitCtx .Done ():
578+ case msg := <- recv .msgCh :
579+ if len (msg .Haves ()) > 0 {
580+ out .Found = true
581+ }
582+
583+ case err = <- recv .errorCh :
584+ out .Error = err .Error ()
585+ }
586+
587+ out .Duration = time .Since (start )
588+ log .Printf ("End of HTTP check for %s at %s. Connected: true. Requested: true. Found: %t. Error: %s" , c , pinfo , out .Found , out .Error )
589+ return out
590+ }
591+
392592func peerAddrsInDHT (ctx context.Context , d kademlia , messenger * dhtpb.ProtocolMessenger , p peer.ID ) (map [string ]int , error ) {
393593 closestPeers , err := d .GetClosestPeers (ctx , string (p ))
394594 if err != nil {
395595 return nil , err
396596 }
397-
398597 resCh := make (chan * peer.AddrInfo , len (closestPeers ))
399598
400599 numSuccessfulResponses := execOnMany (ctx , 0.3 , time .Second * 3 , func (ctx context.Context , peerToQuery peer.ID ) error {
0 commit comments