Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion channels/channel_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (c channelState) InitiatorPaused() bool {
}

func (c channelState) ResponderPaused() bool {
return c.ic.ResponderPaused || c.ic.Status == datatransfer.Finalizing
return c.ic.ResponderPaused
}

func (c channelState) BothPaused() bool {
Expand All @@ -158,6 +158,28 @@ func (c channelState) SelfPaused() bool {
return c.ResponderPaused()
}

func (c channelState) TransferClosed() bool {
return c.ic.TransferClosed
}

func (c channelState) ExceededDataLimit() bool {
var limitFactor uint64
if c.ic.SelfPeer == c.ic.Sender {
limitFactor = c.ic.Queued
} else {
limitFactor = c.ic.Received
}
return c.ic.DataLimit != 0 && limitFactor >= c.ic.DataLimit
}

func (c channelState) AwaitingFinalization() bool {
return c.Status().InFinalization() && c.ic.RequiresFinalization
}

func (c channelState) TransferOnHold() bool {
return c.SelfPaused() || c.AwaitingFinalization() || c.ExceededDataLimit()
}

// Stages returns the current ChannelStages object, or an empty object.
// It is unsafe for the caller to modify the return value, and changes may not
// be persisted. It should be treated as immutable.
Expand Down
5 changes: 5 additions & 0 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,11 @@ func (c *Channels) SetRequiresFinalization(chid datatransfer.ChannelID, Requires
return c.send(chid, datatransfer.SetRequiresFinalization, RequiresFinalization)
}

// CloseTransfer indicates the transfer is closed, even if the transfer was not finished
func (c *Channels) CloseTransfer(chid datatransfer.ChannelID) error {
return c.send(chid, datatransfer.CloseTransfer)
}

// HasChannel returns true if the given channel id is being tracked
func (c *Channels) HasChannel(chid datatransfer.ChannelID) (bool, error) {
return c.stateMachines.Has(chid)
Expand Down
14 changes: 12 additions & 2 deletions channels/channels_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ var ChannelEvents = fsm.Events{
}),

fsm.Event(datatransfer.Cancel).FromAny().To(datatransfer.Cancelling).Action(func(chst *internal.ChannelState) error {
chst.TransferClosed = true
chst.AddLog("")
return nil
}),
Expand Down Expand Up @@ -145,6 +146,7 @@ var ChannelEvents = fsm.Events{

fsm.Event(datatransfer.Error).FromAny().To(datatransfer.Failing).Action(func(chst *internal.ChannelState, err error) error {
chst.Message = err.Error()
chst.TransferClosed = true
chst.AddLog("data transfer erred: %s", chst.Message)
return nil
}),
Expand Down Expand Up @@ -173,7 +175,7 @@ var ChannelEvents = fsm.Events{
// seems less than ideal. We need some kind of support for pausing being an independent aspect of state
// Possibly we should just remove whether a state is paused from the state entirely.
fsm.Event(datatransfer.PauseInitiator).
FromMany(datatransfer.Ongoing, datatransfer.Requested, datatransfer.Queued, datatransfer.AwaitingAcceptance).ToJustRecord().
FromMany(datatransfer.Ongoing, datatransfer.Requested, datatransfer.Queued, datatransfer.AwaitingAcceptance, datatransfer.ResponderFinalizing, datatransfer.ResponderFinalizingTransferFinished).ToJustRecord().
Action(func(chst *internal.ChannelState) error {
chst.InitiatorPaused = true
chst.AddLog("")
Expand All @@ -191,7 +193,6 @@ var ChannelEvents = fsm.Events{
fsm.Event(datatransfer.DataLimitExceeded).
FromMany(datatransfer.Ongoing, datatransfer.Requested, datatransfer.Queued, datatransfer.AwaitingAcceptance, datatransfer.ResponderCompleted, datatransfer.ResponderFinalizing).ToJustRecord().
Action(func(chst *internal.ChannelState) error {
chst.ResponderPaused = true
chst.AddLog("")
return nil
}),
Expand Down Expand Up @@ -224,6 +225,7 @@ var ChannelEvents = fsm.Events{
// the finalization process and complete the transfer
From(datatransfer.AwaitingAcceptance).To(datatransfer.Completing).
Action(func(chst *internal.ChannelState) error {
chst.TransferClosed = true
chst.AddLog("")
return nil
}),
Expand All @@ -248,12 +250,20 @@ var ChannelEvents = fsm.Events{
}),

fsm.Event(datatransfer.BeginFinalizing).FromAny().To(datatransfer.Finalizing).Action(func(chst *internal.ChannelState) error {
chst.TransferClosed = true
chst.AddLog("")
return nil
}),

fsm.Event(datatransfer.CloseTransfer).FromAny().ToJustRecord().Action(func(chst *internal.ChannelState) error {
chst.TransferClosed = true
chst.AddLog("")
return nil
}),

// Both the local node and the remote peer have completed the transfer
fsm.Event(datatransfer.Complete).FromAny().To(datatransfer.Completing).Action(func(chst *internal.ChannelState) error {
chst.TransferClosed = true
chst.AddLog("")
return nil
}),
Expand Down
20 changes: 14 additions & 6 deletions channels/channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ func TestChannels(t *testing.T) {
err = channelList.DataLimitExceeded(datatransfer.ChannelID{Initiator: peers[1], Responder: peers[0], ID: tid1})
require.NoError(t, err)
state = checkEvent(ctx, t, received, datatransfer.DataLimitExceeded)
require.True(t, state.ResponderPaused())

err = channelList.SetDataLimit(datatransfer.ChannelID{Initiator: peers[1], Responder: peers[0], ID: tid1}, 700)
require.NoError(t, err)
Expand All @@ -205,7 +204,6 @@ func TestChannels(t *testing.T) {

err = channelList.ResumeResponder(datatransfer.ChannelID{Initiator: peers[1], Responder: peers[0], ID: tid1})
state = checkEvent(ctx, t, received, datatransfer.ResumeResponder)
require.False(t, state.ResponderPaused())

err = channelList.PauseInitiator(datatransfer.ChannelID{Initiator: peers[1], Responder: peers[0], ID: tid1})
state = checkEvent(ctx, t, received, datatransfer.PauseInitiator)
Expand All @@ -214,7 +212,7 @@ func TestChannels(t *testing.T) {
err = channelList.DataLimitExceeded(datatransfer.ChannelID{Initiator: peers[1], Responder: peers[0], ID: tid1})
require.NoError(t, err)
state = checkEvent(ctx, t, received, datatransfer.DataLimitExceeded)
require.True(t, state.BothPaused())
require.True(t, state.InitiatorPaused())

})

Expand Down Expand Up @@ -440,6 +438,15 @@ func TestMigrations(t *testing.T) {
datatransfer.ResponderPaused,
datatransfer.BothPaused,
datatransfer.Ongoing,
datatransfer.TransferFinished,
datatransfer.ResponderFinalizingTransferFinished,
datatransfer.Finalizing,
datatransfer.Completed,
datatransfer.Completing,
datatransfer.Failing,
datatransfer.Failed,
datatransfer.Cancelling,
datatransfer.Cancelled,
}
for i := 0; i < numChannels; i++ {
transferIDs[i] = datatransfer.TransferID(rand.Uint64())
Expand Down Expand Up @@ -515,8 +522,9 @@ func TestMigrations(t *testing.T) {
datatransfer.Ongoing,
}

expectedInitiatorPaused := []bool{false, true, false, true, false}
expectedResponderPaused := []bool{false, false, true, true, false}
expectedInitiatorPaused := []bool{false, true, false, true, false, false, false, false, false, false, false, false, false, false}
expectedResponderPaused := []bool{false, false, true, true, false, false, false, false, false, false, false, false, false, false}
expectedTransferClosed := []bool{false, false, false, false, false, true, true, true, true, true, true, true, true, true}
for i := 0; i < numChannels; i++ {

channel, err := channelList.GetByID(ctx, datatransfer.ChannelID{
Expand All @@ -540,10 +548,10 @@ func TestMigrations(t *testing.T) {
require.Equal(t, expectedStatuses[i], channel.Status())
require.Equal(t, expectedInitiatorPaused[i], channel.InitiatorPaused())
require.Equal(t, expectedResponderPaused[i], channel.ResponderPaused())
require.Equal(t, expectedTransferClosed[i], channel.TransferClosed())
require.Equal(t, basicnode.NewInt(sentIndex[i]), channel.SentIndex())
require.Equal(t, basicnode.NewInt(receivedIndex[i]), channel.ReceivedIndex())
require.Equal(t, basicnode.NewInt(queuedIndex[i]), channel.QueuedIndex())

}
}

Expand Down
2 changes: 2 additions & 0 deletions channels/internal/internalchannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ type ChannelState struct {
ResponderPaused bool
// InitiatorPaused indicates whether the initiator is in a paused state
InitiatorPaused bool
// TransferClosed indicates the transfer portion of the request is over
TransferClosed bool
// Stages traces the execution fo a data transfer.
//
// EXPERIMENTAL; subject to change.
Expand Down
36 changes: 35 additions & 1 deletion channels/internal/internalchannel_cbor_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions channels/internal/migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,18 @@ func NoOpChannelState0To2(oldChannelState *ChannelStateV2) (*ChannelStateV2, err
return oldChannelState, nil
}

var transferClosedStatuses = []datatransfer.Status{
datatransfer.TransferFinished,
datatransfer.ResponderFinalizingTransferFinished,
datatransfer.Finalizing,
datatransfer.Completed,
datatransfer.Completing,
datatransfer.Failing,
datatransfer.Failed,
datatransfer.Cancelling,
datatransfer.Cancelled,
}

func MigrateChannelState2To3(oldChannelState *ChannelStateV2) (*internal.ChannelState, error) {
receivedIndex := basicnode.NewInt(oldChannelState.ReceivedBlocksTotal)
sentIndex := basicnode.NewInt(oldChannelState.SentBlocksTotal)
Expand All @@ -82,6 +94,12 @@ func MigrateChannelState2To3(oldChannelState *ChannelStateV2) (*internal.Channel
if newStatus == datatransfer.ResponderPaused || newStatus == datatransfer.InitiatorPaused || newStatus == datatransfer.BothPaused {
newStatus = datatransfer.Ongoing
}
var transferClosed bool
for _, status := range transferClosedStatuses {
if oldChannelState.Status == status {
transferClosed = true
}
}
return &internal.ChannelState{
SelfPeer: oldChannelState.SelfPeer,
TransferID: oldChannelState.TransferID,
Expand All @@ -106,6 +124,7 @@ func MigrateChannelState2To3(oldChannelState *ChannelStateV2) (*internal.Channel
RequiresFinalization: oldChannelState.RequiresFinalization,
InitiatorPaused: initiatorPaused,
ResponderPaused: responderPaused,
TransferClosed: transferClosed,
Stages: oldChannelState.Stages,
}, nil
}
Expand Down
4 changes: 4 additions & 0 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ const (

// SendMessageError indicates an error sending a data transfer message
SendMessageError

// CloseTransfer is called indicates the transfer is complete, but not because it was finished
CloseTransfer
)

// Events are human readable names for data transfer events
Expand Down Expand Up @@ -169,6 +172,7 @@ var Events = map[EventCode]string{
DataLimitExceeded: "DataLimitExceeded",
TransferInitiated: "TransferInitiated",
SendMessageError: "SendMessageError",
CloseTransfer: "CloseTransfer",
}

// Event is a struct containing information about a data transfer event
Expand Down
8 changes: 6 additions & 2 deletions impl/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ func (m *manager) processTransferEvent(ctx context.Context, chid datatransfer.Ch
return err
}
msg := message.UpdateResponse(chid.ID, true)
return m.transport.SendMessage(ctx, chid, msg)
err := m.transport.SendMessage(ctx, chid, msg)
if err != nil {
return m.channels.SendMessageError(chid, err)
}
return nil
case datatransfer.TransportTransferCancelled:
log.Warnf("channel %+v was cancelled: %s", chid, evt.ErrorMessage)
return m.channels.RequestCancelled(chid, errors.New(evt.ErrorMessage))
Expand Down Expand Up @@ -168,7 +172,7 @@ func (m *manager) channelCompleted(chid datatransfer.ChannelID, success bool, er
// If the transferred errored on completion
if !success {
// send an error, but only if we haven't already errored/finished transfer already for some reason
if !chst.Status().TransferComplete() {
if !chst.TransferClosed() {
err := fmt.Errorf("data transfer channel %s failed to transfer data: %s", chid, errorMessage)
log.Warnf(err.Error())
return m.channels.Error(chid, err)
Expand Down
Loading