Skip to content

Commit 7794046

Browse files
Fire a transfer queued event when a transfer is queued in Graphsync (#221)
* req queued event in GS * fire a transfer queued event * changes as per review and tests * address nit * update graphsync deps
1 parent 54e3630 commit 7794046

File tree

11 files changed

+125
-3
lines changed

11 files changed

+125
-3
lines changed

channels/channels.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,10 @@ func (c *Channels) Accept(chid datatransfer.ChannelID) error {
215215
return c.send(chid, datatransfer.Accept)
216216
}
217217

218+
func (c *Channels) TransferRequestQueued(chid datatransfer.ChannelID) error {
219+
return c.send(chid, datatransfer.TransferRequestQueued)
220+
}
221+
218222
// Restart marks a data transfer as restarted
219223
func (c *Channels) Restart(chid datatransfer.ChannelID) error {
220224
return c.send(chid, datatransfer.Restart)

channels/channels_fsm.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,13 @@ var ChannelEvents = fsm.Events{
3434
chst.AddLog("")
3535
return nil
3636
}),
37+
38+
fsm.Event(datatransfer.TransferRequestQueued).FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error {
39+
chst.Message = ""
40+
chst.AddLog("")
41+
return nil
42+
}),
43+
3744
fsm.Event(datatransfer.Restart).FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error {
3845
chst.Message = ""
3946
chst.AddLog("")

channels/channels_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,17 @@ func TestChannels(t *testing.T) {
127127
require.True(t, xerrors.As(err, new(*channels.ErrNotFound)))
128128
})
129129

