Skip to content

Commit efe0aa0

Browse files
committed
lndclient: block until chain notifier is ready
LND commit c6f458e478f9 (v0.20.0-rc3) moved ChainNotifier startup later in the lifecycle, so RegisterBlockEpochNtfn callers now see "chain notifier RPC is still in the process of starting" coming from Recv(). The new BlockUntilChainNotifier config option repeatedly calls RegisterBlockEpochNtfn during startup and only proceeds once the stream yields its first block height, retrying solely when we detect the ErrChainNotifierServerNotActive condition introduced by the LND commit above.
1 parent 5e5921c commit efe0aa0

File tree

2 files changed

+153
-0
lines changed

2 files changed

+153
-0
lines changed

lnd_services.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,11 @@ type LndServicesConfig struct {
143143
// block download is still in progress.
144144
BlockUntilChainSynced bool
145145

146+
// BlockUntilChainNotifier indicates that the client should wait until
147+
// the ChainNotifier RPC is accepting subscriptions. This requires lnd
148+
// to be built with the "chainrpc" tag.
149+
BlockUntilChainNotifier bool
150+
146151
// BlockUntilUnlocked denotes that the NewLndServices function should
147152
// block until lnd is unlocked.
148153
BlockUntilUnlocked bool
@@ -453,6 +458,25 @@ func NewLndServices(cfg *LndServicesConfig) (*GrpcLndServices, error) {
453458
log.Infof("lnd is now fully synced to its chain backend")
454459
}
455460

461+
// If requested, wait until the chain notifier RPC is ready before we
462+
// return. This ensures sub-servers relying on the notifier don't fail
463+
// during startup.
464+
if cfg.BlockUntilChainNotifier {
465+
log.Infof("Waiting for chain notifier RPC to be ready")
466+
467+
err := services.waitForChainNotifier(
468+
cfg.CallerCtx, timeout, cfg.ChainSyncPollInterval,
469+
)
470+
if err != nil {
471+
cleanup()
472+
473+
return nil, fmt.Errorf("error waiting for chain "+
474+
"notifier readiness: %w", err)
475+
}
476+
477+
log.Infof("Chain notifier RPC is ready")
478+
}
479+
456480
return services, nil
457481
}
458482

@@ -533,6 +557,80 @@ func (s *GrpcLndServices) waitForChainSync(ctx context.Context,
533557
return <-update
534558
}
535559

560+
// waitForChainNotifier blocks until the ChainNotifier RPC accepts block epoch
561+
// subscriptions and delivers at least one block height.
562+
func (s *GrpcLndServices) waitForChainNotifier(ctx context.Context,
563+
timeout, pollInterval time.Duration) error {
564+
565+
register := s.ChainNotifier.RegisterBlockEpochNtfn
566+
567+
var errRetry = errors.New("retry RegisterBlockEpochNtfn")
568+
569+
// attempt is a single attempt to make a RegisterBlockEpochNtfn call.
570+
// It returns nil on success, errRetry if another retry is needed and
571+
// other error in case of a final error.
572+
attempt := func() error {
573+
subCtx, cancel := context.WithTimeout(ctx, timeout)
574+
defer cancel()
575+
576+
// Make new RegisterBlockEpochNtfn call.
577+
blockChan, errChan, err := register(subCtx)
578+
if err != nil {
579+
return fmt.Errorf("register block epoch ntfn: %w", err)
580+
}
581+
582+
// Wait for block height notification, which indicates success.
583+
select {
584+
case <-subCtx.Done():
585+
return subCtx.Err()
586+
587+
case err := <-errChan:
588+
// If chainNotifier is not ready yet, retry.
589+
if isChainNotifierStartingErr(err) {
590+
select {
591+
case <-time.After(pollInterval):
592+
return errRetry
593+
594+
case <-ctx.Done():
595+
return ctx.Err()
596+
}
597+
}
598+
599+
return err
600+
601+
// We got a block height. Success!
602+
case <-blockChan:
603+
return nil
604+
}
605+
}
606+
607+
// Main retry loop.
608+
for {
609+
log.Info("Trying to make RegisterBlockEpochNtfn and receive " +
610+
"the current height...")
611+
err := attempt()
612+
613+
switch {
614+
case err == nil:
615+
log.Info("RegisterBlockEpochNtfn returned a height. " +
616+
"Success!")
617+
618+
return nil
619+
620+
case errors.Is(err, errRetry):
621+
log.Info("ChainNotifier is not ready yet. Will retry")
622+
623+
continue
624+
625+
default:
626+
log.Info("RegisterBlockEpochNtfn returned unexpected "+
627+
"error %v. LND client failed to start!", err)
628+
629+
return err
630+
}
631+
}
632+
}
633+
536634
// getLndInfo queries lnd for information about the node it is connected to.
537635
// If the waitForUnlocked boolean is set, it will examine any errors returned
538636
// and back off if the failure is due to lnd currently being locked. Otherwise,
@@ -676,6 +774,37 @@ func IsUnlockError(err error) bool {
676774
return false
677775
}
678776

