Skip to content
This repository was archived by the owner on Jan 6, 2025. It is now read-only.

Commit 9861f57

Browse files
committed
Introduce MessageQueue trait to DRY up enqueing and PM triggering
1 parent 1f11274 commit 9861f57

File tree

11 files changed

+300
-322
lines changed

11 files changed

+300
-322
lines changed

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,10 @@ pub mod lsps0;
4444
pub mod lsps1;
4545
pub mod lsps2;
4646
mod manager;
47+
pub mod message_queue;
4748
mod sync;
49+
#[cfg(test)]
50+
mod tests;
4851
mod utils;
4952

5053
pub use manager::{LiquidityClientConfig, LiquidityManager, LiquidityServiceConfig};

src/lsps0/client.rs

Lines changed: 19 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@
77
use crate::events::{Event, EventQueue};
88
use crate::lsps0::event::LSPS0ClientEvent;
99
use crate::lsps0::msgs::{
10-
LSPS0Message, LSPS0Request, LSPS0Response, LSPSMessage, ListProtocolsRequest,
11-
ListProtocolsResponse, ProtocolMessageHandler, ResponseError,
10+
LSPS0Message, LSPS0Request, LSPS0Response, ListProtocolsRequest, ListProtocolsResponse,
11+
ProtocolMessageHandler, ResponseError,
1212
};
13-
use crate::prelude::Vec;
14-
use crate::sync::{Arc, Mutex};
13+
use crate::message_queue::MessageQueue;
14+
use crate::sync::Arc;
1515
use crate::utils;
1616

1717
use lightning::ln::msgs::{ErrorAction, LightningError};
@@ -23,23 +23,24 @@ use bitcoin::secp256k1::PublicKey;
2323
use core::ops::Deref;
2424

2525
/// A message handler capable of sending and handling LSPS0 messages.
26-
pub struct LSPS0ClientHandler<ES: Deref>
26+
pub struct LSPS0ClientHandler<ES: Deref, MQ: Deref>
2727
where
2828
ES::Target: EntropySource,
29+
MQ::Target: MessageQueue,
2930
{
3031
entropy_source: ES,
31-
pending_messages: Arc<Mutex<Vec<(PublicKey, LSPSMessage)>>>,
32+
pending_messages: MQ,
3233
pending_events: Arc<EventQueue>,
3334
}
3435

