Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 59 additions & 13 deletions sweepbatcher/sweep_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,13 @@ type batchConfig struct {
// clock provides methods to work with time and timers.
clock clock.Clock

// initialDelay is the delay of first batch publishing after creation.
// It only affects newly created batches, not batches loaded from DB,
// so publishing does happen in case of a daemon restart (especially
// important in case of a crashloop).
initialDelay time.Duration
// initialDelayProvider provides the delay of first batch publishing
// after creation. It only affects newly created batches, not batches
// loaded from DB, so publishing does happen in case of a daemon restart
// (especially important in case of a crashloop). If a sweep is about to
// expire (time until timeout is less that 2x initialDelay), then
// waiting is skipped.
initialDelayProvider InitialDelayProvider
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: as this isn't really about init, but the first publish, for clarity we could rename it to firstPublishDelayProvider (and the firstPublishDelay below in the runloop). wdyt?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer the current name. We also have another option called PublishDelay. It could be confused with firstPublishDelayProvider if it is renamed. Also if we grep by PublishDelay, we would find both options.


// batchPublishDelay is the delay between receiving a new block or
// initial delay completion and publishing the batch transaction.
Expand Down Expand Up @@ -650,6 +652,7 @@ func (b *batch) Run(ctx context.Context) error {

// Cache clock variable.
clock := b.cfg.clock
startTime := clock.Now()

blockChan, blockErrChan, err :=
b.chainNotifier.RegisterBlockEpochNtfn(runCtx)
Expand Down Expand Up @@ -679,17 +682,15 @@ func (b *batch) Run(ctx context.Context) error {

// skipBefore is the time before which we skip batch publishing.
// This is needed to facilitate better grouping of sweeps.
// The value is set only if the batch has at least one sweep.
// For batches loaded from DB initialDelay should be 0.
skipBefore := clock.Now().Add(b.cfg.initialDelay)
var skipBefore *time.Time

// initialDelayChan is a timer which fires upon initial delay end.
// If initialDelay is set to 0, it will not trigger to avoid setting up
// timerChan twice, which could lead to double publishing if
// batchPublishDelay is also 0.
var initialDelayChan <-chan time.Time
if b.cfg.initialDelay > 0 {
initialDelayChan = clock.TickAfter(b.cfg.initialDelay)
}

// We use a timer in order to not publish new transactions at the same
// time as the block epoch notification. This is done to prevent
Expand All @@ -703,6 +704,45 @@ func (b *batch) Run(ctx context.Context) error {
b.primarySweepID, len(b.sweeps))

for {
// If the batch is not empty, find earliest initialDelay.
var totalSweptAmt btcutil.Amount
for _, sweep := range b.sweeps {
totalSweptAmt += sweep.value
}

skipBeforeUpdated := false
if totalSweptAmt != 0 {
initialDelay, err := b.cfg.initialDelayProvider(
ctx, len(b.sweeps), totalSweptAmt,
)
if err != nil {
b.Warnf("InitialDelayProvider failed: %v. We "+
"publish this batch without a delay.",
err)
initialDelay = 0
}
if initialDelay < 0 {
b.Warnf("Negative delay: %v. We publish this "+
"batch without a delay.", initialDelay)
initialDelay = 0
}
delayStop := startTime.Add(initialDelay)
if skipBefore == nil || delayStop.Before(*skipBefore) {
skipBefore = &delayStop
skipBeforeUpdated = true
}
}

// Create new timer only if the value of skipBefore was updated.
// Don't create the timer if the delay is <= 0 to avoid double
// publishing if batchPublishDelay is also 0.
if skipBeforeUpdated {
delay := skipBefore.Sub(clock.Now())
if delay > 0 {
initialDelayChan = clock.TickAfter(delay)
}
}

select {
case <-b.callEnter:
<-b.callLeave
Expand All @@ -718,7 +758,7 @@ func (b *batch) Run(ctx context.Context) error {

case <-initialDelayChan:
b.Debugf("initial delay of duration %v has ended",
b.cfg.initialDelay)
clock.Now().Sub(startTime))

// Set the timer to publish the batch transaction after
// the configured delay.
Expand All @@ -732,17 +772,23 @@ func (b *batch) Run(ctx context.Context) error {
continue
}

if skipBefore == nil {
b.Debugf("Skipping publishing, because " +
"the batch is empty.")
continue
}

// If the batch became urgent, skipBefore is set to now.
if b.isUrgent(skipBefore) {
skipBefore = clock.Now()
if b.isUrgent(*skipBefore) {
*skipBefore = clock.Now()
}

// Check that the initial delay has ended. We have also
// batchPublishDelay on top of initialDelay, so if
// initialDelayChan has just fired, this check passes.
now := clock.Now()
if skipBefore.After(now) {
b.Debugf(stillWaitingMsg, skipBefore, now)
b.Debugf(stillWaitingMsg, *skipBefore, now)
continue
}

Expand Down
98 changes: 57 additions & 41 deletions sweepbatcher/sweep_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,21 @@ type VerifySchnorrSig func(pubKey *btcec.PublicKey, hash, sig []byte) error
type FeeRateProvider func(ctx context.Context,
swapHash lntypes.Hash) (chainfee.SatPerKWeight, error)

// InitialDelayProvider returns the duration after which a newly created batch
// is first published. It allows to customize the duration based on total value
// of the batch. There is a trade-off between better grouping and getting funds
// faster. If the function returns an error, no delay is used and the error is
// logged as a warning.
type InitialDelayProvider func(ctx context.Context, numSweeps int,
value btcutil.Amount) (time.Duration, error)

// zeroInitialDelay returns no delay for any sweeps.
func zeroInitialDelay(_ context.Context, _ int,
_ btcutil.Amount) (time.Duration, error) {

return 0, nil
}

// PublishErrorHandler is a function that handles transaction publishing error.
type PublishErrorHandler func(err error, errMsg string, log btclog.Logger)

Expand Down Expand Up @@ -299,13 +314,13 @@ type Batcher struct {
// clock provides methods to work with time and timers.
clock clock.Clock

// initialDelay is the delay of first batch publishing after creation.
// It only affects newly created batches, not batches loaded from DB,
// so publishing does happen in case of a daemon restart (especially
// important in case of a crashloop). If a sweep is about to expire
// (time until timeout is less that 2x initialDelay), then waiting is
// skipped.
initialDelay time.Duration
// initialDelayProvider provides the delay of first batch publishing
// after creation. It only affects newly created batches, not batches
// loaded from DB, so publishing does happen in case of a daemon restart
// (especially important in case of a crashloop). If a sweep is about to
// expire (time until timeout is less that 2x initialDelay), then
// waiting is skipped.
initialDelayProvider InitialDelayProvider

// publishDelay is the delay of batch publishing that is applied in the
// beginning, after the appearance of a new block in the network or
Expand Down Expand Up @@ -339,13 +354,13 @@ type BatcherConfig struct {
// clock provides methods to work with time and timers.
clock clock.Clock

// initialDelay is the delay of first batch publishing after creation.
// It only affects newly created batches, not batches loaded from DB,
// so publishing does happen in case of a daemon restart (especially
// important in case of a crashloop). If a sweep is about to expire
// (time until timeout is less that 2x initialDelay), then waiting is
// skipped.
initialDelay time.Duration
// initialDelayProvider provides the delay of first batch publishing
// after creation. It only affects newly created batches, not batches
// loaded from DB, so publishing does happen in case of a daemon restart
// (especially important in case of a crashloop). If a sweep is about to
// expire (time until timeout is less that 2x initialDelay), then
// waiting is skipped.
initialDelayProvider InitialDelayProvider

// publishDelay is the delay of batch publishing that is applied in the
// beginning, after the appearance of a new block in the network or
Expand Down Expand Up @@ -390,9 +405,9 @@ func WithClock(clock clock.Clock) BatcherOption {
// better grouping. Defaults to 0s (no initial delay). If a sweep is about
// to expire (time until timeout is less that 2x initialDelay), then waiting
// is skipped.
func WithInitialDelay(initialDelay time.Duration) BatcherOption {
func WithInitialDelay(provider InitialDelayProvider) BatcherOption {
return func(cfg *BatcherConfig) {
cfg.initialDelay = initialDelay
cfg.initialDelayProvider = provider
}
}

Expand Down Expand Up @@ -478,27 +493,27 @@ func NewBatcher(wallet lndclient.WalletKitClient,
}

return &Batcher{
batches: make(map[int32]*batch),
sweepReqs: make(chan SweepRequest),
testReqs: make(chan *testRequest),
errChan: make(chan error, 1),
quit: make(chan struct{}),
initDone: make(chan struct{}),
wallet: wallet,
chainNotifier: chainNotifier,
signerClient: signerClient,
musig2ServerSign: musig2ServerSigner,
VerifySchnorrSig: verifySchnorrSig,
chainParams: chainparams,
store: store,
sweepStore: sweepStore,
clock: cfg.clock,
initialDelay: cfg.initialDelay,
publishDelay: cfg.publishDelay,
customFeeRate: cfg.customFeeRate,
txLabeler: cfg.txLabeler,
customMuSig2Signer: cfg.customMuSig2Signer,
publishErrorHandler: cfg.publishErrorHandler,
batches: make(map[int32]*batch),
sweepReqs: make(chan SweepRequest),
testReqs: make(chan *testRequest),
errChan: make(chan error, 1),
quit: make(chan struct{}),
initDone: make(chan struct{}),
wallet: wallet,
chainNotifier: chainNotifier,
signerClient: signerClient,
musig2ServerSign: musig2ServerSigner,
VerifySchnorrSig: verifySchnorrSig,
chainParams: chainparams,
store: store,
sweepStore: sweepStore,
clock: cfg.clock,
initialDelayProvider: cfg.initialDelayProvider,
publishDelay: cfg.publishDelay,
customFeeRate: cfg.customFeeRate,
txLabeler: cfg.txLabeler,
customMuSig2Signer: cfg.customMuSig2Signer,
publishErrorHandler: cfg.publishErrorHandler,
}
}

Expand Down Expand Up @@ -749,11 +764,10 @@ func (b *Batcher) spinUpBatch(ctx context.Context) (*batch, error) {
cfg.batchPublishDelay = b.publishDelay
}

if b.initialDelay < 0 {
return nil, fmt.Errorf("negative initialDelay: %v",
b.initialDelay)
cfg.initialDelayProvider = b.initialDelayProvider
if cfg.initialDelayProvider == nil {
cfg.initialDelayProvider = zeroInitialDelay
}
cfg.initialDelay = b.initialDelay

batchKit := b.newBatchKit()

Expand Down Expand Up @@ -847,6 +861,8 @@ func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error {
// Note that initialDelay and batchPublishDelay are 0 for batches
// recovered from DB so publishing happen in case of a daemon restart
// (especially important in case of a crashloop).
cfg.initialDelayProvider = zeroInitialDelay

newBatch, err := NewBatchFromDB(cfg, batchKit)
if err != nil {
return fmt.Errorf("failed in NewBatchFromDB: %w", err)
Expand Down
Loading