Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ import (

"github.com/filecoin-project/go-statemachine"

datatransfer "github.com/filecoin-project/go-data-transfer"
datatransfer "github.com/filecoin-project/go-data-transfer/v2"
)
```

Expand Down
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ This module encapsulates protocols for exchanging piece data between storage cli

**Requires go 1.13**

Install the module in your package or app with `go get "github.com/filecoin-project/go-data-transfer/datatransfer"`
Install the module in your package or app with `go get "github.com/filecoin-project/go-data-transfer/v2/datatransfer"`


### Initialize a data transfer module
Expand All @@ -31,8 +31,8 @@ Install the module in your package or app with `go get "github.com/filecoin-proj

import (
gsimpl "github.com/ipfs/go-graphsync/impl"
datatransfer "github.com/filecoin-project/go-data-transfer/impl"
gstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync"
datatransfer "github.com/filecoin-project/go-data-transfer/v2/impl"
gstransport "github.com/filecoin-project/go-data-transfer/v2/transport/graphsync"
"github.com/libp2p/go-libp2p-core/host"
)

Expand Down Expand Up @@ -85,7 +85,7 @@ func (vl *myValidator) ValidatePush(
sender peer.ID,
voucher datatransfer.Voucher,
baseCid cid.Cid,
selector ipld.Node) error {
selector datamodel.Node) error {

v := voucher.(*myVoucher)
if v.data == "" || v.data != "validpush" {
Expand All @@ -99,7 +99,7 @@ func (vl *myValidator) ValidatePull(
receiver peer.ID,
voucher datatransfer.Voucher,
baseCid cid.Cid,
selector ipld.Node) error {
selector datamodel.Node) error {

v := voucher.(*myVoucher)
if v.data == "" || v.data != "validpull" {
Expand Down Expand Up @@ -135,7 +135,7 @@ must be sent with the request. Using the trivial examples above:
For more detail, please see the [unit tests](https://github.com/filecoin-project/go-data-transfer/blob/master/impl/impl_test.go).

### Open a Push or Pull Request
For a push or pull request, provide a context, a `datatransfer.Voucher`, a host recipient `peer.ID`, a baseCID `cid.CID` and a selector `ipld.Node`. These
For a push or pull request, provide a context, a `datatransfer.Voucher`, a host recipient `peer.ID`, a baseCID `cid.CID` and a selector `datamodel.Node`. These
calls return a `datatransfer.ChannelID` and any error:
```go
channelID, err := dtm.OpenPullDataChannel(ctx, recipient, voucher, baseCid, selector)
Expand Down
18 changes: 6 additions & 12 deletions benchmarks/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,14 @@ import (
"github.com/ipfs/go-merkledag"
"github.com/ipfs/go-unixfs/importer/balanced"
ihelper "github.com/ipfs/go-unixfs/importer/helpers"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
ipldselector "github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/require"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-data-transfer/benchmarks/testinstance"
tn "github.com/filecoin-project/go-data-transfer/benchmarks/testnet"
"github.com/filecoin-project/go-data-transfer/testutil"
datatransfer "github.com/filecoin-project/go-data-transfer/v2"
"github.com/filecoin-project/go-data-transfer/v2/benchmarks/testinstance"
tn "github.com/filecoin-project/go-data-transfer/v2/benchmarks/testnet"
"github.com/filecoin-project/go-data-transfer/v2/testutil"
)

const stdBlockSize = 8000
Expand Down Expand Up @@ -77,10 +75,6 @@ func p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc,
thisCids := df(ctx, b, instances[:1])
allCids = append(allCids, thisCids...)
}
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)

allSelector := ssb.ExploreRecursive(ipldselector.RecursionLimitNone(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()

runtime.GC()
b.ResetTimer()
Expand All @@ -105,7 +99,7 @@ func p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc,
timer := time.NewTimer(30 * time.Second)
start := time.Now()
for j := 0; j < numfiles; j++ {
_, err := pusher.Manager.OpenPushDataChannel(ctx, receiver.Peer, testutil.NewFakeDTType(), allCids[j], allSelector)
_, err := pusher.Manager.OpenPushDataChannel(ctx, receiver.Peer, testutil.NewTestTypedVoucher(), allCids[j], selectorparse.CommonSelector_ExploreAllRecursively)
if err != nil {
b.Fatalf("received error on request: %s", err.Error())
}
Expand Down
19 changes: 9 additions & 10 deletions benchmarks/testinstance/testinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ import (
"github.com/ipld/go-ipld-prime"
peer "github.com/libp2p/go-libp2p-core/peer"

datatransfer "github.com/filecoin-project/go-data-transfer"
tn "github.com/filecoin-project/go-data-transfer/benchmarks/testnet"
dtimpl "github.com/filecoin-project/go-data-transfer/impl"
dtnet "github.com/filecoin-project/go-data-transfer/network"
"github.com/filecoin-project/go-data-transfer/testutil"
gstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync"
datatransfer "github.com/filecoin-project/go-data-transfer/v2"
tn "github.com/filecoin-project/go-data-transfer/v2/benchmarks/testnet"
dtimpl "github.com/filecoin-project/go-data-transfer/v2/impl"
"github.com/filecoin-project/go-data-transfer/v2/testutil"
gstransport "github.com/filecoin-project/go-data-transfer/v2/transport/graphsync"
dtnet "github.com/filecoin-project/go-data-transfer/v2/transport/helpers/network"
)

// TempDirGenerator is any interface that can generate temporary directories
Expand Down Expand Up @@ -164,8 +164,8 @@ func NewInstance(ctx context.Context, net tn.Network, tempDir string, diskBasedD

linkSystem := storeutil.LinkSystemForBlockstore(bstore)
gs := gsimpl.New(ctx, gsNet, linkSystem, gsimpl.RejectAllRequestsByDefault())
transport := gstransport.NewTransport(p, gs)
dt, err := dtimpl.NewDataTransfer(namespace.Wrap(dstore, datastore.NewKey("/data-transfers/transfers")), dtNet, transport)
transport := gstransport.NewTransport(gs, dtNet)
dt, err := dtimpl.NewDataTransfer(namespace.Wrap(dstore, datastore.NewKey("/data-transfers/transfers")), p, transport)
if err != nil {
return Instance{}, err
}
Expand All @@ -188,8 +188,7 @@ func NewInstance(ctx context.Context, net tn.Network, tempDir string, diskBasedD
sv := testutil.NewStubbedValidator()
sv.StubSuccessPull()
sv.StubSuccessPush()
dt.RegisterVoucherType(testutil.NewFakeDTType(), sv)
dt.RegisterVoucherResultType(testutil.NewFakeDTType())
dt.RegisterVoucherType(testutil.TestVoucherType, sv)
return Instance{
Adapter: dtNet,
Peer: p,
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/testnet/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
gsnet "github.com/ipfs/go-graphsync/network"
"github.com/libp2p/go-libp2p-core/peer"

dtnet "github.com/filecoin-project/go-data-transfer/network"
dtnet "github.com/filecoin-project/go-data-transfer/v2/transport/helpers/network"
)

// Network is an interface for generating graphsync network interfaces
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/testnet/peernet.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
mockpeernet "github.com/libp2p/go-libp2p/p2p/net/mock"

dtnet "github.com/filecoin-project/go-data-transfer/network"
dtnet "github.com/filecoin-project/go-data-transfer/v2/transport/helpers/network"
)

type peernet struct {
Expand Down
34 changes: 6 additions & 28 deletions channelmonitor/channelmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-data-transfer/channels"
datatransfer "github.com/filecoin-project/go-data-transfer/v2"
"github.com/filecoin-project/go-data-transfer/v2/channels"
)

var log = logging.Logger("dt-chanmon")
Expand All @@ -21,7 +21,6 @@ type monitorAPI interface {
SubscribeToEvents(subscriber datatransfer.Subscriber) datatransfer.Unsubscribe
RestartDataTransferChannel(ctx context.Context, chid datatransfer.ChannelID) error
CloseDataTransferChannelWithError(ctx context.Context, chid datatransfer.ChannelID, cherr error) error
ConnectTo(context.Context, peer.ID) error
PeerID() peer.ID
}

Expand Down Expand Up @@ -84,18 +83,8 @@ func checkConfig(cfg *Config) {
}
}

// AddPushChannel adds a push channel to the channel monitor
func (m *Monitor) AddPushChannel(chid datatransfer.ChannelID) *monitoredChannel {
return m.addChannel(chid, true)
}

// AddPullChannel adds a pull channel to the channel monitor
func (m *Monitor) AddPullChannel(chid datatransfer.ChannelID) *monitoredChannel {
return m.addChannel(chid, false)
}

// addChannel adds a channel to the channel monitor
func (m *Monitor) addChannel(chid datatransfer.ChannelID, isPush bool) *monitoredChannel {
// AddChannel adds a channel to the channel monitor
func (m *Monitor) AddChannel(chid datatransfer.ChannelID, isPull bool) *monitoredChannel {
if !m.enabled() {
return nil
}
Expand All @@ -106,7 +95,7 @@ func (m *Monitor) addChannel(chid datatransfer.ChannelID, isPush bool) *monitore
// Check if there is already a monitor for this channel
if _, ok := m.channels[chid]; ok {
tp := "push"
if !isPush {
if isPull {
tp = "pull"
}
log.Warnf("ignoring add %s channel %s: %s channel with that id already exists",
Expand Down Expand Up @@ -454,22 +443,11 @@ func (mc *monitoredChannel) doRestartChannel() error {
}

func (mc *monitoredChannel) sendRestartMessage(restartCount int) error {
// Establish a connection to the peer, in case the connection went down.
// Note that at the networking layer there is logic to retry if a network
// connection cannot be established, so this may take some time.
p := mc.chid.OtherParty(mc.mgr.PeerID())
log.Debugf("%s: re-establishing connection to %s", mc.chid, p)
start := time.Now()
err := mc.mgr.ConnectTo(mc.ctx, p)
if err != nil {
return xerrors.Errorf("%s: failed to reconnect to peer %s after %s: %w",
mc.chid, p, time.Since(start), err)
}
log.Debugf("%s: re-established connection to %s in %s", mc.chid, p, time.Since(start))

// Send a restart message for the channel
log.Debugf("%s: sending restart message to %s (%d consecutive restarts)", mc.chid, p, restartCount)
err = mc.mgr.RestartDataTransferChannel(mc.ctx, mc.chid)
err := mc.mgr.RestartDataTransferChannel(mc.ctx, mc.chid)
if err != nil {
return xerrors.Errorf("%s: failed to send restart message to %s: %w", mc.chid, p, err)
}
Expand Down
Loading