Skip to content

Commit 34d7a9b

Browse files
committed
Make event handling fallible
Previously, we would require our users to handle all events successfully inline or panic while trying to do so. If they exited the `EventHandler` any other way, we'd forget about the event and wouldn't replay it after restart. Here, we implement fallible event handling, allowing the user to return `Err(())`, which signals to our event providers that they should abort event processing and replay any unhandled events later (i.e., in the next invocation).
1 parent 1d421d3 commit 34d7a9b

File tree

8 files changed

+238
-140
lines changed

8 files changed

+238
-140
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 45 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -552,7 +552,7 @@ use core::task;
552552
/// # }
553553
/// # struct EventHandler {}
554554
/// # impl EventHandler {
555-
/// # async fn handle_event(&self, _: lightning::events::Event) {}
555+
/// # async fn handle_event(&self, _: lightning::events::Event) -> Result<(), ()> { Ok(()) }
556556
/// # }
557557
/// # #[derive(Eq, PartialEq, Clone, Hash)]
558558
/// # struct SocketDescriptor {}
@@ -646,7 +646,7 @@ pub async fn process_events_async<
646646
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
647647
L: 'static + Deref + Send + Sync,
648648
P: 'static + Deref + Send + Sync,
649-
EventHandlerFuture: core::future::Future<Output = ()>,
649+
EventHandlerFuture: core::future::Future<Output = Result<(),()>>,
650650
EventHandler: Fn(Event) -> EventHandlerFuture,
651651
PS: 'static + Deref + Send,
652652
M: 'static + Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>> + Send + Sync,
@@ -692,12 +692,13 @@ where
692692
if update_scorer(scorer, &event, duration_since_epoch) {
693693
log_trace!(logger, "Persisting scorer after update");
694694
if let Err(e) = persister.persist_scorer(&scorer) {
695-
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
695+
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
696+
return Err(());
696697
}
697698
}
698699
}
699700
}
700-
event_handler(event).await;
701+
event_handler(event).await
701702
}
702703
};
703704
define_run_body!(
@@ -731,7 +732,7 @@ where
731732

732733
#[cfg(feature = "futures")]
733734
async fn process_onion_message_handler_events_async<
734-
EventHandlerFuture: core::future::Future<Output = ()>,
735+
EventHandlerFuture: core::future::Future<Output = Result<(), ()>>,
735736
EventHandler: Fn(Event) -> EventHandlerFuture,
736737
PM: 'static + Deref + Send + Sync,
737738
>(
@@ -741,10 +742,11 @@ where
741742
PM::Target: APeerManager + Send + Sync,
742743
{
743744
let events = core::cell::RefCell::new(Vec::new());
744-
peer_manager.onion_message_handler().process_pending_events(&|e| events.borrow_mut().push(e));
745+
peer_manager.onion_message_handler().process_pending_events(&|e| Ok(events.borrow_mut().push(e)));
745746

746747
for event in events.into_inner() {
747-
handler(event).await
748+
// Ignore any errors as onion messages are best effort anyways.
749+
let _ = handler(event).await;
748750
}
749751
}
750752

@@ -846,7 +848,7 @@ impl BackgroundProcessor {
846848
}
847849
}
848850
}
849-
event_handler.handle_event(event);
851+
event_handler.handle_event(event)
850852
};
851853
define_run_body!(
852854
persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
@@ -1424,7 +1426,7 @@ mod tests {
14241426
// Initiate the background processors to watch each node.
14251427
let data_dir = nodes[0].kv_store.get_data_dir();
14261428
let persister = Arc::new(Persister::new(data_dir));
1427-
let event_handler = |_: _| {};
1429+
let event_handler = |_: _| { Ok(()) };
14281430
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
14291431

14301432
macro_rules! check_persisted_data {
@@ -1491,7 +1493,7 @@ mod tests {
14911493
let (_, nodes) = create_nodes(1, "test_timer_tick_called");
14921494
let data_dir = nodes[0].kv_store.get_data_dir();
14931495
let persister = Arc::new(Persister::new(data_dir));
1494-
let event_handler = |_: _| {};
1496+
let event_handler = |_: _| { Ok(()) };
14951497
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
14961498
loop {
14971499
let log_entries = nodes[0].logger.lines.lock().unwrap();
@@ -1520,7 +1522,7 @@ mod tests {
15201522

15211523
let data_dir = nodes[0].kv_store.get_data_dir();
15221524
let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
1523-
let event_handler = |_: _| {};
1525+
let event_handler = |_: _| { Ok(()) };
15241526
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
15251527
match bg_processor.join() {
15261528
Ok(_) => panic!("Expected error persisting manager"),
@@ -1542,7 +1544,7 @@ mod tests {
15421544
let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
15431545

15441546
let bp_future = super::process_events_async(
1545-
persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
1547+
persister, |_: _| {async { Ok(()) }}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
15461548
nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
15471549
Some(nodes[0].scorer.clone()), move |dur: Duration| {
15481550
Box::pin(async move {
@@ -1566,7 +1568,7 @@ mod tests {
15661568
let (_, nodes) = create_nodes(2, "test_persist_network_graph_error");
15671569
let data_dir = nodes[0].kv_store.get_data_dir();
15681570
let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
1569-
let event_handler = |_: _| {};
1571+
let event_handler = |_: _| { Ok(()) };
15701572
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
15711573

15721574
match bg_processor.stop() {
@@ -1584,7 +1586,7 @@ mod tests {
15841586
let (_, nodes) = create_nodes(2, "test_persist_scorer_error");
15851587
let data_dir = nodes[0].kv_store.get_data_dir();
15861588
let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
1587-
let event_handler = |_: _| {};
1589+
let event_handler = |_: _| { Ok(()) };
15881590
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
15891591

15901592
match bg_processor.stop() {
@@ -1606,11 +1608,14 @@ mod tests {
16061608
// Set up a background event handler for FundingGenerationReady events.
16071609
let (funding_generation_send, funding_generation_recv) = std::sync::mpsc::sync_channel(1);
16081610
let (channel_pending_send, channel_pending_recv) = std::sync::mpsc::sync_channel(1);
1609-
let event_handler = move |event: Event| match event {
1610-
Event::FundingGenerationReady { .. } => funding_generation_send.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
1611-
Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
1612-
Event::ChannelReady { .. } => {},
1613-
_ => panic!("Unexpected event: {:?}", event),
1611+
let event_handler = move |event: Event| {
1612+
match event {
1613+
Event::FundingGenerationReady { .. } => funding_generation_send.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
1614+
Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
1615+
Event::ChannelReady { .. } => {},
1616+
_ => panic!("Unexpected event: {:?}", event),
1617+
}
1618+
Ok(())
16141619
};
16151620

16161621
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
@@ -1646,11 +1651,14 @@ mod tests {
16461651

16471652
// Set up a background event handler for SpendableOutputs events.
16481653
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1649-
let event_handler = move |event: Event| match event {
1650-
Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
1651-
Event::ChannelReady { .. } => {},
1652-
Event::ChannelClosed { .. } => {},
1653-
_ => panic!("Unexpected event: {:?}", event),
1654+
let event_handler = move |event: Event| {
1655+
match event {
1656+
Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
1657+
Event::ChannelReady { .. } => {},
1658+
Event::ChannelClosed { .. } => {},
1659+
_ => panic!("Unexpected event: {:?}", event),
1660+
}
1661+
Ok(())
16541662
};
16551663
let persister = Arc::new(Persister::new(data_dir));
16561664
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
@@ -1763,7 +1771,7 @@ mod tests {
17631771
let (_, nodes) = create_nodes(2, "test_scorer_persistence");
17641772
let data_dir = nodes[0].kv_store.get_data_dir();
17651773
let persister = Arc::new(Persister::new(data_dir));
1766-
let event_handler = |_: _| {};
1774+
let event_handler = |_: _| { Ok(()) };
17671775
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
17681776

17691777
loop {
@@ -1836,7 +1844,7 @@ mod tests {
18361844
let data_dir = nodes[0].kv_store.get_data_dir();
18371845
let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
18381846

1839-
let event_handler = |_: _| {};
1847+
let event_handler = |_: _| { Ok(()) };
18401848
let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
18411849

18421850
do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes,
@@ -1857,7 +1865,7 @@ mod tests {
18571865

18581866
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
18591867
let bp_future = super::process_events_async(
1860-
persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
1868+
persister, |_: _| {async { Ok(()) }}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
18611869
nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
18621870
Some(nodes[0].scorer.clone()), move |dur: Duration| {
18631871
let mut exit_receiver = exit_receiver.clone();
@@ -1984,12 +1992,15 @@ mod tests {
19841992
#[test]
19851993
fn test_payment_path_scoring() {
19861994
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1987-
let event_handler = move |event: Event| match event {
1988-
Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
1989-
Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
1990-
Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
1991-
Event::ProbeFailed { .. } => sender.send(event).unwrap(),
1992-
_ => panic!("Unexpected event: {:?}", event),
1995+
let event_handler = move |event: Event| {
1996+
match event {
1997+
Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
1998+
Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
1999+
Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
2000+
Event::ProbeFailed { .. } => sender.send(event).unwrap(),
2001+
_ => panic!("Unexpected event: {:?}", event),
2002+
}
2003+
Ok(())
19932004
};
19942005

19952006
let (_, nodes) = create_nodes(1, "test_payment_path_scoring");
@@ -2022,6 +2033,7 @@ mod tests {
20222033
Event::ProbeFailed { .. } => { sender_ref.send(event).await.unwrap() },
20232034
_ => panic!("Unexpected event: {:?}", event),
20242035
}
2036+
Ok(())
20252037
}
20262038
};
20272039

lightning-invoice/src/utils.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1390,6 +1390,7 @@ mod test {
13901390
} else {
13911391
other_events.borrow_mut().push(event);
13921392
}
1393+
Ok(())
13931394
};
13941395
nodes[fwd_idx].node.process_pending_events(&forward_event_handler);
13951396
nodes[fwd_idx].node.process_pending_events(&forward_event_handler);

lightning/src/chain/chainmonitor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -516,7 +516,7 @@ where C::Target: chain::Filter,
516516
pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
517517
use crate::events::EventsProvider;
518518
let events = core::cell::RefCell::new(Vec::new());
519-
let event_handler = |event: events::Event| events.borrow_mut().push(event);
519+
let event_handler = |event: events::Event| Ok(events.borrow_mut().push(event));
520520
self.process_pending_events(&event_handler);
521521
events.into_inner()
522522
}
@@ -527,7 +527,7 @@ where C::Target: chain::Filter,
527527
/// See the trait-level documentation of [`EventsProvider`] for requirements.
528528
///
529529
/// [`EventsProvider`]: crate::events::EventsProvider
530-
pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
530+
pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(),()>>, H: Fn(Event) -> Future>(
531531
&self, handler: H
532532
) {
533533
// Sadly we can't hold the monitors read lock through an async call. Thus we have to do a

lightning/src/chain/channelmonitor.rs

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1169,19 +1169,36 @@ macro_rules! _process_events_body {
11691169
pending_events = inner.pending_events.clone();
11701170
repeated_events = inner.get_repeated_events();
11711171
} else { break; }
1172-
let num_events = pending_events.len();
11731172

1174-
for event in pending_events.into_iter().chain(repeated_events.into_iter()) {
1173+
let mut num_handled_events = 0;
1174+
let mut handling_failed = false;
1175+
for event in pending_events.into_iter() {
11751176
$event_to_handle = event;
1176-
$handle_event;
1177+
match $handle_event {
1178+
Ok(()) => num_handled_events += 1,
1179+
Err(()) => {
1180+
// If we encounter an error we stop handling events and make sure to replay
1181+
// any unhandled events on the next invocation.
1182+
handling_failed = true;
1183+
break;
1184+
}
1185+
}
1186+
}
1187+
1188+
for event in repeated_events.into_iter() {
1189+
// For repeated events we ignore any errors as they will be replayed eventually
1190+
// anyways.
1191+
$event_to_handle = event;
1192+
$handle_event.ok();
11771193
}
11781194

11791195
if let Some(us) = $self_opt {
11801196
let mut inner = us.inner.lock().unwrap();
1181-
inner.pending_events.drain(..num_events);
1197+
inner.pending_events.drain(..num_handled_events);
11821198
inner.is_processing_pending_events = false;
1183-
if !inner.pending_events.is_empty() {
1184-
// If there's more events to process, go ahead and do so.
1199+
if !handling_failed && !inner.pending_events.is_empty() {
1200+
// If there's more events to process and we didn't fail so far, go ahead and do
1201+
// so.
11851202
continue;
11861203
}
11871204
}
@@ -1507,7 +1524,7 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitor<Signer> {
15071524
/// Processes any events asynchronously.
15081525
///
15091526
/// See [`Self::process_pending_events`] for more information.
1510-
pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
1527+
pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ()>>, H: Fn(Event) -> Future>(
15111528
&self, handler: &H
15121529
) {
15131530
let mut ev;

lightning/src/events/mod.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2193,8 +2193,10 @@ pub trait MessageSendEventsProvider {
21932193
///
21942194
/// In order to ensure no [`Event`]s are lost, implementors of this trait will persist [`Event`]s
21952195
/// and replay any unhandled events on startup. An [`Event`] is considered handled when
2196-
/// [`process_pending_events`] returns, thus handlers MUST fully handle [`Event`]s and persist any
2197-
/// relevant changes to disk *before* returning.
2196+
/// [`process_pending_events`] returns `Ok(())`, thus handlers MUST fully handle [`Event`]s and
2197+
/// persist any relevant changes to disk *before* returning `Ok(())`. In case of an error (e.g.,
2198+
/// persistence failure) implementors should return `Err(())`, signalling to the [`EventsProvider`]
2199+
/// to replay unhandled events on the next invocation.
21982200
///
21992201
/// Further, because an application may crash between an [`Event`] being handled and the
22002202
/// implementor of this trait being re-serialized, [`Event`] handling must be idempotent - in
@@ -2225,22 +2227,22 @@ pub trait EventsProvider {
22252227
///
22262228
/// An async variation also exists for implementations of [`EventsProvider`] that support async
22272229
/// event handling. The async event handler should satisfy the generic bounds: `F:
2228-
/// core::future::Future, H: Fn(Event) -> F`.
2230+
/// core::future::Future<Output = Result<(), ()>>, H: Fn(Event) -> F`.
22292231
pub trait EventHandler {
22302232
/// Handles the given [`Event`].
22312233
///
22322234
/// See [`EventsProvider`] for details that must be considered when implementing this method.
2233-
fn handle_event(&self, event: Event);
2235+
fn handle_event(&self, event: Event) -> Result<(), ()>;
22342236
}
22352237

2236-
impl<F> EventHandler for F where F: Fn(Event) {
2237-
fn handle_event(&self, event: Event) {
2238+
impl<F> EventHandler for F where F: Fn(Event) -> Result<(), ()> {
2239+
fn handle_event(&self, event: Event) -> Result<(), ()> {
22382240
self(event)
22392241
}
22402242
}
22412243

22422244
impl<T: EventHandler> EventHandler for Arc<T> {
2243-
fn handle_event(&self, event: Event) {
2245+
fn handle_event(&self, event: Event) -> Result<(), ()> {
22442246
self.deref().handle_event(event)
22452247
}
22462248
}

0 commit comments

Comments
 (0)