Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
257 changes: 232 additions & 25 deletions daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ import (
"time"

vole "github.com/ipfs-shipyard/vole/lib"
bsmsg "github.com/ipfs/boxo/bitswap/message"
"github.com/ipfs/boxo/bitswap/message/pb"
"github.com/ipfs/boxo/bitswap/network"
"github.com/ipfs/boxo/bitswap/network/httpnet"
"github.com/ipfs/boxo/ipns"
"github.com/ipfs/boxo/routing/http/client"
"github.com/ipfs/boxo/routing/http/contentrouter"
Expand All @@ -19,6 +23,7 @@ import (
record "github.com/libp2p/go-libp2p-record"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -140,17 +145,23 @@ type providerOutput struct {
Addrs []string
ConnectionMaddrs []string
DataAvailableOverBitswap BitswapCheckOutput
DataAvailableOverHTTP HTTPCheckOutput
Source string
}

// runCidCheck finds providers of a given CID, using the DHT and IPNI
// concurrently. A check of connectivity and Bitswap availability is performed
// for each provider found.
func (d *daemon) runCidCheck(ctx context.Context, cidKey cid.Cid, ipniURL string) (cidCheckOutput, error) {
func (d *daemon) runCidCheck(ctx context.Context, cidKey cid.Cid, ipniURL string, httpRetrieval bool) (cidCheckOutput, error) {
protocols := defaultProtocolFilter
if httpRetrieval {
protocols = append(protocols, "transport-ipfs-gateway-http")
}

crClient, err := client.New(ipniURL,
client.WithStreamResultsRequired(), // // https://specs.ipfs.tech/routing/http-routing-v1/#streaming
client.WithProtocolFilter(defaultProtocolFilter), // IPIP-484
client.WithDisabledLocalFiltering(false), // force local filtering in case remote server does not support IPIP-484
client.WithStreamResultsRequired(), // // https://specs.ipfs.tech/routing/http-routing-v1/#streaming
client.WithProtocolFilter(protocols), // IPIP-484
client.WithDisabledLocalFiltering(false), // force local filtering in case remote server does not support IPIP-484
)
if err != nil {
return nil, fmt.Errorf("failed to create content router client: %w", err)
Expand Down Expand Up @@ -210,6 +221,54 @@ func (d *daemon) runCidCheck(ctx context.Context, cidKey cid.Cid, ipniURL string
wg.Add(1)
go func(provider peer.AddrInfo, src string) {
defer wg.Done()
// Get http retrieval out of the way if this is such
// provider.
httpInfo, libp2pInfo := network.SplitHTTPAddrs(provider)
if len(httpInfo.Addrs) > 0 && httpRetrieval {

provOutput := providerOutput{
ID: provider.ID.String(),
DataAvailableOverBitswap: BitswapCheckOutput{
Enabled: false,
},
DataAvailableOverHTTP: HTTPCheckOutput{
Enabled: true,
},
Source: src,
}
for _, ma := range httpInfo.Addrs {
provOutput.Addrs = append(provOutput.Addrs, ma.String())
}

testHost, err := d.createTestHost()
if err != nil {
log.Printf("Error creating test host: %v\n", err)
return
}
defer testHost.Close()
httpCheck := checkHTTPRetrieval(ctx, testHost, cidKey, httpInfo)
provOutput.DataAvailableOverHTTP = httpCheck
if !httpCheck.Connected {
provOutput.ConnectionError = httpCheck.Error
}
for _, ma := range httpCheck.Endpoints {
provOutput.ConnectionMaddrs = append(provOutput.ConnectionMaddrs, ma.String())
}

mu.Lock()
out = append(out, provOutput)
mu.Unlock()

// Do not continue processing if there are no
// other addresses as we would trigger dht
// lookups etc.
if len(libp2pInfo.Addrs) == 0 {
return
}
}

// process non-http providers addresses.
provider = libp2pInfo

outputAddrs := []string{}
if len(provider.Addrs) > 0 {
Expand All @@ -233,10 +292,15 @@ func (d *daemon) runCidCheck(ctx context.Context, cidKey cid.Cid, ipniURL string
}

provOutput := providerOutput{
ID: provider.ID.String(),
Addrs: outputAddrs,
DataAvailableOverBitswap: BitswapCheckOutput{},
Source: src,
ID: provider.ID.String(),
Addrs: outputAddrs,
DataAvailableOverBitswap: BitswapCheckOutput{
Enabled: true,
},
DataAvailableOverHTTP: HTTPCheckOutput{
Enabled: false,
},
Source: src,
}

testHost, err := d.createTestHost()
Expand Down Expand Up @@ -286,10 +350,17 @@ type peerCheckOutput struct {
ProviderRecordFromPeerInIPNI bool
ConnectionMaddrs []string
DataAvailableOverBitswap BitswapCheckOutput
DataAvailableOverHTTP HTTPCheckOutput
}

// runPeerCheck checks the connectivity and Bitswap availability of a CID from a given peer (either with just peer ID or specific multiaddr)
func (d *daemon) runPeerCheck(ctx context.Context, ma multiaddr.Multiaddr, ai *peer.AddrInfo, c cid.Cid, ipniURL string) (*peerCheckOutput, error) {
func (d *daemon) runPeerCheck(ctx context.Context, ma multiaddr.Multiaddr, ai peer.AddrInfo, c cid.Cid, ipniURL string, httpRetrieval bool) (*peerCheckOutput, error) {
testHost, err := d.createTestHost()
if err != nil {
return nil, fmt.Errorf("server error: %w", err)
}
defer testHost.Close()

addrMap, peerAddrDHTErr := peerAddrsInDHT(ctx, d.dht, d.dhtMessenger, ai.ID)

var inDHT, inIPNI bool
Expand All @@ -303,6 +374,7 @@ func (d *daemon) runPeerCheck(ctx context.Context, ma multiaddr.Multiaddr, ai *p
inIPNI = providerRecordFromPeerInIPNI(ctx, ipniURL, c, ai.ID)
wg.Done()
}()

wg.Wait()

out := &peerCheckOutput{
Expand All @@ -311,38 +383,63 @@ func (d *daemon) runPeerCheck(ctx context.Context, ma multiaddr.Multiaddr, ai *p
PeerFoundInDHT: addrMap,
}

// FIXME: without AcceleratedDHT client, we usually timeout the full
// operation context in the DHT steps. Provide early exit in that
// case.
if err := ctx.Err(); err != nil {
return out, err
}

httpInfo, libp2pInfo := network.SplitHTTPAddrs(ai)

// If they provided an http address and enabled retrieval, try that.
if len(httpInfo.Addrs) > 0 && httpRetrieval {
httpCheck := checkHTTPRetrieval(ctx, testHost, c, httpInfo)
out.DataAvailableOverHTTP = httpCheck
if !httpCheck.Connected {
out.ConnectionError = httpCheck.Error
}
for _, ma := range httpCheck.Endpoints {
out.ConnectionMaddrs = append(out.ConnectionMaddrs, ma.String())
}
out.DataAvailableOverBitswap = BitswapCheckOutput{
Enabled: false,
}
return out, nil
}

out.DataAvailableOverHTTP = HTTPCheckOutput{
Enabled: false,
}

// non-http peers. Try to connect via p2p etc.
var connectionFailed bool

// If peerID given,but no addresses check the DHT
if len(ai.Addrs) == 0 {
// If this is a non-HTTP peer, try with DHT addresses
if len(libp2pInfo.Addrs) == 0 {
if peerAddrDHTErr != nil {
// PeerID is not resolvable via the DHT
connectionFailed = true
out.ConnectionError = peerAddrDHTErr.Error()
}

for a := range addrMap {
ma, err := multiaddr.NewMultiaddr(a)
if err != nil {
log.Println(fmt.Errorf("error parsing multiaddr %s: %w", a, err))
continue
}
ai.Addrs = append(ai.Addrs, ma)
libp2pInfo.Addrs = append(libp2pInfo.Addrs, ma)
}
}

testHost, err := d.createTestHost()
if err != nil {
return nil, fmt.Errorf("server error: %w", err)
}
defer testHost.Close()

if !connectionFailed {
// Test Is the target connectable
dialCtx, dialCancel := context.WithTimeout(ctx, time.Second*120)

_ = testHost.Connect(dialCtx, *ai)
_ = testHost.Connect(dialCtx, libp2pInfo)
// Call NewStream to force NAT hole punching. see https://github.com/libp2p/go-libp2p/issues/2714
_, connErr := testHost.NewStream(dialCtx, ai.ID, "/ipfs/bitswap/1.2.0", "/ipfs/bitswap/1.1.0", "/ipfs/bitswap/1.0.0", "/ipfs/bitswap")
_, connErr := testHost.NewStream(dialCtx, libp2pInfo.ID, "/ipfs/bitswap/1.2.0", "/ipfs/bitswap/1.1.0", "/ipfs/bitswap/1.0.0", "/ipfs/bitswap")
dialCancel()
if connErr != nil {
out.ConnectionError = connErr.Error()
Expand All @@ -362,6 +459,7 @@ func (d *daemon) runPeerCheck(ctx context.Context, ma multiaddr.Multiaddr, ai *p
}

type BitswapCheckOutput struct {
Enabled bool
Duration time.Duration
Found bool
Responded bool
Expand All @@ -370,7 +468,9 @@ type BitswapCheckOutput struct {

func checkBitswapCID(ctx context.Context, host host.Host, c cid.Cid, ma multiaddr.Multiaddr) BitswapCheckOutput {
log.Printf("Start of Bitswap check for cid %s by attempting to connect to ma: %v with the peer: %s", c, ma, host.ID())
out := BitswapCheckOutput{}
out := BitswapCheckOutput{
Enabled: true,
}
start := time.Now()

bsOut, err := vole.CheckBitswapCID(ctx, host, c, ma, false)
Expand All @@ -389,12 +489,120 @@ func checkBitswapCID(ctx context.Context, host host.Host, c cid.Cid, ma multiadd
return out
}

type HTTPCheckOutput struct {
Enabled bool
Duration time.Duration
Endpoints []multiaddr.Multiaddr
Connected bool
Requested bool
Found bool
Error string
}

type httpReceiver struct {
msgCh chan bsmsg.BitSwapMessage
errorCh chan error
}

func (recv *httpReceiver) ReceiveMessage(ctx context.Context, sender peer.ID, incoming bsmsg.BitSwapMessage) {
recv.msgCh <- incoming
}

func (recv *httpReceiver) ReceiveError(err error) {
recv.errorCh <- err
}

func (recv *httpReceiver) PeerConnected(p peer.ID) { // nop
}

func (recv *httpReceiver) PeerDisconnected(p peer.ID) { // nop
}

// FIXME: could expose this directly in Boxo.
func supportsHEAD(pstore peerstore.Peerstore, p peer.ID) bool {
v, err := pstore.Get(p, "http-retrieval-head-support")
if err != nil {
return false
}

b, ok := v.(bool)
return ok && b
}

func checkHTTPRetrieval(ctx context.Context, host host.Host, c cid.Cid, pinfo peer.AddrInfo) HTTPCheckOutput {
log.Printf("Start of HTTP check for cid %s by attempting to connect to %s", c, pinfo)

out := HTTPCheckOutput{
Enabled: true,
}

htnet := httpnet.New(host,
httpnet.WithUserAgent(userAgent),
httpnet.WithResponseHeaderTimeout(5*time.Second), // default: 10
httpnet.WithHTTPWorkers(1),
)
defer htnet.Stop()

recv := httpReceiver{
msgCh: make(chan bsmsg.BitSwapMessage),
errorCh: make(chan error),
}
htnet.Start(&recv)

pid := pinfo.ID
err := htnet.Connect(ctx, pinfo)
defer htnet.DisconnectFrom(ctx, pid)
if err != nil {
log.Printf("End of HTTP check for %s: %s", c, err)
out.Error = err.Error()
return out
}
out.Connected = true
out.Endpoints = host.Peerstore().Addrs(pid)

if !supportsHEAD(host.Peerstore(), pid) {
log.Printf("End of HTTP check for %s at %s: no support for HEAD requests", c, pinfo)
out.Error = "HTTP endpoint does not support HEAD requests"
return out
}

// Now we are in a position of sending a HEAD request.
msg := bsmsg.New(true)
msg.AddEntry(c, 0, pb.Message_Wantlist_Have, true)
start := time.Now()
err = htnet.SendMessage(ctx, pid, msg)
out.Requested = true
if err != nil {
log.Printf("End of HTTP check for %s at . Connected: true. Error: %s", c, err)
out.Error = err.Error()
return out
}

waitCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
select {
case <-waitCtx.Done():
case msg := <-recv.msgCh:
if len(msg.Haves()) > 0 {
out.Found = true
}

case err = <-recv.errorCh:
out.Error = err.Error()
}

out.Duration = time.Since(start)
log.Printf("End of HTTP check for %s at %s. Connected: true. Requested: true. Found: %t. Error: %s", c, pinfo, out.Found, out.Error)
return out
}

func peerAddrsInDHT(ctx context.Context, d kademlia, messenger *dhtpb.ProtocolMessenger, p peer.ID) (map[string]int, error) {
addrMap := make(map[string]int)

closestPeers, err := d.GetClosestPeers(ctx, string(p))
if err != nil {
return nil, err
return addrMap, err
}

resCh := make(chan *peer.AddrInfo, len(closestPeers))

numSuccessfulResponses := execOnMany(ctx, 0.3, time.Second*3, func(ctx context.Context, peerToQuery peer.ID) error {
Expand All @@ -413,10 +621,9 @@ func peerAddrsInDHT(ctx context.Context, d kademlia, messenger *dhtpb.ProtocolMe
close(resCh)

if numSuccessfulResponses == 0 {
return nil, fmt.Errorf("host had trouble querying the DHT")
return addrMap, fmt.Errorf("host had trouble querying the DHT")
}

addrMap := make(map[string]int)
for r := range resCh {
if r == nil {
continue
Expand Down
Loading