Skip to content

Commit 1a2ee8a

Browse files
committed
multi: add LndFsmDaemonAdapters for state machine manager
Introduce LndFsmDaemonAdapters, a struct implementing the protofsm.DaemonAdapters interface. This is passed into supplycommit.NewMultiStateMachineManager, which then propagates it to each asset group-specific state machine.
1 parent 886aba2 commit 1a2ee8a

File tree

5 files changed

+231
-7
lines changed

5 files changed

+231
-7
lines changed

chain_bridge.go

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"bytes"
55
"context"
66
"fmt"
7+
"sync"
8+
"time"
79

810
"github.com/btcsuite/btcd/btcec/v2"
911
"github.com/btcsuite/btcd/chaincfg/chainhash"
@@ -22,6 +24,7 @@ import (
2224
"github.com/lightningnetwork/lnd/lnrpc/routerrpc"
2325
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
2426
"github.com/lightningnetwork/lnd/lnwire"
27+
"github.com/lightningnetwork/lnd/protofsm"
2528
"github.com/lightningnetwork/lnd/routing/route"
2629
)
2730

@@ -31,6 +34,10 @@ const (
3134
// 3200kB of memory (4 bytes for the block height and 4 bytes for the
3235
// timestamp, not including any map/cache overhead).
3336
maxNumBlocksInCache = 400_000
37+
38+
// DefaultTimeout is the default timeout we use for RPC and database
39+
// operations.
40+
DefaultTimeout = 30 * time.Second
3441
)
3542

