Skip to content

Commit 82f5718

Browse files
committed
lexe-ln: read initial migrations concurrently in main fetch block
1 parent b5074c5 commit 82f5718

File tree

3 files changed

+51
-10
lines changed

3 files changed

+51
-10
lines changed

lexe-ln/src/migrations.rs

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,14 @@
33
//! Similar to database migrations but for VFS-persisted state. Each migration
44
//! creates an empty marker file in `migrations/<name>` when complete.
55
6-
use std::collections::HashSet;
6+
use std::{collections::HashSet, future::Future, pin::Pin, sync::Arc};
77

8-
use anyhow::Context;
8+
use anyhow::{Context, format_err};
99
use bytes::Bytes;
10+
use futures::{
11+
FutureExt,
12+
future::{self, BoxFuture},
13+
};
1014
use lexe_api::{
1115
types::Empty,
1216
vfs::{self, Vfs, VfsDirectory, VfsFileId},
@@ -31,8 +35,20 @@ pub struct Migrations {
3135
}
3236

3337
impl Migrations {
38+
/// Return a future that reads migrations once and shares the result with
39+
/// all awaiters.
40+
pub fn read_once<'vfs>(
41+
vfs: &'vfs (impl Vfs + Send + Sync),
42+
) -> MigrationsReadOnce<'vfs> {
43+
let fut = Migrations::read(vfs)
44+
.map(|res| res.map(Arc::new).map_err(Arc::new))
45+
.boxed()
46+
.shared();
47+
MigrationsReadOnce { fut }
48+
}
49+
3450
/// Read all applied migrations from the VFS.
35-
pub async fn read(vfs: &impl Vfs) -> anyhow::Result<Self> {
51+
async fn read(vfs: &(impl Vfs + Send + Sync)) -> anyhow::Result<Self> {
3652
let dir = VfsDirectory::new(vfs::MIGRATIONS_DIR);
3753
let dir_list = vfs
3854
.list_directory(&dir)
@@ -67,3 +83,24 @@ impl Migrations {
6783
self.applied.contains(name)
6884
}
6985
}
86+
87+
/// A future that reads migrations once and shares the result with all awaiters.
88+
#[derive(Clone)]
89+
pub struct MigrationsReadOnce<'vfs> {
90+
fut: future::Shared<
91+
BoxFuture<'vfs, Result<Arc<Migrations>, Arc<anyhow::Error>>>,
92+
>,
93+
}
94+
95+
impl<'vfs> Future for MigrationsReadOnce<'vfs> {
96+
type Output = anyhow::Result<Arc<Migrations>>;
97+
98+
fn poll(
99+
self: Pin<&mut Self>,
100+
cx: &mut std::task::Context<'_>,
101+
) -> std::task::Poll<Self::Output> {
102+
Pin::new(&mut self.get_mut().fut)
103+
.poll(cx)
104+
.map_err(|err| format_err!("Could not read migrations: {err:#}"))
105+
}
106+
}

lexe-ln/src/persister.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use tracing::{info, warn};
2828
use crate::{
2929
alias::LexeChainMonitorType,
3030
event::EventId,
31-
migrations::{self, Migrations},
31+
migrations::{self, Migrations, MigrationsReadOnce},
3232
payments::{
3333
PaymentMetadata, PaymentV2, PaymentWithMetadata,
3434
manager::{CheckedPayment, PersistedPayment},
@@ -583,12 +583,13 @@ pub trait LexePersisterMethods: Vfs {
583583
///
584584
/// Idempotent: Creates a marker file at `migrations/payments_v2` once
585585
/// complete, and skips the migration if the marker file already exists.
586-
#[tracing::instrument(skip_all, name = "(migrate-payments-v2)")]
586+
// #[tracing::instrument(skip_all, name = "(migrate-payments-v2)")]
587587
async fn migrate_to_payments_v2(
588588
&self,
589-
initial_migrations: &Migrations,
589+
initial_migrations_fut: MigrationsReadOnce<'_>,
590590
) -> anyhow::Result<()> {
591591
// Check if migration has already run
592+
let initial_migrations = initial_migrations_fut.await?;
592593
if initial_migrations.is_applied(migrations::MARKER_PAYMENTS_V2) {
593594
return Ok(());
594595
}

node/src/run.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -329,9 +329,8 @@ impl UserNode {
329329
shutdown.clone(),
330330
));
331331

332-
// Read initial applied migrations
333-
// TODO(phlip9): fetch migrations concurrently in main fetch block
334-
let initial_migrations = Migrations::read(&*persister).await?;
332+
// Future that reads initial applied migrations
333+
let initial_migrations_fut = Migrations::read_once(&*persister);
335334

336335
// A future which reads the approved versions list
337336
let read_maybe_approved_versions = persister::read_approved_versions(
@@ -341,10 +340,11 @@ impl UserNode {
341340
);
342341

343342
// Fetch pending payments to initialize payments manager with
343+
let initial_migrations_fut_clone = initial_migrations_fut.clone();
344344
let pending_payments_fut = async {
345345
// But first, migrate to payments v2 if needed
346346
persister
347-
.migrate_to_payments_v2(&initial_migrations)
347+
.migrate_to_payments_v2(initial_migrations_fut_clone)
348348
.await
349349
.context("payments_v2 migration failed")?;
350350

@@ -357,20 +357,23 @@ impl UserNode {
357357
// Read as much as possible concurrently to reduce init time
358358
#[rustfmt::skip] // Does not respect 80 char line width
359359
let (
360+
try_initial_migrations,
360361
try_maybe_approved_versions,
361362
try_maybe_changeset,
362363
try_existing_scids,
363364
try_pending_payments,
364365
try_maybe_revocable_clients,
365366
try_channel_monitor_bytes,
366367
) = tokio::join!(
368+
initial_migrations_fut,
367369
read_maybe_approved_versions,
368370
persister.read_wallet_changeset(),
369371
persister.read_scids(),
370372
pending_payments_fut,
371373
persister.read_json::<RevocableClients>(&REVOCABLE_CLIENTS_FILE_ID),
372374
persister.fetch_channel_monitor_bytes(),
373375
);
376+
let initial_migrations = try_initial_migrations?;
374377
if deploy_env.is_staging_or_prod() {
375378
// Erroring here prevents an attacker with access to a target user's
376379
// gdrive from deleting the user's approved versions list in an

0 commit comments

Comments
 (0)