Skip to content

Commit 4c72541

Browse files
committed
Make EventQueue persistence async
Previously, we'd still use `KVStoreSync` for persistence of our event queue, which also meant calling the sync persistence through our otherwise-async background processor/event handling flow. Here we switch our `EventQueue` persistence to be async, which gets us one step further towards async-everything.
1 parent f5822a0 commit 4c72541

File tree

2 files changed

+38
-30
lines changed

2 files changed

+38
-30
lines changed

src/event.rs

Lines changed: 34 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use lightning::util::config::{
2626
ChannelConfigOverrides, ChannelConfigUpdate, ChannelHandshakeConfigUpdate,
2727
};
2828
use lightning::util::errors::APIError;
29-
use lightning::util::persist::KVStoreSync;
29+
use lightning::util::persist::KVStore;
3030
use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
3131
use lightning_liquidity::lsps2::utils::compute_opening_fee;
3232
use lightning_types::payment::{PaymentHash, PaymentPreimage};
@@ -301,12 +301,14 @@ where
301301
Self { queue, waker, kv_store, logger }
302302
}
303303

304-
pub(crate) fn add_event(&self, event: Event) -> Result<(), Error> {
305-
{
304+
pub(crate) async fn add_event(&self, event: Event) -> Result<(), Error> {
305+
let data = {
306306
let mut locked_queue = self.queue.lock().unwrap();
307307
locked_queue.push_back(event);
308-
self.persist_queue(&locked_queue)?;
309-
}
308+
EventQueueSerWrapper(&locked_queue).encode()
309+
};
310+
311+
self.persist_queue(data).await?;
310312

311313
if let Some(waker) = self.waker.lock().unwrap().take() {
312314
waker.wake();
@@ -323,28 +325,30 @@ where
323325
EventFuture { event_queue: Arc::clone(&self.queue), waker: Arc::clone(&self.waker) }.await
324326
}
325327

326-
pub(crate) fn event_handled(&self) -> Result<(), Error> {
327-
{
328+
pub(crate) async fn event_handled(&self) -> Result<(), Error> {
329+
let data = {
328330
let mut locked_queue = self.queue.lock().unwrap();
329331
locked_queue.pop_front();
330-
self.persist_queue(&locked_queue)?;
331-
}
332+
EventQueueSerWrapper(&locked_queue).encode()
333+
};
334+
335+
self.persist_queue(data).await?;
332336

333337
if let Some(waker) = self.waker.lock().unwrap().take() {
334338
waker.wake();
335339
}
336340
Ok(())
337341
}
338342

339-
fn persist_queue(&self, locked_queue: &VecDeque<Event>) -> Result<(), Error> {
340-
let data = EventQueueSerWrapper(locked_queue).encode();
341-
KVStoreSync::write(
343+
async fn persist_queue(&self, encoded_queue: Vec<u8>) -> Result<(), Error> {
344+
KVStore::write(
342345
&*self.kv_store,
343346
EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
344347
EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
345348
EVENT_QUEUE_PERSISTENCE_KEY,
346-
data,
349+
encoded_queue,
347350
)
351+
.await
348352
.map_err(|e| {
349353
log_error!(
350354
self.logger,
@@ -694,7 +698,7 @@ where
694698
claim_deadline,
695699
custom_records,
696700
};
697-
match self.event_queue.add_event(event) {
701+
match self.event_queue.add_event(event).await {
698702
Ok(_) => return Ok(()),
699703
Err(e) => {
700704
log_error!(
@@ -928,7 +932,7 @@ where
928932
.map(|cf| cf.custom_tlvs().into_iter().map(|tlv| tlv.into()).collect())
929933
.unwrap_or_default(),
930934
};
931-
match self.event_queue.add_event(event) {
935+
match self.event_queue.add_event(event).await {
932936
Ok(_) => return Ok(()),
933937
Err(e) => {
934938
log_error!(self.logger, "Failed to push to event queue: {}", e);
@@ -988,7 +992,7 @@ where
988992
fee_paid_msat,
989993
};
990994

991-
match self.event_queue.add_event(event) {
995+
match self.event_queue.add_event(event).await {
992996
Ok(_) => return Ok(()),
993997
Err(e) => {
994998
log_error!(self.logger, "Failed to push to event queue: {}", e);
@@ -1019,7 +1023,7 @@ where
10191023

10201024
let event =
10211025
Event::PaymentFailed { payment_id: Some(payment_id), payment_hash, reason };
1022-
match self.event_queue.add_event(event) {
1026+
match self.event_queue.add_event(event).await {
10231027
Ok(_) => return Ok(()),
10241028
Err(e) => {
10251029
log_error!(self.logger, "Failed to push to event queue: {}", e);
@@ -1295,7 +1299,7 @@ where
12951299
claim_from_onchain_tx,
12961300
outbound_amount_forwarded_msat,
12971301
};
1298-
self.event_queue.add_event(event).map_err(|e| {
1302+
self.event_queue.add_event(event).await.map_err(|e| {
12991303
log_error!(self.logger, "Failed to push to event queue: {}", e);
13001304
ReplayEvent()
13011305
})?;
@@ -1322,7 +1326,7 @@ where
13221326
counterparty_node_id,
13231327
funding_txo,
13241328
};
1325-
match self.event_queue.add_event(event) {
1329+
match self.event_queue.add_event(event).await {
13261330
Ok(_) => {},
13271331
Err(e) => {
13281332
log_error!(self.logger, "Failed to push to event queue: {}", e);
@@ -1383,7 +1387,7 @@ where
13831387
user_channel_id: UserChannelId(user_channel_id),
13841388
counterparty_node_id: Some(counterparty_node_id),
13851389
};
1386-
match self.event_queue.add_event(event) {
1390+
match self.event_queue.add_event(event).await {
13871391
Ok(_) => {},
13881392
Err(e) => {
13891393
log_error!(self.logger, "Failed to push to event queue: {}", e);
@@ -1407,7 +1411,7 @@ where
14071411
reason: Some(reason),
14081412
};
14091413

1410-
match self.event_queue.add_event(event) {
1414+
match self.event_queue.add_event(event).await {
14111415
Ok(_) => {},
14121416
Err(e) => {
14131417
log_error!(self.logger, "Failed to push to event queue: {}", e);
@@ -1622,7 +1626,7 @@ mod tests {
16221626
user_channel_id: UserChannelId(2323),
16231627
counterparty_node_id: None,
16241628
};
1625-
event_queue.add_event(expected_event.clone()).unwrap();
1629+
event_queue.add_event(expected_event.clone()).await.unwrap();
16261630

16271631
// Check we get the expected event and that it is returned until we mark it handled.
16281632
for _ in 0..5 {
@@ -1631,18 +1635,19 @@ mod tests {
16311635
}
16321636

16331637
// Check we can read back what we persisted.
1634-
let persisted_bytes = KVStoreSync::read(
1638+
let persisted_bytes = KVStore::read(
16351639
&*store,
16361640
EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
16371641
EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
16381642
EVENT_QUEUE_PERSISTENCE_KEY,
16391643
)
1644+
.await
16401645
.unwrap();
16411646
let deser_event_queue =
16421647
EventQueue::read(&mut &persisted_bytes[..], (Arc::clone(&store), logger)).unwrap();
16431648
assert_eq!(deser_event_queue.next_event_async().await, expected_event);
16441649

1645-
event_queue.event_handled().unwrap();
1650+
event_queue.event_handled().await.unwrap();
16461651
assert_eq!(event_queue.next_event(), None);
16471652
}
16481653

@@ -1676,28 +1681,28 @@ mod tests {
16761681
let mut delayed_enqueue = false;
16771682

16781683
for _ in 0..25 {
1679-
event_queue.add_event(expected_event.clone()).unwrap();
1684+
event_queue.add_event(expected_event.clone()).await.unwrap();
16801685
enqueued_events.fetch_add(1, Ordering::SeqCst);
16811686
}
16821687

16831688
loop {
16841689
tokio::select! {
16851690
_ = tokio::time::sleep(Duration::from_millis(10)), if !delayed_enqueue => {
1686-
event_queue.add_event(expected_event.clone()).unwrap();
1691+
event_queue.add_event(expected_event.clone()).await.unwrap();
16871692
enqueued_events.fetch_add(1, Ordering::SeqCst);
16881693
delayed_enqueue = true;
16891694
}
16901695
e = event_queue.next_event_async() => {
16911696
assert_eq!(e, expected_event);
1692-
event_queue.event_handled().unwrap();
1697+
event_queue.event_handled().await.unwrap();
16931698
received_events.fetch_add(1, Ordering::SeqCst);
16941699

1695-
event_queue.add_event(expected_event.clone()).unwrap();
1700+
event_queue.add_event(expected_event.clone()).await.unwrap();
16961701
enqueued_events.fetch_add(1, Ordering::SeqCst);
16971702
}
16981703
e = event_queue.next_event_async() => {
16991704
assert_eq!(e, expected_event);
1700-
event_queue.event_handled().unwrap();
1705+
event_queue.event_handled().await.unwrap();
17011706
received_events.fetch_add(1, Ordering::SeqCst);
17021707
}
17031708
}

src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -777,7 +777,10 @@ impl Node {
777777
///
778778
/// **Note:** This **MUST** be called after each event has been handled.
779779
pub fn event_handled(&self) -> Result<(), Error> {
780-
self.event_queue.event_handled().map_err(|e| {
780+
// We use our runtime for the sync variant to ensure `tokio::task::block_in_place` is
781+
// always called if we'd ever hit this in an outer runtime context.
782+
let fut = self.event_queue.event_handled();
783+
self.runtime.block_on(fut).map_err(|e| {
781784
log_error!(
782785
self.logger,
783786
"Couldn't mark event handled due to persistence failure: {}",

0 commit comments

Comments
 (0)