Skip to content

Commit 29d4685

Browse files
committed
Introduce EventQueueNotifierGuard type
Previously, when enqueuing new events to the `EventQueue`, we'd directly attempt to wake any notifiers/notify any threads waiting on the `Condvar` about the newly available events. This could of course mean we'd notify them while ourselves still holding some locks, e.g., on the peer state. Here, we instead introduce a `EventQueueNotifierGuard` type that will notify about pending events if necesssary, which mitigates any potential lock contention: we now simply have to ensure that any method calling `enqueue` holds the notifier before retrieving any locks.
1 parent 8b995e6 commit 29d4685

File tree

5 files changed

+125
-20
lines changed

5 files changed

+125
-20
lines changed

lightning-liquidity/src/events.rs

Lines changed: 103 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ use alloc::collections::VecDeque;
2424
use alloc::vec::Vec;
2525

2626
use core::future::Future;
27+
#[cfg(debug_assertions)]
28+
use core::sync::atomic::{AtomicU8, Ordering};
2729
use core::task::{Poll, Waker};
2830

2931
/// The maximum queue size we allow before starting to drop events.
@@ -33,37 +35,40 @@ pub(crate) struct EventQueue {
3335
queue: Arc<Mutex<VecDeque<LiquidityEvent>>>,
3436
waker: Arc<Mutex<Option<Waker>>>,
3537
#[cfg(feature = "std")]
36-
condvar: crate::sync::Condvar,
38+
condvar: Arc<crate::sync::Condvar>,
39+
#[cfg(debug_assertions)]
40+
num_held_notifier_guards: Arc<AtomicU8>,
3741
}
3842

