Skip to content

Commit f83cc47

Browse files
committed
tapfreighter: refactor into new method ChainPorter.resumePendingParcels
This commit moves code into a new `ChainPorter` method called `resumePendingParcels`.
1 parent 940b6ee commit f83cc47

File tree

1 file changed

+30
-24
lines changed

1 file changed

+30
-24
lines changed

tapfreighter/chain_porter.go

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -152,35 +152,41 @@ func (p *ChainPorter) Start() error {
152152
p.Wg.Add(1)
153153
go p.mainEventLoop()
154154

155-
// Identify any pending parcels that need to be resumed and add
156-
// them to the outboundParcels channel so they can be processed
157-
// by the main porter goroutine.
158-
ctx, cancel := p.WithCtxQuit()
159-
defer cancel()
160-
outboundParcels, err := p.cfg.ExportLog.PendingParcels(ctx)
161-
if err != nil {
162-
startErr = err
163-
return
164-
}
165-
166-
// We resume delivery using the normal parcel delivery mechanism
167-
// by converting the outbound parcels into pending parcels.
168-
for idx := range outboundParcels {
169-
outboundParcel := outboundParcels[idx]
170-
log.Infof("Attempting to resume delivery for "+
171-
"anchor_txid=%v",
172-
outboundParcel.AnchorTx.TxHash().String())
173-
174-
// At this point the asset porter should be running.
175-
// It should therefore pick up the pending parcels from
176-
// the channel and attempt to deliver them.
177-
p.outboundParcels <- NewPendingParcel(outboundParcel)
178-
}
155+
startErr = p.resumePendingParcels()
179156
})
180157

181158
return startErr
182159
}
183160

161+
// resumePendingParcels attempts to resume delivery for any pending parcels that
162+
// were previously interrupted. This is done by querying the export log for any
163+
// pending parcels and adding them to the outboundParcels channel so they can be
164+
// processed by the main porter goroutine.
165+
func (p *ChainPorter) resumePendingParcels() error {
166+
ctx, cancel := p.WithCtxQuit()
167+
defer cancel()
168+
169+
outboundParcels, err := p.cfg.ExportLog.PendingParcels(ctx)
170+
if err != nil {
171+
return err
172+
}
173+
174+
// We resume delivery using the normal parcel delivery mechanism by
175+
// converting the outbound parcels into pending parcels.
176+
for idx := range outboundParcels {
177+
outboundParcel := outboundParcels[idx]
178+
log.Infof("Attempting to resume delivery for anchor_txid=%v",
179+
outboundParcel.AnchorTx.TxHash().String())
180+
181+
// At this point the asset porter should be running. It should
182+
// therefore pick up the pending parcels from the channel and
183+
// attempt to deliver them.
184+
p.outboundParcels <- NewPendingParcel(outboundParcel)
185+
}
186+
187+
return nil
188+
}
189+
184190
// Stop signals that the chain porter should gracefully stop.
185191
func (p *ChainPorter) Stop() error {
186192
var stopErr error

0 commit comments

Comments
 (0)