Skip to content

Commit 2336917

Browse files
committed
Add EventQueue persistence
We add simple `persist` call to `EventQueue` that persists it under a `event_queue` key.
1 parent 593dc84 commit 2336917

File tree

6 files changed

+86
-7
lines changed

6 files changed

+86
-7
lines changed

lightning-liquidity/src/events/event_queue.rs

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,23 @@
11
use super::LiquidityEvent;
2+
3+
use crate::persist::{
4+
LIQUIDITY_MANAGER_EVENT_QUEUE_PERSISTENCE_KEY,
5+
LIQUIDITY_MANAGER_EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
6+
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
7+
};
28
use crate::sync::{Arc, Mutex};
39

10+
use alloc::boxed::Box;
411
use alloc::collections::VecDeque;
512
use alloc::vec::Vec;
613

714
use core::future::Future;
15+
use core::pin::Pin;
816
use core::task::{Poll, Waker};
917

18+
use lightning::util::persist::KVStore;
19+
use lightning::util::ser::{CollectionLength, MaybeReadable, Readable, Writeable, Writer};
20+
1021
/// The maximum queue size we allow before starting to drop events.
1122
pub const MAX_EVENT_QUEUE_SIZE: usize = 1000;
1223

@@ -15,17 +26,19 @@ pub(crate) struct EventQueue {
1526
waker: Arc<Mutex<Option<Waker>>>,
1627
#[cfg(feature = "std")]
1728
condvar: Arc<crate::sync::Condvar>,
29+
kv_store: Arc<dyn KVStore + Send + Sync>,
1830
}
1931

2032
impl EventQueue {
21-
pub fn new() -> Self {
33+
pub fn new(kv_store: Arc<dyn KVStore + Send + Sync>) -> Self {
2234
let queue = Arc::new(Mutex::new(VecDeque::new()));
2335
let waker = Arc::new(Mutex::new(None));
2436
Self {
2537
queue,
2638
waker,
2739
#[cfg(feature = "std")]
2840
condvar: Arc::new(crate::sync::Condvar::new()),
41+
kv_store,
2942
}
3043
}
3144

@@ -70,6 +83,20 @@ impl EventQueue {
7083
pub fn notifier(&self) -> EventQueueNotifierGuard<'_> {
7184
EventQueueNotifierGuard(self)
7285
}
86+
87+
pub fn persist(
88+
&self,
89+
) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + Send>> {
90+
let queue = self.queue.lock().unwrap();
91+
let encoded = EventQueueSerWrapper(&queue).encode();
92+
93+
self.kv_store.write(
94+
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
95+
LIQUIDITY_MANAGER_EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
96+
LIQUIDITY_MANAGER_EVENT_QUEUE_PERSISTENCE_KEY,
97+
encoded,
98+
)
99+
}
73100
}
74101

75102
// A guard type that will notify about new events when dropped.
@@ -122,6 +149,35 @@ impl Future for EventFuture {
122149
}
123150
}
124151

