Skip to content

Commit 798c32c

Browse files
committed
sweepbatcher: customize initialDelay per sweep
Option WithInitialDelay now accepts a function returning initialDelay depending on sweep's data. This is needed to be able to wait longer for sweeps with low priority, but still sweeping high priority sweeps soon.
1 parent e734d69 commit 798c32c

File tree

3 files changed

+348
-59
lines changed

3 files changed

+348
-59
lines changed

sweepbatcher/sweep_batch.go

Lines changed: 56 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -150,11 +150,13 @@ type batchConfig struct {
150150
// clock provides methods to work with time and timers.
151151
clock clock.Clock
152152

153-
// initialDelay is the delay of first batch publishing after creation.
154-
// It only affects newly created batches, not batches loaded from DB,
155-
// so publishing does happen in case of a daemon restart (especially
156-
// important in case of a crashloop).
157-
initialDelay time.Duration
153+
// initialDelayProvider provides the delay of first batch publishing
154+
// after creation. It only affects newly created batches, not batches
155+
// loaded from DB, so publishing does happen in case of a daemon restart
156+
// (especially important in case of a crashloop). If a sweep is about to
157+
// expire (time until timeout is less that 2x initialDelay), then
158+
// waiting is skipped.
159+
initialDelayProvider InitialDelayProvider
158160

159161
// batchPublishDelay is the delay between receiving a new block or
160162
// initial delay completion and publishing the batch transaction.
@@ -645,6 +647,7 @@ func (b *batch) Run(ctx context.Context) error {
645647

646648
// Cache clock variable.
647649
clock := b.cfg.clock
650+
startTime := clock.Now()
648651

649652
blockChan, blockErrChan, err :=
650653
b.chainNotifier.RegisterBlockEpochNtfn(runCtx)
@@ -674,17 +677,15 @@ func (b *batch) Run(ctx context.Context) error {
674677

675678
// skipBefore is the time before which we skip batch publishing.
676679
// This is needed to facilitate better grouping of sweeps.
680+
// The value is set only if the batch has at least one sweep.
677681
// For batches loaded from DB initialDelay should be 0.
678-
skipBefore := clock.Now().Add(b.cfg.initialDelay)
682+
var skipBefore *time.Time
679683

680684
// initialDelayChan is a timer which fires upon initial delay end.
681685
// If initialDelay is set to 0, it will not trigger to avoid setting up
682686
// timerChan twice, which could lead to double publishing if
683687
// batchPublishDelay is also 0.
684688
var initialDelayChan <-chan time.Time
685-
if b.cfg.initialDelay > 0 {
686-
initialDelayChan = clock.TickAfter(b.cfg.initialDelay)
687-
}
688689

689690
// We use a timer in order to not publish new transactions at the same
690691
// time as the block epoch notification. This is done to prevent
@@ -698,6 +699,42 @@ func (b *batch) Run(ctx context.Context) error {
698699
b.primarySweepID[0:6], len(b.sweeps))
699700

700701
for {
702+
// If the batch if not empty, find earliest initialDelay.
703+
var totalSweptAmt btcutil.Amount
704+
for _, sweep := range b.sweeps {
705+
totalSweptAmt += sweep.value
706+
}
707+
708+
skipBeforeUpdated := false
709+
if totalSweptAmt != 0 {
710+
initialDelay, err := b.cfg.initialDelayProvider(
711+
ctx, totalSweptAmt,
712+
)
713+
if err != nil {
714+
b.Warnf("initialDelayProvider failed: %v", err)
715+
initialDelay = 0
716+
}
717+
if initialDelay < 0 {
718+
b.Warnf("negative delay: %v", initialDelay)
719+
initialDelay = 0
720+
}
721+
delayStop := startTime.Add(initialDelay)
722+
if skipBefore == nil || delayStop.Before(*skipBefore) {
723+
skipBefore = &delayStop
724+
skipBeforeUpdated = true
725+
}
726+
}
727+
728+
// Create new timer only if the value of skipBefore was updated.
729+
// Don't create the timer if the delay is <= 0 to avoid double
730+
// publishing if batchPublishDelay is also 0.
731+
if skipBeforeUpdated {
732+
delay := skipBefore.Sub(clock.Now())
733+
if delay > 0 {
734+
initialDelayChan = clock.TickAfter(delay)
735+
}
736+
}
737+
701738
select {
702739
case <-b.callEnter:
703740
<-b.callLeave
@@ -713,7 +750,7 @@ func (b *batch) Run(ctx context.Context) error {
713750

714751
case <-initialDelayChan:
715752
b.Debugf("initial delay of duration %v has ended",
716-
b.cfg.initialDelay)
753+
clock.Now().Sub(startTime))
717754

718755
// Set the timer to publish the batch transaction after
719756
// the configured delay.
@@ -727,17 +764,23 @@ func (b *batch) Run(ctx context.Context) error {
727764
continue
728765
}
729766

767+
if skipBefore == nil {
768+
b.Debugf("Skipping publishing, because " +
769+
"the batch is empty.")
770+
continue
771+
}
772+
730773
// If the batch became urgent, skipBefore is set to now.
731-
if b.isUrgent(skipBefore) {
732-
skipBefore = clock.Now()
774+
if b.isUrgent(*skipBefore) {
775+
*skipBefore = clock.Now()
733776
}
734777

735778
// Check that the initial delay has ended. We have also
736779
// batchPublishDelay on top of initialDelay, so if
737780
// initialDelayChan has just fired, this check passes.
738781
now := clock.Now()
739782
if skipBefore.After(now) {
740-
b.Debugf(stillWaitingMsg, skipBefore, now)
783+
b.Debugf(stillWaitingMsg, *skipBefore, now)
741784
continue
742785
}
743786

sweepbatcher/sweep_batcher.go

Lines changed: 57 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,21 @@ type VerifySchnorrSig func(pubKey *btcec.PublicKey, hash, sig []byte) error
162162
type FeeRateProvider func(ctx context.Context,
163163
swapHash lntypes.Hash) (chainfee.SatPerKWeight, error)
164164

165+
// InitialDelayProvider returns the duration after new batch creation before it
166+
// is first published. It allows to customize the duration based on total value
167+
// of the batch. There is a trade-off between better grouping and getting funds
168+
// faster. If the function returns an error, no delay is used and the error is
169+
// logged as a warning.
170+
type InitialDelayProvider func(ctx context.Context,
171+
value btcutil.Amount) (time.Duration, error)
172+
173+
// zeroInitialDelay returns no delay for any sweeps.
174+
func zeroInitialDelay(_ context.Context,
175+
_ btcutil.Amount) (time.Duration, error) {
176+
177+
return 0, nil
178+
}
179+
165180
// PublishErrorHandler is a function that handles transaction publishing error.
166181
type PublishErrorHandler func(err error, errMsg string, log btclog.Logger)
167182

@@ -296,13 +311,13 @@ type Batcher struct {
296311
// clock provides methods to work with time and timers.
297312
clock clock.Clock
298313

299-
// initialDelay is the delay of first batch publishing after creation.
300-
// It only affects newly created batches, not batches loaded from DB,
301-
// so publishing does happen in case of a daemon restart (especially
302-
// important in case of a crashloop). If a sweep is about to expire
303-
// (time until timeout is less that 2x initialDelay), then waiting is
304-
// skipped.
305-
initialDelay time.Duration
314+
// initialDelayProvider provides the delay of first batch publishing
315+
// after creation. It only affects newly created batches, not batches
316+
// loaded from DB, so publishing does happen in case of a daemon restart
317+
// (especially important in case of a crashloop). If a sweep is about to
318+
// expire (time until timeout is less that 2x initialDelay), then
319+
// waiting is skipped.
320+
initialDelayProvider InitialDelayProvider
306321

307322
// publishDelay is the delay of batch publishing that is applied in the
308323
// beginning, after the appearance of a new block in the network or
@@ -336,13 +351,13 @@ type BatcherConfig struct {
336351
// clock provides methods to work with time and timers.
337352
clock clock.Clock
338353

339-
// initialDelay is the delay of first batch publishing after creation.
340-
// It only affects newly created batches, not batches loaded from DB,
341-
// so publishing does happen in case of a daemon restart (especially
342-
// important in case of a crashloop). If a sweep is about to expire
343-
// (time until timeout is less that 2x initialDelay), then waiting is
344-
// skipped.
345-
initialDelay time.Duration
354+
// initialDelayProvider provides the delay of first batch publishing
355+
// after creation. It only affects newly created batches, not batches
356+
// loaded from DB, so publishing does happen in case of a daemon restart
357+
// (especially important in case of a crashloop). If a sweep is about to
358+
// expire (time until timeout is less that 2x initialDelay), then
359+
// waiting is skipped.
360+
initialDelayProvider InitialDelayProvider
346361

347362
// publishDelay is the delay of batch publishing that is applied in the
348363
// beginning, after the appearance of a new block in the network or
@@ -387,9 +402,9 @@ func WithClock(clock clock.Clock) BatcherOption {
387402
// better grouping. Defaults to 0s (no initial delay). If a sweep is about
388403
// to expire (time until timeout is less that 2x initialDelay), then waiting
389404
// is skipped.
390-
func WithInitialDelay(initialDelay time.Duration) BatcherOption {
405+
func WithInitialDelay(provider InitialDelayProvider) BatcherOption {
391406
return func(cfg *BatcherConfig) {
392-
cfg.initialDelay = initialDelay
407+
cfg.initialDelayProvider = provider
393408
}
394409
}
395410

@@ -475,27 +490,27 @@ func NewBatcher(wallet lndclient.WalletKitClient,
475490
}
476491

477492
return &Batcher{
478-
batches: make(map[int32]*batch),
479-
sweepReqs: make(chan SweepRequest),
480-
testReqs: make(chan *testRequest),
481-
errChan: make(chan error, 1),
482-
quit: make(chan struct{}),
483-
initDone: make(chan struct{}),
484-
wallet: wallet,
485-
chainNotifier: chainNotifier,
486-
signerClient: signerClient,
487-
musig2ServerSign: musig2ServerSigner,
488-
VerifySchnorrSig: verifySchnorrSig,
489-
chainParams: chainparams,
490-
store: store,
491-
sweepStore: sweepStore,
492-
clock: cfg.clock,
493-
initialDelay: cfg.initialDelay,
494-
publishDelay: cfg.publishDelay,
495-
customFeeRate: cfg.customFeeRate,
496-
txLabeler: cfg.txLabeler,
497-
customMuSig2Signer: cfg.customMuSig2Signer,
498-
publishErrorHandler: cfg.publishErrorHandler,
493+
batches: make(map[int32]*batch),
494+
sweepReqs: make(chan SweepRequest),
495+
testReqs: make(chan *testRequest),
496+
errChan: make(chan error, 1),
497+
quit: make(chan struct{}),
498+
initDone: make(chan struct{}),
499+
wallet: wallet,
500+
chainNotifier: chainNotifier,
501+
signerClient: signerClient,
502+
musig2ServerSign: musig2ServerSigner,
503+
VerifySchnorrSig: verifySchnorrSig,
504+
chainParams: chainparams,
505+
store: store,
506+
sweepStore: sweepStore,
507+
clock: cfg.clock,
508+
initialDelayProvider: cfg.initialDelayProvider,
509+
publishDelay: cfg.publishDelay,
510+
customFeeRate: cfg.customFeeRate,
511+
txLabeler: cfg.txLabeler,
512+
customMuSig2Signer: cfg.customMuSig2Signer,
513+
publishErrorHandler: cfg.publishErrorHandler,
499514
}
500515
}
501516

@@ -746,11 +761,10 @@ func (b *Batcher) spinUpBatch(ctx context.Context) (*batch, error) {
746761
cfg.batchPublishDelay = b.publishDelay
747762
}
748763

749-
if b.initialDelay < 0 {
750-
return nil, fmt.Errorf("negative initialDelay: %v",
751-
b.initialDelay)
764+
cfg.initialDelayProvider = b.initialDelayProvider
765+
if cfg.initialDelayProvider == nil {
766+
cfg.initialDelayProvider = zeroInitialDelay
752767
}
753-
cfg.initialDelay = b.initialDelay
754768

755769
batchKit := b.newBatchKit()
756770

@@ -844,6 +858,8 @@ func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error {
844858
// Note that initialDelay and batchPublishDelay are 0 for batches
845859
// recovered from DB so publishing happen in case of a daemon restart
846860
// (especially important in case of a crashloop).
861+
cfg.initialDelayProvider = zeroInitialDelay
862+
847863
newBatch, err := NewBatchFromDB(cfg, batchKit)
848864
if err != nil {
849865
return fmt.Errorf("failed in NewBatchFromDB: %w", err)

0 commit comments

Comments
 (0)