Skip to content

Commit fd7c64e

Browse files
committed
Refactor LSPS5ServiceHandler to hold a PeerState
Going forward, we'll add serialization logic for LSPS5 types. To contain the persisted state a bit better (and to align the model with LSPS1/2), we refactor the `LSPS5ServiceHandler` to hold a `PeerState` object.
1 parent fd9a9f2 commit fd7c64e

File tree

1 file changed

+143
-100
lines changed

1 file changed

+143
-100
lines changed

lightning-liquidity/src/lsps5/service.rs

Lines changed: 143 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,8 @@ use crate::lsps5::msgs::{
1717
SetWebhookRequest, SetWebhookResponse, WebhookNotification, WebhookNotificationMethod,
1818
};
1919
use crate::message_queue::MessageQueue;
20-
use crate::prelude::hash_map::Entry;
2120
use crate::prelude::*;
22-
use crate::sync::{Arc, Mutex};
21+
use crate::sync::{Arc, Mutex, RwLock, RwLockWriteGuard};
2322
use crate::utils::time::TimeProvider;
2423

2524
use bitcoin::secp256k1::PublicKey;
@@ -117,7 +116,7 @@ where
117116
TP::Target: TimeProvider,
118117
{
119118
config: LSPS5ServiceConfig,
120-
webhooks: Mutex<HashMap<PublicKey, HashMap<LSPS5AppName, Webhook>>>,
119+
per_peer_state: RwLock<HashMap<PublicKey, PeerState>>,
121120
event_queue: Arc<EventQueue>,
122121
pending_messages: Arc<MessageQueue>,
123122
time_provider: TP,
@@ -140,7 +139,7 @@ where
140139
assert!(config.max_webhooks_per_client > 0, "`max_webhooks_per_client` must be > 0");
141140
Self {
142141
config,
143-
webhooks: Mutex::new(new_hash_map()),
142+
per_peer_state: RwLock::new(new_hash_map()),
144143
event_queue,
145144
pending_messages,
146145
time_provider,
@@ -150,18 +149,30 @@ where
150149
}
151150
}
152151

153-
fn check_prune_stale_webhooks(&self) {
152+
fn check_prune_stale_webhooks<'a>(
153+
&self, outer_state_lock: &mut RwLockWriteGuard<'a, HashMap<PublicKey, PeerState>>,
154+
) {
155+
let mut last_pruning = self.last_pruning.lock().unwrap();
154156
let now =
155157
LSPSDateTime::new_from_duration_since_epoch(self.time_provider.duration_since_epoch());
156-
let should_prune = {
157-
let last_pruning = self.last_pruning.lock().unwrap();
158-
last_pruning.as_ref().map_or(true, |last_time| {
159-
now.duration_since(&last_time) > PRUNE_STALE_WEBHOOKS_INTERVAL_DAYS
160-
})
161-
};
158+
159+
let should_prune = last_pruning.as_ref().map_or(true, |last_time| {
160+
now.duration_since(&last_time) > PRUNE_STALE_WEBHOOKS_INTERVAL_DAYS
161+
});
162162

163163
if should_prune {
164-
self.prune_stale_webhooks();
164+
let now = LSPSDateTime::new_from_duration_since_epoch(
165+
self.time_provider.duration_since_epoch(),
166+
);
167+
168+
outer_state_lock.retain(|client_id, peer_state| {
169+
if self.client_has_open_channel(client_id) {
170+
// Don't prune clients with open channels
171+
return true;
172+
}
173+
!peer_state.prune_stale_webhooks(now)
174+
});
175+
*last_pruning = Some(now);
165176
}
166177
}
167178

