Skip to content

Commit 6df2096

Browse files
committed
Parallelize persistence in the async bg processor
1 parent b356433 commit 6df2096

File tree

3 files changed

+98
-53
lines changed

3 files changed

+98
-53
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 90 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,13 @@ use lightning_rapid_gossip_sync::RapidGossipSync;
7070

7171
use lightning_liquidity::ALiquidityManager;
7272

73+
use core::future::Future;
7374
use core::ops::Deref;
75+
use core::pin::Pin;
7476
use core::time::Duration;
7577

78+
use lightning::util::async_poll::{MultiResultFuturePoller, ResultFuture};
79+
7680
#[cfg(feature = "std")]
7781
use core::sync::atomic::{AtomicBool, Ordering};
7882
#[cfg(feature = "std")]
@@ -627,11 +631,11 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
627631
pub async fn process_events_async<
628632
'a,
629633
UL: 'static + Deref,
630-
CF: 'static + Deref,
631-
T: 'static + Deref,
632-
F: 'static + Deref,
634+
CF: 'static + Deref + Sync,
635+
T: 'static + Deref + Sync,
636+
F: 'static + Deref + Sync,
633637
G: 'static + Deref<Target = NetworkGraph<L>>,
634-
L: 'static + Deref,
638+
L: 'static + Deref + Sync,
635639
P: 'static + Deref,
636640
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
637641
EventHandler: Fn(Event) -> EventHandlerFuture,
@@ -646,10 +650,10 @@ pub async fn process_events_async<
646650
RGS: 'static + Deref<Target = RapidGossipSync<G, L>>,
647651
PM: 'static + Deref,
648652
LM: 'static + Deref,
649-
D: 'static + Deref,
650-
O: 'static + Deref,
651-
K: 'static + Deref,
652-
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>>,
653+
D: 'static + Deref + Sync,
654+
O: 'static + Deref + Sync,
655+
K: 'static + Deref + Sync,
656+
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>> + Clone + Send,
653657
S: 'static + Deref<Target = SC> + Send + Sync,
654658
SC: for<'b> WriteableScore<'b>,
655659
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
@@ -826,17 +830,27 @@ where
826830
None => {},
827831
}
828832

833+
let mut futures = Vec::new();
834+
829835
// Persist channel manager.
830836
if channel_manager.get_cm().get_and_clear_needs_persistence() {
831837
log_trace!(logger, "Persisting ChannelManager...");
832-
kv_store
833-
.write(
834-
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
835-
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
836-
CHANNEL_MANAGER_PERSISTENCE_KEY,
837-
&channel_manager.get_cm().encode(),
838-
)
839-
.await?;
838+
let res = kv_store.write(
839+
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
840+
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
841+
CHANNEL_MANAGER_PERSISTENCE_KEY,
842+
&channel_manager.get_cm().encode(),
843+
);
844+
845+
let fut: Pin<Box<dyn Future<Output = Result<(), String>> + Send + 'static>> =
846+
Box::pin(async move {
847+
res.await.map_err(|e| {
848+
format!("Error: Failed to persist channel manager, check your disk and permissions {}", e)
849+
})
850+
});
851+
852+
futures.push(ResultFuture::Pending(fut));
853+
840854
log_trace!(logger, "Done persisting ChannelManager.");
841855
}
842856

@@ -864,17 +878,21 @@ where
864878
log_warn!(logger, "Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually.");
865879
log_trace!(logger, "Persisting network graph.");
866880
}
867-
if let Err(e) = kv_store
868-
.write(
869-
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
870-
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
871-
NETWORK_GRAPH_PERSISTENCE_KEY,
872-
&network_graph.encode(),
873-
)
874-
.await
875-
{
876-
log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e);
877-
}
881+
let res = kv_store.write(
882+
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
883+
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
884+
NETWORK_GRAPH_PERSISTENCE_KEY,
885+
&network_graph.encode(),
886+
);
887+
let fut: Pin<Box<dyn Future<Output = Result<(), String>> + Send + 'static>> =
888+
Box::pin(async move {
889+
res.await.map_err(|e| {
890+
format!("Error: Failed to persist network graph, check your disk and permissions {}", e)
891+
})
892+
});
893+
894+
futures.push(ResultFuture::Pending(fut));
895+
878896
have_pruned = true;
879897
}
880898
let prune_timer =
@@ -901,21 +919,20 @@ where
901919
} else {
902920
log_trace!(logger, "Persisting scorer");
903921
}
904-
if let Err(e) = kv_store
905-
.write(
906-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
907-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
908-
SCORER_PERSISTENCE_KEY,
909-
&scorer.encode(),
910-
)
911-
.await
912-
{
913-
log_error!(
914-
logger,
915-
"Error: Failed to persist scorer, check your disk and permissions {}",
916-
e
917-
);
918-
}
922+
let res = kv_store.write(
923+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
924+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
925+
SCORER_PERSISTENCE_KEY,
926+
&scorer.encode(),
927+
);
928+
let fut: Pin<Box<dyn Future<Output = Result<(), String>> + Send + 'static>> =
929+
Box::pin(async move {
930+
res.await.map_err(|e| {
931+
format!("Error: Failed to persist scorer, check your disk and permissions {}", e)
932+
})
933+
});
934+
935+
futures.push(ResultFuture::Pending(fut));
919936
}
920937
last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER);
921938
},
@@ -928,14 +945,34 @@ where
928945
Some(false) => {
929946
log_trace!(logger, "Regenerating sweeper spends if necessary");
930947
if let Some(ref sweeper) = sweeper {
931-
let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await;
948+
let sweeper = sweeper.clone();
949+
let fut: Pin<Box<dyn Future<Output = Result<(), String>> + Send + 'static>> =
950+
Box::pin(async move {
951+
sweeper.regenerate_and_broadcast_spend_if_necessary().await.map_err(
952+
|_| {
953+
format!(
954+
"Error: sweeper failed to regenerate and broadcast spends"
955+
)
956+
},
957+
)
958+
});
959+
960+
futures.push(ResultFuture::Pending(fut));
932961
}
933962
last_sweeper_call = sleeper(SWEEPER_TIMER);
934963
},
935964
Some(true) => break,
936965
None => {},
937966
}
938967

