Skip to content

Commit 60c2e6e

Browse files
committed
lndclient: block until chain notifier is ready
LND commit c6f458e478f9 (v0.20.0-rc3) moved ChainNotifier startup later in the lifecycle, so RegisterBlockEpochNtfn callers now see "chain notifier RPC is still in the process of starting" coming from Recv(). The new BlockUntilChainNotifier config option repeatedly calls RegisterBlockEpochNtfn during startup and only proceeds once the stream yields its first block height, retrying solely when we detect the ErrChainNotifierServerNotActive condition introduced by the LND commit above.
1 parent 5e5921c commit 60c2e6e

File tree

2 files changed

+143
-0
lines changed

2 files changed

+143
-0
lines changed

lnd_services.go

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,11 @@ type LndServicesConfig struct {
143143
// block download is still in progress.
144144
BlockUntilChainSynced bool
145145

146+
// BlockUntilChainNotifier indicates that the client should wait until
147+
// the ChainNotifier RPC is accepting subscriptions. This requires lnd
148+
// to be built with the "chainrpc" tag.
149+
BlockUntilChainNotifier bool
150+
146151
// BlockUntilUnlocked denotes that the NewLndServices function should
147152
// block until lnd is unlocked.
148153
BlockUntilUnlocked bool
@@ -453,6 +458,25 @@ func NewLndServices(cfg *LndServicesConfig) (*GrpcLndServices, error) {
453458
log.Infof("lnd is now fully synced to its chain backend")
454459
}
455460

461+
// If requested, wait until the chain notifier RPC is ready before we
462+
// return. This ensures sub-servers relying on the notifier don't fail
463+
// during startup.
464+
if cfg.BlockUntilChainNotifier {
465+
log.Infof("Waiting for chain notifier RPC to be ready")
466+
467+
err := services.waitForChainNotifier(
468+
cfg.CallerCtx, timeout, cfg.ChainSyncPollInterval,
469+
)
470+
if err != nil {
471+
cleanup()
472+
473+
return nil, fmt.Errorf("error waiting for chain "+
474+
"notifier readiness: %w", err)
475+
}
476+
477+
log.Infof("Chain notifier RPC is ready")
478+
}
479+
456480
return services, nil
457481
}
458482

@@ -533,6 +557,70 @@ func (s *GrpcLndServices) waitForChainSync(ctx context.Context,
533557
return <-update
534558
}
535559

560+
// waitForChainNotifier blocks until the ChainNotifier RPC accepts block epoch
561+
// subscriptions and delivers at least one block height.
562+
func (s *GrpcLndServices) waitForChainNotifier(ctx context.Context,
563+
timeout, pollInterval time.Duration) error {
564+
565+
register := s.ChainNotifier.RegisterBlockEpochNtfn
566+
567+
var errRetry = errors.New("retry RegisterBlockEpochNtfn")
568+
569+
// attempt is a single attempt to make a RegisterBlockEpochNtfn call.
570+
// It returns nil on success, errRetry if another retry is needed and
571+
// other error in case of a final error.
572+
attempt := func() error {
573+
subCtx, cancel := context.WithTimeout(ctx, timeout)
574+
defer cancel()
575+
576+
// Make new RegisterBlockEpochNtfn call.
577+
blockChan, errChan, err := register(subCtx)
578+
if err != nil {
579+
return fmt.Errorf("register block epoch ntfn: %w", err)
580+
}
581+
582+
// Wait for block height notification, which indicates success.
583+
select {
584+
case <-subCtx.Done():
585+
return subCtx.Err()
586+
587+
case err := <-errChan:
588+
// If chainNotifier is not ready yet, retry.
589+
if isChainNotifierStartingErr(err) {
590+
select {
591+
case <-time.After(pollInterval):
592+
return errRetry
593+
594+
case <-ctx.Done():
595+
return ctx.Err()
596+
}
597+
}
598+
599+
return err
600+
601+
// We got a block height. Success!
602+
case <-blockChan:
603+
return nil
604+
}
605+
}
606+
607+
// Main retry loop.
608+
for {
609+
err := attempt()
610+
611+
switch {
612+
case err == nil:
613+
return nil
614+
615+
case errors.Is(err, errRetry):
616+
continue
617+
618+
default:
619+
return err
620+
}
621+
}
622+
}
623+
536624
// getLndInfo queries lnd for information about the node it is connected to.
537625
// If the waitForUnlocked boolean is set, it will examine any errors returned
538626
// and back off if the failure is due to lnd currently being locked. Otherwise,
@@ -676,6 +764,37 @@ func IsUnlockError(err error) bool {
676764
return false
677765
}
678766

767+
// chainNotifierStartupMessage matches the error string returned by lnd
768+
// v0.20.0-rc3+ when a ChainNotifier RPC is invoked before the sub-server
769+
// finishes initialization.
770+
const chainNotifierStartupMessage = "chain notifier RPC is still in the " +
771+
"process of starting"
772+
773+
// isChainNotifierStartingErr reports whether err is due to the lnd
774+
// ChainNotifier sub-server still starting up. Starting with lnd v0.20.0-rc3
775+
// the notifier is initialised later in the daemon lifecycle, and the RPC layer
776+
// surfaces this as an Unknown gRPC status that contains the message defined in
777+
// chainNotifierStartupMessage. There is a PR in LND to return code Unavailable
778+
// instead of Unknown: https://github.com/lightningnetwork/lnd/pull/10352
779+
func isChainNotifierStartingErr(err error) bool {
780+
if err == nil {
781+
return false
782+
}
783+
784+
// gRPC code Unavailable means "the server can't handle this request
785+
// now, retry later". LND's chain notifier returns this error when
786+
// the server is starting.
787+
// See https://github.com/lightningnetwork/lnd/pull/10352
788+
st, ok := status.FromError(err)
789+
if ok && st.Code() == codes.Unavailable {
790+
return true
791+
}
792+
793+
// TODO(ln-v0.20.0) remove the string fallback once lndclient depends on
794+
// a version of lnd that returns codes.Unavailable for this condition.
795+
return strings.Contains(err.Error(), chainNotifierStartupMessage)
796+
}
797+
679798
// checkLndCompatibility makes sure the connected lnd instance is running on the
680799
// correct network, has the version RPC implemented, is the correct minimal
681800
// version and supports all required build tags/subservers.

lnd_services_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,3 +361,27 @@ func TestCustomMacaroonHex(t *testing.T) {
361361
_, err = NewLndServices(testCfg)
362362
require.Error(t, err, "must set only one")
363363
}
364+
365+
// TestIsChainNotifierStartingErr ensures we correctly detect the startup lag
366+
// error returned by lnd v0.20.0-rc3+.
367+
func TestIsChainNotifierStartingErr(t *testing.T) {
368+
t.Parallel()
369+
370+
require.True(t, isChainNotifierStartingErr(
371+
status.Error(codes.Unavailable, chainNotifierStartupMessage),
372+
))
373+
374+
require.True(t, isChainNotifierStartingErr(
375+
status.Error(codes.Unknown, chainNotifierStartupMessage),
376+
))
377+
378+
require.True(t, isChainNotifierStartingErr(
379+
status.Error(codes.Unavailable, "some other error"),
380+
))
381+
382+
require.False(t, isChainNotifierStartingErr(nil))
383+
384+
require.False(t, isChainNotifierStartingErr(
385+
status.Error(codes.Unknown, "some other error"),
386+
))
387+
}

0 commit comments

Comments
 (0)