Skip to content

Commit 6929c7d

Browse files
committed
sweepbatcher: notify caller about confirmations
Add fields ConfChan and ConfErrChan to SpendNotifier type which is a part of SweepRequest passed to AddSweep method. This is needed to reuse confirmation notifications on the calling side the same way it is done for spending notifications.
1 parent ac2785f commit 6929c7d

File tree

3 files changed

+230
-3
lines changed

3 files changed

+230
-3
lines changed

sweepbatcher/sweep_batch.go

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1882,6 +1882,8 @@ func (b *batch) monitorConfirmations(ctx context.Context) error {
18821882
}
18831883

18841884
case err := <-errChan:
1885+
b.writeToConfErrChan(ctx, err)
1886+
18851887
b.writeToErrChan(fmt.Errorf("confirmations "+
18861888
"monitoring error: %w", err))
18871889

@@ -2159,7 +2161,37 @@ func (b *batch) handleConf(ctx context.Context,
21592161
b.Infof("confirmed in txid %s", b.batchTxid)
21602162
b.state = Confirmed
21612163

2162-
return b.store.ConfirmBatch(ctx, b.id)
2164+
if err := b.store.ConfirmBatch(ctx, b.id); err != nil {
2165+
return fmt.Errorf("failed to store confirmed state: %w", err)
2166+
}
2167+
2168+
// Send the confirmation to all the notifiers.
2169+
for _, s := range b.sweeps {
2170+
// If the sweep's notifier is empty then this means that
2171+
// a swap is not waiting to read an update from it, so
2172+
// we can skip the notification part.
2173+
if s.notifier == nil || s.notifier.ConfChan == nil {
2174+
continue
2175+
}
2176+
2177+
// Notify the caller in a goroutine to avoid possible dead-lock.
2178+
go func(notifier *SpendNotifier) {
2179+
// Note that we don't unblock on ctx, because it will
2180+
// expire soon, when batch.Run completes. The caller is
2181+
// responsible to consume ConfChan or close QuitChan.
2182+
select {
2183+
// Try to write the confirmation to the notification
2184+
// channel.
2185+
case notifier.ConfChan <- conf:
2186+
2187+
// If a quit signal was provided by the swap,
2188+
// continue.
2189+
case <-notifier.QuitChan:
2190+
}
2191+
}(s.notifier)
2192+
}
2193+
2194+
return nil
21632195
}
21642196

21652197
// isComplete returns true if the batch is completed. This method is used by the
@@ -2315,6 +2347,44 @@ func (b *batch) writeToSpendErrChan(ctx context.Context, spendErr error) {
23152347
}
23162348
}
23172349

