@@ -143,6 +143,11 @@ type LndServicesConfig struct {
143143 // block download is still in progress.
144144 BlockUntilChainSynced bool
145145
146+ // BlockUntilChainNotifier indicates that the client should wait until
147+ // the ChainNotifier RPC is accepting subscriptions. This requires lnd
148+ // to be built with the "chainrpc" tag.
149+ BlockUntilChainNotifier bool
150+
146151 // BlockUntilUnlocked denotes that the NewLndServices function should
147152 // block until lnd is unlocked.
148153 BlockUntilUnlocked bool
@@ -453,6 +458,33 @@ func NewLndServices(cfg *LndServicesConfig) (*GrpcLndServices, error) {
453458 log .Infof ("lnd is now fully synced to its chain backend" )
454459 }
455460
461+ // If requested, wait until the chain notifier RPC is ready before we
462+ // return. This ensures sub-servers relying on the notifier don't fail
463+ // during startup.
464+ if cfg .BlockUntilChainNotifier {
465+ if ! hasBuildTag (services .Version , "chainrpc" ) {
466+ cleanup ()
467+
468+ return nil , fmt .Errorf ("chain notifier build tag is " +
469+ "required when waiting for chain notifier " +
470+ "readiness" )
471+ }
472+
473+ log .Infof ("Waiting for chain notifier RPC to be ready" )
474+
475+ err := services .waitForChainNotifier (
476+ cfg .CallerCtx , timeout , cfg .ChainSyncPollInterval ,
477+ )
478+ if err != nil {
479+ cleanup ()
480+
481+ return nil , fmt .Errorf ("error waiting for chain " +
482+ "notifier readiness: %w" , err )
483+ }
484+
485+ log .Infof ("Chain notifier RPC is ready" )
486+ }
487+
456488 return services , nil
457489}
458490
@@ -533,6 +565,60 @@ func (s *GrpcLndServices) waitForChainSync(ctx context.Context,
533565 return <- update
534566}
535567
568+ // waitForChainNotifier blocks until the ChainNotifier RPC accepts block epoch
569+ // subscriptions and delivers at least one block height.
570+ func (s * GrpcLndServices ) waitForChainNotifier (ctx context.Context ,
571+ timeout , pollInterval time.Duration ) error {
572+
573+ mainCtx := ctx
574+ if mainCtx == nil {
575+ mainCtx = context .Background ()
576+ }
577+
578+ register := s .ChainNotifier .RegisterBlockEpochNtfn
579+
580+ for {
581+ // Make new RegisterBlockEpochNtfn call.
582+ subCtx , cancel := context .WithTimeout (mainCtx , timeout )
583+ blockChan , errChan , err := register (subCtx )
584+ if err != nil {
585+ cancel ()
586+
587+ return fmt .Errorf ("register block epoch ntfn: %w" , err )
588+ }
589+
590+ // Wait for block height notification, which indicates success.
591+ select {
592+ case <- mainCtx .Done ():
593+ cancel ()
594+
595+ return mainCtx .Err ()
596+
597+ case err := <- errChan :
598+ cancel ()
599+
600+ // If chainNotifier is not ready yet, retry.
601+ if isChainNotifierStartingErr (err ) {
602+ select {
603+ case <- time .After (pollInterval ):
604+ continue
605+
606+ case <- mainCtx .Done ():
607+ return mainCtx .Err ()
608+ }
609+ }
610+
611+ return err
612+
613+ // We got a block height. Success!
614+ case <- blockChan :
615+ cancel ()
616+
617+ return nil
618+ }
619+ }
620+ }
621+
536622// getLndInfo queries lnd for information about the node it is connected to.
537623// If the waitForUnlocked boolean is set, it will examine any errors returned
538624// and back off if the failure is due to lnd currently being locked. Otherwise,
@@ -676,6 +762,37 @@ func IsUnlockError(err error) bool {
676762 return false
677763}
678764
765+ // chainNotifierStartupMessage matches the error string returned by lnd
766+ // v0.20.0-rc3+ when a ChainNotifier RPC is invoked before the sub-server
767+ // finishes initialization.
768+ const chainNotifierStartupMessage = "chain notifier RPC is still in the " +
769+ "process of starting"
770+
771+ // isChainNotifierStartingErr reports whether err is due to the lnd
772+ // ChainNotifier sub-server still starting up. Starting with lnd v0.20.0-rc3
773+ // the notifier is initialised later in the daemon lifecycle, and the RPC layer
774+ // surfaces this as an Unknown gRPC status that contains the message defined in
775+ // chainNotifierStartupMessage. There is a PR in LND to return code Unavailable
776+ // instead of Unknown: https://github.com/lightningnetwork/lnd/pull/10352
777+ func isChainNotifierStartingErr (err error ) bool {
778+ if err == nil {
779+ return false
780+ }
781+
782+ // gRPC code Unavailable means "the server can't handle this request
783+ // now, retry later". LND's chain notifier returns this error when
784+ // the server is starting.
785+ // See https://github.com/lightningnetwork/lnd/pull/10352
786+ st , ok := status .FromError (err )
787+ if ok && st .Code () == codes .Unavailable {
788+ return true
789+ }
790+
791+ // TODO(ln-v0.20.0) remove the string fallback once lndclient depends on
792+ // a version of lnd that returns codes.Unavailable for this condition.
793+ return strings .Contains (err .Error (), chainNotifierStartupMessage )
794+ }
795+
679796// checkLndCompatibility makes sure the connected lnd instance is running on the
680797// correct network, has the version RPC implemented, is the correct minimal
681798// version and supports all required build tags/subservers.
@@ -809,6 +926,22 @@ func assertBuildTagsEnabled(actual *verrpc.Version,
809926 return nil
810927}
811928
929+ // hasBuildTag reports whether the given version advertises the specified
930+ // build tag.
931+ func hasBuildTag (version * verrpc.Version , tag string ) bool {
932+ if version == nil {
933+ return false
934+ }
935+
936+ for _ , t := range version .BuildTags {
937+ if t == tag {
938+ return true
939+ }
940+ }
941+
942+ return false
943+ }
944+
812945var (
813946 defaultRPCPort = "10009"
814947 defaultLndDir = btcutil .AppDataDir ("lnd" , false )
0 commit comments