Commit b2ac308

node: Implement LexeBackgroundProcessor

1 parent 2e6758b
6 files changed: +172 -95 lines

node/src/command/test/mod.rs

Lines changed: 1 addition & 0 deletions

@@ -59,6 +59,7 @@ fn default_args_for_user(user_pk: UserPk) -> StartArgs {
 /// locate the local bitcoind executable.
 ///
 /// https://github.com/RCasatta/bitcoind/issues/77
+#[rustfmt::skip]
 fn bitcoind_exe_path() -> String {
     use std::env;
     // "/Users/fang/lexe/client/target/debug/build/bitcoind-65c3b20abafd4893/out/bitcoin/bitcoin-22.0/bin/bitcoind"

node/src/init.rs

Lines changed: 12 additions & 15 deletions
@@ -22,7 +22,6 @@ use lightning::chain;
 use lightning::chain::chainmonitor::ChainMonitor;
 use lightning::chain::keysinterface::KeysInterface;
 use lightning::routing::gossip::P2PGossipSync;
-use lightning_background_processor::BackgroundProcessor;
 use lightning_invoice::payment;
 use lightning_invoice::utils::DefaultRouter;
 use secrecy::ExposeSecret;
@@ -33,6 +32,7 @@ use tokio::sync::{broadcast, mpsc};
 use crate::api::ApiClient;
 use crate::event_handler::LdkEventHandler;
 use crate::inactivity_timer::InactivityTimer;
+use crate::lexe::background_processor::LexeBackgroundProcessor;
 use crate::lexe::bitcoind::LexeBitcoind;
 use crate::lexe::channel_manager::LexeChannelManager;
 use crate::lexe::keys_manager::LexeKeysManager;
@@ -42,8 +42,8 @@ use crate::lexe::persister::LexePersister;
 use crate::lexe::sync::SyncedChainListeners;
 use crate::types::{
     ApiClientType, BlockSourceType, BroadcasterType, ChainMonitorType,
-    ChannelMonitorType, FeeEstimatorType, GossipSyncType, InvoicePayerType,
-    NetworkGraphType, P2PGossipSyncType, PaymentInfoStorageType, WalletType,
+    ChannelMonitorType, FeeEstimatorType, InvoicePayerType, NetworkGraphType,
+    P2PGossipSyncType, PaymentInfoStorageType, WalletType,
 };
 use crate::{api, command};

@@ -83,7 +83,6 @@ pub struct LexeNode {
     outbound_payments: PaymentInfoStorageType,
     shutdown_rx: broadcast::Receiver<()>,
     stop_listen_connect: Arc<AtomicBool>,
-    background_processor: BackgroundProcessor,
 }

 impl LexeNode {
@@ -285,15 +284,17 @@ impl LexeNode {
         ));

         // Start Background Processing
-        let background_processor = BackgroundProcessor::start(
+        // TODO(max): Handle the handle
+        let bgp_shutdown_rx = shutdown_tx.subscribe();
+        let _bgp_handle = LexeBackgroundProcessor::start(
+            channel_manager.clone(),
+            peer_manager.clone(),
             persister.clone(),
-            invoice_payer.clone(),
             chain_monitor.clone(),
-            channel_manager.clone(),
-            GossipSyncType::P2P(gossip_sync.clone()),
-            peer_manager.as_arc_inner(),
-            logger.clone(),
-            Some(scorer.clone()),
+            invoice_payer.clone(),
+            gossip_sync.clone(),
+            scorer.clone(),
+            bgp_shutdown_rx,
         );

         // Spawn a task to regularly reconnect to channel peers
@@ -336,7 +337,6 @@ impl LexeNode {
             outbound_payments,
             shutdown_rx,
             stop_listen_connect,
-            background_processor,
         };
         Ok(node)
     }
@@ -411,9 +411,6 @@ impl LexeNode {
         self.stop_listen_connect.store(true, Ordering::Release);
         self.peer_manager.disconnect_all_peers();

-        // Stop the background processor.
-        self.background_processor.stop().unwrap();
-
         Ok(())
     }
 }
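
The `_bgp_handle` returned above is currently dropped (see the TODO). A minimal, self-contained sketch of what handling it could look like, using a plain tokio::spawn task as a stand-in for the background processor; the names here are illustrative, not part of the commit:

use tokio::sync::broadcast;
use tokio::task::JoinHandle;

#[tokio::main]
async fn main() {
    let (shutdown_tx, mut shutdown_rx) = broadcast::channel::<()>(1);

    // Stand-in for LexeBackgroundProcessor::start(), which likewise returns
    // the JoinHandle<()> produced by tokio::task::spawn.
    let bgp_handle: JoinHandle<()> = tokio::spawn(async move {
        // The real task loops in tokio::select! until shutdown is signaled.
        let _ = shutdown_rx.recv().await;
        // The final persistence pass runs here before the task returns.
    });

    // On shutdown: signal all subscribers, then await the handle so that the
    // final persistence pass completes before the process exits.
    let _ = shutdown_tx.send(());
    bgp_handle.await.expect("background processor task panicked");
}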

node/src/lexe/background_processor.rs

Lines changed: 134 additions & 0 deletions

@@ -0,0 +1,134 @@
+use std::ops::Deref;
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+
+use lightning::util::events::EventsProvider;
+use tokio::sync::broadcast;
+use tokio::task::JoinHandle;
+use tokio::time::{interval, interval_at, Instant};
+use tracing::{debug, error, info, trace};
+
+use crate::lexe::channel_manager::LexeChannelManager;
+use crate::lexe::peer_manager::LexePeerManager;
+use crate::lexe::persister::LexePersister;
+use crate::types::{
+    ChainMonitorType, InvoicePayerType, P2PGossipSyncType,
+    ProbabilisticScorerType,
+};
+
+const PROCESS_EVENTS_INTERVAL: Duration = Duration::from_millis(1000);
+const PEER_MANAGER_PING_INTERVAL: Duration = Duration::from_secs(15);
+const CHANNEL_MANAGER_TICK_INTERVAL: Duration = Duration::from_secs(60);
+const CHANNEL_MANAGER_POLL_INTERVAL: Duration = Duration::from_millis(1000);
+const NETWORK_GRAPH_INITIAL_DELAY: Duration = Duration::from_secs(60);
+const NETWORK_GRAPH_PRUNE_INTERVAL: Duration = Duration::from_secs(15 * 60);
+const PROB_SCORER_PERSIST_INTERVAL: Duration = Duration::from_secs(5 * 60);
+
+/// A Tokio-native background processor that runs on a single task and does not
+/// spawn any OS threads. Modeled after the lightning-background-processor crate
+/// provided by LDK - see that crate's implementation for more details.
+pub struct LexeBackgroundProcessor {}
+
+impl LexeBackgroundProcessor {
+    #[allow(clippy::too_many_arguments)]
+    pub fn start(
+        channel_manager: LexeChannelManager,
+        peer_manager: LexePeerManager,
+        persister: LexePersister,
+        chain_monitor: Arc<ChainMonitorType>,
+        event_handler: Arc<InvoicePayerType>,
+        gossip_sync: Arc<P2PGossipSyncType>,
+        scorer: Arc<Mutex<ProbabilisticScorerType>>,
+        mut shutdown_rx: broadcast::Receiver<()>,
+    ) -> JoinHandle<()> {
+        tokio::task::spawn(async move {
+            let mut process_timer = interval(PROCESS_EVENTS_INTERVAL);
+            let mut pm_timer = interval(PEER_MANAGER_PING_INTERVAL);
+            let mut cm_tick_timer = interval(CHANNEL_MANAGER_TICK_INTERVAL);
+            let mut cm_poll_timer = interval(CHANNEL_MANAGER_POLL_INTERVAL);
+            let start = Instant::now() + NETWORK_GRAPH_INITIAL_DELAY;
+            let mut ng_timer = interval_at(start, NETWORK_GRAPH_PRUNE_INTERVAL);
+            let mut ps_timer = interval(PROB_SCORER_PERSIST_INTERVAL);
+
+            loop {
+                tokio::select! {
+                    // --- Event branches --- //
+                    _ = process_timer.tick() => {
+                        trace!("Processing pending events");
+                        channel_manager
+                            .process_pending_events(&event_handler);
+                        chain_monitor
+                            .process_pending_events(&event_handler);
+                        peer_manager.process_events();
+                    }
+                    _ = pm_timer.tick() => {
+                        debug!("Calling PeerManager::timer_tick_occurred()");
+                        peer_manager.timer_tick_occurred();
+                    }
+                    _ = cm_tick_timer.tick() => {
+                        debug!("Calling ChannelManager::timer_tick_occurred()");
+                        channel_manager.timer_tick_occurred();
+                    }
+
+                    // --- Persistence branches --- //
+                    _ = cm_poll_timer.tick() => {
+                        trace!("Polling channel manager for updates");
+                        // TODO Use get_persistence_condvar_value instead
+                        let timeout = Duration::from_millis(10);
+                        let needs_persist = channel_manager
+                            .await_persistable_update_timeout(timeout);
+                        if needs_persist {
+                            // TODO Log err and shut down if persist fails
+                            persister.persist_manager(channel_manager.deref()).await
+                                .expect("TODO: Shut down if persist fails");
+                        }
+                    }
+                    _ = ng_timer.tick() => {
+                        debug!("Pruning and persisting network graph");
+                        let network_graph = gossip_sync.network_graph();
+                        network_graph.remove_stale_channels();
+                        // TODO Log err and shut down if persist fails
+                        persister.persist_graph(network_graph).await
+                            .expect("TODO: Shut down if persist fails");
+                    }
+                    _ = ps_timer.tick() => {
+                        debug!("Persisting probabilistic scorer");
+                        // TODO Log err and shut down if persist fails
+                        persister.persist_scorer(scorer.as_ref()).await
+                            .expect("TODO: Shut down if persist fails");
+                    }
+
+                    // --- Shutdown branch --- //
+                    _ = shutdown_rx.recv() => {
+                        info!("Background processor shutting down");
+                        break;
+                    }
+                }
+            }
+
+            // Persist everything one last time.
+            // - For the channel manager, this may prevent some races where
+            //   the node quits while channel updates were in-flight, causing
+            //   ChannelMonitor updates to be persisted without corresponding
+            //   ChannelManager updates being persisted. This does not risk
+            //   the loss of funds, but upon next boot the ChannelManager may
+            //   accidentally trigger a force close.
+            // - For the network graph and scorer, it is possible that the
+            //   node is shut down before they have gotten a chance to be
+            //   persisted (e.g. `shutdown_after_sync_if_no_activity` is set),
+            //   and since we're already making another API call for the
+            //   channel manager, we might as well concurrently persist these
+            //   as well.
+            let network_graph = gossip_sync.network_graph();
+            let (cm_res, ng_res, ps_res) = tokio::join!(
+                persister.persist_manager(channel_manager.deref()),
+                persister.persist_graph(network_graph),
+                persister.persist_scorer(scorer.as_ref()),
+            );
+            for res in [cm_res, ng_res, ps_res] {
+                if let Err(e) = res {
+                    error!("Final persistence failure: {:#}", e);
+                }
+            }
+        })
+    }
+}
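
The file above reduces to one tokio::select! loop over several interval timers plus a broadcast shutdown channel. A stripped-down, runnable sketch of that pattern, with placeholder timer names and periods:

use std::time::Duration;
use tokio::sync::broadcast;
use tokio::time::{interval, interval_at, Instant};

#[tokio::main]
async fn main() {
    let (shutdown_tx, mut shutdown_rx) = broadcast::channel::<()>(1);

    // interval() completes its first tick immediately, then fires once per
    // period, like PROCESS_EVENTS_INTERVAL above.
    let mut fast_timer = interval(Duration::from_millis(500));
    // interval_at() delays the first tick, which is how the network graph
    // pruner waits out NETWORK_GRAPH_INITIAL_DELAY before its first run.
    let start = Instant::now() + Duration::from_secs(2);
    let mut delayed_timer = interval_at(start, Duration::from_secs(5));

    // Simulate an external shutdown signal arriving a few seconds in.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_secs(3)).await;
        let _ = shutdown_tx.send(());
    });

    loop {
        tokio::select! {
            _ = fast_timer.tick() => println!("process events"),
            _ = delayed_timer.tick() => println!("prune network graph"),
            _ = shutdown_rx.recv() => {
                println!("shutting down");
                break;
            }
        }
    }
}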

node/src/lexe/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -1,6 +1,7 @@
 //! The `lexe` module contains Lexe newtypes for bitcoin / lightning types
 //! (usually) defined in LDK.

+pub mod background_processor;
 pub mod bitcoind;
 pub mod channel_manager;
 pub mod keys_manager;

node/src/lexe/persister.rs

Lines changed: 23 additions & 63 deletions
@@ -1,4 +1,4 @@
-use std::io::{self, Cursor, ErrorKind};
+use std::io::Cursor;
 use std::ops::Deref;
 use std::str::FromStr;
 use std::sync::{Arc, Mutex};
@@ -17,13 +17,10 @@ use lightning::routing::gossip::NetworkGraph as LdkNetworkGraph;
 use lightning::routing::scoring::{
     ProbabilisticScorer, ProbabilisticScoringParameters,
 };
-use lightning::util::persist::Persister;
 use lightning::util::ser::{ReadableArgs, Writeable};
-use once_cell::sync::Lazy;
-use tokio::runtime::{Builder, Handle, Runtime};
+use tokio::runtime::Handle;
 use tracing::{debug, error};

-use crate::lexe::bitcoind::LexeBitcoind;
 use crate::lexe::channel_manager::USER_CONFIG;
 use crate::lexe::keys_manager::LexeKeysManager;
 use crate::lexe::logger::LexeTracingLogger;
@@ -317,42 +314,11 @@ impl InnerPersister {
         .map(|_| ())
         .map_err(|e| e.into())
     }
-}

-/// A Tokio runtime which can be used to run async closures in sync fns
-/// downstream of thread::spawn()
-static PERSISTER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
-    Builder::new_current_thread()
-        .enable_io()
-        // Because our reqwest::Client has a configured timeout
-        .enable_time()
-        .build()
-        .unwrap()
-});
-
-/// This trait is defined in lightning::util::Persist.
-///
-/// The methods in this trait are called inside a `thread::spawn()` within
-/// `BackgroundProcessor::start()`, meaning that the thread-local context for
-/// these function do not contain a Tokio (async) runtime. Thus, we offer a
-/// lazily-initialized `PERSISTER_RUNTIME` above which the `Persister` methods
-/// use to run async closures inside their synchronous functions.
-impl<'a>
-    Persister<
-        'a,
-        SignerType,
-        Arc<ChainMonitorType>,
-        Arc<LexeBitcoind>,
-        LexeKeysManager,
-        Arc<LexeBitcoind>,
-        LexeTracingLogger,
-        Mutex<ProbabilisticScorerType>,
-    > for InnerPersister
-{
-    fn persist_manager(
+    pub async fn persist_manager(
         &self,
         channel_manager: &ChannelManagerType,
-    ) -> Result<(), io::Error> {
+    ) -> anyhow::Result<()> {
         debug!("Persisting channel manager");

         // FIXME(encrypt): Encrypt under key derived from seed
@@ -366,20 +332,17 @@ impl<'a>
             data,
         );

-        // Run an async fn inside a sync fn downstream of thread::spawn()
-        PERSISTER_RUNTIME
-            .block_on(async move { self.api.upsert_file(cm_file).await })
+        self.api
+            .upsert_file(cm_file)
+            .await
             .map(|_| ())
-            .map_err(|api_err| {
-                error!("Could not persist channel manager: {:#}", api_err);
-                io::Error::new(ErrorKind::Other, api_err)
-            })
+            .context("Could not persist channel manager")
     }

-    fn persist_graph(
+    pub async fn persist_graph(
         &self,
         network_graph: &NetworkGraphType,
-    ) -> Result<(), io::Error> {
+    ) -> anyhow::Result<()> {
         debug!("Persisting network graph");
         // FIXME(encrypt): Encrypt under key derived from seed
         let data = network_graph.encode();
@@ -392,20 +355,17 @@ impl<'a>
             data,
         );

-        // Run an async fn inside a sync fn downstream of thread::spawn()
-        PERSISTER_RUNTIME
-            .block_on(async move { self.api.upsert_file(file).await })
+        self.api
+            .upsert_file(file)
+            .await
             .map(|_| ())
-            .map_err(|api_err| {
-                error!("Could not persist network graph: {:#}", api_err);
-                io::Error::new(ErrorKind::Other, api_err)
-            })
+            .context("Could not persist network graph")
     }

-    fn persist_scorer(
+    pub async fn persist_scorer(
         &self,
         scorer_mutex: &Mutex<ProbabilisticScorerType>,
-    ) -> Result<(), io::Error> {
+    ) -> anyhow::Result<()> {
         debug!("Persisting probabilistic scorer");

         let scorer_file = {
@@ -423,13 +383,11 @@ impl<'a>
             )
         };

-        PERSISTER_RUNTIME.block_on(async move {
-            self.api
-                .upsert_file(scorer_file)
-                .await
-                .map(|_| ())
-                .map_err(|api_err| io::Error::new(ErrorKind::Other, api_err))
-        })
+        self.api
+            .upsert_file(scorer_file)
+            .await
+            .map(|_| ())
+            .context("Could not persist scorer")
     }
 }

@@ -465,6 +423,7 @@ impl Persist<SignerType> for InnerPersister {
             // Even though this is a temporary failure that can be retried,
             // we should still log it
             error!("Could not persist new channel monitor: {:#}", e);
+            // TODO(max): After the async dance this failure should be permanent
             ChannelMonitorUpdateErr::TemporaryFailure
         })
     }
@@ -501,6 +460,7 @@ impl Persist<SignerType> for InnerPersister {
             // Even though this is a temporary failure that can be retried,
             // we should still log it
             error!("Could not update persisted channel monitor: {:#}", e);
+            // TODO(max): After the async dance this failure should be permanent
             ChannelMonitorUpdateErr::TemporaryFailure
         })
     }