Skip to content

Commit 29dfeb8

Browse files
committed
Backfill TXO values
1 parent c02f2ec commit 29dfeb8

File tree

3 files changed

+41
-4
lines changed

3 files changed

+41
-4
lines changed

src/config.rs

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
use crate::hex_utils;
2+
use crate::verifier::ChainVerifier;
23

34
use std::env;
45
use std::net::{SocketAddr, ToSocketAddrs};
6+
use std::ops::Deref;
7+
use std::sync::Arc;
58
use std::time::Duration;
69

710
use bitcoin::io::Cursor;
@@ -10,10 +13,14 @@ use bitcoin::hashes::hex::FromHex;
1013
use bitcoin::secp256k1::PublicKey;
1114
use futures::stream::{FuturesUnordered, StreamExt};
1215
use lightning::ln::msgs::ChannelAnnouncement;
16+
use lightning::util::logger::Logger;
1317
use lightning::util::ser::Readable;
1418
use lightning_block_sync::http::HttpEndpoint;
19+
use lightning_block_sync::rest::RestClient;
1520
use tokio_postgres::Config;
1621

22+
use tokio::sync::Semaphore;
23+
1724
pub(crate) const SCHEMA_VERSION: i32 = 15;
1825
pub(crate) const SYMLINK_GRANULARITY_INTERVAL: u32 = 3600 * 3; // three hours
1926
pub(crate) const MAX_SNAPSHOT_SCOPE: u32 = 3600 * 24 * 21; // three weeks
@@ -168,7 +175,9 @@ pub(crate) fn db_index_creation_query() -> &'static str {
168175
"
169176
}
170177

171-
pub(crate) async fn upgrade_db(schema: i32, client: &mut tokio_postgres::Client) {
178+
pub(crate) async fn upgrade_db<L: Deref + Clone + Send + Sync + 'static>(
179+
schema: i32, client: &mut tokio_postgres::Client, logger: L,
180+
) where L::Target: Logger {
172181
if schema == 1 {
173182
let tx = client.transaction().await.unwrap();
174183
tx.execute("ALTER TABLE channel_updates DROP COLUMN chain_hash", &[]).await.unwrap();
@@ -315,7 +324,31 @@ pub(crate) async fn upgrade_db(schema: i32, client: &mut tokio_postgres::Client)
315324
tx.commit().await.unwrap();
316325
}
317326
if schema >= 1 && schema <= 14 {
318-
client.execute("ALTER TABLE channel_announcements ADD COLUMN funding_amount_sats bigint DEFAULT null", &[]).await.unwrap();
327+
println!("Upgrading to schema 15 requiring UTXO lookups for each historical channel announcement. This may take some time");
328+
// Note that we don't bother doing this one in a transaction, and as such need to support
329+
// resuming on a crash.
330+
let _ = client.execute("ALTER TABLE channel_announcements ADD COLUMN funding_amount_sats bigint DEFAULT null", &[]).await;
331+
tokio::spawn(async move {
332+
let client = crate::connect_to_db().await;
333+
let mut scids = Box::pin(client.query_raw("SELECT DISTINCT ON (short_channel_id) short_channel_id FROM channel_announcements WHERE funding_amount_sats IS NULL;", &[0i64][1..]).await.unwrap());
334+
let sem = Arc::new(Semaphore::new(16));
335+
while let Some(scid_res) = scids.next().await {
336+
let scid: i64 = scid_res.unwrap().get(0);
337+
let permit = Arc::clone(&sem).acquire_owned().await.unwrap();
338+
let logger = logger.clone();
339+
tokio::spawn(async move {
340+
let rest_client = Arc::new(RestClient::new(bitcoin_rest_endpoint()).unwrap());
341+
let txo = ChainVerifier::retrieve_txo(rest_client, scid as u64, logger).await
342+
.expect("We shouldn't have accepted a channel announce with a bad TXO");
343+
let client = crate::connect_to_db().await;
344+
client.execute("UPDATE channel_announcements SET funding_amount_sats = $1 WHERE short_channel_id = $2", &[&(txo.value.to_sat() as i64), &scid]).await.unwrap();
345+
std::mem::drop(permit);
346+
});
347+
}
348+
let _all_updates_complete = sem.acquire_many(16).await.unwrap();
349+
client.execute("ALTER TABLE channel_announcements ALTER funding_amount_sats SET NOT NULL", &[]).await.unwrap();
350+
client.execute("UPDATE config SET db_schema = 15 WHERE id = 1", &[]).await.unwrap();
351+
});
319352
}
320353
if schema <= 1 || schema > SCHEMA_VERSION {
321354
panic!("Unknown schema in db: {}, we support up to {}", schema, SCHEMA_VERSION);

src/persistence.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pub(crate) struct GossipPersister<L: Deref> where L::Target: Logger {
2323
logger: L
2424
}
2525

26-
impl<L: Deref> GossipPersister<L> where L::Target: Logger {
26+
impl<L: Deref + Clone + Send + Sync + 'static> GossipPersister<L> where L::Target: Logger {
2727
pub fn new(network_graph: Arc<NetworkGraph<L>>, logger: L) -> (Self, mpsc::Sender<GossipMessage>) {
2828
let (gossip_persistence_sender, gossip_persistence_receiver) =
2929
mpsc::channel::<GossipMessage>(100);
@@ -50,7 +50,7 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
5050

5151
let cur_schema = client.query("SELECT db_schema FROM config WHERE id = $1", &[&1]).await.unwrap();
5252
if !cur_schema.is_empty() {
53-
config::upgrade_db(cur_schema[0].get(0), &mut client).await;
53+
config::upgrade_db(cur_schema[0].get(0), &mut client, self.logger.clone()).await;
5454
}
5555

5656
let preparation = client.execute("set time zone UTC", &[]).await;

src/verifier.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ impl<L: Deref + Clone + Send + Sync + 'static> ChainVerifier<L> where L::Target:
5656
.await.map(|txo| txo.value.to_sat())
5757
}
5858

59+
pub(crate) async fn retrieve_txo(client: Arc<RestClient>, short_channel_id: u64, logger: L) -> Result<TxOut, UtxoLookupError> {
60+
Self::retrieve_cache_txo(client, None, short_channel_id, logger).await
61+
}
62+
5963
async fn retrieve_cache_txo(client: Arc<RestClient>, channel_funding_amounts: Option<Arc<Mutex<HashMap<u64, u64>>>>, short_channel_id: u64, logger: L) -> Result<TxOut, UtxoLookupError> {
6064
let block_height = (short_channel_id >> 5 * 8) as u32; // block height is most significant three bytes
6165
let transaction_index = ((short_channel_id >> 2 * 8) & 0xffffff) as u32;

0 commit comments

Comments
 (0)