Skip to content

Commit c78dd2f

Browse files
committed
executor: move block registration retries to utils
This function will be reused in many other places.
1 parent 16d56f1 commit c78dd2f

File tree

4 files changed

+182
-31
lines changed

4 files changed

+182
-31
lines changed

executor.go

Lines changed: 6 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"errors"
66
"fmt"
7-
"strings"
87
"sync"
98
"sync/atomic"
109
"time"
@@ -14,6 +13,7 @@ import (
1413
"github.com/lightninglabs/loop/loopdb"
1514
"github.com/lightninglabs/loop/sweep"
1615
"github.com/lightninglabs/loop/sweepbatcher"
16+
"github.com/lightninglabs/loop/utils"
1717
"github.com/lightningnetwork/lnd/lntypes"
1818
"github.com/lightningnetwork/lnd/queue"
1919
)
@@ -67,40 +67,15 @@ func (s *executor) run(mainCtx context.Context,
6767
statusChan chan<- SwapInfo,
6868
abandonChans map[lntypes.Hash]chan struct{}) error {
6969

70-
var (
71-
err error
72-
blockEpochChan <-chan int32
73-
blockErrorChan <-chan error
74-
batcherErrChan chan error
70+
blockEpochChan, blockErrorChan, err := utils.RegisterBlockEpochNtfnWithRetry(
71+
mainCtx, s.lnd.ChainNotifier,
7572
)
76-
77-
for {
78-
blockEpochChan, blockErrorChan, err =
79-
s.lnd.ChainNotifier.RegisterBlockEpochNtfn(mainCtx)
80-
if err == nil {
81-
break
82-
}
83-
84-
if strings.Contains(err.Error(),
85-
"in the process of starting") {
86-
87-
log.Warnf("LND chain notifier server not ready yet, " +
88-
"retrying with delay")
89-
90-
// Give chain notifier some time to start and try to
91-
// re-attempt block epoch subscription.
92-
select {
93-
case <-time.After(500 * time.Millisecond):
94-
continue
95-
96-
case <-mainCtx.Done():
97-
return err
98-
}
99-
}
100-
73+
if err != nil {
10174
return err
10275
}
10376

77+
var batcherErrChan chan error
78+
10479
// Before starting, make sure we have an up-to-date block height.
10580
// Otherwise, we might reveal a preimage for a swap that is already
10681
// expired.

utils/chainnotifier.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package utils
2+
3+
import (
4+
"context"
5+
"strings"
6+
"time"
7+
8+
"google.golang.org/grpc/status"
9+
)
10+
11+
const (
12+
// chainNotifierStartupMessage is returned by lnd while the chain
13+
// notifier RPC sub-server is initialising.
14+
chainNotifierStartupMessage = "chain notifier RPC is still in the " +
15+
"process of starting"
16+
17+
// chainNotifierRetryBackoff is the delay used between subscription
18+
// attempts while the chain notifier is still starting.
19+
chainNotifierRetryBackoff = 500 * time.Millisecond
20+
)
21+
22+
// BlockEpochRegistrar represents the ability to subscribe to block epoch
23+
// notifications.
24+
type BlockEpochRegistrar interface {
25+
RegisterBlockEpochNtfn(ctx context.Context) (chan int32, chan error,
26+
error)
27+
}
28+
29+
// RegisterBlockEpochNtfnWithRetry keeps retrying block epoch subscriptions as
30+
// long as lnd reports that the chain notifier sub-server is still starting.
31+
func RegisterBlockEpochNtfnWithRetry(ctx context.Context,
32+
registrar BlockEpochRegistrar) (chan int32, chan error, error) {
33+
34+
for {
35+
blockChan, errChan, err := registrar.RegisterBlockEpochNtfn(ctx)
36+
if err == nil {
37+
return blockChan, errChan, nil
38+
}
39+
40+
if !isChainNotifierStartingErr(err) {
41+
return nil, nil, err
42+
}
43+
44+
log.Warnf("Chain notifier RPC not ready yet, retrying: %v",
45+
err)
46+
47+
select {
48+
case <-time.After(chainNotifierRetryBackoff):
49+
continue
50+
51+
case <-ctx.Done():
52+
return nil, nil, ctx.Err()
53+
}
54+
}
55+
}
56+
57+
// isChainNotifierStartingErr checks whether an error indicates that lnd's chain
58+
// notifier has not started yet.
59+
func isChainNotifierStartingErr(err error) bool {
60+
if err == nil {
61+
return false
62+
}
63+
64+
st, ok := status.FromError(err)
65+
if ok && strings.Contains(st.Message(), chainNotifierStartupMessage) {
66+
return true
67+
}
68+
69+
return strings.Contains(err.Error(), chainNotifierStartupMessage)
70+
}

utils/chainnotifier_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package utils
2+
3+
import (
4+
"context"
5+
"sync"
6+
"testing"
7+
"time"
8+
9+
"github.com/stretchr/testify/require"
10+
"google.golang.org/grpc/codes"
11+
"google.golang.org/grpc/status"
12+
)
13+
14+
// stubBlockEpochRegistrar implements BlockEpochRegistrar and records the
15+
// number of attempts before succeeding.
16+
type stubBlockEpochRegistrar struct {
17+
mu sync.Mutex
18+
attempts int
19+
succeedAfter int
20+
}
21+
22+
// RegisterBlockEpochNtfn simulates a chain notifier that returns a startup
23+
// error until the configured number of attempts has been exhausted.
24+
func (s *stubBlockEpochRegistrar) RegisterBlockEpochNtfn(
25+
context.Context) (chan int32, chan error, error) {
26+
27+
s.mu.Lock()
28+
defer s.mu.Unlock()
29+
30+
s.attempts++
31+
if s.attempts <= s.succeedAfter {
32+
return nil, nil, status.Error(
33+
codes.Unknown, chainNotifierStartupMessage,
34+
)
35+
}
36+
37+
return make(chan int32), make(chan error), nil
38+
}
39+
40+
// Attempts returns the total number of registration attempts made.
41+
func (s *stubBlockEpochRegistrar) Attempts() int {
42+
s.mu.Lock()
43+
defer s.mu.Unlock()
44+
45+
return s.attempts
46+
}
47+
48+
// TestRegisterBlockEpochNtfnWithRetry ensures we retry until the notifier
49+
// becomes available.
50+
func TestRegisterBlockEpochNtfnWithRetry(t *testing.T) {
51+
t.Parallel()
52+
53+
ctx, cancel := context.WithTimeout(t.Context(), time.Second)
54+
t.Cleanup(cancel)
55+
56+
stub := &stubBlockEpochRegistrar{
57+
succeedAfter: 1,
58+
}
59+
60+
blockChan, errChan, err := RegisterBlockEpochNtfnWithRetry(ctx, stub)
61+
require.NoError(t, err)
62+
require.NotNil(t, blockChan)
63+
require.NotNil(t, errChan)
64+
require.Equal(t, 2, stub.Attempts())
65+
}
66+
67+
// TestRegisterBlockEpochNtfnWithRetryContextCancel ensures we propagate the
68+
// caller's context error if the notifier never becomes ready.
69+
func TestRegisterBlockEpochNtfnWithRetryContextCancel(t *testing.T) {
70+
t.Parallel()
71+
72+
ctx, cancel := context.WithTimeout(t.Context(), 50*time.Millisecond)
73+
t.Cleanup(cancel)
74+
75+
stub := &stubBlockEpochRegistrar{
76+
succeedAfter: 100,
77+
}
78+
79+
_, _, err := RegisterBlockEpochNtfnWithRetry(ctx, stub)
80+
require.ErrorIs(t, err, context.DeadlineExceeded)
81+
require.GreaterOrEqual(t, stub.Attempts(), 1)
82+
}

utils/log.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package utils
2+
3+
import (
4+
"github.com/btcsuite/btclog/v2"
5+
"github.com/lightningnetwork/lnd/build"
6+
)
7+
8+
// Subsystem defines the sub system name of this package.
9+
const Subsystem = "UTILS"
10+
11+
// log is a logger that is initialized with no output filters. This means the
12+
// package will not perform any logging by default until the caller requests it.
13+
var log btclog.Logger
14+
15+
// The default amount of logging is none.
16+
func init() {
17+
UseLogger(build.NewSubLogger(Subsystem, nil))
18+
}
19+
20+
// UseLogger uses a specified Logger to output package logging info. This should
21+
// be used in preference to SetLogWriter if the caller is also using btclog.
22+
func UseLogger(logger btclog.Logger) {
23+
log = logger
24+
}

0 commit comments

Comments
 (0)