Skip to content

Commit b670bd3

Browse files
committed
bgp+monitor: Remove process_events_[tx/rx]
It looks like LDK #2090 *did* resolve our issue - but it notifies the BGP only after *all* monitor updates are handled. See lightningdevkit/rust-lightning#3660 (comment) for more info.

Also took the opportunity to:
- Instrument the channel monitor persister task
- Make the monitor logs easier to grep by using a consistent message
- Log how long it takes to process events
- Remove the unneeded `idx` field in `lexe_ln::channel_monitor`
- Remove the unneeded `Error` type in `lexe_ln::channel_monitor`
1 parent 02c15c9 commit b670bd3

File tree

4 files changed

+62
-120
lines changed

4 files changed

+62
-120
lines changed

lexe-ln/src/background_processor.rs

Lines changed: 21 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
use std::{sync::Arc, time::Duration};
22

3-
use common::{notify_once::NotifyOnce, task::LxTask};
4-
use tokio::{
5-
sync::{mpsc, oneshot},
6-
time::Instant,
7-
};
8-
use tracing::{debug, error, info_span, Instrument};
3+
use common::{notify_once::NotifyOnce, task::LxTask, time::DisplayMs};
4+
use tokio::time::Instant;
5+
use tracing::{debug, error, info, info_span, warn, Instrument};
96

