Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 1 addition & 13 deletions lightning-block-sync/src/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@ use bitcoin::hash_types::BlockHash;
use bitcoin::transaction::{OutPoint, TxOut};

use lightning::ln::peer_handler::APeerManager;

use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoLookupError, UtxoResult};

use lightning::util::logger::Logger;
use lightning::util::native_async::FutureSpawner;

use std::collections::VecDeque;
use std::future::Future;
Expand Down Expand Up @@ -43,17 +42,6 @@ pub trait UtxoSource: BlockSource + 'static {
fn is_output_unspent<'a>(&'a self, outpoint: OutPoint) -> AsyncBlockSourceResult<'a, bool>;
}

/// A generic trait which is able to spawn futures in the background.
///
/// If the `tokio` feature is enabled, this is implemented on `TokioSpawner` struct which
/// delegates to `tokio::spawn()`.
pub trait FutureSpawner: Send + Sync + 'static {
/// Spawns the given future as a background task.
///
/// This method MUST NOT block on the given future immediately.
fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T);
}

#[cfg(feature = "tokio")]
/// A trivial [`FutureSpawner`] which delegates to `tokio::spawn`.
pub struct TokioSpawner;
Expand Down
164 changes: 162 additions & 2 deletions lightning/src/chain/chainmonitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@ use crate::ln::our_peer_storage::{DecryptedOurPeerStorage, PeerStorageMonitorHol
use crate::ln::types::ChannelId;
use crate::prelude::*;
use crate::sign::ecdsa::EcdsaChannelSigner;
use crate::sign::{EntropySource, PeerStorageKey};
use crate::sign::{EntropySource, PeerStorageKey, SignerProvider};
use crate::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard};
use crate::types::features::{InitFeatures, NodeFeatures};
use crate::util::async_poll::{MaybeSend, MaybeSync};
use crate::util::errors::APIError;
use crate::util::logger::{Logger, WithContext};
use crate::util::persist::MonitorName;
use crate::util::native_async::FutureSpawner;
use crate::util::persist::{KVStore, MonitorName, MonitorUpdatingPersisterAsync};
#[cfg(peer_storage)]
use crate::util::ser::{VecWriter, Writeable};
use crate::util::wakers::{Future, Notifier};
Expand Down Expand Up @@ -192,6 +194,17 @@ pub trait Persist<ChannelSigner: EcdsaChannelSigner> {
/// restart, this method must in that case be idempotent, ensuring it can handle scenarios where
/// the monitor already exists in the archive.
fn archive_persisted_channel(&self, monitor_name: MonitorName);

/// Fetches the set of [`ChannelMonitorUpdate`]s, previously persisted with
/// [`Self::update_persisted_channel`], which have completed.
///
/// Returning an update here is equivalent to calling
/// [`ChainMonitor::channel_monitor_updated`]. Because of this, this method is defaulted and
/// hidden in the docs.
#[doc(hidden)]
fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
Vec::new()
}
}

struct MonitorHolder<ChannelSigner: EcdsaChannelSigner> {
Expand Down Expand Up @@ -235,6 +248,93 @@ impl<ChannelSigner: EcdsaChannelSigner> Deref for LockedChannelMonitor<'_, Chann
}
}

/// An unconstructable [`Persist`]er which is used under the hood when you call
/// [`ChainMonitor::new_async_beta`].
pub struct AsyncPersister<
K: Deref + MaybeSend + MaybeSync + 'static,
S: FutureSpawner,
L: Deref + MaybeSend + MaybeSync + 'static,
ES: Deref + MaybeSend + MaybeSync + 'static,
SP: Deref + MaybeSend + MaybeSync + 'static,
BI: Deref + MaybeSend + MaybeSync + 'static,
FE: Deref + MaybeSend + MaybeSync + 'static,
> where
K::Target: KVStore + MaybeSync,
L::Target: Logger,
ES::Target: EntropySource + Sized,
SP::Target: SignerProvider + Sized,
BI::Target: BroadcasterInterface,
FE::Target: FeeEstimator,
{
persister: MonitorUpdatingPersisterAsync<K, S, L, ES, SP, BI, FE>,
}

impl<
K: Deref + MaybeSend + MaybeSync + 'static,
S: FutureSpawner,
L: Deref + MaybeSend + MaybeSync + 'static,
ES: Deref + MaybeSend + MaybeSync + 'static,
SP: Deref + MaybeSend + MaybeSync + 'static,
BI: Deref + MaybeSend + MaybeSync + 'static,
FE: Deref + MaybeSend + MaybeSync + 'static,
> Deref for AsyncPersister<K, S, L, ES, SP, BI, FE>
where
K::Target: KVStore + MaybeSync,
L::Target: Logger,
ES::Target: EntropySource + Sized,
SP::Target: SignerProvider + Sized,
BI::Target: BroadcasterInterface,
FE::Target: FeeEstimator,
{
type Target = Self;
fn deref(&self) -> &Self {
self
}
}

