Skip to content

Commit 46f06d3

Browse files
committed
msgmux: add new abstract message router
In this commit, we add a new abstract message router. Over time, the goal is that this message router replaces the logic we currently have in the readHandler (the giant switch for each message). With this new abstraction, can reduce the responsibilities of the readHandler to *just* reading messages off the wire and handing them off to the msg router. The readHandler no longer needs to know *where* the messages should go, or how they should be dispatched. This will be used in tandem with the new `protofsm` module in an upcoming PR implementing the new rbf-coop close.
1 parent 77c7f77 commit 46f06d3

File tree

5 files changed

+461
-3
lines changed

5 files changed

+461
-3
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ require (
3535
github.com/lightningnetwork/lightning-onion v1.2.1-0.20240712235311-98bd56499dfb
3636
github.com/lightningnetwork/lnd/cert v1.2.2
3737
github.com/lightningnetwork/lnd/clock v1.1.1
38-
github.com/lightningnetwork/lnd/fn v1.2.0
38+
github.com/lightningnetwork/lnd/fn v1.2.1
3939
github.com/lightningnetwork/lnd/healthcheck v1.2.5
4040
github.com/lightningnetwork/lnd/kvdb v1.4.10
4141
github.com/lightningnetwork/lnd/queue v1.1.1

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -450,8 +450,8 @@ github.com/lightningnetwork/lnd/cert v1.2.2 h1:71YK6hogeJtxSxw2teq3eGeuy4rHGKcFf
450450
github.com/lightningnetwork/lnd/cert v1.2.2/go.mod h1:jQmFn/Ez4zhDgq2hnYSw8r35bqGVxViXhX6Cd7HXM6U=
451451
github.com/lightningnetwork/lnd/clock v1.1.1 h1:OfR3/zcJd2RhH0RU+zX/77c0ZiOnIMsDIBjgjWdZgA0=
452452
github.com/lightningnetwork/lnd/clock v1.1.1/go.mod h1:mGnAhPyjYZQJmebS7aevElXKTFDuO+uNFFfMXK1W8xQ=
453-
github.com/lightningnetwork/lnd/fn v1.2.0 h1:YTb2m8NN5ZiJAskHeBZAmR1AiPY8SXziIYPAX1VI/ZM=
454-
github.com/lightningnetwork/lnd/fn v1.2.0/go.mod h1:SyFohpVrARPKH3XVAJZlXdVe+IwMYc4OMAvrDY32kw0=
453+
github.com/lightningnetwork/lnd/fn v1.2.1 h1:pPsVGrwi9QBwdLJzaEGK33wmiVKOxs/zc8H7+MamFf0=
454+
github.com/lightningnetwork/lnd/fn v1.2.1/go.mod h1:SyFohpVrARPKH3XVAJZlXdVe+IwMYc4OMAvrDY32kw0=
455455
github.com/lightningnetwork/lnd/healthcheck v1.2.5 h1:aTJy5xeBpcWgRtW/PGBDe+LMQEmNm/HQewlQx2jt7OA=
456456
github.com/lightningnetwork/lnd/healthcheck v1.2.5/go.mod h1:G7Tst2tVvWo7cx6mSBEToQC5L1XOGxzZTPB29g9Rv2I=
457457
github.com/lightningnetwork/lnd/kvdb v1.4.10 h1:vK89IVv1oVH9ubQWU+EmoCQFeVRaC8kfmOrqHbY5zoY=

msgmux/log.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package msgmux
2+
3+
import (
4+
"github.com/btcsuite/btclog"
5+
"github.com/lightningnetwork/lnd/build"
6+
)
7+
8+
// Subsystem defines the logging code for this subsystem.
9+
const Subsystem = "MSGX"
10+
11+
// log is a logger that is initialized with no output filters. This
12+
// means the package will not perform any logging by default until the caller
13+
// requests it.
14+
var log btclog.Logger
15+
16+
// The default amount of logging is none.
17+
func init() {
18+
UseLogger(build.NewSubLogger(Subsystem, nil))
19+
}
20+
21+
// DisableLog disables all library log output. Logging output is disabled
22+
// by default until UseLogger is called.
23+
func DisableLog() {
24+
UseLogger(btclog.Disabled)
25+
}
26+
27+
// UseLogger uses a specified Logger to output package logging info.
28+
// This should be used in preference to SetLogWriter if the caller is also
29+
// using btclog.
30+
func UseLogger(logger btclog.Logger) {
31+
log = logger
32+
}

msgmux/msg_router.go

Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
package msgmux
2+
3+
import (
4+
"fmt"
5+
"maps"
6+
"sync"
7+
8+
"github.com/btcsuite/btcd/btcec/v2"
9+
"github.com/lightningnetwork/lnd/fn"
10+
"github.com/lightningnetwork/lnd/lnwire"
11+
)
12+
13+
var (
14+
// ErrDuplicateEndpoint is returned when an endpoint is registered with
15+
// a name that already exists.
16+
ErrDuplicateEndpoint = fmt.Errorf("endpoint already registered")
17+
18+
// ErrUnableToRouteMsg is returned when a message is unable to be
19+
// routed to any endpoints.
20+
ErrUnableToRouteMsg = fmt.Errorf("unable to route message")
21+
)
22+
23+
// EndpointName is the name of a given endpoint. This MUST be unique across all
24+
// registered endpoints.
25+
type EndpointName = string
26+
27+
// PeerMsg is a wire message that includes the public key of the peer that sent
28+
// it.
29+
type PeerMsg struct {
30+
lnwire.Message
31+
32+
// PeerPub is the public key of the peer that sent this message.
33+
PeerPub btcec.PublicKey
34+
}
35+
36+
// Endpoint is an interface that represents a message endpoint, or the
37+
// sub-system that will handle processing an incoming wire message.
38+
type Endpoint interface {
39+
// Name returns the name of this endpoint. This MUST be unique across
40+
// all registered endpoints.
41+
Name() EndpointName
42+
43+
// CanHandle returns true if the target message can be routed to this
44+
// endpoint.
45+
CanHandle(msg PeerMsg) bool
46+
47+
// SendMessage handles the target message, and returns true if the
48+
// message was able being processed.
49+
SendMessage(msg PeerMsg) bool
50+
}
51+
52+
// MsgRouter is an interface that represents a message router, which is generic
53+
// sub-system capable of routing any incoming wire message to a set of
54+
// registered endpoints.
55+
type Router interface {
56+
// RegisterEndpoint registers a new endpoint with the router. If a
57+
// duplicate endpoint exists, an error is returned.
58+
RegisterEndpoint(Endpoint) error
59+
60+
// UnregisterEndpoint unregisters the target endpoint from the router.
61+
UnregisterEndpoint(EndpointName) error
62+
63+
// RouteMsg attempts to route the target message to a registered
64+
// endpoint. If ANY endpoint could handle the message, then nil is
65+
// returned. Otherwise, ErrUnableToRouteMsg is returned.
66+
RouteMsg(PeerMsg) error
67+
68+
// Start starts the peer message router.
69+
Start()
70+
71+
// Stop stops the peer message router.
72+
Stop()
73+
}
74+
75+
// sendQuery sends a query to the main event loop, and returns the response.
76+
func sendQuery[Q any, R any](sendChan chan fn.Req[Q, R], queryArg Q,
77+
quit chan struct{}) fn.Result[R] {
78+
79+
query, respChan := fn.NewReq[Q, R](queryArg)
80+
81+
if !fn.SendOrQuit(sendChan, query, quit) {
82+
return fn.Errf[R]("router shutting down")
83+
}
84+
85+
return fn.NewResult(fn.RecvResp(respChan, nil, quit))
86+
}
87+
88+
// sendQueryErr is a helper function based on sendQuery that can be used when
89+
// the query only needs an error response.
90+
func sendQueryErr[Q any](sendChan chan fn.Req[Q, error], queryArg Q,
91+
quitChan chan struct{}) error {
92+
93+
return fn.ElimEither(
94+
fn.Iden, fn.Iden,
95+
sendQuery(sendChan, queryArg, quitChan).Either,
96+
)
97+
}
98+
99+
// EndpointsMap is a map of all registered endpoints.
100+
type EndpointsMap map[EndpointName]Endpoint
101+
102+
// MultiMsgRouter is a type of message router that is capable of routing new
103+
// incoming messages, permitting a message to be routed to multiple registered
104+
// endpoints.
105+
type MultiMsgRouter struct {
106+
startOnce sync.Once
107+
stopOnce sync.Once
108+
109+
// registerChan is the channel that all new endpoints will be sent to.
110+
registerChan chan fn.Req[Endpoint, error]
111+
112+
// unregisterChan is the channel that all endpoints that are to be
113+
// removed are sent to.
114+
unregisterChan chan fn.Req[EndpointName, error]
115+
116+
// msgChan is the channel that all messages will be sent to for
117+
// processing.
118+
msgChan chan fn.Req[PeerMsg, error]
119+
120+
// endpointsQueries is a channel that all queries to the endpoints map
121+
// will be sent to.
122+
endpointQueries chan fn.Req[Endpoint, EndpointsMap]
123+
124+
wg sync.WaitGroup
125+
quit chan struct{}
126+
}
127+
128+
// NewMultiMsgRouter creates a new instance of a peer message router.
129+
func NewMultiMsgRouter() *MultiMsgRouter {
130+
return &MultiMsgRouter{
131+
registerChan: make(chan fn.Req[Endpoint, error]),
132+
unregisterChan: make(chan fn.Req[EndpointName, error]),
133+
msgChan: make(chan fn.Req[PeerMsg, error]),
134+
endpointQueries: make(chan fn.Req[Endpoint, EndpointsMap]),
135+
quit: make(chan struct{}),
136+
}
137+
}
138+
139+
// Start starts the peer message router.
140+
func (p *MultiMsgRouter) Start() {
141+
log.Infof("Starting Router")
142+
143+
p.startOnce.Do(func() {
144+
p.wg.Add(1)
145+
go p.msgRouter()
146+
})
147+
}
148+
149+
// Stop stops the peer message router.
150+
func (p *MultiMsgRouter) Stop() {
151+
log.Infof("Stopping Router")
152+
153+
p.stopOnce.Do(func() {
154+
close(p.quit)
155+
p.wg.Wait()
156+
})
157+
}
158+
159+
// RegisterEndpoint registers a new endpoint with the router. If a duplicate
160+
// endpoint exists, an error is returned.
161+
func (p *MultiMsgRouter) RegisterEndpoint(endpoint Endpoint) error {
162+
return sendQueryErr(p.registerChan, endpoint, p.quit)
163+
}
164+
165+
// UnregisterEndpoint unregisters the target endpoint from the router.
166+
func (p *MultiMsgRouter) UnregisterEndpoint(name EndpointName) error {
167+
return sendQueryErr(p.unregisterChan, name, p.quit)
168+
}
169+
170+
// RouteMsg attempts to route the target message to a registered endpoint. If
171+
// ANY endpoint could handle the message, then nil is returned.
172+
func (p *MultiMsgRouter) RouteMsg(msg PeerMsg) error {
173+
return sendQueryErr(p.msgChan, msg, p.quit)
174+
}
175+
176+
// Endpoints returns a list of all registered endpoints.
177+
func (p *MultiMsgRouter) endpoints() fn.Result[EndpointsMap] {
178+
return sendQuery(p.endpointQueries, nil, p.quit)
179+
}
180+
181+
// msgRouter is the main goroutine that handles all incoming messages.
182+
func (p *MultiMsgRouter) msgRouter() {
183+
defer p.wg.Done()
184+
185+
// endpoints is a map of all registered endpoints.
186+
endpoints := make(map[EndpointName]Endpoint)
187+
188+
for {
189+
select {
190+
// A new endpoint was just sent in, so we'll add it to our set
191+
// of registered endpoints.
192+
case newEndpointMsg := <-p.registerChan:
193+
endpoint := newEndpointMsg.Request
194+
195+
log.Infof("MsgRouter: registering new "+
196+
"Endpoint(%s)", endpoint.Name())
197+
198+
// If this endpoint already exists, then we'll return
199+
// an error as we require unique names.
200+
if _, ok := endpoints[endpoint.Name()]; ok {
201+
log.Errorf("MsgRouter: rejecting "+
202+
"duplicate endpoint: %v",
203+
endpoint.Name())
204+
205+
newEndpointMsg.Resolve(ErrDuplicateEndpoint)
206+
207+
continue
208+
}
209+
210+
endpoints[endpoint.Name()] = endpoint
211+
212+
newEndpointMsg.Resolve(nil)
213+
214+
// A request to unregister an endpoint was just sent in, so
215+
// we'll attempt to remove it.
216+
case endpointName := <-p.unregisterChan:
217+
delete(endpoints, endpointName.Request)
218+
219+
log.Infof("MsgRouter: unregistering "+
220+
"Endpoint(%s)", endpointName.Request)
221+
222+
endpointName.Resolve(nil)
223+
224+
// A new message was just sent in. We'll attempt to route it to
225+
// all the endpoints that can handle it.
226+
case msgQuery := <-p.msgChan:
227+
msg := msgQuery.Request
228+
229+
// Loop through all the endpoints and send the message
230+
// to those that can handle it the message.
231+
var couldSend bool
232+
for _, endpoint := range endpoints {
233+
if endpoint.CanHandle(msg) {
234+
log.Tracef("MsgRouter: sending "+
235+
"msg %T to endpoint %s", msg,
236+
endpoint.Name())
237+
238+
sent := endpoint.SendMessage(msg)
239+
couldSend = couldSend || sent
240+
}
241+
}
242+
243+
var err error
244+
if !couldSend {
245+
log.Tracef("MsgRouter: unable to route "+
246+
"msg %T", msg)
247+
248+
err = ErrUnableToRouteMsg
249+
}
250+
251+
msgQuery.Resolve(err)
252+
253+
// A query for the endpoint state just came in, we'll send back
254+
// a copy of our current state.
255+
case endpointQuery := <-p.endpointQueries:
256+
endpointsCopy := make(EndpointsMap, len(endpoints))
257+
maps.Copy(endpointsCopy, endpoints)
258+
259+
endpointQuery.Resolve(endpointsCopy)
260+
261+
case <-p.quit:
262+
return
263+
}
264+
}
265+
}
266+
267+
// A compile time check to ensure MultiMsgRouter implements the MsgRouter
268+
// interface.
269+
var _ Router = (*MultiMsgRouter)(nil)

0 commit comments

Comments
 (0)