Skip to content

Commit 8cf64ec

Browse files
authored
Merge pull request #697 from tnull/2025-11-async-event-persistence
Make `EventQueue` persistence `async`
2 parents cef82e4 + 4c72541 commit 8cf64ec

File tree

6 files changed

+176
-41
lines changed

6 files changed

+176
-41
lines changed

src/data_store.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,10 +172,11 @@ where
172172
#[cfg(test)]
173173
mod tests {
174174
use lightning::impl_writeable_tlv_based;
175-
use lightning::util::test_utils::{TestLogger, TestStore};
175+
use lightning::util::test_utils::TestLogger;
176176

177177
use super::*;
178178
use crate::hex_utils;
179+
use crate::io::test_utils::InMemoryStore;
179180

180181
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
181182
struct TestObjectId {
@@ -234,7 +235,7 @@ mod tests {
234235

235236
#[test]
236237
fn data_is_persisted() {
237-
let store: Arc<DynStore> = Arc::new(TestStore::new(false));
238+
let store: Arc<DynStore> = Arc::new(InMemoryStore::new());
238239
let logger = Arc::new(TestLogger::new());
239240
let primary_namespace = "datastore_test_primary".to_string();
240241
let secondary_namespace = "datastore_test_secondary".to_string();

src/event.rs

Lines changed: 38 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use lightning::util::config::{
2626
ChannelConfigOverrides, ChannelConfigUpdate, ChannelHandshakeConfigUpdate,
2727
};
2828
use lightning::util::errors::APIError;
29-
use lightning::util::persist::KVStoreSync;
29+
use lightning::util::persist::KVStore;
3030
use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
3131
use lightning_liquidity::lsps2::utils::compute_opening_fee;
3232
use lightning_types::payment::{PaymentHash, PaymentPreimage};
@@ -301,12 +301,14 @@ where
301301
Self { queue, waker, kv_store, logger }
302302
}
303303

304-
pub(crate) fn add_event(&self, event: Event) -> Result<(), Error> {
305-
{
304+
pub(crate) async fn add_event(&self, event: Event) -> Result<(), Error> {
305+
let data = {
306306
let mut locked_queue = self.queue.lock().unwrap();
307307
locked_queue.push_back(event);
308-
self.persist_queue(&locked_queue)?;
309-
}
308+
EventQueueSerWrapper(&locked_queue).encode()
309+
};
310+
311+
self.persist_queue(data).await?;
310312

311313
if let Some(waker) = self.waker.lock().unwrap().take() {
312314
waker.wake();
@@ -323,28 +325,30 @@ where
323325
EventFuture { event_queue: Arc::clone(&self.queue), waker: Arc::clone(&self.waker) }.await
324326
}
325327

326-
pub(crate) fn event_handled(&self) -> Result<(), Error> {
327-
{
328+
pub(crate) async fn event_handled(&self) -> Result<(), Error> {
329+
let data = {
328330
let mut locked_queue = self.queue.lock().unwrap();
329331
locked_queue.pop_front();
330-
self.persist_queue(&locked_queue)?;
331-
}
332+
EventQueueSerWrapper(&locked_queue).encode()
333+
};
334+
335+
self.persist_queue(data).await?;
332336

333337
if let Some(waker) = self.waker.lock().unwrap().take() {
334338
waker.wake();
335339
}
336340
Ok(())
337341
}
338342

339-
fn persist_queue(&self, locked_queue: &VecDeque<Event>) -> Result<(), Error> {
340-
let data = EventQueueSerWrapper(locked_queue).encode();
341-
KVStoreSync::write(
343+
async fn persist_queue(&self, encoded_queue: Vec<u8>) -> Result<(), Error> {
344+
KVStore::write(
342345
&*self.kv_store,
343346
EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
344347
EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
345348
EVENT_QUEUE_PERSISTENCE_KEY,
346-
data,
349+
encoded_queue,
347350
)
351+
.await
348352
.map_err(|e| {
349353
log_error!(
350354
self.logger,
@@ -694,7 +698,7 @@ where
694698
claim_deadline,
695699
custom_records,
696700
};
697-
match self.event_queue.add_event(event) {
701+
match self.event_queue.add_event(event).await {
698702
Ok(_) => return Ok(()),
699703
Err(e) => {
700704
log_error!(
@@ -928,7 +932,7 @@ where
928932
.map(|cf| cf.custom_tlvs().into_iter().map(|tlv| tlv.into()).collect())
929933
.unwrap_or_default(),
930934
};
931-
match self.event_queue.add_event(event) {
935+
match self.event_queue.add_event(event).await {
932936
Ok(_) => return Ok(()),
933937
Err(e) => {
934938
log_error!(self.logger, "Failed to push to event queue: {}", e);
@@ -988,7 +992,7 @@ where
988992
fee_paid_msat,
989993
};
990994

991-
match self.event_queue.add_event(event) {
995+
match self.event_queue.add_event(event).await {
992996
Ok(_) => return Ok(()),
993997
Err(e) => {
994998
log_error!(self.logger, "Failed to push to event queue: {}", e);
@@ -1019,7 +1023,7 @@ where
10191023

10201024
let event =
10211025
Event::PaymentFailed { payment_id: Some(payment_id), payment_hash, reason };
1022-
match self.event_queue.add_event(event) {
1026+
match self.event_queue.add_event(event).await {
10231027
Ok(_) => return Ok(()),
10241028
Err(e) => {
10251029
log_error!(self.logger, "Failed to push to event queue: {}", e);
@@ -1295,7 +1299,7 @@ where
12951299
claim_from_onchain_tx,
12961300
outbound_amount_forwarded_msat,
12971301
};
1298-
self.event_queue.add_event(event).map_err(|e| {
1302+
self.event_queue.add_event(event).await.map_err(|e| {
12991303
log_error!(self.logger, "Failed to push to event queue: {}", e);
13001304
ReplayEvent()
13011305
})?;
@@ -1322,7 +1326,7 @@ where
13221326
counterparty_node_id,
13231327
funding_txo,
13241328
};
1325-
match self.event_queue.add_event(event) {
1329+
match self.event_queue.add_event(event).await {
13261330
Ok(_) => {},
13271331
Err(e) => {
13281332
log_error!(self.logger, "Failed to push to event queue: {}", e);
@@ -1383,7 +1387,7 @@ where
13831387
user_channel_id: UserChannelId(user_channel_id),
13841388
counterparty_node_id: Some(counterparty_node_id),
13851389
};
1386-
match self.event_queue.add_event(event) {
1390+
match self.event_queue.add_event(event).await {
13871391
Ok(_) => {},
13881392
Err(e) => {
13891393
log_error!(self.logger, "Failed to push to event queue: {}", e);
@@ -1407,7 +1411,7 @@ where
14071411
reason: Some(reason),
14081412
};
14091413

1410-
match self.event_queue.add_event(event) {
1414+
match self.event_queue.add_event(event).await {
14111415
Ok(_) => {},
14121416
Err(e) => {
14131417
log_error!(self.logger, "Failed to push to event queue: {}", e);
@@ -1605,13 +1609,14 @@ mod tests {
16051609
use std::sync::atomic::{AtomicU16, Ordering};
16061610
use std::time::Duration;
16071611

1608-
use lightning::util::test_utils::{TestLogger, TestStore};
1612+
use lightning::util::test_utils::TestLogger;
16091613

16101614
use super::*;
1615+
use crate::io::test_utils::InMemoryStore;
16111616

16121617
#[tokio::test]
16131618
async fn event_queue_persistence() {
1614-
let store: Arc<DynStore> = Arc::new(TestStore::new(false));
1619+
let store: Arc<DynStore> = Arc::new(InMemoryStore::new());
16151620
let logger = Arc::new(TestLogger::new());
16161621
let event_queue = Arc::new(EventQueue::new(Arc::clone(&store), Arc::clone(&logger)));
16171622
assert_eq!(event_queue.next_event(), None);
@@ -1621,7 +1626,7 @@ mod tests {
16211626
user_channel_id: UserChannelId(2323),
16221627
counterparty_node_id: None,
16231628
};
1624-
event_queue.add_event(expected_event.clone()).unwrap();
1629+
event_queue.add_event(expected_event.clone()).await.unwrap();
16251630

16261631
// Check we get the expected event and that it is returned until we mark it handled.
16271632
for _ in 0..5 {
@@ -1630,24 +1635,25 @@ mod tests {
16301635
}
16311636

16321637
// Check we can read back what we persisted.
1633-
let persisted_bytes = KVStoreSync::read(
1638+
let persisted_bytes = KVStore::read(
16341639
&*store,
16351640
EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
16361641
EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
16371642
EVENT_QUEUE_PERSISTENCE_KEY,
16381643
)
1644+
.await
16391645
.unwrap();
16401646
let deser_event_queue =
16411647
EventQueue::read(&mut &persisted_bytes[..], (Arc::clone(&store), logger)).unwrap();
16421648
assert_eq!(deser_event_queue.next_event_async().await, expected_event);
16431649

1644-
event_queue.event_handled().unwrap();
1650+
event_queue.event_handled().await.unwrap();
16451651
assert_eq!(event_queue.next_event(), None);
16461652
}
16471653

16481654
#[tokio::test]
16491655
async fn event_queue_concurrency() {
1650-
let store: Arc<DynStore> = Arc::new(TestStore::new(false));
1656+
let store: Arc<DynStore> = Arc::new(InMemoryStore::new());
16511657
let logger = Arc::new(TestLogger::new());
16521658
let event_queue = Arc::new(EventQueue::new(Arc::clone(&store), Arc::clone(&logger)));
16531659
assert_eq!(event_queue.next_event(), None);
@@ -1675,28 +1681,28 @@ mod tests {
16751681
let mut delayed_enqueue = false;
16761682

16771683
for _ in 0..25 {
1678-
event_queue.add_event(expected_event.clone()).unwrap();
1684+
event_queue.add_event(expected_event.clone()).await.unwrap();
16791685
enqueued_events.fetch_add(1, Ordering::SeqCst);
16801686
}
16811687

16821688
loop {
16831689
tokio::select! {
16841690
_ = tokio::time::sleep(Duration::from_millis(10)), if !delayed_enqueue => {
1685-
event_queue.add_event(expected_event.clone()).unwrap();
1691+
event_queue.add_event(expected_event.clone()).await.unwrap();
16861692
enqueued_events.fetch_add(1, Ordering::SeqCst);
16871693
delayed_enqueue = true;
16881694
}
16891695
e = event_queue.next_event_async() => {
16901696
assert_eq!(e, expected_event);
1691-
event_queue.event_handled().unwrap();
1697+
event_queue.event_handled().await.unwrap();
16921698
received_events.fetch_add(1, Ordering::SeqCst);
16931699

1694-
event_queue.add_event(expected_event.clone()).unwrap();
1700+
event_queue.add_event(expected_event.clone()).await.unwrap();
16951701
enqueued_events.fetch_add(1, Ordering::SeqCst);
16961702
}
16971703
e = event_queue.next_event_async() => {
16981704
assert_eq!(e, expected_event);
1699-
event_queue.event_handled().unwrap();
1705+
event_queue.event_handled().await.unwrap();
17001706
received_events.fetch_add(1, Ordering::SeqCst);
17011707
}
17021708
}

0 commit comments

Comments
 (0)