diff --git a/channels/channel_state.go b/channels/channel_state.go index efabd74..a1915c3 100644 --- a/channels/channel_state.go +++ b/channels/channel_state.go @@ -144,7 +144,7 @@ func (c channelState) InitiatorPaused() bool { } func (c channelState) ResponderPaused() bool { - return c.ic.ResponderPaused || c.ic.Status == datatransfer.Finalizing + return c.ic.ResponderPaused } func (c channelState) BothPaused() bool { @@ -158,6 +158,28 @@ func (c channelState) SelfPaused() bool { return c.ResponderPaused() } +func (c channelState) TransferClosed() bool { + return c.ic.TransferClosed +} + +func (c channelState) ExceededDataLimit() bool { + var limitFactor uint64 + if c.ic.SelfPeer == c.ic.Sender { + limitFactor = c.ic.Queued + } else { + limitFactor = c.ic.Received + } + return c.ic.DataLimit != 0 && limitFactor >= c.ic.DataLimit +} + +func (c channelState) AwaitingFinalization() bool { + return c.Status().InFinalization() && c.ic.RequiresFinalization +} + +func (c channelState) TransferOnHold() bool { + return c.SelfPaused() || c.AwaitingFinalization() || c.ExceededDataLimit() +} + // Stages returns the current ChannelStages object, or an empty object. // It is unsafe for the caller to modify the return value, and changes may not // be persisted. It should be treated as immutable. 
diff --git a/channels/channels.go b/channels/channels.go index bc00c42..44a9ddb 100644 --- a/channels/channels.go +++ b/channels/channels.go @@ -306,6 +306,11 @@ func (c *Channels) SetRequiresFinalization(chid datatransfer.ChannelID, Requires return c.send(chid, datatransfer.SetRequiresFinalization, RequiresFinalization) } +// CloseTransfer indicates the transfer is closed, even if the transfer was not finished +func (c *Channels) CloseTransfer(chid datatransfer.ChannelID) error { + return c.send(chid, datatransfer.CloseTransfer) +} + // HasChannel returns true if the given channel id is being tracked func (c *Channels) HasChannel(chid datatransfer.ChannelID) (bool, error) { return c.stateMachines.Has(chid) diff --git a/channels/channels_fsm.go b/channels/channels_fsm.go index 51dfc6d..5bcce12 100644 --- a/channels/channels_fsm.go +++ b/channels/channels_fsm.go @@ -46,6 +46,7 @@ var ChannelEvents = fsm.Events{ }), fsm.Event(datatransfer.Cancel).FromAny().To(datatransfer.Cancelling).Action(func(chst *internal.ChannelState) error { + chst.TransferClosed = true chst.AddLog("") return nil }), @@ -145,6 +146,7 @@ var ChannelEvents = fsm.Events{ fsm.Event(datatransfer.Error).FromAny().To(datatransfer.Failing).Action(func(chst *internal.ChannelState, err error) error { chst.Message = err.Error() + chst.TransferClosed = true chst.AddLog("data transfer erred: %s", chst.Message) return nil }), @@ -173,7 +175,7 @@ var ChannelEvents = fsm.Events{ // seems less than ideal. We need some kind of support for pausing being an independent aspect of state // Possibly we should just remove whether a state is paused from the state entirely. fsm.Event(datatransfer.PauseInitiator). - FromMany(datatransfer.Ongoing, datatransfer.Requested, datatransfer.Queued, datatransfer.AwaitingAcceptance).ToJustRecord(). 
+ FromMany(datatransfer.Ongoing, datatransfer.Requested, datatransfer.Queued, datatransfer.AwaitingAcceptance, datatransfer.ResponderFinalizing, datatransfer.ResponderFinalizingTransferFinished).ToJustRecord(). Action(func(chst *internal.ChannelState) error { chst.InitiatorPaused = true chst.AddLog("") @@ -191,7 +193,6 @@ var ChannelEvents = fsm.Events{ fsm.Event(datatransfer.DataLimitExceeded). FromMany(datatransfer.Ongoing, datatransfer.Requested, datatransfer.Queued, datatransfer.AwaitingAcceptance, datatransfer.ResponderCompleted, datatransfer.ResponderFinalizing).ToJustRecord(). Action(func(chst *internal.ChannelState) error { - chst.ResponderPaused = true chst.AddLog("") return nil }), @@ -224,6 +225,7 @@ var ChannelEvents = fsm.Events{ // the finalization process and complete the transfer From(datatransfer.AwaitingAcceptance).To(datatransfer.Completing). Action(func(chst *internal.ChannelState) error { + chst.TransferClosed = true chst.AddLog("") return nil }), @@ -248,12 +250,20 @@ var ChannelEvents = fsm.Events{ }), fsm.Event(datatransfer.BeginFinalizing).FromAny().To(datatransfer.Finalizing).Action(func(chst *internal.ChannelState) error { + chst.TransferClosed = true + chst.AddLog("") + return nil + }), + + fsm.Event(datatransfer.CloseTransfer).FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error { + chst.TransferClosed = true chst.AddLog("") return nil }), // Both the local node and the remote peer have completed the transfer fsm.Event(datatransfer.Complete).FromAny().To(datatransfer.Completing).Action(func(chst *internal.ChannelState) error { + chst.TransferClosed = true chst.AddLog("") return nil }), diff --git a/channels/channels_test.go b/channels/channels_test.go index 3e26af1..d2303e2 100644 --- a/channels/channels_test.go +++ b/channels/channels_test.go @@ -196,7 +196,6 @@ func TestChannels(t *testing.T) { err = channelList.DataLimitExceeded(datatransfer.ChannelID{Initiator: peers[1], Responder: peers[0], ID: tid1}) 
require.NoError(t, err) state = checkEvent(ctx, t, received, datatransfer.DataLimitExceeded) - require.True(t, state.ResponderPaused()) err = channelList.SetDataLimit(datatransfer.ChannelID{Initiator: peers[1], Responder: peers[0], ID: tid1}, 700) require.NoError(t, err) @@ -205,7 +204,6 @@ func TestChannels(t *testing.T) { err = channelList.ResumeResponder(datatransfer.ChannelID{Initiator: peers[1], Responder: peers[0], ID: tid1}) state = checkEvent(ctx, t, received, datatransfer.ResumeResponder) - require.False(t, state.ResponderPaused()) err = channelList.PauseInitiator(datatransfer.ChannelID{Initiator: peers[1], Responder: peers[0], ID: tid1}) state = checkEvent(ctx, t, received, datatransfer.PauseInitiator) @@ -214,7 +212,7 @@ func TestChannels(t *testing.T) { err = channelList.DataLimitExceeded(datatransfer.ChannelID{Initiator: peers[1], Responder: peers[0], ID: tid1}) require.NoError(t, err) state = checkEvent(ctx, t, received, datatransfer.DataLimitExceeded) - require.True(t, state.BothPaused()) + require.True(t, state.InitiatorPaused()) }) @@ -440,6 +438,15 @@ func TestMigrations(t *testing.T) { datatransfer.ResponderPaused, datatransfer.BothPaused, datatransfer.Ongoing, + datatransfer.TransferFinished, + datatransfer.ResponderFinalizingTransferFinished, + datatransfer.Finalizing, + datatransfer.Completed, + datatransfer.Completing, + datatransfer.Failing, + datatransfer.Failed, + datatransfer.Cancelling, + datatransfer.Cancelled, } for i := 0; i < numChannels; i++ { transferIDs[i] = datatransfer.TransferID(rand.Uint64()) @@ -515,8 +522,9 @@ func TestMigrations(t *testing.T) { datatransfer.Ongoing, } - expectedInitiatorPaused := []bool{false, true, false, true, false} - expectedResponderPaused := []bool{false, false, true, true, false} + expectedInitiatorPaused := []bool{false, true, false, true, false, false, false, false, false, false, false, false, false, false} + expectedResponderPaused := []bool{false, false, true, true, false, false, false, false, 
false, false, false, false, false, false} + expectedTransferClosed := []bool{false, false, false, false, false, true, true, true, true, true, true, true, true, true} for i := 0; i < numChannels; i++ { channel, err := channelList.GetByID(ctx, datatransfer.ChannelID{ @@ -540,10 +548,10 @@ func TestMigrations(t *testing.T) { require.Equal(t, expectedStatuses[i], channel.Status()) require.Equal(t, expectedInitiatorPaused[i], channel.InitiatorPaused()) require.Equal(t, expectedResponderPaused[i], channel.ResponderPaused()) + require.Equal(t, expectedTransferClosed[i], channel.TransferClosed()) require.Equal(t, basicnode.NewInt(sentIndex[i]), channel.SentIndex()) require.Equal(t, basicnode.NewInt(receivedIndex[i]), channel.ReceivedIndex()) require.Equal(t, basicnode.NewInt(queuedIndex[i]), channel.QueuedIndex()) - } } diff --git a/channels/internal/internalchannel.go b/channels/internal/internalchannel.go index 4209eda..6fd0097 100644 --- a/channels/internal/internalchannel.go +++ b/channels/internal/internalchannel.go @@ -121,6 +121,8 @@ type ChannelState struct { ResponderPaused bool // InitiatorPaused indicates whether the initiator is in a paused state InitiatorPaused bool + // TransferClosed indicates the transfer portion of the request is over + TransferClosed bool // Stages traces the execution fo a data transfer. // // EXPERIMENTAL; subject to change. 
diff --git a/channels/internal/internalchannel_cbor_gen.go b/channels/internal/internalchannel_cbor_gen.go index c7b1e95..5b88b47 100644 --- a/channels/internal/internalchannel_cbor_gen.go +++ b/channels/internal/internalchannel_cbor_gen.go @@ -23,7 +23,7 @@ func (t *ChannelState) MarshalCBOR(w io.Writer) error { _, err := w.Write(cbg.CborNull) return err } - if _, err := w.Write([]byte{184, 24}); err != nil { + if _, err := w.Write([]byte{184, 25}); err != nil { return err } @@ -457,6 +457,22 @@ func (t *ChannelState) MarshalCBOR(w io.Writer) error { return err } + // t.TransferClosed (bool) (bool) + if len("TransferClosed") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"TransferClosed\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("TransferClosed"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("TransferClosed")); err != nil { + return err + } + + if err := cbg.WriteBool(w, t.TransferClosed); err != nil { + return err + } + // t.Stages (datatransfer.ChannelStages) (struct) if len("Stages") > cbg.MaxLength { return xerrors.Errorf("Value in field \"Stages\" was too long") @@ -846,6 +862,24 @@ func (t *ChannelState) UnmarshalCBOR(r io.Reader) error { default: return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra) } + // t.TransferClosed (bool) (bool) + case "TransferClosed": + + maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) + if err != nil { + return err + } + if maj != cbg.MajOther { + return fmt.Errorf("booleans must be major type 7") + } + switch extra { + case 20: + t.TransferClosed = false + case 21: + t.TransferClosed = true + default: + return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra) + } // t.Stages (datatransfer.ChannelStages) (struct) case "Stages": diff --git a/channels/internal/migrations/migrations.go b/channels/internal/migrations/migrations.go index 210dfc0..0a78db6 100644 --- 
a/channels/internal/migrations/migrations.go +++ b/channels/internal/migrations/migrations.go @@ -71,6 +71,18 @@ func NoOpChannelState0To2(oldChannelState *ChannelStateV2) (*ChannelStateV2, err return oldChannelState, nil } +var transferClosedStatuses = []datatransfer.Status{ + datatransfer.TransferFinished, + datatransfer.ResponderFinalizingTransferFinished, + datatransfer.Finalizing, + datatransfer.Completed, + datatransfer.Completing, + datatransfer.Failing, + datatransfer.Failed, + datatransfer.Cancelling, + datatransfer.Cancelled, +} + func MigrateChannelState2To3(oldChannelState *ChannelStateV2) (*internal.ChannelState, error) { receivedIndex := basicnode.NewInt(oldChannelState.ReceivedBlocksTotal) sentIndex := basicnode.NewInt(oldChannelState.SentBlocksTotal) @@ -82,6 +94,12 @@ func MigrateChannelState2To3(oldChannelState *ChannelStateV2) (*internal.Channel if newStatus == datatransfer.ResponderPaused || newStatus == datatransfer.InitiatorPaused || newStatus == datatransfer.BothPaused { newStatus = datatransfer.Ongoing } + var transferClosed bool + for _, status := range transferClosedStatuses { + if oldChannelState.Status == status { + transferClosed = true + } + } return &internal.ChannelState{ SelfPeer: oldChannelState.SelfPeer, TransferID: oldChannelState.TransferID, @@ -106,6 +124,7 @@ func MigrateChannelState2To3(oldChannelState *ChannelStateV2) (*internal.Channel RequiresFinalization: oldChannelState.RequiresFinalization, InitiatorPaused: initiatorPaused, ResponderPaused: responderPaused, + TransferClosed: transferClosed, Stages: oldChannelState.Stages, }, nil } diff --git a/events.go b/events.go index 1cc42be..82f0019 100644 --- a/events.go +++ b/events.go @@ -129,6 +129,9 @@ const ( // SendMessageError indicates an error sending a data transfer message SendMessageError + + // CloseTransfer indicates the transfer is closed, even if it was not finished + CloseTransfer ) // Events are human readable names for data transfer events @@
-169,6 +172,7 @@ var Events = map[EventCode]string{ DataLimitExceeded: "DataLimitExceeded", TransferInitiated: "TransferInitiated", SendMessageError: "SendMessageError", + CloseTransfer: "CloseTransfer", } // Event is a struct containing information about a data transfer event diff --git a/impl/events.go b/impl/events.go index af2085f..5c5f6d4 100644 --- a/impl/events.go +++ b/impl/events.go @@ -42,7 +42,11 @@ func (m *manager) processTransferEvent(ctx context.Context, chid datatransfer.Ch return err } msg := message.UpdateResponse(chid.ID, true) - return m.transport.SendMessage(ctx, chid, msg) + err := m.transport.SendMessage(ctx, chid, msg) + if err != nil { + return m.channels.SendMessageError(chid, err) + } + return nil case datatransfer.TransportTransferCancelled: log.Warnf("channel %+v was cancelled: %s", chid, evt.ErrorMessage) return m.channels.RequestCancelled(chid, errors.New(evt.ErrorMessage)) @@ -168,7 +172,7 @@ func (m *manager) channelCompleted(chid datatransfer.ChannelID, success bool, er // If the transferred errored on completion if !success { // send an error, but only if we haven't already errored/finished transfer already for some reason - if !chst.Status().TransferComplete() { + if !chst.TransferClosed() { err := fmt.Errorf("data transfer channel %s failed to transfer data: %s", chid, errorMessage) log.Warnf(err.Error()) return m.channels.Error(chid, err) diff --git a/impl/impl.go b/impl/impl.go index 156869b..1e56197 100644 --- a/impl/impl.go +++ b/impl/impl.go @@ -327,6 +327,7 @@ func (m *manager) updateValidationStatus(ctx context.Context, chid datatransfer. return err } + transferAlreadyClosed := chst.TransferClosed() // dispatch channel events and generate a response message err = m.processValidationUpdate(ctx, chst, result) if err != nil { @@ -341,6 +342,9 @@ func (m *manager) updateValidationStatus(ctx context.Context, chid datatransfer. 
response := message.ValidationResultResponse(messageType, chid.ID, result, err, result.LeaveRequestPaused(chst)) + if transferAlreadyClosed { + return m.transport.SendMessage(ctx, chid, response) + } // dispatch transport updates return m.transport.ChannelUpdated(ctx, chid, response) } @@ -374,29 +378,36 @@ func (m *manager) CloseDataTransferChannel(ctx context.Context, chid datatransfe } func (m *manager) closeChannel(ctx context.Context, chid datatransfer.ChannelID) error { - // Fire a cancel event - err := m.channels.Cancel(chid) - if err != nil { - return xerrors.Errorf("unable to send cancel to channel FSM: %w", err) + log.Infof("closing the channel imperatively") + // Close the channel on the local transport + if err := m.channels.CloseTransfer(chid); err != nil { + return xerrors.Errorf("unable to send close transfer message to FSM: %w", err) } - // Close the channel on the local transport - err = m.transport.ChannelUpdated(ctx, chid, nil) + if err := m.transport.ChannelUpdated(ctx, chid, nil); err != nil { + log.Warnf("unable to close channel %s: %s", chid, err) + } // Send a cancel message to the remote peer async go func() { sctx, cancel := context.WithTimeout(context.Background(), cancelSendTimeout) defer cancel() log.Infof("%s: sending cancel channel to %s for channel %s", m.peerID, m.otherPeer(chid), chid) - err = m.transport.SendMessage(sctx, chid, m.cancelMessage(chid)) - if err != nil { + + if err := m.transport.SendMessage(sctx, chid, m.cancelMessage(chid)); err != nil { err = fmt.Errorf("unable to send cancel message for channel %s to peer %s: %w", chid, m.peerID, err) log.Warn(err) } }() - return err + // Fire a cancel event + err := m.channels.Cancel(chid) + if err != nil { + return xerrors.Errorf("unable to send cancel to channel FSM: %w", err) + } + + return nil } // close an open channel and fire an error event @@ -420,15 +431,14 @@ func (m *manager) CloseDataTransferChannelWithError(ctx context.Context, chid da func (m *manager) 
closeChannelWithError(ctx context.Context, chid datatransfer.ChannelID, cherr error) error { - // Fire an error event - if err := m.channels.Error(chid, cherr); err != nil { - return xerrors.Errorf("unable to send error %s to channel FSM: %w", cherr, err) - } - // Close transfport and try to send a cancel message to the remote peer. // It's quite likely we aren't able to send the message to the peer because // the channel is already in an error state, which is probably because of // connection issues, so if we cant send the message just log a warning. + if err := m.channels.CloseTransfer(chid); err != nil { + return xerrors.Errorf("unable to send close transfer message to FSM: %w", err) + } + log.Infof("%s: sending cancel channel to %s for channel %s", m.peerID, m.otherPeer(chid), chid) if err := m.transport.ChannelUpdated(ctx, chid, m.cancelMessage(chid)); err != nil { @@ -437,6 +447,11 @@ func (m *manager) closeChannelWithError(ctx context.Context, chid datatransfer.C // by subsequent errors. 
log.Warnf("unable to close channel %s: %s", chid, err) } + // Fire an error event + if err := m.channels.Error(chid, cherr); err != nil { + return xerrors.Errorf("unable to send error %s to channel FSM: %w", cherr, err) + } + return nil } diff --git a/impl/initiating_test.go b/impl/initiating_test.go index a8da316..04bcc31 100644 --- a/impl/initiating_test.go +++ b/impl/initiating_test.go @@ -191,7 +191,7 @@ func TestDataTransferInitiating(t *testing.T) { }, }, "close push request": { - expectedEvents: []datatransfer.EventCode{datatransfer.Open, datatransfer.Cancel, datatransfer.CleanupComplete}, + expectedEvents: []datatransfer.EventCode{datatransfer.Open, datatransfer.CloseTransfer, datatransfer.Cancel, datatransfer.CleanupComplete}, verify: func(t *testing.T, h *harness) { channelID, err := h.dt.OpenPushDataChannel(h.ctx, h.peers[1], h.voucher, h.baseCid, h.stor) require.NoError(t, err) @@ -247,7 +247,7 @@ func TestDataTransferInitiating(t *testing.T) { }, }, "close pull request": { - expectedEvents: []datatransfer.EventCode{datatransfer.Open, datatransfer.Cancel, datatransfer.CleanupComplete}, + expectedEvents: []datatransfer.EventCode{datatransfer.Open, datatransfer.CloseTransfer, datatransfer.Cancel, datatransfer.CleanupComplete}, verify: func(t *testing.T, h *harness) { channelID, err := h.dt.OpenPullDataChannel(h.ctx, h.peers[1], h.voucher, h.baseCid, h.stor) require.NoError(t, err) diff --git a/itest/integration_test.go b/itest/integration_test.go index a98a67d..e6bb83d 100644 --- a/itest/integration_test.go +++ b/itest/integration_test.go @@ -725,6 +725,11 @@ func TestAutoRestart(t *testing.T) { // Watch for successful completion finished := make(chan struct{}, 2) var subscriber datatransfer.Subscriber = func(event datatransfer.Event, channelState datatransfer.ChannelState) { + if channelState.ChannelID().Initiator == channelState.SelfPeer() { + t.Logf("Initiator Event: %s, Status: %s", datatransfer.Events[event.Code], 
datatransfer.Statuses[channelState.Status()]) + } else { + t.Logf("Responder Event: %s, Status: %s", datatransfer.Events[event.Code], datatransfer.Statuses[channelState.Status()]) + } if channelState.Status() == datatransfer.Completed { finished <- struct{}{} } @@ -1161,9 +1166,11 @@ func TestRoundTripCancelledRequest(t *testing.T) { case <-ctx.Done(): case <-timer.C: if data.isPull { - _ = dt1.CloseDataTransferChannel(ctx, chid) + err = dt1.CloseDataTransferChannel(ctx, chid) + require.NoError(t, err) } else { - _ = dt2.CloseDataTransferChannel(ctx, chid) + err = dt2.CloseDataTransferChannel(ctx, chid) + require.NoError(t, err) } } }() diff --git a/manager.go b/manager.go index b7a6f21..ce163c5 100644 --- a/manager.go +++ b/manager.go @@ -4,10 +4,13 @@ import ( "context" "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" "github.com/ipld/go-ipld-prime/datamodel" "github.com/libp2p/go-libp2p-core/peer" ) +var log = logging.Logger("dt_graphsync") + // ValidationResult describes the result of validating a voucher type ValidationResult struct { // Accepted indicates whether the request was accepted. 
If a request is not @@ -55,6 +58,7 @@ func (vr ValidationResult) LeaveRequestPaused(chst ChannelState) bool { } else { limitFactor = chst.Received() } + log.Infof("datalimit %d, dataprocesed %d", vr.DataLimit, limitFactor) return vr.DataLimit != 0 && limitFactor >= vr.DataLimit } diff --git a/statuses.go b/statuses.go index 3b263de..e9ccfff 100644 --- a/statuses.go +++ b/statuses.go @@ -109,23 +109,6 @@ func (s Status) InFinalization() bool { return FinalizationStatuses.Contains(s) } -var TransferCompleteStates = statusList{ - TransferFinished, - ResponderFinalizingTransferFinished, - Finalizing, - Completed, - Completing, - Failing, - Failed, - Cancelling, - Cancelled, - ChannelNotFoundError, -} - -func (s Status) TransferComplete() bool { - return TransferCompleteStates.Contains(s) -} - var TransferringStates = statusList{ Ongoing, ResponderCompleted, diff --git a/testutil/mockchannelstate.go b/testutil/mockchannelstate.go index 92fc5f0..acbdd9b 100644 --- a/testutil/mockchannelstate.go +++ b/testutil/mockchannelstate.go @@ -10,60 +10,66 @@ import ( ) type MockChannelStateParams struct { - ReceivedIndex datamodel.Node - SentIndex datamodel.Node - QueuedIndex datamodel.Node - ChannelID datatransfer.ChannelID - Queued uint64 - Sent uint64 - Received uint64 - Complete bool - BaseCID cid.Cid - Selector ipld.Node - Voucher datatransfer.TypedVoucher - IsPull bool - Self peer.ID - DataLimit uint64 - InitiatorPaused bool - ResponderPaused bool + ReceivedIndex datamodel.Node + SentIndex datamodel.Node + QueuedIndex datamodel.Node + ChannelID datatransfer.ChannelID + Queued uint64 + Sent uint64 + Received uint64 + Complete bool + BaseCID cid.Cid + Selector ipld.Node + Voucher datatransfer.TypedVoucher + IsPull bool + Self peer.ID + DataLimit uint64 + InitiatorPaused bool + ResponderPaused bool + ExceededDataLimit bool + AwaitingFinalization bool } func NewMockChannelState(params MockChannelStateParams) *MockChannelState { return &MockChannelState{ - receivedIndex: 
params.ReceivedIndex, - sentIndex: params.SentIndex, - queuedIndex: params.QueuedIndex, - dataLimit: params.DataLimit, - chid: params.ChannelID, - queued: params.Queued, - sent: params.Sent, - received: params.Received, - complete: params.Complete, - isPull: params.IsPull, - self: params.Self, - baseCID: params.BaseCID, - initiatorPaused: params.InitiatorPaused, - responderPaused: params.ResponderPaused, + receivedIndex: params.ReceivedIndex, + sentIndex: params.SentIndex, + queuedIndex: params.QueuedIndex, + dataLimit: params.DataLimit, + chid: params.ChannelID, + queued: params.Queued, + sent: params.Sent, + received: params.Received, + complete: params.Complete, + isPull: params.IsPull, + self: params.Self, + baseCID: params.BaseCID, + initiatorPaused: params.InitiatorPaused, + responderPaused: params.ResponderPaused, + exceededDataLimit: params.ExceededDataLimit, + awaitingFinalization: params.AwaitingFinalization, } } type MockChannelState struct { - receivedIndex datamodel.Node - sentIndex datamodel.Node - queuedIndex datamodel.Node - dataLimit uint64 - chid datatransfer.ChannelID - queued uint64 - sent uint64 - received uint64 - complete bool - isPull bool - baseCID cid.Cid - selector ipld.Node - voucher datatransfer.TypedVoucher - self peer.ID - initiatorPaused bool - responderPaused bool + receivedIndex datamodel.Node + sentIndex datamodel.Node + queuedIndex datamodel.Node + dataLimit uint64 + chid datatransfer.ChannelID + queued uint64 + sent uint64 + received uint64 + complete bool + isPull bool + baseCID cid.Cid + selector ipld.Node + voucher datatransfer.TypedVoucher + self peer.ID + initiatorPaused bool + responderPaused bool + exceededDataLimit bool + awaitingFinalization bool } var _ datatransfer.ChannelState = (*MockChannelState)(nil) @@ -250,3 +256,27 @@ func (m *MockChannelState) SelfPaused() bool { } return m.responderPaused } + +func (m *MockChannelState) TransferClosed() bool { + return false +} + +func (m *MockChannelState) 
SetExceededDataLimit(exceededDataLimit bool) { + m.exceededDataLimit = exceededDataLimit +} + +func (m *MockChannelState) ExceededDataLimit() bool { + return m.exceededDataLimit +} + +func (m *MockChannelState) SetAwaitingFinalization(awaitingFinalization bool) { + m.awaitingFinalization = awaitingFinalization +} + +func (m *MockChannelState) AwaitingFinalization() bool { + return m.awaitingFinalization +} + +func (m *MockChannelState) TransferOnHold() bool { + return m.SelfPaused() || m.AwaitingFinalization() || m.ExceededDataLimit() +} diff --git a/transport/graphsync/dtchannel/dtchannel.go b/transport/graphsync/dtchannel/dtchannel.go index 81a9944..441421a 100644 --- a/transport/graphsync/dtchannel/dtchannel.go +++ b/transport/graphsync/dtchannel/dtchannel.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "runtime/debug" "sync" "time" @@ -198,6 +199,7 @@ func (c *Channel) GsDataRequestRcvd(sender peer.ID, requestID graphsync.RequestI } action := c.actionFromChannelState(chst) + log.Infof(string(action)) switch action { case Pause: c.state = channelPaused @@ -362,7 +364,12 @@ func (c *Channel) ActionFromChannelState(chst datatransfer.ChannelState) Action func (c *Channel) actionFromChannelState(chst datatransfer.ChannelState) Action { // if the state is closed, and we haven't closed, we need to close - if !c.requesterCancelled && c.state != channelClosed && chst.Status().TransferComplete() { + if !c.requesterCancelled && c.state != channelClosed && chst.TransferClosed() { + if chst.ChannelID().Initiator == chst.SelfPeer() { + log.Infof("closing initiator") + } else { + log.Infof("closing responder") + } return Close } @@ -486,6 +493,7 @@ func (c *Channel) Cleanup() { } func (c *Channel) Close(ctx context.Context) error { + debug.PrintStack() // Cancel the graphsync request c.lk.Lock() errch := c.cancel(ctx) diff --git a/transport/graphsync/graphsync.go b/transport/graphsync/graphsync.go index 41d1e08..e204cc3 100644 --- a/transport/graphsync/graphsync.go +++ 
b/transport/graphsync/graphsync.go @@ -282,14 +282,14 @@ func (t *Transport) CleanupChannel(chid datatransfer.ChannelID) { t.dtChannelsLk.Unlock() - // Clean up mapping from gs key to channel ID - t.requestIDToChannelID.deleteRefs(chid) - // Clean up the channel if ok { ch.Cleanup() } + // Clean up mapping from gs key to channel ID + t.requestIDToChannelID.deleteRefs(chid) + t.dtNet.Unprotect(t.otherPeer(chid), chid.String()) } diff --git a/transport/graphsync/receiver.go b/transport/graphsync/receiver.go index 2391fe1..e1745be 100644 --- a/transport/graphsync/receiver.go +++ b/transport/graphsync/receiver.go @@ -49,6 +49,7 @@ func (r *receiver) receiveRequest(ctx context.Context, initiator peer.ID, incomi if isNewOrRestart && incoming.IsPull() { return datatransfer.ErrUnsupported } + log.Debugf("received request for channel: %s", chid) response, receiveErr := r.transport.events.OnRequestReceived(chid, incoming) initiateGraphsyncRequest := isNewOrRestart && response != nil && receiveErr == nil ch, err := r.transport.getDTChannel(chid) diff --git a/types.go b/types.go index e97c803..1ad2c79 100644 --- a/types.go +++ b/types.go @@ -146,10 +146,10 @@ type ChannelState interface { // be left open for a final settlement RequiresFinalization() bool - // InitiatorPaused indicates whether the initiator of this channel is in a paused state + // InitiatorPaused indicates whether the initiator of this channel is in manually paused state InitiatorPaused() bool - // ResponderPaused indicates whether the responder of this channel is in a paused state + // ResponderPaused indicates whether the responder of this channel is in manually paused state ResponderPaused() bool // BothPaused indicates both sides of the transfer have paused the transfer @@ -158,6 +158,19 @@ type ChannelState interface { // SelfPaused indicates whether the local peer for this channel is in a paused state SelfPaused() bool + // ExceededDataLimit indicates whether the responder in the transfer is over its 
data limit + ExceededDataLimit() bool + + // AwaitingFinalization indicates whether a transfer is waiting to be finalized + AwaitingFinalization() bool + + // TransferOnHold indicates whether the local peer is on hold from proceeding with the transfer, + // either because they are manually paused, have exceeded the data limit, or are awaiting finalization + TransferOnHold() bool + + // TransferClosed indicates whether the transfer portion of the request is over, whether or not it finished + TransferClosed() bool + + // Stages returns the timeline of events this data transfer has gone through, // for observability purposes. //