130+
t.Run("transfer queued", func(t *testing.T) {
131+
state, err := channelList.GetByID(ctx, datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1})
132+
require.NoError(t, err)
133+
require.Equal(t, state.Status(), datatransfer.Ongoing)
134+
135+
err = channelList.TransferRequestQueued(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1})
136+
require.NoError(t, err)
137+
state = checkEvent(ctx, t, received, datatransfer.TransferRequestQueued)
138+
require.Equal(t, state.Status(), datatransfer.Ongoing)
139+
})
140+
130141
t.Run("updating send/receive values", func(t *testing.T) {
131142
ds := dss.MutexWrap(datastore.NewMapDatastore())
132143
dir := os.TempDir()

events.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,9 @@ const (
102102
// ReceiveDataError indicates that the transport layer had an error
103103
// receiving data from the remote peer
104104
ReceiveDataError
105+
106+
// TransferRequestQueued indicates that a new data transfer request has been queued in the transport layer
107+
TransferRequestQueued
105108
)
106109

107110
// Events are human readable names for data transfer events
@@ -134,6 +137,7 @@ var Events = map[EventCode]string{
134137
RequestTimedOut: "RequestTimedOut",
135138
SendDataError: "SendDataError",
136139
ReceiveDataError: "ReceiveDataError",
140+
TransferRequestQueued: "TransferRequestQueued",
137141
}
138142

139143
// Event is a struct containing information about a data transfer event

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ require (
1313
github.com/ipfs/go-cid v0.0.7
1414
github.com/ipfs/go-datastore v0.4.5
1515
github.com/ipfs/go-ds-badger v0.2.3
16-
github.com/ipfs/go-graphsync v0.6.1
16+
github.com/ipfs/go-graphsync v0.6.4
1717
github.com/ipfs/go-ipfs-blockstore v1.0.1
1818
github.com/ipfs/go-ipfs-blocksutil v0.0.1
1919
github.com/ipfs/go-ipfs-chunker v0.0.5

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -215,8 +215,8 @@ github.com/ipfs/go-ds-badger v0.2.3/go.mod h1:pEYw0rgg3FIrywKKnL+Snr+w/LjJZVMTBR
215215
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
216216
github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
217217
github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
218-
github.com/ipfs/go-graphsync v0.6.1 h1:i9wN7YkBXWwIsUjVQeuaDxFB59yWZrG1xL564Nz7aGE=
219-
github.com/ipfs/go-graphsync v0.6.1/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk=
218+
github.com/ipfs/go-graphsync v0.6.4 h1:g6wFRK2BkLPnx8nfoSdnokp5gtpuGyWZjbqI6q3NGb8=
219+
github.com/ipfs/go-graphsync v0.6.4/go.mod h1:5WyaeigpNdpiYQuW2vwpuecOoEfB4h747ZGEOKmAGTg=
220220
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
221221
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
222222
github.com/ipfs/go-ipfs-blockstore v0.1.4/go.mod h1:Jxm3XMVjh6R17WvxFEiyKBLUGr86HgIYJW/D/MwqeYQ=

impl/events.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,10 @@ func (m *manager) OnRequestReceived(chid datatransfer.ChannelID, request datatra
160160
return nil, nil
161161
}
162162

163+
func (m *manager) OnTransferQueued(chid datatransfer.ChannelID) {
164+
m.channels.TransferRequestQueued(chid)
165+
}
166+
163167
func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response datatransfer.Response) error {
164168
if response.IsCancel() {
165169
log.Infof("channel %s: received cancel response, cancelling channel", chid)

testutil/fakegraphsync.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ type FakeGraphSync struct {
106106
OutgoingRequestHook graphsync.OnOutgoingRequestHook
107107
IncomingBlockHook graphsync.OnIncomingBlockHook
108108
OutgoingBlockHook graphsync.OnOutgoingBlockHook
109+
IncomingRequestQueuedHook graphsync.OnIncomingRequestQueuedHook
109110
IncomingRequestHook graphsync.OnIncomingRequestHook
110111
CompletedResponseListener graphsync.OnResponseCompletedListener
111112
RequestUpdatedHook graphsync.OnRequestUpdatedHook
@@ -286,6 +287,14 @@ func (fgs *FakeGraphSync) RegisterIncomingRequestHook(hook graphsync.OnIncomingR
286287
}
287288
}
288289

290+
// RegisterIncomingRequestQueuedHook adds a hook that runs when an incoming GS request is queued.
291+
func (fgs *FakeGraphSync) RegisterIncomingRequestQueuedHook(hook graphsync.OnIncomingRequestQueuedHook) graphsync.UnregisterHookFunc {
292+
fgs.IncomingRequestQueuedHook = hook
293+
return func() {
294+
fgs.IncomingRequestQueuedHook = nil
295+
}
296+
}
297+
289298
// RegisterIncomingResponseHook adds a hook that runs when a response is received
290299
func (fgs *FakeGraphSync) RegisterIncomingResponseHook(hook graphsync.OnIncomingResponseHook) graphsync.UnregisterHookFunc {
291300
fgs.IncomingResponseHook = hook

transport.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ type EventsHandler interface {
3939
// OnDataSent is called when we send data for the given channel ID
4040
OnDataSent(chid ChannelID, link ipld.Link, size uint64) error
4141

42+
// OnTransferQueued is called when a new data transfer request is queued in the transport layer.
43+
OnTransferQueued(chid ChannelID)
44+
4245
// OnRequestReceived is called when we receive a new request to send data
4346
// for the given channel ID
4447
// return values are:

transport/graphsync/graphsync.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,7 @@ func (t *Transport) SetEventHandler(events datatransfer.EventsHandler) error {
285285
}
286286
t.events = events
287287

288+
t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterIncomingRequestQueuedHook(t.gsReqQueuedHook))
288289
t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterIncomingRequestHook(t.gsReqRecdHook))
289290
t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterCompletedResponseListener(t.gsCompletedResponseListener))
290291
t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterIncomingBlockHook(t.gsIncomingBlockHook))
@@ -448,6 +449,40 @@ func (t *Transport) gsOutgoingBlockHook(p peer.ID, request graphsync.RequestData
448449
}
449450
}
450451

452+
// gsReqQueuedHook is called when graphsync enqueues an incoming request for data
453+
func (t *Transport) gsReqQueuedHook(p peer.ID, request graphsync.RequestData) {
454+
msg, err := extension.GetTransferData(request, t.supportedExtensions)
455+
if err != nil {
456+
log.Errorf("failed GetTransferData, req=%+v, err=%s", request, err)
457+
}
458+
// extension not found; probably not our request.
459+
if msg == nil {
460+
return
461+
}
462+
463+
var chid datatransfer.ChannelID
464+
if msg.IsRequest() {
465+
// when a data transfer request comes in on graphsync, the remote peer
466+
// initiated a pull
467+
chid = datatransfer.ChannelID{ID: msg.TransferID(), Initiator: p, Responder: t.peerID}
468+
request := msg.(datatransfer.Request)
469+
if request.IsNew() {
470+
log.Infof("%s, pull request queued, req=%+v", chid, request)
471+
t.events.OnTransferQueued(chid)
472+
}
473+
} else {
474+
// when a data transfer response comes in on graphsync, this node
475+
// initiated a push, and the remote peer responded with a request
476+
// for data
477+
chid = datatransfer.ChannelID{ID: msg.TransferID(), Initiator: t.peerID, Responder: p}
478+
response := msg.(datatransfer.Response)
479+
if response.IsNew() {
480+
log.Infof("%s, GS pull request queued in response to our push, req=%+v", chid, request)
481+
t.events.OnTransferQueued(chid)
482+
}
483+
}
484+
}
485+
451486
// gsReqRecdHook is called when graphsync receives an incoming request for data
452487
func (t *Transport) gsReqRecdHook(p peer.ID, request graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
453488
// if this is a push request the sender is us.

0 commit comments

Comments
 (0)