@@ -14,6 +14,7 @@ import (
1414 "time"
1515
1616 "github.com/btcsuite/btcd/btcec/v2"
17+ "github.com/btcsuite/btcd/chaincfg/chainhash"
1718 "github.com/lightningnetwork/lnd/batch"
1819 "github.com/lightningnetwork/lnd/graph/db/models"
1920 "github.com/lightningnetwork/lnd/lnwire"
@@ -75,10 +76,19 @@ type SQLQueries interface {
7576 */
7677 CreateChannel (ctx context.Context , arg sqlc.CreateChannelParams ) (int64 , error )
7778 GetChannelBySCID (ctx context.Context , arg sqlc.GetChannelBySCIDParams ) (sqlc.Channel , error )
79+ GetChannelAndNodesBySCID (ctx context.Context , arg sqlc.GetChannelAndNodesBySCIDParams ) (sqlc.GetChannelAndNodesBySCIDRow , error )
7880 HighestSCID (ctx context.Context , version int16 ) ([]byte , error )
7981
8082 CreateChannelExtraType (ctx context.Context , arg sqlc.CreateChannelExtraTypeParams ) error
8183 InsertChannelFeature (ctx context.Context , arg sqlc.InsertChannelFeatureParams ) error
84+
85+ /*
86+ Channel Policy table queries.
87+ */
88+ UpsertEdgePolicy (ctx context.Context , arg sqlc.UpsertEdgePolicyParams ) (int64 , error )
89+
90+ InsertChanPolicyExtraType (ctx context.Context , arg sqlc.InsertChanPolicyExtraTypeParams ) error
91+ DeleteChannelPolicyExtraTypes (ctx context.Context , channelPolicyID int64 ) error
8292}
8393
8494// BatchedSQLQueries is a version of SQLQueries that's capable of batched
@@ -96,7 +106,8 @@ type BatchedSQLQueries interface {
96106// implemented, things will fall back to the KVStore. This is ONLY the case
97107// for the time being while this struct is purely used in unit tests only.
98108type SQLStore struct {
99- db BatchedSQLQueries
109+ cfg * SQLStoreConfig
110+ db BatchedSQLQueries
100111
101112 // cacheMu guards all caches (rejectCache and chanCache). If
102113 // this mutex will be acquired at the same time as the DB mutex then
@@ -117,9 +128,16 @@ type SQLStore struct {
117128// interface.
118129var _ V1Store = (* SQLStore )(nil )
119130
131+ // SQLStoreConfig holds the configuration for the SQLStore.
132+ type SQLStoreConfig struct {
133+ // ChainHash is the genesis hash for the chain that all the gossip
134+ // messages in this store are aimed at.
135+ ChainHash chainhash.Hash
136+ }
137+
120138// NewSQLStore creates a new SQLStore instance given an open BatchedSQLQueries
121139// storage backend.
122- func NewSQLStore (db BatchedSQLQueries , kvStore * KVStore ,
140+ func NewSQLStore (cfg * SQLStoreConfig , db BatchedSQLQueries , kvStore * KVStore ,
123141 options ... StoreOptionModifier ) (* SQLStore , error ) {
124142
125143 opts := DefaultOptions ()
@@ -133,6 +151,7 @@ func NewSQLStore(db BatchedSQLQueries, kvStore *KVStore,
133151 }
134152
135153 s := & SQLStore {
154+ cfg : cfg ,
136155 db : db ,
137156 KVStore : kvStore ,
138157 rejectCache : newRejectCache (opts .RejectCacheSize ),
@@ -542,6 +561,193 @@ func (s *SQLStore) HighestChanID() (uint64, error) {
542561 return highestChanID , nil
543562}
544563
564+ // UpdateEdgePolicy updates the edge routing policy for a single directed edge
565+ // within the database for the referenced channel. The `flags` attribute within
566+ // the ChannelEdgePolicy determines which of the directed edges are being
567+ // updated. If the flag is 1, then the first node's information is being
568+ // updated, otherwise it's the second node's information. The node ordering is
569+ // determined by the lexicographical ordering of the identity public keys of the
570+ // nodes on either side of the channel.
571+ //
572+ // NOTE: part of the V1Store interface.
573+ func (s * SQLStore ) UpdateEdgePolicy (edge * models.ChannelEdgePolicy ,
574+ opts ... batch.SchedulerOption ) (route.Vertex , route.Vertex , error ) {
575+
576+ ctx := context .TODO ()
577+
578+ var (
579+ isUpdate1 bool
580+ edgeNotFound bool
581+ from , to route.Vertex
582+ )
583+
584+ r := & batch.Request [SQLQueries ]{
585+ Opts : batch .NewSchedulerOptions (opts ... ),
586+ Reset : func () {
587+ isUpdate1 = false
588+ edgeNotFound = false
589+ },
590+ Do : func (tx SQLQueries ) error {
591+ var err error
592+ from , to , isUpdate1 , err = updateChanEdgePolicy (
593+ ctx , tx , edge ,
594+ )
595+ if err != nil {
596+ log .Errorf ("UpdateEdgePolicy faild: %v" , err )
597+ }
598+
599+ // Silence ErrEdgeNotFound so that the batch can
600+ // succeed, but propagate the error via local state.
601+ if errors .Is (err , ErrEdgeNotFound ) {
602+ edgeNotFound = true
603+ return nil
604+ }
605+
606+ return err
607+ },
608+ OnCommit : func (err error ) error {
609+ switch {
610+ case err != nil :
611+ return err
612+ case edgeNotFound :
613+ return ErrEdgeNotFound
614+ default :
615+ s .updateEdgeCache (edge , isUpdate1 )
616+ return nil
617+ }
618+ },
619+ }
620+
621+ err := s .chanScheduler .Execute (ctx , r )
622+
623+ return from , to , err
624+ }
625+
626+ // updateEdgeCache updates our reject and channel caches with the new
627+ // edge policy information.
628+ func (s * SQLStore ) updateEdgeCache (e * models.ChannelEdgePolicy ,
629+ isUpdate1 bool ) {
630+
631+ // If an entry for this channel is found in reject cache, we'll modify
632+ // the entry with the updated timestamp for the direction that was just
633+ // written. If the edge doesn't exist, we'll load the cache entry lazily
634+ // during the next query for this edge.
635+ if entry , ok := s .rejectCache .get (e .ChannelID ); ok {
636+ if isUpdate1 {
637+ entry .upd1Time = e .LastUpdate .Unix ()
638+ } else {
639+ entry .upd2Time = e .LastUpdate .Unix ()
640+ }
641+ s .rejectCache .insert (e .ChannelID , entry )
642+ }
643+
644+ // If an entry for this channel is found in channel cache, we'll modify
645+ // the entry with the updated policy for the direction that was just
646+ // written. If the edge doesn't exist, we'll defer loading the info and
647+ // policies and lazily read from disk during the next query.
648+ if channel , ok := s .chanCache .get (e .ChannelID ); ok {
649+ if isUpdate1 {
650+ channel .Policy1 = e
651+ } else {
652+ channel .Policy2 = e
653+ }
654+ s .chanCache .insert (e .ChannelID , channel )
655+ }
656+ }
657+
658+ // updateChanEdgePolicy upserts the channel policy info we have stored for
659+ // a channel we already know of.
660+ func updateChanEdgePolicy (ctx context.Context , tx SQLQueries ,
661+ edge * models.ChannelEdgePolicy ) (route.Vertex , route.Vertex , bool ,
662+ error ) {
663+
664+ var (
665+ node1Pub , node2Pub route.Vertex
666+ isNode1 bool
667+ chanIDB [8 ]byte
668+ )
669+ byteOrder .PutUint64 (chanIDB [:], edge .ChannelID )
670+
671+ // Check that this edge policy refers to a channel that we already
672+ // know of. We do this explicitly so that we can return the appropriate
673+ // ErrEdgeNotFound error if the channel doesn't exist, rather than
674+ // abort the transaction which would abort the entire batch.
675+ dbChan , err := tx .GetChannelAndNodesBySCID (
676+ ctx , sqlc.GetChannelAndNodesBySCIDParams {
677+ Scid : chanIDB [:],
678+ Version : int16 (ProtocolV1 ),
679+ },
680+ )
681+ if errors .Is (err , sql .ErrNoRows ) {
682+ return node1Pub , node2Pub , false , ErrEdgeNotFound
683+ } else if err != nil {
684+ return node1Pub , node2Pub , false , fmt .Errorf ("unable to " +
685+ "fetch channel(%v): %w" , edge .ChannelID , err )
686+ }
687+
688+ copy (node1Pub [:], dbChan .Node1PubKey )
689+ copy (node2Pub [:], dbChan .Node2PubKey )
690+
691+ // Figure out which node this edge is from.
692+ isNode1 = edge .ChannelFlags & lnwire .ChanUpdateDirection == 0
693+ nodeID := dbChan .NodeID1
694+ if ! isNode1 {
695+ nodeID = dbChan .NodeID2
696+ }
697+
698+ var (
699+ inboundBase sql.NullInt64
700+ inboundRate sql.NullInt64
701+ )
702+ edge .InboundFee .WhenSome (func (fee lnwire.Fee ) {
703+ inboundRate = sqldb .SQLInt64 (fee .FeeRate )
704+ inboundBase = sqldb .SQLInt64 (fee .BaseFee )
705+ })
706+
707+ id , err := tx .UpsertEdgePolicy (ctx , sqlc.UpsertEdgePolicyParams {
708+ Version : int16 (ProtocolV1 ),
709+ ChannelID : dbChan .ID ,
710+ NodeID : nodeID ,
711+ Timelock : int32 (edge .TimeLockDelta ),
712+ FeePpm : int64 (edge .FeeProportionalMillionths ),
713+ BaseFeeMsat : int64 (edge .FeeBaseMSat ),
714+ MinHtlcMsat : int64 (edge .MinHTLC ),
715+ LastUpdate : sqldb .SQLInt64 (edge .LastUpdate .Unix ()),
716+ Disabled : sql.NullBool {
717+ Valid : true ,
718+ Bool : edge .IsDisabled (),
719+ },
720+ MaxHtlcMsat : sql.NullInt64 {
721+ Valid : edge .MessageFlags .HasMaxHtlc (),
722+ Int64 : int64 (edge .MaxHTLC ),
723+ },
724+ InboundBaseFeeMsat : inboundBase ,
725+ InboundFeeRateMilliMsat : inboundRate ,
726+ Signature : edge .SigBytes ,
727+ })
728+ if err != nil {
729+ return node1Pub , node2Pub , isNode1 ,
730+ fmt .Errorf ("unable to upsert edge policy: %w" , err )
731+ }
732+
733+ // Convert the flat extra opaque data into a map of TLV types to
734+ // values.
735+ extra , err := marshalExtraOpaqueData (edge .ExtraOpaqueData )
736+ if err != nil {
737+ return node1Pub , node2Pub , false , fmt .Errorf ("unable to " +
738+ "marshal extra opaque data: %w" , err )
739+ }
740+
741+ // Update the channel policy's extra signed fields.
742+ err = upsertChanPolicyExtraSignedFields (ctx , tx , id , extra )
743+ if err != nil {
744+ return node1Pub , node2Pub , false , fmt .Errorf ("inserting chan " +
745+ "policy extra TLVs: %w" , err )
746+ }
747+
748+ return node1Pub , node2Pub , isNode1 , nil
749+ }
750+
545751// getNodeByPubKey attempts to look up a target node by its public key.
546752func getNodeByPubKey (ctx context.Context , db SQLQueries ,
547753 pubKey route.Vertex ) (int64 , * models.LightningNode , error ) {
@@ -1257,3 +1463,36 @@ func maybeCreateShellNode(ctx context.Context, db SQLQueries,
12571463
12581464 return id , nil
12591465}
1466+
1467+ // upsertChanPolicyExtraSignedFields updates the policy's extra signed fields in
1468+ // the database. This includes deleting any existing types and then inserting
1469+ // the new types.
1470+ func upsertChanPolicyExtraSignedFields (ctx context.Context , db SQLQueries ,
1471+ chanPolicyID int64 , extraFields map [uint64 ][]byte ) error {
1472+
1473+ // Delete all existing extra signed fields for the channel policy.
1474+ err := db .DeleteChannelPolicyExtraTypes (ctx , chanPolicyID )
1475+ if err != nil {
1476+ return fmt .Errorf ("unable to delete " +
1477+ "existing policy extra signed fields for policy %d: %w" ,
1478+ chanPolicyID , err )
1479+ }
1480+
1481+ // Insert all new extra signed fields for the channel policy.
1482+ for tlvType , value := range extraFields {
1483+ err = db .InsertChanPolicyExtraType (
1484+ ctx , sqlc.InsertChanPolicyExtraTypeParams {
1485+ ChannelPolicyID : chanPolicyID ,
1486+ Type : int64 (tlvType ),
1487+ Value : value ,
1488+ },
1489+ )
1490+ if err != nil {
1491+ return fmt .Errorf ("unable to insert " +
1492+ "channel_policy(%d) extra signed field(%v): %w" ,
1493+ chanPolicyID , tlvType , err )
1494+ }
1495+ }
1496+
1497+ return nil
1498+ }
0 commit comments