Skip to content

Commit b6f3705

Browse files
authored
Merge pull request #4119 from TheBlueMatt/2025-09-fewer-arcs
Drop a pile of Arcs and `OutputSweeper::new_with_kv_store_sync`
2 parents d076584 + 5e679b4 commit b6f3705

File tree

7 files changed

+202
-221
lines changed

7 files changed

+202
-221
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 113 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,9 @@ use lightning::onion_message::messenger::AOnionMessenger;
4848
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
4949
use lightning::routing::scoring::{ScoreUpdate, WriteableScore};
5050
use lightning::routing::utxo::UtxoLookup;
51-
use lightning::sign::ChangeDestinationSource;
52-
#[cfg(feature = "std")]
53-
use lightning::sign::ChangeDestinationSourceSync;
54-
use lightning::sign::EntropySource;
55-
use lightning::sign::OutputSpender;
51+
use lightning::sign::{
52+
ChangeDestinationSource, ChangeDestinationSourceSync, EntropySource, OutputSpender,
53+
};
5654
use lightning::util::logger::Logger;
5755
use lightning::util::persist::{
5856
KVStore, KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY,
@@ -61,9 +59,7 @@ use lightning::util::persist::{
6159
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY,
6260
SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
6361
};
64-
use lightning::util::sweep::OutputSweeper;
65-
#[cfg(feature = "std")]
66-
use lightning::util::sweep::OutputSweeperSync;
62+
use lightning::util::sweep::{OutputSweeper, OutputSweeperSync};
6763
#[cfg(feature = "std")]
6864
use lightning::util::wakers::Sleeper;
6965
use lightning_rapid_gossip_sync::RapidGossipSync;
@@ -304,7 +300,7 @@ where
304300

305301
/// Updates scorer based on event and returns whether an update occurred so we can decide whether
306302
/// to persist.
307-
fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
303+
fn update_scorer<'a, S: Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
308304
scorer: &'a S, event: &Event, duration_since_epoch: Duration,
309305
) -> bool {
310306
match event {
@@ -866,31 +862,30 @@ use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutp
866862
///```
867863
pub async fn process_events_async<
868864
'a,
869-
UL: 'static + Deref,
870-
CF: 'static + Deref,
871-
T: 'static + Deref,
872-
F: 'static + Deref,
873-
G: 'static + Deref<Target = NetworkGraph<L>>,
874-
L: 'static + Deref,
875-
P: 'static + Deref,
865+
UL: Deref,
866+
CF: Deref,
867+
T: Deref,
868+
F: Deref,
869+
G: Deref<Target = NetworkGraph<L>>,
870+
L: Deref,
871+
P: Deref,
876872
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
877873
EventHandler: Fn(Event) -> EventHandlerFuture,
878-
ES: 'static + Deref + Send,
879-
M: 'static
880-
+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>
874+
ES: Deref + Send,
875+
M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>
881876
+ Send
882877
+ Sync,
883-
CM: 'static + Deref,
884-
OM: 'static + Deref,
885-
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>>,
886-
RGS: 'static + Deref<Target = RapidGossipSync<G, L>>,
887-
PM: 'static + Deref,
888-
LM: 'static + Deref,
889-
D: 'static + Deref,
890-
O: 'static + Deref,
891-
K: 'static + Deref,
892-
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>>,
893-
S: 'static + Deref<Target = SC> + Send + Sync,
878+
CM: Deref,
879+
OM: Deref,
880+
PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
881+
RGS: Deref<Target = RapidGossipSync<G, L>>,
882+
PM: Deref,
883+
LM: Deref,
884+
D: Deref,
885+
O: Deref,
886+
K: Deref,
887+
OS: Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>>,
888+
S: Deref<Target = SC> + Send + Sync,
894889
SC: for<'b> WriteableScore<'b>,
895890
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
896891
Sleeper: Fn(Duration) -> SleepFuture,
@@ -902,20 +897,20 @@ pub async fn process_events_async<
902897
sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
903898
) -> Result<(), lightning::io::Error>
904899
where
905-
UL::Target: 'static + UtxoLookup,
906-
CF::Target: 'static + chain::Filter,
907-
T::Target: 'static + BroadcasterInterface,
908-
F::Target: 'static + FeeEstimator,
909-
L::Target: 'static + Logger,
910-
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
911-
ES::Target: 'static + EntropySource,
900+
UL::Target: UtxoLookup,
901+
CF::Target: chain::Filter,
902+
T::Target: BroadcasterInterface,
903+
F::Target: FeeEstimator,
904+
L::Target: Logger,
905+
P::Target: Persist<<CM::Target as AChannelManager>::Signer>,
906+
ES::Target: EntropySource,
912907
CM::Target: AChannelManager,
913908
OM::Target: AOnionMessenger,
914909
PM::Target: APeerManager,
915910
LM::Target: ALiquidityManager,
916-
O::Target: 'static + OutputSpender,
917-
D::Target: 'static + ChangeDestinationSource,
918-
K::Target: 'static + KVStore,
911+
O::Target: OutputSpender,
912+
D::Target: ChangeDestinationSource,
913+
K::Target: KVStore,
919914
{
920915
let async_event_handler = |event| {
921916
let network_graph = gossip_sync.network_graph();
@@ -1340,31 +1335,30 @@ fn check_and_reset_sleeper<
13401335
/// Async events processor that is based on [`process_events_async`] but allows for [`KVStoreSync`] to be used for
13411336
/// synchronous background persistence.
13421337
pub async fn process_events_async_with_kv_store_sync<
1343-
UL: 'static + Deref,
1344-
CF: 'static + Deref,
1345-
T: 'static + Deref,
1346-
F: 'static + Deref,
1347-
G: 'static + Deref<Target = NetworkGraph<L>>,
1348-
L: 'static + Deref + Send + Sync,
1349-
P: 'static + Deref,
1338+
UL: Deref,
1339+
CF: Deref,
1340+
T: Deref,
1341+
F: Deref,
1342+
G: Deref<Target = NetworkGraph<L>>,
1343+
L: Deref + Send + Sync,
1344+
P: Deref,
13501345
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
13511346
EventHandler: Fn(Event) -> EventHandlerFuture,
1352-
ES: 'static + Deref + Send,
1353-
M: 'static
1354-
+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>
1347+
ES: Deref + Send,
1348+
M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>
13551349
+ Send
13561350
+ Sync,
1357-
CM: 'static + Deref + Send + Sync,
1358-
OM: 'static + Deref,
1359-
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>>,
1360-
RGS: 'static + Deref<Target = RapidGossipSync<G, L>>,
1361-
PM: 'static + Deref,
1362-
LM: 'static + Deref,
1363-
D: 'static + Deref,
1364-
O: 'static + Deref,
1365-
K: 'static + Deref,
1366-
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, KVStoreSyncWrapper<K>, L, O>>,
1367-
S: 'static + Deref<Target = SC> + Send + Sync,
1351+
CM: Deref + Send + Sync,
1352+
OM: Deref,
1353+
PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
1354+
RGS: Deref<Target = RapidGossipSync<G, L>>,
1355+
PM: Deref,
1356+
LM: Deref,
1357+
D: Deref,
1358+
O: Deref,
1359+
K: Deref,
1360+
OS: Deref<Target = OutputSweeperSync<T, D, F, CF, K, L, O>>,
1361+
S: Deref<Target = SC> + Send + Sync,
13681362
SC: for<'b> WriteableScore<'b>,
13691363
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
13701364
Sleeper: Fn(Duration) -> SleepFuture,
@@ -1376,20 +1370,20 @@ pub async fn process_events_async_with_kv_store_sync<
13761370
sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
13771371
) -> Result<(), lightning::io::Error>
13781372
where
1379-
UL::Target: 'static + UtxoLookup,
1380-
CF::Target: 'static + chain::Filter,
1381-
T::Target: 'static + BroadcasterInterface,
1382-
F::Target: 'static + FeeEstimator,
1383-
L::Target: 'static + Logger,
1384-
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
1385-
ES::Target: 'static + EntropySource,
1373+
UL::Target: UtxoLookup,
1374+
CF::Target: chain::Filter,
1375+
T::Target: BroadcasterInterface,
1376+
F::Target: FeeEstimator,
1377+
L::Target: Logger,
1378+
P::Target: Persist<<CM::Target as AChannelManager>::Signer>,
1379+
ES::Target: EntropySource,
13861380
CM::Target: AChannelManager,
13871381
OM::Target: AOnionMessenger,
13881382
PM::Target: APeerManager,
13891383
LM::Target: ALiquidityManager,
1390-
O::Target: 'static + OutputSpender,
1391-
D::Target: 'static + ChangeDestinationSource,
1392-
K::Target: 'static + KVStoreSync,
1384+
O::Target: OutputSpender,
1385+
D::Target: ChangeDestinationSourceSync,
1386+
K::Target: KVStoreSync,
13931387
{
13941388
let kv_store = KVStoreSyncWrapper(kv_store);
13951389
process_events_async(
@@ -1401,7 +1395,7 @@ where
14011395
gossip_sync,
14021396
peer_manager,
14031397
liquidity_manager,
1404-
sweeper,
1398+
sweeper.as_ref().map(|os| os.sweeper_async()),
14051399
logger,
14061400
scorer,
14071401
sleeper,
@@ -1846,11 +1840,13 @@ mod tests {
18461840
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
18471841
};
18481842
use lightning::util::ser::Writeable;
1849-
use lightning::util::sweep::{OutputSpendStatus, OutputSweeperSync, PRUNE_DELAY_BLOCKS};
1843+
use lightning::util::sweep::{
1844+
OutputSpendStatus, OutputSweeper, OutputSweeperSync, PRUNE_DELAY_BLOCKS,
1845+
};
18501846
use lightning::util::test_utils;
18511847
use lightning::{get_event, get_event_msg};
18521848
use lightning_liquidity::utils::time::DefaultTimeProvider;
1853-
use lightning_liquidity::{ALiquidityManagerSync, LiquidityManagerSync};
1849+
use lightning_liquidity::{ALiquidityManagerSync, LiquidityManager, LiquidityManagerSync};
18541850
use lightning_persister::fs_store::FilesystemStore;
18551851
use lightning_rapid_gossip_sync::RapidGossipSync;
18561852
use std::collections::VecDeque;
@@ -1953,7 +1949,7 @@ mod tests {
19531949
Arc<ChannelManager>,
19541950
Arc<dyn Filter + Sync + Send>,
19551951
Arc<Persister>,
1956-
Arc<DefaultTimeProvider>,
1952+
DefaultTimeProvider,
19571953
>;
19581954

19591955
struct Node {
@@ -2779,7 +2775,18 @@ mod tests {
27792775
let kv_store_sync = Arc::new(
27802776
Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
27812777
);
2782-
let kv_store = Arc::new(KVStoreSyncWrapper(kv_store_sync));
2778+
let kv_store = KVStoreSyncWrapper(kv_store_sync);
2779+
2780+
// Yes, you can unsafe { turn off the borrow checker }
2781+
let lm_async: &'static LiquidityManager<_, _, _, _, _, _> = unsafe {
2782+
&*(nodes[0].liquidity_manager.get_lm_async()
2783+
as *const LiquidityManager<_, _, _, _, _, _>)
2784+
as &'static LiquidityManager<_, _, _, _, _, _>
2785+
};
2786+
let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
2787+
&*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
2788+
as &'static OutputSweeper<_, _, _, _, _, _, _>
2789+
};
27832790

27842791
let bp_future = super::process_events_async(
27852792
kv_store,
@@ -2789,8 +2796,8 @@ mod tests {
27892796
Some(Arc::clone(&nodes[0].messenger)),
27902797
nodes[0].rapid_gossip_sync(),
27912798
Arc::clone(&nodes[0].peer_manager),
2792-
Some(nodes[0].liquidity_manager.get_lm_async()),
2793-
Some(nodes[0].sweeper.sweeper_async()),
2799+
Some(lm_async),
2800+
Some(sweeper_async),
27942801
Arc::clone(&nodes[0].logger),
27952802
Some(Arc::clone(&nodes[0].scorer)),
27962803
move |dur: Duration| {
@@ -3287,7 +3294,18 @@ mod tests {
32873294
let data_dir = nodes[0].kv_store.get_data_dir();
32883295
let kv_store_sync =
32893296
Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
3290-
let kv_store = Arc::new(KVStoreSyncWrapper(kv_store_sync));
3297+
let kv_store = KVStoreSyncWrapper(kv_store_sync);
3298+
3299+
// Yes, you can unsafe { turn off the borrow checker }
3300+
let lm_async: &'static LiquidityManager<_, _, _, _, _, _> = unsafe {
3301+
&*(nodes[0].liquidity_manager.get_lm_async()
3302+
as *const LiquidityManager<_, _, _, _, _, _>)
3303+
as &'static LiquidityManager<_, _, _, _, _, _>
3304+
};
3305+
let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
3306+
&*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
3307+
as &'static OutputSweeper<_, _, _, _, _, _, _>
3308+
};
32913309

32923310
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
32933311
let bp_future = super::process_events_async(
@@ -3298,8 +3316,8 @@ mod tests {
32983316
Some(Arc::clone(&nodes[0].messenger)),
32993317
nodes[0].rapid_gossip_sync(),
33003318
Arc::clone(&nodes[0].peer_manager),
3301-
Some(nodes[0].liquidity_manager.get_lm_async()),
3302-
Some(nodes[0].sweeper.sweeper_async()),
3319+
Some(lm_async),
3320+
Some(sweeper_async),
33033321
Arc::clone(&nodes[0].logger),
33043322
Some(Arc::clone(&nodes[0].scorer)),
33053323
move |dur: Duration| {
@@ -3501,10 +3519,21 @@ mod tests {
35013519
let (_, nodes) = create_nodes(1, "test_payment_path_scoring_async");
35023520
let data_dir = nodes[0].kv_store.get_data_dir();
35033521
let kv_store_sync = Arc::new(Persister::new(data_dir));
3504-
let kv_store = Arc::new(KVStoreSyncWrapper(kv_store_sync));
3522+
let kv_store = KVStoreSyncWrapper(kv_store_sync);
35053523

35063524
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
35073525

3526+
// Yes, you can unsafe { turn off the borrow checker }
3527+
let lm_async: &'static LiquidityManager<_, _, _, _, _, _> = unsafe {
3528+
&*(nodes[0].liquidity_manager.get_lm_async()
3529+
as *const LiquidityManager<_, _, _, _, _, _>)
3530+
as &'static LiquidityManager<_, _, _, _, _, _>
3531+
};
3532+
let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
3533+
&*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
3534+
as &'static OutputSweeper<_, _, _, _, _, _, _>
3535+
};
3536+
35083537
let bp_future = super::process_events_async(
35093538
kv_store,
35103539
event_handler,
@@ -3513,8 +3542,8 @@ mod tests {
35133542
Some(Arc::clone(&nodes[0].messenger)),
35143543
nodes[0].no_gossip_sync(),
35153544
Arc::clone(&nodes[0].peer_manager),
3516-
Some(nodes[0].liquidity_manager.get_lm_async()),
3517-
Some(nodes[0].sweeper.sweeper_async()),
3545+
Some(lm_async),
3546+
Some(sweeper_async),
35183547
Arc::clone(&nodes[0].logger),
35193548
Some(Arc::clone(&nodes[0].scorer)),
35203549
move |dur: Duration| {

0 commit comments

Comments
 (0)