Skip to content

Commit 424ae09

Browse files
committed
protofsm: add ability for state machine to consume wire msgs
In this commit, we add the ability for the state machine to consume wire messages. This'll allow the creation of a new generic message router that takes the place of the current peer `readHandler` in an upcoming commit.
1 parent bf10e31 commit 424ae09

File tree

4 files changed

+166
-17
lines changed

4 files changed

+166
-17
lines changed

protofsm/daemon_events.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
"github.com/lightningnetwork/lnd/lnwire"
99
)
1010

11-
// DaemonEvent is a special event that can be emmitted by a state transition
11+
// DaemonEvent is a special event that can be emitted by a state transition
1212
// function. A state machine can use this to perform side effects, such as
1313
// sending a message to a peer, or broadcasting a transaction.
1414
type DaemonEvent interface {

protofsm/msg_mapper.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package protofsm
2+
3+
import (
4+
"github.com/lightningnetwork/lnd/fn"
5+
"github.com/lightningnetwork/lnd/lnwire"
6+
)
7+
8+
// MsgMapper is used to map incoming wire messages into a FSM event. This is
9+
// useful to decouple the translation of an outside or wire message into an
10+
// event type that can be understood by the FSM.
11+
type MsgMapper[Event any] interface {
12+
// MapMsg maps a wire message into a FSM event. If the message is not
13+
// mappable, then an None is returned.
14+
MapMsg(msg lnwire.Message) fn.Option[Event]
15+
}

protofsm/state_machine.go

Lines changed: 73 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ type State[Event any, Env Environment] interface {
6464
// emitted.
6565
ProcessEvent(event Event, env Env) (*StateTransition[Event, Env], error)
6666

67-
// IsTerminal returns true if this state is terminal, and false otherwise.
67+
// IsTerminal returns true if this state is terminal, and false
68+
// otherwise.
6869
IsTerminal() bool
6970

7071
// TODO(roasbeef): also add state serialization?
@@ -159,13 +160,17 @@ type StateMachineCfg[Event any, Env Environment] struct {
159160
// can be used to set up tracking state such as a txid confirmation
160161
// event.
161162
InitEvent fn.Option[DaemonEvent]
163+
164+
// MsgMapper is an optional message mapper that can be used to map
165+
// normal wire messages into FSM events.
166+
MsgMapper fn.Option[MsgMapper[Event]]
162167
}
163168

164169
// NewStateMachine creates a new state machine given a set of daemon adapters,
165170
// an initial state, an environment, and an event to process as if emitted at
166171
// the onset of the state machine. Such an event can be used to set up tracking
167172
// state such as a txid confirmation event.
168-
func NewStateMachine[Event any, Env Environment](cfg StateMachineCfg[Event, Env],
173+
func NewStateMachine[Event any, Env Environment](cfg StateMachineCfg[Event, Env], //nolint:lll
169174
) StateMachine[Event, Env] {
170175

171176
return StateMachine[Event, Env]{
@@ -206,6 +211,43 @@ func (s *StateMachine[Event, Env]) SendEvent(event Event) {
206211
}
207212
}
208213

214+
// CanHandle returns true if the target message can be routed to the state
215+
// machine.
216+
func (s *StateMachine[Event, Env]) CanHandle(msg lnwire.Message) bool {
217+
cfgMapper := s.cfg.MsgMapper
218+
return fn.MapOptionZ(cfgMapper, func(mapper MsgMapper[Event]) bool {
219+
return mapper.MapMsg(msg).IsSome()
220+
})
221+
}
222+
223+
// SendMessage attempts to send a wire message to the state machine. If the
224+
// message can be mapped using the default message mapper, then true is
225+
// returned indicating that the message was processed. Otherwise, false is
226+
// returned.
227+
func (s *StateMachine[Event, Env]) SendMessage(msg lnwire.Message) bool {
228+
// If we have no message mapper, then return false as we can't process
229+
// this message.
230+
if !s.cfg.MsgMapper.IsSome() {
231+
return false
232+
}
233+
234+
// Otherwise, try to map the message using the default message mapper.
235+
// If we can't extract an event, then we'll return false to indicate
236+
// that the message wasn't processed.
237+
var processed bool
238+
s.cfg.MsgMapper.WhenSome(func(mapper MsgMapper[Event]) {
239+
event := mapper.MapMsg(msg)
240+
241+
event.WhenSome(func(event Event) {
242+
s.SendEvent(event)
243+
244+
processed = true
245+
})
246+
})
247+
248+
return processed
249+
}
250+
209251
// CurrentState returns the current state of the state machine.
210252
func (s *StateMachine[Event, Env]) CurrentState() (State[Event, Env], error) {
211253
query := stateQuery[Event, Env]{
@@ -225,7 +267,9 @@ type StateSubscriber[E any, F Environment] *fn.EventReceiver[State[E, F]]
225267

226268
// RegisterStateEvents registers a new event listener that will be notified of
227269
// new state transitions.
228-
func (s *StateMachine[Event, Env]) RegisterStateEvents() StateSubscriber[Event, Env] {
270+
func (s *StateMachine[Event, Env]) RegisterStateEvents() StateSubscriber[
271+
Event, Env] {
272+
229273
subscriber := fn.NewEventReceiver[State[Event, Env]](10)
230274

231275
// TODO(roasbeef): instead give the state and the input event?
@@ -237,16 +281,17 @@ func (s *StateMachine[Event, Env]) RegisterStateEvents() StateSubscriber[Event,
237281

238282
// RemoveStateSub removes the target state subscriber from the set of active
239283
// subscribers.
240-
func (s *StateMachine[Event, Env]) RemoveStateSub(sub StateSubscriber[Event, Env]) {
241-
s.newStateEvents.RemoveSubscriber(sub)
284+
func (s *StateMachine[Event, Env]) RemoveStateSub(sub StateSubscriber[
285+
Event, Env]) {
286+
287+
_ = s.newStateEvents.RemoveSubscriber(sub)
242288
}
243289

244290
// executeDaemonEvent executes a daemon event, which is a special type of event
245291
// that can be emitted as part of the state transition function of the state
246292
// machine. An error is returned if the type of event is unknown.
247293
func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error {
248294
switch daemonEvent := event.(type) {
249-
250295
// This is a send message event, so we'll send the event, and also mind
251296
// any preconditions as well as post-send events.
252297
case *SendMsgEvent[Event]:
@@ -255,7 +300,8 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error {
255300
daemonEvent.TargetPeer, daemonEvent.Msgs,
256301
)
257302
if err != nil {
258-
return fmt.Errorf("unable to send msgs: %w", err)
303+
return fmt.Errorf("unable to send msgs: %w",
304+
err)
259305
}
260306

261307
// If a post-send event was specified, then we'll
@@ -300,7 +346,12 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error {
300346
)
301347

302348
if canSend {
303-
sendAndCleanUp()
349+
err := sendAndCleanUp()
350+
if err != nil {
351+
//nolint:lll
352+
log.Errorf("FSM(%v): unable to send message: %v", err)
353+
}
354+
304355
return
305356
}
306357

@@ -319,8 +370,6 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error {
319370
daemonEvent.Tx, daemonEvent.Label,
320371
)
321372
if err != nil {
322-
// TODO(roasbeef): hook has channel read event event is
323-
// hit?
324373
return fmt.Errorf("unable to broadcast txn: %w", err)
325374
}
326375

@@ -414,6 +463,8 @@ func (s *StateMachine[Event, Env]) applyEvents(currentState State[Event, Env],
414463
// any new emitted internal events to our event queue. This continues
415464
// until we reach a terminal state, or we run out of internal events to
416465
// process.
466+
//
467+
//nolint:lll
417468
for nextEvent := eventQueue.Dequeue(); nextEvent.IsSome(); nextEvent = eventQueue.Dequeue() {
418469
err := fn.MapOptionZ(nextEvent, func(event Event) error {
419470
// Apply the state transition function of the current
@@ -426,13 +477,17 @@ func (s *StateMachine[Event, Env]) applyEvents(currentState State[Event, Env],
426477
}
427478

428479
newEvents := transition.NewEvents
429-
err = fn.MapOptionZ(newEvents, func(events EmittedEvent[Event]) error {
480+
err = fn.MapOptionZ(newEvents, func(events EmittedEvent[Event]) error { //nolint:lll
430481
// With the event processed, we'll process any
431482
// new daemon events that were emitted as part
432483
// of this new state transition.
484+
//
485+
//nolint:lll
433486
err := fn.MapOptionZ(events.ExternalEvents, func(dEvents DaemonEventSet) error {
434487
for _, dEvent := range dEvents {
435-
err := s.executeDaemonEvent(dEvent)
488+
err := s.executeDaemonEvent(
489+
dEvent,
490+
)
436491
if err != nil {
437492
return err
438493
}
@@ -446,6 +501,8 @@ func (s *StateMachine[Event, Env]) applyEvents(currentState State[Event, Env],
446501

447502
// Next, we'll add any new emitted events to
448503
// our event queue.
504+
//
505+
//nolint:lll
449506
events.InternalEvent.WhenSome(func(inEvent Event) {
450507
eventQueue.Enqueue(inEvent)
451508
})
@@ -516,7 +573,10 @@ func (s *StateMachine[Event, Env]) driveMachine() {
516573
// An outside caller is querying our state, so we'll return the
517574
// latest state.
518575
case stateQuery := <-s.stateQuery:
519-
if !fn.SendOrQuit(stateQuery.CurrentState, currentState, s.quit) {
576+
if !fn.SendOrQuit(
577+
stateQuery.CurrentState, currentState, s.quit,
578+
) {
579+
520580
return
521581
}
522582

protofsm/state_machine_test.go

Lines changed: 77 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -174,13 +174,17 @@ func newDaemonAdapters() *dummyAdapters {
174174
}
175175
}
176176

177-
func (d *dummyAdapters) SendMessages(pub btcec.PublicKey, msgs []lnwire.Message) error {
177+
func (d *dummyAdapters) SendMessages(pub btcec.PublicKey,
178+
msgs []lnwire.Message) error {
179+
178180
args := d.Called(pub, msgs)
179181

180182
return args.Error(0)
181183
}
182184

183-
func (d *dummyAdapters) BroadcastTransaction(tx *wire.MsgTx, label string) error {
185+
func (d *dummyAdapters) BroadcastTransaction(tx *wire.MsgTx,
186+
label string) error {
187+
184188
args := d.Called(tx, label)
185189

186190
return args.Error(0)
@@ -194,6 +198,7 @@ func (d *dummyAdapters) RegisterConfirmationsNtfn(txid *chainhash.Hash,
194198
args := d.Called(txid, pkScript, numConfs)
195199

196200
err := args.Error(0)
201+
197202
return &chainntnfs.ConfirmationEvent{
198203
Confirmed: d.confChan,
199204
}, err
@@ -342,7 +347,9 @@ func TestStateMachineDaemonEvents(t *testing.T) {
342347

343348
// As soon as we send in the daemon event, we expect the
344349
// disable+broadcast events to be processed, as they are unconditional.
345-
adapters.On("BroadcastTransaction", mock.Anything, mock.Anything).Return(nil)
350+
adapters.On(
351+
"BroadcastTransaction", mock.Anything, mock.Anything,
352+
).Return(nil)
346353
adapters.On("SendMessages", *pub2, mock.Anything).Return(nil)
347354

348355
// We'll start off by sending in the daemon event, which'll trigger the
@@ -374,3 +381,70 @@ func TestStateMachineDaemonEvents(t *testing.T) {
374381
adapters.AssertExpectations(t)
375382
env.AssertExpectations(t)
376383
}
384+
385+
type dummyMsgMapper struct {
386+
mock.Mock
387+
}
388+
389+
func (d *dummyMsgMapper) MapMsg(wireMsg lnwire.Message) fn.Option[dummyEvents] {
390+
args := d.Called(wireMsg)
391+
392+
//nolint:forcetypeassert
393+
return args.Get(0).(fn.Option[dummyEvents])
394+
}
395+
396+
// TestStateMachineMsgMapper tests that given a message mapper, we can properly
397+
// send in wire messages get mapped to FSM events.
398+
func TestStateMachineMsgMapper(t *testing.T) {
399+
// First, we'll create our state machine given the env, and our
400+
// starting state.
401+
env := &dummyEnv{}
402+
startingState := &dummyStateStart{}
403+
adapters := newDaemonAdapters()
404+
405+
// We'll also provide a message mapper that only knows how to map a
406+
// single wire message (error).
407+
dummyMapper := &dummyMsgMapper{}
408+
409+
// The only thing we know how to map is the error message, which'll
410+
// terminate the state machine.
411+
wireError := &lnwire.Error{}
412+
initMsg := &lnwire.Init{}
413+
dummyMapper.On("MapMsg", wireError).Return(
414+
fn.Some(dummyEvents(&goToFin{})),
415+
)
416+
dummyMapper.On("MapMsg", initMsg).Return(fn.None[dummyEvents]())
417+
418+
cfg := StateMachineCfg[dummyEvents, *dummyEnv]{
419+
Daemon: adapters,
420+
InitialState: startingState,
421+
Env: env,
422+
MsgMapper: fn.Some[MsgMapper[dummyEvents]](dummyMapper),
423+
}
424+
stateMachine := NewStateMachine(cfg)
425+
stateMachine.Start()
426+
defer stateMachine.Stop()
427+
428+
// As we're triggering internal events, we'll also subscribe to the set
429+
// of new states so we can assert as we go.
430+
stateSub := stateMachine.RegisterStateEvents()
431+
defer stateMachine.RemoveStateSub(stateSub)
432+
433+
// First, we'll verify that the CanHandle method works as expected.
434+
require.True(t, stateMachine.CanHandle(wireError))
435+
require.False(t, stateMachine.CanHandle(&lnwire.Init{}))
436+
437+
// Next, we'll attempt to send the wire message into the state machine.
438+
// We should transition to the final state.
439+
require.True(t, stateMachine.SendMessage(wireError))
440+
441+
// We should transition to the final state.
442+
expectedStates := []State[dummyEvents, *dummyEnv]{
443+
&dummyStateStart{}, &dummyStateFin{},
444+
}
445+
assertStateTransitions(t, stateSub, expectedStates)
446+
447+
dummyMapper.AssertExpectations(t)
448+
adapters.AssertExpectations(t)
449+
env.AssertExpectations(t)
450+
}

0 commit comments

Comments
 (0)