@@ -171,58 +182,53 @@ where
171182
) -> Result<(), LightningError> {
172183
let mut message_queue_notifier = self.pending_messages.notifier();
173184

174-
self.check_prune_stale_webhooks();
185+
let mut outer_state_lock = self.per_peer_state.write().unwrap();
186+
self.check_prune_stale_webhooks(&mut outer_state_lock);
175187

176-
let mut webhooks = self.webhooks.lock().unwrap();
188+
let peer_state_lock =
189+
outer_state_lock.entry(counterparty_node_id).or_insert_with(PeerState::default);
177190

178-
let client_webhooks = webhooks.entry(counterparty_node_id).or_insert_with(new_hash_map);
179191
let now =
180192
LSPSDateTime::new_from_duration_since_epoch(self.time_provider.duration_since_epoch());
181193

182-
let num_webhooks = client_webhooks.len();
194+
let num_webhooks = peer_state_lock.app_names().len();
183195
let mut no_change = false;
184-
match client_webhooks.entry(params.app_name.clone()) {
185-
Entry::Occupied(mut entry) => {
186-
no_change = entry.get().url == params.webhook;
187-
let last_used = if no_change { entry.get().last_used } else { now };
188-
let last_notification_sent = entry.get().last_notification_sent;
189-
entry.insert(Webhook {
190-
_app_name: params.app_name.clone(),
191-
url: params.webhook.clone(),
192-
_counterparty_node_id: counterparty_node_id,
193-
last_used,
194-
last_notification_sent,
195-
});
196-
},
197-
Entry::Vacant(entry) => {
198-
if num_webhooks >= self.config.max_webhooks_per_client as usize {
199-
let error = LSPS5ProtocolError::TooManyWebhooks;
200-
let msg = LSPS5Message::Response(
201-
request_id,
202-
LSPS5Response::SetWebhookError(error.clone().into()),
203-
)
204-
.into();
205-
message_queue_notifier.enqueue(&counterparty_node_id, msg);
206-
return Err(LightningError {
207-
err: error.message().into(),
208-
action: ErrorAction::IgnoreAndLog(Level::Info),
209-
});
210-
}
211196

212-
entry.insert(Webhook {
213-
_app_name: params.app_name.clone(),
214-
url: params.webhook.clone(),
215-
_counterparty_node_id: counterparty_node_id,
216-
last_used: now,
217-
last_notification_sent: None,
197+
if let Some(webhook) = peer_state_lock.webhook_mut(&params.app_name.clone()) {
198+
no_change = webhook.url == params.webhook;
199+
if !no_change {
200+
webhook.last_used = now
201+
}
202+
} else {
203+
if num_webhooks >= self.config.max_webhooks_per_client as usize {
204+
let error = LSPS5ProtocolError::TooManyWebhooks;
205+
let msg = LSPS5Message::Response(
206+
request_id,
207+
LSPS5Response::SetWebhookError(error.clone().into()),
208+
)
209+
.into();
210+
message_queue_notifier.enqueue(&counterparty_node_id, msg);
211+
return Err(LightningError {
212+
err: error.message().into(),
213+
action: ErrorAction::IgnoreAndLog(Level::Info),
218214
});
219-
},
215+
}
216+
217+
let webhook = Webhook {
218+
_app_name: params.app_name.clone(),
219+
url: params.webhook.clone(),
220+
_counterparty_node_id: counterparty_node_id,
221+
last_used: now,
222+
last_notification_sent: None,
223+
};
224+
225+
peer_state_lock.insert_webhook(params.app_name.clone(), webhook);
220226
}
221227

222228
if !no_change {
223229
self.send_webhook_registered_notification(
224230
counterparty_node_id,
225-
params.app_name,
231+
params.app_name.clone(),
226232
params.webhook,
227233
)
228234
.map_err(|e| {
@@ -242,7 +248,7 @@ where
242248
let msg = LSPS5Message::Response(
243249
request_id,
244250
LSPS5Response::SetWebhook(SetWebhookResponse {
245-
num_webhooks: client_webhooks.len() as u32,
251+
num_webhooks: peer_state_lock.app_names().len() as u32,
246252
max_webhooks: self.config.max_webhooks_per_client,
247253
no_change,
248254
}),
@@ -258,14 +264,11 @@ where
258264
) -> Result<(), LightningError> {
259265
let mut message_queue_notifier = self.pending_messages.notifier();
260266

261-
self.check_prune_stale_webhooks();
262-
263-
let webhooks = self.webhooks.lock().unwrap();
267+
let mut outer_state_lock = self.per_peer_state.write().unwrap();
268+
self.check_prune_stale_webhooks(&mut outer_state_lock);
264269

265-
let app_names = webhooks
266-
.get(&counterparty_node_id)
267-
.map(|client_webhooks| client_webhooks.keys().cloned().collect::<Vec<_>>())
268-
.unwrap_or_else(Vec::new);
270+
let app_names =
271+
outer_state_lock.get(&counterparty_node_id).map(|p| p.app_names()).unwrap_or_default();
269272

270273
let max_webhooks = self.config.max_webhooks_per_client;
271274

@@ -282,12 +285,11 @@ where
282285
) -> Result<(), LightningError> {
283286
let mut message_queue_notifier = self.pending_messages.notifier();
284287

285-
self.check_prune_stale_webhooks();
288+
let mut outer_state_lock = self.per_peer_state.write().unwrap();
289+
self.check_prune_stale_webhooks(&mut outer_state_lock);
286290

287-
let mut webhooks = self.webhooks.lock().unwrap();
288-
289-
if let Some(client_webhooks) = webhooks.get_mut(&counterparty_node_id) {
290-
if client_webhooks.remove(&params.app_name).is_some() {
291+
if let Some(peer_state) = outer_state_lock.get_mut(&counterparty_node_id) {
292+
if peer_state.remove_webhook(&params.app_name) {
291293
let response = RemoveWebhookResponse {};
292294
let msg =
293295
LSPS5Message::Response(request_id, LSPS5Response::RemoveWebhook(response))
@@ -408,11 +410,13 @@ where
408410
fn send_notifications_to_client_webhooks(
409411
&self, client_id: PublicKey, notification: WebhookNotification,
410412
) -> Result<(), LSPS5ProtocolError> {
411-
let mut webhooks = self.webhooks.lock().unwrap();
413+
let mut outer_state_lock = self.per_peer_state.write().unwrap();
414+
self.check_prune_stale_webhooks(&mut outer_state_lock);
412415

413-
let client_webhooks = match webhooks.get_mut(&client_id) {
414-
Some(webhooks) if !webhooks.is_empty() => webhooks,
415-
_ => return Ok(()),
416+
let peer_state = if let Some(peer_state) = outer_state_lock.get_mut(&client_id) {
417+
peer_state
418+
} else {
419+
return Ok(());
416420
};
417421

418422
let now =
@@ -421,7 +425,7 @@ where
421425
// We must avoid sending multiple notifications of the same method
422426
// (other than lsps5.webhook_registered) close in time.
423427
if notification.method != WebhookNotificationMethod::LSPS5WebhookRegistered {
424-
let rate_limit_applies = client_webhooks.iter().any(|(_, webhook)| {
428+
let rate_limit_applies = peer_state.webhooks().iter().any(|(_, webhook)| {
425429
webhook.last_notification_sent.as_ref().map_or(false, |last_sent| {
426430
now.duration_since(&last_sent) < NOTIFICATION_COOLDOWN_TIME
427431
})
@@ -432,7 +436,7 @@ where
432436
}
433437
}
434438

435-
for (app_name, webhook) in client_webhooks.iter_mut() {
439+
for (app_name, webhook) in peer_state.webhooks_mut().iter_mut() {
436440
webhook.last_used = now;
437441
webhook.last_notification_sent = Some(now);
438442
self.send_notification(
@@ -490,26 +494,6 @@ where
490494
.map_err(|_| LSPS5ProtocolError::UnknownError)
491495
}
492496

493-
fn prune_stale_webhooks(&self) {
494-
let now =
495-
LSPSDateTime::new_from_duration_since_epoch(self.time_provider.duration_since_epoch());
496-
let mut webhooks = self.webhooks.lock().unwrap();
497-
498-
webhooks.retain(|client_id, client_webhooks| {
499-
if !self.client_has_open_channel(client_id) {
500-
client_webhooks.retain(|_, webhook| {
501-
now.duration_since(&webhook.last_used) < MIN_WEBHOOK_RETENTION_DAYS
502-
});
503-
!client_webhooks.is_empty()
504-
} else {
505-
true
506-
}
507-
});
508-
509-
let mut last_pruning = self.last_pruning.lock().unwrap();
510-
*last_pruning = Some(now);
511-
}
512-
513497
fn client_has_open_channel(&self, client_id: &PublicKey) -> bool {
514498
self.channel_manager
515499
.get_cm()
@@ -519,20 +503,16 @@ where
519503
}
520504

521505
pub(crate) fn peer_connected(&self, counterparty_node_id: &PublicKey) {
522-
let mut webhooks = self.webhooks.lock().unwrap();
523-
if let Some(client_webhooks) = webhooks.get_mut(counterparty_node_id) {
524-
for webhook in client_webhooks.values_mut() {
525-
webhook.last_notification_sent = None;
526-
}
506+
let mut outer_state_lock = self.per_peer_state.write().unwrap();
507+
if let Some(peer_state) = outer_state_lock.get_mut(counterparty_node_id) {
508+
peer_state.reset_notification_cooldown();
527509
}
528510
}
529511

530512
pub(crate) fn peer_disconnected(&self, counterparty_node_id: &PublicKey) {
531-
let mut webhooks = self.webhooks.lock().unwrap();
532-
if let Some(client_webhooks) = webhooks.get_mut(counterparty_node_id) {
533-
for webhook in client_webhooks.values_mut() {
534-
webhook.last_notification_sent = None;
535-
}
513+
let mut outer_state_lock = self.per_peer_state.write().unwrap();
514+
if let Some(peer_state) = outer_state_lock.get_mut(counterparty_node_id) {
515+
peer_state.reset_notification_cooldown();
536516
}
537517
}
538518
}
@@ -578,3 +558,66 @@ where
578558
}
579559
}
580560
}
561+
562+
#[derive(Debug, Default)]
563+
struct PeerState {
564+
webhooks: Vec<(LSPS5AppName, Webhook)>,
565+
}
566+
567+
impl PeerState {
568+
fn webhook_mut(&mut self, name: &LSPS5AppName) -> Option<&mut Webhook> {
569+
self.webhooks.iter_mut().find_map(|(n, h)| if n == name { Some(h) } else { None })
570+
}
571+
572+
fn webhooks(&self) -> &Vec<(LSPS5AppName, Webhook)> {
573+
&self.webhooks
574+
}
575+
576+
fn webhooks_mut(&mut self) -> &mut Vec<(LSPS5AppName, Webhook)> {
577+
&mut self.webhooks
578+
}
579+
580+
fn app_names(&self) -> Vec<LSPS5AppName> {
581+
self.webhooks.iter().map(|(n, _)| n).cloned().collect()
582+
}
583+
584+
fn insert_webhook(&mut self, name: LSPS5AppName, hook: Webhook) -> bool {
585+
for (n, h) in self.webhooks.iter_mut() {
586+
if *n == name {
587+
*h = hook;
588+
return true;
589+
}
590+
}
591+
592+
self.webhooks.push((name, hook));
593+
false
594+
}
595+
596+
fn remove_webhook(&mut self, name: &LSPS5AppName) -> bool {
597+
let mut removed = false;
598+
self.webhooks.retain(|(n, _)| {
599+
if n != name {
600+
true
601+
} else {
602+
removed = true;
603+
false
604+
}
605+
});
606+
removed
607+
}
608+
609+
fn reset_notification_cooldown(&mut self) {
610+
for (_, h) in self.webhooks.iter_mut() {
611+
h.last_notification_sent = None;
612+
}
613+
}
614+
615+
// Returns whether the entire state is empty and can be pruned.
616+
fn prune_stale_webhooks(&mut self, now: LSPSDateTime) -> bool {
617+
self.webhooks.retain(|(_, webhook)| {
618+
now.duration_since(&webhook.last_used) < MIN_WEBHOOK_RETENTION_DAYS
619+
});
620+
621+
self.webhooks.is_empty()
622+
}
623+
}

0 commit comments

Comments
 (0)