3943
impl EventQueue {
4044
pub fn new() -> Self {
4145
let queue = Arc::new(Mutex::new(VecDeque::new()));
4246
let waker = Arc::new(Mutex::new(None));
43-
#[cfg(feature = "std")]
44-
{
45-
let condvar = crate::sync::Condvar::new();
46-
Self { queue, waker, condvar }
47+
Self {
48+
queue,
49+
waker,
50+
#[cfg(feature = "std")]
51+
condvar: Arc::new(crate::sync::Condvar::new()),
52+
#[cfg(debug_assertions)]
53+
num_held_notifier_guards: Arc::new(AtomicU8::new(0)),
4754
}
48-
#[cfg(not(feature = "std"))]
49-
Self { queue, waker }
5055
}
5156

5257
pub fn enqueue<E: Into<LiquidityEvent>>(&self, event: E) {
58+
#[cfg(debug_assertions)]
5359
{
54-
let mut queue = self.queue.lock().unwrap();
55-
if queue.len() < MAX_EVENT_QUEUE_SIZE {
56-
queue.push_back(event.into());
57-
} else {
58-
return;
59-
}
60+
let num_held_notifier_guards = self.num_held_notifier_guards.load(Ordering::Relaxed);
61+
debug_assert!(
62+
num_held_notifier_guards > 0,
63+
"We should be holding at least one notifier guard whenever enqueuing new events"
64+
);
6065
}
61-
62-
if let Some(waker) = self.waker.lock().unwrap().take() {
63-
waker.wake();
66+
let mut queue = self.queue.lock().unwrap();
67+
if queue.len() < MAX_EVENT_QUEUE_SIZE {
68+
queue.push_back(event.into());
69+
} else {
70+
return;
6471
}
65-
#[cfg(feature = "std")]
66-
self.condvar.notify_one();
6772
}
6873

6974
pub fn next_event(&self) -> Option<LiquidityEvent> {
@@ -102,6 +107,81 @@ impl EventQueue {
102107
pub fn get_and_clear_pending_events(&self) -> Vec<LiquidityEvent> {
103108
self.queue.lock().unwrap().split_off(0).into()
104109
}
110+
111+
// Returns an [`EventQueueNotifierGuard`] that will notify about new event when dropped.
112+
pub fn notifier(&self) -> EventQueueNotifierGuard {
113+
#[cfg(debug_assertions)]
114+
{
115+
self.num_held_notifier_guards.fetch_add(1, Ordering::Relaxed);
116+
}
117+
EventQueueNotifierGuard {
118+
queue: Arc::clone(&self.queue),
119+
waker: Arc::clone(&self.waker),
120+
#[cfg(feature = "std")]
121+
condvar: Arc::clone(&self.condvar),
122+
#[cfg(debug_assertions)]
123+
num_held_notifier_guards: Arc::clone(&self.num_held_notifier_guards),
124+
}
125+
}
126+
}
127+
128+
impl Drop for EventQueue {
129+
fn drop(&mut self) {
130+
#[cfg(debug_assertions)]
131+
{
132+
let num_held_notifier_guards = self.num_held_notifier_guards.load(Ordering::Relaxed);
133+
debug_assert!(
134+
num_held_notifier_guards == 0,
135+
"We should not be holding any notifier guards when the event queue is dropped"
136+
);
137+
}
138+
}
139+
}
140+
141+
// A guard type that will notify about new events when dropped.
142+
#[must_use]
143+
pub(crate) struct EventQueueNotifierGuard {
144+
queue: Arc<Mutex<VecDeque<LiquidityEvent>>>,
145+
waker: Arc<Mutex<Option<Waker>>>,
146+
#[cfg(feature = "std")]
147+
condvar: Arc<crate::sync::Condvar>,
148+
#[cfg(debug_assertions)]
149+
num_held_notifier_guards: Arc<AtomicU8>,
150+
}
151+
152+
impl Drop for EventQueueNotifierGuard {
153+
fn drop(&mut self) {
154+
let should_notify = !self.queue.lock().unwrap().is_empty();
155+
156+
if should_notify {
157+
if let Some(waker) = self.waker.lock().unwrap().take() {
158+
waker.wake();
159+
}
160+
161+
#[cfg(feature = "std")]
162+
self.condvar.notify_one();
163+
}
164+
165+
#[cfg(debug_assertions)]
166+
{
167+
let res = self.num_held_notifier_guards.fetch_update(
168+
Ordering::Relaxed,
169+
Ordering::Relaxed,
170+
|x| Some(x.saturating_sub(1)),
171+
);
172+
match res {
173+
Ok(previous_value) if previous_value == 0 => debug_assert!(
174+
false,
175+
"num_held_notifier_guards counter out-of-sync! This should never happen!"
176+
),
177+
Err(_) => debug_assert!(
178+
false,
179+
"num_held_notifier_guards counter out-of-sync! This should never happen!"
180+
),
181+
_ => {},
182+
}
183+
}
184+
}
105185
}
106186

107187
/// An event which you should probably take some action in response to.
@@ -195,6 +275,7 @@ mod tests {
195275
});
196276

197277
for _ in 0..3 {
278+
let _guard = event_queue.notifier();
198279
event_queue.enqueue(expected_event.clone());
199280
}
200281

@@ -220,13 +301,15 @@ mod tests {
220301
let mut delayed_enqueue = false;
221302

222303
for _ in 0..25 {
304+
let _guard = event_queue.notifier();
223305
event_queue.enqueue(expected_event.clone());
224306
enqueued_events.fetch_add(1, Ordering::SeqCst);
225307
}
226308

227309
loop {
228310
tokio::select! {
229311
_ = tokio::time::sleep(Duration::from_millis(10)), if !delayed_enqueue => {
312+
let _guard = event_queue.notifier();
230313
event_queue.enqueue(expected_event.clone());
231314
enqueued_events.fetch_add(1, Ordering::SeqCst);
232315
delayed_enqueue = true;
@@ -235,6 +318,7 @@ mod tests {
235318
assert_eq!(e, expected_event);
236319
received_events.fetch_add(1, Ordering::SeqCst);
237320

321+
let _guard = event_queue.notifier();
238322
event_queue.enqueue(expected_event.clone());
239323
enqueued_events.fetch_add(1, Ordering::SeqCst);
240324
}
@@ -267,6 +351,7 @@ mod tests {
267351
std::thread::spawn(move || {
268352
// Sleep a bit before we enqueue the events everybody is waiting for.
269353
std::thread::sleep(Duration::from_millis(20));
354+
let _guard = thread_queue.notifier();
270355
thread_queue.enqueue(thread_event.clone());
271356
thread_queue.enqueue(thread_event.clone());
272357
});

lightning-liquidity/src/lsps0/client.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ where
6161
fn handle_response(
6262
&self, response: LSPS0Response, counterparty_node_id: &PublicKey,
6363
) -> Result<(), LightningError> {
64+
let _event_queue_notifier = self.pending_events.notifier();
65+
6466
match response {
6567
LSPS0Response::ListProtocols(LSPS0ListProtocolsResponse { protocols }) => {
6668
self.pending_events.enqueue(LSPS0ClientEvent::ListProtocolsResponse {

lightning-liquidity/src/lsps1/client.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,9 @@ where
110110
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
111111
result: LSPS1GetInfoResponse,
112112
) -> Result<(), LightningError> {
113-
let outer_state_lock = self.per_peer_state.write().unwrap();
113+
let _event_queue_notifier = self.pending_events.notifier();
114114

115+
let outer_state_lock = self.per_peer_state.write().unwrap();
115116
match outer_state_lock.get(counterparty_node_id) {
116117
Some(inner_state_lock) => {
117118
let mut peer_state_lock = inner_state_lock.lock().unwrap();
@@ -147,6 +148,8 @@ where
147148
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
148149
error: LSPSResponseError,
149150
) -> Result<(), LightningError> {
151+
let _event_queue_notifier = self.pending_events.notifier();
152+
150153
let outer_state_lock = self.per_peer_state.read().unwrap();
151154
match outer_state_lock.get(counterparty_node_id) {
152155
Some(inner_state_lock) => {
@@ -224,6 +227,8 @@ where
224227
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
225228
response: LSPS1CreateOrderResponse,
226229
) -> Result<(), LightningError> {
230+
let _event_queue_notifier = self.pending_events.notifier();
231+
227232
let outer_state_lock = self.per_peer_state.read().unwrap();
228233
match outer_state_lock.get(counterparty_node_id) {
229234
Some(inner_state_lock) => {
@@ -266,6 +271,8 @@ where
266271
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
267272
error: LSPSResponseError,
268273
) -> Result<(), LightningError> {
274+
let _event_queue_notifier = self.pending_events.notifier();
275+
269276
let outer_state_lock = self.per_peer_state.read().unwrap();
270277
match outer_state_lock.get(counterparty_node_id) {
271278
Some(inner_state_lock) => {
@@ -343,6 +350,8 @@ where
343350
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
344351
response: LSPS1CreateOrderResponse,
345352
) -> Result<(), LightningError> {
353+
let _event_queue_notifier = self.pending_events.notifier();
354+
346355
let outer_state_lock = self.per_peer_state.read().unwrap();
347356
match outer_state_lock.get(counterparty_node_id) {
348357
Some(inner_state_lock) => {
@@ -385,6 +394,8 @@ where
385394
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
386395
error: LSPSResponseError,
387396
) -> Result<(), LightningError> {
397+
let _event_queue_notifier = self.pending_events.notifier();
398+
388399
let outer_state_lock = self.per_peer_state.read().unwrap();
389400
match outer_state_lock.get(counterparty_node_id) {
390401
Some(inner_state_lock) => {

lightning-liquidity/src/lsps2/client.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,8 @@ where
191191
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
192192
result: LSPS2GetInfoResponse,
193193
) -> Result<(), LightningError> {
194+
let _event_queue_notifier = self.pending_events.notifier();
195+
194196
let outer_state_lock = self.per_peer_state.read().unwrap();
195197
match outer_state_lock.get(counterparty_node_id) {
196198
Some(inner_state_lock) => {
@@ -257,6 +259,8 @@ where
257259
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
258260
result: LSPS2BuyResponse,
259261
) -> Result<(), LightningError> {
262+
let _event_queue_notifier = self.pending_events.notifier();
263+
260264
let outer_state_lock = self.per_peer_state.read().unwrap();
261265
match outer_state_lock.get(counterparty_node_id) {
262266
Some(inner_state_lock) => {

lightning-liquidity/src/lsps2/service.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -787,6 +787,8 @@ where
787787
&self, intercept_scid: u64, intercept_id: InterceptId, expected_outbound_amount_msat: u64,
788788
payment_hash: PaymentHash,
789789
) -> Result<(), APIError> {
790+
let _event_queue_notifier = self.pending_events.notifier();
791+
790792
let peer_by_intercept_scid = self.peer_by_intercept_scid.read().unwrap();
791793
if let Some(counterparty_node_id) = peer_by_intercept_scid.get(&intercept_scid) {
792794
let outer_state_lock = self.per_peer_state.read().unwrap();
@@ -1076,6 +1078,7 @@ where
10761078
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
10771079
params: LSPS2GetInfoRequest,
10781080
) -> Result<(), LightningError> {
1081+
let _event_queue_notifier = self.pending_events.notifier();
10791082
let (result, response) = {
10801083
let mut outer_state_lock = self.per_peer_state.write().unwrap();
10811084
let inner_state_lock =
@@ -1095,7 +1098,6 @@ where
10951098
token: params.token,
10961099
};
10971100
self.pending_events.enqueue(event);
1098-
10991101
(Ok(()), msg)
11001102
},
11011103
(e, msg) => (e, msg),
@@ -1112,6 +1114,7 @@ where
11121114
fn handle_buy_request(
11131115
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey, params: LSPS2BuyRequest,
11141116
) -> Result<(), LightningError> {
1117+
let _event_queue_notifier = self.pending_events.notifier();
11151118
if let Some(payment_size_msat) = params.payment_size_msat {
11161119
if payment_size_msat < params.opening_fee_params.min_payment_size_msat {
11171120
let response = LSPS2Response::BuyError(LSPSResponseError {

0 commit comments

Comments
 (0)