diff --git a/lnd_services.go b/lnd_services.go index 9ef5653..ecb5268 100644 --- a/lnd_services.go +++ b/lnd_services.go @@ -143,6 +143,11 @@ type LndServicesConfig struct { // block download is still in progress. BlockUntilChainSynced bool + // BlockUntilChainNotifier indicates that the client should wait until + // the ChainNotifier RPC is accepting subscriptions. This requires lnd + // to be built with the "chainrpc" tag. + BlockUntilChainNotifier bool + // BlockUntilUnlocked denotes that the NewLndServices function should // block until lnd is unlocked. BlockUntilUnlocked bool @@ -453,6 +458,25 @@ func NewLndServices(cfg *LndServicesConfig) (*GrpcLndServices, error) { log.Infof("lnd is now fully synced to its chain backend") } + // If requested, wait until the chain notifier RPC is ready before we + // return. This ensures sub-servers relying on the notifier don't fail + // during startup. + if cfg.BlockUntilChainNotifier { + log.Infof("Waiting for chain notifier RPC to be ready") + + err := services.waitForChainNotifier( + cfg.CallerCtx, timeout, cfg.ChainSyncPollInterval, + ) + if err != nil { + cleanup() + + return nil, fmt.Errorf("error waiting for chain "+ + "notifier readiness: %w", err) + } + + log.Infof("Chain notifier RPC is ready") + } + return services, nil } @@ -533,6 +557,77 @@ func (s *GrpcLndServices) waitForChainSync(ctx context.Context, return <-update } +// waitForChainNotifier blocks until the ChainNotifier RPC accepts block epoch +// subscriptions and delivers at least one block height. +func (s *GrpcLndServices) waitForChainNotifier(ctx context.Context, + timeout, pollInterval time.Duration) error { + + register := s.ChainNotifier.RegisterBlockEpochNtfn + + var errRetry = errors.New("retry RegisterBlockEpochNtfn") + + // attempt is a single attempt to make a RegisterBlockEpochNtfn call. + // It returns nil on success, errRetry if another retry is needed and + // other error in case of a final error. + attempt := func() error { + subCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + // Make new RegisterBlockEpochNtfn call. + blockChan, errChan, err := register(subCtx) + if err != nil { + return fmt.Errorf("register block epoch ntfn: %w", err) + } + + // Wait for block height notification, which indicates success. + select { + case <-subCtx.Done(): + return subCtx.Err() + + case err := <-errChan: + // If chainNotifier is not ready yet, retry. + if isChainNotifierStartingErr(err) { + select { + case <-time.After(pollInterval): + return errRetry + + case <-ctx.Done(): + return ctx.Err() + } + } + + return err + + // We got a block height. Success! + case <-blockChan: + return nil + } + } + + // Main retry loop. + for { + log.Info("Trying to make RegisterBlockEpochNtfn and receive " + + "the current height...") + err := attempt() + + if errors.Is(err, errRetry) { + log.Info("ChainNotifier is not ready yet. Will retry") + + continue + } else if err != nil { + log.Info("RegisterBlockEpochNtfn returned unexpected "+ + "error %v. LND client failed to start!", err) + + return err + } + + log.Info("RegisterBlockEpochNtfn returned a height. Success!") + break + } + + return nil +} + // getLndInfo queries lnd for information about the node it is connected to. // If the waitForUnlocked boolean is set, it will examine any errors returned // and back off if the failure is due to lnd currently being locked. Otherwise, @@ -676,6 +771,37 @@ func IsUnlockError(err error) bool { return false } +// chainNotifierStartupMessage matches the error string returned by lnd +// v0.20.0-rc3+ when a ChainNotifier RPC is invoked before the sub-server +// finishes initialization. +const chainNotifierStartupMessage = "chain notifier RPC is still in the " + + "process of starting" + +// isChainNotifierStartingErr reports whether err is due to the lnd +// ChainNotifier sub-server still starting up. Starting with lnd v0.20.0-rc3 +// the notifier is initialised later in the daemon lifecycle, and the RPC layer +// surfaces this as an Unknown gRPC status that contains the message defined in +// chainNotifierStartupMessage. There is a PR in LND to return code Unavailable +// instead of Unknown: https://github.com/lightningnetwork/lnd/pull/10352 +func isChainNotifierStartingErr(err error) bool { + if err == nil { + return false + } + + // gRPC code Unavailable means "the server can't handle this request + // now, retry later". LND's chain notifier returns this error when + // the server is starting. + // See https://github.com/lightningnetwork/lnd/pull/10352 + st, ok := status.FromError(err) + if ok && st.Code() == codes.Unavailable { + return true + } + + // TODO(ln-v0.20.0) remove the string fallback once lndclient depends on + // a version of lnd that returns codes.Unavailable for this condition. + return strings.Contains(err.Error(), chainNotifierStartupMessage) +} + // checkLndCompatibility makes sure the connected lnd instance is running on the // correct network, has the version RPC implemented, is the correct minimal // version and supports all required build tags/subservers. diff --git a/lnd_services_test.go b/lnd_services_test.go index 8bade98..4a23975 100644 --- a/lnd_services_test.go +++ b/lnd_services_test.go @@ -361,3 +361,27 @@ func TestCustomMacaroonHex(t *testing.T) { _, err = NewLndServices(testCfg) require.Error(t, err, "must set only one") } + +// TestIsChainNotifierStartingErr ensures we correctly detect the startup lag +// error returned by lnd v0.20.0-rc3+. +func TestIsChainNotifierStartingErr(t *testing.T) { + t.Parallel() + + require.True(t, isChainNotifierStartingErr( + status.Error(codes.Unavailable, chainNotifierStartupMessage), + )) + + require.True(t, isChainNotifierStartingErr( + status.Error(codes.Unknown, chainNotifierStartupMessage), + )) + + require.True(t, isChainNotifierStartingErr( + status.Error(codes.Unavailable, "some other error"), + )) + + require.False(t, isChainNotifierStartingErr(nil)) + + require.False(t, isChainNotifierStartingErr( + status.Error(codes.Unknown, "some other error"), + )) +}