Skip to content

Commit f577c53

Browse files
committed
server+tapchannel: fix deadlock potential, queue msgs
This commit fixes a potential deadlock that can occur when tapd runs in conjunction with lnd/litd. Previously, if an lnd component needed to send a custom message as part of its initialization during startup, that call would run into the waitForReady method in tapd and block there. This in turn prevented lnd from fully starting up, which meant that waitForReady would never return. We fix this by queuing up incoming messages so that they can be handled asynchronously once tapd has fully started up. This is safe as long as the server and funding controller have been created, even if they have not been fully started yet. The queued messages will be processed as soon as the funding controller's Start method is called.
1 parent fa97f8f commit f577c53

File tree

2 files changed

+26
-12
lines changed

2 files changed

+26
-12
lines changed

server.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -862,12 +862,16 @@ func (s *Server) Name() msgmux.EndpointName {
862862
//
863863
// NOTE: This method is part of the msgmux.MsgEndpoint interface.
864864
func (s *Server) CanHandle(msg msgmux.PeerMsg) bool {
865-
err := s.waitForReady()
866-
if err != nil {
867-
srvrLog.Debugf("Can't handle PeerMsg, server not ready %v",
868-
err)
865+
// We can't wait for ready here, as this method is potentially called
866+
// during startup. The `CanHandle` method is stateless, so we can call
867+
// it if the funding controller has been created (but potentially has
868+
// not yet been started).
869+
if s == nil || s.cfg == nil || s.cfg.AuxFundingController == nil {
870+
// This shouldn't happen, the server and funding controller
871+
// should always be initialized before the msgmux is started.
869872
return false
870873
}
874+
871875
return s.cfg.AuxFundingController.CanHandle(msg)
872876
}
873877

@@ -876,12 +880,18 @@ func (s *Server) CanHandle(msg msgmux.PeerMsg) bool {
876880
//
877881
// NOTE: This method is part of the msgmux.MsgEndpoint interface.
878882
func (s *Server) SendMessage(ctx context.Context, msg msgmux.PeerMsg) bool {
879-
err := s.waitForReady()
880-
if err != nil {
881-
srvrLog.Debugf("Failed to send PeerMsg, server not ready %v",
882-
err)
883+
// We can't wait for ready here, as this method is potentially called
884+
// during startup. The `SendMessage` method will buffer messages that
885+
// come in between the funding controller being created and it being
886+
// started (which only happens after waitForReady fires). So it's safe
887+
// to call it here, as long as the funding controller has been created.
888+
if s == nil || s.cfg == nil || s.cfg.AuxFundingController == nil {
889+
// This shouldn't happen, the CanHandle method is always called
890+
// first, and that should've already returned false in this
891+
// case.
883892
return false
884893
}
894+
885895
return s.cfg.AuxFundingController.SendMessage(ctx, msg)
886896
}
887897

tapchannel/aux_funding_controller.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ type FundingController struct {
301301

302302
cfg FundingControllerCfg
303303

304-
msgs chan msgmux.PeerMsg
304+
msgQueue *fn.ConcurrentQueue[msgmux.PeerMsg]
305305

306306
bindFundingReqs chan *bindFundingReq
307307

@@ -318,9 +318,11 @@ type FundingController struct {
318318

319319
// NewFundingController creates a new instance of the FundingController.
320320
func NewFundingController(cfg FundingControllerCfg) *FundingController {
321+
msgs := fn.NewConcurrentQueue[msgmux.PeerMsg](fn.DefaultQueueSize)
322+
msgs.Start()
321323
return &FundingController{
322324
cfg: cfg,
323-
msgs: make(chan msgmux.PeerMsg, 10),
325+
msgQueue: msgs,
324326
bindFundingReqs: make(chan *bindFundingReq, 10),
325327
newFundingReqs: make(chan *FundReq, 10),
326328
rootReqs: make(chan *assetRootReq, 10),
@@ -387,6 +389,8 @@ func (f *FundingController) Stop() error {
387389
close(f.Quit)
388390
f.Wg.Wait()
389391

392+
f.msgQueue.Stop()
393+
390394
return nil
391395
}
392396

@@ -1858,7 +1862,7 @@ func (f *FundingController) chanFunder() {
18581862
// The remote party has sent us some upfront proof for channel
18591863
// asset inputs. We'll log this pending chan ID, then validate
18601864
// the proofs included.
1861-
case msg := <-f.msgs:
1865+
case msg := <-f.msgQueue.ChanOut():
18621866
tempFundingID, err := f.processFundingMsg(
18631867
ctxc, fundingFlows, msg,
18641868
)
@@ -2438,7 +2442,7 @@ func (f *FundingController) CanHandle(msg msgmux.PeerMsg) bool {
24382442
func (f *FundingController) SendMessage(_ context.Context,
24392443
msg msgmux.PeerMsg) bool {
24402444

2441-
return fn.SendOrQuit(f.msgs, msg, f.Quit)
2445+
return fn.SendOrQuit(f.msgQueue.ChanIn(), msg, f.Quit)
24422446
}
24432447

24442448
// TODO(roasbeef): try to protofsm it?

0 commit comments

Comments
 (0)