Skip to content

Commit e93275b

Browse files
committed
Use async kv store with OutputSweeper
1 parent 479e4a5 commit e93275b

File tree

3 files changed

+204
-107
lines changed

3 files changed

+204
-107
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ use lightning::sign::ChangeDestinationSourceSync;
4040
use lightning::sign::EntropySource;
4141
use lightning::sign::OutputSpender;
4242
use lightning::util::logger::Logger;
43+
use lightning::util::persist::{KVStore, Persister};
4344
#[cfg(feature = "std")]
44-
use lightning::util::persist::PersisterSync;
45-
use lightning::util::persist::{KVStoreSync, Persister};
45+
use lightning::util::persist::{KVStoreSync, PersisterSync};
4646
use lightning::util::sweep::OutputSweeper;
4747
#[cfg(feature = "std")]
4848
use lightning::util::sweep::OutputSweeperSync;
@@ -698,7 +698,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
698698
/// # type LiquidityManager<B, F, FE> = lightning_liquidity::LiquidityManager<Arc<lightning::sign::KeysManager>, Arc<ChannelManager<B, F, FE>>, Arc<F>>;
699699
/// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
700700
/// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger, F, StoreSync>;
701-
/// # type OutputSweeper<B, D, FE, F, O> = lightning::util::sweep::OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<StoreSync>, Arc<Logger>, Arc<O>>;
701+
/// # type OutputSweeper<B, D, FE, F, O> = lightning::util::sweep::OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<Store>, Arc<Logger>, Arc<O>>;
702702
///
703703
/// # struct Node<
704704
/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
@@ -842,7 +842,7 @@ where
842842
LM::Target: ALiquidityManager,
843843
O::Target: 'static + OutputSpender,
844844
D::Target: 'static + ChangeDestinationSource,
845-
K::Target: 'static + KVStoreSync,
845+
K::Target: 'static + KVStore,
846846
{
847847
let mut should_break = false;
848848
let async_event_handler = |event| {
@@ -1047,7 +1047,7 @@ where
10471047
LM::Target: ALiquidityManager,
10481048
O::Target: 'static + OutputSpender,
10491049
D::Target: 'static + ChangeDestinationSource,
1050-
K::Target: 'static + KVStoreSync,
1050+
K::Target: 'static + KVStore,
10511051
{
10521052
let persister = PersisterSyncWrapper::<'static, PS, CM, L, S>::new(persister);
10531053
process_events_full_async(

lightning/src/util/persist.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,64 @@ pub trait KVStoreSync {
121121
) -> Result<Vec<String>, io::Error>;
122122
}
123123

124+
/// A wrapper around a [`KVStoreSync`] that implements the [`KVStore`] trait.
125+
#[cfg(any(test, feature = "_test_utils"))]
126+
pub struct KVStoreSyncWrapper<K: Deref>(pub K)
127+
where
128+
K::Target: KVStoreSync;
129+
130+
#[cfg(not(any(test, feature = "_test_utils")))]
131+
pub(crate) struct KVStoreSyncWrapper<K: Deref>(pub K)
132+
where
133+
K::Target: KVStoreSync;
134+
135+
impl<K: Deref> Deref for KVStoreSyncWrapper<K>
136+
where
137+
K::Target: KVStoreSync,
138+
{
139+
type Target = Self;
140+
fn deref(&self) -> &Self {
141+
self
142+
}
143+
}
144+
145+
impl<K: Deref> KVStore for KVStoreSyncWrapper<K>
146+
where
147+
K::Target: KVStoreSync,
148+
{
149+
fn read(
150+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
151+
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send>> {
152+
let res = self.0.read(primary_namespace, secondary_namespace, key);
153+
154+
Box::pin(async move { res })
155+
}
156+
157+
fn write(
158+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
159+
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> {
160+
let res = self.0.write(primary_namespace, secondary_namespace, key, buf);
161+
162+
Box::pin(async move { res })
163+
}
164+
165+
fn remove(
166+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
167+
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> {
168+
let res = self.0.remove(primary_namespace, secondary_namespace, key, lazy);
169+
170+
Box::pin(async move { res })
171+
}
172+
173+
fn list(
174+
&self, primary_namespace: &str, secondary_namespace: &str,
175+
) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + 'static + Send>> {
176+
let res = self.0.list(primary_namespace, secondary_namespace);
177+
178+
Box::pin(async move { res })
179+
}
180+
}
181+
124182
/// A trait that provides a key-value store interface for persisting data.
125183
pub trait KVStore {
126184
/// Returns the data stored for the given `primary_namespace`, `secondary_namespace`, and

0 commit comments

Comments
 (0)