Skip to content

Commit 9412c42

Browse files
committed
node: Proper async channel monitor persist
1 parent b6fd67e commit 9412c42

File tree

4 files changed

+166
-42
lines changed

4 files changed

+166
-42
lines changed

node/src/init.rs

Lines changed: 67 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,24 @@ use common::rng::Crng;
1919
use lightning::chain;
2020
use lightning::chain::chainmonitor::ChainMonitor;
2121
use lightning::chain::keysinterface::KeysInterface;
22+
use lightning::chain::transaction::OutPoint;
2223
use lightning::routing::gossip::P2PGossipSync;
2324
use lightning_invoice::payment;
2425
use lightning_invoice::utils::DefaultRouter;
2526
use tokio::net::TcpListener;
2627
use tokio::runtime::Handle;
2728
use tokio::sync::{broadcast, mpsc};
2829
use tracing::{debug, error, info, instrument};
30+
use tokio::task::JoinHandle;
2931

3032
use crate::api::ApiClient;
3133
use crate::event_handler::LdkEventHandler;
3234
use crate::inactivity_timer::InactivityTimer;
3335
use crate::lexe::background_processor::LexeBackgroundProcessor;
3436
use crate::lexe::bitcoind::LexeBitcoind;
35-
use crate::lexe::channel_manager::LexeChannelManager;
37+
use crate::lexe::channel_manager::{
38+
LexeChannelManager, LxChannelMonitorUpdate,
39+
};
3640
use crate::lexe::keys_manager::LexeKeysManager;
3741
use crate::lexe::logger::LexeTracingLogger;
3842
use crate::lexe::peer_manager::LexePeerManager;
@@ -132,8 +136,20 @@ impl LexeNode {
132136
let fee_estimator = bitcoind.clone();
133137
let broadcaster = bitcoind.clone();
134138

139+
// Init Tokio channels
140+
let (activity_tx, activity_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
141+
let (shutdown_tx, shutdown_rx) =
142+
broadcast::channel(DEFAULT_CHANNEL_SIZE);
143+
let (channel_monitor_updated_tx, channel_monitor_updated_rx) =
144+
mpsc::channel(DEFAULT_CHANNEL_SIZE);
145+
135146
// Initialize Persister
136-
let persister = LexePersister::new(api.clone(), node_pk, measurement);
147+
let persister = LexePersister::new(
148+
api.clone(),
149+
node_pk,
150+
measurement,
151+
channel_monitor_updated_tx,
152+
);
137153

138154
// Initialize the ChainMonitor
139155
let chain_monitor = Arc::new(ChainMonitor::new(
@@ -144,6 +160,16 @@ impl LexeNode {
144160
persister.clone(),
145161
));
146162

163+
// Set up the persister -> chain monitor channel
164+
// TODO(max): Handle the handle
165+
let channel_monitor_updated_shutdown_rx = shutdown_tx.subscribe();
166+
let _channel_monitor_updated_handle =
167+
spawn_channel_monitor_updated_task(
168+
chain_monitor.clone(),
169+
channel_monitor_updated_rx,
170+
channel_monitor_updated_shutdown_rx,
171+
);
172+
147173
// Read the `ChannelMonitor`s and initialize the `P2PGossipSync`
148174
let (channel_monitors_res, gossip_sync_res) = tokio::join!(
149175
channel_monitors(&persister, keys_manager.clone()),
@@ -198,12 +224,7 @@ impl LexeNode {
198224
)
199225
.await;
200226

201-
// Init Tokio channels
202-
let (activity_tx, activity_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
203-
let (shutdown_tx, shutdown_rx) =
204-
broadcast::channel(DEFAULT_CHANNEL_SIZE);
205-
206-
// build owner service TLS config for authenticating owner
227+
// Build owner service TLS config for authenticating owner
207228
let node_dns = args.node_dns_name.clone();
208229
let owner_tls = node_run_tls_config(rng, root_seed, vec![node_dns])
209230
.context("Failed to build owner service TLS config")?;
@@ -297,7 +318,8 @@ impl LexeNode {
297318
);
298319

299320
// Spawn a task to regularly reconnect to channel peers
300-
spawn_p2p_reconnect_task(
321+
// TODO(max): Handle the handle
322+
let _reconnect_handle = spawn_p2p_reconnect_task(
301323
channel_manager.clone(),
302324
peer_manager.clone(),
303325
stop_listen_connect.clone(),
@@ -577,7 +599,7 @@ fn spawn_p2p_reconnect_task(
577599
peer_manager: LexePeerManager,
578600
stop_listen_connect: Arc<AtomicBool>,
579601
persister: LexePersister,
580-
) {
602+
) -> JoinHandle<()> {
581603
tokio::spawn(async move {
582604
let mut interval = tokio::time::interval(Duration::from_secs(60));
583605
loop {
@@ -611,5 +633,39 @@ fn spawn_p2p_reconnect_task(
611633
}
612634
}
613635
}
614-
});
636+
})
637+
}
638+
639+
/// Spawns a task that that lets the persister make calls to the chain monitor.
640+
/// For now, it simply listens on `channel_monitor_updated_rx` and calls
641+
/// `ChainMonitor::channel_monitor_updated()` with any received values. This is
642+
/// required because (a) the chain monitor cannot be initialized without the
643+
/// persister, therefore (b) the persister cannot hold the chain monitor,
644+
/// therefore there needs to be another means of letting the persister notify
645+
/// the channel manager of events.
646+
pub fn spawn_channel_monitor_updated_task(
647+
chain_monitor: Arc<ChainMonitorType>,
648+
mut channel_monitor_updated_rx: mpsc::Receiver<LxChannelMonitorUpdate>,
649+
mut shutdown_rx: broadcast::Receiver<()>,
650+
) -> JoinHandle<()> {
651+
info!("Starting channel_monitor_updated task");
652+
tokio::spawn(async move {
653+
loop {
654+
tokio::select! {
655+
Some(update) = channel_monitor_updated_rx.recv() => {
656+
if let Err(e) = chain_monitor.channel_monitor_updated(
657+
OutPoint::from(update.funding_txo),
658+
update.update_id,
659+
) {
660+
// ApiError impls Debug but not std::error::Error
661+
error!("channel_monitor_updated returned Err: {:?}", e);
662+
}
663+
}
664+
_ = shutdown_rx.recv() => {
665+
info!("channel_monitor_updated task shutting down");
666+
break;
667+
}
668+
}
669+
}
670+
})
615671
}

node/src/lexe/channel_manager/types.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
11
use bitcoin::secp256k1::PublicKey;
2+
use lightning::chain::chainmonitor::MonitorUpdateId;
23
use lightning::ln::channelmanager::{ChannelCounterparty, ChannelDetails};
34
use serde::Serialize;
45

56
use crate::lexe::types::LxOutPoint;
67

8+
pub struct LxChannelMonitorUpdate {
9+
pub funding_txo: LxOutPoint,
10+
pub update_id: MonitorUpdateId,
11+
}
12+
713
#[derive(Serialize)]
814
pub struct LxChannelDetails {
915
pub channel_id: [u8; 32],

node/src/lexe/persister.rs

Lines changed: 84 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@ use lightning::routing::scoring::{
1818
ProbabilisticScorer, ProbabilisticScoringParameters,
1919
};
2020
use lightning::util::ser::{ReadableArgs, Writeable};
21-
use tokio::runtime::Handle;
21+
use tokio::sync::mpsc;
2222
use tracing::{debug, error};
2323

24-
use crate::lexe::channel_manager::USER_CONFIG;
24+
use crate::lexe::channel_manager::{LxChannelMonitorUpdate, USER_CONFIG};
2525
use crate::lexe::keys_manager::LexeKeysManager;
2626
use crate::lexe::logger::LexeTracingLogger;
2727
use crate::lexe::peer_manager::ChannelPeer;
@@ -53,8 +53,14 @@ impl LexePersister {
5353
api: ApiClientType,
5454
node_pk: PublicKey,
5555
measurement: Measurement,
56+
channel_monitor_updated_tx: mpsc::Sender<LxChannelMonitorUpdate>,
5657
) -> Self {
57-
let inner = InnerPersister::new(api, node_pk, measurement);
58+
let inner = InnerPersister::new(
59+
api,
60+
node_pk,
61+
measurement,
62+
channel_monitor_updated_tx,
63+
);
5864
Self { inner }
5965
}
6066
}
@@ -73,18 +79,21 @@ pub struct InnerPersister {
7379
api: ApiClientType,
7480
node_pk: PublicKey,
7581
measurement: Measurement,
82+
channel_monitor_updated_tx: mpsc::Sender<LxChannelMonitorUpdate>,
7683
}
7784

7885
impl InnerPersister {
7986
fn new(
8087
api: ApiClientType,
8188
node_pk: PublicKey,
8289
measurement: Measurement,
90+
channel_monitor_updated_tx: mpsc::Sender<LxChannelMonitorUpdate>,
8391
) -> Self {
8492
Self {
8593
api,
8694
node_pk,
8795
measurement,
96+
channel_monitor_updated_tx,
8897
}
8998
}
9099

@@ -153,6 +162,7 @@ impl InnerPersister {
153162
keys_manager: LexeKeysManager,
154163
) -> anyhow::Result<Vec<(BlockHash, ChannelMonitorType)>> {
155164
debug!("Reading channel monitors");
165+
// TODO Also attempt to read from the cloud
156166

157167
let cm_dir = Directory {
158168
node_pk: self.node_pk,
@@ -396,11 +406,10 @@ impl Persist<SignerType> for InnerPersister {
396406
&self,
397407
funding_txo: OutPoint,
398408
monitor: &ChannelMonitorType,
399-
_update_id: MonitorUpdateId,
409+
update_id: MonitorUpdateId,
400410
) -> Result<(), ChannelMonitorUpdateErr> {
401-
let outpoint = LxOutPoint::from(funding_txo);
402-
let outpoint_str = outpoint.to_string();
403-
debug!("Persisting new channel {}", outpoint_str);
411+
let funding_txo = LxOutPoint::from(funding_txo);
412+
debug!("Persisting new channel {}", funding_txo);
404413

405414
// FIXME(encrypt): Encrypt under key derived from seed
406415
let data = monitor.encode();
@@ -409,31 +418,53 @@ impl Persist<SignerType> for InnerPersister {
409418
self.node_pk,
410419
self.measurement,
411420
CHANNEL_MONITORS_DIRECTORY.to_owned(),
412-
outpoint_str,
421+
funding_txo.to_string(),
413422
data,
414423
);
424+
let update = LxChannelMonitorUpdate {
425+
funding_txo,
426+
update_id,
427+
};
415428

416-
// Run an async fn inside a sync fn inside a Tokio runtime
417-
Handle::current()
418-
.block_on(async move { self.api.create_file(&cm_file).await })
419-
.map(|_| ())
420-
.map_err(|e| {
421-
// TODO(max): Implement durability then make this err permanent
422-
error!("Could not persist new channel monitor: {:#}", e);
423-
ChannelMonitorUpdateErr::TemporaryFailure
424-
})
429+
// Spawn a task for persisting the channel monitor
430+
let api_clone = self.api.clone();
431+
let channel_monitor_updated_tx =
432+
self.channel_monitor_updated_tx.clone();
433+
tokio::spawn(async move {
434+
// Retry indefinitely until it succeeds
435+
loop {
436+
// TODO Also attempt to persist to cloud backup
437+
match api_clone.create_file(&cm_file).await {
438+
Ok(_file) => {
439+
if let Err(e) =
440+
channel_monitor_updated_tx.try_send(update)
441+
{
442+
error!("Couldn't notify chain monitor: {:#}", e);
443+
}
444+
return;
445+
}
446+
Err(e) => {
447+
error!("Couldn't persist new channel monitor: {:#}", e)
448+
}
449+
}
450+
}
451+
});
452+
453+
// As documented in the `Persist` trait docs, return `TemporaryFailure`,
454+
// which freezes the channel until persistence succeeds.
455+
Err(ChannelMonitorUpdateErr::TemporaryFailure)
425456
}
426457

427458
fn update_persisted_channel(
428459
&self,
429460
funding_txo: OutPoint,
461+
// TODO: We probably want to use this for rollback protection
430462
_update: &Option<ChannelMonitorUpdate>,
431463
monitor: &ChannelMonitorType,
432-
_update_id: MonitorUpdateId,
464+
update_id: MonitorUpdateId,
433465
) -> Result<(), ChannelMonitorUpdateErr> {
434-
let outpoint = LxOutPoint::from(funding_txo);
435-
let outpoint_str = outpoint.to_string();
436-
debug!("Updating persisted channel {}", outpoint_str);
466+
let funding_txo = LxOutPoint::from(funding_txo);
467+
debug!("Updating persisted channel {}", funding_txo);
437468

438469
// FIXME(encrypt): Encrypt under key derived from seed
439470
let data = monitor.encode();
@@ -442,18 +473,40 @@ impl Persist<SignerType> for InnerPersister {
442473
self.node_pk,
443474
self.measurement,
444475
CHANNEL_MONITORS_DIRECTORY.to_owned(),
445-
outpoint_str,
476+
funding_txo.to_string(),
446477
data,
447478
);
479+
let update = LxChannelMonitorUpdate {
480+
funding_txo,
481+
update_id,
482+
};
448483

449-
// Run an async fn inside a sync fn inside a Tokio runtime
450-
Handle::current()
451-
.block_on(async move { self.api.upsert_file(&cm_file).await })
452-
.map(|_| ())
453-
.map_err(|e| {
454-
// TODO(max): Implement durability then make this err permanent
455-
error!("Could not update persisted channel monitor: {:#}", e);
456-
ChannelMonitorUpdateErr::TemporaryFailure
457-
})
484+
// Spawn a task for persisting the channel monitor
485+
let api_clone = self.api.clone();
486+
let channel_monitor_updated_tx =
487+
self.channel_monitor_updated_tx.clone();
488+
tokio::spawn(async move {
489+
// Retry indefinitely until it succeeds
490+
loop {
491+
// TODO Also attempt to persist to cloud backup
492+
match api_clone.upsert_file(&cm_file).await {
493+
Ok(_) => {
494+
if let Err(e) =
495+
channel_monitor_updated_tx.try_send(update)
496+
{
497+
error!("Couldn't notify chain monitor: {:#}", e);
498+
}
499+
return;
500+
}
501+
Err(e) => {
502+
error!("Could not update channel monitor: {:#}", e)
503+
}
504+
}
505+
}
506+
});
507+
508+
// As documented in the `Persist` trait docs, return `TemporaryFailure`,
509+
// which freezes the channel until persistence succeeds.
510+
Err(ChannelMonitorUpdateErr::TemporaryFailure)
458511
}
459512
}

node/src/lexe/types.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,15 @@ impl From<OutPoint> for LxOutPoint {
3434
}
3535
}
3636

37+
impl From<LxOutPoint> for OutPoint {
38+
fn from(op: LxOutPoint) -> Self {
39+
Self {
40+
txid: op.txid,
41+
index: op.index,
42+
}
43+
}
44+
}
45+
3746
/// Deserializes from <txid>_<index>
3847
impl FromStr for LxOutPoint {
3948
type Err = anyhow::Error;

0 commit comments

Comments
 (0)