Skip to content

Commit ce89e6b

Browse files
committed
Refactor LSPS5ServiceHandler to hold a PeerState
Going forward, we'll add serialization logic for LSPS5 types. To contain the persisted state a bit better (and to align the model with LSPS1/2), we refactor the `LSPS5ServiceHandler` to hold a `PeerState` object.
1 parent b3e9bc6 commit ce89e6b

File tree

2 files changed

+207
-100
lines changed

2 files changed

+207
-100
lines changed

lightning-liquidity/src/lsps5/service.rs

Lines changed: 145 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,8 @@ use crate::lsps5::msgs::{
1717
SetWebhookRequest, SetWebhookResponse, WebhookNotification, WebhookNotificationMethod,
1818
};
1919
use crate::message_queue::MessageQueue;
20-
use crate::prelude::hash_map::Entry;
2120
use crate::prelude::*;
22-
use crate::sync::{Arc, Mutex};
21+
use crate::sync::{Arc, Mutex, RwLock, RwLockWriteGuard};
2322
use crate::utils::time::TimeProvider;
2423

2524
use bitcoin::secp256k1::PublicKey;
@@ -117,7 +116,7 @@ where
117116
TP::Target: TimeProvider,
118117
{
119118
config: LSPS5ServiceConfig,
120-
webhooks: Mutex<HashMap<PublicKey, HashMap<LSPS5AppName, Webhook>>>,
119+
per_peer_state: RwLock<HashMap<PublicKey, PeerState>>,
121120
event_queue: Arc<EventQueue>,
122121
pending_messages: Arc<MessageQueue>,
123122
time_provider: TP,
@@ -140,7 +139,7 @@ where
140139
assert!(config.max_webhooks_per_client > 0, "`max_webhooks_per_client` must be > 0");
141140
Self {
142141
config,
143-
webhooks: Mutex::new(new_hash_map()),
142+
per_peer_state: RwLock::new(new_hash_map()),
144143
event_queue,
145144
pending_messages,
146145
time_provider,
@@ -150,18 +149,26 @@ where
150149
}
151150
}
152151

153-
fn check_prune_stale_webhooks(&self) {
152+
fn check_prune_stale_webhooks<'a>(
153+
&self, outer_state_lock: &mut RwLockWriteGuard<'a, HashMap<PublicKey, PeerState>>,
154+
) {
155+
let mut last_pruning = self.last_pruning.lock().unwrap();
154156
let now =
155157
LSPSDateTime::new_from_duration_since_epoch(self.time_provider.duration_since_epoch());
156-
let should_prune = {
157-
let last_pruning = self.last_pruning.lock().unwrap();
158-
last_pruning.as_ref().map_or(true, |last_time| {
159-
now.duration_since(&last_time) > PRUNE_STALE_WEBHOOKS_INTERVAL_DAYS
160-
})
161-
};
158+
159+
let should_prune = last_pruning.as_ref().map_or(true, |last_time| {
160+
now.duration_since(&last_time) > PRUNE_STALE_WEBHOOKS_INTERVAL_DAYS
161+
});
162162

163163
if should_prune {
164-
self.prune_stale_webhooks();
164+
outer_state_lock.retain(|client_id, peer_state| {
165+
if self.client_has_open_channel(client_id) {
166+
// Don't prune clients with open channels
167+
return true;
168+
}
169+
!peer_state.prune_stale_webhooks(now)
170+
});
171+
*last_pruning = Some(now);
165172
}
166173
}
167174