968+
// Run persistence tasks in parallel.
969+
let multi_res = MultiResultFuturePoller::new(futures).await;
970+
for res in multi_res {
971+
if let Err(e) = res {
972+
log_error!(logger, "Error: {}", e);
973+
}
974+
}
975+
939976
// Onion messenger timer tick.
940977
match check_sleeper(&mut last_onion_message_handler_call) {
941978
Some(false) => {
@@ -1025,9 +1062,9 @@ fn check_sleeper<SleepFuture: core::future::Future<Output = bool> + core::marker
10251062
/// synchronous background persistence.
10261063
pub async fn process_events_async_with_kv_store_sync<
10271064
UL: 'static + Deref,
1028-
CF: 'static + Deref,
1029-
T: 'static + Deref,
1030-
F: 'static + Deref,
1065+
CF: 'static + Deref + Sync,
1066+
T: 'static + Deref + Sync,
1067+
F: 'static + Deref + Sync,
10311068
G: 'static + Deref<Target = NetworkGraph<L>>,
10321069
L: 'static + Deref + Send + Sync,
10331070
P: 'static + Deref,
@@ -1044,10 +1081,13 @@ pub async fn process_events_async_with_kv_store_sync<
10441081
RGS: 'static + Deref<Target = RapidGossipSync<G, L>>,
10451082
PM: 'static + Deref,
10461083
LM: 'static + Deref,
1047-
D: 'static + Deref,
1048-
O: 'static + Deref,
1049-
K: 'static + Deref,
1050-
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, KVStoreSyncWrapper<K>, L, O>>,
1084+
D: 'static + Deref + Sync,
1085+
O: 'static + Deref + Sync,
1086+
K: 'static + Deref + Sync,
1087+
OS: 'static
1088+
+ Deref<Target = OutputSweeper<T, D, F, CF, KVStoreSyncWrapper<K>, L, O>>
1089+
+ Clone
1090+
+ Send,
10511091
S: 'static + Deref<Target = SC> + Send + Sync,
10521092
SC: for<'b> WriteableScore<'b>,
10531093
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,

lightning/src/util/async_poll.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,21 @@ use core::marker::Unpin;
1515
use core::pin::Pin;
1616
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
1717

18-
pub(crate) enum ResultFuture<F: Future<Output = Result<(), E>>, E: Unpin> {
18+
/// A future that can be in a pending or ready state, where the ready state contains a `Result`.
19+
pub enum ResultFuture<F: Future<Output = Result<(), E>>, E: Unpin> {
20+
/// The future is still pending and needs to be polled again.
1921
Pending(F),
22+
/// The future has completed and contains a result.
2023
Ready(Result<(), E>),
2124
}
2225

23-
pub(crate) struct MultiResultFuturePoller<F: Future<Output = Result<(), E>> + Unpin, E: Unpin> {
26+
/// A utility to poll multiple futures that return results, collecting their results into a vector.
27+
pub struct MultiResultFuturePoller<F: Future<Output = Result<(), E>> + Unpin, E: Unpin> {
2428
futures_state: Vec<ResultFuture<F, E>>,
2529
}
2630

2731
impl<F: Future<Output = Result<(), E>> + Unpin, E: Unpin> MultiResultFuturePoller<F, E> {
32+
/// Creates a new `MultiResultFuturePoller` with the given futures.
2833
pub fn new(futures_state: Vec<ResultFuture<F, E>>) -> Self {
2934
Self { futures_state }
3035
}

lightning/src/util/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ pub mod ser;
3232
pub mod sweep;
3333
pub mod wakers;
3434

35-
pub(crate) mod async_poll;
35+
pub mod async_poll;
3636
pub(crate) mod atomic_counter;
3737
pub(crate) mod byte_utils;
3838
pub mod hash_tables;

0 commit comments

Comments
 (0)