Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,26 @@
# go-data-transfer changelog

# go-data-transfer 1.5.0

Support the data transfer being restarted.

- github.com/filecoin-project/go-data-transfer:
  - Add isRestart param to validators (#197) ([filecoin-project/go-data-transfer#197](https://github.com/filecoin-project/go-data-transfer/pull/197))
  - fix: flaky TestChannelMonitorAutoRestart (#198) ([filecoin-project/go-data-transfer#198](https://github.com/filecoin-project/go-data-transfer/pull/198))
  - Channel monitor watches for errors instead of measuring data rate (#190) ([filecoin-project/go-data-transfer#190](https://github.com/filecoin-project/go-data-transfer/pull/190))
  - fix: prevent concurrent restarts for same channel (#195) ([filecoin-project/go-data-transfer#195](https://github.com/filecoin-project/go-data-transfer/pull/195))
  - fix: channel state machine event handling (#194) ([filecoin-project/go-data-transfer#194](https://github.com/filecoin-project/go-data-transfer/pull/194))
  - Don't double count data sent (#185) ([filecoin-project/go-data-transfer#185](https://github.com/filecoin-project/go-data-transfer/pull/185))
- github.com/ipfs/go-graphsync (v0.6.0 -> v0.6.1):
  - feat: fire network error when network disconnects during request (#164) ([ipfs/go-graphsync#164](https://github.com/ipfs/go-graphsync/pull/164))

Contributors

| Contributor | Commits | Lines ± | Files Changed |
|-------------|---------|---------|---------------|
| dirkmc | 8 | +1235/-868 | 37 |
| Dirk McCormick | 1 | +11/-0 | 1 |

# go-data-transfer 1.4.3

- github.com/filecoin-project/go-data-transfer:
Expand Down
17 changes: 14 additions & 3 deletions channelmonitor/channelmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,12 @@ func (m *Monitor) addChannel(chid datatransfer.ChannelID, isPush bool) *monitore
}

m.lk.Lock()
defer m.lk.Unlock()
log.Debugf("aquired lock to create channel monitor for channelID=%s", chid)
defer func() {
log.Debugf("will release channel monitor lock for channelID=%s", chid)
m.lk.Unlock()
log.Debugf("released channel monitor lock for channelID=%s", chid)
}()

// Check if there is already a monitor for this channel
if _, ok := m.channels[chid]; ok {
Expand All @@ -118,8 +123,10 @@ func (m *Monitor) addChannel(chid datatransfer.ChannelID, isPush bool) *monitore
return nil
}

log.Debugf("will create channel monitor for channelID=%s", chid)
mpc := newMonitoredChannel(m.ctx, m.mgr, chid, m.cfg, m.onMonitoredChannelShutdown)
m.channels[chid] = mpc
log.Debugf("created channel monitor for channelID=%s", chid)
return mpc
}

Expand Down Expand Up @@ -229,6 +236,8 @@ func (mc *monitoredChannel) start() {
// Watch to make sure the responder accepts the channel in time
cancelAcceptTimer := mc.watchForResponderAccept()

log.Debugf("finished creating timer for accept messages, channelID=%s", mc.chid)

// Watch for data-transfer channel events
mc.unsub = mc.mgr.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
if channelState.ChannelID() != mc.chid {
Expand Down Expand Up @@ -308,7 +317,7 @@ func (mc *monitoredChannel) watchForResponderComplete() {
case <-timer.C:
// Timer expired before we received a Complete message from the responder
err := xerrors.Errorf("%s: timed out waiting %s for Complete message from remote peer",
mc.chid, mc.cfg.AcceptTimeout)
mc.chid, mc.cfg.CompleteTimeout)
mc.closeChannelAndShutdown(err)
}
}
Expand Down Expand Up @@ -438,12 +447,14 @@ func (mc *monitoredChannel) sendRestartMessage(restartCount int) error {
log.Infof("%s: re-established connection to %s in %s", mc.chid, p, time.Since(start))

// Send a restart message for the channel
restartResult := mc.waitForRestartResponse()
log.Infof("%s: sending restart message to %s (%d consecutive restarts)", mc.chid, p, restartCount)
err = mc.mgr.RestartDataTransferChannel(mc.ctx, mc.chid)
if err != nil {
return xerrors.Errorf("%s: failed to send restart message to %s: %w", mc.chid, p, err)
}
log.Infof("%s: sent restart message to %s (%d consecutive restarts)", mc.chid, p, restartCount)

restartResult := mc.waitForRestartResponse()

// The restart message is fire and forget, so we need to watch for a
// restart response to know that the restart message reached the peer.
Expand Down
20 changes: 20 additions & 0 deletions impl/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,29 +161,40 @@ func (m *manager) OnRequestReceived(chid datatransfer.ChannelID, request datatra
}

func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response datatransfer.Response) error {
log.Infof("channel %s: received response %+v from provider", chid, response)

if response.IsCancel() {
log.Infof("channel %s: received cancel response, cancelling channel", chid)
return m.channels.Cancel(chid)
}

if response.IsVoucherResult() {
log.Infof("channel %s: received response %+v from provider is a voucher result", chid, response)
if !response.EmptyVoucherResult() {
log.Debugf("channel %s: processing non-empty voucher result", chid)
vresult, err := m.decodeVoucherResult(response)
if err != nil {
log.Errorf("channel %s:, failed to decode voucher result, err=%s", chid, err)
return err
}
log.Infof("channel %s: received voucher response %+v", chid, vresult)
err = m.channels.NewVoucherResult(chid, vresult)
if err != nil {
log.Errorf("channel %s: failed NewVoucherResult, err=%s ", chid, err)
return err
}
}

if !response.Accepted() {
log.Infof("channel %s: received rejected response, erroring out channel", chid)
return m.channels.Error(chid, datatransfer.ErrRejected)
}

if response.IsNew() {
log.Infof("channel %s: received new response, accepting channel", chid)
err := m.channels.Accept(chid)
if err != nil {
log.Errorf("channel %s: failed to accept new response, err=%s", chid, err)
return err
}
}
Expand All @@ -196,16 +207,21 @@ func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response datat
}
}
}

if response.IsComplete() && response.Accepted() {
if !response.IsPaused() {
log.Infof("channel %s: received complete response, completing channel", chid)
return m.channels.ResponderCompletes(chid)
}

log.Infof("channel %s: received complete response but responder is paused", chid)

err := m.channels.ResponderBeginsFinalization(chid)
if err != nil {
return nil
}
}

if response.IsPaused() {
return m.pauseOther(chid)
}
Expand Down Expand Up @@ -432,6 +448,10 @@ func (m *manager) validateVoucher(
}

result, err := validatorFunc(isRestart, sender, vouch, baseCid, stor)
if isPull {
log.Infof("ValidatePull, result=%s, err=%s", result, err)
}

return vouch, result, err
}

Expand Down
27 changes: 12 additions & 15 deletions impl/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,20 +204,18 @@ func (m *manager) OpenPushDataChannel(ctx context.Context, requestTo peer.ID, vo
transportConfigurer(chid, voucher, m.transport)
}
m.dataTransferNetwork.Protect(requestTo, chid.String())
monitoredChan := m.channelMonitor.AddPushChannel(chid)

if err := m.dataTransferNetwork.SendMessage(ctx, requestTo, req); err != nil {
err = fmt.Errorf("Unable to send request: %w", err)
_ = m.channels.Error(chid, err)

// If push channel monitoring is enabled, shutdown the monitor as it
// wasn't possible to start the data transfer
if monitoredChan != nil {
monitoredChan.Shutdown()
}

return chid, err
}

log.Debugf("sent push request message, channelID=%s", chid)

m.channelMonitor.AddPushChannel(chid)
log.Infof("started new channel monitor for push request, channelID=%s", chid)

return chid, nil
}

Expand All @@ -242,19 +240,18 @@ func (m *manager) OpenPullDataChannel(ctx context.Context, requestTo peer.ID, vo
transportConfigurer(chid, voucher, m.transport)
}
m.dataTransferNetwork.Protect(requestTo, chid.String())
monitoredChan := m.channelMonitor.AddPullChannel(chid)

if err := m.transport.OpenChannel(ctx, requestTo, chid, cidlink.Link{Cid: baseCid}, selector, nil, req); err != nil {
err = fmt.Errorf("Unable to send request: %w", err)
_ = m.channels.Error(chid, err)

// If pull channel monitoring is enabled, shutdown the monitor as it
// wasn't possible to start the data transfer
if monitoredChan != nil {
monitoredChan.Shutdown()
}

return chid, err
}

log.Debugf("sent pull channel request channelID=%s", chid)
m.channelMonitor.AddPullChannel(chid)
log.Infof("started new channel monitor for pull request: channelID=%s", chid)

return chid, nil
}

