Skip to content

Commit 4ced3bc

Browse files
committed
peer: update readHandler to dispatch to msgRouter if set
Over time with this, we should be able to significantly reduce the size of the peer.Brontide struct as we only need all those deps as the peer needs to recognize and handle each incoming wire message itself.
1 parent 46f06d3 commit 4ced3bc

File tree

1 file changed

+36
-1
lines changed

1 file changed

+36
-1
lines changed

peer/brontide.go

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import (
4242
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
4343
"github.com/lightningnetwork/lnd/lnwallet/chancloser"
4444
"github.com/lightningnetwork/lnd/lnwire"
45+
"github.com/lightningnetwork/lnd/msgmux"
4546
"github.com/lightningnetwork/lnd/netann"
4647
"github.com/lightningnetwork/lnd/pool"
4748
"github.com/lightningnetwork/lnd/queue"
@@ -522,6 +523,10 @@ type Brontide struct {
522523
// potentially holding lots of un-consumed events.
523524
channelEventClient *subscribe.Client
524525

526+
// msgRouter is an instance of the msgmux.Router which is used to send
527+
// off new wire messages for handing.
528+
msgRouter fn.Option[msgmux.Router]
529+
525530
startReady chan struct{}
526531
quit chan struct{}
527532
wg sync.WaitGroup
@@ -559,6 +564,9 @@ func NewBrontide(cfg Config) *Brontide {
559564
startReady: make(chan struct{}),
560565
quit: make(chan struct{}),
561566
log: build.NewPrefixLog(logPrefix, peerLog),
567+
msgRouter: fn.Some[msgmux.Router](
568+
msgmux.NewMultiMsgRouter(),
569+
),
562570
}
563571

564572
if cfg.Conn != nil && cfg.Conn.RemoteAddr() != nil {
@@ -738,6 +746,12 @@ func (p *Brontide) Start() error {
738746
return err
739747
}
740748

749+
// Register the message router now as we may need to register some
750+
// endpoints while loading the channels below.
751+
p.msgRouter.WhenSome(func(router msgmux.Router) {
752+
router.Start()
753+
})
754+
741755
msgs, err := p.loadActiveChannels(activeChans)
742756
if err != nil {
743757
return fmt.Errorf("unable to load channels: %w", err)
@@ -913,7 +927,8 @@ func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) (
913927
p.cfg.Signer, dbChan, p.cfg.SigPool,
914928
)
915929
if err != nil {
916-
return nil, err
930+
return nil, fmt.Errorf("unable to create channel "+
931+
"state machine: %w", err)
917932
}
918933

919934
chanPoint := dbChan.FundingOutpoint
@@ -1368,6 +1383,10 @@ func (p *Brontide) Disconnect(reason error) {
13681383
p.cfg.Conn.Close()
13691384

13701385
close(p.quit)
1386+
1387+
p.msgRouter.WhenSome(func(router msgmux.Router) {
1388+
router.Stop()
1389+
})
13711390
}
13721391

13731392
// String returns the string representation of this peer.
@@ -1809,6 +1828,22 @@ out:
18091828
}
18101829
}
18111830

1831+
// If a message router is active, then we'll try to have it
1832+
// handle this message. If it can, then we're able to skip the
1833+
// rest of the message handling logic.
1834+
err = fn.MapOptionZ(p.msgRouter, func(r msgmux.Router) error {
1835+
return r.RouteMsg(msgmux.PeerMsg{
1836+
PeerPub: *p.IdentityKey(),
1837+
Message: nextMsg,
1838+
})
1839+
})
1840+
1841+
// No error occurred, and the message was handled by the
1842+
// router.
1843+
if err == nil {
1844+
continue
1845+
}
1846+
18121847
var (
18131848
targetChan lnwire.ChannelID
18141849
isLinkUpdate bool

0 commit comments

Comments
 (0)