Skip to content

Commit 092a50a

Browse files
committed
notify ChainMonitor's EventNotifier on async write completion
In c084767 we a `ChainMonitor::new_async_beta` and a corresponding `MonitorUpdatingPersisterAsync` struct. To avoid circular references, the `ChainMonitor` owns the `MonitorUpdatingPersisterAsync` which uses a `FutureSpawner` to let async writes happen in the background after it returns control flow ownership to the `ChainMonitor`. This is great, except that because `MonitorUpdatingPersisterAsync` thus doesn't have a reference to the `ChainMonitor`, we have to poll for completion of the futures. We do so in the new `Persist::get_and_clear_completed_updates` that was added in the above-referenced commit. But on async monitor write completion we're supposed to call `ChainMonitor`'s `event_notifier.notify()` (ultimately waking up the background processor which `await`s the corresponding update future). This didn't happen in the new async flow. Here we move `ChainMonitor`'s `event_notifier` into an `Arc` and pass a reference to it through to the `MonitorUpdatingPersisterAsync` which can then directly `notify()` it and wake the background processor.
1 parent c1bca16 commit 092a50a

File tree

2 files changed

+23
-9
lines changed

2 files changed

+23
-9
lines changed

lightning/src/chain/chainmonitor.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
use bitcoin::block::Header;
2727
use bitcoin::hash_types::{BlockHash, Txid};
2828

29+
use bitcoin::secp256k1::PublicKey;
30+
2931
use crate::chain;
3032
use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
3133
#[cfg(peer_storage)]
@@ -57,7 +59,8 @@ use crate::util::persist::{KVStore, MonitorName, MonitorUpdatingPersisterAsync};
5759
#[cfg(peer_storage)]
5860
use crate::util::ser::{VecWriter, Writeable};
5961
use crate::util::wakers::{Future, Notifier};
60-
use bitcoin::secp256k1::PublicKey;
62+
63+
use alloc::sync::Arc;
6164
#[cfg(peer_storage)]
6265
use core::iter::Cycle;
6366
use core::ops::Deref;
@@ -267,6 +270,7 @@ pub struct AsyncPersister<
267270
FE::Target: FeeEstimator,
268271
{
269272
persister: MonitorUpdatingPersisterAsync<K, S, L, ES, SP, BI, FE>,
273+
event_notifier: Arc<Notifier>,
270274
}
271275

272276
impl<
@@ -314,15 +318,17 @@ where
314318
&self, monitor_name: MonitorName,
315319
monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
316320
) -> ChannelMonitorUpdateStatus {
317-
self.persister.spawn_async_persist_new_channel(monitor_name, monitor);
321+
let notifier = Arc::clone(&self.event_notifier);
322+
self.persister.spawn_async_persist_new_channel(monitor_name, monitor, notifier);
318323
ChannelMonitorUpdateStatus::InProgress
319324
}
320325

321326
fn update_persisted_channel(
322327
&self, monitor_name: MonitorName, monitor_update: Option<&ChannelMonitorUpdate>,
323328
monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
324329
) -> ChannelMonitorUpdateStatus {
325-
self.persister.spawn_async_update_persisted_channel(monitor_name, monitor_update, monitor);
330+
let notifier = Arc::clone(&self.event_notifier);
331+
self.persister.spawn_async_update_channel(monitor_name, monitor_update, monitor, notifier);
326332
ChannelMonitorUpdateStatus::InProgress
327333
}
328334

@@ -382,7 +388,7 @@ pub struct ChainMonitor<
382388

383389
/// A [`Notifier`] used to wake up the background processor in case we have any [`Event`]s for
384390
/// it to give to users (or [`MonitorEvent`]s for `ChannelManager` to process).
385-
event_notifier: Notifier,
391+
event_notifier: Arc<Notifier>,
386392

387393
/// Messages to send to the peer. This is currently used to distribute PeerStorage to channel partners.
388394
pending_send_only_events: Mutex<Vec<MessageSendEvent>>,
@@ -430,17 +436,18 @@ impl<
430436
persister: MonitorUpdatingPersisterAsync<K, S, L, ES, SP, T, F>, _entropy_source: ES,
431437
_our_peerstorage_encryption_key: PeerStorageKey,
432438
) -> Self {
439+
let event_notifier = Arc::new(Notifier::new());
433440
Self {
434441
monitors: RwLock::new(new_hash_map()),
435442
chain_source,
436443
broadcaster,
437444
logger,
438445
fee_estimator: feeest,
439-
persister: AsyncPersister { persister },
440446
_entropy_source,
441447
pending_monitor_events: Mutex::new(Vec::new()),
442448
highest_chain_height: AtomicUsize::new(0),
443-
event_notifier: Notifier::new(),
449+
event_notifier: Arc::clone(&event_notifier),
450+
persister: AsyncPersister { persister, event_notifier },
444451
pending_send_only_events: Mutex::new(Vec::new()),
445452
#[cfg(peer_storage)]
446453
our_peerstorage_encryption_key: _our_peerstorage_encryption_key,
@@ -656,7 +663,7 @@ where
656663
_entropy_source,
657664
pending_monitor_events: Mutex::new(Vec::new()),
658665
highest_chain_height: AtomicUsize::new(0),
659-
event_notifier: Notifier::new(),
666+
event_notifier: Arc::new(Notifier::new()),
660667
pending_send_only_events: Mutex::new(Vec::new()),
661668
#[cfg(peer_storage)]
662669
our_peerstorage_encryption_key: _our_peerstorage_encryption_key,

lightning/src/util/persist.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use crate::util::async_poll::{dummy_waker, AsyncResult, MaybeSend, MaybeSync};
3838
use crate::util::logger::Logger;
3939
use crate::util::native_async::FutureSpawner;
4040
use crate::util::ser::{Readable, ReadableArgs, Writeable};
41+
use crate::util::wakers::Notifier;
4142

4243
/// The alphabet of characters allowed for namespaces and keys.
4344
pub const KVSTORE_NAMESPACE_KEY_ALPHABET: &str =
@@ -875,6 +876,7 @@ where
875876
pub(crate) fn spawn_async_persist_new_channel(
876877
&self, monitor_name: MonitorName,
877878
monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
879+
notifier: Arc<Notifier>,
878880
) {
879881
let inner = Arc::clone(&self.0);
880882
// Note that `persist_new_channel` is a sync method which calls all the way through to the
@@ -884,7 +886,10 @@ where
884886
let completion = (monitor.channel_id(), monitor.get_latest_update_id());
885887
self.0.future_spawner.spawn(async move {
886888
match future.await {
887-
Ok(()) => inner.async_completed_updates.lock().unwrap().push(completion),
889+
Ok(()) => {
890+
inner.async_completed_updates.lock().unwrap().push(completion);
891+
notifier.notify();
892+
},
888893
Err(e) => {
889894
log_error!(
890895
inner.logger,
@@ -895,9 +900,10 @@ where
895900
});
896901
}
897902

898-
pub(crate) fn spawn_async_update_persisted_channel(
903+
pub(crate) fn spawn_async_update_channel(
899904
&self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>,
900905
monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
906+
notifier: Arc<Notifier>,
901907
) {
902908
let inner = Arc::clone(&self.0);
903909
// Note that `update_persisted_channel` is a sync method which calls all the way through to
@@ -914,6 +920,7 @@ where
914920
match future.await {
915921
Ok(()) => if let Some(completion) = completion {
916922
inner.async_completed_updates.lock().unwrap().push(completion);
923+
notifier.notify();
917924
},
918925
Err(e) => {
919926
log_error!(

0 commit comments

Comments
 (0)