Skip to content

Commit ec4510b

Browse files
committed
monitor+: Remove api_call_fut hack
- Adds `LexeInnerPersister::persist_monitor` - Uses this method instead of `api_call_fut` - Misc other improvements
1 parent ba369c6 commit ec4510b

File tree

5 files changed

+140
-147
lines changed

5 files changed

+140
-147
lines changed

common/src/constants.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,10 @@ pub const IMPORTANT_PERSIST_RETRIES: usize = 5;
7676

7777
/// The vfs directory name used by singleton objects.
7878
pub const SINGLETON_DIRECTORY: &str = ".";
79-
/// The vfs directory name used to persist events.
79+
8080
pub const EVENTS_DIR: &str = "events";
81+
pub const CHANNEL_MONITORS_DIR: &str = "channel_monitors";
82+
pub const CHANNEL_MONITORS_ARCHIVE_DIR: &str = "channel_monitors_archive";
8183

8284
pub const CHANNEL_MANAGER_FILENAME: &str = "channel_manager";
8385
pub const PW_ENC_ROOT_SEED_FILENAME: &str = "password_encrypted_root_seed";

lexe-ln/src/channel_monitor.rs

Lines changed: 16 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,13 @@ use tokio::sync::mpsc;
1010
use tracing::{debug, error, info, info_span, Instrument};
1111

1212
use crate::{
13-
alias::LexeChainMonitorType, traits::LexePersister, BoxedAnyhowFuture,
13+
alias::LexeChainMonitorType,
14+
traits::{LexeInnerPersister, LexePersister},
1415
};
1516

