Skip to content

Commit a378267

Browse files
committed
Parallelize read_payments
Previously, we would read entries of our payment store sequentially. This is more or less fine when we read from a local store, but when we read from a remote (e.g., VSS) store, all the latency could result in considerable slowdown during startup. Here, we opt to read store entries in batches.
1 parent d0fbc12 commit a378267

File tree

2 files changed

+41
-10
lines changed

2 files changed

+41
-10
lines changed

src/io/utils.rs

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -236,22 +236,49 @@ where
236236
{
237237
let mut res = Vec::new();
238238

239-
for stored_key in KVStore::list(
239+
let mut stored_keys = KVStore::list(
240240
&*kv_store,
241241
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
242242
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
243243
)
244-
.await?
245-
{
246-
let mut reader = Cursor::new(
247-
KVStore::read(
244+
.await?;
245+
246+
const BATCH_SIZE: usize = 20;
247+
248+
let mut set = tokio::task::JoinSet::new();
249+
250+
// Fill JoinSet with tasks if possible
251+
while set.len() < BATCH_SIZE && !stored_keys.is_empty() {
252+
if let Some(next_key) = stored_keys.pop() {
253+
let fut = KVStore::read(
248254
&*kv_store,
249255
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
250256
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
251-
&stored_key,
252-
)
253-
.await?,
254-
);
257+
&next_key,
258+
);
259+
set.spawn(fut);
260+
debug_assert!(set.len() <= BATCH_SIZE);
261+
}
262+
}
263+
264+
while let Some(read_res) = set.join_next().await {
265+
// Exit early if we get an IO error.
266+
let read_res = read_res??;
267+
268+
// Refill set for every finished future, if we still have something to do.
269+
if let Some(next_key) = stored_keys.pop() {
270+
let fut = KVStore::read(
271+
&*kv_store,
272+
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
273+
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
274+
&next_key,
275+
);
276+
set.spawn(fut);
277+
debug_assert!(set.len() <= BATCH_SIZE);
278+
}
279+
280+
// Handle result.
281+
let mut reader = Cursor::new(read_res);
255282
let payment = PaymentDetails::read(&mut reader).map_err(|e| {
256283
log_error!(logger, "Failed to deserialize PaymentDetails: {}", e);
257284
std::io::Error::new(
@@ -261,6 +288,10 @@ where
261288
})?;
262289
res.push(payment);
263290
}
291+
292+
debug_assert!(set.is_empty());
293+
debug_assert!(stored_keys.is_empty());
294+
264295
Ok(res)
265296
}
266297

src/runtime.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ impl Runtime {
207207
);
208208
}
209209

210-
fn handle(&self) -> &tokio::runtime::Handle {
210+
pub fn handle(&self) -> &tokio::runtime::Handle {
211211
match &self.mode {
212212
RuntimeMode::Owned(rt) => rt.handle(),
213213
RuntimeMode::Handle(handle) => handle,

0 commit comments

Comments
 (0)