777+
// chainNotifierStartupMessage matches the error string returned by lnd
778+
// v0.20.0-rc3+ when a ChainNotifier RPC is invoked before the sub-server
779+
// finishes initialization.
780+
const chainNotifierStartupMessage = "chain notifier RPC is still in the " +
781+
"process of starting"
782+
783+
// isChainNotifierStartingErr reports whether err is due to the lnd
784+
// ChainNotifier sub-server still starting up. Starting with lnd v0.20.0-rc3
785+
// the notifier is initialised later in the daemon lifecycle, and the RPC layer
786+
// surfaces this as an Unknown gRPC status that contains the message defined in
787+
// chainNotifierStartupMessage. There is a PR in LND to return code Unavailable
788+
// instead of Unknown: https://github.com/lightningnetwork/lnd/pull/10352
789+
func isChainNotifierStartingErr(err error) bool {
790+
if err == nil {
791+
return false
792+
}
793+
794+
// gRPC code Unavailable means "the server can't handle this request
795+
// now, retry later". LND's chain notifier returns this error when
796+
// the server is starting.
797+
// See https://github.com/lightningnetwork/lnd/pull/10352
798+
st, ok := status.FromError(err)
799+
if ok && st.Code() == codes.Unavailable {
800+
return true
801+
}
802+
803+
// TODO(ln-v0.20.0) remove the string fallback once lndclient depends on
804+
// a version of lnd that returns codes.Unavailable for this condition.
805+
return strings.Contains(err.Error(), chainNotifierStartupMessage)
806+
}
807+
679808
// checkLndCompatibility makes sure the connected lnd instance is running on the
680809
// correct network, has the version RPC implemented, is the correct minimal
681810
// version and supports all required build tags/subservers.

lnd_services_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,3 +361,27 @@ func TestCustomMacaroonHex(t *testing.T) {
361361
_, err = NewLndServices(testCfg)
362362
require.Error(t, err, "must set only one")
363363
}
364+
365+
// TestIsChainNotifierStartingErr ensures we correctly detect the startup lag
366+
// error returned by lnd v0.20.0-rc3+.
367+
func TestIsChainNotifierStartingErr(t *testing.T) {
368+
t.Parallel()
369+
370+
require.True(t, isChainNotifierStartingErr(
371+
status.Error(codes.Unavailable, chainNotifierStartupMessage),
372+
))
373+
374+
require.True(t, isChainNotifierStartingErr(
375+
status.Error(codes.Unknown, chainNotifierStartupMessage),
376+
))
377+
378+
require.True(t, isChainNotifierStartingErr(
379+
status.Error(codes.Unavailable, "some other error"),
380+
))
381+
382+
require.False(t, isChainNotifierStartingErr(nil))
383+
384+
require.False(t, isChainNotifierStartingErr(
385+
status.Error(codes.Unknown, "some other error"),
386+
))
387+
}

0 commit comments

Comments
 (0)