Expand Down
2 changes: 2 additions & 0 deletions network/libp2p_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,13 @@ func (dtnet *libp2pDataTransferNetwork) SendMessage(
ctx context.Context,
p peer.ID,
outgoing datatransfer.Message) error {
log.Debugf("opening stream to peer %s to send message %+v", p, outgoing)

s, err := dtnet.openStream(ctx, p, dtnet.dtProtocols...)
if err != nil {
return err
}
log.Debugf("finished opening stream to peer %s to send message %+v", p, outgoing)

outgoing, err = outgoing.MessageForProtocol(s.Protocol())
if err != nil {
Expand Down
12 changes: 11 additions & 1 deletion transport/graphsync/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,9 @@ func (t *Transport) gsReqRecdHook(p peer.ID, request graphsync.RequestData, hook
// when a DT request comes in on graphsync, it's a pull
chid = datatransfer.ChannelID{ID: msg.TransferID(), Initiator: p, Responder: t.peerID}
request := msg.(datatransfer.Request)
log.Debugf("will validate received gs request, chid=%s, request=%+v", chid, request)
responseMessage, err = t.events.OnRequestReceived(chid, request)
log.Debugf("will send response message %+v for request gs chid=%s, error/pause/resume value=%s", responseMessage, chid, err)
} else {
// when a DT response comes in on graphsync, it's a push
chid = datatransfer.ChannelID{ID: msg.TransferID(), Initiator: t.peerID, Responder: p}
Expand All @@ -604,15 +606,18 @@ func (t *Transport) gsReqRecdHook(p peer.ID, request graphsync.RequestData, hook
extensions, extensionErr := extension.ToExtensionData(responseMessage, t.supportedExtensions)
if extensionErr != nil {
hookActions.TerminateWithError(err)
log.Errorf("terminated client gs request chid=%s with extension err=%s", chid, err)
return
}
for _, extension := range extensions {
log.Debugf("queued up extension %+v for response, gs chid=%s", extension, chid)
hookActions.SendExtensionData(extension)
}
}

if err != nil && err != datatransfer.ErrPause {
hookActions.TerminateWithError(err)
log.Errorf("terminated client gs request chid=%s with err=%s", chid, err)
return
}

Expand All @@ -632,6 +637,7 @@ func (t *Transport) gsReqRecdHook(p peer.ID, request graphsync.RequestData, hook
hasXferStarted, isRestart := t.channelXferStarted[chid]
if isRestart && !hasXferStarted && !paused {
paused = true
log.Debugf("pausing responder for request gs chid=%s, even though validator sent no-op as it's a restart req", chid)
hookActions.PauseResponse()
}
t.channelXferStarted[chid] = !paused
Expand Down Expand Up @@ -820,7 +826,11 @@ func (t *Transport) processExtension(chid datatransfer.ChannelID, gsMsg extensio
}

dtResponse := msg.(datatransfer.Response)
return nil, t.events.OnResponseReceived(chid, dtResponse)
err = t.events.OnResponseReceived(chid, dtResponse)
if err != nil {
log.Errorf("error receieved from OnResponseReceived is %s", err)
}
return nil, err
}

func (t *Transport) gsRequestorCancelledListener(p peer.ID, request graphsync.RequestData) {
Expand Down