diff --git a/encoding/encoding.go b/encoding/encoding.go deleted file mode 100644 index 593448df..00000000 --- a/encoding/encoding.go +++ /dev/null @@ -1,122 +0,0 @@ -package encoding - -import ( - "bytes" - "reflect" - - cbor "github.com/ipfs/go-ipld-cbor" - "github.com/ipld/go-ipld-prime" - "github.com/ipld/go-ipld-prime/codec/dagcbor" - cborgen "github.com/whyrusleeping/cbor-gen" - "golang.org/x/xerrors" -) - -// Encodable is an object that can be written to CBOR and decoded back -type Encodable interface{} - -// Encode encodes an encodable to CBOR, using the best available path for -// writing to CBOR -func Encode(value Encodable) ([]byte, error) { - if cbgEncodable, ok := value.(cborgen.CBORMarshaler); ok { - buf := new(bytes.Buffer) - err := cbgEncodable.MarshalCBOR(buf) - if err != nil { - return nil, err - } - return buf.Bytes(), nil - } - if ipldEncodable, ok := value.(ipld.Node); ok { - buf := new(bytes.Buffer) - err := dagcbor.Encode(ipldEncodable, buf) - if err != nil { - return nil, err - } - return buf.Bytes(), nil - } - return cbor.DumpObject(value) -} - -// Decoder is CBOR decoder for a given encodable type -type Decoder interface { - DecodeFromCbor([]byte) (Encodable, error) -} - -// NewDecoder creates a new Decoder that will decode into new instances of the given -// object type. It will use the decoding that is optimal for that type -// It returns error if it's not possible to setup a decoder for this type -func NewDecoder(decodeType Encodable) (Decoder, error) { - // check if type is ipld.Node, if so, just use style - if ipldDecodable, ok := decodeType.(ipld.Node); ok { - return &ipldDecoder{ipldDecodable.Prototype()}, nil - } - // check if type is a pointer, as we need that to make new copies - // for cborgen types & regular IPLD types - decodeReflectType := reflect.TypeOf(decodeType) - if decodeReflectType.Kind() != reflect.Ptr { - return nil, xerrors.New("type must be a pointer") - } - // check if type is a cbor-gen type - if _, ok := decodeType.(cborgen.CBORUnmarshaler); ok { - return &cbgDecoder{decodeReflectType}, nil - } - // type does is neither ipld-prime nor cbor-gen, so we need to see if it - // can rountrip with oldschool ipld-format - encoded, err := cbor.DumpObject(decodeType) - if err != nil { - return nil, xerrors.New("Object type did not encode") - } - newDecodable := reflect.New(decodeReflectType.Elem()).Interface() - if err := cbor.DecodeInto(encoded, newDecodable); err != nil { - return nil, xerrors.New("Object type did not decode") - } - return &defaultDecoder{decodeReflectType}, nil -} - -type ipldDecoder struct { - style ipld.NodePrototype -} - -func (decoder *ipldDecoder) DecodeFromCbor(encoded []byte) (Encodable, error) { - builder := decoder.style.NewBuilder() - buf := bytes.NewReader(encoded) - err := dagcbor.Decode(builder, buf) - if err != nil { - return nil, err - } - return builder.Build(), nil -} - -type cbgDecoder struct { - cbgType reflect.Type -} - -func (decoder *cbgDecoder) DecodeFromCbor(encoded []byte) (Encodable, error) { - decodedValue := reflect.New(decoder.cbgType.Elem()) - decoded, ok := decodedValue.Interface().(cborgen.CBORUnmarshaler) - if !ok || reflect.ValueOf(decoded).IsNil() { - return nil, xerrors.New("problem instantiating decoded value") - } - buf := bytes.NewReader(encoded) - err := decoded.UnmarshalCBOR(buf) - if err != nil { - return nil, err - } - return decoded, nil -} - -type defaultDecoder struct { - ptrType reflect.Type -} - -func (decoder *defaultDecoder) DecodeFromCbor(encoded []byte) (Encodable, error) { - decodedValue := reflect.New(decoder.ptrType.Elem()) - decoded, ok := decodedValue.Interface().(Encodable) - if !ok || reflect.ValueOf(decoded).IsNil() { - return nil, xerrors.New("problem instantiating decoded value") - } - err := cbor.DecodeInto(encoded, decoded) - if err != nil { - return nil, err - } - return decoded, nil -} diff --git a/encoding/encoding_test.go b/encoding/encoding_test.go deleted file mode 100644 index 43a66f5d..00000000 --- a/encoding/encoding_test.go +++ /dev/null @@ -1,37 +0,0 @@ -package encoding_test - -import ( - "testing" - - "github.com/stretchr/testify/require" - - "github.com/filecoin-project/go-data-transfer/encoding" - "github.com/filecoin-project/go-data-transfer/encoding/testdata" -) - -func TestRoundTrip(t *testing.T) { - testCases := map[string]struct { - val encoding.Encodable - }{ - "can encode/decode IPLD prime types": { - val: testdata.Prime, - }, - "can encode/decode cbor-gen types": { - val: testdata.Cbg, - }, - "can encode/decode old ipld format types": { - val: testdata.Standard, - }, - } - for testCase, data := range testCases { - t.Run(testCase, func(t *testing.T) { - encoded, err := encoding.Encode(data.val) - require.NoError(t, err) - decoder, err := encoding.NewDecoder(data.val) - require.NoError(t, err) - decoded, err := decoder.DecodeFromCbor(encoded) - require.NoError(t, err) - require.Equal(t, data.val, decoded) - }) - } -} diff --git a/encoding/testdata/testdata.go b/encoding/testdata/testdata.go deleted file mode 100644 index 5bed37ba..00000000 --- a/encoding/testdata/testdata.go +++ /dev/null @@ -1,37 +0,0 @@ -package testdata - -import ( - cbor "github.com/ipfs/go-ipld-cbor" - "github.com/ipld/go-ipld-prime/fluent" - basicnode "github.com/ipld/go-ipld-prime/node/basic" -) - -// Prime = an instance of an ipld prime piece of data -var Prime = fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) { - nva := na.AssembleEntry("X") - nva.AssignInt(100) - nva = na.AssembleEntry("Y") - nva.AssignString("appleSauce") -}) - -type standardType struct { - X int - Y string -} - -func init() { - cbor.RegisterCborType(standardType{}) -} - -// Standard = an instance that is neither ipld prime nor cbor -var Standard *standardType = &standardType{X: 100, Y: "appleSauce"} - -//go:generate cbor-gen-for cbgType - -type cbgType struct { - X uint64 - Y string -} - -// Cbg = an instance of a cbor-gen type -var Cbg *cbgType = &cbgType{X: 100, Y: "appleSauce"} diff --git a/encoding/testdata/testdata_cbor_gen.go b/encoding/testdata/testdata_cbor_gen.go deleted file mode 100644 index 67c6c688..00000000 --- a/encoding/testdata/testdata_cbor_gen.go +++ /dev/null @@ -1,84 +0,0 @@ -// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT. - -package testdata - -import ( - "fmt" - "io" - - cbg "github.com/whyrusleeping/cbor-gen" - xerrors "golang.org/x/xerrors" -) - -var _ = xerrors.Errorf - -func (t *cbgType) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{130}); err != nil { - return err - } - - // t.X (uint64) (uint64) - - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.X))); err != nil { - return err - } - - // t.Y (string) (string) - if len(t.Y) > cbg.MaxLength { - return xerrors.Errorf("Value in field t.Y was too long") - } - - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Y)))); err != nil { - return err - } - if _, err := w.Write([]byte(t.Y)); err != nil { - return err - } - return nil -} - -func (t *cbgType) UnmarshalCBOR(r io.Reader) error { - br := cbg.GetPeeker(r) - - maj, extra, err := cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajArray { - return fmt.Errorf("cbor input should be of type array") - } - - if extra != 2 { - return fmt.Errorf("cbor input had wrong number of fields") - } - - // t.X (uint64) (uint64) - - { - - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.X = uint64(extra) - - } - // t.Y (string) (string) - - { - sval, err := cbg.ReadString(br) - if err != nil { - return err - } - - t.Y = string(sval) - } - return nil -} diff --git a/go.mod b/go.mod index 6e624766..5aac35d2 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/ipfs/go-cid v0.1.0 github.com/ipfs/go-datastore v0.5.1 github.com/ipfs/go-ds-badger v0.3.0 - github.com/ipfs/go-graphsync v0.12.0 + github.com/ipfs/go-graphsync v0.12.1-0.20220216203242-e66b39d441e9 github.com/ipfs/go-ipfs-blockstore v1.1.2 github.com/ipfs/go-ipfs-blocksutil v0.0.1 github.com/ipfs/go-ipfs-chunker v0.0.5 @@ -25,7 +25,7 @@ require ( github.com/ipfs/go-log/v2 v2.5.0 github.com/ipfs/go-merkledag v0.5.1 github.com/ipfs/go-unixfs v0.3.1 - github.com/ipld/go-ipld-prime v0.14.4 + github.com/ipld/go-ipld-prime v0.14.5-0.20220204050122-679d74376a0d github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c github.com/jpillora/backoff v1.0.0 github.com/libp2p/go-libp2p v0.18.0-rc1 diff --git a/go.sum b/go.sum index cbe33715..20ff6b28 100644 --- a/go.sum +++ b/go.sum @@ -208,6 +208,7 @@ github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2 github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= github.com/frankban/quicktest v1.14.0 h1:+cqqvzZV87b4adx/5ayVOaYZ2CrvM4ejQvUdBzPPUss= github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og= +github.com/frankban/quicktest v1.14.1/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= @@ -295,6 +296,7 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -420,6 +422,8 @@ github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1 github.com/ipfs/go-ds-leveldb v0.5.0/go.mod h1:d3XG9RUDzQ6V4SHi8+Xgj9j1XuEk1z82lquxrVbml/Q= github.com/ipfs/go-graphsync v0.12.0 h1:QCsVHVzb9FTkcm3NEa8GjXnUeGit1L9s08HcSVQ4m/g= github.com/ipfs/go-graphsync v0.12.0/go.mod h1:nASYWYETgsnMbQ3+DirNImOHQ8TY0a5AhAqyOY55tUg= +github.com/ipfs/go-graphsync v0.12.1-0.20220216203242-e66b39d441e9 h1:U7Jt8nPvA4ihdqAduZ+X77bZUouOWiWs2ATmqkM8psY= +github.com/ipfs/go-graphsync v0.12.1-0.20220216203242-e66b39d441e9/go.mod h1:DQwkDxlrxhxUAp0Zpv0WATAQsr0WrpFiImqjbbCy/LU= github.com/ipfs/go-ipfs-blockstore v0.2.1/go.mod h1:jGesd8EtCM3/zPgx+qr0/feTXGUeRai6adgwC+Q+JvE= github.com/ipfs/go-ipfs-blockstore v1.1.2 h1:WCXoZcMYnvOTmlpX+RSSnhVN0uCmbWTeepTGX5lgiXw= github.com/ipfs/go-ipfs-blockstore v1.1.2/go.mod h1:w51tNR9y5+QXB0wkNcHt4O2aSZjTdqaEWaQdSxEyUOY= @@ -498,6 +502,8 @@ github.com/ipld/go-ipld-prime v0.11.0/go.mod h1:+WIAkokurHmZ/KwzDOMUuoeJgaRQktHt github.com/ipld/go-ipld-prime v0.14.0/go.mod h1:9ASQLwUFLptCov6lIYc70GRB4V7UTyLD0IJtrDJe6ZM= github.com/ipld/go-ipld-prime v0.14.4 h1:bqhmume8+nbNsX4/+J6eohktfZHAI8GKrF3rQ0xgOyc= github.com/ipld/go-ipld-prime v0.14.4/go.mod h1:QcE4Y9n/ZZr8Ijg5bGPT0GqYWgZ1704nH0RDcQtgTP0= +github.com/ipld/go-ipld-prime v0.14.5-0.20220204050122-679d74376a0d h1:HMvFmQbipEXniV3cRdqnkrsvAlKYMjEPbvvKN3mWsDE= +github.com/ipld/go-ipld-prime v0.14.5-0.20220204050122-679d74376a0d/go.mod h1:f5ls+uUY8Slf1NN6YUOeEyYe3TA/J02Rn7zw1NQTeSk= github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd73/go.mod h1:2PJ0JgxyB08t0b2WKrcuqI3di0V+5n6RS/LTUJhkoxY= github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA= github.com/jackpal/go-nat-pmp v1.0.1/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= diff --git a/impl/integration_test.go b/impl/integration_test.go index 158827f8..31541ef5 100644 --- a/impl/integration_test.go +++ b/impl/integration_test.go @@ -1784,7 +1784,7 @@ func (fgsr *fakeGraphSyncReceiver) consumeResponses(ctx context.Context, t *test t.Fail() case gsMessageReceived = <-fgsr.receivedMessages: responses := gsMessageReceived.message.Responses() - if (len(responses) > 0) && gsmsg.IsTerminalResponseCode(responses[0].Status()) { + if (len(responses) > 0) && responses[0].Status().IsTerminal() { return responses[0].Status() } } @@ -1841,7 +1841,7 @@ func TestRespondingToPushGraphsyncRequests(t *testing.T) { require.NoError(t, err) extData := buf.Bytes() - request := gsmsg.NewRequest(graphsync.RequestID(rand.Int31()), link.(cidlink.Link).Cid, gsData.AllSelector, graphsync.Priority(rand.Int31()), graphsync.ExtensionData{ + request := gsmsg.NewRequest(graphsync.NewRequestID(), link.(cidlink.Link).Cid, gsData.AllSelector, graphsync.Priority(rand.Int31()), graphsync.ExtensionData{ Name: extension.ExtensionDataTransfer1_1, Data: extData, }) @@ -1852,7 +1852,7 @@ func TestRespondingToPushGraphsyncRequests(t *testing.T) { require.NoError(t, gsData.GsNet2.SendMessage(ctx, host1.ID(), gsmessage)) status := gsr.consumeResponses(ctx, t) - require.False(t, gsmsg.IsTerminalFailureCode(status)) + require.False(t, status.IsTerminal()) }) t.Run("when no request is initiated", func(t *testing.T) { @@ -1863,7 +1863,7 @@ func TestRespondingToPushGraphsyncRequests(t *testing.T) { require.NoError(t, err) extData := buf.Bytes() - request := gsmsg.NewRequest(graphsync.RequestID(rand.Int31()), link.(cidlink.Link).Cid, gsData.AllSelector, graphsync.Priority(rand.Int31()), graphsync.ExtensionData{ + request := gsmsg.NewRequest(graphsync.NewRequestID(), link.(cidlink.Link).Cid, gsData.AllSelector, graphsync.Priority(rand.Int31()), graphsync.ExtensionData{ Name: extension.ExtensionDataTransfer1_1, Data: extData, }) @@ -1874,7 +1874,7 @@ func TestRespondingToPushGraphsyncRequests(t *testing.T) { require.NoError(t, gsData.GsNet2.SendMessage(ctx, host1.ID(), gsmessage)) status := gsr.consumeResponses(ctx, t) - require.True(t, gsmsg.IsTerminalFailureCode(status)) + require.True(t, status.IsTerminal()) }) } @@ -1923,7 +1923,7 @@ func TestResponseHookWhenExtensionNotFound(t *testing.T) { case <-r.messageReceived: } - request := gsmsg.NewRequest(graphsync.RequestID(rand.Int31()), link.(cidlink.Link).Cid, gsData.AllSelector, graphsync.Priority(rand.Int31())) + request := gsmsg.NewRequest(graphsync.NewRequestID(), link.(cidlink.Link).Cid, gsData.AllSelector, graphsync.Priority(rand.Int31())) builder := gsmsg.NewBuilder() builder.AddRequest(request) gsmessage, err := builder.Build() @@ -1931,7 +1931,7 @@ func TestResponseHookWhenExtensionNotFound(t *testing.T) { require.NoError(t, gsData.GsNet2.SendMessage(ctx, host1.ID(), gsmessage)) status := gsr.consumeResponses(ctx, t) - assert.False(t, gsmsg.IsTerminalFailureCode(status)) + assert.False(t, status.IsTerminal()) }) } @@ -1959,7 +1959,7 @@ func TestRespondingToPullGraphsyncRequests(t *testing.T) { require.NoError(t, err) extData := buf.Bytes() - gsRequest := gsmsg.NewRequest(graphsync.RequestID(rand.Int31()), link.(cidlink.Link).Cid, gsData.AllSelector, graphsync.Priority(rand.Int31()), graphsync.ExtensionData{ + gsRequest := gsmsg.NewRequest(graphsync.NewRequestID(), link.(cidlink.Link).Cid, gsData.AllSelector, graphsync.Priority(rand.Int31()), graphsync.ExtensionData{ Name: extension.ExtensionDataTransfer1_1, Data: extData, }) @@ -1971,7 +1971,7 @@ func TestRespondingToPullGraphsyncRequests(t *testing.T) { require.NoError(t, err) require.NoError(t, gsData.GsNet1.SendMessage(ctx, gsData.Host2.ID(), gsmessage)) status := gsr.consumeResponses(ctx, t) - require.False(t, gsmsg.IsTerminalFailureCode(status)) + require.False(t, status.IsTerminal()) }, }, "When request is initiated, but fails validation": { @@ -1990,7 +1990,7 @@ func TestRespondingToPullGraphsyncRequests(t *testing.T) { err = dtRequest.ToNet(buf) require.NoError(t, err) extData := buf.Bytes() - request := gsmsg.NewRequest(graphsync.RequestID(rand.Int31()), link.(cidlink.Link).Cid, gsData.AllSelector, graphsync.Priority(rand.Int31()), graphsync.ExtensionData{ + request := gsmsg.NewRequest(graphsync.NewRequestID(), link.(cidlink.Link).Cid, gsData.AllSelector, graphsync.Priority(rand.Int31()), graphsync.ExtensionData{ Name: extension.ExtensionDataTransfer1_1, Data: extData, }) @@ -2003,7 +2003,7 @@ func TestRespondingToPullGraphsyncRequests(t *testing.T) { // because there was no previous request require.NoError(t, gsData.GsNet1.SendMessage(ctx, gsData.Host2.ID(), gsmessage)) status := gsr.consumeResponses(ctx, t) - require.True(t, gsmsg.IsTerminalFailureCode(status)) + require.True(t, status.IsTerminal()) }, }, } diff --git a/message.go b/message.go index ad61db54..4b2359f7 100644 --- a/message.go +++ b/message.go @@ -5,10 +5,9 @@ import ( "github.com/ipfs/go-cid" "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/datamodel" + "github.com/ipld/go-ipld-prime/schema" "github.com/libp2p/go-libp2p-core/protocol" - cborgen "github.com/whyrusleeping/cbor-gen" - - "github.com/filecoin-project/go-data-transfer/encoding" ) var ( @@ -27,9 +26,8 @@ type Message interface { IsPaused() bool IsCancel() bool TransferID() TransferID - cborgen.CBORMarshaler - cborgen.CBORUnmarshaler ToNet(w io.Writer) error + AsNode() schema.TypedNode MessageForProtocol(targetProtocol protocol.ID) (newMsg Message, err error) } @@ -38,8 +36,8 @@ type Request interface { Message IsPull() bool IsVoucher() bool - VoucherType() TypeIdentifier - Voucher(decoder encoding.Decoder) (encoding.Encodable, error) + VoucherType() schema.TypeName + Voucher(proto datamodel.NodePrototype) (datamodel.Node, error) BaseCid() cid.Cid Selector() (ipld.Node, error) IsRestartExistingChannelRequest() bool @@ -52,7 +50,7 @@ type Response interface { IsVoucherResult() bool IsComplete() bool Accepted() bool - VoucherResultType() TypeIdentifier - VoucherResult(decoder encoding.Decoder) (encoding.Encodable, error) + VoucherResultType() schema.TypeName + VoucherResult(proto datamodel.NodePrototype) (datamodel.Node, error) EmptyVoucherResult() bool } diff --git a/message/message1_1/message.go b/message/message1_1/message.go index cf60dc4a..676e3707 100644 --- a/message/message1_1/message.go +++ b/message/message1_1/message.go @@ -4,7 +4,7 @@ import ( "io" "github.com/ipfs/go-cid" - "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/datamodel" cborgen "github.com/whyrusleeping/cbor-gen" xerrors "golang.org/x/xerrors" @@ -14,34 +14,25 @@ import ( ) // NewRequest generates a new request for the data transfer protocol -func NewRequest(id datatransfer.TransferID, isRestart bool, isPull bool, vtype datatransfer.TypeIdentifier, voucher encoding.Encodable, baseCid cid.Cid, selector ipld.Node) (datatransfer.Request, error) { - vbytes, err := encoding.Encode(voucher) - if err != nil { - return nil, xerrors.Errorf("Creating request: %w", err) - } +func NewRequest(id datatransfer.TransferID, isRestart bool, isPull bool, voucher datatransfer.Registerable, baseCid cid.Cid, selector datamodel.Node) (datatransfer.Request, error) { if baseCid == cid.Undef { return nil, xerrors.Errorf("base CID must be defined") } - selBytes, err := encoding.Encode(selector) - if err != nil { - return nil, xerrors.Errorf("Error encoding selector") - } - - var typ uint64 + var typ int64 if isRestart { - typ = uint64(types.RestartMessage) + typ = int64(types.RestartMessage) } else { - typ = uint64(types.NewMessage) + typ = int64(types.NewMessage) } return &transferRequest1_1{ Type: typ, Pull: isPull, - Vouch: &cborgen.Deferred{Raw: vbytes}, - Stor: &cborgen.Deferred{Raw: selBytes}, + Vouch: voucher.Representation(), + Stor: selector, BCid: &baseCid, - VTyp: vtype, - XferID: uint64(id), + VTyp: v.type, + XferID: int64(id), }, nil } diff --git a/message/message1_1/schema.go b/message/message1_1/schema.go new file mode 100644 index 00000000..57287232 --- /dev/null +++ b/message/message1_1/schema.go @@ -0,0 +1,25 @@ +package message1_1 + +import ( + _ "embed" + + "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/node/bindnode" + "github.com/ipld/go-ipld-prime/schema" +) + +//go:embed schema.ipldsch +var embedSchema []byte + +var Prototype struct { + Message schema.TypedPrototype +} + +func init() { + ts, err := ipld.LoadSchemaBytes(embedSchema) + if err != nil { + panic(err) + } + + Prototype.Message = bindnode.Prototype((*transferMessage1_1)(nil), ts.TypeByName("transferMessage1_1")) +} diff --git a/message/message1_1/schema.ipldsch b/message/message1_1/schema.ipldsch new file mode 100644 index 00000000..df1706cf --- /dev/null +++ b/message/message1_1/schema.ipldsch @@ -0,0 +1,5 @@ +################################################################################ +### DataTransfer message format v1.1 ### +################################################################################ + +# TBD fill in from go types \ No newline at end of file diff --git a/message/message1_1/transfer_message.go b/message/message1_1/transfer_message.go index ac55e1ae..6479b916 100644 --- a/message/message1_1/transfer_message.go +++ b/message/message1_1/transfer_message.go @@ -4,6 +4,9 @@ import ( "io" datatransfer "github.com/filecoin-project/go-data-transfer" + "github.com/ipld/go-ipld-prime/codec/dagcbor" + "github.com/ipld/go-ipld-prime/node/bindnode" + "github.com/ipld/go-ipld-prime/schema" ) //go:generate cbor-gen-for --map-encoding transferMessage1_1 @@ -31,8 +34,12 @@ func (tm *transferMessage1_1) TransferID() datatransfer.TransferID { return tm.Response.TransferID() } +func (tm *transferMessage1_1) AsNode() schema.TypedNode { + return bindnode.Wrap(tm, Prototype.Message.Type()) +} + // ToNet serializes a transfer message type. It is simply a wrapper for MarshalCBOR, to provide // symmetry with FromNet func (tm *transferMessage1_1) ToNet(w io.Writer) error { - return tm.MarshalCBOR(w) + return dagcbor.Encode(tm.AsNode().Representation(), w) } diff --git a/message/message1_1/transfer_message_cbor_gen.go b/message/message1_1/transfer_message_cbor_gen.go deleted file mode 100644 index b4f4fdbe..00000000 --- a/message/message1_1/transfer_message_cbor_gen.go +++ /dev/null @@ -1,179 +0,0 @@ -// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT. - -package message1_1 - -import ( - "fmt" - "io" - "sort" - - cid "github.com/ipfs/go-cid" - cbg "github.com/whyrusleeping/cbor-gen" - xerrors "golang.org/x/xerrors" -) - -var _ = xerrors.Errorf -var _ = cid.Undef -var _ = sort.Sort - -func (t *transferMessage1_1) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{163}); err != nil { - return err - } - - scratch := make([]byte, 9) - - // t.IsRq (bool) (bool) - if len("IsRq") > cbg.MaxLength { - return xerrors.Errorf("Value in field \"IsRq\" was too long") - } - - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("IsRq"))); err != nil { - return err - } - if _, err := io.WriteString(w, string("IsRq")); err != nil { - return err - } - - if err := cbg.WriteBool(w, t.IsRq); err != nil { - return err - } - - // t.Request (message1_1.transferRequest1_1) (struct) - if len("Request") > cbg.MaxLength { - return xerrors.Errorf("Value in field \"Request\" was too long") - } - - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("Request"))); err != nil { - return err - } - if _, err := io.WriteString(w, string("Request")); err != nil { - return err - } - - if err := t.Request.MarshalCBOR(w); err != nil { - return err - } - - // t.Response (message1_1.transferResponse1_1) (struct) - if len("Response") > cbg.MaxLength { - return xerrors.Errorf("Value in field \"Response\" was too long") - } - - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("Response"))); err != nil { - return err - } - if _, err := io.WriteString(w, string("Response")); err != nil { - return err - } - - if err := t.Response.MarshalCBOR(w); err != nil { - return err - } - return nil -} - -func (t *transferMessage1_1) UnmarshalCBOR(r io.Reader) error { - *t = transferMessage1_1{} - - br := cbg.GetPeeker(r) - scratch := make([]byte, 8) - - maj, extra, err := cbg.CborReadHeaderBuf(br, scratch) - if err != nil { - return err - } - if maj != cbg.MajMap { - return fmt.Errorf("cbor input should be of type map") - } - - if extra > cbg.MaxLength { - return fmt.Errorf("transferMessage1_1: map struct too large (%d)", extra) - } - - var name string - n := extra - - for i := uint64(0); i < n; i++ { - - { - sval, err := cbg.ReadStringBuf(br, scratch) - if err != nil { - return err - } - - name = string(sval) - } - - switch name { - // t.IsRq (bool) (bool) - case "IsRq": - - maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) - if err != nil { - return err - } - if maj != cbg.MajOther { - return fmt.Errorf("booleans must be major type 7") - } - switch extra { - case 20: - t.IsRq = false - case 21: - t.IsRq = true - default: - return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra) - } - // t.Request (message1_1.transferRequest1_1) (struct) - case "Request": - - { - - b, err := br.ReadByte() - if err != nil { - return err - } - if b != cbg.CborNull[0] { - if err := br.UnreadByte(); err != nil { - return err - } - t.Request = new(transferRequest1_1) - if err := t.Request.UnmarshalCBOR(br); err != nil { - return xerrors.Errorf("unmarshaling t.Request pointer: %w", err) - } - } - - } - // t.Response (message1_1.transferResponse1_1) (struct) - case "Response": - - { - - b, err := br.ReadByte() - if err != nil { - return err - } - if b != cbg.CborNull[0] { - if err := br.UnreadByte(); err != nil { - return err - } - t.Response = new(transferResponse1_1) - if err := t.Response.UnmarshalCBOR(br); err != nil { - return xerrors.Errorf("unmarshaling t.Response pointer: %w", err) - } - } - - } - - default: - // Field doesn't exist on this type, so ignore it - cbg.ScanForLinks(r, func(cid.Cid) {}) - } - } - - return nil -} diff --git a/message/message1_1/transfer_request.go b/message/message1_1/transfer_request.go index 0b3020d2..d793e5c8 100644 --- a/message/message1_1/transfer_request.go +++ b/message/message1_1/transfer_request.go @@ -1,38 +1,42 @@ package message1_1 import ( - "bytes" "io" "github.com/ipfs/go-cid" "github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime/codec/dagcbor" - basicnode "github.com/ipld/go-ipld-prime/node/basic" + "github.com/ipld/go-ipld-prime/datamodel" + "github.com/ipld/go-ipld-prime/node/bindnode" + "github.com/ipld/go-ipld-prime/schema" + peer "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" - cbg "github.com/whyrusleeping/cbor-gen" xerrors "golang.org/x/xerrors" datatransfer "github.com/filecoin-project/go-data-transfer" - "github.com/filecoin-project/go-data-transfer/encoding" "github.com/filecoin-project/go-data-transfer/message/types" ) -//go:generate cbor-gen-for --map-encoding transferRequest1_1 +type channelID struct { + Initiator peer.ID + Responder peer.ID + ID int64 +} // transferRequest1_1 is a struct for the 1.1 Data Transfer Protocol that fulfills the datatransfer.Request interface. // its members are exported to be used by cbor-gen type transferRequest1_1 struct { BCid *cid.Cid - Type uint64 + Type int64 Paus bool Part bool Pull bool - Stor *cbg.Deferred - Vouch *cbg.Deferred - VTyp datatransfer.TypeIdentifier - XferID uint64 + Stor datamodel.Node + Vouch datamodel.Node + VTyp schema.TypeName + XferID int64 - RestartChannel datatransfer.ChannelID + RestartChannel channelID } func (trq *transferRequest1_1) MessageForProtocol(targetProtocol protocol.ID) (datatransfer.Message, error) { @@ -50,30 +54,34 @@ func (trq *transferRequest1_1) IsRequest() bool { } func (trq *transferRequest1_1) IsRestart() bool { - return trq.Type == uint64(types.RestartMessage) + return trq.Type == int64(types.RestartMessage) } func (trq *transferRequest1_1) IsRestartExistingChannelRequest() bool { - return trq.Type == uint64(types.RestartExistingChannelRequestMessage) + return trq.Type == int64(types.RestartExistingChannelRequestMessage) } func (trq *transferRequest1_1) RestartChannelId() (datatransfer.ChannelID, error) { if !trq.IsRestartExistingChannelRequest() { return datatransfer.ChannelID{}, xerrors.New("not a restart request") } - return trq.RestartChannel, nil + return datatransfer.ChannelID{ + Initiator: trq.RestartChannel.Initiator, + Responder: trq.RestartChannel.Responder, + ID: datatransfer.TransferID(trq.RestartChannel.ID), + }, nil } func (trq *transferRequest1_1) IsNew() bool { - return trq.Type == uint64(types.NewMessage) + return trq.Type == int64(types.NewMessage) } func (trq *transferRequest1_1) IsUpdate() bool { - return trq.Type == uint64(types.UpdateMessage) + return trq.Type == int64(types.UpdateMessage) } func (trq *transferRequest1_1) IsVoucher() bool { - return trq.Type == uint64(types.VoucherMessage) || trq.Type == uint64(types.NewMessage) + return trq.Type == int64(types.VoucherMessage) || trq.Type == int64(types.NewMessage) } func (trq *transferRequest1_1) IsPaused() bool { @@ -91,16 +99,21 @@ func (trq *transferRequest1_1) IsPull() bool { } // VoucherType returns the Voucher ID -func (trq *transferRequest1_1) VoucherType() datatransfer.TypeIdentifier { +func (trq *transferRequest1_1) VoucherType() schema.TypeName { return trq.VTyp } // Voucher returns the Voucher bytes -func (trq *transferRequest1_1) Voucher(decoder encoding.Decoder) (encoding.Encodable, error) { +func (trq *transferRequest1_1) Voucher(proto datamodel.NodePrototype) (datamodel.Node, error) { if trq.Vouch == nil { return nil, xerrors.New("No voucher present to read") } - return decoder.DecodeFromCbor(trq.Vouch.Raw) + nb := proto.NewBuilder() + err := nb.AssignNode(trq.Vouch) + if err != nil { + return nil, err + } + return nb.Build(), nil } func (trq *transferRequest1_1) EmptyVoucher() bool { @@ -120,18 +133,12 @@ func (trq *transferRequest1_1) Selector() (ipld.Node, error) { if trq.Stor == nil { return nil, xerrors.New("No selector present to read") } - builder := basicnode.Prototype.Any.NewBuilder() - reader := bytes.NewReader(trq.Stor.Raw) - err := dagcbor.Decode(builder, reader) - if err != nil { - return nil, xerrors.Errorf("Error decoding selector: %w", err) - } - return builder.Build(), nil + return trq.Stor, nil } // IsCancel returns true if this is a cancel request func (trq *transferRequest1_1) IsCancel() bool { - return trq.Type == uint64(types.CancelMessage) + return trq.Type == int64(types.CancelMessage) } // IsPartial returns true if this is a partial request @@ -139,13 +146,17 @@ func (trq *transferRequest1_1) IsPartial() bool { return trq.Part } -// ToNet serializes a transfer request. It's a wrapper for MarshalCBOR to provide -// symmetry with FromNet -func (trq *transferRequest1_1) ToNet(w io.Writer) error { - msg := transferMessage1_1{ +func (trq *transferRequest1_1) AsNode() schema.TypedNode { + msg := &transferMessage1_1{ IsRq: true, Request: trq, Response: nil, } - return msg.MarshalCBOR(w) + return bindnode.Wrap(msg, Prototype.Message.Type()) +} + +// ToNet serializes a transfer request. It's a wrapper for MarshalCBOR to provide +// symmetry with FromNet +func (trq *transferRequest1_1) ToNet(w io.Writer) error { + return dagcbor.Encode(trq.AsNode().Representation(), w) } diff --git a/message/message1_1/transfer_response_cbor_gen.go b/message/message1_1/transfer_response_cbor_gen.go deleted file mode 100644 index 2da5f0b8..00000000 --- a/message/message1_1/transfer_response_cbor_gen.go +++ /dev/null @@ -1,265 +0,0 @@ -// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT. - -package message1_1 - -import ( - "fmt" - "io" - "sort" - - datatransfer "github.com/filecoin-project/go-data-transfer" - cid "github.com/ipfs/go-cid" - cbg "github.com/whyrusleeping/cbor-gen" - xerrors "golang.org/x/xerrors" -) - -var _ = xerrors.Errorf -var _ = cid.Undef -var _ = sort.Sort - -func (t *transferResponse1_1) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{166}); err != nil { - return err - } - - scratch := make([]byte, 9) - - // t.Type (uint64) (uint64) - if len("Type") > cbg.MaxLength { - return xerrors.Errorf("Value in field \"Type\" was too long") - } - - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("Type"))); err != nil { - return err - } - if _, err := io.WriteString(w, string("Type")); err != nil { - return err - } - - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajUnsignedInt, uint64(t.Type)); err != nil { - return err - } - - // t.Acpt (bool) (bool) - if len("Acpt") > cbg.MaxLength { - return xerrors.Errorf("Value in field \"Acpt\" was too long") - } - - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("Acpt"))); err != nil { - return err - } - if _, err := io.WriteString(w, string("Acpt")); err != nil { - return err - } - - if err := cbg.WriteBool(w, t.Acpt); err != nil { - return err - } - - // t.Paus (bool) (bool) - if len("Paus") > cbg.MaxLength { - return xerrors.Errorf("Value in field \"Paus\" was too long") - } - - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("Paus"))); err != nil { - return err - } - if _, err := io.WriteString(w, string("Paus")); err != nil { - return err - } - - if err := cbg.WriteBool(w, t.Paus); err != nil { - return err - } - - // t.XferID (uint64) (uint64) - if len("XferID") > cbg.MaxLength { - return xerrors.Errorf("Value in field \"XferID\" was too long") - } - - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("XferID"))); err != nil { - return err - } - if _, err := io.WriteString(w, string("XferID")); err != nil { - return err - } - - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajUnsignedInt, uint64(t.XferID)); err != nil { - return err - } - - // t.VRes (typegen.Deferred) (struct) - if len("VRes") > cbg.MaxLength { - return xerrors.Errorf("Value in field \"VRes\" was too long") - } - - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("VRes"))); err != nil { - return err - } - if _, err := io.WriteString(w, string("VRes")); err != nil { - return err - } - - if err := t.VRes.MarshalCBOR(w); err != nil { - return err - } - - // t.VTyp (datatransfer.TypeIdentifier) (string) - if len("VTyp") > cbg.MaxLength { - return xerrors.Errorf("Value in field \"VTyp\" was too long") - } - - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("VTyp"))); err != nil { - return err - } - if _, err := io.WriteString(w, string("VTyp")); err != nil { - return err - } - - if len(t.VTyp) > cbg.MaxLength { - return xerrors.Errorf("Value in field t.VTyp was too long") - } - - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len(t.VTyp))); err != nil { - return err - } - if _, err := io.WriteString(w, string(t.VTyp)); err != nil { - return err - } - return nil -} - -func (t *transferResponse1_1) UnmarshalCBOR(r io.Reader) error { - *t = transferResponse1_1{} - - br := cbg.GetPeeker(r) - scratch := make([]byte, 8) - - maj, extra, err := cbg.CborReadHeaderBuf(br, scratch) - if err != nil { - return err - } - if maj != cbg.MajMap { - return fmt.Errorf("cbor input should be of type map") - } - - if extra > cbg.MaxLength { - return fmt.Errorf("transferResponse1_1: map struct too large (%d)", extra) - } - - var name string - n := extra - - for i := uint64(0); i < n; i++ { - - { - sval, err := cbg.ReadStringBuf(br, scratch) - if err != nil { - return err - } - - name = string(sval) - } - - switch name { - // t.Type (uint64) (uint64) - case "Type": - - { - - maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) - if err != nil { - return err - } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.Type = uint64(extra) - - } - // t.Acpt (bool) (bool) - case "Acpt": - - maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) - if err != nil { - return err - } - if maj != cbg.MajOther { - return fmt.Errorf("booleans must be major type 7") - } - switch extra { - case 20: - t.Acpt = false - case 21: - t.Acpt = true - default: - return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra) - } - // t.Paus (bool) (bool) - case "Paus": - - maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) - if err != nil { - return err - } - if maj != cbg.MajOther { - return fmt.Errorf("booleans must be major type 7") - } - switch extra { - case 20: - t.Paus = false - case 21: - t.Paus = true - default: - return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra) - } - // t.XferID (uint64) (uint64) - case "XferID": - - { - - maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) - if err != nil { - return err - } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.XferID = uint64(extra) - - } - // t.VRes (typegen.Deferred) (struct) - case "VRes": - - { - - t.VRes = new(cbg.Deferred) - - if err := t.VRes.UnmarshalCBOR(br); err != nil { - return xerrors.Errorf("failed to read deferred field: %w", err) - } - } - // t.VTyp (datatransfer.TypeIdentifier) (string) - case "VTyp": - - { - sval, err := cbg.ReadStringBuf(br, scratch) - if err != nil { - return err - } - - t.VTyp = datatransfer.TypeIdentifier(sval) - } - - default: - // Field doesn't exist on this type, so ignore it - cbg.ScanForLinks(r, func(cid.Cid) {}) - } - } - - return nil -} diff --git a/testutil/fakegraphsync.go b/testutil/fakegraphsync.go index c4365eab..17c3a507 100644 --- a/testutil/fakegraphsync.go +++ b/testutil/fakegraphsync.go @@ -11,6 +11,7 @@ import ( "github.com/ipfs/go-cid" "github.com/ipfs/go-graphsync" "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/datamodel" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/ipld/go-ipld-prime/traversal" "github.com/libp2p/go-libp2p-core/peer" @@ -52,50 +53,22 @@ func (gsRequest ReceivedGraphSyncRequest) DTMessage(t *testing.T) datatransfer.M return matchDtMessage(t, gsRequest.Extensions) } -type PauseRequest struct { - RequestID graphsync.RequestID -} - -type ResumeRequest struct { +type Resume struct { RequestID graphsync.RequestID Extensions []graphsync.ExtensionData } // DTMessage returns the data transfer message among the graphsync extensions sent with this request -func (resumeRequest ResumeRequest) DTMessage(t *testing.T) datatransfer.Message { - return matchDtMessage(t, resumeRequest.Extensions) -} - -type PauseResponse struct { - P peer.ID - RequestID graphsync.RequestID -} - -type ResumeResponse struct { - P peer.ID - RequestID graphsync.RequestID - Extensions []graphsync.ExtensionData -} - -// DTMessage returns the data transfer message among the graphsync extensions sent with this request -func (resumeResponse ResumeResponse) DTMessage(t *testing.T) datatransfer.Message { - return matchDtMessage(t, resumeResponse.Extensions) -} - -type CancelResponse struct { - P peer.ID - RequestID graphsync.RequestID +func (resume Resume) DTMessage(t *testing.T) datatransfer.Message { + return matchDtMessage(t, resume.Extensions) } // FakeGraphSync implements a GraphExchange but does nothing type FakeGraphSync struct { requests chan ReceivedGraphSyncRequest // records calls to fakeGraphSync.Request - pauseRequests chan PauseRequest - resumeRequests chan ResumeRequest - pauseResponses chan PauseResponse - resumeResponses chan ResumeResponse - cancelResponses chan CancelResponse - cancelRequests chan graphsync.RequestID + pauses chan graphsync.RequestID + resumes chan Resume + cancels chan graphsync.RequestID persistenceOptionsLk sync.RWMutex persistenceOptions map[string]ipld.LinkSystem leaveRequestsOpen bool @@ -117,12 +90,9 @@ type FakeGraphSync struct { func NewFakeGraphSync() *FakeGraphSync { return &FakeGraphSync{ requests: make(chan ReceivedGraphSyncRequest, 2), - pauseRequests: make(chan PauseRequest, 1), - resumeRequests: make(chan ResumeRequest, 1), - pauseResponses: make(chan PauseResponse, 1), - resumeResponses: make(chan ResumeResponse, 1), - cancelResponses: make(chan CancelResponse, 1), - cancelRequests: make(chan graphsync.RequestID, 1), + pauses: make(chan graphsync.RequestID, 1), + resumes: make(chan Resume, 1), + cancels: make(chan graphsync.RequestID, 1), persistenceOptions: make(map[string]ipld.LinkSystem), } } @@ -147,95 +117,52 @@ func (fgs *FakeGraphSync) AssertRequestReceived(ctx context.Context, t *testing. return requestReceived } -// AssertNoPauseRequestReceived asserts that no pause requests should ahve been received by this graphsync implementation -func (fgs *FakeGraphSync) AssertNoPauseRequestReceived(t *testing.T) { - require.Empty(t, fgs.pauseRequests, "should not receive pause request") -} - -// AssertPauseRequestReceived asserts a pause request should be received before the context closes (and returns said request) -func (fgs *FakeGraphSync) AssertPauseRequestReceived(ctx context.Context, t *testing.T) PauseRequest { - var pauseRequestReceived PauseRequest - select { - case <-ctx.Done(): - t.Fatal("did not receive message sent") - case pauseRequestReceived = <-fgs.pauseRequests: - } - return pauseRequestReceived -} - -// AssertNoResumeRequestReceived asserts that no resume requests should ahve been received by this graphsync implementation -func (fgs *FakeGraphSync) AssertNoResumeRequestReceived(t *testing.T) { - require.Empty(t, fgs.resumeRequests, "should not receive resume request") -} - -// AssertResumeRequestReceived asserts a resume request should be received before the context closes (and returns said request) -func (fgs *FakeGraphSync) AssertResumeRequestReceived(ctx context.Context, t *testing.T) ResumeRequest { - var resumeRequestReceived ResumeRequest - select { - case <-ctx.Done(): - t.Fatal("did not receive message sent") - case resumeRequestReceived = <-fgs.resumeRequests: - } - return resumeRequestReceived -} - -// AssertNoPauseResponseReceived asserts that no pause requests should ahve been received by this graphsync implementation -func (fgs *FakeGraphSync) AssertNoPauseResponseReceived(t *testing.T) { - require.Empty(t, fgs.pauseResponses, "should not receive pause request") +// AssertNoPauseReceived asserts that no pause requests should ahve been received by this graphsync implementation +func (fgs *FakeGraphSync) AssertNoPauseReceived(t *testing.T) { + require.Empty(t, fgs.pauses, "should not receive pause request") } -// AssertPauseResponseReceived asserts a pause request should be received before the context closes (and returns said request) -func (fgs *FakeGraphSync) AssertPauseResponseReceived(ctx context.Context, t *testing.T) PauseResponse { - var pauseResponseReceived PauseResponse +// AssertPauseReceived asserts a pause request should be received before the context closes (and returns said request) +func (fgs *FakeGraphSync) AssertPauseReceived(ctx context.Context, t *testing.T) graphsync.RequestID { + var pauseReceived graphsync.RequestID select { case <-ctx.Done(): t.Fatal("did not receive message sent") - case pauseResponseReceived = <-fgs.pauseResponses: + case pauseReceived = <-fgs.pauses: } - return pauseResponseReceived + return pauseReceived } -// AssertNoResumeResponseReceived asserts that no resume requests should ahve been received by this graphsync implementation -func (fgs *FakeGraphSync) AssertNoResumeResponseReceived(t *testing.T) { - require.Empty(t, fgs.resumeResponses, "should not receive resume request") +// AssertNoResumeReceived asserts that no resume requests should ahve been received by this graphsync implementation +func (fgs *FakeGraphSync) AssertNoResumeReceived(t *testing.T) { + require.Empty(t, fgs.resumes, "should not receive resume request") } -// AssertResumeResponseReceived asserts a resume request should be received before the context closes (and returns said request) -func (fgs *FakeGraphSync) AssertResumeResponseReceived(ctx context.Context, t *testing.T) ResumeResponse { - var resumeResponseReceived ResumeResponse +// AssertResumeReceived asserts a resume request should be received before the context closes (and returns said request) +func (fgs *FakeGraphSync) AssertResumeReceived(ctx context.Context, t *testing.T) Resume { + var resumeReceived Resume select { case <-ctx.Done(): t.Fatal("did not receive message sent") - case resumeResponseReceived = <-fgs.resumeResponses: + case resumeReceived = <-fgs.resumes: } - return resumeResponseReceived + return resumeReceived } -// AssertNoCancelResponseReceived asserts that no responses were cancelled by thiss graphsync implementation -func (fgs *FakeGraphSync) AssertNoCancelResponseReceived(t *testing.T) { - require.Empty(t, fgs.cancelResponses, "should not cancel request") +// AssertNoCancelReceived asserts that no requests were cancelled by thiss graphsync implementation +func (fgs *FakeGraphSync) AssertNoCancelReceived(t *testing.T) { + require.Empty(t, fgs.cancels, "should not cancel request") } -// AssertCancelResponseReceived asserts a response was cancelled before the context closes (and returns said response) -func (fgs *FakeGraphSync) AssertCancelResponseReceived(ctx context.Context, t *testing.T) CancelResponse { - var cancelResponseReceived CancelResponse +// AssertCancelReceived asserts a requests was cancelled before the context closes (and returns said request id) +func (fgs *FakeGraphSync) AssertCancelReceived(ctx context.Context, t *testing.T) graphsync.RequestID { + var cancelReceived graphsync.RequestID select { case <-ctx.Done(): t.Fatal("did not receive message sent") - case cancelResponseReceived = <-fgs.cancelResponses: - } - return cancelResponseReceived -} - -// AssertCancelRequestReceived asserts a request was cancelled -func (fgs *FakeGraphSync) AssertCancelRequestReceived(ctx context.Context, t *testing.T) graphsync.RequestID { - select { - case <-ctx.Done(): - t.Fatal("did not receive message sent") - return 0 - case requestID := <-fgs.cancelRequests: - return requestID + case cancelReceived = <-fgs.cancels: } + return cancelReceived } // AssertHasPersistenceOption verifies that a persistence option was registered @@ -351,38 +278,20 @@ func (fgs *FakeGraphSync) RegisterCompletedResponseListener(listener graphsync.O } } -// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID -func (fgs *FakeGraphSync) UnpauseResponse(p peer.ID, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error { - fgs.resumeResponses <- ResumeResponse{p, requestID, extensions} - return nil -} - -// PauseResponse pauses a response based on peer ID and request ID -func (fgs *FakeGraphSync) PauseResponse(p peer.ID, requestID graphsync.RequestID) error { - fgs.pauseResponses <- PauseResponse{p, requestID} +// Unpause unpauses a request that was paused in a block hook based on request ID +func (fgs *FakeGraphSync) Unpause(ctx context.Context, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error { + fgs.resumes <- Resume{requestID, extensions} return nil } -// UnpauseRequest unpauses a request that was paused in a block hook based on request ID -func (fgs *FakeGraphSync) UnpauseRequest(requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error { - fgs.resumeRequests <- ResumeRequest{requestID, extensions} +// Pause pauses a request based on request ID +func (fgs *FakeGraphSync) Pause(ctx context.Context, requestID graphsync.RequestID) error { + fgs.pauses <- requestID return nil } -// PauseRequest unpauses a response that was paused in a block hook based on peer ID and request ID -func (fgs *FakeGraphSync) PauseRequest(requestID graphsync.RequestID) error { - fgs.pauseRequests <- PauseRequest{requestID} - return nil -} - -func (fgs *FakeGraphSync) CancelRequest(ctx context.Context, requestID graphsync.RequestID) error { - fgs.cancelRequests <- requestID - return nil -} - -// CancelResponse cancels a response in progress -func (fgs *FakeGraphSync) CancelResponse(p peer.ID, requestID graphsync.RequestID) error { - fgs.cancelResponses <- CancelResponse{p, requestID} +func (fgs *FakeGraphSync) Cancel(ctx context.Context, requestID graphsync.RequestID) error { + fgs.cancels <- requestID return nil } @@ -461,12 +370,12 @@ func NewFakeBlockData() graphsync.BlockData { } type fakeRequest struct { - id graphsync.RequestID - root cid.Cid - selector ipld.Node - priority graphsync.Priority - isCancel bool - extensions map[graphsync.ExtensionName][]byte + id graphsync.RequestID + root cid.Cid + selector ipld.Node + priority graphsync.Priority + requestType graphsync.RequestType + extensions map[graphsync.ExtensionName]datamodel.Node } // ID Returns the request ID for this Request @@ -491,32 +400,32 @@ func (fr *fakeRequest) Priority() graphsync.Priority { // Extension returns the content for an extension on a response, or errors // if extension is not present -func (fr *fakeRequest) Extension(name graphsync.ExtensionName) ([]byte, bool) { +func (fr *fakeRequest) Extension(name graphsync.ExtensionName) (datamodel.Node, bool) { data, has := fr.extensions[name] return data, has } -// IsCancel returns true if this particular request is being cancelled -func (fr *fakeRequest) IsCancel() bool { - return fr.isCancel +// Type returns the type of request +func (fr *fakeRequest) Type() graphsync.RequestType { + return fr.requestType } // NewFakeRequest returns a fake request that matches the request data interface -func NewFakeRequest(id graphsync.RequestID, extensions map[graphsync.ExtensionName][]byte) graphsync.RequestData { +func NewFakeRequest(id graphsync.RequestID, extensions map[graphsync.ExtensionName]datamodel.Node) graphsync.RequestData { return &fakeRequest{ - id: id, - root: GenerateCids(1)[0], - selector: allSelector, - priority: graphsync.Priority(rand.Int()), - isCancel: false, - extensions: extensions, + id: id, + root: GenerateCids(1)[0], + selector: allSelector, + priority: graphsync.Priority(rand.Int()), + extensions: extensions, + requestType: graphsync.RequestTypeNew, } } type fakeResponse struct { id graphsync.RequestID status graphsync.ResponseStatusCode - extensions map[graphsync.ExtensionName][]byte + extensions map[graphsync.ExtensionName]datamodel.Node } // RequestID returns the request ID for this response @@ -531,13 +440,18 @@ func (fr *fakeResponse) Status() graphsync.ResponseStatusCode { // Extension returns the content for an extension on a response, or errors // if extension is not present -func (fr *fakeResponse) Extension(name graphsync.ExtensionName) ([]byte, bool) { +func (fr *fakeResponse) Extension(name graphsync.ExtensionName) (datamodel.Node, bool) { data, has := fr.extensions[name] return data, has } +// Metadata returns metadata for this response +func (fr *fakeResponse) Metadata() graphsync.LinkMetadata { + return nil +} + // NewFakeResponse returns a fake response that matches the response data interface -func NewFakeResponse(id graphsync.RequestID, extensions map[graphsync.ExtensionName][]byte, status graphsync.ResponseStatusCode) graphsync.ResponseData { +func NewFakeResponse(id graphsync.RequestID, extensions map[graphsync.ExtensionName]datamodel.Node, status graphsync.ResponseStatusCode) graphsync.ResponseData { return &fakeResponse{ id: id, status: status, diff --git a/transport/graphsync/graphsync_test.go b/transport/graphsync/graphsync_test.go index c9b0edc7..97e2cfcb 100644 --- a/transport/graphsync/graphsync_test.go +++ b/transport/graphsync/graphsync_test.go @@ -889,7 +889,7 @@ func TestManager(t *testing.T) { for _, ext := range requestReceived.Extensions { extensions[ext.Name] = ext.Data } - request := testutil.NewFakeRequest(graphsync.RequestID(rand.Int31()), extensions) + request := testutil.NewFakeRequest(graphsync.NewRequestID(), extensions) gsData.fgs.OutgoingRequestHook(gsData.other, request, gsData.outgoingRequestHookActions) _ = gsData.transport.CloseChannel(gsData.ctx, datatransfer.ChannelID{ID: gsData.transferID, Responder: gsData.other, Initiator: gsData.self}) ctxt, cancel := context.WithTimeout(context.Background(), 2*time.Second) @@ -990,7 +990,7 @@ func TestManager(t *testing.T) { for _, ext := range requestReceived.Extensions { extensions[ext.Name] = ext.Data } - request := testutil.NewFakeRequest(graphsync.RequestID(rand.Int31()), extensions) + request := testutil.NewFakeRequest(graphsync.NewRequestID(), extensions) gsData.fgs.OutgoingRequestHook(gsData.other, request, gsData.outgoingRequestHookActions) select { case <-gsData.ctx.Done(): @@ -1048,9 +1048,9 @@ func TestManager(t *testing.T) { defer cancel() peers := testutil.GeneratePeers(2) transferID := datatransfer.TransferID(rand.Uint64()) - requestID := graphsync.RequestID(rand.Int31()) + requestID := graphsync.NewRequestID() request := data.requestConfig.makeRequest(t, transferID, requestID) - altRequest := data.requestConfig.makeRequest(t, transferID, graphsync.RequestID(rand.Int31())) + altRequest := data.requestConfig.makeRequest(t, transferID, graphsync.NewRequestID()) response := data.responseConfig.makeResponse(t, transferID, requestID) updatedRequest := data.updatedConfig.makeRequest(t, transferID, requestID) block := testutil.NewFakeBlockData() diff --git a/types.go b/types.go index cd970e0d..5bc5cef6 100644 --- a/types.go +++ b/types.go @@ -6,28 +6,19 @@ import ( "github.com/ipfs/go-cid" "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/schema" "github.com/libp2p/go-libp2p-core/peer" cbg "github.com/whyrusleeping/cbor-gen" - - "github.com/filecoin-project/go-data-transfer/encoding" ) //go:generate cbor-gen-for ChannelID ChannelStages ChannelStage Log -// TypeIdentifier is a unique string identifier for a type of encodable object in a -// registry -type TypeIdentifier string - // EmptyTypeIdentifier means there is no voucher present const EmptyTypeIdentifier = TypeIdentifier("") // Registerable is a type of object in a registry. It must be encodable and must // have a single method that uniquely identifies its type -type Registerable interface { - encoding.Encodable - // Type is a unique string identifier for this voucher type - Type() TypeIdentifier -} +type Registerable = schema.TypedNode // Voucher is used to validate // a data transfer request against the underlying storage or retrieval deal