Skip to content

Commit f37e149

Browse files
authored
Merge pull request #86 from TheBlueMatt/2024-11-val-field
Add a funding_value field in channel_announcements
2 parents 5d48711 + 29dfeb8 commit f37e149

File tree

6 files changed

+99
-26
lines changed

6 files changed

+99
-26
lines changed

src/config.rs

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
use crate::hex_utils;
2+
use crate::verifier::ChainVerifier;
23

34
use std::env;
45
use std::net::{SocketAddr, ToSocketAddrs};
6+
use std::ops::Deref;
7+
use std::sync::Arc;
58
use std::time::Duration;
69

710
use bitcoin::io::Cursor;
@@ -10,11 +13,15 @@ use bitcoin::hashes::hex::FromHex;
1013
use bitcoin::secp256k1::PublicKey;
1114
use futures::stream::{FuturesUnordered, StreamExt};
1215
use lightning::ln::msgs::ChannelAnnouncement;
16+
use lightning::util::logger::Logger;
1317
use lightning::util::ser::Readable;
1418
use lightning_block_sync::http::HttpEndpoint;
19+
use lightning_block_sync::rest::RestClient;
1520
use tokio_postgres::Config;
1621

17-
pub(crate) const SCHEMA_VERSION: i32 = 14;
22+
use tokio::sync::Semaphore;
23+
24+
pub(crate) const SCHEMA_VERSION: i32 = 15;
1825
pub(crate) const SYMLINK_GRANULARITY_INTERVAL: u32 = 3600 * 3; // three hours
1926
pub(crate) const MAX_SNAPSHOT_SCOPE: u32 = 3600 * 24 * 21; // three weeks
2027
// generate symlinks based on a 3-hour-granularity
@@ -120,6 +127,7 @@ pub(crate) fn db_announcement_table_creation_query() -> &'static str {
120127
"CREATE TABLE IF NOT EXISTS channel_announcements (
121128
id SERIAL PRIMARY KEY,
122129
short_channel_id bigint NOT NULL UNIQUE,
130+
funding_amount_sats bigint NOT NULL,
123131
announcement_signed BYTEA,
124132
seen timestamp NOT NULL DEFAULT NOW()
125133
)"
@@ -167,7 +175,9 @@ pub(crate) fn db_index_creation_query() -> &'static str {
167175
"
168176
}
169177