@@ -171,58 +178,56 @@ where
171178
) -> Result<(), LightningError> {
172179
let mut message_queue_notifier = self.pending_messages.notifier();
173180

174-
self.check_prune_stale_webhooks();
181+
let mut outer_state_lock = self.per_peer_state.write().unwrap();
182+
self.check_prune_stale_webhooks(&mut outer_state_lock);
175183

176-
let mut webhooks = self.webhooks.lock().unwrap();
184+
let peer_state =
185+
outer_state_lock.entry(counterparty_node_id).or_insert_with(PeerState::default);
177186

178-
let client_webhooks = webhooks.entry(counterparty_node_id).or_insert_with(new_hash_map);
179187
let now =
180188
LSPSDateTime::new_from_duration_since_epoch(self.time_provider.duration_since_epoch());
181189

182-
let num_webhooks = client_webhooks.len();
190+
let num_webhooks = peer_state.webhooks_len();
183191
let mut no_change = false;
184-
match client_webhooks.entry(params.app_name.clone()) {
185-
Entry::Occupied(mut entry) => {
186-
no_change = entry.get().url == params.webhook;
187-
let last_used = if no_change { entry.get().last_used } else { now };
188-
let last_notification_sent = entry.get().last_notification_sent;
189-
entry.insert(Webhook {
190-
_app_name: params.app_name.clone(),
191-
url: params.webhook.clone(),
192-
_counterparty_node_id: counterparty_node_id,
193-
last_used,
194-
last_notification_sent,
195-
});
196-
},
197-
Entry::Vacant(entry) => {
198-
if num_webhooks >= self.config.max_webhooks_per_client as usize {
199-
let error = LSPS5ProtocolError::TooManyWebhooks;
200-
let msg = LSPS5Message::Response(
201-
request_id,
202-
LSPS5Response::SetWebhookError(error.clone().into()),
203-
)
204-
.into();
205-
message_queue_notifier.enqueue(&counterparty_node_id, msg);
206-
return Err(LightningError {
207-
err: error.message().into(),
208-
action: ErrorAction::IgnoreAndLog(Level::Info),
209-
});
210-
}
211192

212-
entry.insert(Webhook {
213-
_app_name: params.app_name.clone(),
214-
url: params.webhook.clone(),
215-
_counterparty_node_id: counterparty_node_id,
216-
last_used: now,
217-
last_notification_sent: None,
193+
if let Some(webhook) = peer_state.webhook_mut(&params.app_name) {
194+
no_change = webhook.url == params.webhook;
195+
if !no_change {
196+
// The URL was updated.
197+
webhook.url = params.webhook.clone();
198+
webhook.last_used = now;
199+
webhook.last_notification_sent = None;
200+
}
201+
} else {
202+
if num_webhooks >= self.config.max_webhooks_per_client as usize {
203+
let error = LSPS5ProtocolError::TooManyWebhooks;
204+
let msg = LSPS5Message::Response(
205+
request_id,
206+
LSPS5Response::SetWebhookError(error.clone().into()),
207+
)
208+
.into();
209+
message_queue_notifier.enqueue(&counterparty_node_id, msg);
210+
return Err(LightningError {
211+
err: error.message().into(),
212+
action: ErrorAction::IgnoreAndLog(Level::Info),
218213
});
219-
},
214+
}
215+
216+
let webhook = Webhook {
217+
_app_name: params.app_name.clone(),
218+
url: params.webhook.clone(),
219+
_counterparty_node_id: counterparty_node_id,
220+
last_used: now,
221+
last_notification_sent: None,
222+
};
223+
224+
peer_state.insert_webhook(params.app_name.clone(), webhook);
220225
}
221226

222227
if !no_change {
223228
self.send_webhook_registered_notification(
224229
counterparty_node_id,
225-
params.app_name,
230+
params.app_name.clone(),
226231
params.webhook,
227232
)
228233
.map_err(|e| {
@@ -242,7 +247,7 @@ where
242247
let msg = LSPS5Message::Response(
243248
request_id,
244249
LSPS5Response::SetWebhook(SetWebhookResponse {
245-
num_webhooks: client_webhooks.len() as u32,
250+
num_webhooks: peer_state.webhooks_len() as u32,
246251
max_webhooks: self.config.max_webhooks_per_client,
247252
no_change,
248253
}),
@@ -258,14 +263,11 @@ where
258263
) -> Result<(), LightningError> {
259264
let mut message_queue_notifier = self.pending_messages.notifier();
260265

261-
self.check_prune_stale_webhooks();
262-
263-
let webhooks = self.webhooks.lock().unwrap();
266+
let mut outer_state_lock = self.per_peer_state.write().unwrap();
267+
self.check_prune_stale_webhooks(&mut outer_state_lock);
264268

265-
let app_names = webhooks
266-
.get(&counterparty_node_id)
267-
.map(|client_webhooks| client_webhooks.keys().cloned().collect::<Vec<_>>())
268-
.unwrap_or_else(Vec::new);
269+
let app_names =
270+
outer_state_lock.get(&counterparty_node_id).map(|p| p.app_names()).unwrap_or_default();
269271

270272
let max_webhooks = self.config.max_webhooks_per_client;
271273

@@ -282,12 +284,11 @@ where
282284
) -> Result<(), LightningError> {
283285
let mut message_queue_notifier = self.pending_messages.notifier();
284286

285-
self.check_prune_stale_webhooks();
287+
let mut outer_state_lock = self.per_peer_state.write().unwrap();
288+
self.check_prune_stale_webhooks(&mut outer_state_lock);
286289

287-
let mut webhooks = self.webhooks.lock().unwrap();
288-
289-
if let Some(client_webhooks) = webhooks.get_mut(&counterparty_node_id) {
290-
if client_webhooks.remove(&params.app_name).is_some() {
290+
if let Some(peer_state) = outer_state_lock.get_mut(&counterparty_node_id) {
291+
if peer_state.remove_webhook(&params.app_name) {
291292
let response = RemoveWebhookResponse {};
292293
let msg =
293294
LSPS5Message::Response(request_id, LSPS5Response::RemoveWebhook(response))
@@ -408,11 +409,13 @@ where
408409
fn send_notifications_to_client_webhooks(
409410
&self, client_id: PublicKey, notification: WebhookNotification,
410411
) -> Result<(), LSPS5ProtocolError> {
411-
let mut webhooks = self.webhooks.lock().unwrap();
412+
let mut outer_state_lock = self.per_peer_state.write().unwrap();
413+
self.check_prune_stale_webhooks(&mut outer_state_lock);
412414

413-
let client_webhooks = match webhooks.get_mut(&client_id) {
414-
Some(webhooks) if !webhooks.is_empty() => webhooks,
415-
_ => return Ok(()),
415+
let peer_state = if let Some(peer_state) = outer_state_lock.get_mut(&client_id) {
416+
peer_state
417+
} else {
418+
return Ok(());
416419
};
417420

418421
let now =
@@ -421,7 +424,7 @@ where
421424
// We must avoid sending multiple notifications of the same method
422425
// (other than lsps5.webhook_registered) close in time.
423426
if notification.method != WebhookNotificationMethod::LSPS5WebhookRegistered {
424-
let rate_limit_applies = client_webhooks.iter().any(|(_, webhook)| {
427+
let rate_limit_applies = peer_state.webhooks().iter().any(|(_, webhook)| {
425428
webhook.last_notification_sent.as_ref().map_or(false, |last_sent| {
426429
now.duration_since(&last_sent) < NOTIFICATION_COOLDOWN_TIME
427430
})
@@ -432,7 +435,7 @@ where
432435
}
433436
}
434437

435-
for (app_name, webhook) in client_webhooks.iter_mut() {
438+
for (app_name, webhook) in peer_state.webhooks_mut().iter_mut() {
436439
self.send_notification(
437440
client_id,
438441
app_name.clone(),
@@ -490,26 +493,6 @@ where
490493
.map_err(|_| LSPS5ProtocolError::UnknownError)
491494
}
492495

493-
fn prune_stale_webhooks(&self) {
494-
let now =
495-
LSPSDateTime::new_from_duration_since_epoch(self.time_provider.duration_since_epoch());
496-
let mut webhooks = self.webhooks.lock().unwrap();
497-
498-
webhooks.retain(|client_id, client_webhooks| {
499-
if !self.client_has_open_channel(client_id) {
500-
client_webhooks.retain(|_, webhook| {
501-
now.duration_since(&webhook.last_used) < MIN_WEBHOOK_RETENTION_DAYS
502-
});
503-
!client_webhooks.is_empty()
504-
} else {
505-
true
506-
}
507-
});
508-
509-
let mut last_pruning = self.last_pruning.lock().unwrap();
510-
*last_pruning = Some(now);
511-
}
512-
513496
fn client_has_open_channel(&self, client_id: &PublicKey) -> bool {
514497
self.channel_manager
515498
.get_cm()
@@ -519,20 +502,16 @@ where
519502
}
520503

521504
pub(crate) fn peer_connected(&self, counterparty_node_id: &PublicKey) {
522-
let mut webhooks = self.webhooks.lock().unwrap();
523-
if let Some(client_webhooks) = webhooks.get_mut(counterparty_node_id) {
524-
for webhook in client_webhooks.values_mut() {
525-
webhook.last_notification_sent = None;
526-
}
505+
let mut outer_state_lock = self.per_peer_state.write().unwrap();
506+
if let Some(peer_state) = outer_state_lock.get_mut(counterparty_node_id) {
507+
peer_state.reset_notification_cooldown();
527508
}
528509
}
529510

530511
pub(crate) fn peer_disconnected(&self, counterparty_node_id: &PublicKey) {
531-
let mut webhooks = self.webhooks.lock().unwrap();
532-
if let Some(client_webhooks) = webhooks.get_mut(counterparty_node_id) {
533-
for webhook in client_webhooks.values_mut() {
534-
webhook.last_notification_sent = None;
535-
}
512+
let mut outer_state_lock = self.per_peer_state.write().unwrap();
513+
if let Some(peer_state) = outer_state_lock.get_mut(counterparty_node_id) {
514+
peer_state.reset_notification_cooldown();
536515
}
537516
}
538517
}
@@ -578,3 +557,69 @@ where
578557
}
579558
}
580559
}
560+
561+
#[derive(Debug, Default)]
562+
struct PeerState {
563+
webhooks: Vec<(LSPS5AppName, Webhook)>,
564+
}
565+
566+
impl PeerState {
567+
fn webhook_mut(&mut self, name: &LSPS5AppName) -> Option<&mut Webhook> {
568+
self.webhooks.iter_mut().find_map(|(n, h)| if n == name { Some(h) } else { None })
569+
}
570+
571+
fn webhooks(&self) -> &Vec<(LSPS5AppName, Webhook)> {
572+
&self.webhooks
573+
}
574+
575+
fn webhooks_mut(&mut self) -> &mut Vec<(LSPS5AppName, Webhook)> {
576+
&mut self.webhooks
577+
}
578+
579+
fn webhooks_len(&self) -> usize {
580+
self.webhooks.len()
581+
}
582+
583+
fn app_names(&self) -> Vec<LSPS5AppName> {
584+
self.webhooks.iter().map(|(n, _)| n).cloned().collect()
585+
}
586+
587+
fn insert_webhook(&mut self, name: LSPS5AppName, hook: Webhook) {
588+
for (n, h) in self.webhooks.iter_mut() {
589+
if *n == name {
590+
*h = hook;
591+
return;
592+
}
593+
}
594+
595+
self.webhooks.push((name, hook));
596+
}
597+
598+
fn remove_webhook(&mut self, name: &LSPS5AppName) -> bool {
599+
let mut removed = false;
600+
self.webhooks.retain(|(n, _)| {
601+
if n != name {
602+
true
603+
} else {
604+
removed = true;
605+
false
606+
}
607+
});
608+
removed
609+
}
610+
611+
fn reset_notification_cooldown(&mut self) {
612+
for (_, h) in self.webhooks.iter_mut() {
613+
h.last_notification_sent = None;
614+
}
615+
}
616+
617+
// Returns whether the entire state is empty and can be pruned.
618+
fn prune_stale_webhooks(&mut self, now: LSPSDateTime) -> bool {
619+
self.webhooks.retain(|(_, webhook)| {
620+
now.duration_since(&webhook.last_used) < MIN_WEBHOOK_RETENTION_DAYS
621+
});
622+
623+
self.webhooks.is_empty()
624+
}
625+
}

0 commit comments

Comments
 (0)