Skip to content

Commit 717d0bf

Browse files
authored
Use do-not-send-first-blocks extension for restarts (#257)
* feat: do-not-send-first-blocks * refactor: move received blocks total onto channel state * fix: record stream protocol on stream open
1 parent b06ea85 commit 717d0bf

33 files changed

+757
-285
lines changed

benchmarks/testinstance/testinstance.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ func NewInstance(ctx context.Context, net tn.Network, tempDir string, diskBasedD
165165

166166
linkSystem := storeutil.LinkSystemForBlockstore(bstore)
167167
gs := gsimpl.New(ctx, gsNet, linkSystem, gsimpl.RejectAllRequestsByDefault())
168-
transport := gstransport.NewTransport(p, gs)
168+
transport := gstransport.NewTransport(p, gs, dtNet)
169169
dt, err := dtimpl.NewDataTransfer(namespace.Wrap(dstore, datastore.NewKey("/data-transfers/transfers")), os.TempDir(), dtNet, transport)
170170
if err != nil {
171171
return Instance{}, err

channelmonitor/channelmonitor_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,8 @@ type mockChannelState struct {
521521
complete bool
522522
}
523523

524+
var _ datatransfer.ChannelState = (*mockChannelState)(nil)
525+
524526
func (m *mockChannelState) Queued() uint64 {
525527
return m.queued
526528
}
@@ -615,3 +617,7 @@ func (m *mockChannelState) ReceivedCids() []cid.Cid {
615617
func (m *mockChannelState) ReceivedCidsLen() int {
616618
panic("implement me")
617619
}
620+
621+
func (m *mockChannelState) ReceivedCidsTotal() int64 {
622+
panic("implement me")
623+
}

channels/channel_state.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ type channelState struct {
4040
sent uint64
4141
// total bytes received by this node (0 if sender)
4242
received uint64
43+
// number of blocks that have been received, including blocks that are
44+
// present in more than one place in the DAG
45+
receivedBlocksTotal int64
4346
// more informative status on a channel
4447
message string
4548
// additional vouchers
@@ -107,7 +110,7 @@ func (c channelState) ReceivedCids() []cid.Cid {
107110
return receivedCids
108111
}
109112

110-
// ReceivedCids returns the number of cids received so far on this channel
113+
// ReceivedCids returns the number of unique cids received so far on this channel
111114
func (c channelState) ReceivedCidsLen() int {
112115
len, err := c.receivedCids.Len(c.ChannelID())
113116
if err != nil {
@@ -116,6 +119,12 @@ func (c channelState) ReceivedCidsLen() int {
116119
return len
117120
}
118121

122+
// ReceivedCidsTotal returns the number of (non-unique) cids received so far
123+
// on the channel - note that a block can exist in more than one place in the DAG
124+
func (c channelState) ReceivedCidsTotal() int64 {
125+
return c.receivedBlocksTotal
126+
}
127+
119128
// Sender returns the peer id for the node that is sending data
120129
func (c channelState) Sender() peer.ID { return c.sender }
121130

@@ -213,6 +222,7 @@ func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByT
213222
queued: c.Queued,
214223
sent: c.Sent,
215224
received: c.Received,
225+
receivedBlocksTotal: c.ReceivedBlocksTotal,
216226
message: c.Message,
217227
vouchers: c.Vouchers,
218228
voucherResults: c.VoucherResults,

channels/channels.go

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -241,8 +241,27 @@ func (c *Channels) DataQueued(chid datatransfer.ChannelID, k cid.Cid, delta uint
241241
}
242242

243243
// Returns true if this is the first time the block has been received
244-
func (c *Channels) DataReceived(chid datatransfer.ChannelID, k cid.Cid, delta uint64) (bool, error) {
245-
return c.fireProgressEvent(chid, datatransfer.DataReceived, datatransfer.DataReceivedProgress, k, delta)
244+
func (c *Channels) DataReceived(chid datatransfer.ChannelID, k cid.Cid, delta uint64, index int64) (bool, error) {
245+
if err := c.checkChannelExists(chid, datatransfer.DataReceived); err != nil {
246+
return false, err
247+
}
248+
249+
// Check if the block has already been seen
250+
sid := seenCidsSetID(chid, datatransfer.DataReceived)
251+
seen, err := c.seenCIDs.InsertSetCID(sid, k)
252+
if err != nil {
253+
return false, err
254+
}
255+
256+
// If the block has not been seen before, fire the progress event
257+
if !seen {
258+
if err := c.stateMachines.Send(chid, datatransfer.DataReceivedProgress, delta); err != nil {
259+
return false, err
260+
}
261+
}
262+
263+
// Fire the regular event
264+
return !seen, c.stateMachines.Send(chid, datatransfer.DataReceived, index)
246265
}
247266

248267
// PauseInitiator pauses the initator of this channel
@@ -358,6 +377,7 @@ func (c *Channels) HasChannel(chid datatransfer.ChannelID) (bool, error) {
358377
// removeSeenCIDCaches cleans up the caches of "seen" blocks, ie
359378
// blocks that have already been queued / sent / received
360379
func (c *Channels) removeSeenCIDCaches(chid datatransfer.ChannelID) error {
380+
// Clean up seen block caches
361381
progressStates := []datatransfer.EventCode{
362382
datatransfer.DataQueued,
363383
datatransfer.DataSent,
@@ -432,9 +452,7 @@ func seenCidsSetID(chid datatransfer.ChannelID, evt datatransfer.EventCode) cids
432452

433453
// Convert from the internally used channel state format to the externally exposed ChannelState
434454
func (c *Channels) fromInternalChannelState(ch internal.ChannelState) datatransfer.ChannelState {
435-
rcr := &receivedCidsReader{
436-
seenCIDs: c.seenCIDs,
437-
}
455+
rcr := &receivedCidsReader{seenCIDs: c.seenCIDs}
438456
return fromInternalChannelState(ch, c.voucherDecoder, c.voucherResultDecoder, rcr)
439457
}
440458

channels/channels_fsm.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,14 @@ var ChannelEvents = fsm.Events{
5050
chst.AddLog("")
5151
return nil
5252
}),
53-
fsm.Event(datatransfer.DataReceived).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error {
54-
chst.AddLog("")
55-
return nil
56-
}),
53+
fsm.Event(datatransfer.DataReceived).FromAny().ToNoChange().
54+
Action(func(chst *internal.ChannelState, rcvdBlocksTotal int64) error {
55+
if rcvdBlocksTotal > chst.ReceivedBlocksTotal {
56+
chst.ReceivedBlocksTotal = rcvdBlocksTotal
57+
}
58+
chst.AddLog("")
59+
return nil
60+
}),
5761
fsm.Event(datatransfer.DataReceivedProgress).FromMany(transferringStates...).ToNoChange().
5862
Action(func(chst *internal.ChannelState, delta uint64) error {
5963
chst.Received += delta

channels/channels_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ func TestChannels(t *testing.T) {
188188
require.Equal(t, uint64(0), state.Sent())
189189
require.Empty(t, state.ReceivedCids())
190190

191-
isNew, err := channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[0], 50)
191+
isNew, err := channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[0], 50, 1)
192192
require.NoError(t, err)
193193
_ = checkEvent(ctx, t, received, datatransfer.DataReceivedProgress)
194194
require.True(t, isNew)
@@ -207,15 +207,15 @@ func TestChannels(t *testing.T) {
207207
require.Equal(t, []cid.Cid{cids[0]}, state.ReceivedCids())
208208

209209
// errors if channel does not exist
210-
isNew, err = channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[1], Responder: peers[0], ID: tid1}, cids[1], 200)
210+
isNew, err = channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[1], Responder: peers[0], ID: tid1}, cids[1], 200, 2)
211211
require.True(t, xerrors.As(err, new(*channels.ErrNotFound)))
212212
require.False(t, isNew)
213213
isNew, err = channelList.DataSent(datatransfer.ChannelID{Initiator: peers[1], Responder: peers[0], ID: tid1}, cids[1], 200)
214214
require.True(t, xerrors.As(err, new(*channels.ErrNotFound)))
215215
require.Equal(t, []cid.Cid{cids[0]}, state.ReceivedCids())
216216
require.False(t, isNew)
217217

218-
isNew, err = channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[1], 50)
218+
isNew, err = channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[1], 50, 2)
219219
require.NoError(t, err)
220220
_ = checkEvent(ctx, t, received, datatransfer.DataReceivedProgress)
221221
require.True(t, isNew)
@@ -232,7 +232,7 @@ func TestChannels(t *testing.T) {
232232
require.Equal(t, uint64(100), state.Sent())
233233
require.ElementsMatch(t, []cid.Cid{cids[0], cids[1]}, state.ReceivedCids())
234234

235-
isNew, err = channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[0], 50)
235+
isNew, err = channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[0], 50, 3)
236236
require.NoError(t, err)
237237
require.False(t, isNew)
238238
state = checkEvent(ctx, t, received, datatransfer.DataReceived)

channels/internal/internalchannel.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ type ChannelState struct {
6060
Message string
6161
Vouchers []EncodedVoucher
6262
VoucherResults []EncodedVoucherResult
63+
// Number of blocks that have been received, including blocks that are
64+
// present in more than one place in the DAG
65+
ReceivedBlocksTotal int64
6366

6467
// Stages traces the execution fo a data transfer.
6568
//

channels/internal/internalchannel_cbor_gen.go

Lines changed: 49 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ require (
1313
github.com/ipfs/go-cid v0.0.7
1414
github.com/ipfs/go-datastore v0.4.5
1515
github.com/ipfs/go-ds-badger v0.2.6
16-
github.com/ipfs/go-graphsync v0.9.1
16+
github.com/ipfs/go-graphsync v0.10.0-rc2
1717
github.com/ipfs/go-ipfs-blockstore v1.0.1
1818
github.com/ipfs/go-ipfs-blocksutil v0.0.1
1919
github.com/ipfs/go-ipfs-chunker v0.0.5
@@ -31,6 +31,7 @@ require (
3131
github.com/libp2p/go-libp2p v0.13.0
3232
github.com/libp2p/go-libp2p-core v0.8.5
3333
github.com/libp2p/go-libp2p-record v0.1.1 // indirect
34+
github.com/multiformats/go-multiaddr v0.3.1
3435
github.com/stretchr/testify v1.6.1
3536
github.com/whyrusleeping/cbor-gen v0.0.0-20210219115102-f37d292932f2
3637
go.uber.org/atomic v1.6.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,8 @@ github.com/ipfs/go-ds-badger v0.2.6/go.mod h1:02rnztVKA4aZwDuaRPTf8mpqcKmXP7mLl6
223223
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
224224
github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
225225
github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
226-
github.com/ipfs/go-graphsync v0.9.1 h1:jo7ZaAZ3lal89RhKxKoRkPzIO8lmOY6KUWA1mDRZ2+U=
227-
github.com/ipfs/go-graphsync v0.9.1/go.mod h1:J62ahWT9JbPsFL2UWsUM5rOu0lZJ0LOIH1chHdxGGcw=
226+
github.com/ipfs/go-graphsync v0.10.0-rc2 h1:nS0IolMkkDnTrNZFSLolT1Kd+IaUSXU89jqce4aGq54=
227+
github.com/ipfs/go-graphsync v0.10.0-rc2/go.mod h1:kQJlkg1aE9HfCwp577BgPy/UxlordJ7ScIBO2IHwPZU=
228228
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
229229
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
230230
github.com/ipfs/go-ipfs-blockstore v0.1.4/go.mod h1:Jxm3XMVjh6R17WvxFEiyKBLUGr86HgIYJW/D/MwqeYQ=

0 commit comments

Comments
 (0)