170-
pub(crate) async fn upgrade_db(schema: i32, client: &mut tokio_postgres::Client) {
178+
pub(crate) async fn upgrade_db<L: Deref + Clone + Send + Sync + 'static>(
179+
schema: i32, client: &mut tokio_postgres::Client, logger: L,
180+
) where L::Target: Logger {
171181
if schema == 1 {
172182
let tx = client.transaction().await.unwrap();
173183
tx.execute("ALTER TABLE channel_updates DROP COLUMN chain_hash", &[]).await.unwrap();
@@ -313,6 +323,33 @@ pub(crate) async fn upgrade_db(schema: i32, client: &mut tokio_postgres::Client)
313323
tx.execute("UPDATE config SET db_schema = 14 WHERE id = 1", &[]).await.unwrap();
314324
tx.commit().await.unwrap();
315325
}
326+
if schema >= 1 && schema <= 14 {
327+
println!("Upgrading to schema 15 requiring UTXO lookups for each historical channel announcement. This may take some time");
328+
// Note that we don't bother doing this one in a transaction, and as such need to support
329+
// resuming on a crash.
330+
let _ = client.execute("ALTER TABLE channel_announcements ADD COLUMN funding_amount_sats bigint DEFAULT null", &[]).await;
331+
tokio::spawn(async move {
332+
let client = crate::connect_to_db().await;
333+
let mut scids = Box::pin(client.query_raw("SELECT DISTINCT ON (short_channel_id) short_channel_id FROM channel_announcements WHERE funding_amount_sats IS NULL;", &[0i64][1..]).await.unwrap());
334+
let sem = Arc::new(Semaphore::new(16));
335+
while let Some(scid_res) = scids.next().await {
336+
let scid: i64 = scid_res.unwrap().get(0);
337+
let permit = Arc::clone(&sem).acquire_owned().await.unwrap();
338+
let logger = logger.clone();
339+
tokio::spawn(async move {
340+
let rest_client = Arc::new(RestClient::new(bitcoin_rest_endpoint()).unwrap());
341+
let txo = ChainVerifier::retrieve_txo(rest_client, scid as u64, logger).await
342+
.expect("We shouldn't have accepted a channel announce with a bad TXO");
343+
let client = crate::connect_to_db().await;
344+
client.execute("UPDATE channel_announcements SET funding_amount_sats = $1 WHERE short_channel_id = $2", &[&(txo.value.to_sat() as i64), &scid]).await.unwrap();
345+
std::mem::drop(permit);
346+
});
347+
}
348+
let _all_updates_complete = sem.acquire_many(16).await.unwrap();
349+
client.execute("ALTER TABLE channel_announcements ALTER funding_amount_sats SET NOT NULL", &[]).await.unwrap();
350+
client.execute("UPDATE config SET db_schema = 15 WHERE id = 1", &[]).await.unwrap();
351+
});
352+
}
316353
if schema <= 1 || schema > SCHEMA_VERSION {
317354
panic!("Unknown schema in db: {}, we support up to {}", schema, SCHEMA_VERSION);
318355
}

src/downloader.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,16 @@ impl<L: Deref + Clone + Send + Sync> GossipRouter<L> where L::Target: Logger {
6464
counter.channel_announcements += 1;
6565
}
6666

67-
let gossip_message = GossipMessage::ChannelAnnouncement(msg, None);
67+
let mut funding_amount_sats = self.verifier.get_cached_funding_value(msg.contents.short_channel_id);
68+
if funding_amount_sats.is_none() {
69+
tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async {
70+
funding_amount_sats = self.verifier.retrieve_funding_value(msg.contents.short_channel_id).await.ok();
71+
})});
72+
}
73+
let funding_amount_sats = funding_amount_sats
74+
.expect("If we've accepted a ChannelAnnouncement, we must be able to fetch the TXO for it");
75+
76+
let gossip_message = GossipMessage::ChannelAnnouncement(msg, funding_amount_sats, None);
6877
if let Err(err) = self.sender.try_send(gossip_message) {
6978
let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
7079
tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {

src/persistence.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pub(crate) struct GossipPersister<L: Deref> where L::Target: Logger {
2323
logger: L
2424
}
2525

26-
impl<L: Deref> GossipPersister<L> where L::Target: Logger {
26+
impl<L: Deref + Clone + Send + Sync + 'static> GossipPersister<L> where L::Target: Logger {
2727
pub fn new(network_graph: Arc<NetworkGraph<L>>, logger: L) -> (Self, mpsc::Sender<GossipMessage>) {
2828
let (gossip_persistence_sender, gossip_persistence_receiver) =
2929
mpsc::channel::<GossipMessage>(100);
@@ -50,7 +50,7 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
5050

5151
let cur_schema = client.query("SELECT db_schema FROM config WHERE id = $1", &[&1]).await.unwrap();
5252
if !cur_schema.is_empty() {
53-
config::upgrade_db(cur_schema[0].get(0), &mut client).await;
53+
config::upgrade_db(cur_schema[0].get(0), &mut client, self.logger.clone()).await;
5454
}
5555

5656
let preparation = client.execute("set time zone UTC", &[]).await;
@@ -185,7 +185,7 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
185185
#[cfg(test)]
186186
tasks_spawned.push(_task);
187187
},
188-
GossipMessage::ChannelAnnouncement(announcement, seen_override) => {
188+
GossipMessage::ChannelAnnouncement(announcement, funding_value, seen_override) => {
189189
let scid = announcement.contents.short_channel_id as i64;
190190

191191
// start with the type prefix, which is already known a priori
@@ -197,20 +197,24 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
197197
tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
198198
.execute("INSERT INTO channel_announcements (\
199199
short_channel_id, \
200+
funding_amount_sats, \
200201
announcement_signed, \
201202
seen \
202-
) VALUES ($1, $2, TO_TIMESTAMP($3)) ON CONFLICT (short_channel_id) DO NOTHING", &[
203+
) VALUES ($1, $2, $3, TO_TIMESTAMP($4)) ON CONFLICT (short_channel_id) DO NOTHING", &[
203204
&scid,
205+
&(funding_value as i64),
204206
&announcement_signed,
205207
&(seen_override.unwrap() as f64)
206208
])).await.unwrap().unwrap();
207209
} else {
208210
tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
209211
.execute("INSERT INTO channel_announcements (\
210212
short_channel_id, \
213+
funding_amount_sats, \
211214
announcement_signed \
212-
) VALUES ($1, $2) ON CONFLICT (short_channel_id) DO NOTHING", &[
215+
) VALUES ($1, $2, $3) ON CONFLICT (short_channel_id) DO NOTHING", &[
213216
&scid,
217+
&(funding_value as i64),
214218
&announcement_signed
215219
])).await.unwrap().unwrap();
216220
}

