diff --git a/loopout.go b/loopout.go index 99a7b7371..f2c1e4b22 100644 --- a/loopout.go +++ b/loopout.go @@ -1159,8 +1159,12 @@ func (s *loopOutSwap) waitForHtlcSpendConfirmedV2(globalCtx context.Context, sweepReq := sweepbatcher.SweepRequest{ SwapHash: s.hash, - Outpoint: htlcOutpoint, - Value: htlcValue, + Inputs: []sweepbatcher.Input{ + { + Outpoint: htlcOutpoint, + Value: htlcValue, + }, + }, Notifier: ¬ifier, } diff --git a/sweepbatcher/greedy_batch_selection.go b/sweepbatcher/greedy_batch_selection.go index 1df9591f1..1d6726e86 100644 --- a/sweepbatcher/greedy_batch_selection.go +++ b/sweepbatcher/greedy_batch_selection.go @@ -15,28 +15,34 @@ import ( "github.com/lightningnetwork/lnd/lnwallet/chainfee" ) -// greedyAddSweep selects a batch for the sweep using the greedy algorithm, -// which minimizes costs, and adds the sweep to the batch. To accomplish this, -// it first collects fee details about the sweep being added, about a potential -// new batch composed of this sweep only, and about all existing batches. It +// greedyAddSweeps selects a batch for the sweeps using the greedy algorithm, +// which minimizes costs, and adds the sweeps to the batch. To accomplish this, +// it first collects fee details about the sweeps being added, about a potential +// new batch composed of these sweeps only, and about all existing batches. It // skips batches with at least MaxSweepsPerBatch swaps to keep tx standard. Then // it passes the data to selectBatches() function, which emulates adding the -// sweep to each batch and creating new batch for the sweep, and calculates the +// sweep to each batch and creating new batch for the sweeps, and calculates the // costs of each alternative. Based on the estimates of selectBatches(), this -// method adds the sweep to the batch that results in the least overall fee -// increase, or creates new batch for it. If the sweep is not accepted by an +// method adds the sweeps to the batch that results in the least overall fee +// increase, or creates new batch for it. If the sweeps are not accepted by an // existing batch (may happen because of too distant timeouts), next batch is // tried in the list returned by selectBatches(). If adding fails or new batch // creation fails, this method returns an error. If this method fails for any // reason, the caller falls back to the simple algorithm (method handleSweep). -func (b *Batcher) greedyAddSweep(ctx context.Context, sweep *sweep) error { +func (b *Batcher) greedyAddSweeps(ctx context.Context, sweeps []*sweep) error { + if len(sweeps) == 0 { + return fmt.Errorf("trying to greedy add an empty sweeps group") + } + + swap := sweeps[0].swapHash + // Collect weight and fee rate info about the sweep and new batch. sweepFeeDetails, newBatchFeeDetails, err := estimateSweepFeeIncrement( - sweep, + sweeps, ) if err != nil { return fmt.Errorf("failed to estimate tx weight for "+ - "sweep %x: %w", sweep.swapHash[:6], err) + "sweep %x: %w", swap[:6], err) } // Collect weight and fee rate info about existing batches. @@ -64,30 +70,30 @@ func (b *Batcher) greedyAddSweep(ctx context.Context, sweep *sweep) error { ) if err != nil { return fmt.Errorf("batch selection algorithm failed for sweep "+ - "%x: %w", sweep.swapHash[:6], err) + "%x: %w", swap[:6], err) } // Try batches, starting with the best. for _, batchId := range batchesIds { // If the best option is to start new batch, do it. if batchId == newBatchSignal { - return b.spinUpNewBatch(ctx, sweep) + return b.spinUpNewBatch(ctx, sweeps) } - // Locate the batch to add the sweep to. + // Locate the batch to add the sweeps to. bestBatch, has := b.batches[batchId] if !has { return fmt.Errorf("batch selection algorithm returned "+ "batch id %d which doesn't exist, for sweep %x", - batchId, sweep.swapHash[:6]) + batchId, swap[:6]) } - // Add the sweep to the batch. - accepted, err := bestBatch.addSweep(ctx, sweep) + // Add the sweeps to the batch. + accepted, err := bestBatch.addSweeps(ctx, sweeps) if err != nil { return fmt.Errorf("batch selection algorithm returned "+ "batch id %d for sweep %x, but adding failed: "+ - "%w", batchId, sweep.swapHash[:6], err) + "%w", batchId, swap[:6], err) } if accepted { return nil @@ -95,23 +101,34 @@ func (b *Batcher) greedyAddSweep(ctx context.Context, sweep *sweep) error { debugf("Batch selection algorithm returned batch id %d "+ "for sweep %x, but acceptance failed.", batchId, - sweep.swapHash[:6]) + swap[:6]) } - return fmt.Errorf("no batch accepted sweep %x", sweep.swapHash[:6]) + return fmt.Errorf("no batch accepted sweep group %x", swap[:6]) } -// estimateSweepFeeIncrement returns fee details for adding the sweep to a batch -// and for creating new batch with this sweep only. -func estimateSweepFeeIncrement(s *sweep) (feeDetails, feeDetails, error) { - // Create a fake batch with this sweep. +// estimateSweepFeeIncrement returns fee details for adding the sweeps to +// a batch and for creating new batch with these sweeps only. +func estimateSweepFeeIncrement( + sweeps []*sweep) (feeDetails, feeDetails, error) { + + if len(sweeps) == 0 { + return feeDetails{}, feeDetails{}, fmt.Errorf("estimating an " + + "empty group of sweeps") + } + + // Create a fake batch with the sweeps. batch := &batch{ rbfCache: rbfCache{ - FeeRate: s.minFeeRate, - }, - sweeps: map[wire.OutPoint]sweep{ - s.outpoint: *s, + FeeRate: sweeps[0].minFeeRate, }, + sweeps: make(map[wire.OutPoint]sweep, len(sweeps)), + } + for _, s := range sweeps { + batch.sweeps[s.outpoint] = *s + batch.rbfCache.FeeRate = max( + batch.rbfCache.FeeRate, s.minFeeRate, + ) } // Estimate new batch. @@ -120,14 +137,17 @@ func estimateSweepFeeIncrement(s *sweep) (feeDetails, feeDetails, error) { return feeDetails{}, feeDetails{}, err } - // Add the same sweep again to measure weight increments. - outpoint2 := s.outpoint - outpoint2.Hash[0]++ - if _, has := batch.sweeps[outpoint2]; has { - return feeDetails{}, feeDetails{}, fmt.Errorf("dummy outpoint "+ - "%s is present in the batch", outpoint2) + // Add the same sweeps again with different outpoints to measure weight + // increments. + for _, s := range sweeps { + dummy := s.outpoint + dummy.Hash[0]++ + if _, has := batch.sweeps[dummy]; has { + return feeDetails{}, feeDetails{}, fmt.Errorf("dummy "+ + "outpoint %s is present in the batch", dummy) + } + batch.sweeps[dummy] = *s } - batch.sweeps[outpoint2] = *s // Estimate weight of a batch with two sweeps. fd2, err := estimateBatchWeight(batch) @@ -137,8 +157,8 @@ func estimateSweepFeeIncrement(s *sweep) (feeDetails, feeDetails, error) { // Create feeDetails for sweep. sweepFeeDetails := feeDetails{ - FeeRate: s.minFeeRate, - IsExternalAddr: s.isExternalAddr, + FeeRate: batch.rbfCache.FeeRate, + IsExternalAddr: sweeps[0].isExternalAddr, // Calculate sweep weight as a difference. Weight: fd2.Weight - fd1.Weight, @@ -252,10 +272,10 @@ func (e1 feeDetails) combine(e2 feeDetails) feeDetails { // rate and a weight is provided. Also, a hint is provided to signal which // spending path will be used by the batch. // -// The same data is also provided for the sweep for which we are selecting a -// batch to add. In case of the sweep weights are weight deltas resulted from -// adding the sweep. Finally, the same data is provided for new batch having -// this sweep only. +// The same data is also provided for the sweep (or sweeps) for which we are +// selecting a batch to add. In case of the sweep weights are weight deltas +// resulted from adding the sweep. Finally, the same data is provided for new +// batch having this sweep(s) only. // // The algorithm compares costs of adding the sweep to each existing batch, and // costs of new batch creation for this sweep and returns BatchId of the winning @@ -265,11 +285,11 @@ func (e1 feeDetails) combine(e2 feeDetails) feeDetails { // having flag IsExternalAddr must go in individual batches. Cooperative // spending may only be available for some sweeps supporting it, not for all. func selectBatches(batches []feeDetails, - sweep, oneSweepBatch feeDetails) ([]int32, error) { + added, newBatch feeDetails) ([]int32, error) { // If the sweep has IsExternalAddr flag, the sweep can't be added to // a batch, so create new batch for it. - if sweep.IsExternalAddr { + if added.IsExternalAddr { return []int32{newBatchSignal}, nil } @@ -286,7 +306,7 @@ func selectBatches(batches []feeDetails, // creation with this sweep only in it. The cost is its full fee. alternatives = append(alternatives, alternative{ batchId: newBatchSignal, - cost: oneSweepBatch.fee(), + cost: newBatch.fee(), }) // Try to add the sweep to every batch, calculate the costs and @@ -299,7 +319,7 @@ func selectBatches(batches []feeDetails, } // Add the sweep to the batch virtually. - combinedBatch := batch.combine(sweep) + combinedBatch := batch.combine(added) // The cost is the fee increase. cost := combinedBatch.fee() - batch.fee() diff --git a/sweepbatcher/greedy_batch_selection_test.go b/sweepbatcher/greedy_batch_selection_test.go index d5aa58a49..cc47f1ad5 100644 --- a/sweepbatcher/greedy_batch_selection_test.go +++ b/sweepbatcher/greedy_batch_selection_test.go @@ -74,17 +74,28 @@ func TestEstimateSweepFeeIncrement(t *testing.T) { trAddr := (*btcutil.AddressTaproot)(nil) p2pkhAddr := (*btcutil.AddressPubKeyHash)(nil) + outpoint1 := wire.OutPoint{ + Hash: chainhash.Hash{1, 1, 1}, + Index: 1, + } + outpoint2 := wire.OutPoint{ + Hash: chainhash.Hash{2, 2, 2}, + Index: 2, + } + cases := []struct { name string - sweep *sweep + sweeps []*sweep wantSweepFeeDetails feeDetails wantNewBatchFeeDetails feeDetails }{ { name: "regular", - sweep: &sweep{ - minFeeRate: lowFeeRate, - htlcSuccessEstimator: se3, + sweeps: []*sweep{ + { + minFeeRate: lowFeeRate, + htlcSuccessEstimator: se3, + }, }, wantSweepFeeDetails: feeDetails{ FeeRate: lowFeeRate, @@ -98,9 +109,11 @@ func TestEstimateSweepFeeIncrement(t *testing.T) { { name: "high fee rate", - sweep: &sweep{ - minFeeRate: highFeeRate, - htlcSuccessEstimator: se3, + sweeps: []*sweep{ + { + minFeeRate: highFeeRate, + htlcSuccessEstimator: se3, + }, }, wantSweepFeeDetails: feeDetails{ FeeRate: highFeeRate, @@ -114,11 +127,13 @@ func TestEstimateSweepFeeIncrement(t *testing.T) { { name: "isExternalAddr taproot", - sweep: &sweep{ - minFeeRate: lowFeeRate, - htlcSuccessEstimator: se3, - isExternalAddr: true, - destAddr: trAddr, + sweeps: []*sweep{ + { + minFeeRate: lowFeeRate, + htlcSuccessEstimator: se3, + isExternalAddr: true, + destAddr: trAddr, + }, }, wantSweepFeeDetails: feeDetails{ FeeRate: lowFeeRate, @@ -134,11 +149,13 @@ func TestEstimateSweepFeeIncrement(t *testing.T) { { name: "isExternalAddr P2PKH", - sweep: &sweep{ - minFeeRate: lowFeeRate, - htlcSuccessEstimator: se3, - isExternalAddr: true, - destAddr: p2pkhAddr, + sweeps: []*sweep{ + { + minFeeRate: lowFeeRate, + htlcSuccessEstimator: se3, + isExternalAddr: true, + destAddr: p2pkhAddr, + }, }, wantSweepFeeDetails: feeDetails{ FeeRate: lowFeeRate, @@ -155,10 +172,12 @@ func TestEstimateSweepFeeIncrement(t *testing.T) { { name: "non-coop", - sweep: &sweep{ - minFeeRate: lowFeeRate, - htlcSuccessEstimator: se3, - nonCoopHint: true, + sweeps: []*sweep{ + { + minFeeRate: lowFeeRate, + htlcSuccessEstimator: se3, + nonCoopHint: true, + }, }, wantSweepFeeDetails: feeDetails{ FeeRate: lowFeeRate, @@ -172,10 +191,12 @@ func TestEstimateSweepFeeIncrement(t *testing.T) { { name: "coop-failed", - sweep: &sweep{ - minFeeRate: lowFeeRate, - htlcSuccessEstimator: se3, - coopFailed: true, + sweeps: []*sweep{ + { + minFeeRate: lowFeeRate, + htlcSuccessEstimator: se3, + coopFailed: true, + }, }, wantSweepFeeDetails: feeDetails{ FeeRate: lowFeeRate, @@ -186,12 +207,36 @@ func TestEstimateSweepFeeIncrement(t *testing.T) { Weight: nonCoopNewBatchWeight, }, }, + + { + name: "two sweeps", + sweeps: []*sweep{ + { + outpoint: outpoint1, + minFeeRate: lowFeeRate, + htlcSuccessEstimator: se3, + }, + { + outpoint: outpoint2, + minFeeRate: highFeeRate, + htlcSuccessEstimator: se3, + }, + }, + wantSweepFeeDetails: feeDetails{ + FeeRate: highFeeRate, + Weight: coopInputWeight * 2, + }, + wantNewBatchFeeDetails: feeDetails{ + FeeRate: highFeeRate, + Weight: coopNewBatchWeight + coopInputWeight, + }, + }, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { gotSweepFeeDetails, gotNewBatchFeeDetails, err := - estimateSweepFeeIncrement(tc.sweep) + estimateSweepFeeIncrement(tc.sweeps) require.NoError(t, err) require.Equal( t, tc.wantSweepFeeDetails, gotSweepFeeDetails, diff --git a/sweepbatcher/sweep_batch.go b/sweepbatcher/sweep_batch.go index 1cf8e573a..7497ec3da 100644 --- a/sweepbatcher/sweep_batch.go +++ b/sweepbatcher/sweep_batch.go @@ -465,55 +465,19 @@ func (b *batch) Errorf(format string, params ...interface{}) { b.log().Errorf(format, params...) } -// addSweep tries to add a sweep to the batch. If this is the first sweep being -// added to the batch then it also sets the primary sweep ID. -func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) { - done, err := b.scheduleNextCall() - defer done() - - if err != nil { - return false, err - } - +// checkSweepToAdd checks if a sweep can be added or updated in the batch. The +// caller must lock the event loop using scheduleNextCall. The function returns +// if the sweep already exists in the batch. +func (b *batch) checkSweepToAdd(_ context.Context, sweep *sweep) (bool, error) { // If the provided sweep is nil, we can't proceed with any checks, so // we just return early. if sweep == nil { - b.Infof("the sweep is nil") - - return false, nil + return false, fmt.Errorf("the sweep is nil") } // Before we run through the acceptance checks, let's just see if this // sweep is already in our batch. In that case, just update the sweep. - oldSweep, ok := b.sweeps[sweep.outpoint] - if ok { - // Preserve coopFailed value not to forget about cooperative - // spending failure in this sweep. - tmp := *sweep - tmp.coopFailed = oldSweep.coopFailed - - // If the sweep was resumed from storage, and the swap requested - // to sweep again, a new sweep notifier will be created by the - // swap. By re-assigning to the batch's sweep we make sure that - // everything, including the notifier, is up to date. - b.sweeps[sweep.outpoint] = tmp - - // If this is the primary sweep, we also need to update the - // batch's confirmation target and fee rate. - if b.primarySweepID == sweep.outpoint { - b.cfg.batchConfTarget = sweep.confTarget - b.rbfCache.SkipNextBump = true - } - - // Update batch's fee rate to be greater than or equal to - // minFeeRate of the sweep. Make sure batch's fee rate does not - // decrease (otherwise it won't pass RBF rules and won't be - // broadcasted) and that it is not lower that minFeeRate of - // other sweeps (so it is applied). - if b.rbfCache.FeeRate < sweep.minFeeRate { - b.rbfCache.FeeRate = sweep.minFeeRate - } - + if _, ok := b.sweeps[sweep.outpoint]; ok { return true, nil } @@ -521,19 +485,16 @@ func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) { // the batch, do not add another sweep to prevent the tx from becoming // non-standard. if len(b.sweeps) >= MaxSweepsPerBatch { - b.Infof("the batch has already too many sweeps %d >= %d", - len(b.sweeps), MaxSweepsPerBatch) - - return false, nil + return false, fmt.Errorf("the batch has already too many "+ + "sweeps %d >= %d", len(b.sweeps), MaxSweepsPerBatch) } // Since all the actions of the batch happen sequentially, we could // arrive here after the batch got closed because of a spend. In this // case we cannot add the sweep to this batch. if b.state != Open { - b.Infof("the batch state (%v) is not open", b.state) - - return false, nil + return false, fmt.Errorf("the batch state (%v) is not open", + b.state) } // If this batch contains a single sweep that spends to a non-wallet @@ -541,17 +502,15 @@ func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) { // we cannot add this sweep to the batch. for _, s := range b.sweeps { if s.isExternalAddr { - b.Infof("the batch already has a sweep %x with "+ - "an external address", s.swapHash[:6]) - - return false, nil + return false, fmt.Errorf("the batch already has a "+ + "sweep %x with an external address", + s.swapHash[:6]) } if sweep.isExternalAddr { - b.Infof("the batch is not empty and new sweep %x "+ - "has an external address", sweep.swapHash[:6]) - - return false, nil + return false, fmt.Errorf("the batch is not empty and "+ + "new sweep %x has an external address", + sweep.swapHash[:6]) } } @@ -563,46 +522,165 @@ func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) { int32(math.Abs(float64(sweep.timeout - s.timeout))) if timeoutDistance > b.cfg.maxTimeoutDistance { - b.Infof("too long timeout distance between the "+ - "batch and sweep %x: %d > %d", + return false, fmt.Errorf("too long timeout distance "+ + "between the batch and sweep %x: %d > %d", sweep.swapHash[:6], timeoutDistance, b.cfg.maxTimeoutDistance) + } + } + + // Everything is ok, the sweep can be added to the batch. + return false, nil +} + +// addSweeps tries to add sweeps to the batch. If this is the first sweep being +// added to the batch then it also sets the primary sweep ID. It returns if the +// sweeps were accepted to the batch. +func (b *batch) addSweeps(ctx context.Context, sweeps []*sweep) (bool, error) { + done, err := b.scheduleNextCall() + defer done() + if err != nil { + return false, err + } + + // This must be a bug, so log a warning. + if len(sweeps) == 0 { + b.Warnf("An attempt to add zero sweeps.") + + return false, nil + } + + // Track how many new and existing sweeps are among the sweeps. + var numExisting, numNew int + for _, s := range sweeps { + existing, err := b.checkSweepToAdd(ctx, s) + if err != nil { + b.Infof("Failed to add sweep %v to batch %d: %v", + s.outpoint, b.id, err) + + return false, nil + } + if existing { + numExisting++ + } else { + numNew++ + } + } + + // Make sure the whole group is either new or existing. If this is not + // the case, this might be a bug, so print a warning. + if numExisting > 0 && numNew > 0 { + b.Warnf("There are %d existing and %d new sweeps among the "+ + "group. They must not be mixed.", numExisting, numNew) + + return false, nil + } + + // Make sure all the sweeps spend different outpoints. + outpointsSet := make(map[wire.OutPoint]struct{}, len(sweeps)) + for _, s := range sweeps { + if _, has := outpointsSet[s.outpoint]; has { + b.Warnf("Multiple sweeps spend outpoint %v", s.outpoint) return false, nil } + outpointsSet[s.outpoint] = struct{}{} } // Past this point we know that a new incoming sweep passes the // acceptance criteria and is now ready to be added to this batch. - // If this is the first sweep being added to the batch, make it the - // primary sweep. - if b.primarySweepID == zeroSweepID { - b.primarySweepID = sweep.outpoint - b.cfg.batchConfTarget = sweep.confTarget - b.rbfCache.FeeRate = sweep.minFeeRate - b.rbfCache.SkipNextBump = true - - // We also need to start the spend monitor for this new primary - // sweep. - err := b.monitorSpend(ctx, *sweep) - if err != nil { - return false, err + // For an existing group, update the sweeps in the batch. + if numExisting == len(sweeps) { + for _, s := range sweeps { + oldSweep, ok := b.sweeps[s.outpoint] + if !ok { + return false, fmt.Errorf("sweep %v not found "+ + "in batch %d", s.outpoint, b.id) + } + + // Preserve coopFailed value not to forget about + // cooperative spending failure in this sweep. + tmp := *s + tmp.coopFailed = oldSweep.coopFailed + + // If the sweep was resumed from storage, and the swap + // requested to sweep again, a new sweep notifier will + // be created by the swap. By re-assigning to the + // batch's sweep we make sure that everything, including + // the notifier, is up to date. + b.sweeps[s.outpoint] = tmp + + // If this is the primary sweep, we also need to update + // the batch's confirmation target and fee rate. + if b.primarySweepID == s.outpoint { + b.cfg.batchConfTarget = s.confTarget + b.rbfCache.SkipNextBump = true + } + + // Update batch's fee rate to be greater than or equal + // to minFeeRate of the sweep. Make sure batch's fee + // rate does not decrease (otherwise it won't pass RBF + // rules and won't be broadcasted) and that it is not + // lower that minFeeRate of other sweeps (so it is + // applied). + if b.rbfCache.FeeRate < s.minFeeRate { + b.rbfCache.FeeRate = s.minFeeRate + } } - } - // Add the sweep to the batch's sweeps. - b.Infof("adding sweep %x", sweep.swapHash[:6]) - b.sweeps[sweep.outpoint] = *sweep + return true, nil + } else if numNew != len(sweeps) { + // Sanity check: all the sweeps must be either existing or new. + // We have checked this above, let's check here as well. + return false, fmt.Errorf("bug in numExisting and numNew logic:"+ + " numExisting=%d, numNew=%d, len(sweeps)=%d, "+ + "len(b.sweeps)=%d", numExisting, numNew, len(sweeps), + len(b.sweeps)) + } + + // Here is the code to add new sweeps to a batch. + for _, s := range sweeps { + // If this is the first sweep being added to the batch, make it + // the primary sweep. + if b.primarySweepID == zeroSweepID { + b.primarySweepID = s.outpoint + b.cfg.batchConfTarget = s.confTarget + b.rbfCache.FeeRate = s.minFeeRate + b.rbfCache.SkipNextBump = true - // Update FeeRate. Max(sweep.minFeeRate) for all the sweeps of - // the batch is the basis for fee bumps. - if b.rbfCache.FeeRate < sweep.minFeeRate { - b.rbfCache.FeeRate = sweep.minFeeRate - b.rbfCache.SkipNextBump = true + // We also need to start the spend monitor for this new + // primary sweep. + err := b.monitorSpend(ctx, *s) + if err != nil { + return false, err + } + } + + // Make sure the sweep is not present in the batch. If it is + // present, this is a bug, return an error to stop here. + if _, has := b.sweeps[s.outpoint]; has { + return false, fmt.Errorf("sweep %v is already present "+ + "in batch %d", s.outpoint, b.id) + } + + // Add the sweep to the batch's sweeps. + b.Infof("adding sweep %v, swap %x", s.outpoint, s.swapHash[:6]) + b.sweeps[s.outpoint] = *s + + // Update FeeRate. Max(s.minFeeRate) for all the sweeps of + // the batch is the basis for fee bumps. + if b.rbfCache.FeeRate < s.minFeeRate { + b.rbfCache.FeeRate = s.minFeeRate + b.rbfCache.SkipNextBump = true + } + + if err := b.persistSweep(ctx, *s, false); err != nil { + return true, err + } } - return true, b.persistSweep(ctx, *sweep, false) + return true, nil } // sweepExists returns true if the batch contains the sweep with the given @@ -1754,8 +1832,12 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error { delete(b.sweeps, sweep.outpoint) purgeList = append(purgeList, SweepRequest{ SwapHash: newSweep.swapHash, - Outpoint: newSweep.outpoint, - Value: newSweep.value, + Inputs: []Input{ + { + Outpoint: newSweep.outpoint, + Value: newSweep.value, + }, + }, Notifier: newSweep.notifier, }) } diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index 3c2534971..908e88896 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -199,16 +199,23 @@ func defaultPublishErrorLogger(err error, errMsg string, log btclog.Logger) { log.Warnf("%s: %v", errMsg, err) } -// SweepRequest is a request to sweep a specific outpoint. -type SweepRequest struct { - // SwapHash is the hash of the swap that is being swept. - SwapHash lntypes.Hash - +// Input specifies an UTXO with amount that is added to the batcher. +type Input struct { // Outpoint is the outpoint that is being swept. Outpoint wire.OutPoint // Value is the value of the outpoint that is being swept. Value btcutil.Amount +} + +// SweepRequest is a request to sweep an outpoint or a group of outpoints. +type SweepRequest struct { + // SwapHash is the hash of the swap that is being swept. + SwapHash lntypes.Hash + + // Inputs specifies the inputs in the same request. All the inputs + // belong to the same swap and are added to the same batch. + Inputs []Input // Notifier is a notifier that is used to notify the requester of this // sweep that the sweep was successful. @@ -551,16 +558,16 @@ func (b *Batcher) Run(ctx context.Context) error { for { select { case sweepReq := <-b.sweepReqs: - sweep, err := b.fetchSweep(runCtx, sweepReq) + sweeps, err := b.fetchSweeps(runCtx, sweepReq) if err != nil { - warnf("fetchSweep failed: %v.", err) + warnf("fetchSweeps failed: %v.", err) return err } - err = b.handleSweep(runCtx, sweep, sweepReq.Notifier) + err = b.handleSweeps(runCtx, sweeps, sweepReq.Notifier) if err != nil { - warnf("handleSweep failed: %v.", err) + warnf("handleSweeps failed: %v.", err) return err } @@ -624,11 +631,19 @@ func (b *Batcher) testRunInEventLoop(ctx context.Context, handler func()) { } } -// handleSweep handles a sweep request by either placing it in an existing -// batch, or by spinning up a new batch for it. -func (b *Batcher) handleSweep(ctx context.Context, sweep *sweep, +// handleSweeps handles a sweep request by either placing the group of sweeps in +// an existing batch, or by spinning up a new batch for it. +func (b *Batcher) handleSweeps(ctx context.Context, sweeps []*sweep, notifier *SpendNotifier) error { + if len(sweeps) == 0 { + return fmt.Errorf("trying to add an empty group of sweeps") + } + + // Since the whole group is added to the same batch and belongs to + // the same transaction, we use sweeps[0] below where we need any sweep. + sweep := sweeps[0] + completed, err := b.store.GetSweepStatus(ctx, sweep.outpoint) if err != nil { return err @@ -675,7 +690,7 @@ func (b *Batcher) handleSweep(ctx context.Context, sweep *sweep, // provide the sweep to that batch and return. for _, batch := range b.batches { if batch.sweepExists(sweep.outpoint) { - accepted, err := batch.addSweep(ctx, sweep) + accepted, err := batch.addSweeps(ctx, sweeps) if err != nil && !errors.Is(err, ErrBatchShuttingDown) { return err } @@ -692,7 +707,7 @@ func (b *Batcher) handleSweep(ctx context.Context, sweep *sweep, } // Try to run the greedy algorithm of batch selection to minimize costs. - err = b.greedyAddSweep(ctx, sweep) + err = b.greedyAddSweeps(ctx, sweeps) if err == nil { // The greedy algorithm succeeded. return nil @@ -703,7 +718,7 @@ func (b *Batcher) handleSweep(ctx context.Context, sweep *sweep, // If one of the batches accepts the sweep, we provide it to that batch. for _, batch := range b.batches { - accepted, err := batch.addSweep(ctx, sweep) + accepted, err := batch.addSweeps(ctx, sweeps) if err != nil && !errors.Is(err, ErrBatchShuttingDown) { return err } @@ -717,28 +732,28 @@ func (b *Batcher) handleSweep(ctx context.Context, sweep *sweep, // If no batch is capable of accepting the sweep, we spin up a fresh // batch and hand the sweep over to it. - return b.spinUpNewBatch(ctx, sweep) + return b.spinUpNewBatch(ctx, sweeps) } -// spinUpNewBatch creates new batch, starts it and adds the sweep to it. -func (b *Batcher) spinUpNewBatch(ctx context.Context, sweep *sweep) error { +// spinUpNewBatch creates new batch, starts it and adds the sweeps to it. +func (b *Batcher) spinUpNewBatch(ctx context.Context, sweeps []*sweep) error { // Spin up a fresh batch. newBatch, err := b.spinUpBatch(ctx) if err != nil { return err } - // Add the sweep to the fresh batch. - accepted, err := newBatch.addSweep(ctx, sweep) + // Add the sweeps to the fresh batch. + accepted, err := newBatch.addSweeps(ctx, sweeps) if err != nil { return err } - // If the sweep wasn't accepted by the fresh batch something is wrong, + // If the sweeps weren't accepted by the fresh batch something is wrong, // we should return the error. if !accepted { return fmt.Errorf("sweep %x was not accepted by new batch %d", - sweep.swapHash[:6], newBatch.id) + sweeps[0].swapHash[:6], newBatch.id) } return nil @@ -1101,12 +1116,24 @@ func NewSweepFetcherFromSwapStore(swapStore LoopOutFetcher, }, nil } -// fetchSweep fetches the sweep related information from the database. -func (b *Batcher) fetchSweep(ctx context.Context, - sweepReq SweepRequest) (*sweep, error) { +// fetchSweeps fetches the sweep related information from the database. +func (b *Batcher) fetchSweeps(ctx context.Context, + sweepReq SweepRequest) ([]*sweep, error) { + + sweeps := make([]*sweep, len(sweepReq.Inputs)) + for i, utxo := range sweepReq.Inputs { + s, err := b.loadSweep( + ctx, sweepReq.SwapHash, utxo.Outpoint, + utxo.Value, + ) + if err != nil { + return nil, fmt.Errorf("failed to load "+ + "sweep %v: %w", utxo.Outpoint, err) + } + sweeps[i] = s + } - return b.loadSweep(ctx, sweepReq.SwapHash, sweepReq.Outpoint, - sweepReq.Value) + return sweeps, nil } // loadSweep loads inputs of sweep from the database and from FeeRateProvider diff --git a/sweepbatcher/sweep_batcher_test.go b/sweepbatcher/sweep_batcher_test.go index dfef3395f..12c9261e9 100644 --- a/sweepbatcher/sweep_batcher_test.go +++ b/sweepbatcher/sweep_batcher_test.go @@ -211,14 +211,26 @@ func testSweepBatcherBatchCreation(t *testing.T, store testStore, checkBatcherError(t, err) }() + op1 := wire.OutPoint{ + Hash: chainhash.Hash{1, 1}, + Index: 1, + } + op2 := wire.OutPoint{ + Hash: chainhash.Hash{2, 2}, + Index: 2, + } + op3 := wire.OutPoint{ + Hash: chainhash.Hash{3, 3}, + Index: 3, + } + // Create a sweep request. sweepReq1 := SweepRequest{ SwapHash: lntypes.Hash{1, 1, 1}, - Value: 111, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{1, 1}, - Index: 1, - }, + Inputs: []Input{{ + Value: 111, + Outpoint: op1, + }}, Notifier: &dummyNotifier, } @@ -262,11 +274,10 @@ func testSweepBatcherBatchCreation(t *testing.T, store testStore, // our configured threshold. sweepReq2 := SweepRequest{ SwapHash: lntypes.Hash{2, 2, 2}, - Value: 222, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{2, 2}, - Index: 2, - }, + Inputs: []Input{{ + Value: 222, + Outpoint: op2, + }}, Notifier: &dummyNotifier, } @@ -309,11 +320,10 @@ func testSweepBatcherBatchCreation(t *testing.T, store testStore, // the default. sweepReq3 := SweepRequest{ SwapHash: lntypes.Hash{3, 3, 3}, - Value: 333, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{3, 3}, - Index: 3, - }, + Inputs: []Input{{ + Value: 333, + Outpoint: op3, + }}, Notifier: &dummyNotifier, } @@ -360,12 +370,12 @@ func testSweepBatcherBatchCreation(t *testing.T, store testStore, for _, batch := range batches { batch := batch.snapshot(ctx) switch batch.primarySweepID { - case sweepReq1.Outpoint: + case op1: if len(batch.sweeps) != 2 { return false } - case sweepReq3.Outpoint: + case op3: if len(batch.sweeps) != 1 { return false } @@ -376,9 +386,9 @@ func testSweepBatcherBatchCreation(t *testing.T, store testStore, }, test.Timeout, eventuallyCheckFrequency) // Check that all sweeps were stored. - require.True(t, batcherStore.AssertSweepStored(sweepReq1.Outpoint)) - require.True(t, batcherStore.AssertSweepStored(sweepReq2.Outpoint)) - require.True(t, batcherStore.AssertSweepStored(sweepReq3.Outpoint)) + require.True(t, batcherStore.AssertSweepStored(op1)) + require.True(t, batcherStore.AssertSweepStored(op2)) + require.True(t, batcherStore.AssertSweepStored(op3)) } // testFeeBumping tests that sweep is RBFed with slightly higher fee rate after @@ -419,11 +429,13 @@ func testFeeBumping(t *testing.T, store testStore, // Create a sweep request. sweepReq1 := SweepRequest{ SwapHash: lntypes.Hash{1, 1, 1}, - Value: 1_000_000, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{1, 1}, - Index: 1, - }, + Inputs: []Input{{ + Value: 1_000_000, + Outpoint: wire.OutPoint{ + Hash: chainhash.Hash{1, 1}, + Index: 1, + }, + }}, Notifier: &dummyNotifier, } @@ -519,13 +531,16 @@ func testTxLabeler(t *testing.T, store testStore, }() // Create a sweep request. + op1 := wire.OutPoint{ + Hash: chainhash.Hash{1, 1}, + Index: 1, + } sweepReq1 := SweepRequest{ SwapHash: lntypes.Hash{1, 1, 1}, - Value: 111, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{1, 1}, - Index: 1, - }, + Inputs: []Input{{ + Value: 111, + Outpoint: op1, + }}, Notifier: &dummyNotifier, } @@ -565,7 +580,7 @@ func testTxLabeler(t *testing.T, store testStore, var wantLabel string for _, btch := range getBatches(ctx, batcher) { btch := btch.snapshot(ctx) - if btch.primarySweepID == sweepReq1.Outpoint { + if btch.primarySweepID == op1 { wantLabel = fmt.Sprintf( "BatchOutSweepSuccess -- %d", btch.id, ) @@ -675,11 +690,13 @@ func testPublishErrorHandler(t *testing.T, store testStore, // Create a sweep request. sweepReq1 := SweepRequest{ SwapHash: lntypes.Hash{1, 1, 1}, - Value: 111, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{1, 1}, - Index: 1, - }, + Inputs: []Input{{ + Value: 111, + Outpoint: wire.OutPoint{ + Hash: chainhash.Hash{1, 1}, + Index: 1, + }, + }}, Notifier: &dummyNotifier, } @@ -751,13 +768,16 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, }() // Create a sweep request. + op1 := wire.OutPoint{ + Hash: chainhash.Hash{1, 1}, + Index: 1, + } sweepReq1 := SweepRequest{ SwapHash: lntypes.Hash{1, 1, 1}, - Value: 111, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{1, 1}, - Index: 1, - }, + Inputs: []Input{{ + Value: 111, + Outpoint: op1, + }}, Notifier: &dummyNotifier, } @@ -794,7 +814,7 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, batch := &batch{} for _, btch := range getBatches(ctx, batcher) { btch.testRunInEventLoop(ctx, func() { - if btch.primarySweepID == sweepReq1.Outpoint { + if btch.primarySweepID == op1 { batch = btch } }) @@ -806,7 +826,7 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, }, test.Timeout, eventuallyCheckFrequency) // The primary sweep id should be that of the first inserted sweep. - require.Equal(t, batch.primarySweepID, sweepReq1.Outpoint) + require.Equal(t, batch.primarySweepID, op1) // Wait for tx to be published. <-lnd.TxPublishChannel @@ -833,7 +853,7 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, // outpoint we insert that outpoint here. TxIn: []*wire.TxIn{ { - PreviousOutPoint: sweepReq1.Outpoint, + PreviousOutPoint: op1, }, }, TxOut: []*wire.TxOut{ @@ -847,7 +867,7 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, // Mock the spend notification that spends the swap. spendDetail := &chainntnfs.SpendDetail{ - SpentOutPoint: &sweepReq1.Outpoint, + SpentOutPoint: &op1, SpendingTx: spendingTx, SpenderTxHash: &spendingTxHash, SpenderInputIndex: 0, @@ -959,13 +979,16 @@ func testDelays(t *testing.T, store testStore, batcherStore testBatcherStore) { <-batcher.initDone // Create a sweep request. + op1 := wire.OutPoint{ + Hash: chainhash.Hash{1, 1}, + Index: 1, + } sweepReq := SweepRequest{ SwapHash: lntypes.Hash{1, 1, 1}, - Value: 111, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{1, 1}, - Index: 1, - }, + Inputs: []Input{{ + Value: 111, + Outpoint: op1, + }}, Notifier: &dummyNotifier, } @@ -1064,7 +1087,7 @@ func testDelays(t *testing.T, store testStore, batcherStore testBatcherStore) { // batch. require.Eventually(t, func() bool { // Make sure that the sweep was stored - if !batcherStore.AssertSweepStored(sweepReq.Outpoint) { + if !batcherStore.AssertSweepStored(op1) { return false } @@ -1143,7 +1166,7 @@ func testDelays(t *testing.T, store testStore, batcherStore testBatcherStore) { // Wait for batch to load. require.Eventually(t, func() bool { // Make sure that the sweep was stored - if !batcherStore.AssertSweepStored(sweepReq.Outpoint) { + if !batcherStore.AssertSweepStored(op1) { return false } @@ -1255,11 +1278,13 @@ func testDelays(t *testing.T, store testStore, batcherStore testBatcherStore) { // Create a sweep request which is not urgent, but close to. sweepReq2 := SweepRequest{ SwapHash: lntypes.Hash{2, 2, 2}, - Value: 111, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{2, 2}, - Index: 2, - }, + Inputs: []Input{{ + Value: 111, + Outpoint: wire.OutPoint{ + Hash: chainhash.Hash{2, 2}, + Index: 2, + }, + }}, Notifier: &dummyNotifier, } @@ -1336,11 +1361,13 @@ func testDelays(t *testing.T, store testStore, batcherStore testBatcherStore) { // to make sure minimum timeout is calculated properly. sweepReq3 := SweepRequest{ SwapHash: lntypes.Hash{3, 3, 3}, - Value: 111, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{3, 3}, - Index: 3, - }, + Inputs: []Input{{ + Value: 111, + Outpoint: wire.OutPoint{ + Hash: chainhash.Hash{3, 3}, + Index: 3, + }, + }}, Notifier: &dummyNotifier, } swap3 := &loopdb.LoopOutContract{ @@ -1373,7 +1400,9 @@ func testDelays(t *testing.T, store testStore, batcherStore testBatcherStore) { testLogger2.mu.Lock() defer testLogger2.mu.Unlock() - assert.Contains(c, testLogger2.infoMessages, "adding sweep %x") + assert.Contains( + c, testLogger2.infoMessages, "adding sweep %v, swap %x", + ) }, test.Timeout, eventuallyCheckFrequency) // Advance the clock by publishDelay. Don't wait largeInitialDelay. @@ -1470,11 +1499,13 @@ func testCustomDelays(t *testing.T, store testStore, // Create a sweep request. sweepReq1 := SweepRequest{ SwapHash: swapHash1, - Value: swapSize1, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{1, 1}, - Index: 1, - }, + Inputs: []Input{{ + Value: swapSize1, + Outpoint: wire.OutPoint{ + Hash: chainhash.Hash{1, 1}, + Index: 1, + }, + }}, Notifier: &dummyNotifier, } @@ -1536,11 +1567,13 @@ func testCustomDelays(t *testing.T, store testStore, // Now add swap 2, which has lower initialDelay. sweepReq2 := SweepRequest{ SwapHash: swapHash2, - Value: swapSize2, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{2, 2}, - Index: 2, - }, + Inputs: []Input{{ + Value: swapSize2, + Outpoint: wire.OutPoint{ + Hash: chainhash.Hash{2, 2}, + Index: 2, + }, + }}, Notifier: &dummyNotifier, } @@ -1685,8 +1718,10 @@ func testMaxSweepsPerBatch(t *testing.T, store testStore, // Create a sweep request. sweepReq := SweepRequest{ SwapHash: swapHash, - Value: 111, - Outpoint: outpoint, + Inputs: []Input{{ + Value: 111, + Outpoint: outpoint, + }}, Notifier: &dummyNotifier, } @@ -1797,13 +1832,17 @@ func testSweepBatcherSweepReentry(t *testing.T, store testStore, // Create some sweep requests with timeouts not too far away, in order // to enter the same batch. + op1 := wire.OutPoint{ + Hash: chainhash.Hash{1, 1}, + Index: 1, + } + value1 := btcutil.Amount(111) sweepReq1 := SweepRequest{ SwapHash: lntypes.Hash{1, 1, 1}, - Value: 111, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{1, 1}, - Index: 1, - }, + Inputs: []Input{{ + Value: value1, + Outpoint: op1, + }}, Notifier: &dummyNotifier, } @@ -1825,11 +1864,13 @@ func testSweepBatcherSweepReentry(t *testing.T, store testStore, sweepReq2 := SweepRequest{ SwapHash: lntypes.Hash{2, 2, 2}, - Value: 222, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{2, 2}, - Index: 2, - }, + Inputs: []Input{{ + Value: 222, + Outpoint: wire.OutPoint{ + Hash: chainhash.Hash{2, 2}, + Index: 2, + }, + }}, Notifier: &dummyNotifier, } @@ -1854,11 +1895,13 @@ func testSweepBatcherSweepReentry(t *testing.T, store testStore, sweepReq3 := SweepRequest{ SwapHash: lntypes.Hash{3, 3, 3}, - Value: 333, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{3, 3}, - Index: 3, - }, + Inputs: []Input{{ + Value: 333, + Outpoint: wire.OutPoint{ + Hash: chainhash.Hash{3, 3}, + Index: 3, + }, + }}, Notifier: &dummyNotifier, } @@ -1920,7 +1963,7 @@ func testSweepBatcherSweepReentry(t *testing.T, store testStore, b := &batch{} for _, btch := range getBatches(ctx, batcher) { btch.testRunInEventLoop(ctx, func() { - if btch.primarySweepID == sweepReq1.Outpoint { + if btch.primarySweepID == op1 { b = btch } }) @@ -1933,7 +1976,7 @@ func testSweepBatcherSweepReentry(t *testing.T, store testStore, // Verify that the batch has a primary sweep id that matches the first // inserted sweep, sweep1. - require.Equal(t, b.primarySweepID, sweepReq1.Outpoint) + require.Equal(t, b.primarySweepID, op1) // Create the spending tx. In order to simulate an older version of the // batch transaction being confirmed, we only insert the primary sweep's @@ -1944,12 +1987,12 @@ func testSweepBatcherSweepReentry(t *testing.T, store testStore, Version: 1, TxIn: []*wire.TxIn{ { - PreviousOutPoint: sweepReq1.Outpoint, + PreviousOutPoint: op1, }, }, TxOut: []*wire.TxOut{ { - Value: int64(sweepReq1.Value.ToUnit( + Value: int64(value1.ToUnit( btcutil.AmountSatoshi, )), PkScript: []byte{3, 2, 1}, @@ -1960,7 +2003,7 @@ func testSweepBatcherSweepReentry(t *testing.T, store testStore, spendingTxHash := spendingTx.TxHash() spendDetail := &chainntnfs.SpendDetail{ - SpentOutPoint: &sweepReq1.Outpoint, + SpentOutPoint: &op1, SpendingTx: spendingTx, SpenderTxHash: &spendingTxHash, SpenderInputIndex: 0, @@ -2031,6 +2074,88 @@ func testSweepBatcherSweepReentry(t *testing.T, store testStore, require.Equal(t, b1.state, Open) } +// testSweepBatcherGroup tests adding a group of UTXOs with the same swap hash +// to the batcher. +func testSweepBatcherGroup(t *testing.T, store testStore, + batcherStore testBatcherStore) { + + defer test.Guard(t)() + + lnd := test.NewMockLnd() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sweepStore, err := NewSweepFetcherFromSwapStore(store, lnd.ChainParams) + require.NoError(t, err) + + batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer, + testMuSig2SignSweep, testVerifySchnorrSig, lnd.ChainParams, + batcherStore, sweepStore) + go func() { + err := batcher.Run(ctx) + checkBatcherError(t, err) + }() + + swapHash := lntypes.Hash{1, 1, 1} + + outpoint1 := wire.OutPoint{ + Hash: chainhash.Hash{1, 1}, + Index: 1, + } + outpoint2 := wire.OutPoint{ + Hash: chainhash.Hash{2, 2}, + Index: 2, + } + + swap1 := &loopdb.LoopOutContract{ + SwapContract: loopdb.SwapContract{ + CltvExpiry: 111, + AmountRequested: 111, + ProtocolVersion: loopdb.ProtocolVersionMuSig2, + HtlcKeys: htlcKeys, + }, + DestAddr: destAddr, + SwapInvoice: swapInvoice, + SweepConfTarget: 111, + } + + err = store.CreateLoopOut(ctx, swapHash, swap1) + require.NoError(t, err) + store.AssertLoopOutStored() + + // Create sweep request with a group of two UTXOs. + sweepReq := SweepRequest{ + SwapHash: lntypes.Hash{1, 1, 1}, + Inputs: []Input{ + { + Outpoint: outpoint1, + Value: 111, + }, + { + Outpoint: outpoint2, + Value: 222, + }, + }, + Notifier: &dummyNotifier, + } + require.NoError(t, batcher.AddSweep(&sweepReq)) + + // After inserting the primary (first) sweep, a spend monitor should be + // registered. + <-lnd.RegisterSpendChannel + + // Wait for tx to be published. + tx := <-lnd.TxPublishChannel + require.Len(t, tx.TxIn, 2) + require.ElementsMatch( + t, []wire.OutPoint{outpoint1, outpoint2}, + []wire.OutPoint{ + tx.TxIn[0].PreviousOutPoint, + tx.TxIn[1].PreviousOutPoint, + }, + ) +} + // testSweepBatcherNonWalletAddr tests that sweep requests that sweep to a non // wallet address enter individual batches. func testSweepBatcherNonWalletAddr(t *testing.T, store testStore, @@ -2053,14 +2178,26 @@ func testSweepBatcherNonWalletAddr(t *testing.T, store testStore, checkBatcherError(t, err) }() + op1 := wire.OutPoint{ + Hash: chainhash.Hash{1, 1}, + Index: 1, + } + op2 := wire.OutPoint{ + Hash: chainhash.Hash{2, 2}, + Index: 2, + } + op3 := wire.OutPoint{ + Hash: chainhash.Hash{3, 3}, + Index: 3, + } + // Create a sweep request. sweepReq1 := SweepRequest{ SwapHash: lntypes.Hash{1, 1, 1}, - Value: 111, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{1, 1}, - Index: 1, - }, + Inputs: []Input{{ + Value: 111, + Outpoint: op1, + }}, Notifier: &dummyNotifier, } @@ -2104,11 +2241,10 @@ func testSweepBatcherNonWalletAddr(t *testing.T, store testStore, // our configured threshold. sweepReq2 := SweepRequest{ SwapHash: lntypes.Hash{2, 2, 2}, - Value: 222, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{2, 2}, - Index: 2, - }, + Inputs: []Input{{ + Value: 222, + Outpoint: op2, + }}, Notifier: &dummyNotifier, } @@ -2151,11 +2287,10 @@ func testSweepBatcherNonWalletAddr(t *testing.T, store testStore, // the default. sweepReq3 := SweepRequest{ SwapHash: lntypes.Hash{3, 3, 3}, - Value: 333, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{3, 3}, - Index: 3, - }, + Inputs: []Input{{ + Value: 333, + Outpoint: op3, + }}, Notifier: &dummyNotifier, } @@ -2201,17 +2336,17 @@ func testSweepBatcherNonWalletAddr(t *testing.T, store testStore, for _, batch := range batches { batch := batch.snapshot(ctx) switch batch.primarySweepID { - case sweepReq1.Outpoint: + case op1: if len(batch.sweeps) != 1 { return false } - case sweepReq2.Outpoint: + case op2: if len(batch.sweeps) != 1 { return false } - case sweepReq3.Outpoint: + case op3: if len(batch.sweeps) != 1 { return false } @@ -2222,9 +2357,9 @@ func testSweepBatcherNonWalletAddr(t *testing.T, store testStore, }, test.Timeout, eventuallyCheckFrequency) // Check that all sweeps were stored. - require.True(t, batcherStore.AssertSweepStored(sweepReq1.Outpoint)) - require.True(t, batcherStore.AssertSweepStored(sweepReq2.Outpoint)) - require.True(t, batcherStore.AssertSweepStored(sweepReq3.Outpoint)) + require.True(t, batcherStore.AssertSweepStored(op1)) + require.True(t, batcherStore.AssertSweepStored(op2)) + require.True(t, batcherStore.AssertSweepStored(op3)) } // testSweepBatcherComposite tests that sweep requests that sweep to both wallet @@ -2249,14 +2384,38 @@ func testSweepBatcherComposite(t *testing.T, store testStore, checkBatcherError(t, err) }() + op1 := wire.OutPoint{ + Hash: chainhash.Hash{1, 1}, + Index: 1, + } + op2 := wire.OutPoint{ + Hash: chainhash.Hash{2, 2}, + Index: 2, + } + op3 := wire.OutPoint{ + Hash: chainhash.Hash{3, 3}, + Index: 3, + } + op4 := wire.OutPoint{ + Hash: chainhash.Hash{4, 4}, + Index: 4, + } + op5 := wire.OutPoint{ + Hash: chainhash.Hash{5, 5}, + Index: 5, + } + op6 := wire.OutPoint{ + Hash: chainhash.Hash{6, 6}, + Index: 6, + } + // Create a sweep request. sweepReq1 := SweepRequest{ SwapHash: lntypes.Hash{1, 1, 1}, - Value: 111, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{1, 1}, - Index: 1, - }, + Inputs: []Input{{ + Value: 111, + Outpoint: op1, + }}, Notifier: &dummyNotifier, } @@ -2280,11 +2439,10 @@ func testSweepBatcherComposite(t *testing.T, store testStore, // our configured threshold. sweepReq2 := SweepRequest{ SwapHash: lntypes.Hash{2, 2, 2}, - Value: 222, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{2, 2}, - Index: 2, - }, + Inputs: []Input{{ + Value: 222, + Outpoint: op2, + }}, Notifier: &dummyNotifier, } @@ -2311,11 +2469,10 @@ func testSweepBatcherComposite(t *testing.T, store testStore, // default max, but is not spending to a wallet address. sweepReq3 := SweepRequest{ SwapHash: lntypes.Hash{3, 3, 3}, - Value: 333, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{3, 3}, - Index: 3, - }, + Inputs: []Input{{ + Value: 333, + Outpoint: op3, + }}, Notifier: &dummyNotifier, } @@ -2343,11 +2500,10 @@ func testSweepBatcherComposite(t *testing.T, store testStore, // for the first batch, so it will cause it to create a new batch. sweepReq4 := SweepRequest{ SwapHash: lntypes.Hash{4, 4, 4}, - Value: 444, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{4, 4}, - Index: 4, - }, + Inputs: []Input{{ + Value: 444, + Outpoint: op4, + }}, Notifier: &dummyNotifier, } @@ -2374,11 +2530,10 @@ func testSweepBatcherComposite(t *testing.T, store testStore, // for the first batch, but a valid timeout for the new batch. sweepReq5 := SweepRequest{ SwapHash: lntypes.Hash{5, 5, 5}, - Value: 555, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{5, 5}, - Index: 5, - }, + Inputs: []Input{{ + Value: 555, + Outpoint: op5, + }}, Notifier: &dummyNotifier, } @@ -2405,11 +2560,10 @@ func testSweepBatcherComposite(t *testing.T, store testStore, // batch, but is paying to a non-wallet address. sweepReq6 := SweepRequest{ SwapHash: lntypes.Hash{6, 6, 6}, - Value: 666, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{6, 6}, - Index: 6, - }, + Inputs: []Input{{ + Value: 666, + Outpoint: op6, + }}, Notifier: &dummyNotifier, } @@ -2540,22 +2694,22 @@ func testSweepBatcherComposite(t *testing.T, store testStore, for _, batch := range batches { batch := batch.snapshot(ctx) switch batch.primarySweepID { - case sweepReq1.Outpoint: + case op1: if len(batch.sweeps) != 2 { return false } - case sweepReq3.Outpoint: + case op3: if len(batch.sweeps) != 1 { return false } - case sweepReq4.Outpoint: + case op4: if len(batch.sweeps) != 2 { return false } - case sweepReq6.Outpoint: + case op5: if len(batch.sweeps) != 1 { return false } @@ -2566,12 +2720,12 @@ func testSweepBatcherComposite(t *testing.T, store testStore, }, test.Timeout, eventuallyCheckFrequency) // Check that all sweeps were stored. - require.True(t, batcherStore.AssertSweepStored(sweepReq1.Outpoint)) - require.True(t, batcherStore.AssertSweepStored(sweepReq2.Outpoint)) - require.True(t, batcherStore.AssertSweepStored(sweepReq3.Outpoint)) - require.True(t, batcherStore.AssertSweepStored(sweepReq4.Outpoint)) - require.True(t, batcherStore.AssertSweepStored(sweepReq5.Outpoint)) - require.True(t, batcherStore.AssertSweepStored(sweepReq6.Outpoint)) + require.True(t, batcherStore.AssertSweepStored(op1)) + require.True(t, batcherStore.AssertSweepStored(op2)) + require.True(t, batcherStore.AssertSweepStored(op3)) + require.True(t, batcherStore.AssertSweepStored(op4)) + require.True(t, batcherStore.AssertSweepStored(op5)) + require.True(t, batcherStore.AssertSweepStored(op6)) } // makeTestTx creates a test transaction with a single output of the given @@ -2653,14 +2807,18 @@ func testRestoringEmptyBatch(t *testing.T, store testStore, // Wait for the batcher to be initialized. <-batcher.initDone + op := wire.OutPoint{ + Hash: chainhash.Hash{1, 1}, + Index: 1, + } + // Create a sweep request. sweepReq := SweepRequest{ SwapHash: lntypes.Hash{1, 1, 1}, - Value: 111, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{1, 1}, - Index: 1, - }, + Inputs: []Input{{ + Value: 111, + Outpoint: op, + }}, Notifier: &dummyNotifier, } @@ -2695,7 +2853,7 @@ func testRestoringEmptyBatch(t *testing.T, store testStore, require.Eventually(t, func() bool { // Make sure that the sweep was stored and we have exactly one // active batch. - if !batcherStore.AssertSweepStored(sweepReq.Outpoint) { + if !batcherStore.AssertSweepStored(op) { return false } @@ -2835,13 +2993,16 @@ func testHandleSweepTwice(t *testing.T, backend testStore, // Create two sweep requests with CltvExpiry distant from each other // to go assigned to separate batches. + op1 := wire.OutPoint{ + Hash: chainhash.Hash{1, 1}, + Index: 1, + } sweepReq1 := SweepRequest{ SwapHash: lntypes.Hash{1, 1, 1}, - Value: 111, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{1, 1}, - Index: 1, - }, + Inputs: []Input{{ + Value: 111, + Outpoint: op1, + }}, Notifier: &dummyNotifier, } @@ -2862,13 +3023,16 @@ func testHandleSweepTwice(t *testing.T, backend testStore, }, } + op2 := wire.OutPoint{ + Hash: chainhash.Hash{2, 2}, + Index: 2, + } sweepReq2 := SweepRequest{ SwapHash: lntypes.Hash{2, 2, 2}, - Value: 222, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{2, 2}, - Index: 2, - }, + Inputs: []Input{{ + Value: 222, + Outpoint: op2, + }}, Notifier: &dummyNotifier, } @@ -2915,10 +3079,10 @@ func testHandleSweepTwice(t *testing.T, backend testStore, require.Eventually(t, func() bool { // Make sure that the sweep was stored and we have exactly one // active batch. - if !batcherStore.AssertSweepStored(sweepReq1.Outpoint) { + if !batcherStore.AssertSweepStored(op1) { return false } - if !batcherStore.AssertSweepStored(sweepReq2.Outpoint) { + if !batcherStore.AssertSweepStored(op2) { return false } @@ -2969,7 +3133,7 @@ func testHandleSweepTwice(t *testing.T, backend testStore, snapshot := secondBatch.snapshot(ctx) // Make sure the second batch has the second sweep. - sweep2, has := snapshot.sweeps[sweepReq2.Outpoint] + sweep2, has := snapshot.sweeps[op2] if !has { return false } @@ -3031,13 +3195,16 @@ func testRestoringPreservesConfTarget(t *testing.T, store testStore, <-batcher.initDone // Create a sweep request. + op := wire.OutPoint{ + Hash: chainhash.Hash{1, 1}, + Index: 1, + } sweepReq := SweepRequest{ SwapHash: lntypes.Hash{1, 1, 1}, - Value: 111, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{1, 1}, - Index: 1, - }, + Inputs: []Input{{ + Value: 111, + Outpoint: op, + }}, Notifier: &dummyNotifier, } @@ -3072,7 +3239,7 @@ func testRestoringPreservesConfTarget(t *testing.T, store testStore, // batch. require.Eventually(t, func() bool { // Make sure that the sweep was stored - if !batcherStore.AssertSweepStored(sweepReq.Outpoint) { + if !batcherStore.AssertSweepStored(op) { return false } @@ -3128,7 +3295,7 @@ func testRestoringPreservesConfTarget(t *testing.T, store testStore, // Wait for batch to load. require.Eventually(t, func() bool { // Make sure that the sweep was stored - if !batcherStore.AssertSweepStored(sweepReq.Outpoint) { + if !batcherStore.AssertSweepStored(op) { return false } @@ -3226,19 +3393,22 @@ func testSweepFetcher(t *testing.T, store testStore, } // Create a sweep request. + op := wire.OutPoint{ + Hash: chainhash.Hash{1, 1}, + Index: 1, + } sweepReq := SweepRequest{ SwapHash: swapHash, - Value: amt, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{1, 1}, - Index: 1, - }, + Inputs: []Input{{ + Value: amt, + Outpoint: op, + }}, Notifier: &dummyNotifier, } sweepFetcher := &sweepFetcherMock{ store: map[wire.OutPoint]*SweepInfo{ - sweepReq.Outpoint: sweepInfo, + op: sweepInfo, }, } @@ -3284,7 +3454,7 @@ func testSweepFetcher(t *testing.T, store testStore, // batch. require.Eventually(t, func() bool { // Make sure that the sweep was stored - if !batcherStore.AssertSweepStored(sweepReq.Outpoint) { + if !batcherStore.AssertSweepStored(op) { return false } @@ -3384,11 +3554,13 @@ func testSweepBatcherCloseDuringAdding(t *testing.T, store testStore, // Create a sweep request. sweepReq := SweepRequest{ SwapHash: lntypes.Hash{i, i, i}, - Value: 111, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{i, i}, - Index: 1, - }, + Inputs: []Input{{ + Value: 111, + Outpoint: wire.OutPoint{ + Hash: chainhash.Hash{i, i}, + Index: 1, + }, + }}, Notifier: &dummyNotifier, } @@ -3461,11 +3633,13 @@ func testCustomSignMuSig2(t *testing.T, store testStore, // Create a sweep request. sweepReq := SweepRequest{ SwapHash: lntypes.Hash{1, 1, 1}, - Value: 111, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{1, 1}, - Index: 1, - }, + Inputs: []Input{{ + Value: 111, + Outpoint: wire.OutPoint{ + Hash: chainhash.Hash{1, 1}, + Index: 1, + }, + }}, Notifier: &dummyNotifier, } @@ -3644,8 +3818,10 @@ func testWithMixedBatch(t *testing.T, store testStore, // Create sweep request. sweepReq := SweepRequest{ SwapHash: swapHash, - Value: 1_000_000, - Outpoint: outpoint, + Inputs: []Input{{ + Value: 1_000_000, + Outpoint: outpoint, + }}, Notifier: &dummyNotifier, } require.NoError(t, batcher.AddSweep(&sweepReq)) @@ -3812,8 +3988,10 @@ func testWithMixedBatchCustom(t *testing.T, store testStore, // Create sweep request. sweepReq := SweepRequest{ SwapHash: swapHash, - Value: 1_000_000, - Outpoint: outpoint, + Inputs: []Input{{ + Value: 1_000_000, + Outpoint: outpoint, + }}, Notifier: &dummyNotifier, } require.NoError(t, batcher.AddSweep(&sweepReq)) @@ -4114,11 +4292,13 @@ func testFeeRateGrows(t *testing.T, store testStore, setFeeRate(swapHash1, feeRateMedium) sweepReq1 := SweepRequest{ SwapHash: swapHash1, - Value: 1_000_000, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{1, 1}, - Index: 1, - }, + Inputs: []Input{{ + Value: 1_000_000, + Outpoint: wire.OutPoint{ + Hash: chainhash.Hash{1, 1}, + Index: 1, + }, + }}, Notifier: &dummyNotifier, } @@ -4178,11 +4358,13 @@ func testFeeRateGrows(t *testing.T, store testStore, setFeeRate(swapHash2, feeRateMedium) sweepReq2 := SweepRequest{ SwapHash: swapHash2, - Value: 1_000_000, - Outpoint: wire.OutPoint{ - Hash: chainhash.Hash{2, 2}, - Index: 1, - }, + Inputs: []Input{{ + Value: 1_000_000, + Outpoint: wire.OutPoint{ + Hash: chainhash.Hash{2, 2}, + Index: 1, + }, + }}, Notifier: &dummyNotifier, } @@ -4302,6 +4484,12 @@ func TestSweepBatcherSweepReentry(t *testing.T) { runTests(t, testSweepBatcherSweepReentry) } +// TestSweepBatcherGroup tests adding a group of UTXOs with the same swap hash +// to the batcher. +func TestSweepBatcherGroup(t *testing.T) { + runTests(t, testSweepBatcherGroup) +} + // TestSweepBatcherNonWalletAddr tests that sweep requests that sweep to a non // wallet address enter individual batches. func TestSweepBatcherNonWalletAddr(t *testing.T) {