35-
impl<ES: Deref> LSPS0ClientHandler<ES>
36+
impl<ES: Deref, MQ: Deref> LSPS0ClientHandler<ES, MQ>
3637
where
3738
ES::Target: EntropySource,
39+
MQ::Target: MessageQueue,
3840
{
3941
/// Returns a new instance of [`LSPS0ClientHandler`].
4042
pub(crate) fn new(
41-
entropy_source: ES, pending_messages: Arc<Mutex<Vec<(PublicKey, LSPSMessage)>>>,
42-
pending_events: Arc<EventQueue>,
43+
entropy_source: ES, pending_messages: MQ, pending_events: Arc<EventQueue>,
4344
) -> Self {
4445
Self { entropy_source, pending_messages, pending_events }
4546
}
@@ -49,17 +50,13 @@ where
4950
/// Please refer to the [LSPS0
5051
/// specifcation](https://github.com/BitcoinAndLightningLayerSpecs/lsp/tree/main/LSPS0#lsps-specification-support-query)
5152
/// for more information.
52-
pub fn list_protocols(&self, counterparty_node_id: PublicKey) {
53+
pub fn list_protocols(&self, counterparty_node_id: &PublicKey) {
5354
let msg = LSPS0Message::Request(
5455
utils::generate_request_id(&self.entropy_source),
5556
LSPS0Request::ListProtocols(ListProtocolsRequest {}),
5657
);
5758

58-
self.enqueue_message(counterparty_node_id, msg);
59-
}
60-
61-
fn enqueue_message(&self, counterparty_node_id: PublicKey, message: LSPS0Message) {
62-
self.pending_messages.lock().unwrap().push((counterparty_node_id, message.into()));
59+
self.pending_messages.enqueue(counterparty_node_id, msg.into());
6360
}
6461

6562
fn handle_response(
@@ -88,9 +85,10 @@ where
8885
}
8986
}
9087

91-
impl<ES: Deref> ProtocolMessageHandler for LSPS0ClientHandler<ES>
88+
impl<ES: Deref, MQ: Deref> ProtocolMessageHandler for LSPS0ClientHandler<ES, MQ>
9289
where
9390
ES::Target: EntropySource,
91+
MQ::Target: MessageQueue,
9492
{
9593
type ProtocolMessage = LSPS0Message;
9694
const PROTOCOL_NUMBER: Option<u16> = None;
@@ -119,20 +117,14 @@ mod tests {
119117
use alloc::string::ToString;
120118
use alloc::sync::Arc;
121119

122-
use crate::lsps0::msgs::RequestId;
120+
use crate::lsps0::msgs::{LSPSMessage, RequestId};
121+
use crate::tests::utils::{TestEntropy, TestMessageQueue};
123122

124123
use super::*;
125124

126-
struct TestEntropy {}
127-
impl EntropySource for TestEntropy {
128-
fn get_secure_random_bytes(&self) -> [u8; 32] {
129-
[0; 32]
130-
}
131-
}
132-
133125
#[test]
134126
fn test_list_protocols() {
135-
let pending_messages = Arc::new(Mutex::new(vec![]));
127+
let pending_messages = Arc::new(TestMessageQueue::new());
136128
let entropy_source = Arc::new(TestEntropy {});
137129
let event_queue = Arc::new(EventQueue::new());
138130

@@ -147,8 +139,8 @@ mod tests {
147139
)
148140
.unwrap();
149141

150-
lsps0_handler.list_protocols(counterparty_node_id);
151-
let pending_messages = pending_messages.lock().unwrap();
142+
lsps0_handler.list_protocols(&counterparty_node_id);
143+
let pending_messages = pending_messages.get_and_clear_pending_msgs();
152144

153145
assert_eq!(pending_messages.len(), 1);
154146

src/lsps0/service.rs

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,35 +14,37 @@
1414
//! information.
1515
1616
use crate::lsps0::msgs::{
17-
LSPS0Message, LSPS0Request, LSPS0Response, LSPSMessage, ListProtocolsResponse,
18-
ProtocolMessageHandler, RequestId,
17+
LSPS0Message, LSPS0Request, LSPS0Response, ListProtocolsResponse, ProtocolMessageHandler,
18+
RequestId,
1919
};
20+
use crate::message_queue::MessageQueue;
2021
use crate::prelude::Vec;
21-
use crate::sync::{Arc, Mutex};
2222

2323
use lightning::ln::msgs::{ErrorAction, LightningError};
2424
use lightning::util::logger::Level;
2525

2626
use bitcoin::secp256k1::PublicKey;
2727

28+
use core::ops::Deref;
29+
2830
/// The main server-side object allowing to send and receive LSPS0 messages.
29-
pub struct LSPS0ServiceHandler {
30-
pending_messages: Arc<Mutex<Vec<(PublicKey, LSPSMessage)>>>,
31+
pub struct LSPS0ServiceHandler<MQ: Deref>
32+
where
33+
MQ::Target: MessageQueue,
34+
{
35+
pending_messages: MQ,
3136
protocols: Vec<u16>,
3237
}
3338

34-
impl LSPS0ServiceHandler {
39+
impl<MQ: Deref> LSPS0ServiceHandler<MQ>
40+
where
41+
MQ::Target: MessageQueue,
42+
{
3543
/// Returns a new instance of [`LSPS0ServiceHandler`].
36-
pub(crate) fn new(
37-
protocols: Vec<u16>, pending_messages: Arc<Mutex<Vec<(PublicKey, LSPSMessage)>>>,
38-
) -> Self {
44+
pub(crate) fn new(protocols: Vec<u16>, pending_messages: MQ) -> Self {
3945
Self { protocols, pending_messages }
4046
}
4147

42-
fn enqueue_message(&self, counterparty_node_id: PublicKey, message: LSPS0Message) {
43-
self.pending_messages.lock().unwrap().push((counterparty_node_id, message.into()));
44-
}
45-
4648
fn handle_request(
4749
&self, request_id: RequestId, request: LSPS0Request, counterparty_node_id: &PublicKey,
4850
) -> Result<(), lightning::ln::msgs::LightningError> {
@@ -54,14 +56,17 @@ impl LSPS0ServiceHandler {
5456
protocols: self.protocols.clone(),
5557
}),
5658
);
57-
self.enqueue_message(*counterparty_node_id, msg);
59+
self.pending_messages.enqueue(counterparty_node_id, msg.into());
5860
Ok(())
5961
}
6062
}
6163
}
6264
}
6365

64-
impl ProtocolMessageHandler for LSPS0ServiceHandler {
66+
impl<MQ: Deref> ProtocolMessageHandler for LSPS0ServiceHandler<MQ>
67+
where
68+
MQ::Target: MessageQueue,
69+
{
6570
type ProtocolMessage = LSPS0Message;
6671
const PROTOCOL_NUMBER: Option<u16> = None;
6772

@@ -86,7 +91,8 @@ impl ProtocolMessageHandler for LSPS0ServiceHandler {
8691
#[cfg(test)]
8792
mod tests {
8893

89-
use crate::lsps0::msgs::ListProtocolsRequest;
94+
use crate::lsps0::msgs::{LSPSMessage, ListProtocolsRequest};
95+
use crate::tests::utils::TestMessageQueue;
9096
use crate::utils;
9197
use alloc::string::ToString;
9298
use alloc::sync::Arc;
@@ -96,7 +102,7 @@ mod tests {
96102
#[test]
97103
fn test_handle_list_protocols_request() {
98104
let protocols: Vec<u16> = vec![];
99-
let pending_messages = Arc::new(Mutex::new(vec![]));
105+
let pending_messages = Arc::new(TestMessageQueue::new());
100106

101107
let lsps0_handler = Arc::new(LSPS0ServiceHandler::new(protocols, pending_messages.clone()));
102108

@@ -110,7 +116,7 @@ mod tests {
110116
.unwrap();
111117

112118
lsps0_handler.handle_message(list_protocols_request, &counterparty_node_id).unwrap();
113-
let pending_messages = pending_messages.lock().unwrap();
119+
let pending_messages = pending_messages.get_and_clear_pending_msgs();
114120

115121
assert_eq!(pending_messages.len(), 1);
116122

0 commit comments

Comments
 (0)