152+
pub(crate) struct EventQueueDeserWrapper(pub VecDeque<LiquidityEvent>);
153+
154+
impl Readable for EventQueueDeserWrapper {
155+
fn read<R: lightning::io::Read>(
156+
reader: &mut R,
157+
) -> Result<Self, lightning::ln::msgs::DecodeError> {
158+
let len: CollectionLength = Readable::read(reader)?;
159+
let mut queue = VecDeque::with_capacity(len.0 as usize);
160+
for _ in 0..len.0 {
161+
if let Some(event) = MaybeReadable::read(reader)? {
162+
queue.push_back(event);
163+
}
164+
}
165+
Ok(Self(queue))
166+
}
167+
}
168+
169+
struct EventQueueSerWrapper<'a>(&'a VecDeque<LiquidityEvent>);
170+
171+
impl Writeable for EventQueueSerWrapper<'_> {
172+
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), lightning::io::Error> {
173+
CollectionLength(self.0.len() as u64).write(writer)?;
174+
for e in self.0.iter() {
175+
e.write(writer)?;
176+
}
177+
Ok(())
178+
}
179+
}
180+
125181
#[cfg(test)]
126182
mod tests {
127183
#[tokio::test]
@@ -131,10 +187,13 @@ mod tests {
131187
use crate::lsps0::event::LSPS0ClientEvent;
132188
use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
133189
use core::sync::atomic::{AtomicU16, Ordering};
190+
use lightning::util::persist::KVStoreSyncWrapper;
191+
use lightning::util::test_utils::TestStore;
134192
use std::sync::Arc;
135193
use std::time::Duration;
136194

137-
let event_queue = Arc::new(EventQueue::new());
195+
let kv_store = Arc::new(KVStoreSyncWrapper(Arc::new(TestStore::new(false))));
196+
let event_queue = Arc::new(EventQueue::new(kv_store));
138197
assert_eq!(event_queue.next_event(), None);
139198

140199
let secp_ctx = Secp256k1::new();

lightning-liquidity/src/lsps0/client.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,12 @@ where
113113

114114
#[cfg(test)]
115115
mod tests {
116-
117116
use alloc::string::ToString;
118117
use alloc::sync::Arc;
119118

119+
use lightning::util::persist::KVStoreSyncWrapper;
120+
use lightning::util::test_utils::TestStore;
121+
120122
use crate::lsps0::ser::{LSPSMessage, LSPSRequestId};
121123
use crate::tests::utils::{self, TestEntropy};
122124

@@ -126,7 +128,8 @@ mod tests {
126128
fn test_list_protocols() {
127129
let pending_messages = Arc::new(MessageQueue::new());
128130
let entropy_source = Arc::new(TestEntropy {});
129-
let event_queue = Arc::new(EventQueue::new());
131+
let kv_store = Arc::new(KVStoreSyncWrapper(Arc::new(TestStore::new(false))));
132+
let event_queue = Arc::new(EventQueue::new(kv_store));
130133

131134
let lsps0_handler = Arc::new(LSPS0ClientHandler::new(
132135
entropy_source,

lightning-liquidity/src/lsps5/client.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -444,6 +444,8 @@ mod tests {
444444
use crate::{lsps0::ser::LSPSRequestId, lsps5::msgs::SetWebhookResponse};
445445
use bitcoin::{key::Secp256k1, secp256k1::SecretKey};
446446
use core::sync::atomic::{AtomicU64, Ordering};
447+
use lightning::util::persist::KVStoreSyncWrapper;
448+
use lightning::util::test_utils::TestStore;
447449

448450
struct UniqueTestEntropy {
449451
counter: AtomicU64,
@@ -467,7 +469,9 @@ mod tests {
467469
) {
468470
let test_entropy_source = Arc::new(UniqueTestEntropy { counter: AtomicU64::new(2) });
469471
let message_queue = Arc::new(MessageQueue::new());
470-
let event_queue = Arc::new(EventQueue::new());
472+
473+
let kv_store = Arc::new(KVStoreSyncWrapper(Arc::new(TestStore::new(false))));
474+
let event_queue = Arc::new(EventQueue::new(kv_store));
471475
let client = LSPS5ClientHandler::new(
472476
test_entropy_source,
473477
Arc::clone(&message_queue),

lightning-liquidity/src/manager.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,7 @@ where
329329
client_config: Option<LiquidityClientConfig>, time_provider: TP,
330330
) -> Self {
331331
let pending_messages = Arc::new(MessageQueue::new());
332-
let pending_events = Arc::new(EventQueue::new());
332+
let pending_events = Arc::new(EventQueue::new(Arc::clone(&kv_store)));
333333
let ignored_peers = RwLock::new(new_hash_set());
334334

335335
let mut supported_protocols = Vec::new();
@@ -574,6 +574,9 @@ where
574574
&self,
575575
) -> Pin<Box<dyn StdFuture<Output = Result<(), lightning::io::Error>> + Send>> {
576576
let mut futures = Vec::new();
577+
578+
futures.push(self.pending_events.persist());
579+
577580
if let Some(lsps2_service_handler) = self.lsps2_service_handler.as_ref() {
578581
futures.push(lsps2_service_handler.persist());
579582
}

lightning-liquidity/src/persist.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,16 @@
1414
/// [`LiquidityManager`]: crate::LiquidityManager
1515
pub const LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE: &str = "liquidity";
1616

17+
/// The secondary namespace under which the [`LiquidityManager`] event queue will be persisted.
18+
///
19+
/// [`LiquidityManager`]: crate::LiquidityManager
20+
pub const LIQUIDITY_MANAGER_EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
21+
22+
/// The key under which the [`LiquidityManager`] event queue will be persisted.
23+
///
24+
/// [`LiquidityManager`]: crate::LiquidityManager
25+
pub const LIQUIDITY_MANAGER_EVENT_QUEUE_PERSISTENCE_KEY: &str = "event_queue";
26+
1727
/// The secondary namespace under which the [`LSPS2ServiceHandler`] data will be persisted.
1828
///
1929
/// [`LSPS2ServiceHandler`]: crate::lsps2::service::LSPS2ServiceHandler

lightning/src/util/ser.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,7 @@ impl Readable for BigSize {
553553
/// To ensure we only have one valid encoding per value, we add 0xffff to values written as eight
554554
/// bytes. Thus, 0xfffe is serialized as 0xfffe, whereas 0xffff is serialized as
555555
/// 0xffff0000000000000000 (i.e. read-eight-bytes then zero).
556-
struct CollectionLength(pub u64);
556+
pub struct CollectionLength(pub u64);
557557
impl Writeable for CollectionLength {
558558
#[inline]
559559
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), io::Error> {

0 commit comments

Comments
 (0)