Skip to content

Commit c201dc1

Browse files
committed
sweepbatcher: notify caller about confirmations
Add fields ConfChan and ConfErrChan to SpendNotifier type which is a part of SweepRequest passed to AddSweep method. This is needed to reuse confirmation notifications on the calling side the same way it is done for spending notifications.
1 parent e54b735 commit c201dc1

File tree

3 files changed

+230
-3
lines changed

3 files changed

+230
-3
lines changed

sweepbatcher/sweep_batch.go

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1876,6 +1876,8 @@ func (b *batch) monitorConfirmations(ctx context.Context) error {
18761876
}
18771877

18781878
case err := <-errChan:
1879+
b.writeToConfErrChan(ctx, err)
1880+
18791881
b.writeToErrChan(fmt.Errorf("confirmations "+
18801882
"monitoring error: %w", err))
18811883

@@ -2136,7 +2138,37 @@ func (b *batch) handleConf(ctx context.Context,
21362138
b.Infof("confirmed in txid %s", b.batchTxid)
21372139
b.state = Confirmed
21382140

2139-
return b.store.ConfirmBatch(ctx, b.id)
2141+
if err := b.store.ConfirmBatch(ctx, b.id); err != nil {
2142+
return fmt.Errorf("failed to store confirmed state: %w", err)
2143+
}
2144+
2145+
// Send the confirmation to all the notifiers.
2146+
for _, s := range b.sweeps {
2147+
// If the sweep's notifier is empty then this means that
2148+
// a swap is not waiting to read an update from it, so
2149+
// we can skip the notification part.
2150+
if s.notifier == nil || s.notifier.ConfChan == nil {
2151+
continue
2152+
}
2153+
2154+
// Notify the caller in a goroutine to avoid possible dead-lock.
2155+
go func(notifier *SpendNotifier) {
2156+
// Note that we don't unblock on ctx, because it will
2157+
// expire soon, when batch.Run completes. The caller is
2158+
// responsible to consume ConfChan or close QuitChan.
2159+
select {
2160+
// Try to write the confirmation to the notification
2161+
// channel.
2162+
case notifier.ConfChan <- conf:
2163+
2164+
// If a quit signal was provided by the swap,
2165+
// continue.
2166+
case <-notifier.QuitChan:
2167+
}
2168+
}(s.notifier)
2169+
}
2170+
2171+
return nil
21402172
}
21412173

21422174
// isComplete returns true if the batch is completed. This method is used by the
@@ -2292,6 +2324,44 @@ func (b *batch) writeToSpendErrChan(ctx context.Context, spendErr error) {
22922324
}
22932325
}
22942326