16-
// `api_call_fut` is a future which makes an api call (typically with
17-
// retries) to the backend to persist the channel monitor state, returning
18-
// an `anyhow::Result<()>` once either (1) persistence succeeds or (2)
19-
// there were too many failures to keep trying. We take this future as
20-
// input (instead of e.g. a `VfsFile`) because it is the cleanest and
21-
// easiest way to abstract over the user node and LSP's differing api
22-
// clients, vfs structures, and expected error types.
23-
//
24-
// TODO(max): Add a required `upsert_monitor` method to the `LexePersister`
25-
// trait to avoid this.
26-
pub type MonitorChannelItem = (LxChannelMonitorUpdate, BoxedAnyhowFuture);
27-
2817
/// Represents a channel monitor update. See docs on each field for details.
2918
pub struct LxChannelMonitorUpdate {
19+
#[allow(dead_code)] // Conceptually part of the update.
3020
kind: ChannelMonitorUpdateKind,
3121
funding_txo: LxOutPoint,
3222
/// The ID of the channel monitor update, given by
@@ -85,27 +75,24 @@ impl Display for ChannelMonitorUpdateKind {
8575
/// This prevent a race conditions where two monitor updates come in quick
8676
/// succession and the newer channel monitor state is overwritten by the older
8777
/// channel monitor state.
88-
pub fn spawn_channel_monitor_persister_task<PS>(
78+
pub fn spawn_channel_monitor_persister_task<PS: LexePersister>(
79+
persister: PS,
8980
chain_monitor: Arc<LexeChainMonitorType<PS>>,
90-
mut channel_monitor_persister_rx: mpsc::Receiver<MonitorChannelItem>,
81+
mut channel_monitor_persister_rx: mpsc::Receiver<LxChannelMonitorUpdate>,
9182
mut shutdown: NotifyOnce,
92-
) -> LxTask<()>
93-
where
94-
PS: LexePersister,
95-
{
83+
) -> LxTask<()> {
9684
debug!("Starting channel monitor persister task");
9785
const SPAN_NAME: &str = "(chan-monitor-persister)";
9886
LxTask::spawn_with_span(SPAN_NAME, info_span!(SPAN_NAME), async move {
9987
loop {
10088
tokio::select! {
101-
Some((update, api_call_fut))
102-
= channel_monitor_persister_rx.recv() => {
89+
Some(update) = channel_monitor_persister_rx.recv() => {
10390
let update_span = update.span();
10491

10592
let handle_result = handle_update(
93+
&persister,
10694
chain_monitor.as_ref(),
10795
update,
108-
api_call_fut,
10996
)
11097
.instrument(update_span.clone())
11198
.await;
@@ -137,23 +124,18 @@ where
137124
/// Since channel monitor persistence is very important, all [`Err`]s are
138125
/// considered fatal; the caller should send a shutdown signal and exit.
139126
async fn handle_update<PS: LexePersister>(
127+
persister: &PS,
140128
chain_monitor: &LexeChainMonitorType<PS>,
141129
update: LxChannelMonitorUpdate,
142-
api_call_fut: BoxedAnyhowFuture,
143130
) -> anyhow::Result<()> {
144-
let LxChannelMonitorUpdate {
145-
funding_txo,
146-
update_id,
147-
kind: _,
148-
span: _,
149-
} = update;
150-
151131
debug!("Handling channel monitor update");
152132

153-
// Run the persist future.
154-
api_call_fut
133+
// Persist the monitor.
134+
let funding_txo = OutPoint::from(update.funding_txo);
135+
persister
136+
.persist_monitor(chain_monitor, &update.funding_txo)
155137
.await
156-
.context("Channel monitor persist API call failed")?;
138+
.context("persist_monitor failed")?;
157139

158140
// Notify the chain monitor that the monitor update has been persisted.
159141
// - This should trigger a log like "Completed off-chain monitor update ..."
@@ -164,7 +146,7 @@ async fn handle_update<PS: LexePersister>(
164146
// channel be reenabled and the BGP woken to process events via the chain
165147
// monitor future.
166148
chain_monitor
167-
.channel_monitor_updated(OutPoint::from(funding_txo), update_id)
149+
.channel_monitor_updated(funding_txo, update.update_id)
168150
.map_err(|e| anyhow!("channel_monitor_updated returned Err: {e:?}"))?;
169151

170152
info!("Success: persisted monitor");

lexe-ln/src/traits.rs

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
use std::{future::Future, ops::Deref, str::FromStr};
22

3-
use anyhow::Context;
3+
use anyhow::{anyhow, Context};
44
use async_trait::async_trait;
55
use common::{
66
api::{
77
user::NodePk,
88
vfs::{Vfs, VfsDirectory, VfsFileId},
99
},
1010
constants,
11-
ln::payments::{LxPaymentId, PaymentIndex},
11+
ln::{
12+
channel::LxOutPoint,
13+
payments::{LxPaymentId, PaymentIndex},
14+
},
1215
notify_once::NotifyOnce,
1316
};
1417
use lightning::{
@@ -75,6 +78,31 @@ pub trait LexeInnerPersister: Vfs + Persist<SignerType> {
7578
.await
7679
}
7780

81+
async fn persist_monitor<PS: LexePersister>(
82+
&self,
83+
chain_monitor: &LexeChainMonitorType<PS>,
84+
funding_txo: &LxOutPoint,
85+
) -> anyhow::Result<()> {
86+
let file = {
87+
let locked_monitor =
88+
chain_monitor.get_monitor((*funding_txo).into()).map_err(
89+
|e| anyhow!("No monitor for this funding_txo: {e:?}"),
90+
)?;
91+
92+
// NOTE: The VFS filename uses the `ToString` impl of `LxOutPoint`
93+
// rather than `lightning::chain::transaction::OutPoint` or
94+
// `bitcoin::OutPoint`! `LxOutPoint`'s FromStr/Display impls are
95+
// guaranteed to roundtrip, and will be stable across LDK versions.
96+
let filename = funding_txo.to_string();
97+
let file_id =
98+
VfsFileId::new(constants::CHANNEL_MONITORS_DIR, filename);
99+
self.encrypt_ldk_writeable(file_id, &*locked_monitor)
100+
};
101+
102+
self.persist_file(&file, constants::IMPORTANT_PERSIST_RETRIES)
103+
.await
104+
}
105+
78106
/// Reads all persisted events, along with their event IDs.
79107
async fn read_events(&self) -> anyhow::Result<Vec<(EventId, Event)>> {
80108
let dir = VfsDirectory::new(constants::EVENTS_DIR);

0 commit comments

Comments
 (0)