107
use crate::{
118
alias::{LexeChainMonitorType, LexeOnionMessengerType},
@@ -49,16 +46,6 @@ pub fn start<CM, PM, PS, EH>(
4946
chain_monitor: Arc<LexeChainMonitorType<PS>>,
5047
onion_messenger: Arc<LexeOnionMessengerType<CM>>,
5148
event_handler: EH,
52-
// TODO(max): A `process_events` notification should be sent every time
53-
// an event is generated which does not also cause the future returned
54-
// by `get_event_or_persistence_needed_future()` to resolve.
55-
//
56-
// Ideally, we can remove this channel entirely, but a manual trigger
57-
// is currently still required after every channel monitor
58-
// persist (which may resume monitor updating and create more
59-
// events). This was supposed to be resolved by LDK#2052 and
60-
// LDK#2090, but our integration tests still fail without this channel.
61-
mut process_events_rx: mpsc::Receiver<oneshot::Sender<()>>,
6249
mut shutdown: NotifyOnce,
6350
) -> LxTask<()>
6451
where
@@ -71,17 +58,17 @@ where
7158
"background processor",
7259
info_span!("(bgp)"),
7360
async move {
74-
let now = Instant::now();
61+
let bgp_start = Instant::now();
7562

7663
let mk_interval = |delay: Duration, interval: Duration| {
7764
// Remove the staggering in debug mode in an attempt to catch
7865
// any subtle race conditions which may arise
79-
let start = if cfg!(debug_assertions) {
80-
now
66+
let timer_start = if cfg!(debug_assertions) {
67+
bgp_start
8168
} else {
82-
now + delay
69+
bgp_start + delay
8370
};
84-
tokio::time::interval_at(start, interval)
71+
tokio::time::interval_at(timer_start, interval)
8572
};
8673

8774
let mut process_events_timer =
@@ -100,10 +87,9 @@ where
10087
//
10188
// - Our process events timer ticked
10289
// - The channel manager got an update (event or repersist)
103-
// - The chain monitor got an update
90+
// - The chain monitor got an update (typically that all updates
91+
// were persisted for a channel monitor)
10492
// - The onion messenger got an update
105-
// - The background processor was explicitly triggered
106-
let mut processed_txs = Vec::new();
10793
let process_events_fut = async {
10894
tokio::select! {
10995
biased;
@@ -116,27 +102,18 @@ where
116102
debug!("Triggered: Chain monitor update"),
117103
_ = onion_messenger.get_update_future() =>
118104
debug!("Triggered: Onion messenger update"),
119-
// TODO(max): If LDK has fixed the BGP waking issue,
120-
// our integration tests should pass with this branch
121-
// commented out.
122-
Some(tx) = process_events_rx.recv() => {
123-
debug!("Triggered: process_events channel");
124-
processed_txs.push(tx);
125-
}
126105
};
127106

128107
// We're about to process events. Prevent duplicate work by
129108
// resetting the process_events_timer & clearing out the
130109
// process_events channel.
131110
process_events_timer.reset();
132-
while let Ok(tx) = process_events_rx.try_recv() {
133-
processed_txs.push(tx);
134-
}
135111
};
136112

137113
tokio::select! {
138114
() = process_events_fut => {
139115
debug!("Processing pending events");
116+
let process_start = Instant::now();
140117

141118
channel_manager
142119
.process_pending_events_async(mk_event_handler_fut)
@@ -159,11 +136,6 @@ where
159136
peer_manager.process_events();
160137
}.instrument(info_span!("(process-bgp)(peer-man)")).await;
161138

162-
// Notify waiters that events have been processed.
163-
for tx in processed_txs {
164-
let _ = tx.send(());
165-
}
166-
167139
if channel_manager.get_and_clear_needs_persistence() {
168140
let try_persist = persister
169141
.persist_manager(channel_manager.deref())
@@ -177,6 +149,16 @@ where
177149
break shutdown.send();
178150
}
179151
}
152+
153+
let elapsed = process_start.elapsed();
154+
let elapsed_ms = DisplayMs(elapsed);
155+
if elapsed > Duration::from_secs(10) {
156+
warn!("Event processing took {elapsed_ms}");
157+
} else if elapsed > Duration::from_secs(1) {
158+
info!("Event processing took {elapsed_ms}");
159+
} else {
160+
debug!("Event processing took {elapsed_ms}");
161+
}
180162
}
181163

182164
_ = pm_timer.tick() =>

lexe-ln/src/channel_monitor.rs

Lines changed: 35 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,18 @@
11
use std::{
22
fmt::{self, Display},
33
sync::Arc,
4-
time::Duration,
54
};
65

6+
use anyhow::{anyhow, Context};
77
use common::{ln::channel::LxOutPoint, notify_once::NotifyOnce, task::LxTask};
88
use lightning::chain::transaction::OutPoint;
9-
use thiserror::Error;
10-
use tokio::sync::{mpsc, oneshot};
11-
use tracing::{debug, error, info};
9+
use tokio::sync::mpsc;
10+
use tracing::{debug, error, info, info_span};
1211

1312
use crate::{
1413
alias::LexeChainMonitorType, traits::LexePersister, BoxedAnyhowFuture,
1514
};
1615

17-
/// How long we'll wait to receive a reply from the background processor that
18-
/// event processing is complete.
19-
// 45s because we've seen this timeout on the node.
20-
const PROCESS_EVENTS_TIMEOUT: Duration = Duration::from_secs(45);
21-
2216
/// Represents a channel monitor update. See docs on each field for details.
2317
pub struct LxChannelMonitorUpdate {
2418
pub funding_txo: LxOutPoint,
@@ -63,31 +57,29 @@ impl Display for ChannelMonitorUpdateKind {
6357
pub fn spawn_channel_monitor_persister_task<PS>(
6458
chain_monitor: Arc<LexeChainMonitorType<PS>>,
6559
mut channel_monitor_persister_rx: mpsc::Receiver<LxChannelMonitorUpdate>,
66-
process_events_tx: mpsc::Sender<oneshot::Sender<()>>,
6760
mut shutdown: NotifyOnce,
6861
) -> LxTask<()>
6962
where
7063
PS: LexePersister,
7164
{
7265
debug!("Starting channel monitor persister task");
73-
LxTask::spawn("channel monitor persister", async move {
74-
let mut idx = 0;
66+
const SPAN_NAME: &str = "(chan-monitor-persister)";
67+
LxTask::spawn_with_span(SPAN_NAME, info_span!(SPAN_NAME), async move {
7568
loop {
7669
tokio::select! {
7770
Some(update) = channel_monitor_persister_rx.recv() => {
78-
idx += 1;
7971

8072
let handle_result = handle_update(
8173
chain_monitor.as_ref(),
8274
update,
83-
idx,
84-
&process_events_tx,
8575
).await;
8676

8777
if let Err(e) = handle_result {
8878
error!("Monitor persist error: {e:#}");
8979

90-
// All errors are considered fatal.
80+
// Channel monitor persistence errors are serious;
81+
// all errors are considered fatal.
82+
// Shut down to prevent any loss of funds.
9183
shutdown.send();
9284
break;
9385
}
@@ -101,24 +93,6 @@ where
10193
})
10294
}
10395

104-
/// Errors that can occur when handling a channel monitor update.
105-
///
106-
/// This enum is intentionally kept private; it exists solely to prevent the
107-
/// caller from having to use some variant of `err_str.contains(..)`
108-
#[derive(Debug, Error)]
109-
enum Error {
110-
#[error("Couldn't persist {kind} channel #{idx}: {inner:#}")]
111-
PersistFailure {
112-
kind: ChannelMonitorUpdateKind,
113-
idx: usize,
114-
inner: anyhow::Error,
115-
},
116-
#[error("Chain monitor returned err: {0:?}")]
117-
ChainMonitor(lightning::util::errors::APIError),
118-
#[error("Timed out waiting for events to be processed")]
119-
EventsProcessTimeout,
120-
}
121-
12296
/// A helper to prevent [`spawn_channel_monitor_persister_task`]'s control flow
12397
/// from getting too complex.
12498
///
@@ -127,46 +101,36 @@ enum Error {
127101
async fn handle_update<PS: LexePersister>(
128102
chain_monitor: &LexeChainMonitorType<PS>,
129103
update: LxChannelMonitorUpdate,
130-
idx: usize,
131-
process_events_tx: &mpsc::Sender<oneshot::Sender<()>>,
132-
) -> Result<(), Error> {
133-
debug!("Handling channel monitor update #{idx}");
104+
) -> anyhow::Result<()> {
105+
let LxChannelMonitorUpdate {
106+
funding_txo,
107+
update_id,
108+
api_call_fut,
109+
kind,
110+
} = update;
134111

135-
// Run the persist future.
136-
let kind = update.kind;
137-
if let Err(inner) = update.api_call_fut.await {
138-
// Channel monitor persistence errors are serious;
139-
// return err and shut down to prevent any loss of funds.
140-
return Err(Error::PersistFailure { kind, idx, inner });
141-
}
112+
debug!(%kind, %funding_txo, %update_id, "Handling channel monitor update");
142113

143-
// Update the chain monitor with the update id and funding txo the channel
144-
// monitor update.
145-
let chain_monitor_update_res = chain_monitor.channel_monitor_updated(
146-
OutPoint::from(update.funding_txo),
147-
update.update_id,
148-
);
149-
if let Err(e) = chain_monitor_update_res {
150-
// If the update wasn't accepted, the channel is disabled, so no
151-
// transactions can be made. Just return err and shut down.
152-
return Err(Error::ChainMonitor(e));
153-
}
154-
155-
// Trigger the background processor to reprocess events, as the completed
156-
// channel monitor update may have generated an event that can be handled,
157-
// such as to restore monitor updating and broadcast a funding tx.
158-
// Furthermore, wait for the event to be handled.
159-
debug!("Triggering BGP via process_events_tx");
160-
let (processed_tx, processed_rx) = oneshot::channel();
161-
let _ = process_events_tx.try_send(processed_tx);
162-
163-
tokio::time::timeout(PROCESS_EVENTS_TIMEOUT, processed_rx)
114+
// Run the persist future.
115+
api_call_fut
164116
.await
165-
.map_err(|_| Error::EventsProcessTimeout)?
166-
// Channel sender dropped, probably means we're shutting down.
167-
.ok();
168-
169-
info!("Success: persisted {kind} channel #{idx}");
117+
.with_context(|| format!("{kind} {funding_txo} {update_id}"))
118+
.context("Channel monitor persist API call failed")?;
119+
120+
// Notify the chain monitor that the monitor update has been persisted.
121+
// - This should trigger a log like "Completed off-chain monitor update ..."
122+
// - NOTE: After this update, there may still be more updates to persist.
123+
// The LDK log message will say "all off-chain updates complete" or "still
124+
// have pending off-chain updates" (common during payments)
125+
// - NOTE: Only after *all* channel monitor updates are handled will the
126+
// channel be reenabled and the BGP woken to process events via the chain
127+
// monitor future.
128+
chain_monitor
129+
.channel_monitor_updated(OutPoint::from(funding_txo), update_id)
130+
.map_err(|e| anyhow!("{kind} {funding_txo} {update_id}: {e:?}"))
131+
.context("channel_monitor_updated returned Err")?;
132+
133+
info!(%kind, %funding_txo, %update_id, "Success: persisted monitor");
170134

171135
Ok(())
172136
}

node/src/persister/mod.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -780,7 +780,8 @@ impl Persist<SignerType> for NodePersister {
780780
) -> ChannelMonitorUpdateStatus {
781781
let funding_txo = LxOutPoint::from(funding_txo);
782782
let update_id = monitor.get_latest_update_id();
783-
info!(%funding_txo, %update_id, "Persisting new channel");
783+
let kind = ChannelMonitorUpdateKind::New;
784+
info!(%kind, %funding_txo, %update_id, "Persisting channel monitor");
784785

785786
let file_id =
786787
VfsFileId::new(CHANNEL_MONITORS_DIR, funding_txo.to_string());
@@ -809,8 +810,6 @@ impl Persist<SignerType> for NodePersister {
809810
}
810811
});
811812

812-
let kind = ChannelMonitorUpdateKind::New;
813-
814813
let update = LxChannelMonitorUpdate {
815814
funding_txo,
816815
update_id,
@@ -847,7 +846,8 @@ impl Persist<SignerType> for NodePersister {
847846
.as_ref()
848847
.map(|u| u.update_id)
849848
.unwrap_or_else(|| monitor.get_latest_update_id());
850-
info!(%funding_txo, %update_id, "Updating persisted channel");
849+
let kind = ChannelMonitorUpdateKind::Updated;
850+
info!(%kind, %funding_txo, %update_id, "Persisting channel monitor");
851851

852852
let file_id =
853853
VfsFileId::new(CHANNEL_MONITORS_DIR, funding_txo.to_string());
@@ -871,8 +871,6 @@ impl Persist<SignerType> for NodePersister {
871871
}
872872
});
873873

874-
let kind = ChannelMonitorUpdateKind::Updated;
875-
876874
let update = LxChannelMonitorUpdate {
877875
funding_txo,
878876
update_id,
@@ -898,6 +896,8 @@ impl Persist<SignerType> for NodePersister {
898896
}
899897

900898
fn archive_persisted_channel(&self, funding_txo: OutPoint) {
899+
info!(%funding_txo, "Archiving channel monitor");
900+
901901
let backend_api = self.backend_api.clone();
902902
let authenticator = self.authenticator.clone();
903903
let vfs_master_key = self.vfs_master_key.clone();

node/src/run.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -618,12 +618,9 @@ impl UserNode {
618618
.push(event::spawn_event_replayer_task(event_handler.clone()));
619619

620620
// Set up the channel monitor persistence task
621-
let (process_events_tx, process_events_rx) =
622-
mpsc::channel(DEFAULT_CHANNEL_SIZE);
623621
let task = channel_monitor::spawn_channel_monitor_persister_task(
624622
chain_monitor.clone(),
625623
channel_monitor_persister_rx,
626-
process_events_tx,
627624
shutdown.clone(),
628625
);
629626
static_tasks.push(task);
@@ -756,7 +753,6 @@ impl UserNode {
756753
chain_monitor.clone(),
757754
onion_messenger.clone(),
758755
event_handler,
759-
process_events_rx,
760756
shutdown.clone(),
761757
);
762758
static_tasks.push(bg_processor_task);

0 commit comments

Comments
 (0)