Skip to content

Commit c4bb5bc

Browse files
authored
Merge pull request #1643 from lightninglabs/wip/supplycommit/add-multi-sm-manager
supplycommit: add MultiStateMachineManager for managing group state machines
2 parents 751bd94 + a3cbff2 commit c4bb5bc

File tree

13 files changed

+849
-48
lines changed

13 files changed

+849
-48
lines changed

config.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/lightninglabs/taproot-assets/tapfreighter"
1818
"github.com/lightninglabs/taproot-assets/tapgarden"
1919
"github.com/lightninglabs/taproot-assets/universe"
20+
"github.com/lightninglabs/taproot-assets/universe/supplycommit"
2021
"github.com/lightningnetwork/lnd"
2122
"github.com/lightningnetwork/lnd/build"
2223
"github.com/lightningnetwork/lnd/signal"
@@ -186,6 +187,15 @@ type Config struct {
186187

187188
ChainPorter tapfreighter.Porter
188189

190+
// FsmDaemonAdapters is a set of adapters that allow a state machine to
191+
// interact with external daemons.
192+
FsmDaemonAdapters *LndFsmDaemonAdapters
193+
194+
// SupplyCommitManager is a service that is used to manage supply
195+
// commitments for assets. Supply commitments are issuer published
196+
// attestations of the total supply of an asset.
197+
SupplyCommitManager *supplycommit.MultiStateMachineManager
198+
189199
UniverseArchive *universe.Archive
190200

191201
UniverseSyncer universe.Syncer

chain_bridge.go renamed to lnd_services.go

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"bytes"
55
"context"
66
"fmt"
7+
"sync"
8+
"time"
79

810
"github.com/btcsuite/btcd/btcec/v2"
911
"github.com/btcsuite/btcd/chaincfg/chainhash"
@@ -22,6 +24,7 @@ import (
2224
"github.com/lightningnetwork/lnd/lnrpc/routerrpc"
2325
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
2426
"github.com/lightningnetwork/lnd/lnwire"
27+
"github.com/lightningnetwork/lnd/protofsm"
2528
"github.com/lightningnetwork/lnd/routing/route"
2629
)
2730

@@ -31,6 +34,10 @@ const (
3134
// 3200kB of memory (4 bytes for the block height and 4 bytes for the
3235
// timestamp, not including any map/cache overhead).
3336
maxNumBlocksInCache = 400_000
37+
38+
// DefaultTimeout is the default timeout we use for RPC and database
39+
// operations.
40+
DefaultTimeout = 30 * time.Second
3441
)
3542

3643
// cacheableTimestamp is a wrapper around an uint32 that can be used as a value
@@ -458,3 +465,184 @@ func (l *LndInvoicesClient) HtlcModifier(ctx context.Context,
458465
// Ensure LndInvoicesClient implements the tapchannel.InvoiceHtlcModifier
459466
// interface.
460467
var _ tapchannel.InvoiceHtlcModifier = (*LndInvoicesClient)(nil)
468+
469+
// LndFsmDaemonAdapters is a struct that implements the protofsm.DaemonAdapters
470+
// interface.
471+
type LndFsmDaemonAdapters struct {
472+
// lnd is the LND services client that will be used to interact with
473+
// the LND node.
474+
lnd *lndclient.LndServices
475+
476+
// retryConfig is the retry configuration that will be used for
477+
// operations that may fail due to temporary issues, such as network
478+
// errors or RPC timeouts.
479+
retryConfig fn.RetryConfig
480+
481+
// msgTransport is the message transport client that will be used to
482+
// send messages to peers.
483+
msgTransport LndMsgTransportClient
484+
485+
// chainBridge is the chain bridge that will be used to interact with
486+
// the blockchain.
487+
chainBridge LndRpcChainBridge
488+
489+
// ContextGuard manages the context and quit channel for this service.
490+
fn.ContextGuard
491+
492+
startOnce sync.Once
493+
stopOnce sync.Once
494+
}
495+
496+
// NewLndFsmDaemonAdapters creates a new instance of LndFsmDaemonAdapters.
497+
func NewLndFsmDaemonAdapters(lnd *lndclient.LndServices) *LndFsmDaemonAdapters {
498+
retryConfig := fn.DefaultRetryConfig()
499+
500+
msgTransport := NewLndMsgTransportClient(lnd)
501+
502+
// Initialize the chain bridge without the asset store, as it is not
503+
// needed for the FSM adapters.
504+
chainBridge := NewLndRpcChainBridge(lnd, nil)
505+
chainBridge.retryConfig = retryConfig
506+
507+
return &LndFsmDaemonAdapters{
508+
lnd: lnd,
509+
retryConfig: retryConfig,
510+
msgTransport: *msgTransport,
511+
chainBridge: *chainBridge,
512+
ContextGuard: fn.ContextGuard{
513+
DefaultTimeout: DefaultTimeout,
514+
Quit: make(chan struct{}),
515+
},
516+
}
517+
}
518+
519+
// Start attempts to start the service.
520+
func (l *LndFsmDaemonAdapters) Start() error {
521+
l.startOnce.Do(func() {})
522+
return nil
523+
}
524+
525+
// Stop signals for the service to gracefully exit.
526+
func (l *LndFsmDaemonAdapters) Stop() error {
527+
l.stopOnce.Do(func() {
528+
// Signal the quit channel which will cancel all active
529+
// contexts.
530+
close(l.Quit)
531+
})
532+
533+
return nil
534+
}
535+
536+
// SendMessages sends a slice of lnwire.Message to the peer with the given
537+
// public key.
538+
func (l *LndFsmDaemonAdapters) SendMessages(peer btcec.PublicKey,
539+
messages []lnwire.Message) error {
540+
541+
// Convert messages to a slice of CustomMessage.
542+
customMessages := make([]lndclient.CustomMessage, 0, len(messages))
543+
for idx := range messages {
544+
msg := messages[idx]
545+
546+
var buf bytes.Buffer
547+
if err := msg.Encode(&buf, 0); err != nil {
548+
return fmt.Errorf("unable to encode message: %w", err)
549+
}
550+
551+
customMsg := lndclient.CustomMessage{
552+
Peer: route.NewVertex(&peer),
553+
MsgType: uint32(msg.MsgType()),
554+
Data: buf.Bytes(),
555+
}
556+
557+
customMessages = append(customMessages, customMsg)
558+
}
559+
560+
ctx, cancel := l.WithCtxQuitNoTimeout()
561+
defer cancel()
562+
563+
// Send each message in turn.
564+
for idx := range customMessages {
565+
msg := customMessages[idx]
566+
567+
err := l.msgTransport.SendCustomMessage(ctx, msg)
568+
if err != nil {
569+
return fmt.Errorf("unable to send custom message: %w",
570+
err)
571+
}
572+
}
573+
574+
return nil
575+
}
576+
577+
// BroadcastTransaction attempts to broadcast a transaction to the
578+
// network. It uses the chain bridge to publish the transaction.
579+
func (l *LndFsmDaemonAdapters) BroadcastTransaction(tx *wire.MsgTx,
580+
label string) error {
581+
582+
ctx, cancel := l.WithCtxQuitNoTimeout()
583+
defer cancel()
584+
585+
return l.chainBridge.PublishTransaction(ctx, tx, label)
586+
}
587+
588+
// RegisterConfirmationsNtfn registers an intent to be notified once the
589+
// transaction with the given txid reaches the specified number of
590+
// confirmations.
591+
func (l *LndFsmDaemonAdapters) RegisterConfirmationsNtfn(
592+
txid *chainhash.Hash, pkScript []byte, numConfs uint32,
593+
heightHint uint32,
594+
optFuncs ...chainntnfs.NotifierOption) (*chainntnfs.ConfirmationEvent,
595+
error) {
596+
597+
opts := chainntnfs.DefaultNotifierOptions()
598+
for _, optFunc := range optFuncs {
599+
optFunc(opts)
600+
}
601+
602+
lndCliOpt := make([]lndclient.NotifierOption, 0, len(optFuncs))
603+
if opts.IncludeBlock {
604+
lndCliOpt = append(lndCliOpt, lndclient.WithIncludeBlock())
605+
}
606+
607+
ctx, cancel := l.WithCtxQuitNoTimeout()
608+
spendDetail, _, err := l.lnd.ChainNotifier.RegisterConfirmationsNtfn(
609+
ctx, txid, pkScript, int32(numConfs), int32(heightHint),
610+
lndCliOpt...,
611+
)
612+
if err != nil {
613+
return nil, fmt.Errorf("unable to register for conf: %w", err)
614+
}
615+
616+
return &chainntnfs.ConfirmationEvent{
617+
Confirmed: spendDetail,
618+
Updates: make(chan uint32),
619+
NegativeConf: make(chan int32),
620+
Done: make(chan struct{}),
621+
Cancel: cancel,
622+
}, nil
623+
}
624+
625+
// RegisterSpendNtfn registers an intent to be notified once the outpoint
626+
// is spent on-chain.
627+
func (l *LndFsmDaemonAdapters) RegisterSpendNtfn(outpoint *wire.OutPoint,
628+
pkScript []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) {
629+
630+
ctx, cancel := l.WithCtxQuitNoTimeout()
631+
spendDetail, _, err := l.lnd.ChainNotifier.RegisterSpendNtfn(
632+
ctx, outpoint, pkScript, int32(heightHint),
633+
)
634+
if err != nil {
635+
return nil, fmt.Errorf("unable to register for spend: %w", err)
636+
}
637+
638+
return &chainntnfs.SpendEvent{
639+
Spend: spendDetail,
640+
Reorg: make(chan struct{}, 1),
641+
Done: make(chan struct{}, 1),
642+
Cancel: cancel,
643+
}, nil
644+
}
645+
646+
// Ensure LndFsmDaemonAdapters implements the protofsm.DaemonAdapters
647+
// interface.
648+
var _ protofsm.DaemonAdapters = (*LndFsmDaemonAdapters)(nil)

log.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/lightninglabs/taproot-assets/tapgarden"
1515
"github.com/lightninglabs/taproot-assets/tapsend"
1616
"github.com/lightninglabs/taproot-assets/universe"
17+
"github.com/lightninglabs/taproot-assets/universe/supplycommit"
1718
"github.com/lightningnetwork/lnd/build"
1819
"github.com/lightningnetwork/lnd/signal"
1920
)
@@ -109,6 +110,10 @@ func SetupLoggers(root *build.SubLoggerManager,
109110
AddSubLogger(root, address.Subsystem, interceptor, address.UseLogger)
110111
AddSubLogger(root, tapsend.Subsystem, interceptor, tapsend.UseLogger)
111112
AddSubLogger(root, universe.Subsystem, interceptor, universe.UseLogger)
113+
AddSubLogger(
114+
root, supplycommit.Subsystem, interceptor,
115+
supplycommit.UseLogger,
116+
)
112117
AddSubLogger(
113118
root, commitment.Subsystem, interceptor, commitment.UseLogger,
114119
)

server.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,18 @@ func (s *Server) initialize(interceptorChain *rpcperms.InterceptorChain) error {
211211
return fmt.Errorf("unable to start RFQ manager: %w", err)
212212
}
213213

214+
// Start FSM daemon adapters.
215+
if err := s.cfg.FsmDaemonAdapters.Start(); err != nil {
216+
return fmt.Errorf("unable to start FSM daemon adapters: %w",
217+
err)
218+
}
219+
220+
// Start universe supply commitment manager.
221+
if err := s.cfg.SupplyCommitManager.Start(); err != nil {
222+
return fmt.Errorf("unable to start supply commit manager: %w",
223+
err)
224+
}
225+
214226
// Start the auxiliary components.
215227
if err := s.cfg.AuxLeafSigner.Start(); err != nil {
216228
return fmt.Errorf("unable to start aux leaf signer: %w", err)
@@ -691,6 +703,18 @@ func (s *Server) Stop() error {
691703
return err
692704
}
693705

706+
// Stop FSM daemon adapters.
707+
if err := s.cfg.FsmDaemonAdapters.Stop(); err != nil {
708+
return fmt.Errorf("unable to stop FSM daemon adapters: %w",
709+
err)
710+
}
711+
712+
// Stop universe supply commitment manager.
713+
if err := s.cfg.SupplyCommitManager.Stop(); err != nil {
714+
return fmt.Errorf("unable to stop supply commit manager: %w",
715+
err)
716+
}
717+
694718
if err := s.cfg.AuxLeafSigner.Stop(); err != nil {
695719
return err
696720
}

tapcfg/server.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/lightninglabs/taproot-assets/tapgarden"
2424
"github.com/lightninglabs/taproot-assets/tapscript"
2525
"github.com/lightninglabs/taproot-assets/universe"
26+
"github.com/lightninglabs/taproot-assets/universe/supplycommit"
2627
"github.com/lightningnetwork/lnd"
2728
"github.com/lightningnetwork/lnd/clock"
2829
"github.com/lightningnetwork/lnd/signal"
@@ -110,6 +111,7 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger,
110111
lndRouterClient := tap.NewLndRouterClient(lndServices)
111112
lndInvoicesClient := tap.NewLndInvoicesClient(lndServices)
112113
lndFeatureBitsVerifier := tap.NewLndFeatureBitVerifier(lndServices)
114+
lndFsmDaemonAdapters := tap.NewLndFsmDaemonAdapters(lndServices)
113115

114116
uniDB := tapdb.NewTransactionExecutor(
115117
db, func(tx *sql.Tx) tapdb.BaseUniverseStore {
@@ -161,6 +163,7 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger,
161163
groupVerifier := tapgarden.GenGroupVerifier(
162164
context.Background(), assetMintingStore,
163165
)
166+
164167
uniArchiveCfg := universe.ArchiveConfig{
165168
// nolint: lll
166169
NewBaseTree: func(id universe.Identifier) universe.StorageBackend {
@@ -583,6 +586,40 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger,
583586
"access status: %w", err)
584587
}
585588

589+
// Construct the supply commit manager, which is used to
590+
// formulate universe supply commitment transactions.
591+
//
592+
// Construct database backends for the supply commitment state machines.
593+
supplyCommitDb := tapdb.NewTransactionExecutor(
594+
db, func(tx *sql.Tx) tapdb.SupplyCommitStore {
595+
return db.WithTx(tx)
596+
},
597+
)
598+
supplyCommitStore := tapdb.NewSupplyCommitMachine(supplyCommitDb)
599+
600+
// Construct the supply tree database backend.
601+
supplyTreeDb := tapdb.NewTransactionExecutor(
602+
db, func(tx *sql.Tx) tapdb.BaseUniverseStore {
603+
return db.WithTx(tx)
604+
},
605+
)
606+
supplyTreeStore := tapdb.NewSupplyTreeStore(supplyTreeDb)
607+
608+
// Create the supply commitment state machine manager, which is used to
609+
// manage the supply commitment state machines for each asset group.
610+
supplyCommitManager := supplycommit.NewMultiStateMachineManager(
611+
supplycommit.MultiStateMachineManagerCfg{
612+
TreeView: supplyTreeStore,
613+
Commitments: supplyCommitStore,
614+
Wallet: walletAnchor,
615+
KeyRing: keyRing,
616+
Chain: chainBridge,
617+
DaemonAdapters: lndFsmDaemonAdapters,
618+
StateLog: supplyCommitStore,
619+
ChainParams: *tapChainParams.Params,
620+
},
621+
)
622+
586623
return &tap.Config{
587624
DebugLevel: cfg.DebugLevel,
588625
RuntimeID: runtimeID,
@@ -633,6 +670,8 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger,
633670
AssetWallet: assetWallet,
634671
CoinSelect: coinSelect,
635672
ChainPorter: chainPorter,
673+
FsmDaemonAdapters: lndFsmDaemonAdapters,
674+
SupplyCommitManager: supplyCommitManager,
636675
UniverseArchive: uniArchive,
637676
UniverseSyncer: universeSyncer,
638677
UniverseFederation: universeFederation,

0 commit comments

Comments
 (0)