Skip to content

Commit f3280ab

Browse files
authored
Simplify graphsync cancel (#229)
* refactor: simplify graphsync cancel * docs: add lock required comments to graphsync transport * feat: restore wait for complete in graphsync transport * feat: upgrade to graphsync v0.6.8
1 parent 1d44c41 commit f3280ab

File tree

11 files changed

+139
-110
lines changed

11 files changed

+139
-110
lines changed

channels/channels.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -332,10 +332,10 @@ func (c *Channels) Disconnected(chid datatransfer.ChannelID, err error) error {
332332
return c.send(chid, datatransfer.Disconnected, err)
333333
}
334334

335-
// RequestTimedOut indicates that the transport layer had a timeout trying to
336-
// make a request
337-
func (c *Channels) RequestTimedOut(chid datatransfer.ChannelID, err error) error {
338-
return c.send(chid, datatransfer.RequestTimedOut, err)
335+
// RequestCancelled indicates that a transport layer request was cancelled by the
336+
// request opener
337+
func (c *Channels) RequestCancelled(chid datatransfer.ChannelID, err error) error {
338+
return c.send(chid, datatransfer.RequestCancelled, err)
339339
}
340340

341341
// SendDataError indicates that the transport layer had an error trying

channels/channels_fsm.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,9 @@ var ChannelEvents = fsm.Events{
9595
chst.AddLog("data transfer receive error: %s", chst.Message)
9696
return nil
9797
}),
98-
fsm.Event(datatransfer.RequestTimedOut).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error {
98+
fsm.Event(datatransfer.RequestCancelled).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error {
9999
chst.Message = err.Error()
100-
chst.AddLog("data transfer request timed out: %s", chst.Message)
100+
chst.AddLog("data transfer request cancelled: %s", chst.Message)
101101
return nil
102102
}),
103103
fsm.Event(datatransfer.Error).FromAny().To(datatransfer.Failing).Action(func(chst *internal.ChannelState, err error) error {

events.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,7 @@ const (
9191
// data has been received.
9292
DataReceivedProgress
9393

94-
// RequestTimedOut indicates that the transport layer had a timeout trying to
95-
// make a request
94+
// Deprecated in favour of RequestCancelled
9695
RequestTimedOut
9796

9897
// SendDataError indicates that the transport layer had an error trying
@@ -105,6 +104,9 @@ const (
105104

106105
// TransferRequestQueued indicates that a new data transfer request has been queued in the transport layer
107106
TransferRequestQueued
107+
108+
// RequestCancelled indicates that a transport layer request was cancelled by the request opener
109+
RequestCancelled
108110
)
109111

110112
// Events are human readable names for data transfer events
@@ -138,6 +140,7 @@ var Events = map[EventCode]string{
138140
SendDataError: "SendDataError",
139141
ReceiveDataError: "ReceiveDataError",
140142
TransferRequestQueued: "TransferRequestQueued",
143+
RequestCancelled: "RequestCancelled",
141144
}
142145

143146
// Event is a struct containing information about a data transfer event

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ require (
1313
github.com/ipfs/go-cid v0.0.7
1414
github.com/ipfs/go-datastore v0.4.5
1515
github.com/ipfs/go-ds-badger v0.2.3
16-
github.com/ipfs/go-graphsync v0.6.4
16+
github.com/ipfs/go-graphsync v0.6.8
1717
github.com/ipfs/go-ipfs-blockstore v1.0.1
1818
github.com/ipfs/go-ipfs-blocksutil v0.0.1
1919
github.com/ipfs/go-ipfs-chunker v0.0.5
@@ -33,5 +33,6 @@ require (
3333
github.com/stretchr/testify v1.6.1
3434
github.com/whyrusleeping/cbor-gen v0.0.0-20200826160007-0b9f6c5fb163
3535
go.uber.org/atomic v1.6.0
36+
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
3637
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
3738
)

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -215,8 +215,8 @@ github.com/ipfs/go-ds-badger v0.2.3/go.mod h1:pEYw0rgg3FIrywKKnL+Snr+w/LjJZVMTBR
215215
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
216216
github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
217217
github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
218-
github.com/ipfs/go-graphsync v0.6.4 h1:g6wFRK2BkLPnx8nfoSdnokp5gtpuGyWZjbqI6q3NGb8=
219-
github.com/ipfs/go-graphsync v0.6.4/go.mod h1:5WyaeigpNdpiYQuW2vwpuecOoEfB4h747ZGEOKmAGTg=
218+
github.com/ipfs/go-graphsync v0.6.8 h1:mgyPdBDPcgL8ujO132grQjP3rfQv+KN/riQzbmTVgg4=
219+
github.com/ipfs/go-graphsync v0.6.8/go.mod h1:GdHT8JeuIZ0R4lSjFR16Oe4zPi5dXwKi9zR9ADVlcdk=
220220
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
221221
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
222222
github.com/ipfs/go-ipfs-blockstore v0.1.4/go.mod h1:Jxm3XMVjh6R17WvxFEiyKBLUGr86HgIYJW/D/MwqeYQ=

impl/events.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -216,9 +216,9 @@ func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response datat
216216
return m.resumeOther(chid)
217217
}
218218

219-
func (m *manager) OnRequestTimedOut(chid datatransfer.ChannelID, err error) error {
220-
log.Warnf("channel %+v has timed out: %s", chid, err)
221-
return m.channels.RequestTimedOut(chid, err)
219+
func (m *manager) OnRequestCancelled(chid datatransfer.ChannelID, err error) error {
220+
log.Warnf("channel %+v was cancelled: %s", chid, err)
221+
return m.channels.RequestCancelled(chid, err)
222222
}
223223

224224
func (m *manager) OnRequestDisconnected(chid datatransfer.ChannelID, err error) error {

impl/restart_integration_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ func TestRestartPush(t *testing.T) {
201201
require.EqualError(t, err, "context timed-out without completing data transfer")
202202
require.True(t, len(receivedI) < totalIncrements)
203203
require.NotEmpty(t, sentI)
204-
t.Logf("not request was completed after disconnect")
204+
t.Logf("request was not completed after disconnect")
205205

206206
// Connect the peers and restart
207207
require.NoError(t, rh.gsData.Mn.LinkAll())

testutil/fakegraphsync.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ type FakeGraphSync struct {
100100
pauseResponses chan PauseResponse
101101
resumeResponses chan ResumeResponse
102102
cancelResponses chan CancelResponse
103+
cancelRequests chan graphsync.RequestID
103104
persistenceOptionsLk sync.RWMutex
104105
persistenceOptions map[string]PersistenceOption
105106
leaveRequestsOpen bool
@@ -126,6 +127,7 @@ func NewFakeGraphSync() *FakeGraphSync {
126127
pauseResponses: make(chan PauseResponse, 1),
127128
resumeResponses: make(chan ResumeResponse, 1),
128129
cancelResponses: make(chan CancelResponse, 1),
130+
cancelRequests: make(chan graphsync.RequestID, 1),
129131
persistenceOptions: make(map[string]PersistenceOption),
130132
}
131133
}
@@ -230,6 +232,17 @@ func (fgs *FakeGraphSync) AssertCancelResponseReceived(ctx context.Context, t *t
230232
return cancelResponseReceived
231233
}
232234

235+
// AssertCancelRequestReceived asserts a request was cancelled
236+
func (fgs *FakeGraphSync) AssertCancelRequestReceived(ctx context.Context, t *testing.T) graphsync.RequestID {
237+
select {
238+
case <-ctx.Done():
239+
t.Fatal("did not receive message sent")
240+
return 0
241+
case requestID := <-fgs.cancelRequests:
242+
return requestID
243+
}
244+
}
245+
233246
// AssertHasPersistenceOption verifies that a persistence option was registered
234247
func (fgs *FakeGraphSync) AssertHasPersistenceOption(t *testing.T, name string) PersistenceOption {
235248
fgs.persistenceOptionsLk.RLock()
@@ -367,6 +380,11 @@ func (fgs *FakeGraphSync) PauseRequest(requestID graphsync.RequestID) error {
367380
return nil
368381
}
369382

383+
func (fgs *FakeGraphSync) CancelRequest(ctx context.Context, requestID graphsync.RequestID) error {
384+
fgs.cancelRequests <- requestID
385+
return nil
386+
}
387+
370388
// CancelResponse cancels a response in progress
371389
func (fgs *FakeGraphSync) CancelResponse(p peer.ID, requestID graphsync.RequestID) error {
372390
fgs.cancelResponses <- CancelResponse{p, requestID}

transport.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,10 @@ type EventsHandler interface {
5656
// Error returns are logged but otherwise have no effect
5757
OnChannelCompleted(chid ChannelID, err error) error
5858

59-
// OnRequestTimedOut is called when a request we opened (with the given channel Id) to receive data times out.
59+
// OnRequestCancelled is called when a request we opened (with the given channel Id) to
60+
// receive data is cancelled by us.
6061
// Error returns are logged but otherwise have no effect
61-
OnRequestTimedOut(chid ChannelID, err error) error
62+
OnRequestCancelled(chid ChannelID, err error) error
6263

6364
// OnRequestDisconnected is called when a network error occurs trying to send a request
6465
OnRequestDisconnected(chid ChannelID, err error) error

0 commit comments

Comments
 (0)