Skip to content

Commit 4d5b18f

Browse files
authored
Merge pull request #1652 from lightninglabs/msg-router-non-blocking
server+tapchannel: fix deadlock potential, queue msgs
2 parents 8dce848 + f577c53 commit 4d5b18f

File tree

2 files changed

+26
-12
lines changed

2 files changed

+26
-12
lines changed

server.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -862,12 +862,16 @@ func (s *Server) Name() msgmux.EndpointName {
862862
//
863863
// NOTE: This method is part of the msgmux.MsgEndpoint interface.
864864
func (s *Server) CanHandle(msg msgmux.PeerMsg) bool {
865-
err := s.waitForReady()
866-
if err != nil {
867-
srvrLog.Debugf("Can't handle PeerMsg, server not ready %v",
868-
err)
865+
// We can't wait for ready here, as this method is potentially called
866+
// during startup. The `CanHandle` method is stateless, so we can call
867+
// it if the funding controller has been created (but potentially has
868+
// not yet been started).
869+
if s == nil || s.cfg == nil || s.cfg.AuxFundingController == nil {
870+
// This shouldn't happen, the server and funding controller
871+
// should always be initialized before the msgmux is started.
869872
return false
870873
}
874+
871875
return s.cfg.AuxFundingController.CanHandle(msg)
872876
}
873877

@@ -876,12 +880,18 @@ func (s *Server) CanHandle(msg msgmux.PeerMsg) bool {
876880
//
877881
// NOTE: This method is part of the msgmux.MsgEndpoint interface.
878882
func (s *Server) SendMessage(ctx context.Context, msg msgmux.PeerMsg) bool {
879-
err := s.waitForReady()
880-
if err != nil {
881-
srvrLog.Debugf("Failed to send PeerMsg, server not ready %v",
882-
err)
883+
// We can't wait for ready here, as this method is potentially called
884+
// during startup. The `SendMessage` method will buffer messages that
885+
// come in between the funding controller being created and it being
886+
// started (which only happens after waitForReady fires). So it's safe
887+
// to call it here, as long as the funding controller has been created.
888+
if s == nil || s.cfg == nil || s.cfg.AuxFundingController == nil {
889+
// This shouldn't happen, the CanHandle method is always called
890+
// first, and that should've already returned false in this
891+
// case.
883892
return false
884893
}
894+
885895
return s.cfg.AuxFundingController.SendMessage(ctx, msg)
886896
}
887897

tapchannel/aux_funding_controller.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ type FundingController struct {
301301

302302
cfg FundingControllerCfg
303303

304-
msgs chan msgmux.PeerMsg
304+
msgQueue *fn.ConcurrentQueue[msgmux.PeerMsg]
305305

306306
bindFundingReqs chan *bindFundingReq
307307

@@ -318,9 +318,11 @@ type FundingController struct {
318318

319319
// NewFundingController creates a new instance of the FundingController.
320320
func NewFundingController(cfg FundingControllerCfg) *FundingController {
321+
msgs := fn.NewConcurrentQueue[msgmux.PeerMsg](fn.DefaultQueueSize)
322+
msgs.Start()
321323
return &FundingController{
322324
cfg: cfg,
323-
msgs: make(chan msgmux.PeerMsg, 10),
325+
msgQueue: msgs,
324326
bindFundingReqs: make(chan *bindFundingReq, 10),
325327
newFundingReqs: make(chan *FundReq, 10),
326328
rootReqs: make(chan *assetRootReq, 10),
@@ -387,6 +389,8 @@ func (f *FundingController) Stop() error {
387389
close(f.Quit)
388390
f.Wg.Wait()
389391

392+
f.msgQueue.Stop()
393+
390394
return nil
391395
}
392396

@@ -1858,7 +1862,7 @@ func (f *FundingController) chanFunder() {
18581862
// The remote party has sent us some upfront proof for channel
18591863
// asset inputs. We'll log this pending chan ID, then validate
18601864
// the proofs included.
1861-
case msg := <-f.msgs:
1865+
case msg := <-f.msgQueue.ChanOut():
18621866
tempFundingID, err := f.processFundingMsg(
18631867
ctxc, fundingFlows, msg,
18641868
)
@@ -2438,7 +2442,7 @@ func (f *FundingController) CanHandle(msg msgmux.PeerMsg) bool {
24382442
func (f *FundingController) SendMessage(_ context.Context,
24392443
msg msgmux.PeerMsg) bool {
24402444

2441-
return fn.SendOrQuit(f.msgs, msg, f.Quit)
2445+
return fn.SendOrQuit(f.msgQueue.ChanIn(), msg, f.Quit)
24422446
}
24432447

24442448
// TODO(roasbeef): try to protofsm it?

0 commit comments

Comments
 (0)