Skip to content

Commit ea60fe0

Browse files
committed
protofsm: add ability for state machine to consume wire msgs
In this commit, we add the ability for the state machine to consume wire messages. This'll allow the creation of a new generic message router that takes the place of the current peer `readHandler` in an upcoming commit.
1 parent ae080ed commit ea60fe0

File tree

4 files changed

+170
-17
lines changed

4 files changed

+170
-17
lines changed

protofsm/daemon_events.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
"github.com/lightningnetwork/lnd/lnwire"
99
)
1010

11-
// DaemonEvent is a special event that can be emmitted by a state transition
11+
// DaemonEvent is a special event that can be emitted by a state transition
1212
// function. A state machine can use this to perform side effects, such as
1313
// sending a message to a peer, or broadcasting a transaction.
1414
type DaemonEvent interface {

protofsm/msg_mapper.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package protofsm
2+
3+
import (
4+
"github.com/lightningnetwork/lnd/fn"
5+
"github.com/lightningnetwork/lnd/lnwire"
6+
)
7+
8+
// MsgMapper is used to map incoming wire messages into a FSM event. This is
9+
// useful to decouple the translation of an outside or wire message into an
10+
// event type that can be understood by the FSM.
11+
type MsgMapper[Event any] interface {
12+
// MapMsg maps a wire message into a FSM event. If the message is not
13+
// mappable, then an error is returned.
14+
MapMsg(msg lnwire.Message) fn.Option[Event]
15+
}

protofsm/state_machine.go

Lines changed: 73 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ type State[Event any, Env Environment] interface {
6767
// emitted.
6868
ProcessEvent(event Event, env Env) (*StateTransition[Event, Env], error)
6969

70-
// IsTerminal returns true if this state is terminal, and false otherwise.
70+
// IsTerminal returns true if this state is terminal, and false
71+
// otherwise.
7172
IsTerminal() bool
7273

7374
// TODO(roasbeef): also add state serialization?
@@ -162,13 +163,17 @@ type StateMachineCfg[Event any, Env Environment] struct {
162163
// can be used to set up tracking state such as a txid confirmation
163164
// event.
164165
InitEvent fn.Option[DaemonEvent]
166+
167+
// MsgMapper is an optional message mapper that can be used to map
168+
// normal wire messages into FSM events.
169+
MsgMapper fn.Option[MsgMapper[Event]]
165170
}
166171

167172
// NewStateMachine creates a new state machine given a set of daemon adapters,
168173
// an initial state, an environment, and an event to process as if emitted at
169174
// the onset of the state machine. Such an event can be used to set up tracking
170175
// state such as a txid confirmation event.
171-
func NewStateMachine[Event any, Env Environment](cfg StateMachineCfg[Event, Env],
176+
func NewStateMachine[Event any, Env Environment](cfg StateMachineCfg[Event, Env], //nolint:lll
172177
) StateMachine[Event, Env] {
173178

174179
return StateMachine[Event, Env]{
@@ -209,6 +214,43 @@ func (s *StateMachine[Event, Env]) SendEvent(event Event) {
209214
}
210215
}
211216

217+
// CanHandle returns true if the target message can be routed to the state
218+
// machine.
219+
func (s *StateMachine[Event, Env]) CanHandle(msg lnwire.Message) bool {
220+
cfgMapper := s.cfg.MsgMapper
221+
return fn.MapOptionZ(cfgMapper, func(mapper MsgMapper[Event]) bool {
222+
return mapper.MapMsg(msg).IsSome()
223+
})
224+
}
225+
226+
// SendMessage attempts to send a wire message to the state machine. If the
227+
// message can be mapped using the default message mapper, then true is
228+
// returned indicating that the message was processed. Otherwise, false is
229+
// returned.
230+
func (s *StateMachine[Event, Env]) SendMessage(msg lnwire.Message) bool {
231+
// If we have no message mapper, then return false as we can't process
232+
// this message.
233+
if !s.cfg.MsgMapper.IsSome() {
234+
return false
235+
}
236+
237+
// Otherwise, try to map the message using the default message mapper.
238+
// If we can't extract an event, then we'll return false to indicate
239+
// that the message wasn't processed.
240+
var processed bool
241+
s.cfg.MsgMapper.WhenSome(func(mapper MsgMapper[Event]) {
242+
event := mapper.MapMsg(msg)
243+
244+
event.WhenSome(func(event Event) {
245+
s.SendEvent(event)
246+
247+
processed = true
248+
})
249+
})
250+
251+
return processed
252+
}
253+
212254
// CurrentState returns the current state of the state machine.
213255
func (s *StateMachine[Event, Env]) CurrentState() (State[Event, Env], error) {
214256
query := stateQuery[Event, Env]{
@@ -228,7 +270,9 @@ type StateSubscriber[E any, F Environment] *fn.EventReceiver[State[E, F]]
228270

229271
// RegisterStateEvents registers a new event listener that will be notified of
230272
// new state transitions.
231-
func (s *StateMachine[Event, Env]) RegisterStateEvents() StateSubscriber[Event, Env] {
273+
func (s *StateMachine[Event, Env]) RegisterStateEvents() StateSubscriber[
274+
Event, Env] {
275+
232276
subscriber := fn.NewEventReceiver[State[Event, Env]](10)
233277

234278
// TODO(roasbeef): instead give the state and the input event?
@@ -240,16 +284,17 @@ func (s *StateMachine[Event, Env]) RegisterStateEvents() StateSubscriber[Event,
240284

241285
// RemoveStateSub removes the target state subscriber from the set of active
242286
// subscribers.
243-
func (s *StateMachine[Event, Env]) RemoveStateSub(sub StateSubscriber[Event, Env]) {
244-
s.newStateEvents.RemoveSubscriber(sub)
287+
func (s *StateMachine[Event, Env]) RemoveStateSub(sub StateSubscriber[
288+
Event, Env]) {
289+
290+
_ = s.newStateEvents.RemoveSubscriber(sub)
245291
}
246292

247293
// executeDaemonEvent executes a daemon event, which is a special type of event
248294
// that can be emitted as part of the state transition function of the state
249295
// machine. An error is returned if the type of event is unknown.
250296
func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error {
251297
switch daemonEvent := event.(type) {
252-
253298
// This is a send message event, so we'll send the event, and also mind
254299
// any preconditions as well as post-send events.
255300
case *SendMsgEvent[Event]:
@@ -258,7 +303,8 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error {
258303
daemonEvent.TargetPeer, daemonEvent.Msgs,
259304
)
260305
if err != nil {
261-
return fmt.Errorf("unable to send msgs: %w", err)
306+
return fmt.Errorf("unable to send msgs: %w",
307+
err)
262308
}
263309

264310
// If a post-send event was specified, then we'll
@@ -303,7 +349,12 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error {
303349
)
304350

305351
if canSend {
306-
sendAndCleanUp()
352+
err := sendAndCleanUp()
353+
if err != nil {
354+
//nolint:lll
355+
log.Errorf("FSM(%v): unable to send message: %v", err)
356+
}
357+
307358
return
308359
}
309360

@@ -322,8 +373,6 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error {
322373
daemonEvent.Tx, daemonEvent.Label,
323374
)
324375
if err != nil {
325-
// TODO(roasbeef): hook has channel read event event is
326-
// hit?
327376
return fmt.Errorf("unable to broadcast txn: %w", err)
328377
}
329378

@@ -417,6 +466,8 @@ func (s *StateMachine[Event, Env]) applyEvents(currentState State[Event, Env],
417466
// any new emitted internal events to our event queue. This continues
418467
// until we reach a terminal state, or we run out of internal events to
419468
// process.
469+
//
470+
//nolint:lll
420471
for nextEvent := eventQueue.Dequeue(); nextEvent.IsSome(); nextEvent = eventQueue.Dequeue() {
421472
err := fn.MapOptionZ(nextEvent, func(event Event) error {
422473
// Apply the state transition function of the current
@@ -429,13 +480,17 @@ func (s *StateMachine[Event, Env]) applyEvents(currentState State[Event, Env],
429480
}
430481

431482
newEvents := transition.NewEvents
432-
err = fn.MapOptionZ(newEvents, func(events EmittedEvent[Event]) error {
483+
err = fn.MapOptionZ(newEvents, func(events EmittedEvent[Event]) error { //nolint:lll
433484
// With the event processed, we'll process any
434485
// new daemon events that were emitted as part
435486
// of this new state transition.
487+
//
488+
//nolint:lll
436489
err := fn.MapOptionZ(events.ExternalEvents, func(dEvents DaemonEventSet) error {
437490
for _, dEvent := range dEvents {
438-
err := s.executeDaemonEvent(dEvent)
491+
err := s.executeDaemonEvent(
492+
dEvent,
493+
)
439494
if err != nil {
440495
return err
441496
}
@@ -449,6 +504,8 @@ func (s *StateMachine[Event, Env]) applyEvents(currentState State[Event, Env],
449504

450505
// Next, we'll add any new emitted events to
451506
// our event queue.
507+
//
508+
//nolint:lll
452509
events.InternalEvent.WhenSome(func(inEvent Event) {
453510
eventQueue.Enqueue(inEvent)
454511
})
@@ -530,7 +587,10 @@ func (s *StateMachine[Event, Env]) driveMachine() {
530587
// An outside caller is querying our state, so we'll return the
531588
// latest state.
532589
case stateQuery := <-s.stateQuery:
533-
if !fn.SendOrQuit(stateQuery.CurrentState, currentState, s.quit) {
590+
if !fn.SendOrQuit(
591+
stateQuery.CurrentState, currentState, s.quit,
592+
) {
593+
534594
return
535595
}
536596

protofsm/state_machine_test.go

Lines changed: 81 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -176,13 +176,17 @@ func newDaemonAdapters() *dummyAdapters {
176176
}
177177
}
178178

179-
func (d *dummyAdapters) SendMessages(pub btcec.PublicKey, msgs []lnwire.Message) error {
179+
func (d *dummyAdapters) SendMessages(pub btcec.PublicKey,
180+
msgs []lnwire.Message) error {
181+
180182
args := d.Called(pub, msgs)
181183

182184
return args.Error(0)
183185
}
184186

185-
func (d *dummyAdapters) BroadcastTransaction(tx *wire.MsgTx, label string) error {
187+
func (d *dummyAdapters) BroadcastTransaction(tx *wire.MsgTx,
188+
label string) error {
189+
186190
args := d.Called(tx, label)
187191

188192
return args.Error(0)
@@ -196,6 +200,7 @@ func (d *dummyAdapters) RegisterConfirmationsNtfn(txid *chainhash.Hash,
196200
args := d.Called(txid, pkScript, numConfs)
197201

198202
err := args.Error(0)
203+
199204
return &chainntnfs.ConfirmationEvent{
200205
Confirmed: d.confChan,
201206
}, err
@@ -388,7 +393,9 @@ func TestStateMachineDaemonEvents(t *testing.T) {
388393

389394
// As soon as we send in the daemon event, we expect the
390395
// disable+broadcast events to be processed, as they are unconditional.
391-
adapters.On("BroadcastTransaction", mock.Anything, mock.Anything).Return(nil)
396+
adapters.On(
397+
"BroadcastTransaction", mock.Anything, mock.Anything,
398+
).Return(nil)
392399
adapters.On("SendMessages", *pub2, mock.Anything).Return(nil)
393400

394401
// We'll start off by sending in the daemon event, which'll trigger the
@@ -420,3 +427,74 @@ func TestStateMachineDaemonEvents(t *testing.T) {
420427
adapters.AssertExpectations(t)
421428
env.AssertExpectations(t)
422429
}
430+
431+
type dummyMsgMapper struct {
432+
mock.Mock
433+
}
434+
435+
func (d *dummyMsgMapper) MapMsg(wireMsg lnwire.Message) fn.Option[dummyEvents] {
436+
args := d.Called(wireMsg)
437+
438+
//nolint:forcetypeassert
439+
return args.Get(0).(fn.Option[dummyEvents])
440+
}
441+
442+
// TestStateMachineMsgMapper tests that given a message mapper, we can properly
443+
// send in wire messages get mapped to FSM events.
444+
func TestStateMachineMsgMapper(t *testing.T) {
445+
// First, we'll create our state machine given the env, and our
446+
// starting state.
447+
env := &dummyEnv{}
448+
startingState := &dummyStateStart{}
449+
adapters := newDaemonAdapters()
450+
451+
// We'll also provide a message mapper that only knows how to map a
452+
// single wire message (error).
453+
dummyMapper := &dummyMsgMapper{}
454+
455+
// The only thing we know how to map is the error message, which'll
456+
// terminate the state machine.
457+
wireError := &lnwire.Error{}
458+
initMsg := &lnwire.Init{}
459+
dummyMapper.On("MapMsg", wireError).Return(
460+
fn.Some(dummyEvents(&goToFin{})),
461+
)
462+
dummyMapper.On("MapMsg", initMsg).Return(fn.None[dummyEvents]())
463+
464+
cfg := StateMachineCfg[dummyEvents, *dummyEnv]{
465+
Daemon: adapters,
466+
InitialState: startingState,
467+
Env: env,
468+
MsgMapper: fn.Some[MsgMapper[dummyEvents]](dummyMapper),
469+
}
470+
stateMachine := NewStateMachine(cfg)
471+
stateMachine.Start()
472+
defer stateMachine.Stop()
473+
474+
// As we're triggering internal events, we'll also subscribe to the set
475+
// of new states so we can assert as we go.
476+
stateSub := stateMachine.RegisterStateEvents()
477+
defer stateMachine.RemoveStateSub(stateSub)
478+
479+
// We'll still be going to a terminal state, so we expect that the
480+
// clean up method will be called.
481+
env.On("CleanUp").Return(nil)
482+
483+
// First, we'll verify that the CanHandle method works as expected.
484+
require.True(t, stateMachine.CanHandle(wireError))
485+
require.False(t, stateMachine.CanHandle(&lnwire.Init{}))
486+
487+
// Next, we'll attempt to send the wire message into the state machine.
488+
// We should transition to the final state.
489+
require.True(t, stateMachine.SendMessage(wireError))
490+
491+
// We should transition to the final state.
492+
expectedStates := []State[dummyEvents, *dummyEnv]{
493+
&dummyStateStart{}, &dummyStateFin{},
494+
}
495+
assertStateTransitions(t, stateSub, expectedStates)
496+
497+
dummyMapper.AssertExpectations(t)
498+
adapters.AssertExpectations(t)
499+
env.AssertExpectations(t)
500+
}

0 commit comments

Comments
 (0)