Skip to content

Commit eaf0b75

Browse files
authored
Merge pull request #255 from starius/block-until-chain-notifier-is-ready
lndclient: block until chain notifier is ready
2 parents 5e5921c + d4b697c commit eaf0b75

File tree

2 files changed

+150
-0
lines changed

2 files changed

+150
-0
lines changed

lnd_services.go

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,11 @@ type LndServicesConfig struct {
143143
// block download is still in progress.
144144
BlockUntilChainSynced bool
145145

146+
// BlockUntilChainNotifier indicates that the client should wait until
147+
// the ChainNotifier RPC is accepting subscriptions. This requires lnd
148+
// to be built with the "chainrpc" tag.
149+
BlockUntilChainNotifier bool
150+
146151
// BlockUntilUnlocked denotes that the NewLndServices function should
147152
// block until lnd is unlocked.
148153
BlockUntilUnlocked bool
@@ -453,6 +458,25 @@ func NewLndServices(cfg *LndServicesConfig) (*GrpcLndServices, error) {
453458
log.Infof("lnd is now fully synced to its chain backend")
454459
}
455460

461+
// If requested, wait until the chain notifier RPC is ready before we
462+
// return. This ensures sub-servers relying on the notifier don't fail
463+
// during startup.
464+
if cfg.BlockUntilChainNotifier {
465+
log.Infof("Waiting for chain notifier RPC to be ready")
466+
467+
err := services.waitForChainNotifier(
468+
cfg.CallerCtx, timeout, cfg.ChainSyncPollInterval,
469+
)
470+
if err != nil {
471+
cleanup()
472+
473+
return nil, fmt.Errorf("error waiting for chain "+
474+
"notifier readiness: %w", err)
475+
}
476+
477+
log.Infof("Chain notifier RPC is ready")
478+
}
479+
456480
return services, nil
457481
}
458482

@@ -533,6 +557,77 @@ func (s *GrpcLndServices) waitForChainSync(ctx context.Context,
533557
return <-update
534558
}
535559

560+
// waitForChainNotifier blocks until the ChainNotifier RPC accepts block epoch
561+
// subscriptions and delivers at least one block height.
562+
func (s *GrpcLndServices) waitForChainNotifier(ctx context.Context,
563+
timeout, pollInterval time.Duration) error {
564+
565+
register := s.ChainNotifier.RegisterBlockEpochNtfn
566+
567+
var errRetry = errors.New("retry RegisterBlockEpochNtfn")
568+
569+
// attempt is a single attempt to make a RegisterBlockEpochNtfn call.
570+
// It returns nil on success, errRetry if another retry is needed and
571+
// other error in case of a final error.
572+
attempt := func() error {
573+
subCtx, cancel := context.WithTimeout(ctx, timeout)
574+
defer cancel()
575+
576+
// Make new RegisterBlockEpochNtfn call.
577+
blockChan, errChan, err := register(subCtx)
578+
if err != nil {
579+
return fmt.Errorf("register block epoch ntfn: %w", err)
580+
}
581+
582+
// Wait for block height notification, which indicates success.
583+
select {
584+
case <-subCtx.Done():
585+
return subCtx.Err()
586+
587+
case err := <-errChan:
588+
// If chainNotifier is not ready yet, retry.
589+
if isChainNotifierStartingErr(err) {
590+
select {
591+
case <-time.After(pollInterval):
592+
return errRetry
593+
594+
case <-ctx.Done():
595+
return ctx.Err()
596+
}
597+
}
598+
599+
return err
600+
601+
// We got a block height. Success!
602+
case <-blockChan:
603+
return nil
604+
}
605+
}
606+
607+
// Main retry loop.
608+
for {
609+
log.Info("Trying to make RegisterBlockEpochNtfn and receive " +
610+
"the current height...")
611+
err := attempt()
612+
613+
if errors.Is(err, errRetry) {
614+
log.Info("ChainNotifier is not ready yet. Will retry")
615+
616+
continue
617+
} else if err != nil {
618+
log.Info("RegisterBlockEpochNtfn returned unexpected "+
619+
"error %v. LND client failed to start!", err)
620+
621+
return err
622+
}
623+
624+
log.Info("RegisterBlockEpochNtfn returned a height. Success!")
625+
break
626+
}
627+
628+
return nil
629+
}
630+
536631
// getLndInfo queries lnd for information about the node it is connected to.
537632
// If the waitForUnlocked boolean is set, it will examine any errors returned
538633
// and back off if the failure is due to lnd currently being locked. Otherwise,
@@ -676,6 +771,37 @@ func IsUnlockError(err error) bool {
676771
return false
677772
}
678773

774+
// chainNotifierStartupMessage matches the error string returned by lnd
775+
// v0.20.0-rc3+ when a ChainNotifier RPC is invoked before the sub-server
776+
// finishes initialization.
777+
const chainNotifierStartupMessage = "chain notifier RPC is still in the " +
778+
"process of starting"
779+
780+
// isChainNotifierStartingErr reports whether err is due to the lnd
781+
// ChainNotifier sub-server still starting up. Starting with lnd v0.20.0-rc3
782+
// the notifier is initialised later in the daemon lifecycle, and the RPC layer
783+
// surfaces this as an Unknown gRPC status that contains the message defined in
784+
// chainNotifierStartupMessage. There is a PR in LND to return code Unavailable
785+
// instead of Unknown: https://github.com/lightningnetwork/lnd/pull/10352
786+
func isChainNotifierStartingErr(err error) bool {
787+
if err == nil {
788+
return false
789+
}
790+
791+
// gRPC code Unavailable means "the server can't handle this request
792+
// now, retry later". LND's chain notifier returns this error when
793+
// the server is starting.
794+
// See https://github.com/lightningnetwork/lnd/pull/10352
795+
st, ok := status.FromError(err)
796+
if ok && st.Code() == codes.Unavailable {
797+
return true
798+
}
799+
800+
// TODO(ln-v0.20.0) remove the string fallback once lndclient depends on
801+
// a version of lnd that returns codes.Unavailable for this condition.
802+
return strings.Contains(err.Error(), chainNotifierStartupMessage)
803+
}
804+
679805
// checkLndCompatibility makes sure the connected lnd instance is running on the
680806
// correct network, has the version RPC implemented, is the correct minimal
681807
// version and supports all required build tags/subservers.

lnd_services_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,3 +361,27 @@ func TestCustomMacaroonHex(t *testing.T) {
361361
_, err = NewLndServices(testCfg)
362362
require.Error(t, err, "must set only one")
363363
}
364+
365+
// TestIsChainNotifierStartingErr ensures we correctly detect the startup lag
366+
// error returned by lnd v0.20.0-rc3+.
367+
func TestIsChainNotifierStartingErr(t *testing.T) {
368+
t.Parallel()
369+
370+
require.True(t, isChainNotifierStartingErr(
371+
status.Error(codes.Unavailable, chainNotifierStartupMessage),
372+
))
373+
374+
require.True(t, isChainNotifierStartingErr(
375+
status.Error(codes.Unknown, chainNotifierStartupMessage),
376+
))
377+
378+
require.True(t, isChainNotifierStartingErr(
379+
status.Error(codes.Unavailable, "some other error"),
380+
))
381+
382+
require.False(t, isChainNotifierStartingErr(nil))
383+
384+
require.False(t, isChainNotifierStartingErr(
385+
status.Error(codes.Unknown, "some other error"),
386+
))
387+
}

0 commit comments

Comments
 (0)