From d4b697c1424dad385bed18936438a7de70b1da8f Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Mon, 10 Nov 2025 17:13:50 -0300 Subject: [PATCH] lndclient: block until chain notifier is ready LND commit c6f458e478f9 (v0.20.0-rc3) moved ChainNotifier startup later in the lifecycle, so RegisterBlockEpochNtfn callers now see "chain notifier RPC is still in the process of starting" coming from Recv(). The new BlockUntilChainNotifier config option repeatedly calls RegisterBlockEpochNtfn during startup and only proceeds once the stream yields its first block height, retrying solely when we detect the ErrChainNotifierServerNotActive condition introduced by the LND commit above. --- lnd_services.go | 126 +++++++++++++++++++++++++++++++++++++++++++ lnd_services_test.go | 24 +++++++++ 2 files changed, 150 insertions(+) diff --git a/lnd_services.go b/lnd_services.go index 9ef5653..ecb5268 100644 --- a/lnd_services.go +++ b/lnd_services.go @@ -143,6 +143,11 @@ type LndServicesConfig struct { // block download is still in progress. BlockUntilChainSynced bool + // BlockUntilChainNotifier indicates that the client should wait until + // the ChainNotifier RPC is accepting subscriptions. This requires lnd + // to be built with the "chainrpc" tag. + BlockUntilChainNotifier bool + // BlockUntilUnlocked denotes that the NewLndServices function should // block until lnd is unlocked. BlockUntilUnlocked bool @@ -453,6 +458,25 @@ func NewLndServices(cfg *LndServicesConfig) (*GrpcLndServices, error) { log.Infof("lnd is now fully synced to its chain backend") } + // If requested, wait until the chain notifier RPC is ready before we + // return. This ensures sub-servers relying on the notifier don't fail + // during startup. + if cfg.BlockUntilChainNotifier { + log.Infof("Waiting for chain notifier RPC to be ready") + + err := services.waitForChainNotifier( + cfg.CallerCtx, timeout, cfg.ChainSyncPollInterval, + ) + if err != nil { + cleanup() + + return nil, fmt.Errorf("error waiting for chain "+ + "notifier readiness: %w", err) + } + + log.Infof("Chain notifier RPC is ready") + } + return services, nil } @@ -533,6 +557,77 @@ func (s *GrpcLndServices) waitForChainSync(ctx context.Context, return <-update } +// waitForChainNotifier blocks until the ChainNotifier RPC accepts block epoch +// subscriptions and delivers at least one block height. +func (s *GrpcLndServices) waitForChainNotifier(ctx context.Context, + timeout, pollInterval time.Duration) error { + + register := s.ChainNotifier.RegisterBlockEpochNtfn + + var errRetry = errors.New("retry RegisterBlockEpochNtfn") + + // attempt is a single attempt to make a RegisterBlockEpochNtfn call. + // It returns nil on success, errRetry if another retry is needed and + // other error in case of a final error. + attempt := func() error { + subCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + // Make new RegisterBlockEpochNtfn call. + blockChan, errChan, err := register(subCtx) + if err != nil { + return fmt.Errorf("register block epoch ntfn: %w", err) + } + + // Wait for block height notification, which indicates success. + select { + case <-subCtx.Done(): + return subCtx.Err() + + case err := <-errChan: + // If chainNotifier is not ready yet, retry. + if isChainNotifierStartingErr(err) { + select { + case <-time.After(pollInterval): + return errRetry + + case <-ctx.Done(): + return ctx.Err() + } + } + + return err + + // We got a block height. Success! + case <-blockChan: + return nil + } + } + + // Main retry loop. + for { + log.Info("Trying to make RegisterBlockEpochNtfn and receive " + + "the current height...") + err := attempt() + + if errors.Is(err, errRetry) { + log.Info("ChainNotifier is not ready yet. Will retry") + + continue + } else if err != nil { + log.Info("RegisterBlockEpochNtfn returned unexpected "+ + "error %v. LND client failed to start!", err) + + return err + } + + log.Info("RegisterBlockEpochNtfn returned a height. Success!") + break + } + + return nil +} + // getLndInfo queries lnd for information about the node it is connected to. // If the waitForUnlocked boolean is set, it will examine any errors returned // and back off if the failure is due to lnd currently being locked. Otherwise, @@ -676,6 +771,37 @@ func IsUnlockError(err error) bool { return false } +// chainNotifierStartupMessage matches the error string returned by lnd +// v0.20.0-rc3+ when a ChainNotifier RPC is invoked before the sub-server +// finishes initialization. +const chainNotifierStartupMessage = "chain notifier RPC is still in the " + + "process of starting" + +// isChainNotifierStartingErr reports whether err is due to the lnd +// ChainNotifier sub-server still starting up. Starting with lnd v0.20.0-rc3 +// the notifier is initialised later in the daemon lifecycle, and the RPC layer +// surfaces this as an Unknown gRPC status that contains the message defined in +// chainNotifierStartupMessage. There is a PR in LND to return code Unavailable +// instead of Unknown: https://github.com/lightningnetwork/lnd/pull/10352 +func isChainNotifierStartingErr(err error) bool { + if err == nil { + return false + } + + // gRPC code Unavailable means "the server can't handle this request + // now, retry later". LND's chain notifier returns this error when + // the server is starting. + // See https://github.com/lightningnetwork/lnd/pull/10352 + st, ok := status.FromError(err) + if ok && st.Code() == codes.Unavailable { + return true + } + + // TODO(ln-v0.20.0) remove the string fallback once lndclient depends on + // a version of lnd that returns codes.Unavailable for this condition. + return strings.Contains(err.Error(), chainNotifierStartupMessage) +} + // checkLndCompatibility makes sure the connected lnd instance is running on the // correct network, has the version RPC implemented, is the correct minimal // version and supports all required build tags/subservers. diff --git a/lnd_services_test.go b/lnd_services_test.go index 8bade98..4a23975 100644 --- a/lnd_services_test.go +++ b/lnd_services_test.go @@ -361,3 +361,27 @@ func TestCustomMacaroonHex(t *testing.T) { _, err = NewLndServices(testCfg) require.Error(t, err, "must set only one") } + +// TestIsChainNotifierStartingErr ensures we correctly detect the startup lag +// error returned by lnd v0.20.0-rc3+. +func TestIsChainNotifierStartingErr(t *testing.T) { + t.Parallel() + + require.True(t, isChainNotifierStartingErr( + status.Error(codes.Unavailable, chainNotifierStartupMessage), + )) + + require.True(t, isChainNotifierStartingErr( + status.Error(codes.Unknown, chainNotifierStartupMessage), + )) + + require.True(t, isChainNotifierStartingErr( + status.Error(codes.Unavailable, "some other error"), + )) + + require.False(t, isChainNotifierStartingErr(nil)) + + require.False(t, isChainNotifierStartingErr( + status.Error(codes.Unknown, "some other error"), + )) +}