Skip to content

Commit 98a872e

Browse files
committed
sweepbatcher: re-add sweeps after fully confirmed
In case of a reorg sweeps should not go to another batch but stay in the current batch until it is fully confirmed. Only after that the remaining sweeps are re-added to another batch. Field sweep.completed is now set to true only for fully-confirmed sweeps.
1 parent d029c71 commit 98a872e

File tree

5 files changed

+234
-143
lines changed

5 files changed

+234
-143
lines changed

sweepbatcher/store.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ type dbSweep struct {
248248
// Amount is the amount of the sweep.
249249
Amount btcutil.Amount
250250

251-
// Completed indicates whether this sweep is completed.
251+
// Completed indicates whether this sweep is fully-confirmed.
252252
Completed bool
253253
}
254254

sweepbatcher/sweep_batch.go

Lines changed: 128 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -1939,7 +1939,6 @@ func getFeePortionPaidBySweep(spendTx *wire.MsgTx, feePortionPerSweep,
19391939
func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error {
19401940
var (
19411941
txHash = spendTx.TxHash()
1942-
purgeList = make([]SweepRequest, 0, len(b.sweeps))
19431942
notifyList = make([]sweep, 0, len(b.sweeps))
19441943
)
19451944
b.batchTxid = &txHash
@@ -1949,7 +1948,106 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error {
19491948
b.Warnf("transaction %v has no outputs", txHash)
19501949
}
19511950

1952-
// Determine if we should use presigned mode for the batch.
1951+
// Make a set of confirmed sweeps.
1952+
confirmedSet := make(map[wire.OutPoint]struct{}, len(spendTx.TxIn))
1953+
for _, txIn := range spendTx.TxIn {
1954+
confirmedSet[txIn.PreviousOutPoint] = struct{}{}
1955+
}
1956+
1957+
// As a previous version of the batch transaction may get confirmed,
1958+
// which does not contain the latest sweeps, we need to detect the
1959+
// sweeps that did not make it to the confirmed transaction and feed
1960+
// them back to the batcher. This will ensure that the sweeps will enter
1961+
// a new batch instead of remaining dangling.
1962+
var (
1963+
totalSweptAmt btcutil.Amount
1964+
confirmedSweeps = []wire.OutPoint{}
1965+
)
1966+
for _, sweep := range b.sweeps {
1967+
// Skip sweeps that were not included into the confirmed tx.
1968+
_, found := confirmedSet[sweep.outpoint]
1969+
if !found {
1970+
continue
1971+
}
1972+
1973+
totalSweptAmt += sweep.value
1974+
notifyList = append(notifyList, sweep)
1975+
confirmedSweeps = append(confirmedSweeps, sweep.outpoint)
1976+
}
1977+
1978+
// Calculate the fee portion that each sweep should pay for the batch.
1979+
feePortionPaidPerSweep, roundingDifference := getFeePortionForSweep(
1980+
spendTx, len(notifyList), totalSweptAmt,
1981+
)
1982+
1983+
for _, sweep := range notifyList {
1984+
// If the sweep's notifier is empty then this means that a swap
1985+
// is not waiting to read an update from it, so we can skip
1986+
// the notification part.
1987+
if sweep.notifier == nil ||
1988+
*sweep.notifier == (SpendNotifier{}) {
1989+
1990+
continue
1991+
}
1992+
1993+
spendDetail := SpendDetail{
1994+
Tx: spendTx,
1995+
OnChainFeePortion: getFeePortionPaidBySweep(
1996+
spendTx, feePortionPaidPerSweep,
1997+
roundingDifference, &sweep,
1998+
),
1999+
}
2000+
2001+
// Dispatch the sweep notifier, we don't care about the outcome
2002+
// of this action so we don't wait for it.
2003+
go func() {
2004+
// Make sure this context doesn't expire so we
2005+
// successfully notify the caller.
2006+
ctx := context.WithoutCancel(ctx)
2007+
2008+
sweep.notifySweepSpend(ctx, &spendDetail)
2009+
}()
2010+
}
2011+
2012+
b.Infof("spent, confirmed sweeps: %v", confirmedSweeps)
2013+
2014+
// We are no longer able to accept new sweeps, so we mark the batch as
2015+
// closed and persist on storage.
2016+
b.state = Closed
2017+
2018+
if err := b.persist(ctx); err != nil {
2019+
return fmt.Errorf("saving batch failed: %w", err)
2020+
}
2021+
2022+
if err := b.monitorConfirmations(ctx); err != nil {
2023+
return fmt.Errorf("monitorConfirmations failed: %w", err)
2024+
}
2025+
2026+
return nil
2027+
}
2028+
2029+
// handleConf handles a confirmation notification. This is the final step of the
2030+
// batch. Here we signal to the batcher that this batch was completed.
2031+
func (b *batch) handleConf(ctx context.Context,
2032+
conf *chainntnfs.TxConfirmation) error {
2033+
2034+
spendTx := conf.Tx
2035+
txHash := spendTx.TxHash()
2036+
if b.batchTxid == nil || *b.batchTxid != txHash {
2037+
b.Warnf("Mismatch of batch txid: tx in spend notification had "+
2038+
"txid %v, but confirmation notification has txif %v. "+
2039+
"Using the later.", b.batchTxid, txHash)
2040+
}
2041+
b.batchTxid = &txHash
2042+
2043+
b.Infof("confirmed in txid %s", b.batchTxid)
2044+
b.state = Confirmed
2045+
2046+
if err := b.persist(ctx); err != nil {
2047+
return fmt.Errorf("saving batch failed: %w", err)
2048+
}
2049+
2050+
// If the batch is in presigned mode, cleanup presignedHelper.
19532051
presigned, err := b.isPresigned()
19542052
if err != nil {
19552053
return fmt.Errorf("failed to determine if the batch %d uses "+
@@ -1967,40 +2065,43 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error {
19672065
b.id, err)
19682066
}
19692067

2068+
// Make a set of confirmed sweeps.
2069+
confirmedSet := make(map[wire.OutPoint]struct{}, len(spendTx.TxIn))
2070+
for _, txIn := range spendTx.TxIn {
2071+
confirmedSet[txIn.PreviousOutPoint] = struct{}{}
2072+
}
2073+
19702074
// As a previous version of the batch transaction may get confirmed,
19712075
// which does not contain the latest sweeps, we need to detect the
19722076
// sweeps that did not make it to the confirmed transaction and feed
19732077
// them back to the batcher. This will ensure that the sweeps will enter
19742078
// a new batch instead of remaining dangling.
19752079
var (
1976-
totalSweptAmt btcutil.Amount
19772080
confirmedSweeps = []wire.OutPoint{}
1978-
purgedSweeps = []wire.OutPoint{}
1979-
purgedSwaps = []lntypes.Hash{}
2081+
purgeList = make([]SweepRequest, 0, len(b.sweeps))
19802082
)
19812083
for _, sweep := range allSweeps {
1982-
found := false
1983-
1984-
for _, txIn := range spendTx.TxIn {
1985-
if txIn.PreviousOutPoint == sweep.outpoint {
1986-
found = true
1987-
totalSweptAmt += sweep.value
1988-
notifyList = append(notifyList, sweep)
1989-
confirmedSweeps = append(
1990-
confirmedSweeps, sweep.outpoint,
1991-
)
1992-
1993-
break
2084+
_, found := confirmedSet[sweep.outpoint]
2085+
if found {
2086+
// Save the sweep as completed. Note that sweeps are
2087+
// marked completed after the batch is marked confirmed
2088+
// because the check in handleSweeps checks sweep's
2089+
// status first and then checks the batch status.
2090+
err := b.persistSweep(ctx, sweep, true)
2091+
if err != nil {
2092+
return err
19942093
}
2094+
2095+
confirmedSweeps = append(
2096+
confirmedSweeps, sweep.outpoint,
2097+
)
2098+
2099+
continue
19952100
}
19962101

19972102
// If the sweep's outpoint was not found in the transaction's
19982103
// inputs this means it was left out. So we delete it from this
19992104
// batch and feed it back to the batcher.
2000-
if found {
2001-
continue
2002-
}
2003-
20042105
newSweep := sweep
20052106
delete(b.sweeps, sweep.outpoint)
20062107

@@ -2032,52 +2133,19 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error {
20322133
})
20332134
}
20342135
}
2136+
var (
2137+
purgedSweeps = []wire.OutPoint{}
2138+
purgedSwaps = []lntypes.Hash{}
2139+
)
20352140
for _, sweepReq := range purgeList {
20362141
purgedSwaps = append(purgedSwaps, sweepReq.SwapHash)
20372142
for _, input := range sweepReq.Inputs {
20382143
purgedSweeps = append(purgedSweeps, input.Outpoint)
20392144
}
20402145
}
20412146

2042-
// Calculate the fee portion that each sweep should pay for the batch.
2043-
feePortionPaidPerSweep, roundingDifference := getFeePortionForSweep(
2044-
spendTx, len(notifyList), totalSweptAmt,
2045-
)
2046-
2047-
for _, sweep := range notifyList {
2048-
// Save the sweep as completed.
2049-
err := b.persistSweep(ctx, sweep, true)
2050-
if err != nil {
2051-
return err
2052-
}
2053-
2054-
// If the sweep's notifier is empty then this means that a swap
2055-
// is not waiting to read an update from it, so we can skip
2056-
// the notification part.
2057-
if sweep.notifier == nil ||
2058-
*sweep.notifier == (SpendNotifier{}) {
2059-
2060-
continue
2061-
}
2062-
2063-
spendDetail := SpendDetail{
2064-
Tx: spendTx,
2065-
OnChainFeePortion: getFeePortionPaidBySweep(
2066-
spendTx, feePortionPaidPerSweep,
2067-
roundingDifference, &sweep,
2068-
),
2069-
}
2070-
2071-
// Dispatch the sweep notifier, we don't care about the outcome
2072-
// of this action so we don't wait for it.
2073-
go func() {
2074-
// Make sure this context doesn't expire so we
2075-
// successfully notify the caller.
2076-
ctx := context.WithoutCancel(ctx)
2077-
2078-
sweep.notifySweepSpend(ctx, &spendDetail)
2079-
}()
2080-
}
2147+
b.Infof("fully confirmed sweeps: %v, purged sweeps: %v, "+
2148+
"purged swaps: %v", confirmedSweeps, purgedSweeps, purgedSwaps)
20812149

20822150
// Proceed with purging the sweeps. This will feed the sweeps that
20832151
// didn't make it to the confirmed batch transaction back to the batcher
@@ -2099,50 +2167,6 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error {
20992167
}
21002168
}()
21012169

2102-
b.Infof("spent, confirmed sweeps: %v, purged sweeps: %v, "+
2103-
"purged swaps: %v, purged groups: %v", confirmedSweeps,
2104-
purgedSweeps, purgedSwaps, len(purgeList))
2105-
2106-
// We are no longer able to accept new sweeps, so we mark the batch as
2107-
// closed and persist on storage.
2108-
b.state = Closed
2109-
2110-
if err := b.persist(ctx); err != nil {
2111-
return fmt.Errorf("saving batch failed: %w", err)
2112-
}
2113-
2114-
err = b.monitorConfirmations(ctx)
2115-
if err != nil {
2116-
return fmt.Errorf("monitorConfirmations failed: %w", err)
2117-
}
2118-
2119-
return nil
2120-
}
2121-
2122-
// handleConf handles a confirmation notification. This is the final step of the
2123-
// batch. Here we signal to the batcher that this batch was completed. We also
2124-
// cleanup up presigned transactions whose primarySweepID is one of the sweeps
2125-
// that were spent and fully confirmed: such a transaction can't be broadcasted
2126-
// since it is either in a block or double-spends one of spent outputs.
2127-
func (b *batch) handleConf(ctx context.Context,
2128-
conf *chainntnfs.TxConfirmation) error {
2129-
2130-
spendTx := conf.Tx
2131-
txHash := spendTx.TxHash()
2132-
if b.batchTxid == nil || *b.batchTxid != txHash {
2133-
b.Warnf("Mismatch of batch txid: tx in spend notification had "+
2134-
"txid %v, but confirmation notification has txif %v. "+
2135-
"Using the later.", b.batchTxid, txHash)
2136-
}
2137-
b.batchTxid = &txHash
2138-
2139-
// If the batch is in presigned mode, cleanup presignedHelper.
2140-
presigned, err := b.isPresigned()
2141-
if err != nil {
2142-
return fmt.Errorf("failed to determine if the batch %d uses "+
2143-
"presigned mode: %w", b.id, err)
2144-
}
2145-
21462170
if presigned {
21472171
b.Infof("Cleaning up presigned store")
21482172

@@ -2158,13 +2182,6 @@ func (b *batch) handleConf(ctx context.Context,
21582182
}
21592183
}
21602184

2161-
b.Infof("confirmed in txid %s", b.batchTxid)
2162-
b.state = Confirmed
2163-
2164-
if err := b.store.ConfirmBatch(ctx, b.id); err != nil {
2165-
return fmt.Errorf("failed to store confirmed state: %w", err)
2166-
}
2167-
21682185
// Send the confirmation to all the notifiers.
21692186
for _, s := range b.sweeps {
21702187
// If the sweep's notifier is empty then this means that

sweepbatcher/sweep_batcher.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ type addSweepsRequest struct {
273273
notifier *SpendNotifier
274274

275275
// completed is set if the sweep is spent and the spending transaction
276-
// is confirmed.
276+
// is fully confirmed.
277277
completed bool
278278

279279
// parentBatch is the parent batch of this sweep. It is loaded ony if
@@ -777,8 +777,8 @@ func (b *Batcher) AddSweep(ctx context.Context, sweepReq *SweepRequest) error {
777777
}
778778

779779
infof("Batcher adding sweep group of %d sweeps with primarySweep %x, "+
780-
"presigned=%v, completed=%v", len(sweeps), sweep.swapHash[:6],
781-
sweep.presigned, completed)
780+
"presigned=%v, fully_confirmed=%v", len(sweeps),
781+
sweep.swapHash[:6], sweep.presigned, completed)
782782

783783
req := &addSweepsRequest{
784784
sweeps: sweeps,
@@ -838,14 +838,10 @@ func (b *Batcher) handleSweeps(ctx context.Context, sweeps []*sweep,
838838
// If the sweep has already been completed in a confirmed batch then we
839839
// can't attach its notifier to the batch as that is no longer running.
840840
// Instead we directly detect and return the spend here.
841-
if completed && *notifier != (SpendNotifier{}) {
842-
// The parent batch is indeed confirmed, meaning it is complete
843-
// and we won't be able to attach this sweep to it.
844-
if parentBatch.Confirmed {
845-
return b.monitorSpendAndNotify(
846-
ctx, sweep, parentBatch.ID, notifier,
847-
)
848-
}
841+
if completed && parentBatch.Confirmed {
842+
return b.monitorSpendAndNotify(
843+
ctx, sweep, parentBatch.ID, notifier,
844+
)
849845
}
850846

851847
sweep.notifier = notifier
@@ -1129,6 +1125,11 @@ func (b *Batcher) FetchUnconfirmedBatches(ctx context.Context) ([]*batch,
11291125
func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep,
11301126
parentBatchID int32, notifier *SpendNotifier) error {
11311127

1128+
// If the caller has not provided a notifier, stop.
1129+
if notifier == nil || *notifier == (SpendNotifier{}) {
1130+
return nil
1131+
}
1132+
11321133
spendCtx, cancel := context.WithCancel(ctx)
11331134

11341135
// Then we get the total amount that was swept by the batch.

0 commit comments

Comments
 (0)