Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 24 additions & 12 deletions sweepbatcher/sweep_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ func (b *batch) Run(ctx context.Context) error {
blockChan, blockErrChan, err :=
b.chainNotifier.RegisterBlockEpochNtfn(runCtx)
if err != nil {
return err
return fmt.Errorf("block registration error: %w", err)
}

// Set currentHeight here, because it may be needed in monitorSpend.
Expand All @@ -658,7 +658,8 @@ func (b *batch) Run(ctx context.Context) error {
b.Debugf("initial height for the batch is %v", b.currentHeight)

case <-runCtx.Done():
return runCtx.Err()
return fmt.Errorf("context expired while waiting for current "+
"height: %w", runCtx.Err())
}

// If a primary sweep exists we immediately start monitoring for its
Expand All @@ -667,7 +668,7 @@ func (b *batch) Run(ctx context.Context) error {
sweep := b.sweeps[b.primarySweepID]
err := b.monitorSpend(runCtx, sweep)
if err != nil {
return err
return fmt.Errorf("monitorSpend error: %w", err)
}
}

Expand Down Expand Up @@ -742,17 +743,21 @@ func (b *batch) Run(ctx context.Context) error {

err := b.publish(ctx)
if err != nil {
return err
return fmt.Errorf("publish error: %w", err)
}

case spend := <-b.spendChan:
err := b.handleSpend(runCtx, spend.SpendingTx)
if err != nil {
return err
return fmt.Errorf("handleSpend error: %w", err)
}

case <-b.confChan:
return b.handleConf(runCtx)
if err := b.handleConf(runCtx); err != nil {
return fmt.Errorf("handleConf error: %w", err)
}

return nil

case <-b.reorgChan:
b.state = Open
Expand All @@ -761,21 +766,22 @@ func (b *batch) Run(ctx context.Context) error {

err := b.monitorSpend(ctx, b.sweeps[b.primarySweepID])
if err != nil {
return err
return fmt.Errorf("monitorSpend error: %w", err)
}

case testReq := <-b.testReqs:
testReq.handler()
close(testReq.quit)

case err := <-blockErrChan:
return err
return fmt.Errorf("blocks monitoring error: %w", err)

case err := <-b.errChan:
return err
return fmt.Errorf("error with the batch: %w", err)

case <-runCtx.Done():
return runCtx.Err()
return fmt.Errorf("batch context expired: %w",
runCtx.Err())
}
}
}
Expand Down Expand Up @@ -1555,7 +1561,10 @@ func (b *batch) monitorSpend(ctx context.Context, primarySweep sweep) error {
return

case err := <-spendErr:
b.writeToErrChan(err)
b.writeToErrChan(
fmt.Errorf("spend error: %w", err),
)

return

case <-ctx.Done():
Expand Down Expand Up @@ -1595,10 +1604,13 @@ func (b *batch) monitorConfirmations(ctx context.Context) error {

case <-ctx.Done():
}

return

case err := <-errChan:
b.writeToErrChan(err)
b.writeToErrChan(fmt.Errorf("confirmations "+
"monitoring error: %w", err))

return

case <-reorgChan:
Expand Down
18 changes: 11 additions & 7 deletions sweepbatcher/sweep_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,9 @@ func (b *Batcher) spinUpBatch(ctx context.Context) (*batch, error) {

err := batch.Run(ctx)
if err != nil {
_ = b.writeToErrChan(ctx, err)
b.writeToErrChan(
ctx, fmt.Errorf("new batch failed: %w", err),
)
}
}()

Expand Down Expand Up @@ -856,7 +858,9 @@ func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error {

err := newBatch.Run(ctx)
if err != nil {
_ = b.writeToErrChan(ctx, err)
b.writeToErrChan(
ctx, fmt.Errorf("db batch failed: %w", err),
)
}
}()

Expand Down Expand Up @@ -973,7 +977,10 @@ func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep,
case <-ctx.Done():
}

_ = b.writeToErrChan(ctx, err)
b.writeToErrChan(
ctx, fmt.Errorf("spend error: %w", err),
)

return

case <-notifier.QuitChan:
Expand All @@ -988,13 +995,10 @@ func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep,
return nil
}

func (b *Batcher) writeToErrChan(ctx context.Context, err error) error {
func (b *Batcher) writeToErrChan(ctx context.Context, err error) {
select {
case b.errChan <- err:
return nil

case <-ctx.Done():
return ctx.Err()
}
}

Expand Down