Skip to content

Commit d984aca

Browse files
authored
Merge pull request #670 from tnull/2025-10-asyncify-more-things
Asyncify test suite and reintroduce VSS-internal runtime
2 parents 9d71d3a + 8cad63b commit d984aca

File tree

12 files changed

+611
-469
lines changed

12 files changed

+611
-469
lines changed

.github/workflows/vss-integration.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,5 +44,5 @@ jobs:
4444
run: |
4545
cd ldk-node
4646
export TEST_VSS_BASE_URL="http://localhost:8080/vss"
47-
RUSTFLAGS="--cfg vss_test" cargo build --verbose --color always
47+
RUSTFLAGS="--cfg vss_test" cargo test io::vss_store
4848
RUSTFLAGS="--cfg vss_test" cargo test --test integration_tests_vss

src/builder.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1169,6 +1169,7 @@ fn build_with_store_internal(
11691169
if e.kind() == std::io::ErrorKind::NotFound {
11701170
Arc::new(RwLock::new(NodeMetrics::default()))
11711171
} else {
1172+
log_error!(logger, "Failed to read node metrics from store: {}", e);
11721173
return Err(BuildError::ReadFailed);
11731174
}
11741175
},
@@ -1232,7 +1233,8 @@ fn build_with_store_internal(
12321233
Arc::clone(&kv_store),
12331234
Arc::clone(&logger),
12341235
)),
1235-
Err(_) => {
1236+
Err(e) => {
1237+
log_error!(logger, "Failed to read payment data from store: {}", e);
12361238
return Err(BuildError::ReadFailed);
12371239
},
12381240
};
@@ -1365,7 +1367,7 @@ fn build_with_store_internal(
13651367
if e.kind() == lightning::io::ErrorKind::NotFound {
13661368
Vec::new()
13671369
} else {
1368-
log_error!(logger, "Failed to read channel monitors: {}", e.to_string());
1370+
log_error!(logger, "Failed to read channel monitors from store: {}", e.to_string());
13691371
return Err(BuildError::ReadFailed);
13701372
}
13711373
},
@@ -1390,6 +1392,7 @@ fn build_with_store_internal(
13901392
if e.kind() == std::io::ErrorKind::NotFound {
13911393
Arc::new(Graph::new(config.network.into(), Arc::clone(&logger)))
13921394
} else {
1395+
log_error!(logger, "Failed to read network graph from store: {}", e);
13931396
return Err(BuildError::ReadFailed);
13941397
}
13951398
},
@@ -1406,6 +1409,7 @@ fn build_with_store_internal(
14061409
let params = ProbabilisticScoringDecayParameters::default();
14071410
ProbabilisticScorer::new(params, Arc::clone(&network_graph), Arc::clone(&logger))
14081411
} else {
1412+
log_error!(logger, "Failed to read scoring data from store: {}", e);
14091413
return Err(BuildError::ReadFailed);
14101414
}
14111415
},
@@ -1491,7 +1495,7 @@ fn build_with_store_internal(
14911495
);
14921496
let (_hash, channel_manager) =
14931497
<(BlockHash, ChannelManager)>::read(&mut reader, read_args).map_err(|e| {
1494-
log_error!(logger, "Failed to read channel manager from KVStore: {}", e);
1498+
log_error!(logger, "Failed to read channel manager from store: {}", e);
14951499
BuildError::ReadFailed
14961500
})?;
14971501
channel_manager
@@ -1720,6 +1724,7 @@ fn build_with_store_internal(
17201724
Arc::clone(&logger),
17211725
))
17221726
} else {
1727+
log_error!(logger, "Failed to read output sweeper data from store: {}", e);
17231728
return Err(BuildError::ReadFailed);
17241729
}
17251730
},
@@ -1732,6 +1737,7 @@ fn build_with_store_internal(
17321737
if e.kind() == std::io::ErrorKind::NotFound {
17331738
Arc::new(EventQueue::new(Arc::clone(&kv_store), Arc::clone(&logger)))
17341739
} else {
1740+
log_error!(logger, "Failed to read event queue from store: {}", e);
17351741
return Err(BuildError::ReadFailed);
17361742
}
17371743
},
@@ -1743,6 +1749,7 @@ fn build_with_store_internal(
17431749
if e.kind() == std::io::ErrorKind::NotFound {
17441750
Arc::new(PeerStore::new(Arc::clone(&kv_store), Arc::clone(&logger)))
17451751
} else {
1752+
log_error!(logger, "Failed to read peer data from store: {}", e);
17461753
return Err(BuildError::ReadFailed);
17471754
}
17481755
},

src/event.rs

Lines changed: 4 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use core::future::Future;
99
use core::task::{Poll, Waker};
1010
use std::collections::VecDeque;
1111
use std::ops::Deref;
12-
use std::sync::{Arc, Condvar, Mutex};
12+
use std::sync::{Arc, Mutex};
1313

