Skip to content

Commit 90b49c7

Browse files
committed
loopd: gate manager startup on chain notifier readiness
- add a readiness probe in Daemon.initialize that blocks until lnd’s chain notifier accepts (and maintains) a block-epoch subscription before starting static-address managers - treat initial RegisterBlockEpochNtfn RPC failures and early stream shutdowns with exponential backoff, logging retries without crashing loopd - keep a background drainer running on the readiness subscription so the notifier stays happy once startup proceeds
1 parent b8d6583 commit 90b49c7

File tree

1 file changed

+116
-0
lines changed

1 file changed

+116
-0
lines changed

loopd/daemon.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ import (
3333
"github.com/lightningnetwork/lnd/macaroons"
3434
"go.etcd.io/bbolt"
3535
"google.golang.org/grpc"
36+
"google.golang.org/grpc/codes"
37+
"google.golang.org/grpc/status"
3638
"google.golang.org/protobuf/encoding/protojson"
3739
"gopkg.in/macaroon-bakery.v2/bakery"
3840
)
@@ -45,6 +47,9 @@ var (
4547
// errOnlyStartOnce is the error that is returned if the daemon is
4648
// started more than once.
4749
errOnlyStartOnce = fmt.Errorf("daemon can only be started once")
50+
51+
chainNotifierStartupMessage = "chain notifier RPC is still in the " +
52+
"process of starting"
4853
)
4954

5055
// ListenerCfg holds closures used to retrieve listeners for the gRPC services.
@@ -404,6 +409,15 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
404409

405410
infof("Swap server address: %v", d.cfg.Server.Host)
406411

412+
infof("Waiting for chain notifier to become available 3")
413+
err := waitForChainNotifierReady(
414+
d.mainCtx, d.lnd.ChainNotifier,
415+
)
416+
if err != nil {
417+
return fmt.Errorf("chain notifier unavailable: %w", err)
418+
}
419+
infof("Chain notifier ready")
420+
407421
// Check if we need to migrate the database.
408422
if needSqlMigration(d.cfg) {
409423
infof("Boltdb found, running migration")
@@ -1095,3 +1109,105 @@ func allowCORS(handler http.Handler, origin string) http.Handler {
10951109
handler.ServeHTTP(w, r)
10961110
})
10971111
}
1112+
1113+
// waitForChainNotifierReady blocks until lnd's chain notifier accepts a block
1114+
// epoch subscription or the provided context is canceled.
1115+
func waitForChainNotifierReady(ctx context.Context,
1116+
notifier lndclient.ChainNotifierClient) error {
1117+
1118+
const (
1119+
initialBackoff = 200 * time.Millisecond
1120+
maxBackoff = 5 * time.Second
1121+
streamGrace = 2 * time.Second
1122+
)
1123+
1124+
backoff := initialBackoff
1125+
1126+
for {
1127+
select {
1128+
case <-ctx.Done():
1129+
return ctx.Err()
1130+
1131+
default:
1132+
}
1133+
1134+
attemptCtx, cancel := context.WithCancel(ctx)
1135+
blockChan, errChan, err := notifier.RegisterBlockEpochNtfn(attemptCtx)
1136+
1137+
if err != nil {
1138+
cancel()
1139+
1140+
isStartupErr := status.Code(err) == codes.Unavailable ||
1141+
strings.Contains(err.Error(),
1142+
chainNotifierStartupMessage)
1143+
1144+
if !isStartupErr {
1145+
return err
1146+
}
1147+
1148+
warnf("Chain notifier RPC not ready, retrying in %v: %v",
1149+
backoff, err)
1150+
} else {
1151+
// Registration succeeded; wait briefly to ensure the
1152+
// notifier keeps the stream open. If it pushes the startup
1153+
// error down errChan, retry as well.
1154+
timer := time.NewTimer(streamGrace)
1155+
1156+
select {
1157+
case <-blockChan:
1158+
go drainReadinessNtfn(
1159+
attemptCtx, cancel, blockChan, errChan,
1160+
)
1161+
return nil
1162+
1163+
case err = <-errChan:
1164+
cancel()
1165+
1166+
warnf("Chain notifier stream ended early, "+
1167+
"retrying: %v", err)
1168+
1169+
case <-timer.C:
1170+
go drainReadinessNtfn(
1171+
attemptCtx, cancel, blockChan, errChan,
1172+
)
1173+
return nil
1174+
1175+
case <-ctx.Done():
1176+
cancel()
1177+
return ctx.Err()
1178+
}
1179+
}
1180+
1181+
select {
1182+
case <-time.After(backoff):
1183+
1184+
case <-ctx.Done():
1185+
return ctx.Err()
1186+
}
1187+
1188+
backoff *= 2
1189+
if backoff > maxBackoff {
1190+
backoff = maxBackoff
1191+
}
1192+
}
1193+
}
1194+
1195+
// drainReadinessNtfn discards notifications until the daemon shuts down,
1196+
// allowing the readiness subscription to stay open without affecting lnd logs.
1197+
func drainReadinessNtfn(ctx context.Context, cancel func(),
1198+
blockChan <-chan int32, errChan <-chan error) {
1199+
1200+
defer cancel()
1201+
1202+
for {
1203+
select {
1204+
case <-blockChan:
1205+
1206+
case <-errChan:
1207+
return
1208+
1209+
case <-ctx.Done():
1210+
return
1211+
}
1212+
}
1213+
}

0 commit comments

Comments
 (0)