Skip to content

Commit 0e54c01

Browse files
committed
add chain notifier readiness guard
- add waitForChainNotifierReady helper that retries RegisterBlockEpochNtfn until the notifier accepts a stream or the context cancels - call the helper immediately after the full lnd client connects so every integrated subserver starts only after lnd’s chain notifier is ready
1 parent f0124e8 commit 0e54c01

File tree

2 files changed

+125
-0
lines changed

2 files changed

+125
-0
lines changed

chain_notifier_wait.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package terminal
2+
3+
import (
4+
"context"
5+
"strings"
6+
"time"
7+
8+
"github.com/lightninglabs/lndclient"
9+
"google.golang.org/grpc/codes"
10+
"google.golang.org/grpc/status"
11+
)
12+
13+
const (
14+
chainNotifierStartupMessage = "chain notifier RPC is still in the " +
15+
"process of starting"
16+
)
17+
18+
// waitForChainNotifierReady blocks until lnd's chain notifier accepts a block
19+
// epoch subscription or the provided context is canceled.
20+
func waitForChainNotifierReady(ctx context.Context,
21+
notifier lndclient.ChainNotifierClient) error {
22+
23+
const (
24+
initialBackoff = 200 * time.Millisecond
25+
maxBackoff = 5 * time.Second
26+
streamGrace = 2 * time.Second
27+
)
28+
29+
backoff := initialBackoff
30+
31+
for {
32+
select {
33+
case <-ctx.Done():
34+
return ctx.Err()
35+
36+
default:
37+
}
38+
39+
attemptCtx, cancel := context.WithCancel(ctx)
40+
blockChan, errChan, err := notifier.RegisterBlockEpochNtfn(attemptCtx)
41+
switch {
42+
case err != nil:
43+
cancel()
44+
45+
isStartupErr := status.Code(err) == codes.Unavailable ||
46+
strings.Contains(err.Error(),
47+
chainNotifierStartupMessage)
48+
if !isStartupErr {
49+
return err
50+
}
51+
52+
log.Warnf("Chain notifier RPC not ready, retrying in %v: %v",
53+
backoff, err)
54+
55+
default:
56+
timer := time.NewTimer(streamGrace)
57+
58+
select {
59+
case <-blockChan:
60+
go drainReadinessNtfn(
61+
attemptCtx, cancel, blockChan, errChan,
62+
)
63+
return nil
64+
65+
case err = <-errChan:
66+
cancel()
67+
68+
log.Warnf("Chain notifier stream ended early, "+
69+
"retrying: %v", err)
70+
71+
case <-timer.C:
72+
go drainReadinessNtfn(
73+
attemptCtx, cancel, blockChan, errChan,
74+
)
75+
return nil
76+
77+
case <-ctx.Done():
78+
cancel()
79+
return ctx.Err()
80+
}
81+
}
82+
83+
select {
84+
case <-time.After(backoff):
85+
86+
case <-ctx.Done():
87+
return ctx.Err()
88+
}
89+
90+
backoff *= 2
91+
if backoff > maxBackoff {
92+
backoff = maxBackoff
93+
}
94+
}
95+
}
96+
97+
// drainReadinessNtfn discards notifications until the daemon shuts down,
98+
// allowing the readiness subscription to stay open without affecting lnd logs.
99+
func drainReadinessNtfn(ctx context.Context, cancel func(),
100+
blockChan <-chan int32, errChan <-chan error) {
101+
102+
defer cancel()
103+
104+
for {
105+
select {
106+
case <-blockChan:
107+
108+
case <-errChan:
109+
return
110+
111+
case <-ctx.Done():
112+
return
113+
}
114+
}
115+
}

terminal.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -956,6 +956,16 @@ func (g *LightningTerminal) setupFullLNDClient(ctx context.Context,
956956
if err == nil {
957957
log.Infof("Full lnd client connected")
958958

959+
log.Infof("Waiting for chain notifier to become available")
960+
err = waitForChainNotifierReady(
961+
ctx, g.lndClient.ChainNotifier,
962+
)
963+
if err != nil {
964+
return fmt.Errorf("waiting for chain notifier: %w",
965+
err)
966+
}
967+
log.Infof("Chain notifier ready")
968+
959969
break
960970
}
961971

0 commit comments

Comments
 (0)