@@ -28,7 +28,10 @@ import (
2828
2929type DecoderByTypeFunc func (identifier datatransfer.TypeIdentifier ) (encoding.Decoder , bool )
3030
31- type ChannelCIDsReader func (chid datatransfer.ChannelID ) ([]cid.Cid , error )
31+ type ReceivedCidsReader interface {
32+ ToArray (chid datatransfer.ChannelID ) ([]cid.Cid , error )
33+ Len (chid datatransfer.ChannelID ) (int , error )
34+ }
3235
3336type Notifier func (datatransfer.Event , datatransfer.ChannelState )
3437
@@ -55,7 +58,6 @@ type Channels struct {
5558 voucherResultDecoder DecoderByTypeFunc
5659 stateMachines fsm.Group
5760 migrateStateMachines func (context.Context ) error
58- cidLists cidlists.CIDLists
5961 seenCIDs * cidsets.CIDSetManager
6062}
6163
@@ -78,7 +80,6 @@ func New(ds datastore.Batching,
7880
7981 seenCIDsDS := namespace .Wrap (ds , datastore .NewKey ("seencids" ))
8082 c := & Channels {
81- cidLists : cidLists ,
8283 seenCIDs : cidsets .NewCIDSetManager (seenCIDsDS ),
8384 notifier : notifier ,
8485 voucherDecoder : voucherDecoder ,
@@ -123,7 +124,7 @@ func (c *Channels) dispatch(eventName fsm.EventName, channel fsm.StateType) {
123124 Timestamp : time .Now (),
124125 }
125126
126- c .notifier (evt , fromInternalChannelState (realChannel , c . voucherDecoder , c . voucherResultDecoder , c . cidLists . ReadList ))
127+ c .notifier (evt , c . fromInternalChannelState (realChannel ))
127128
128129 // When the channel has been cleaned up, remove the caches of seen cids
129130 if evt .Code == datatransfer .CleanupComplete {
@@ -180,10 +181,6 @@ func (c *Channels) CreateNew(selfPeer peer.ID, tid datatransfer.TransferID, base
180181 if err != nil {
181182 return datatransfer.ChannelID {}, err
182183 }
183- err = c .cidLists .CreateList (chid , nil )
184- if err != nil {
185- return datatransfer.ChannelID {}, err
186- }
187184 return chid , c .stateMachines .Send (chid , datatransfer .Open )
188185}
189186
@@ -197,7 +194,7 @@ func (c *Channels) InProgress() (map[datatransfer.ChannelID]datatransfer.Channel
197194 channels := make (map [datatransfer.ChannelID ]datatransfer.ChannelState , len (internalChannels ))
198195 for _ , internalChannel := range internalChannels {
199196 channels [datatransfer.ChannelID {ID : internalChannel .TransferID , Responder : internalChannel .Responder , Initiator : internalChannel .Initiator }] =
200- fromInternalChannelState (internalChannel , c . voucherDecoder , c . voucherResultDecoder , c . cidLists . ReadList )
197+ c . fromInternalChannelState (internalChannel )
201198 }
202199 return channels , nil
203200}
@@ -210,7 +207,7 @@ func (c *Channels) GetByID(ctx context.Context, chid datatransfer.ChannelID) (da
210207 if err != nil {
211208 return nil , NewErrNotFound (chid )
212209 }
213- return fromInternalChannelState (internalChannel , c . voucherDecoder , c . voucherResultDecoder , c . cidLists . ReadList ), nil
210+ return c . fromInternalChannelState (internalChannel ), nil
214211}
215212
216213// Accept marks a data transfer as accepted
@@ -239,11 +236,6 @@ func (c *Channels) DataQueued(chid datatransfer.ChannelID, k cid.Cid, delta uint
239236
240237// Returns true if this is the first time the block has been received
241238func (c * Channels ) DataReceived (chid datatransfer.ChannelID , k cid.Cid , delta uint64 ) (bool , error ) {
242- err := c .cidLists .AppendList (chid , k )
243- if err != nil {
244- return false , err
245- }
246-
247239 return c .fireProgressEvent (chid , datatransfer .DataReceived , datatransfer .DataReceivedProgress , k , delta )
248240}
249241
@@ -361,12 +353,12 @@ func (c *Channels) HasChannel(chid datatransfer.ChannelID) (bool, error) {
361353// blocks that have already been queued / sent / received
362354func (c * Channels ) removeSeenCIDCaches (chid datatransfer.ChannelID ) error {
363355 progressStates := []datatransfer.EventCode {
364- datatransfer .DataQueuedProgress ,
365- datatransfer .DataSentProgress ,
366- datatransfer .DataReceivedProgress ,
356+ datatransfer .DataQueued ,
357+ datatransfer .DataSent ,
358+ datatransfer .DataReceived ,
367359 }
368360 for _ , evt := range progressStates {
369- sid := cidsets . SetID (chid . String () + "/" + datatransfer . Events [ evt ] )
361+ sid := seenCidsSetID (chid , evt )
370362 err := c .seenCIDs .DeleteSet (sid )
371363 if err != nil {
372364 return err
@@ -388,7 +380,7 @@ func (c *Channels) fireProgressEvent(chid datatransfer.ChannelID, evt datatransf
388380 }
389381
390382 // Check if the block has already been seen
391- sid := cidsets . SetID (chid . String () + "/" + datatransfer . Events [ evt ] )
383+ sid := seenCidsSetID (chid , evt )
392384 seen , err := c .seenCIDs .InsertSetCID (sid , k )
393385 if err != nil {
394386 return false , err
@@ -424,3 +416,40 @@ func (c *Channels) checkChannelExists(chid datatransfer.ChannelID, code datatran
424416 }
425417 return nil
426418}
419+
420+ // Get the ID of the CID set for the given channel ID and event code.
421+ // The CID set stores a unique list of queued / sent / received CIDs.
422+ func seenCidsSetID (chid datatransfer.ChannelID , evt datatransfer.EventCode ) cidsets.SetID {
423+ return cidsets .SetID (chid .String () + "/" + datatransfer .Events [evt ])
424+ }
425+
426+ // Convert from the internally used channel state format to the externally exposed ChannelState
427+ func (c * Channels ) fromInternalChannelState (ch internal.ChannelState ) datatransfer.ChannelState {
428+ rcr := & receivedCidsReader {
429+ seenCIDs : c .seenCIDs ,
430+ }
431+ return fromInternalChannelState (ch , c .voucherDecoder , c .voucherResultDecoder , rcr )
432+ }
433+
434+ // Implements the ReceivedCidsReader interface so that the internal channel
435+ // state has access to the received CIDs.
436+ // The interface is used (instead of passing these values directly)
437+ // so the values can be loaded lazily. Reading all CIDs from the datastore
438+ // is an expensive operation so we want to avoid doing it unless necessary.
439+ // Note that the received CIDs get cleaned up when the channel completes, so
440+ // these methods will return an empty array after that point.
441+ type receivedCidsReader struct {
442+ seenCIDs * cidsets.CIDSetManager
443+ }
444+
445+ func (r * receivedCidsReader ) ToArray (chid datatransfer.ChannelID ) ([]cid.Cid , error ) {
446+ sid := seenCidsSetID (chid , datatransfer .DataReceived )
447+ return r .seenCIDs .SetToArray (sid )
448+ }
449+
450+ func (r * receivedCidsReader ) Len (chid datatransfer.ChannelID ) (int , error ) {
451+ sid := seenCidsSetID (chid , datatransfer .DataReceived )
452+ return r .seenCIDs .SetLen (sid )
453+ }
454+
455+ var _ ReceivedCidsReader = (* receivedCidsReader )(nil )
0 commit comments