impl<
K: Deref + MaybeSend + MaybeSync + 'static,
S: FutureSpawner,
L: Deref + MaybeSend + MaybeSync + 'static,
ES: Deref + MaybeSend + MaybeSync + 'static,
SP: Deref + MaybeSend + MaybeSync + 'static,
BI: Deref + MaybeSend + MaybeSync + 'static,
FE: Deref + MaybeSend + MaybeSync + 'static,
> Persist<<SP::Target as SignerProvider>::EcdsaSigner> for AsyncPersister<K, S, L, ES, SP, BI, FE>
where
K::Target: KVStore + MaybeSync,
L::Target: Logger,
ES::Target: EntropySource + Sized,
SP::Target: SignerProvider + Sized,
BI::Target: BroadcasterInterface,
FE::Target: FeeEstimator,
<SP::Target as SignerProvider>::EcdsaSigner: MaybeSend + 'static,
{
fn persist_new_channel(
&self, monitor_name: MonitorName,
monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
) -> ChannelMonitorUpdateStatus {
self.persister.spawn_async_persist_new_channel(monitor_name, monitor);
ChannelMonitorUpdateStatus::InProgress
}

fn update_persisted_channel(
&self, monitor_name: MonitorName, monitor_update: Option<&ChannelMonitorUpdate>,
monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
) -> ChannelMonitorUpdateStatus {
self.persister.spawn_async_update_persisted_channel(monitor_name, monitor_update, monitor);
ChannelMonitorUpdateStatus::InProgress
}

fn archive_persisted_channel(&self, monitor_name: MonitorName) {
self.persister.spawn_async_archive_persisted_channel(monitor_name);
}

fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
self.persister.get_and_clear_completed_updates()
}
}

/// An implementation of [`chain::Watch`] for monitoring channels.
///
/// Connected and disconnected blocks must be provided to `ChainMonitor` as documented by
Expand Down Expand Up @@ -291,6 +391,63 @@ pub struct ChainMonitor<
our_peerstorage_encryption_key: PeerStorageKey,
}

impl<
K: Deref + MaybeSend + MaybeSync + 'static,
S: FutureSpawner,
SP: Deref + MaybeSend + MaybeSync + 'static,
C: Deref,
T: Deref + MaybeSend + MaybeSync + 'static,
F: Deref + MaybeSend + MaybeSync + 'static,
L: Deref + MaybeSend + MaybeSync + 'static,
ES: Deref + MaybeSend + MaybeSync + 'static,
>
ChainMonitor<
<SP::Target as SignerProvider>::EcdsaSigner,
C,
T,
F,
L,
AsyncPersister<K, S, L, ES, SP, T, F>,
ES,
> where
K::Target: KVStore + MaybeSync,
SP::Target: SignerProvider + Sized,
C::Target: chain::Filter,
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
ES::Target: EntropySource + Sized,
<SP::Target as SignerProvider>::EcdsaSigner: MaybeSend + 'static,
{
/// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels.
///
/// This behaves the same as [`ChainMonitor::new`] except that it relies on
/// [`MonitorUpdatingPersisterAsync`] and thus allows persistence to be completed async.
///
/// Note that async monitor updating is considered beta, and bugs may be triggered by its use.
pub fn new_async_beta(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there are now three ways to do persistence: sync, the previous async way via implementing a different Persist and this new_async_beta?

Is there any form of consolidation possible between the two async setups?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, I mention it in the last commit, but I think the eventual consolidation should be that we merge MonitorUpdatingPersister into ChainMonitor and then the Persist interface is just the interface between ChannelManager and ChainMonitor, a user will always just instantiate a ChainMonitor with either a KVStore or a KVStoreSync and we'll deal with the rest.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense to me. I just wondered if we should already now steer towards MonitorUpdatingPersister with an async kv store as the only way to do async. I don't think it is more "beta" than the current callback-based async?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thoughts on this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I'd missed this. I don't really see a strong reason to change the current API and remove the manual-async approach immediately. Its not additional code to maintain (given the new logic uses it under the hood anyway) and we do have folks using it. That said, it does probably make sense to deprecate it, which I'll go ahead and do here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh actually nevermind, we should have a discussion about if we want to support async outside of rust, which would need the old API (or a way to make async KVStore work outside of rust, which I think we can do eventually as well).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't consider the bindings. Not great to remain stuck with multiple ways to do it, but not sure what we can do either.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can map the async stuff to bindings eventually, so its not like we're stuck, just a question of priorities.

chain_source: Option<C>, broadcaster: T, logger: L, feeest: F,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Why not simply fee_estimator?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cause I copied the one from new 🤷‍♂️

persister: MonitorUpdatingPersisterAsync<K, S, L, ES, SP, T, F>, _entropy_source: ES,
_our_peerstorage_encryption_key: PeerStorageKey,
) -> Self {
Self {
monitors: RwLock::new(new_hash_map()),
chain_source,
broadcaster,
logger,
fee_estimator: feeest,
persister: AsyncPersister { persister },
_entropy_source,
pending_monitor_events: Mutex::new(Vec::new()),
highest_chain_height: AtomicUsize::new(0),
event_notifier: Notifier::new(),
pending_send_only_events: Mutex::new(Vec::new()),
#[cfg(peer_storage)]
our_peerstorage_encryption_key: _our_peerstorage_encryption_key,
}
}
}

impl<
ChannelSigner: EcdsaChannelSigner,
C: Deref,
Expand Down Expand Up @@ -1357,6 +1514,9 @@ where
fn release_pending_monitor_events(
&self,
) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)> {
for (channel_id, update_id) in self.persister.get_and_clear_completed_updates() {
let _ = self.channel_monitor_updated(channel_id, update_id);
}
let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
for monitor_state in self.monitors.read().unwrap().values() {
let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();
Expand Down
Loading
Loading