Changes from 1 commit
1 change: 1 addition & 0 deletions lightning-background-processor/Cargo.toml
@@ -25,6 +25,7 @@ bitcoin-io = { version = "0.1.2", default-features = false }
lightning = { version = "0.2.0", path = "../lightning", default-features = false }
lightning-rapid-gossip-sync = { version = "0.2.0", path = "../lightning-rapid-gossip-sync", default-features = false }
lightning-liquidity = { version = "0.2.0", path = "../lightning-liquidity", default-features = false }
futures = "0.3.31"

[dev-dependencies]
tokio = { version = "1.35", features = [ "macros", "rt", "rt-multi-thread", "sync", "time" ] }
102 changes: 30 additions & 72 deletions lightning-background-processor/src/lib.rs
@@ -43,6 +43,7 @@ use lightning::util::persist::{KVStore, Persister};
use lightning::util::sweep::OutputSweeper;
#[cfg(feature = "std")]
use lightning::util::sweep::OutputSweeperSync;
use lightning::util::wakers::Sleep;
#[cfg(feature = "std")]
use lightning::util::wakers::Sleeper;
use lightning_rapid_gossip_sync::RapidGossipSync;
@@ -981,7 +982,7 @@ impl BackgroundProcessor {
D: 'static + Deref,
O: 'static + Deref,
K: 'static + Deref,
OS: 'static + Deref<Target = OutputSweeperSync<T, D, F, CF, K, L, O>> + Send,
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>> + Send,
>(
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
@@ -999,79 +1000,33 @@
OM::Target: AOnionMessenger,
PM::Target: APeerManager,
LM::Target: ALiquidityManager,
D::Target: ChangeDestinationSourceSync,
D::Target: ChangeDestinationSource,
O::Target: 'static + OutputSpender,
K::Target: 'static + KVStore,
{
let stop_thread = Arc::new(AtomicBool::new(false));
let stop_thread_clone = stop_thread.clone();
let handle = thread::spawn(move || -> Result<(), std::io::Error> {
let event_handler = |event| {
let network_graph = gossip_sync.network_graph();
if let Some(network_graph) = network_graph {
handle_network_graph_update(network_graph, &event)
}
if let Some(ref scorer) = scorer {
use std::time::SystemTime;
let duration_since_epoch = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Time should be sometime after 1970");
if update_scorer(scorer, &event, duration_since_epoch) {
log_trace!(logger, "Persisting scorer after update");
if let Err(e) = persister.persist_scorer(&scorer) {
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
}
}
}
event_handler.handle_event(event)
};
define_run_body!(
let fut = process_events_async(
persister,
|event| async { event_handler.handle_event(event) },
chain_monitor,
chain_monitor.process_pending_events(&event_handler),
channel_manager,
channel_manager.get_cm().process_pending_events(&event_handler),
onion_messenger,
if let Some(om) = &onion_messenger {
om.get_om().process_pending_events(&event_handler)
},
peer_manager,
gossip_sync,
{
if let Some(ref sweeper) = sweeper {
let _ = sweeper.regenerate_and_broadcast_spend_if_necessary();
}
},
peer_manager,
liquidity_manager,
sweeper,
logger,
scorer,
stop_thread.load(Ordering::Acquire),
{
let sleeper = match (onion_messenger.as_ref(), liquidity_manager.as_ref()) {
(Some(om), Some(lm)) => Sleeper::from_four_futures(
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
&chain_monitor.get_update_future(),
&om.get_om().get_update_future(),
&lm.get_lm().get_pending_msgs_future(),
),
(Some(om), None) => Sleeper::from_three_futures(
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
&chain_monitor.get_update_future(),
&om.get_om().get_update_future(),
),
(None, Some(lm)) => Sleeper::from_three_futures(
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
&chain_monitor.get_update_future(),
&lm.get_lm().get_pending_msgs_future(),
),
(None, None) => Sleeper::from_two_futures(
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
&chain_monitor.get_update_future(),
),
};
sleeper.wait_timeout(Duration::from_millis(100));
move |dur: Duration| {
let stop_thread_clone = stop_thread.clone();

Box::pin(async move {
Sleep::new(dur).await;
stop_thread_clone.load(Ordering::Acquire)
})
},
|_| Instant::now(),
|time: &Instant, dur| time.elapsed().as_secs() > dur,
false,
|| {
use std::time::SystemTime;
@@ -1081,7 +1036,10 @@ impl BackgroundProcessor {
.expect("Time should be sometime after 1970"),
)
},
)
);

// TODO: Implement simple executor in utils.
futures::executor::block_on(fut).map_err(Into::into)
});
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
}
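On the `// TODO: Implement simple executor in utils.` above: a minimal sketch of what an in-crate executor could look like, assuming a plain thread-parking design. The name `block_on_simple` is illustrative and not part of this commit; something along these lines would let the temporary `futures` dependency go away.

```rust
use core::future::Future;
use core::pin::pin;
use core::task::{Context, Poll, Waker};
use std::sync::Arc;
use std::thread::{self, Thread};

/// Wakes the blocked thread by unparking it.
struct ThreadWaker(Thread);

impl std::task::Wake for ThreadWaker {
	fn wake(self: Arc<Self>) {
		self.0.unpark();
	}
}

/// Polls `fut` on the current thread, parking between polls until it resolves.
fn block_on_simple<F: Future>(fut: F) -> F::Output {
	let mut fut = pin!(fut);
	let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
	let mut cx = Context::from_waker(&waker);
	loop {
		match fut.as_mut().poll(&mut cx) {
			Poll::Ready(output) => return output,
			// Park until `wake` unparks this thread; a spurious unpark just re-polls.
			Poll::Pending => thread::park(),
		}
	}
}
```

`futures::executor::block_on` uses the same park/unpark approach internally, so swapping it for something like this should not change the behavior of `start()`.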
@@ -1925,7 +1883,7 @@ mod tests {
nodes[0].p2p_gossip_sync(),
nodes[0].peer_manager.clone(),
Some(Arc::clone(&nodes[0].liquidity_manager)),
Some(nodes[0].sweeper.clone()),
Some(nodes[0].sweeper.sweeper_async().clone()),
nodes[0].logger.clone(),
Some(nodes[0].scorer.clone()),
);
@@ -2020,7 +1978,7 @@ mod tests {
nodes[0].no_gossip_sync(),
nodes[0].peer_manager.clone(),
Some(Arc::clone(&nodes[0].liquidity_manager)),
Some(nodes[0].sweeper.clone()),
Some(nodes[0].sweeper.sweeper_async().clone()),
nodes[0].logger.clone(),
Some(nodes[0].scorer.clone()),
);
@@ -2064,7 +2022,7 @@ mod tests {
nodes[0].no_gossip_sync(),
nodes[0].peer_manager.clone(),
Some(Arc::clone(&nodes[0].liquidity_manager)),
Some(nodes[0].sweeper.clone()),
Some(nodes[0].sweeper.sweeper_async().clone()),
nodes[0].logger.clone(),
Some(nodes[0].scorer.clone()),
);
@@ -2135,7 +2093,7 @@ mod tests {
nodes[0].p2p_gossip_sync(),
nodes[0].peer_manager.clone(),
Some(Arc::clone(&nodes[0].liquidity_manager)),
Some(nodes[0].sweeper.clone()),
Some(nodes[0].sweeper.sweeper_async().clone()),
nodes[0].logger.clone(),
Some(nodes[0].scorer.clone()),
);
@@ -2166,7 +2124,7 @@ mod tests {
nodes[0].no_gossip_sync(),
nodes[0].peer_manager.clone(),
Some(Arc::clone(&nodes[0].liquidity_manager)),
Some(nodes[0].sweeper.clone()),
Some(nodes[0].sweeper.sweeper_async().clone()),
nodes[0].logger.clone(),
Some(nodes[0].scorer.clone()),
);
@@ -2214,7 +2172,7 @@ mod tests {
nodes[0].no_gossip_sync(),
nodes[0].peer_manager.clone(),
Some(Arc::clone(&nodes[0].liquidity_manager)),
Some(nodes[0].sweeper.clone()),
Some(nodes[0].sweeper.sweeper_async().clone()),
nodes[0].logger.clone(),
Some(nodes[0].scorer.clone()),
);
@@ -2278,7 +2236,7 @@ mod tests {
nodes[0].no_gossip_sync(),
nodes[0].peer_manager.clone(),
Some(Arc::clone(&nodes[0].liquidity_manager)),
Some(nodes[0].sweeper.clone()),
Some(nodes[0].sweeper.sweeper_async().clone()),
nodes[0].logger.clone(),
Some(nodes[0].scorer.clone()),
);
@@ -2443,7 +2401,7 @@ mod tests {
nodes[0].no_gossip_sync(),
nodes[0].peer_manager.clone(),
Some(Arc::clone(&nodes[0].liquidity_manager)),
Some(nodes[0].sweeper.clone()),
Some(nodes[0].sweeper.sweeper_async().clone()),
nodes[0].logger.clone(),
Some(nodes[0].scorer.clone()),
);
@@ -2474,7 +2432,7 @@ mod tests {
nodes[0].no_gossip_sync(),
nodes[0].peer_manager.clone(),
Some(Arc::clone(&nodes[0].liquidity_manager)),
Some(nodes[0].sweeper.clone()),
Some(nodes[0].sweeper.sweeper_async().clone()),
nodes[0].logger.clone(),
Some(nodes[0].scorer.clone()),
);
@@ -2571,7 +2529,7 @@ mod tests {
nodes[0].rapid_gossip_sync(),
nodes[0].peer_manager.clone(),
Some(Arc::clone(&nodes[0].liquidity_manager)),
Some(nodes[0].sweeper.clone()),
Some(nodes[0].sweeper.sweeper_async().clone()),
nodes[0].logger.clone(),
Some(nodes[0].scorer.clone()),
);
@@ -2768,7 +2726,7 @@ mod tests {
nodes[0].no_gossip_sync(),
nodes[0].peer_manager.clone(),
Some(Arc::clone(&nodes[0].liquidity_manager)),
Some(nodes[0].sweeper.clone()),
Some(nodes[0].sweeper.sweeper_async().clone()),
nodes[0].logger.clone(),
Some(nodes[0].scorer.clone()),
);
45 changes: 45 additions & 0 deletions lightning/src/util/wakers.rs
@@ -28,6 +28,10 @@ use std::time::Duration;
use core::future::Future as StdFuture;
use core::pin::Pin;
use core::task::{Context, Poll};
use std::{
sync::atomic::{AtomicBool, Ordering},
thread,
};

/// Used to signal to one of many waiters that the condition they're waiting on has happened.
///
@@ -340,6 +344,47 @@ impl Sleeper {
}
}

/// A future that completes once a given duration has elapsed.
pub struct Sleep {
Review comment (author): There is already a `Sleeper` in the repository, but `from_single_future` takes a future and isn't just a sleep. (See the usage sketch after the `Future` impl below.)

is_done: Arc<AtomicBool>,
waker: Arc<Mutex<Option<Waker>>>,
}

impl Sleep {
/// Creates a `Sleep` that completes after `duration`.
pub fn new(duration: Duration) -> Self {
let is_done = Arc::new(AtomicBool::new(false));
let waker: Arc<Mutex<Option<Waker>>> = Arc::new(Mutex::new(None));

let is_done_clone = is_done.clone();
let waker_clone = waker.clone();

// Spawn a detached timer thread: after sleeping it marks the future done
// and wakes whichever waker was most recently stored by `poll`.
thread::spawn(move || {
thread::sleep(duration);
Review comment (author): ChatGPT implementation - probably a lot wrong about it.

is_done_clone.store(true, Ordering::SeqCst);

if let Some(w) = waker_clone.lock().unwrap().take() {
w.wake();
}
});

Self { is_done, waker }
}
}

impl core::future::Future for Sleep {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if self.is_done.load(Ordering::SeqCst) {
return Poll::Ready(());
}
let mut waker_lock = self.waker.lock().unwrap();
// Store the latest waker in case the task is moved or re-polled.
*waker_lock = Some(cx.waker().clone());
// Re-check while holding the lock: the timer thread may have set `is_done`
// between the check above and the store, finding no waker to wake. Without
// this re-check that wakeup would be lost and the future would never complete.
if self.is_done.load(Ordering::SeqCst) {
Poll::Ready(())
} else {
Poll::Pending
}
}
}
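Unlike `Sleeper`, which blocks the calling thread via `wait_timeout`, `Sleep` is an ordinary `core::future::Future` and can be awaited inside the async processing loop. A minimal usage sketch, assuming `Sleep` is in scope and using the `futures` executor this commit adds:

```rust
use std::time::{Duration, Instant};

fn main() {
	let start = Instant::now();
	// Drive the timer future to completion, as BackgroundProcessor::start now does.
	futures::executor::block_on(Sleep::new(Duration::from_millis(50)));
	assert!(start.elapsed() >= Duration::from_millis(50));
}
```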

#[cfg(test)]
mod tests {
use super::*;