src/tests/mod.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ async fn test_trivial_setup() {
242242
network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
243243
network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
244244

245-
receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
245+
receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, None)).await.unwrap();
246246
receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
247247
receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
248248
drop(receiver);
@@ -357,7 +357,7 @@ async fn test_node_announcement_delta_detection() {
357357
network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
358358
network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
359359

360-
receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
360+
receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, Some(timestamp))).await.unwrap();
361361
receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp))).await.unwrap();
362362
receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp))).await.unwrap();
363363
}
@@ -450,7 +450,7 @@ async fn test_unidirectional_intermediate_update_consideration() {
450450
network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
451451
network_graph_arc.update_channel_unsigned(&update_3.contents).unwrap();
452452

453-
receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
453+
receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, Some(timestamp))).await.unwrap();
454454
receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
455455
receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
456456
receiver.send(GossipMessage::ChannelUpdate(update_3, None)).await.unwrap();
@@ -521,7 +521,7 @@ async fn test_bidirectional_intermediate_update_consideration() {
521521
network_graph_arc.update_channel_unsigned(&update_3.contents).unwrap();
522522
network_graph_arc.update_channel_unsigned(&update_4.contents).unwrap();
523523

524-
receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
524+
receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, Some(timestamp))).await.unwrap();
525525
receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
526526
receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
527527
receiver.send(GossipMessage::ChannelUpdate(update_3, None)).await.unwrap();
@@ -577,7 +577,7 @@ async fn test_channel_reminders() {
577577
network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
578578
network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
579579

580-
receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
580+
receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
581581
receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
582582
receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
583583
}
@@ -598,7 +598,7 @@ async fn test_channel_reminders() {
598598
network_graph_arc.update_channel_unsigned(&update_7.contents).unwrap();
599599
network_graph_arc.update_channel_unsigned(&update_8.contents).unwrap();
600600

601-
receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
601+
receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
602602
receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp - channel_reminder_delta - 10))).await.unwrap();
603603
receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp - channel_reminder_delta - 5))).await.unwrap();
604604
receiver.send(GossipMessage::ChannelUpdate(update_3, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
@@ -653,7 +653,7 @@ async fn test_full_snapshot_recency() {
653653
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
654654
let announcement = generate_channel_announcement(short_channel_id);
655655
network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
656-
receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
656+
receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, None)).await.unwrap();
657657

658658
{ // direction false
659659
{ // first update
@@ -734,7 +734,7 @@ async fn test_full_snapshot_recency_with_wrong_seen_order() {
734734
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
735735
let announcement = generate_channel_announcement(short_channel_id);
736736
network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
737-
receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
737+
receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, None)).await.unwrap();
738738

739739
{ // direction false
740740
{ // first update, seen latest
@@ -815,7 +815,7 @@ async fn test_full_snapshot_recency_with_wrong_propagation_order() {
815815
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
816816
let announcement = generate_channel_announcement(short_channel_id);
817817
network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
818-
receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
818+
receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, None)).await.unwrap();
819819

820820
{ // direction false
821821
// apply updates in their timestamp order
@@ -898,7 +898,7 @@ async fn test_full_snapshot_mutiny_scenario() {
898898
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
899899
let announcement = generate_channel_announcement(short_channel_id);
900900
network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
901-
receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
901+
receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, None)).await.unwrap();
902902

903903
{ // direction false
904904
{
@@ -1036,13 +1036,13 @@ async fn test_full_snapshot_interlaced_channel_timestamps() {
10361036
{ // main channel
10371037
let announcement = generate_channel_announcement(main_channel_id);
10381038
network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
1039-
receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
1039+
receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, None)).await.unwrap();
10401040
}
10411041

10421042
{ // secondary channel
10431043
let announcement = generate_channel_announcement(secondary_channel_id);
10441044
network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
1045-
receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
1045+
receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, None)).await.unwrap();
10461046
}
10471047

10481048
{ // main channel
@@ -1145,7 +1145,7 @@ async fn test_full_snapshot_persistence() {
11451145
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
11461146
let announcement = generate_channel_announcement(short_channel_id);
11471147
network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
1148-
receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
1148+
receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, None)).await.unwrap();
11491149

11501150
{ // direction true
11511151
let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);

src/types.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ pub(crate) type GossipPeerManager<L> = Arc<PeerManager<lightning_net_tokio::Sock
1616
pub(crate) enum GossipMessage {
1717
NodeAnnouncement(NodeAnnouncement, Option<u32>),
1818
// the second element is an optional override for the seen value
19-
ChannelAnnouncement(ChannelAnnouncement, Option<u32>),
19+
ChannelAnnouncement(ChannelAnnouncement, u64, Option<u32>),
2020
ChannelUpdate(ChannelUpdate, Option<u32>),
2121
}
2222

src/verifier.rs

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::collections::HashMap;
12
use std::io::ErrorKind;
23
use std::ops::Deref;
34
use std::sync::Arc;
@@ -23,6 +24,9 @@ pub(crate) struct ChainVerifier<L: Deref + Clone + Send + Sync + 'static> where
2324
graph: Arc<NetworkGraph<L>>,
2425
outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Arc<Self>, L>>,
2526
peer_handler: Mutex<Option<GossipPeerManager<L>>>,
27+
/// A cache on the funding amounts for each channel that we've looked up, mapping from SCID to
28+
/// funding satoshis.
29+
channel_funding_amounts: Arc<Mutex<HashMap<u64, u64>>>,
2630
logger: L
2731
}
2832

@@ -35,14 +39,28 @@ impl<L: Deref + Clone + Send + Sync + 'static> ChainVerifier<L> where L::Target:
3539
outbound_gossiper,
3640
graph,
3741
peer_handler: Mutex::new(None),
38-
logger
42+
channel_funding_amounts: Arc::new(Mutex::new(HashMap::new())),
43+
logger,
3944
}
4045
}
4146
pub(crate) fn set_ph(&self, peer_handler: GossipPeerManager<L>) {
4247
*self.peer_handler.lock().unwrap() = Some(peer_handler);
4348
}
4449

