@@ -91,6 +91,7 @@ type SQLQueries interface {
9191 Channel queries.
9292 */
9393 CreateChannel (ctx context.Context , arg sqlc.CreateChannelParams ) (int64 , error )
94+ AddV1ChannelProof (ctx context.Context , arg sqlc.AddV1ChannelProofParams ) (sql.Result , error )
9495 GetChannelBySCID (ctx context.Context , arg sqlc.GetChannelBySCIDParams ) (sqlc.Channel , error )
9596 GetChannelByOutpoint (ctx context.Context , outpoint string ) (sqlc.GetChannelByOutpointRow , error )
9697 GetChannelsBySCIDRange (ctx context.Context , arg sqlc.GetChannelsBySCIDRangeParams ) ([]sqlc.GetChannelsBySCIDRangeRow , error )
@@ -136,6 +137,12 @@ type SQLQueries interface {
136137 GetPruneTip (ctx context.Context ) (sqlc.PruneLog , error )
137138 UpsertPruneLogEntry (ctx context.Context , arg sqlc.UpsertPruneLogEntryParams ) error
138139 DeletePruneLogEntriesInRange (ctx context.Context , arg sqlc.DeletePruneLogEntriesInRangeParams ) error
140+
141+ /*
142+ Closed SCID table queries.
143+ */
144+ InsertClosedChannel (ctx context.Context , scid []byte ) error
145+ IsClosedChannel (ctx context.Context , scid []byte ) (bool , error )
139146}
140147
141148// BatchedSQLQueries is a version of SQLQueries that's capable of batched
@@ -1096,8 +1103,8 @@ func (s *SQLStore) ChanUpdatesInHorizon(startTime,
10961103 }
10971104
10981105 if len (edges ) > 0 {
1099- log .Debugf ("ChanUpdatesInHorizon hit percentage: %f (%d/%d)" ,
1100- float64 (hits )/ float64 (len (edges )), hits , len (edges ))
1106+ log .Debugf ("ChanUpdatesInHorizon hit percentage: %.2f (%d/%d)" ,
1107+ float64 (hits )* 100 / float64 (len (edges )), hits , len (edges ))
11011108 } else {
11021109 log .Debugf ("ChanUpdatesInHorizon returned no edges in " +
11031110 "horizon (%s, %s)" , startTime , endTime )
@@ -1231,6 +1238,103 @@ func (s *SQLStore) ForEachNodeCached(cb func(node route.Vertex,
12311238 }, sqldb .NoOpReset )
12321239}
12331240
1241+ // ForEachChannelCacheable iterates through all the channel edges stored
1242+ // within the graph and invokes the passed callback for each edge. The
1243+ // callback takes two edges as since this is a directed graph, both the
1244+ // in/out edges are visited. If the callback returns an error, then the
1245+ // transaction is aborted and the iteration stops early.
1246+ //
1247+ // NOTE: If an edge can't be found, or wasn't advertised, then a nil
1248+ // pointer for that particular channel edge routing policy will be
1249+ // passed into the callback.
1250+ //
1251+ // NOTE: this method is like ForEachChannel but fetches only the data
1252+ // required for the graph cache.
1253+ func (s * SQLStore ) ForEachChannelCacheable (cb func (* models.CachedEdgeInfo ,
1254+ * models.CachedEdgePolicy ,
1255+ * models.CachedEdgePolicy ) error ) error {
1256+
1257+ ctx := context .TODO ()
1258+
1259+ handleChannel := func (db SQLQueries ,
1260+ row sqlc.ListChannelsWithPoliciesPaginatedRow ) error {
1261+
1262+ node1 , node2 , err := buildNodeVertices (
1263+ row .Node1Pubkey , row .Node2Pubkey ,
1264+ )
1265+ if err != nil {
1266+ return err
1267+ }
1268+
1269+ edge := buildCacheableChannelInfo (row .Channel , node1 , node2 )
1270+
1271+ dbPol1 , dbPol2 , err := extractChannelPolicies (row )
1272+ if err != nil {
1273+ return err
1274+ }
1275+
1276+ var pol1 , pol2 * models.CachedEdgePolicy
1277+ if dbPol1 != nil {
1278+ policy1 , err := buildChanPolicy (
1279+ * dbPol1 , edge .ChannelID , nil , node2 , true ,
1280+ )
1281+ if err != nil {
1282+ return err
1283+ }
1284+
1285+ pol1 = models .NewCachedPolicy (policy1 )
1286+ }
1287+ if dbPol2 != nil {
1288+ policy2 , err := buildChanPolicy (
1289+ * dbPol2 , edge .ChannelID , nil , node1 , false ,
1290+ )
1291+ if err != nil {
1292+ return err
1293+ }
1294+
1295+ pol2 = models .NewCachedPolicy (policy2 )
1296+ }
1297+
1298+ if err := cb (edge , pol1 , pol2 ); err != nil {
1299+ return err
1300+ }
1301+
1302+ return nil
1303+ }
1304+
1305+ return s .db .ExecTx (ctx , sqldb .ReadTxOpt (), func (db SQLQueries ) error {
1306+ lastID := int64 (- 1 )
1307+ for {
1308+ //nolint:ll
1309+ rows , err := db .ListChannelsWithPoliciesPaginated (
1310+ ctx , sqlc.ListChannelsWithPoliciesPaginatedParams {
1311+ Version : int16 (ProtocolV1 ),
1312+ ID : lastID ,
1313+ Limit : pageSize ,
1314+ },
1315+ )
1316+ if err != nil {
1317+ return err
1318+ }
1319+
1320+ if len (rows ) == 0 {
1321+ break
1322+ }
1323+
1324+ for _ , row := range rows {
1325+ err := handleChannel (db , row )
1326+ if err != nil {
1327+ return err
1328+ }
1329+
1330+ lastID = row .Channel .ID
1331+ }
1332+ }
1333+
1334+ return nil
1335+ }, sqldb .NoOpReset )
1336+ }
1337+
12341338// ForEachChannel iterates through all the channel edges stored within the
12351339// graph and invokes the passed callback for each edge. The callback takes two
12361340// edges as since this is a directed graph, both the in/out edges are visited.
@@ -1291,7 +1395,7 @@ func (s *SQLStore) ForEachChannel(cb func(*models.ChannelEdgeInfo,
12911395 }
12921396
12931397 return s .db .ExecTx (ctx , sqldb .ReadTxOpt (), func (db SQLQueries ) error {
1294- var lastID int64
1398+ lastID := int64 ( - 1 )
12951399 for {
12961400 //nolint:ll
12971401 rows , err := db .ListChannelsWithPoliciesPaginated (
@@ -2575,6 +2679,155 @@ func (s *SQLStore) DisconnectBlockAtHeight(height uint32) (
25752679 return removedChans , nil
25762680}
25772681
2682+ // AddEdgeProof sets the proof of an existing edge in the graph database.
2683+ //
2684+ // NOTE: part of the V1Store interface.
2685+ func (s * SQLStore ) AddEdgeProof (scid lnwire.ShortChannelID ,
2686+ proof * models.ChannelAuthProof ) error {
2687+
2688+ var (
2689+ ctx = context .TODO ()
2690+ scidBytes = channelIDToBytes (scid .ToUint64 ())
2691+ )
2692+
2693+ err := s .db .ExecTx (ctx , sqldb .WriteTxOpt (), func (db SQLQueries ) error {
2694+ res , err := db .AddV1ChannelProof (
2695+ ctx , sqlc.AddV1ChannelProofParams {
2696+ Scid : scidBytes [:],
2697+ Node1Signature : proof .NodeSig1Bytes ,
2698+ Node2Signature : proof .NodeSig2Bytes ,
2699+ Bitcoin1Signature : proof .BitcoinSig1Bytes ,
2700+ Bitcoin2Signature : proof .BitcoinSig2Bytes ,
2701+ },
2702+ )
2703+ if err != nil {
2704+ return fmt .Errorf ("unable to add edge proof: %w" , err )
2705+ }
2706+
2707+ n , err := res .RowsAffected ()
2708+ if err != nil {
2709+ return err
2710+ }
2711+
2712+ if n == 0 {
2713+ return fmt .Errorf ("no rows affected when adding edge " +
2714+ "proof for SCID %v" , scid )
2715+ } else if n > 1 {
2716+ return fmt .Errorf ("multiple rows affected when adding " +
2717+ "edge proof for SCID %v: %d rows affected" ,
2718+ scid , n )
2719+ }
2720+
2721+ return nil
2722+ }, sqldb .NoOpReset )
2723+ if err != nil {
2724+ return fmt .Errorf ("unable to add edge proof: %w" , err )
2725+ }
2726+
2727+ return nil
2728+ }
2729+
2730+ // PutClosedScid stores a SCID for a closed channel in the database. This is so
2731+ // that we can ignore channel announcements that we know to be closed without
2732+ // having to validate them and fetch a block.
2733+ //
2734+ // NOTE: part of the V1Store interface.
2735+ func (s * SQLStore ) PutClosedScid (scid lnwire.ShortChannelID ) error {
2736+ var (
2737+ ctx = context .TODO ()
2738+ chanIDB = channelIDToBytes (scid .ToUint64 ())
2739+ )
2740+
2741+ return s .db .ExecTx (ctx , sqldb .WriteTxOpt (), func (db SQLQueries ) error {
2742+ return db .InsertClosedChannel (ctx , chanIDB [:])
2743+ }, sqldb .NoOpReset )
2744+ }
2745+
2746+ // IsClosedScid checks whether a channel identified by the passed in scid is
2747+ // closed. This helps avoid having to perform expensive validation checks.
2748+ //
2749+ // NOTE: part of the V1Store interface.
2750+ func (s * SQLStore ) IsClosedScid (scid lnwire.ShortChannelID ) (bool , error ) {
2751+ var (
2752+ ctx = context .TODO ()
2753+ isClosed bool
2754+ chanIDB = channelIDToBytes (scid .ToUint64 ())
2755+ )
2756+ err := s .db .ExecTx (ctx , sqldb .ReadTxOpt (), func (db SQLQueries ) error {
2757+ var err error
2758+ isClosed , err = db .IsClosedChannel (ctx , chanIDB [:])
2759+ if err != nil {
2760+ return fmt .Errorf ("unable to fetch closed channel: %w" ,
2761+ err )
2762+ }
2763+
2764+ return nil
2765+ }, sqldb .NoOpReset )
2766+ if err != nil {
2767+ return false , fmt .Errorf ("unable to fetch closed channel: %w" ,
2768+ err )
2769+ }
2770+
2771+ return isClosed , nil
2772+ }
2773+
2774+ // GraphSession will provide the call-back with access to a NodeTraverser
2775+ // instance which can be used to perform queries against the channel graph.
2776+ //
2777+ // NOTE: part of the V1Store interface.
2778+ func (s * SQLStore ) GraphSession (cb func (graph NodeTraverser ) error ) error {
2779+ var ctx = context .TODO ()
2780+
2781+ return s .db .ExecTx (ctx , sqldb .ReadTxOpt (), func (db SQLQueries ) error {
2782+ return cb (newSQLNodeTraverser (db , s .cfg .ChainHash ))
2783+ }, sqldb .NoOpReset )
2784+ }
2785+
2786+ // sqlNodeTraverser implements the NodeTraverser interface but with a backing
2787+ // read only transaction for a consistent view of the graph.
2788+ type sqlNodeTraverser struct {
2789+ db SQLQueries
2790+ chain chainhash.Hash
2791+ }
2792+
2793+ // A compile-time assertion to ensure that sqlNodeTraverser implements the
2794+ // NodeTraverser interface.
2795+ var _ NodeTraverser = (* sqlNodeTraverser )(nil )
2796+
2797+ // newSQLNodeTraverser creates a new instance of the sqlNodeTraverser.
2798+ func newSQLNodeTraverser (db SQLQueries ,
2799+ chain chainhash.Hash ) * sqlNodeTraverser {
2800+
2801+ return & sqlNodeTraverser {
2802+ db : db ,
2803+ chain : chain ,
2804+ }
2805+ }
2806+
2807+ // ForEachNodeDirectedChannel calls the callback for every channel of the given
2808+ // node.
2809+ //
2810+ // NOTE: Part of the NodeTraverser interface.
2811+ func (s * sqlNodeTraverser ) ForEachNodeDirectedChannel (nodePub route.Vertex ,
2812+ cb func (channel * DirectedChannel ) error ) error {
2813+
2814+ ctx := context .TODO ()
2815+
2816+ return forEachNodeDirectedChannel (ctx , s .db , nodePub , cb )
2817+ }
2818+
2819+ // FetchNodeFeatures returns the features of the given node. If the node is
2820+ // unknown, assume no additional features are supported.
2821+ //
2822+ // NOTE: Part of the NodeTraverser interface.
2823+ func (s * sqlNodeTraverser ) FetchNodeFeatures (nodePub route.Vertex ) (
2824+ * lnwire.FeatureVector , error ) {
2825+
2826+ ctx := context .TODO ()
2827+
2828+ return fetchNodeFeatures (ctx , s .db , nodePub )
2829+ }
2830+
25782831// forEachNodeDirectedChannel iterates through all channels of a given
25792832// node, executing the passed callback on the directed edge representing the
25802833// channel and its incoming policy. If the node is not found, no error is
@@ -2704,7 +2957,7 @@ func forEachNodeDirectedChannel(ctx context.Context, db SQLQueries,
27042957func forEachNodeCacheable (ctx context.Context , db SQLQueries ,
27052958 cb func (nodeID int64 , nodePub route.Vertex ) error ) error {
27062959
2707- var lastID int64
2960+ lastID := int64 ( - 1 )
27082961
27092962 for {
27102963 nodes , err := db .ListNodeIDsAndPubKeys (
0 commit comments