3643
// cacheableTimestamp is a wrapper around an uint32 that can be used as a value
@@ -458,3 +465,184 @@ func (l *LndInvoicesClient) HtlcModifier(ctx context.Context,
458465
// Ensure LndInvoicesClient implements the tapchannel.InvoiceHtlcModifier
459466
// interface.
460467
var _ tapchannel.InvoiceHtlcModifier = (*LndInvoicesClient)(nil)
468+
469+
// LndFsmDaemonAdapters is a struct that implements the protofsm.DaemonAdapters
470+
// interface.
471+
type LndFsmDaemonAdapters struct {
472+
// lnd is the LND services client that will be used to interact with
473+
// the LND node.
474+
lnd *lndclient.LndServices
475+
476+
// retryConfig is the retry configuration that will be used for
477+
// operations that may fail due to temporary issues, such as network
478+
// errors or RPC timeouts.
479+
retryConfig fn.RetryConfig
480+
481+
// msgTransport is the message transport client that will be used to
482+
// send messages to peers.
483+
msgTransport LndMsgTransportClient
484+
485+
// chainBridge is the chain bridge that will be used to interact with
486+
// the blockchain.
487+
chainBridge LndRpcChainBridge
488+
489+
// ContextGuard manages the context and quit channel for this service.
490+
fn.ContextGuard
491+
492+
startOnce sync.Once
493+
stopOnce sync.Once
494+
}
495+
496+
// NewLndFsmDaemonAdapters creates a new instance of LndFsmDaemonAdapters.
497+
func NewLndFsmDaemonAdapters(lnd *lndclient.LndServices) *LndFsmDaemonAdapters {
498+
retryConfig := fn.DefaultRetryConfig()
499+
500+
msgTransport := NewLndMsgTransportClient(lnd)
501+
502+
// Initialize the chain bridge without the asset store, as it is not
503+
// needed for the FSM adapters.
504+
chainBridge := NewLndRpcChainBridge(lnd, nil)
505+
chainBridge.retryConfig = retryConfig
506+
507+
return &LndFsmDaemonAdapters{
508+
lnd: lnd,
509+
retryConfig: retryConfig,
510+
msgTransport: *msgTransport,
511+
chainBridge: *chainBridge,
512+
ContextGuard: fn.ContextGuard{
513+
DefaultTimeout: DefaultTimeout,
514+
Quit: make(chan struct{}),
515+
},
516+
}
517+
}
518+
519+
// Start attempts to start the service.
520+
func (l *LndFsmDaemonAdapters) Start() error {
521+
l.startOnce.Do(func() {})
522+
return nil
523+
}
524+
525+
// Stop signals for the service to gracefully exit.
526+
func (l *LndFsmDaemonAdapters) Stop() error {
527+
l.stopOnce.Do(func() {
528+
// Signal the quit channel which will cancel all active
529+
// contexts.
530+
close(l.Quit)
531+
})
532+
533+
return nil
534+
}
535+
536+
// SendMessages sends a slice of lnwire.Message to the peer with the given
537+
// public key.
538+
func (l *LndFsmDaemonAdapters) SendMessages(peer btcec.PublicKey,
539+
messages []lnwire.Message) error {
540+
541+
// Convert messages to a slice of CustomMessage.
542+
customMessages := make([]lndclient.CustomMessage, 0, len(messages))
543+
for idx := range messages {
544+
msg := messages[idx]
545+
546+
var buf bytes.Buffer
547+
if err := msg.Encode(&buf, 0); err != nil {
548+
return fmt.Errorf("unable to encode message: %w", err)
549+
}
550+
551+
customMsg := lndclient.CustomMessage{
552+
Peer: route.NewVertex(&peer),
553+
MsgType: uint32(msg.MsgType()),
554+
Data: buf.Bytes(),
555+
}
556+
557+
customMessages = append(customMessages, customMsg)
558+
}
559+
560+
ctx, cancel := l.WithCtxQuitNoTimeout()
561+
defer cancel()
562+
563+
// Send each message in turn.
564+
for idx := range customMessages {
565+
msg := customMessages[idx]
566+
567+
err := l.msgTransport.SendCustomMessage(ctx, msg)
568+
if err != nil {
569+
return fmt.Errorf("unable to send custom message: %w",
570+
err)
571+
}
572+
}
573+
574+
return nil
575+
}
576+
577+
// BroadcastTransaction attempts to broadcast a transaction to the
578+
// network. It uses the chain bridge to publish the transaction.
579+
func (l *LndFsmDaemonAdapters) BroadcastTransaction(tx *wire.MsgTx,
580+
label string) error {
581+
582+
ctx, cancel := l.WithCtxQuitNoTimeout()
583+
defer cancel()
584+
585+
return l.chainBridge.PublishTransaction(ctx, tx, label)
586+
}
587+
588+
// RegisterConfirmationsNtfn registers an intent to be notified once the
589+
// transaction with the given txid reaches the specified number of
590+
// confirmations.
591+
func (l *LndFsmDaemonAdapters) RegisterConfirmationsNtfn(
592+
txid *chainhash.Hash, pkScript []byte, numConfs uint32,
593+
heightHint uint32,
594+
optFuncs ...chainntnfs.NotifierOption) (*chainntnfs.ConfirmationEvent,
595+
error) {
596+
597+
opts := chainntnfs.DefaultNotifierOptions()
598+
for _, optFunc := range optFuncs {
599+
optFunc(opts)
600+
}
601+
602+
lndCliOpt := make([]lndclient.NotifierOption, 0, len(optFuncs))
603+
if opts.IncludeBlock {
604+
lndCliOpt = append(lndCliOpt, lndclient.WithIncludeBlock())
605+
}
606+
607+
ctx, cancel := l.WithCtxQuitNoTimeout()
608+
spendDetail, _, err := l.lnd.ChainNotifier.RegisterConfirmationsNtfn(
609+
ctx, txid, pkScript, int32(numConfs), int32(heightHint),
610+
lndCliOpt...,
611+
)
612+
if err != nil {
613+
return nil, fmt.Errorf("unable to register for conf: %w", err)
614+
}
615+
616+
return &chainntnfs.ConfirmationEvent{
617+
Confirmed: spendDetail,
618+
Updates: make(chan uint32),
619+
NegativeConf: make(chan int32),
620+
Done: make(chan struct{}),
621+
Cancel: cancel,
622+
}, nil
623+
}
624+
625+
// RegisterSpendNtfn registers an intent to be notified once the outpoint
626+
// is spent on-chain.
627+
func (l *LndFsmDaemonAdapters) RegisterSpendNtfn(outpoint *wire.OutPoint,
628+
pkScript []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) {
629+
630+
ctx, cancel := l.WithCtxQuitNoTimeout()
631+
spendDetail, _, err := l.lnd.ChainNotifier.RegisterSpendNtfn(
632+
ctx, outpoint, pkScript, int32(heightHint),
633+
)
634+
if err != nil {
635+
return nil, fmt.Errorf("unable to register for spend: %w", err)
636+
}
637+
638+
return &chainntnfs.SpendEvent{
639+
Spend: spendDetail,
640+
Reorg: make(chan struct{}, 1),
641+
Done: make(chan struct{}, 1),
642+
Cancel: cancel,
643+
}, nil
644+
}
645+
646+
// Ensure LndFsmDaemonAdapters implements the protofsm.DaemonAdapters
647+
// interface.
648+
var _ protofsm.DaemonAdapters = (*LndFsmDaemonAdapters)(nil)

config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,10 @@ type Config struct {
187187

188188
ChainPorter tapfreighter.Porter
189189

190+
// FsmDaemonAdapters is a set of adapters that allow a state machine to
191+
// interact with external daemons.
192+
FsmDaemonAdapters *LndFsmDaemonAdapters
193+
190194
// SupplyCommitManager is a service that is used to manage supply
191195
// commitments for assets. Supply commitments are issuer published
192196
// attestations of the total supply of an asset.

server.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,12 @@ func (s *Server) initialize(interceptorChain *rpcperms.InterceptorChain) error {
211211
return fmt.Errorf("unable to start RFQ manager: %w", err)
212212
}
213213

214+
// Start FSM daemon adapters.
215+
if err := s.cfg.FsmDaemonAdapters.Start(); err != nil {
216+
return fmt.Errorf("unable to start FSM daemon adapters: %w",
217+
err)
218+
}
219+
214220
// Start universe supply commitment manager.
215221
if err := s.cfg.SupplyCommitManager.Start(); err != nil {
216222
return fmt.Errorf("unable to start supply commit manager: %w",
@@ -697,6 +703,12 @@ func (s *Server) Stop() error {
697703
return err
698704
}
699705

706+
// Stop FSM daemon adapters.
707+
if err := s.cfg.FsmDaemonAdapters.Stop(); err != nil {
708+
return fmt.Errorf("unable to stop FSM daemon adapters: %w",
709+
err)
710+
}
711+
700712
// Stop universe supply commitment manager.
701713
if err := s.cfg.SupplyCommitManager.Stop(); err != nil {
702714
return fmt.Errorf("unable to stop supply commit manager: %w",

tapcfg/server.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger,
111111
lndRouterClient := tap.NewLndRouterClient(lndServices)
112112
lndInvoicesClient := tap.NewLndInvoicesClient(lndServices)
113113
lndFeatureBitsVerifier := tap.NewLndFeatureBitVerifier(lndServices)
114+
lndFsmDaemonAdapters := tap.NewLndFsmDaemonAdapters(lndServices)
114115

115116
uniDB := tapdb.NewTransactionExecutor(
116117
db, func(tx *sql.Tx) tapdb.BaseUniverseStore {
@@ -608,13 +609,14 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger,
608609
// manage the supply commitment state machines for each asset group.
609610
supplyCommitManager := supplycommit.NewMultiStateMachineManager(
610611
supplycommit.MultiStateMachineManagerCfg{
611-
TreeView: supplyTreeStore,
612-
Commitments: supplyCommitStore,
613-
Wallet: walletAnchor,
614-
KeyRing: keyRing,
615-
Chain: chainBridge,
616-
StateLog: supplyCommitStore,
617-
ChainParams: *tapChainParams.Params,
612+
TreeView: supplyTreeStore,
613+
Commitments: supplyCommitStore,
614+
Wallet: walletAnchor,
615+
KeyRing: keyRing,
616+
Chain: chainBridge,
617+
DaemonAdapters: lndFsmDaemonAdapters,
618+
StateLog: supplyCommitStore,
619+
ChainParams: *tapChainParams.Params,
618620
},
619621
)
620622

@@ -668,6 +670,7 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger,
668670
AssetWallet: assetWallet,
669671
CoinSelect: coinSelect,
670672
ChainPorter: chainPorter,
673+
FsmDaemonAdapters: lndFsmDaemonAdapters,
671674
SupplyCommitManager: supplyCommitManager,
672675
UniverseArchive: uniArchive,
673676
UniverseSyncer: universeSyncer,

universe/supplycommit/multi_sm_manager.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,18 @@ const (
2121
DefaultTimeout = 30 * time.Second
2222
)
2323

24+
// DaemonAdapters is a wrapper around the protofsm.DaemonAdapters interface
25+
// with the addition of Start and Stop methods.
26+
type DaemonAdapters interface {
27+
protofsm.DaemonAdapters
28+
29+
// Start starts the daemon adapters handler service.
30+
Start() error
31+
32+
// Stop stops the daemon adapters handler service.
33+
Stop() error
34+
}
35+
2436
// MultiStateMachineManagerCfg is the configuration for the
2537
// MultiStateMachineManager. It contains all the dependencies needed to
2638
// manage multiple supply commitment state machines, one for each asset group.
@@ -45,6 +57,10 @@ type MultiStateMachineManagerCfg struct {
4557
// TODO(roasbeef): can make a slimmer version of
4658
Chain tapgarden.ChainBridge
4759

60+
// DaemonAdapters is a set of adapters that allow the state machine to
61+
// interact with external daemons whilst processing internal events.
62+
DaemonAdapters DaemonAdapters
63+
4864
// StateLog is the main state log that is used to track the state of the
4965
// state machine. This is used to persist the state of the state machine
5066
// across restarts.
@@ -150,6 +166,7 @@ func (m *MultiStateMachineManager) fetchStateMachine(
150166
fsmCfg := protofsm.StateMachineCfg[Event, *Environment]{
151167
InitialState: initialState,
152168
Env: env,
169+
Daemon: m.cfg.DaemonAdapters,
153170
}
154171
newSm := protofsm.NewStateMachine[Event, *Environment](fsmCfg)
155172

0 commit comments

Comments
 (0)