1414
use bitcoin::blockdata::locktime::absolute::LockTime;
1515
use bitcoin::secp256k1::PublicKey;
@@ -287,7 +287,6 @@ where
287287
{
288288
queue: Arc<Mutex<VecDeque<Event>>>,
289289
waker: Arc<Mutex<Option<Waker>>>,
290-
notifier: Condvar,
291290
kv_store: Arc<DynStore>,
292291
logger: L,
293292
}
@@ -299,8 +298,7 @@ where
299298
pub(crate) fn new(kv_store: Arc<DynStore>, logger: L) -> Self {
300299
let queue = Arc::new(Mutex::new(VecDeque::new()));
301300
let waker = Arc::new(Mutex::new(None));
302-
let notifier = Condvar::new();
303-
Self { queue, waker, notifier, kv_store, logger }
301+
Self { queue, waker, kv_store, logger }
304302
}
305303

306304
pub(crate) fn add_event(&self, event: Event) -> Result<(), Error> {
@@ -310,8 +308,6 @@ where
310308
self.persist_queue(&locked_queue)?;
311309
}
312310

313-
self.notifier.notify_one();
314-
315311
if let Some(waker) = self.waker.lock().unwrap().take() {
316312
waker.wake();
317313
}
@@ -327,19 +323,12 @@ where
327323
EventFuture { event_queue: Arc::clone(&self.queue), waker: Arc::clone(&self.waker) }.await
328324
}
329325

330-
pub(crate) fn wait_next_event(&self) -> Event {
331-
let locked_queue =
332-
self.notifier.wait_while(self.queue.lock().unwrap(), |queue| queue.is_empty()).unwrap();
333-
locked_queue.front().unwrap().clone()
334-
}
335-
336326
pub(crate) fn event_handled(&self) -> Result<(), Error> {
337327
{
338328
let mut locked_queue = self.queue.lock().unwrap();
339329
locked_queue.pop_front();
340330
self.persist_queue(&locked_queue)?;
341331
}
342-
self.notifier.notify_one();
343332

344333
if let Some(waker) = self.waker.lock().unwrap().take() {
345334
waker.wake();
@@ -383,8 +372,7 @@ where
383372
let read_queue: EventQueueDeserWrapper = Readable::read(reader)?;
384373
let queue = Arc::new(Mutex::new(read_queue.0));
385374
let waker = Arc::new(Mutex::new(None));
386-
let notifier = Condvar::new();
387-
Ok(Self { queue, waker, notifier, kv_store, logger })
375+
Ok(Self { queue, waker, kv_store, logger })
388376
}
389377
}
390378

@@ -1637,7 +1625,6 @@ mod tests {
16371625

16381626
// Check we get the expected event and that it is returned until we mark it handled.
16391627
for _ in 0..5 {
1640-
assert_eq!(event_queue.wait_next_event(), expected_event);
16411628
assert_eq!(event_queue.next_event_async().await, expected_event);
16421629
assert_eq!(event_queue.next_event(), Some(expected_event.clone()));
16431630
}
@@ -1652,7 +1639,7 @@ mod tests {
16521639
.unwrap();
16531640
let deser_event_queue =
16541641
EventQueue::read(&mut &persisted_bytes[..], (Arc::clone(&store), logger)).unwrap();
1655-
assert_eq!(deser_event_queue.wait_next_event(), expected_event);
1642+
assert_eq!(deser_event_queue.next_event_async().await, expected_event);
16561643

16571644
event_queue.event_handled().unwrap();
16581645
assert_eq!(event_queue.next_event(), None);
@@ -1721,32 +1708,5 @@ mod tests {
17211708
}
17221709
}
17231710
assert_eq!(event_queue.next_event(), None);
1724-
1725-
// Check we operate correctly, even when mixing and matching blocking and async API calls.
1726-
let (tx, mut rx) = tokio::sync::watch::channel(());
1727-
let thread_queue = Arc::clone(&event_queue);
1728-
let thread_event = expected_event.clone();
1729-
std::thread::spawn(move || {
1730-
let e = thread_queue.wait_next_event();
1731-
assert_eq!(e, thread_event);
1732-
thread_queue.event_handled().unwrap();
1733-
tx.send(()).unwrap();
1734-
});
1735-
1736-
let thread_queue = Arc::clone(&event_queue);
1737-
let thread_event = expected_event.clone();
1738-
std::thread::spawn(move || {
1739-
// Sleep a bit before we enqueue the events everybody is waiting for.
1740-
std::thread::sleep(Duration::from_millis(20));
1741-
thread_queue.add_event(thread_event.clone()).unwrap();
1742-
thread_queue.add_event(thread_event.clone()).unwrap();
1743-
});
1744-
1745-
let e = event_queue.next_event_async().await;
1746-
assert_eq!(e, expected_event.clone());
1747-
event_queue.event_handled().unwrap();
1748-
1749-
rx.changed().await.unwrap();
1750-
assert_eq!(event_queue.next_event(), None);
17511711
}
17521712
}

0 commit comments

Comments
 (0)