Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions sweepbatcher/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sweepbatcher
import (
"context"
"database/sql"
"fmt"

"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg"
Expand Down Expand Up @@ -121,6 +122,30 @@ func (s *SQLStore) UpdateSweepBatch(ctx context.Context, batch *dbBatch) error {
return s.baseDb.UpdateBatch(ctx, batchToUpdateArgs(*batch))
}

// ConfirmBatchWithSweeps atomically confirms the batch and updates its sweeps.
func (s *SQLStore) ConfirmBatchWithSweeps(ctx context.Context, batch *dbBatch,
sweeps []*dbSweep) error {

writeOpts := loopdb.NewSqlWriteOpts()

return s.baseDb.ExecTx(ctx, writeOpts, func(tx Querier) error {
err := tx.UpdateBatch(ctx, batchToUpdateArgs(*batch))
if err != nil {
return fmt.Errorf("update batch %d: %w", batch.ID, err)
}

for _, sweep := range sweeps {
err := tx.UpsertSweep(ctx, sweepToUpsertArgs(*sweep))
if err != nil {
return fmt.Errorf("upsert sweep %v: %w",
sweep.Outpoint, err)
}
}

return nil
})
}

// FetchBatchSweeps fetches all the sweeps that are part a batch.
func (s *SQLStore) FetchBatchSweeps(ctx context.Context, id int32) (
[]*dbSweep, error) {
Expand Down
27 changes: 27 additions & 0 deletions sweepbatcher/store_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sweepbatcher
import (
"context"
"errors"
"fmt"
"sort"
"sync"

Expand Down Expand Up @@ -77,6 +78,32 @@ func (s *StoreMock) UpdateSweepBatch(ctx context.Context,
return nil
}

// ConfirmBatchWithSweeps updates the batch and the provided sweeps atomically.
func (s *StoreMock) ConfirmBatchWithSweeps(ctx context.Context,
batch *dbBatch, sweeps []*dbSweep) error {

s.mu.Lock()
defer s.mu.Unlock()

s.batches[batch.ID] = *batch

for _, sweep := range sweeps {
sweepCopy := *sweep

old, exists := s.sweeps[sweep.Outpoint]
if !exists {
return fmt.Errorf("confirming unknown sweep %v",
sweep.Outpoint)
}

sweepCopy.ID = old.ID

s.sweeps[sweep.Outpoint] = sweepCopy
}

return nil
}

// FetchBatchSweeps fetches all the sweeps that belong to a batch.
func (s *StoreMock) FetchBatchSweeps(ctx context.Context,
id int32) ([]*dbSweep, error) {
Expand Down
55 changes: 38 additions & 17 deletions sweepbatcher/sweep_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2224,10 +2224,6 @@ func (b *batch) handleConf(ctx context.Context,
b.Infof("confirmed in txid %s", b.batchTxid)
b.state = Confirmed

if err := b.persist(ctx); err != nil {
return fmt.Errorf("saving batch failed: %w", err)
}

// If the batch is in presigned mode, cleanup presignedHelper.
presigned, err := b.isPresigned()
if err != nil {
Expand Down Expand Up @@ -2261,18 +2257,16 @@ func (b *batch) handleConf(ctx context.Context,
confirmedSweeps = []wire.OutPoint{}
purgeList = make([]SweepRequest, 0, len(b.sweeps))
totalSweptAmt btcutil.Amount
dbConfirmed = make([]*dbSweep, 0, len(allSweeps))
)
for _, sweep := range allSweeps {
_, found := confirmedSet[sweep.outpoint]
if found {
// Save the sweep as completed. Note that sweeps are
// marked completed after the batch is marked confirmed
// because the check in handleSweeps checks sweep's
// status first and then checks the batch status.
err := b.persistSweep(ctx, sweep, true)
if err != nil {
return err
}
// Save the sweep as completed; the batch row and all
// sweeps are persisted atomically below.
dbConfirmed = append(
dbConfirmed, b.dbSweepFrom(sweep, true),
)

confirmedSweeps = append(
confirmedSweeps, sweep.outpoint,
Expand Down Expand Up @@ -2328,8 +2322,15 @@ func (b *batch) handleConf(ctx context.Context,
}
}

b.Infof("fully confirmed sweeps: %v, purged sweeps: %v, "+
"purged swaps: %v", confirmedSweeps, purgedSweeps, purgedSwaps)
b.Infof("Fully confirmed sweeps: %v, purged sweeps: %v, "+
"purged swaps: %v. Saving the batch and sweeps to DB",
confirmedSweeps, purgedSweeps, purgedSwaps)

if err := b.persistConfirmedBatch(ctx, dbConfirmed); err != nil {
return fmt.Errorf("saving confirmed batch failed: %w", err)
}

b.Infof("Successfully saved the batch and confirmed sweeps to DB")

// Proceed with purging the sweeps. This will feed the sweeps that
// didn't make it to the confirmed batch transaction back to the batcher
Expand Down Expand Up @@ -2445,6 +2446,11 @@ func (b *batch) isComplete() bool {

// persist updates the batch in the database.
func (b *batch) persist(ctx context.Context) error {
return b.store.UpdateSweepBatch(ctx, b.dbBatch())
}

// dbBatch builds the dbBatch representation for the current in-memory state.
func (b *batch) dbBatch() *dbBatch {
bch := &dbBatch{}

bch.ID = b.id
Expand All @@ -2459,7 +2465,7 @@ func (b *batch) persist(ctx context.Context) error {
bch.LastRbfSatPerKw = int32(b.rbfCache.FeeRate)
bch.MaxTimeoutDistance = b.cfg.maxTimeoutDistance

return b.store.UpdateSweepBatch(ctx, bch)
return bch
}

// getBatchDestAddr returns the batch's destination address. If the batch
Expand Down Expand Up @@ -2612,16 +2618,31 @@ func (b *batch) writeToConfErrChan(ctx context.Context, confErr error) {
}
}

// persistSweep upserts the given sweep into the backing store and optionally
// marks it as completed.
func (b *batch) persistSweep(ctx context.Context, sweep sweep,
completed bool) error {

return b.store.UpsertSweep(ctx, &dbSweep{
return b.store.UpsertSweep(ctx, b.dbSweepFrom(sweep, completed))
}

// dbSweepFrom builds the dbSweep representation for a batch sweep.
func (b *batch) dbSweepFrom(sweep sweep, completed bool) *dbSweep {
return &dbSweep{
BatchID: b.id,
SwapHash: sweep.swapHash,
Outpoint: sweep.outpoint,
Amount: sweep.value,
Completed: completed,
})
}
}

// persistConfirmedBatch atomically records the batch confirmation metadata
// along with all sweeps that confirmed in the same transaction.
func (b *batch) persistConfirmedBatch(ctx context.Context,
sweeps []*dbSweep) error {

return b.store.ConfirmBatchWithSweeps(ctx, b.dbBatch(), sweeps)
}

// clampBatchFee takes the fee amount and total amount of the sweeps in the
Expand Down
10 changes: 7 additions & 3 deletions sweepbatcher/sweep_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ type BatcherStore interface {
// UpdateSweepBatch updates a batch in the database.
UpdateSweepBatch(ctx context.Context, batch *dbBatch) error

// ConfirmBatchWithSweeps atomically marks the batch as confirmed and
// updates the provided sweeps in the database.
ConfirmBatchWithSweeps(ctx context.Context, batch *dbBatch,
sweeps []*dbSweep) error

// FetchBatchSweeps fetches all the sweeps that belong to a batch.
FetchBatchSweeps(ctx context.Context, id int32) ([]*dbSweep, error)

Expand Down Expand Up @@ -975,9 +980,8 @@ func (b *Batcher) handleSweeps(ctx context.Context, sweeps []*sweep,
"sweeps with primarySweep %x: confirmed=%v",
len(sweeps), sweep.swapHash[:6], parentBatch.Confirmed)

// Note that sweeps are marked completed after the batch is
// marked confirmed because here we check the sweep status
// first and then check the batch status.
// Batch + sweeps are persisted atomically, so if the sweep
// shows as completed its parent batch must be confirmed.
if parentBatch.Confirmed {
debugf("Sweep group of %d sweeps with primarySweep %x "+
"is fully confirmed, switching directly to "+
Expand Down
Loading