2350+
// writeToConfErrChan sends an error to confirmation error channels of all the
2351+
// sweeps.
2352+
func (b *batch) writeToConfErrChan(ctx context.Context, confErr error) {
2353+
done, err := b.scheduleNextCall()
2354+
if err != nil {
2355+
done()
2356+
2357+
return
2358+
}
2359+
notifiers := make([]*SpendNotifier, 0, len(b.sweeps))
2360+
for _, s := range b.sweeps {
2361+
// If the sweep's notifier is empty then this means that a swap
2362+
// is not waiting to read an update from it, so we can skip
2363+
// the notification part.
2364+
if s.notifier == nil || s.notifier.ConfErrChan == nil {
2365+
continue
2366+
}
2367+
2368+
notifiers = append(notifiers, s.notifier)
2369+
}
2370+
done()
2371+
2372+
for _, notifier := range notifiers {
2373+
select {
2374+
// Try to write the error to the notification
2375+
// channel.
2376+
case notifier.ConfErrChan <- confErr:
2377+
2378+
// If a quit signal was provided by the swap,
2379+
// continue.
2380+
case <-notifier.QuitChan:
2381+
2382+
// If the context was canceled, stop.
2383+
case <-ctx.Done():
2384+
}
2385+
}
2386+
}
2387+
23182388
func (b *batch) persistSweep(ctx context.Context, sweep sweep,
23192389
completed bool) error {
23202390

sweepbatcher/sweep_batcher.go

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/lightninglabs/loop/loopdb"
2121
"github.com/lightninglabs/loop/swap"
2222
"github.com/lightninglabs/loop/utils"
23+
"github.com/lightningnetwork/lnd/chainntnfs"
2324
"github.com/lightningnetwork/lnd/clock"
2425
"github.com/lightningnetwork/lnd/input"
2526
"github.com/lightningnetwork/lnd/lntypes"
@@ -300,6 +301,14 @@ type SpendNotifier struct {
300301
// SpendErrChan is a channel where spend errors are received.
301302
SpendErrChan chan<- error
302303

304+
// ConfChan is a channel where the confirmation details are received.
305+
// This channel is optional.
306+
ConfChan chan<- *chainntnfs.TxConfirmation
307+
308+
// ConfErrChan is a channel where confirmation errors are received.
309+
// This channel is optional.
310+
ConfErrChan chan<- error
311+
303312
// QuitChan is a channel that can be closed to stop the notifier.
304313
QuitChan <-chan bool
305314
}
@@ -1114,7 +1123,9 @@ func (b *Batcher) FetchUnconfirmedBatches(ctx context.Context) ([]*batch,
11141123
}
11151124

11161125
// monitorSpendAndNotify monitors the spend of a specific outpoint and writes
1117-
// the response back to the response channel.
1126+
// the response back to the response channel. It is called if the batch is fully
1127+
// confirmed and we just need to deliver the data back to the caller though
1128+
// SpendNotifier.
11181129
func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep,
11191130
parentBatchID int32, notifier *SpendNotifier) error {
11201131

@@ -1172,6 +1183,15 @@ func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep,
11721183
select {
11731184
// Try to write the update to the notification channel.
11741185
case notifier.SpendChan <- spendDetail:
1186+
err := b.monitorConfAndNotify(
1187+
ctx, sweep, notifier, spendTx,
1188+
)
1189+
if err != nil {
1190+
b.writeToErrChan(
1191+
ctx, fmt.Errorf("monitor conf "+
1192+
"failed: %w", err),
1193+
)
1194+
}
11751195

11761196
// If a quit signal was provided by the swap, continue.
11771197
case <-notifier.QuitChan:
@@ -1215,6 +1235,78 @@ func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep,
12151235
return nil
12161236
}
12171237

1238+
// monitorConfAndNotify monitors the confirmation of a specific transaction and
1239+
// writes the response back to the response channel. It is called if the batch
1240+
// is fully confirmed and we just need to deliver the data back to the caller
1241+
// though SpendNotifier.
1242+
func (b *Batcher) monitorConfAndNotify(ctx context.Context, sweep *sweep,
1243+
notifier *SpendNotifier, spendTx *wire.MsgTx) error {
1244+
1245+
// If confirmation notifications were not requested, stop.
1246+
if notifier.ConfChan == nil && notifier.ConfErrChan == nil {
1247+
return nil
1248+
}
1249+
1250+
batchTxid := spendTx.TxHash()
1251+
1252+
if len(spendTx.TxOut) != 1 {
1253+
return fmt.Errorf("unexpected number of outputs in batch: %d, "+
1254+
"want %d", len(spendTx.TxOut), 1)
1255+
}
1256+
batchPkScript := spendTx.TxOut[0].PkScript
1257+
1258+
reorgChan := make(chan struct{})
1259+
1260+
confCtx, cancel := context.WithCancel(ctx)
1261+
1262+
confChan, errChan, err := b.chainNotifier.RegisterConfirmationsNtfn(
1263+
confCtx, &batchTxid, batchPkScript, batchConfHeight,
1264+
sweep.initiationHeight, lndclient.WithReOrgChan(reorgChan),
1265+
)
1266+
if err != nil {
1267+
cancel()
1268+
return err
1269+
}
1270+
1271+
b.wg.Add(1)
1272+
go func() {
1273+
defer cancel()
1274+
defer b.wg.Done()
1275+
1276+
select {
1277+
case conf := <-confChan:
1278+
if notifier.ConfChan != nil {
1279+
select {
1280+
case notifier.ConfChan <- conf:
1281+
case <-notifier.QuitChan:
1282+
case <-ctx.Done():
1283+
}
1284+
}
1285+
1286+
case err := <-errChan:
1287+
if notifier.ConfErrChan != nil {
1288+
select {
1289+
case notifier.ConfErrChan <- err:
1290+
case <-notifier.QuitChan:
1291+
case <-ctx.Done():
1292+
}
1293+
}
1294+
1295+
b.writeToErrChan(ctx, fmt.Errorf("confirmations "+
1296+
"monitoring error: %w", err))
1297+
1298+
case <-reorgChan:
1299+
// A re-org has been detected, but the batch is fully
1300+
// confirmed and this is unexpected. Crash the batcher.
1301+
b.writeToErrChan(ctx, fmt.Errorf("unexpected reorg"))
1302+
1303+
case <-ctx.Done():
1304+
}
1305+
}()
1306+
1307+
return nil
1308+
}
1309+
12181310
func (b *Batcher) writeToErrChan(ctx context.Context, err error) {
12191311
select {
12201312
case b.errChan <- err:

sweepbatcher/sweep_batcher_test.go

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -887,9 +887,11 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore,
887887

888888
// Deliver sweep request to batcher.
889889
spendChan := make(chan *SpendDetail, 1)
890+
confErrChan := make(chan error)
890891
notifier = &SpendNotifier{
891892
SpendChan: spendChan,
892893
SpendErrChan: make(chan error, 1),
894+
ConfErrChan: confErrChan,
893895
QuitChan: make(chan bool, 1),
894896
}
895897
sweepReq1.Notifier = notifier
@@ -968,6 +970,10 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore,
968970
// Emulate a confirmation error.
969971
confReg.ErrChan <- testError
970972

973+
// Make sure the notifier gets the confirmation error.
974+
confErr := <-confErrChan
975+
require.ErrorIs(t, confErr, testError)
976+
971977
// Wait for the batcher to crash because of the confirmation error.
972978
runErr = <-runErrChan
973979
require.ErrorIs(t, runErr, testError)
@@ -986,9 +992,11 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore,
986992

987993
// Deliver sweep request to batcher.
988994
spendChan = make(chan *SpendDetail, 1)
995+
confChan := make(chan *chainntnfs.TxConfirmation)
989996
notifier = &SpendNotifier{
990997
SpendChan: spendChan,
991998
SpendErrChan: make(chan error, 1),
999+
ConfChan: confChan,
9921000
QuitChan: make(chan bool, 1),
9931001
}
9941002
sweepReq1.Notifier = notifier
@@ -1043,9 +1051,15 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore,
10431051

10441052
// We mock the tx confirmation notification.
10451053
lnd.ConfChannel <- &chainntnfs.TxConfirmation{
1046-
Tx: spendingTx,
1054+
BlockHeight: 604,
1055+
Tx: spendingTx,
10471056
}
10481057

1058+
// Make sure the notifier gets a confirmation notification.
1059+
conf := <-confChan
1060+
require.Equal(t, uint32(604), conf.BlockHeight)
1061+
require.Equal(t, spendingTx.TxHash(), conf.Tx.TxHash())
1062+
10491063
// Eventually the batch receives the confirmation notification and
10501064
// confirms itself.
10511065
require.Eventually(t, func() bool {
@@ -1060,9 +1074,11 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore,
10601074
// Now emulate adding the sweep again after it was fully confirmed.
10611075
// This triggers another code path (monitorSpendAndNotify).
10621076
spendChan = make(chan *SpendDetail, 1)
1077+
confChan = make(chan *chainntnfs.TxConfirmation)
10631078
notifier = &SpendNotifier{
10641079
SpendChan: spendChan,
10651080
SpendErrChan: make(chan error, 1),
1081+
ConfChan: confChan,
10661082
QuitChan: make(chan bool, 1),
10671083
}
10681084
sweepReq1.Notifier = notifier
@@ -1079,6 +1095,18 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore,
10791095
require.Equal(t, spendingTxHash, spending.Tx.TxHash())
10801096
require.Equal(t, btcutil.Amount(fee), spending.OnChainFeePortion)
10811097

1098+
// We mock the tx confirmation notification.
1099+
<-lnd.RegisterConfChannel
1100+
lnd.ConfChannel <- &chainntnfs.TxConfirmation{
1101+
BlockHeight: 604,
1102+
Tx: spendingTx,
1103+
}
1104+
1105+
// Make sure the notifier gets a confirmation notification.
1106+
conf = <-confChan
1107+
require.Equal(t, uint32(604), conf.BlockHeight)
1108+
require.Equal(t, spendingTx.TxHash(), conf.Tx.TxHash())
1109+
10821110
// Now check what happens in case of a spending error.
10831111
spendErrChan = make(chan error, 1)
10841112
notifier = &SpendNotifier{
@@ -1103,6 +1131,43 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore,
11031131
// Wait for the batcher to crash because of the spending error.
11041132
runErr = <-runErrChan
11051133
require.ErrorIs(t, runErr, testError)
1134+
1135+
// Now launch the batcher again.
1136+
batcher = NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
1137+
testMuSig2SignSweep, testVerifySchnorrSig, lnd.ChainParams,
1138+
batcherStore, sweepStore)
1139+
go func() {
1140+
runErrChan <- batcher.Run(ctx)
1141+
}()
1142+
1143+
// Now check what happens in case of a confirmation error.
1144+
confErrChan = make(chan error, 1)
1145+
notifier = &SpendNotifier{
1146+
SpendChan: make(chan *SpendDetail, 1),
1147+
SpendErrChan: make(chan error, 1),
1148+
ConfErrChan: confErrChan,
1149+
QuitChan: make(chan bool, 1),
1150+
}
1151+
sweepReq1.Notifier = notifier
1152+
require.NoError(t, batcher.AddSweep(ctx, &sweepReq1))
1153+
1154+
// Expect a spending registration.
1155+
<-lnd.RegisterSpendChannel
1156+
1157+
// We notify the spend.
1158+
lnd.SpendChannel <- spendDetail
1159+
1160+
// We mock the tx confirmation error notification.
1161+
confReg = <-lnd.RegisterConfChannel
1162+
confReg.ErrChan <- testError
1163+
1164+
// Make sure the notifier gets the confirmation error.
1165+
confErr = <-confErrChan
1166+
require.ErrorIs(t, confErr, testError)
1167+
1168+
// Wait for the batcher to crash because of the confirmation error.
1169+
runErr = <-runErrChan
1170+
require.ErrorIs(t, runErr, testError)
11061171
}
11071172

11081173
// wrappedLogger implements btclog.Logger, recording last debug message format.

0 commit comments

Comments
 (0)