45-
async fn retrieve_utxo(client: Arc<RestClient>, short_channel_id: u64, logger: L) -> Result<TxOut, UtxoLookupError> {
50+
pub(crate) fn get_cached_funding_value(&self, scid: u64) -> Option<u64> {
51+
self.channel_funding_amounts.lock().unwrap().get(&scid).map(|v| *v)
52+
}
53+
54+
pub(crate) async fn retrieve_funding_value(&self, scid: u64) -> Result<u64, UtxoLookupError> {
55+
Self::retrieve_cache_txo(Arc::clone(&self.rest_client), Some(Arc::clone(&self.channel_funding_amounts)), scid, self.logger.clone())
56+
.await.map(|txo| txo.value.to_sat())
57+
}
58+
59+
pub(crate) async fn retrieve_txo(client: Arc<RestClient>, short_channel_id: u64, logger: L) -> Result<TxOut, UtxoLookupError> {
60+
Self::retrieve_cache_txo(client, None, short_channel_id, logger).await
61+
}
62+
63+
async fn retrieve_cache_txo(client: Arc<RestClient>, channel_funding_amounts: Option<Arc<Mutex<HashMap<u64, u64>>>>, short_channel_id: u64, logger: L) -> Result<TxOut, UtxoLookupError> {
4664
let block_height = (short_channel_id >> 5 * 8) as u32; // block height is most significant three bytes
4765
let transaction_index = ((short_channel_id >> 2 * 8) & 0xffffff) as u32;
4866
let output_index = (short_channel_id & 0xffff) as u16;
@@ -57,7 +75,11 @@ impl<L: Deref + Clone + Send + Sync + 'static> ChainVerifier<L> where L::Target:
5775
log_error!(logger, "Could't find output {} in transaction {}", output_index, transaction.compute_txid());
5876
return Err(UtxoLookupError::UnknownTx);
5977
}
60-
Ok(transaction.output.swap_remove(output_index as usize))
78+
let txo = transaction.output.swap_remove(output_index as usize);
79+
if let Some(channel_funding_amounts) = channel_funding_amounts {
80+
channel_funding_amounts.lock().unwrap().insert(short_channel_id, txo.value.to_sat());
81+
}
82+
Ok(txo)
6183
}
6284

6385
async fn retrieve_block(client: Arc<RestClient>, block_height: u32, logger: L) -> Result<Block, UtxoLookupError> {
@@ -99,10 +121,11 @@ impl<L: Deref + Clone + Send + Sync + 'static> UtxoLookup for ChainVerifier<L> w
99121
let graph_ref = Arc::clone(&self.graph);
100122
let client_ref = Arc::clone(&self.rest_client);
101123
let gossip_ref = Arc::clone(&self.outbound_gossiper);
124+
let channel_funding_amounts_cache_ref = Arc::clone(&self.channel_funding_amounts);
102125
let pm_ref = self.peer_handler.lock().unwrap().clone();
103126
let logger_ref = self.logger.clone();
104127
tokio::spawn(async move {
105-
let res = Self::retrieve_utxo(client_ref, short_channel_id, logger_ref).await;
128+
let res = Self::retrieve_cache_txo(client_ref, Some(channel_funding_amounts_cache_ref), short_channel_id, logger_ref).await;
106129
fut.resolve(&*graph_ref, &*gossip_ref, res);
107130
if let Some(pm) = pm_ref { pm.process_events(); }
108131
});

0 commit comments

Comments
 (0)