Skip to content

Commit f7095e7

Browse files
committed
sweepbatcher: customize initialDelay per sweep
Option WithInitialDelay now accepts a function returning initialDelay depending on sweep's data. This is needed to be able to wait longer for sweeps with low priority, but still sweeping high priority sweeps soon.
1 parent 283f31c commit f7095e7

File tree

3 files changed

+364
-59
lines changed

3 files changed

+364
-59
lines changed

sweepbatcher/sweep_batch.go

Lines changed: 56 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -151,11 +151,13 @@ type batchConfig struct {
151151
// clock provides methods to work with time and timers.
152152
clock clock.Clock
153153

154-
// initialDelay is the delay of first batch publishing after creation.
155-
// It only affects newly created batches, not batches loaded from DB,
156-
// so publishing does happen in case of a daemon restart (especially
157-
// important in case of a crashloop).
158-
initialDelay time.Duration
154+
// initialDelayProvider provides the delay of first batch publishing
155+
// after creation. It only affects newly created batches, not batches
156+
// loaded from DB, so publishing does happen in case of a daemon restart
157+
// (especially important in case of a crashloop). If a sweep is about to
158+
// expire (time until timeout is less that 2x initialDelay), then
159+
// waiting is skipped.
160+
initialDelayProvider InitialDelayProvider
159161

160162
// batchPublishDelay is the delay between receiving a new block or
161163
// initial delay completion and publishing the batch transaction.
@@ -728,6 +730,7 @@ func (b *batch) Run(ctx context.Context) error {
728730

729731
// Cache clock variable.
730732
clock := b.cfg.clock
733+
startTime := clock.Now()
731734

732735
blockChan, blockErrChan, err :=
733736
b.chainNotifier.RegisterBlockEpochNtfn(runCtx)
@@ -757,17 +760,15 @@ func (b *batch) Run(ctx context.Context) error {
757760

758761
// skipBefore is the time before which we skip batch publishing.
759762
// This is needed to facilitate better grouping of sweeps.
763+
// The value is set only if the batch has at least one sweep.
760764
// For batches loaded from DB initialDelay should be 0.
761-
skipBefore := clock.Now().Add(b.cfg.initialDelay)
765+
var skipBefore *time.Time
762766

763767
// initialDelayChan is a timer which fires upon initial delay end.
764768
// If initialDelay is set to 0, it will not trigger to avoid setting up
765769
// timerChan twice, which could lead to double publishing if
766770
// batchPublishDelay is also 0.
767771
var initialDelayChan <-chan time.Time
768-
if b.cfg.initialDelay > 0 {
769-
initialDelayChan = clock.TickAfter(b.cfg.initialDelay)
770-
}
771772

772773
// We use a timer in order to not publish new transactions at the same
773774
// time as the block epoch notification. This is done to prevent
@@ -781,6 +782,42 @@ func (b *batch) Run(ctx context.Context) error {
781782
b.primarySweepID, len(b.sweeps))
782783

783784
for {
785+
// If the batch if not empty, find earliest initialDelay.
786+
var totalSweptAmt btcutil.Amount
787+
for _, sweep := range b.sweeps {
788+
totalSweptAmt += sweep.value
789+
}
790+
791+
skipBeforeUpdated := false
792+
if totalSweptAmt != 0 {
793+
initialDelay, err := b.cfg.initialDelayProvider(
794+
ctx, len(b.sweeps), totalSweptAmt,
795+
)
796+
if err != nil {
797+
b.Warnf("initialDelayProvider failed: %v", err)
798+
initialDelay = 0
799+
}
800+
if initialDelay < 0 {
801+
b.Warnf("negative delay: %v", initialDelay)
802+
initialDelay = 0
803+
}
804+
delayStop := startTime.Add(initialDelay)
805+
if skipBefore == nil || delayStop.Before(*skipBefore) {
806+
skipBefore = &delayStop
807+
skipBeforeUpdated = true
808+
}
809+
}
810+
811+
// Create new timer only if the value of skipBefore was updated.
812+
// Don't create the timer if the delay is <= 0 to avoid double
813+
// publishing if batchPublishDelay is also 0.
814+
if skipBeforeUpdated {
815+
delay := skipBefore.Sub(clock.Now())
816+
if delay > 0 {
817+
initialDelayChan = clock.TickAfter(delay)
818+
}
819+
}
820+
784821
select {
785822
case <-b.callEnter:
786823
<-b.callLeave
@@ -796,7 +833,7 @@ func (b *batch) Run(ctx context.Context) error {
796833

797834
case <-initialDelayChan:
798835
b.Debugf("initial delay of duration %v has ended",
799-
b.cfg.initialDelay)
836+
clock.Now().Sub(startTime))
800837

801838
// Set the timer to publish the batch transaction after
802839
// the configured delay.
@@ -810,17 +847,23 @@ func (b *batch) Run(ctx context.Context) error {
810847
continue
811848
}
812849

850+
if skipBefore == nil {
851+
b.Debugf("Skipping publishing, because " +
852+
"the batch is empty.")
853+
continue
854+
}
855+
813856
// If the batch became urgent, skipBefore is set to now.
814-
if b.isUrgent(skipBefore) {
815-
skipBefore = clock.Now()
857+
if b.isUrgent(*skipBefore) {
858+
*skipBefore = clock.Now()
816859
}
817860

818861
// Check that the initial delay has ended. We have also
819862
// batchPublishDelay on top of initialDelay, so if
820863
// initialDelayChan has just fired, this check passes.
821864
now := clock.Now()
822865
if skipBefore.After(now) {
823-
b.Debugf(stillWaitingMsg, skipBefore, now)
866+
b.Debugf(stillWaitingMsg, *skipBefore, now)
824867
continue
825868
}
826869

sweepbatcher/sweep_batcher.go

Lines changed: 57 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,21 @@ type VerifySchnorrSig func(pubKey *btcec.PublicKey, hash, sig []byte) error
165165
type FeeRateProvider func(ctx context.Context,
166166
swapHash lntypes.Hash) (chainfee.SatPerKWeight, error)
167167

168+
// InitialDelayProvider returns the duration after new batch creation before it
169+
// is first published. It allows to customize the duration based on total value
170+
// of the batch. There is a trade-off between better grouping and getting funds
171+
// faster. If the function returns an error, no delay is used and the error is
172+
// logged as a warning.
173+
type InitialDelayProvider func(ctx context.Context, numSweeps int,
174+
value btcutil.Amount) (time.Duration, error)
175+
176+
// zeroInitialDelay returns no delay for any sweeps.
177+
func zeroInitialDelay(_ context.Context, _ int,
178+
_ btcutil.Amount) (time.Duration, error) {
179+
180+
return 0, nil
181+
}
182+
168183
// PublishErrorHandler is a function that handles transaction publishing error.
169184
type PublishErrorHandler func(err error, errMsg string, log btclog.Logger)
170185

@@ -306,13 +321,13 @@ type Batcher struct {
306321
// clock provides methods to work with time and timers.
307322
clock clock.Clock
308323

309-
// initialDelay is the delay of first batch publishing after creation.
310-
// It only affects newly created batches, not batches loaded from DB,
311-
// so publishing does happen in case of a daemon restart (especially
312-
// important in case of a crashloop). If a sweep is about to expire
313-
// (time until timeout is less that 2x initialDelay), then waiting is
314-
// skipped.
315-
initialDelay time.Duration
324+
// initialDelayProvider provides the delay of first batch publishing
325+
// after creation. It only affects newly created batches, not batches
326+
// loaded from DB, so publishing does happen in case of a daemon restart
327+
// (especially important in case of a crashloop). If a sweep is about to
328+
// expire (time until timeout is less that 2x initialDelay), then
329+
// waiting is skipped.
330+
initialDelayProvider InitialDelayProvider
316331

317332
// publishDelay is the delay of batch publishing that is applied in the
318333
// beginning, after the appearance of a new block in the network or
@@ -346,13 +361,13 @@ type BatcherConfig struct {
346361
// clock provides methods to work with time and timers.
347362
clock clock.Clock
348363

349-
// initialDelay is the delay of first batch publishing after creation.
350-
// It only affects newly created batches, not batches loaded from DB,
351-
// so publishing does happen in case of a daemon restart (especially
352-
// important in case of a crashloop). If a sweep is about to expire
353-
// (time until timeout is less that 2x initialDelay), then waiting is
354-
// skipped.
355-
initialDelay time.Duration
364+
// initialDelayProvider provides the delay of first batch publishing
365+
// after creation. It only affects newly created batches, not batches
366+
// loaded from DB, so publishing does happen in case of a daemon restart
367+
// (especially important in case of a crashloop). If a sweep is about to
368+
// expire (time until timeout is less that 2x initialDelay), then
369+
// waiting is skipped.
370+
initialDelayProvider InitialDelayProvider
356371

357372
// publishDelay is the delay of batch publishing that is applied in the
358373
// beginning, after the appearance of a new block in the network or
@@ -397,9 +412,9 @@ func WithClock(clock clock.Clock) BatcherOption {
397412
// better grouping. Defaults to 0s (no initial delay). If a sweep is about
398413
// to expire (time until timeout is less that 2x initialDelay), then waiting
399414
// is skipped.
400-
func WithInitialDelay(initialDelay time.Duration) BatcherOption {
415+
func WithInitialDelay(provider InitialDelayProvider) BatcherOption {
401416
return func(cfg *BatcherConfig) {
402-
cfg.initialDelay = initialDelay
417+
cfg.initialDelayProvider = provider
403418
}
404419
}
405420

@@ -485,27 +500,27 @@ func NewBatcher(wallet lndclient.WalletKitClient,
485500
}
486501

487502
return &Batcher{
488-
batches: make(map[int32]*batch),
489-
sweepReqs: make(chan SweepRequest),
490-
testReqs: make(chan *testRequest),
491-
errChan: make(chan error, 1),
492-
quit: make(chan struct{}),
493-
initDone: make(chan struct{}),
494-
wallet: wallet,
495-
chainNotifier: chainNotifier,
496-
signerClient: signerClient,
497-
musig2ServerSign: musig2ServerSigner,
498-
VerifySchnorrSig: verifySchnorrSig,
499-
chainParams: chainparams,
500-
store: store,
501-
sweepStore: sweepStore,
502-
clock: cfg.clock,
503-
initialDelay: cfg.initialDelay,
504-
publishDelay: cfg.publishDelay,
505-
customFeeRate: cfg.customFeeRate,
506-
txLabeler: cfg.txLabeler,
507-
customMuSig2Signer: cfg.customMuSig2Signer,
508-
publishErrorHandler: cfg.publishErrorHandler,
503+
batches: make(map[int32]*batch),
504+
sweepReqs: make(chan SweepRequest),
505+
testReqs: make(chan *testRequest),
506+
errChan: make(chan error, 1),
507+
quit: make(chan struct{}),
508+
initDone: make(chan struct{}),
509+
wallet: wallet,
510+
chainNotifier: chainNotifier,
511+
signerClient: signerClient,
512+
musig2ServerSign: musig2ServerSigner,
513+
VerifySchnorrSig: verifySchnorrSig,
514+
chainParams: chainparams,
515+
store: store,
516+
sweepStore: sweepStore,
517+
clock: cfg.clock,
518+
initialDelayProvider: cfg.initialDelayProvider,
519+
publishDelay: cfg.publishDelay,
520+
customFeeRate: cfg.customFeeRate,
521+
txLabeler: cfg.txLabeler,
522+
customMuSig2Signer: cfg.customMuSig2Signer,
523+
publishErrorHandler: cfg.publishErrorHandler,
509524
}
510525
}
511526

@@ -764,11 +779,10 @@ func (b *Batcher) spinUpBatch(ctx context.Context) (*batch, error) {
764779
cfg.batchPublishDelay = b.publishDelay
765780
}
766781

767-
if b.initialDelay < 0 {
768-
return nil, fmt.Errorf("negative initialDelay: %v",
769-
b.initialDelay)
782+
cfg.initialDelayProvider = b.initialDelayProvider
783+
if cfg.initialDelayProvider == nil {
784+
cfg.initialDelayProvider = zeroInitialDelay
770785
}
771-
cfg.initialDelay = b.initialDelay
772786

773787
batchKit := b.newBatchKit()
774788

@@ -862,6 +876,8 @@ func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error {
862876
// Note that initialDelay and batchPublishDelay are 0 for batches
863877
// recovered from DB so publishing happen in case of a daemon restart
864878
// (especially important in case of a crashloop).
879+
cfg.initialDelayProvider = zeroInitialDelay
880+
865881
newBatch, err := NewBatchFromDB(cfg, batchKit)
866882
if err != nil {
867883
return fmt.Errorf("failed in NewBatchFromDB: %w", err)

0 commit comments

Comments
 (0)