Skip to content

Commit d0fbc12

Browse files
committed
Use async KVStore for read_X util methods
Rather than using `KVStoreSync` we now use the async `KVStore` implementation for most `read_X` util methods used during node building. This is a first step towards making node building/startup entirely async eventually.
1 parent bbefa73 commit d0fbc12

File tree

2 files changed

+129
-89
lines changed

2 files changed

+129
-89
lines changed

src/builder.rs

Lines changed: 45 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ use crate::fee_estimator::OnchainFeeEstimator;
5454
use crate::gossip::GossipSource;
5555
use crate::io::sqlite_store::SqliteStore;
5656
use crate::io::utils::{
57-
read_external_pathfinding_scores_from_cache, read_node_metrics, write_node_metrics,
57+
read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph,
58+
read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_scorer,
59+
write_node_metrics,
5860
};
5961
use crate::io::vss_store::VssStoreBuilder;
6062
use crate::io::{
@@ -1050,7 +1052,9 @@ fn build_with_store_internal(
10501052
}
10511053

10521054
// Initialize the status fields.
1053-
let node_metrics = match read_node_metrics(Arc::clone(&kv_store), Arc::clone(&logger)) {
1055+
let node_metrics = match runtime
1056+
.block_on(async { read_node_metrics(Arc::clone(&kv_store), Arc::clone(&logger)).await })
1057+
{
10541058
Ok(metrics) => Arc::new(RwLock::new(metrics)),
10551059
Err(e) => {
10561060
if e.kind() == std::io::ErrorKind::NotFound {
@@ -1064,7 +1068,9 @@ fn build_with_store_internal(
10641068
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger)));
10651069
let fee_estimator = Arc::new(OnchainFeeEstimator::new());
10661070

1067-
let payment_store = match io::utils::read_payments(Arc::clone(&kv_store), Arc::clone(&logger)) {
1071+
let payment_store = match runtime
1072+
.block_on(async { read_payments(Arc::clone(&kv_store), Arc::clone(&logger)).await })
1073+
{
10681074
Ok(payments) => Arc::new(PaymentStore::new(
10691075
payments,
10701076
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
@@ -1291,24 +1297,23 @@ fn build_with_store_internal(
12911297
));
12921298

12931299
// Initialize the network graph, scorer, and router
1294-
let network_graph =
1295-
match io::utils::read_network_graph(Arc::clone(&kv_store), Arc::clone(&logger)) {
1296-
Ok(graph) => Arc::new(graph),
1297-
Err(e) => {
1298-
if e.kind() == std::io::ErrorKind::NotFound {
1299-
Arc::new(Graph::new(config.network.into(), Arc::clone(&logger)))
1300-
} else {
1301-
log_error!(logger, "Failed to read network graph from store: {}", e);
1302-
return Err(BuildError::ReadFailed);
1303-
}
1304-
},
1305-
};
1300+
let network_graph = match runtime
1301+
.block_on(async { read_network_graph(Arc::clone(&kv_store), Arc::clone(&logger)).await })
1302+
{
1303+
Ok(graph) => Arc::new(graph),
1304+
Err(e) => {
1305+
if e.kind() == std::io::ErrorKind::NotFound {
1306+
Arc::new(Graph::new(config.network.into(), Arc::clone(&logger)))
1307+
} else {
1308+
log_error!(logger, "Failed to read network graph from store: {}", e);
1309+
return Err(BuildError::ReadFailed);
1310+
}
1311+
},
1312+
};
13061313

1307-
let local_scorer = match io::utils::read_scorer(
1308-
Arc::clone(&kv_store),
1309-
Arc::clone(&network_graph),
1310-
Arc::clone(&logger),
1311-
) {
1314+
let local_scorer = match runtime.block_on(async {
1315+
read_scorer(Arc::clone(&kv_store), Arc::clone(&network_graph), Arc::clone(&logger)).await
1316+
}) {
13121317
Ok(scorer) => scorer,
13131318
Err(e) => {
13141319
if e.kind() == std::io::ErrorKind::NotFound {
@@ -1324,7 +1329,10 @@ fn build_with_store_internal(
13241329
let scorer = Arc::new(Mutex::new(CombinedScorer::new(local_scorer)));
13251330

13261331
// Restore external pathfinding scores from cache if possible.
1327-
match read_external_pathfinding_scores_from_cache(Arc::clone(&kv_store), Arc::clone(&logger)) {
1332+
match runtime.block_on(async {
1333+
read_external_pathfinding_scores_from_cache(Arc::clone(&kv_store), Arc::clone(&logger))
1334+
.await
1335+
}) {
13281336
Ok(external_scores) => {
13291337
scorer.lock().unwrap().merge(external_scores, cur_time);
13301338
log_trace!(logger, "External scores from cache merged successfully");
@@ -1605,14 +1613,17 @@ fn build_with_store_internal(
16051613
let connection_manager =
16061614
Arc::new(ConnectionManager::new(Arc::clone(&peer_manager), Arc::clone(&logger)));
16071615

1608-
let output_sweeper = match io::utils::read_output_sweeper(
1609-
Arc::clone(&tx_broadcaster),
1610-
Arc::clone(&fee_estimator),
1611-
Arc::clone(&chain_source),
1612-
Arc::clone(&keys_manager),
1613-
Arc::clone(&kv_store),
1614-
Arc::clone(&logger),
1615-
) {
1616+
let output_sweeper = match runtime.block_on(async {
1617+
read_output_sweeper(
1618+
Arc::clone(&tx_broadcaster),
1619+
Arc::clone(&fee_estimator),
1620+
Arc::clone(&chain_source),
1621+
Arc::clone(&keys_manager),
1622+
Arc::clone(&kv_store),
1623+
Arc::clone(&logger),
1624+
)
1625+
.await
1626+
}) {
16161627
Ok(output_sweeper) => Arc::new(output_sweeper),
16171628
Err(e) => {
16181629
if e.kind() == std::io::ErrorKind::NotFound {
@@ -1633,7 +1644,8 @@ fn build_with_store_internal(
16331644
},
16341645
};
16351646

1636-
let event_queue = match io::utils::read_event_queue(Arc::clone(&kv_store), Arc::clone(&logger))
1647+
let event_queue = match runtime
1648+
.block_on(async { read_event_queue(Arc::clone(&kv_store), Arc::clone(&logger)).await })
16371649
{
16381650
Ok(event_queue) => Arc::new(event_queue),
16391651
Err(e) => {
@@ -1646,7 +1658,9 @@ fn build_with_store_internal(
16461658
},
16471659
};
16481660

1649-
let peer_store = match io::utils::read_peer_info(Arc::clone(&kv_store), Arc::clone(&logger)) {
1661+
let peer_store = match runtime
1662+
.block_on(async { read_peer_info(Arc::clone(&kv_store), Arc::clone(&logger)).await })
1663+
{
16501664
Ok(peer_store) => Arc::new(peer_store),
16511665
Err(e) => {
16521666
if e.kind() == std::io::ErrorKind::NotFound {

src/io/utils.rs

Lines changed: 84 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -88,38 +88,44 @@ pub(crate) fn read_or_generate_seed_file(
8888
}
8989

9090
/// Read a previously persisted [`NetworkGraph`] from the store.
91-
pub(crate) fn read_network_graph<L: Deref + Clone>(
91+
pub(crate) async fn read_network_graph<L: Deref + Clone>(
9292
kv_store: Arc<DynStore>, logger: L,
9393
) -> Result<NetworkGraph<L>, std::io::Error>
9494
where
9595
L::Target: LdkLogger,
9696
{
97-
let mut reader = Cursor::new(KVStoreSync::read(
98-
&*kv_store,
99-
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
100-
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
101-
NETWORK_GRAPH_PERSISTENCE_KEY,
102-
)?);
97+
let mut reader = Cursor::new(
98+
KVStore::read(
99+
&*kv_store,
100+
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
101+
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
102+
NETWORK_GRAPH_PERSISTENCE_KEY,
103+
)
104+
.await?,
105+
);
103106
NetworkGraph::read(&mut reader, logger.clone()).map_err(|e| {
104107
log_error!(logger, "Failed to deserialize NetworkGraph: {}", e);
105108
std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize NetworkGraph")
106109
})
107110
}
108111

109112
/// Read a previously persisted [`ProbabilisticScorer`] from the store.
110-
pub(crate) fn read_scorer<G: Deref<Target = NetworkGraph<L>>, L: Deref + Clone>(
113+
pub(crate) async fn read_scorer<G: Deref<Target = NetworkGraph<L>>, L: Deref + Clone>(
111114
kv_store: Arc<DynStore>, network_graph: G, logger: L,
112115
) -> Result<ProbabilisticScorer<G, L>, std::io::Error>
113116
where
114117
L::Target: LdkLogger,
115118
{
116119
let params = ProbabilisticScoringDecayParameters::default();
117-
let mut reader = Cursor::new(KVStoreSync::read(
118-
&*kv_store,
119-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
120-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
121-
SCORER_PERSISTENCE_KEY,
122-
)?);
120+
let mut reader = Cursor::new(
121+
KVStore::read(
122+
&*kv_store,
123+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
124+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
125+
SCORER_PERSISTENCE_KEY,
126+
)
127+
.await?,
128+
);
123129
let args = (params, network_graph, logger.clone());
124130
ProbabilisticScorer::read(&mut reader, args).map_err(|e| {
125131
log_error!(logger, "Failed to deserialize scorer: {}", e);
@@ -128,18 +134,21 @@ where
128134
}
129135

130136
/// Read previously persisted external pathfinding scores from the cache.
131-
pub(crate) fn read_external_pathfinding_scores_from_cache<L: Deref>(
137+
pub(crate) async fn read_external_pathfinding_scores_from_cache<L: Deref>(
132138
kv_store: Arc<DynStore>, logger: L,
133139
) -> Result<ChannelLiquidities, std::io::Error>
134140
where
135141
L::Target: LdkLogger,
136142
{
137-
let mut reader = Cursor::new(KVStoreSync::read(
138-
&*kv_store,
139-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
140-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
141-
EXTERNAL_PATHFINDING_SCORES_CACHE_KEY,
142-
)?);
143+
let mut reader = Cursor::new(
144+
KVStore::read(
145+
&*kv_store,
146+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
147+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
148+
EXTERNAL_PATHFINDING_SCORES_CACHE_KEY,
149+
)
150+
.await?,
151+
);
143152
ChannelLiquidities::read(&mut reader).map_err(|e| {
144153
log_error!(logger, "Failed to deserialize scorer: {}", e);
145154
std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize Scorer")
@@ -175,63 +184,74 @@ where
175184
}
176185

177186
/// Read previously persisted events from the store.
178-
pub(crate) fn read_event_queue<L: Deref + Clone>(
187+
pub(crate) async fn read_event_queue<L: Deref + Clone>(
179188
kv_store: Arc<DynStore>, logger: L,
180189
) -> Result<EventQueue<L>, std::io::Error>
181190
where
182191
L::Target: LdkLogger,
183192
{
184-
let mut reader = Cursor::new(KVStoreSync::read(
185-
&*kv_store,
186-
EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
187-
EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
188-
EVENT_QUEUE_PERSISTENCE_KEY,
189-
)?);
193+
let mut reader = Cursor::new(
194+
KVStore::read(
195+
&*kv_store,
196+
EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
197+
EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
198+
EVENT_QUEUE_PERSISTENCE_KEY,
199+
)
200+
.await?,
201+
);
190202
EventQueue::read(&mut reader, (kv_store, logger.clone())).map_err(|e| {
191203
log_error!(logger, "Failed to deserialize event queue: {}", e);
192204
std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize EventQueue")
193205
})
194206
}
195207

196208
/// Read previously persisted peer info from the store.
197-
pub(crate) fn read_peer_info<L: Deref + Clone>(
209+
pub(crate) async fn read_peer_info<L: Deref + Clone>(
198210
kv_store: Arc<DynStore>, logger: L,
199211
) -> Result<PeerStore<L>, std::io::Error>
200212
where
201213
L::Target: LdkLogger,
202214
{
203-
let mut reader = Cursor::new(KVStoreSync::read(
204-
&*kv_store,
205-
PEER_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
206-
PEER_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
207-
PEER_INFO_PERSISTENCE_KEY,
208-
)?);
215+
let mut reader = Cursor::new(
216+
KVStore::read(
217+
&*kv_store,
218+
PEER_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
219+
PEER_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
220+
PEER_INFO_PERSISTENCE_KEY,
221+
)
222+
.await?,
223+
);
209224
PeerStore::read(&mut reader, (kv_store, logger.clone())).map_err(|e| {
210225
log_error!(logger, "Failed to deserialize peer store: {}", e);
211226
std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize PeerStore")
212227
})
213228
}
214229

215230
/// Read previously persisted payments information from the store.
216-
pub(crate) fn read_payments<L: Deref>(
231+
pub(crate) async fn read_payments<L: Deref>(
217232
kv_store: Arc<DynStore>, logger: L,
218233
) -> Result<Vec<PaymentDetails>, std::io::Error>
219234
where
220235
L::Target: LdkLogger,
221236
{
222237
let mut res = Vec::new();
223238

224-
for stored_key in KVStoreSync::list(
239+
for stored_key in KVStore::list(
225240
&*kv_store,
226241
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
227242
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
228-
)? {
229-
let mut reader = Cursor::new(KVStoreSync::read(
230-
&*kv_store,
231-
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
232-
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
233-
&stored_key,
234-
)?);
243+
)
244+
.await?
245+
{
246+
let mut reader = Cursor::new(
247+
KVStore::read(
248+
&*kv_store,
249+
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
250+
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
251+
&stored_key,
252+
)
253+
.await?,
254+
);
235255
let payment = PaymentDetails::read(&mut reader).map_err(|e| {
236256
log_error!(logger, "Failed to deserialize PaymentDetails: {}", e);
237257
std::io::Error::new(
@@ -245,17 +265,20 @@ where
245265
}
246266

247267
/// Read `OutputSweeper` state from the store.
248-
pub(crate) fn read_output_sweeper(
268+
pub(crate) async fn read_output_sweeper(
249269
broadcaster: Arc<Broadcaster>, fee_estimator: Arc<OnchainFeeEstimator>,
250270
chain_data_source: Arc<ChainSource>, keys_manager: Arc<KeysManager>, kv_store: Arc<DynStore>,
251271
logger: Arc<Logger>,
252272
) -> Result<Sweeper, std::io::Error> {
253-
let mut reader = Cursor::new(KVStoreSync::read(
254-
&*kv_store,
255-
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
256-
OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
257-
OUTPUT_SWEEPER_PERSISTENCE_KEY,
258-
)?);
273+
let mut reader = Cursor::new(
274+
KVStore::read(
275+
&*kv_store,
276+
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
277+
OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
278+
OUTPUT_SWEEPER_PERSISTENCE_KEY,
279+
)
280+
.await?,
281+
);
259282
let args = (
260283
broadcaster,
261284
fee_estimator,
@@ -272,18 +295,21 @@ pub(crate) fn read_output_sweeper(
272295
Ok(sweeper)
273296
}
274297

275-
pub(crate) fn read_node_metrics<L: Deref>(
298+
pub(crate) async fn read_node_metrics<L: Deref>(
276299
kv_store: Arc<DynStore>, logger: L,
277300
) -> Result<NodeMetrics, std::io::Error>
278301
where
279302
L::Target: LdkLogger,
280303
{
281-
let mut reader = Cursor::new(KVStoreSync::read(
282-
&*kv_store,
283-
NODE_METRICS_PRIMARY_NAMESPACE,
284-
NODE_METRICS_SECONDARY_NAMESPACE,
285-
NODE_METRICS_KEY,
286-
)?);
304+
let mut reader = Cursor::new(
305+
KVStore::read(
306+
&*kv_store,
307+
NODE_METRICS_PRIMARY_NAMESPACE,
308+
NODE_METRICS_SECONDARY_NAMESPACE,
309+
NODE_METRICS_KEY,
310+
)
311+
.await?,
312+
);
287313
NodeMetrics::read(&mut reader).map_err(|e| {
288314
log_error!(logger, "Failed to deserialize NodeMetrics: {}", e);
289315
std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize NodeMetrics")

0 commit comments

Comments
 (0)