2327+
// writeToConfErrChan sends an error to confirmation error channels of all the
2328+
// sweeps.
2329+
func (b *batch) writeToConfErrChan(ctx context.Context, confErr error) {
2330+
done, err := b.scheduleNextCall()
2331+
if err != nil {
2332+
done()
2333+
2334+
return
2335+
}
2336+
notifiers := make([]*SpendNotifier, 0, len(b.sweeps))
2337+
for _, s := range b.sweeps {
2338+
// If the sweep's notifier is empty then this means that a swap
2339+
// is not waiting to read an update from it, so we can skip
2340+
// the notification part.
2341+
if s.notifier == nil || s.notifier.ConfErrChan == nil {
2342+
continue
2343+
}
2344+
2345+
notifiers = append(notifiers, s.notifier)
2346+
}
2347+
done()
2348+
2349+
for _, notifier := range notifiers {
2350+
select {
2351+
// Try to write the error to the notification
2352+
// channel.
2353+
case notifier.ConfErrChan <- confErr:
2354+
2355+
// If a quit signal was provided by the swap,
2356+
// continue.
2357+
case <-notifier.QuitChan:
2358+
2359+
// If the context was canceled, stop.
2360+
case <-ctx.Done():
2361+
}
2362+
}
2363+
}
2364+
22952365
func (b *batch) persistSweep(ctx context.Context, sweep sweep,
22962366
completed bool) error {
22972367

sweepbatcher/sweep_batcher.go

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/lightninglabs/loop/loopdb"
2121
"github.com/lightninglabs/loop/swap"
2222
"github.com/lightninglabs/loop/utils"
23+
"github.com/lightningnetwork/lnd/chainntnfs"
2324
"github.com/lightningnetwork/lnd/clock"
2425
"github.com/lightningnetwork/lnd/input"
2526
"github.com/lightningnetwork/lnd/lntypes"
@@ -281,6 +282,14 @@ type SpendNotifier struct {
281282
// SpendErrChan is a channel where spend errors are received.
282283
SpendErrChan chan<- error
283284

285+
// ConfChan is a channel where the confirmation details are received.
286+
// This channel is optional.
287+
ConfChan chan<- *chainntnfs.TxConfirmation
288+
289+
// ConfErrChan is a channel where confirmation errors are received.
290+
// This channel is optional.
291+
ConfErrChan chan<- error
292+
284293
// QuitChan is a channel that can be closed to stop the notifier.
285294
QuitChan <-chan bool
286295
}
@@ -1049,7 +1058,9 @@ func (b *Batcher) FetchUnconfirmedBatches(ctx context.Context) ([]*batch,
10491058
}
10501059

10511060
// monitorSpendAndNotify monitors the spend of a specific outpoint and writes
1052-
// the response back to the response channel.
1061+
// the response back to the response channel. It is called if the batch is fully
1062+
// confirmed and we just need to deliver the data back to the caller though
1063+
// SpendNotifier.
10531064
func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep,
10541065
parentBatchID int32, notifier *SpendNotifier) error {
10551066

@@ -1107,6 +1118,15 @@ func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep,
11071118
select {
11081119
// Try to write the update to the notification channel.
11091120
case notifier.SpendChan <- spendDetail:
1121+
err := b.monitorConfAndNotify(
1122+
ctx, sweep, notifier, spendTx,
1123+
)
1124+
if err != nil {
1125+
b.writeToErrChan(
1126+
ctx, fmt.Errorf("monitor conf "+
1127+
"failed: %w", err),
1128+
)
1129+
}
11101130

11111131
// If a quit signal was provided by the swap, continue.
11121132
case <-notifier.QuitChan:
@@ -1150,6 +1170,78 @@ func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep,
11501170
return nil
11511171
}
11521172

1173+
// monitorConfAndNotify monitors the confirmation of a specific transaction and
1174+
// writes the response back to the response channel. It is called if the batch
1175+
// is fully confirmed and we just need to deliver the data back to the caller
1176+
// though SpendNotifier.
1177+
func (b *Batcher) monitorConfAndNotify(ctx context.Context, sweep *sweep,
1178+
notifier *SpendNotifier, spendTx *wire.MsgTx) error {
1179+
1180+
// If confirmation notifications were not requested, stop.
1181+
if notifier.ConfChan == nil && notifier.ConfErrChan == nil {
1182+
return nil
1183+
}
1184+
1185+
batchTxid := spendTx.TxHash()
1186+
1187+
if len(spendTx.TxOut) != 1 {
1188+
return fmt.Errorf("unexpected number of outputs in batch: %d, "+
1189+
"want %d", len(spendTx.TxOut), 1)
1190+
}
1191+
batchPkScript := spendTx.TxOut[0].PkScript
1192+
1193+
reorgChan := make(chan struct{})
1194+
1195+
confCtx, cancel := context.WithCancel(ctx)
1196+
1197+
confChan, errChan, err := b.chainNotifier.RegisterConfirmationsNtfn(
1198+
confCtx, &batchTxid, batchPkScript, batchConfHeight,
1199+
sweep.initiationHeight, lndclient.WithReOrgChan(reorgChan),
1200+
)
1201+
if err != nil {
1202+
cancel()
1203+
return err
1204+
}
1205+
1206+
b.wg.Add(1)
1207+
go func() {
1208+
defer cancel()
1209+
defer b.wg.Done()
1210+
1211+
select {
1212+
case conf := <-confChan:
1213+
if notifier.ConfChan != nil {
1214+
select {
1215+
case notifier.ConfChan <- conf:
1216+
case <-notifier.QuitChan:
1217+
case <-ctx.Done():
1218+
}
1219+
}
1220+
1221+
case err := <-errChan:
1222+
if notifier.ConfErrChan != nil {
1223+
select {
1224+
case notifier.ConfErrChan <- err:
1225+
case <-notifier.QuitChan:
1226+
case <-ctx.Done():
1227+
}
1228+
}
1229+
1230+
b.writeToErrChan(ctx, fmt.Errorf("confirmations "+
1231+
"monitoring error: %w", err))
1232+
1233+
case <-reorgChan:
1234+
// A re-org has been detected, but the batch is fully
1235+
// confirmed and this is unexpected. Crash the batcher.
1236+
b.writeToErrChan(ctx, fmt.Errorf("unexpected reorg"))
1237+
1238+
case <-ctx.Done():
1239+
}
1240+
}()
1241+
1242+
return nil
1243+
}
1244+
11531245
func (b *Batcher) writeToErrChan(ctx context.Context, err error) {
11541246
select {
11551247
case b.errChan <- err:

sweepbatcher/sweep_batcher_test.go

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -887,9 +887,11 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore,
887887

888888
// Deliver sweep request to batcher.
889889
spendChan := make(chan *SpendDetail, 1)
890+
confErrChan := make(chan error)
890891
notifier = &SpendNotifier{
891892
SpendChan: spendChan,
892893
SpendErrChan: make(chan error, 1),
894+
ConfErrChan: confErrChan,
893895
QuitChan: make(chan bool, 1),
894896
}
895897
sweepReq1.Notifier = notifier
@@ -968,6 +970,10 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore,
968970
// Emulate a confirmation error.
969971
confReg.ErrChan <- testError
970972

973+
// Make sure the notifier gets the confirmation error.
974+
confErr := <-confErrChan
975+
require.ErrorIs(t, confErr, testError)
976+
971977
// Wait for the batcher to crash because of the confirmation error.
972978
runErr = <-runErrChan
973979
require.ErrorIs(t, runErr, testError)
@@ -986,9 +992,11 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore,
986992

987993
// Deliver sweep request to batcher.
988994
spendChan = make(chan *SpendDetail, 1)
995+
confChan := make(chan *chainntnfs.TxConfirmation)
989996
notifier = &SpendNotifier{
990997
SpendChan: spendChan,
991998
SpendErrChan: make(chan error, 1),
999+
ConfChan: confChan,
9921000
QuitChan: make(chan bool, 1),
9931001
}
9941002
sweepReq1.Notifier = notifier
@@ -1043,9 +1051,15 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore,
10431051

10441052
// We mock the tx confirmation notification.
10451053
lnd.ConfChannel <- &chainntnfs.TxConfirmation{
1046-
Tx: spendingTx,
1054+
BlockHeight: 604,
1055+
Tx: spendingTx,
10471056
}
10481057

1058+
// Make sure the notifier gets a confirmation notification.
1059+
conf := <-confChan
1060+
require.Equal(t, uint32(604), conf.BlockHeight)
1061+
require.Equal(t, spendingTx.TxHash(), conf.Tx.TxHash())
1062+
10491063
// Eventually the batch receives the confirmation notification and
10501064
// confirms itself.
10511065
require.Eventually(t, func() bool {
@@ -1060,9 +1074,11 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore,
10601074
// Now emulate adding the sweep again after it was fully confirmed.
10611075
// This triggers another code path (monitorSpendAndNotify).
10621076
spendChan = make(chan *SpendDetail, 1)
1077+
confChan = make(chan *chainntnfs.TxConfirmation)
10631078
notifier = &SpendNotifier{
10641079
SpendChan: spendChan,
10651080
SpendErrChan: make(chan error, 1),
1081+
ConfChan: confChan,
10661082
QuitChan: make(chan bool, 1),
10671083
}
10681084
sweepReq1.Notifier = notifier
@@ -1079,6 +1095,18 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore,
10791095
require.Equal(t, spendingTxHash, spending.Tx.TxHash())
10801096
require.Equal(t, btcutil.Amount(fee), spending.OnChainFeePortion)
10811097

1098+
// We mock the tx confirmation notification.
1099+
<-lnd.RegisterConfChannel
1100+
lnd.ConfChannel <- &chainntnfs.TxConfirmation{
1101+
BlockHeight: 604,
1102+
Tx: spendingTx,
1103+
}
1104+
1105+
// Make sure the notifier gets a confirmation notification.
1106+
conf = <-confChan
1107+
require.Equal(t, uint32(604), conf.BlockHeight)
1108+
require.Equal(t, spendingTx.TxHash(), conf.Tx.TxHash())
1109+
10821110
// Now check what happens in case of a spending error.
10831111
spendErrChan = make(chan error, 1)
10841112
notifier = &SpendNotifier{
@@ -1103,6 +1131,43 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore,
11031131
// Wait for the batcher to crash because of the spending error.
11041132
runErr = <-runErrChan
11051133
require.ErrorIs(t, runErr, testError)
1134+
1135+
// Now launch the batcher again.
1136+
batcher = NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
1137+
testMuSig2SignSweep, testVerifySchnorrSig, lnd.ChainParams,
1138+
batcherStore, sweepStore)
1139+
go func() {
1140+
runErrChan <- batcher.Run(ctx)
1141+
}()
1142+
1143+
// Now check what happens in case of a confirmation error.
1144+
confErrChan = make(chan error, 1)
1145+
notifier = &SpendNotifier{
1146+
SpendChan: make(chan *SpendDetail, 1),
1147+
SpendErrChan: make(chan error, 1),
1148+
ConfErrChan: confErrChan,
1149+
QuitChan: make(chan bool, 1),
1150+
}
1151+
sweepReq1.Notifier = notifier
1152+
require.NoError(t, batcher.AddSweep(&sweepReq1))
1153+
1154+
// Expect a spending registration.
1155+
<-lnd.RegisterSpendChannel
1156+
1157+
// We notify the spend.
1158+
lnd.SpendChannel <- spendDetail
1159+
1160+
// We mock the tx confirmation error notification.
1161+
confReg = <-lnd.RegisterConfChannel
1162+
confReg.ErrChan <- testError
1163+
1164+
// Make sure the notifier gets the confirmation error.
1165+
confErr = <-confErrChan
1166+
require.ErrorIs(t, confErr, testError)
1167+
1168+
// Wait for the batcher to crash because of the confirmation error.
1169+
runErr = <-runErrChan
1170+
require.ErrorIs(t, runErr, testError)
11061171
}
11071172

11081173
// wrappedLogger implements btclog.Logger, recording last debug message format.

0 commit comments

Comments
 (0)