@@ -26,6 +26,8 @@ use lightning::chain::chainmonitor::{ChainMonitor, Persist};
26
26
use lightning::events::EventHandler;
27
27
#[cfg(feature = "std")]
28
28
use lightning::events::EventsProvider;
29
+ #[cfg(feature = "futures")]
30
+ use lightning::events::ReplayEvent;
29
31
use lightning::events::{Event, PathFailure};
30
32
31
33
use lightning::ln::channelmanager::AChannelManager;
@@ -583,6 +585,7 @@ use futures_util::{dummy_waker, Selector, SelectorOutput};
583
585
/// could setup `process_events_async` like this:
584
586
/// ```
585
587
/// # use lightning::io;
588
+ /// # use lightning::events::ReplayEvent;
586
589
/// # use std::sync::{Arc, RwLock};
587
590
/// # use std::sync::atomic::{AtomicBool, Ordering};
588
591
/// # use std::time::SystemTime;
@@ -600,7 +603,7 @@ use futures_util::{dummy_waker, Selector, SelectorOutput};
600
603
/// # }
601
604
/// # struct EventHandler {}
602
605
/// # impl EventHandler {
603
- /// # async fn handle_event(&self, _: lightning::events::Event) { }
606
+ /// # async fn handle_event(&self, _: lightning::events::Event) -> Result<(), ReplayEvent> { Ok(()) }
604
607
/// # }
605
608
/// # #[derive(Eq, PartialEq, Clone, Hash)]
606
609
/// # struct SocketDescriptor {}
@@ -698,7 +701,7 @@ pub async fn process_events_async<
698
701
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
699
702
L: 'static + Deref + Send + Sync,
700
703
P: 'static + Deref + Send + Sync,
701
- EventHandlerFuture: core::future::Future<Output = () >,
704
+ EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent> >,
702
705
EventHandler: Fn(Event) -> EventHandlerFuture,
703
706
PS: 'static + Deref + Send,
704
707
M: 'static
@@ -751,12 +754,16 @@ where
751
754
if update_scorer(scorer, &event, duration_since_epoch) {
752
755
log_trace!(logger, "Persisting scorer after update");
753
756
if let Err(e) = persister.persist_scorer(&scorer) {
754
- log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
757
+ log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
758
+ // We opt not to abort early on persistence failure here as persisting
759
+ // the scorer is non-critical and we still hope that it will have
760
+ // resolved itself when it is potentially critical in event handling
761
+ // below.
755
762
}
756
763
}
757
764
}
758
765
}
759
- event_handler(event).await;
766
+ event_handler(event).await
760
767
})
761
768
};
762
769
define_run_body!(
@@ -913,7 +920,7 @@ impl BackgroundProcessor {
913
920
}
914
921
}
915
922
}
916
- event_handler.handle_event(event);
923
+ event_handler.handle_event(event)
917
924
};
918
925
define_run_body!(
919
926
persister,
@@ -1012,10 +1019,13 @@ mod tests {
1012
1019
use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
1013
1020
use bitcoin::transaction::Version;
1014
1021
use bitcoin::{Amount, ScriptBuf, Txid};
1022
+ use core::sync::atomic::{AtomicBool, Ordering};
1015
1023
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
1016
1024
use lightning::chain::transaction::OutPoint;
1017
1025
use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
1018
- use lightning::events::{Event, MessageSendEvent, MessageSendEventsProvider, PathFailure};
1026
+ use lightning::events::{
1027
+ Event, MessageSendEvent, MessageSendEventsProvider, PathFailure, ReplayEvent,
1028
+ };
1019
1029
use lightning::ln::channelmanager;
1020
1030
use lightning::ln::channelmanager::{
1021
1031
ChainParameters, PaymentId, BREAKDOWN_TIMEOUT, MIN_CLTV_EXPIRY_DELTA,
@@ -1757,7 +1767,7 @@ mod tests {
1757
1767
// Initiate the background processors to watch each node.
1758
1768
let data_dir = nodes[0].kv_store.get_data_dir();
1759
1769
let persister = Arc::new(Persister::new(data_dir));
1760
- let event_handler = |_: _| {} ;
1770
+ let event_handler = |_: _| Ok(()) ;
1761
1771
let bg_processor = BackgroundProcessor::start(
1762
1772
persister,
1763
1773
event_handler,
@@ -1847,7 +1857,7 @@ mod tests {
1847
1857
let (_, nodes) = create_nodes(1, "test_timer_tick_called");
1848
1858
let data_dir = nodes[0].kv_store.get_data_dir();
1849
1859
let persister = Arc::new(Persister::new(data_dir));
1850
- let event_handler = |_: _| {} ;
1860
+ let event_handler = |_: _| Ok(()) ;
1851
1861
let bg_processor = BackgroundProcessor::start(
1852
1862
persister,
1853
1863
event_handler,
@@ -1889,7 +1899,7 @@ mod tests {
1889
1899
let persister = Arc::new(
1890
1900
Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
1891
1901
);
1892
- let event_handler = |_: _| {} ;
1902
+ let event_handler = |_: _| Ok(()) ;
1893
1903
let bg_processor = BackgroundProcessor::start(
1894
1904
persister,
1895
1905
event_handler,
@@ -1924,7 +1934,7 @@ mod tests {
1924
1934
1925
1935
let bp_future = super::process_events_async(
1926
1936
persister,
1927
- |_: _| async {},
1937
+ |_: _| async { Ok(()) },
1928
1938
nodes[0].chain_monitor.clone(),
1929
1939
nodes[0].node.clone(),
1930
1940
Some(nodes[0].messenger.clone()),
@@ -1957,7 +1967,7 @@ mod tests {
1957
1967
let data_dir = nodes[0].kv_store.get_data_dir();
1958
1968
let persister =
1959
1969
Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
1960
- let event_handler = |_: _| {} ;
1970
+ let event_handler = |_: _| Ok(()) ;
1961
1971
let bg_processor = BackgroundProcessor::start(
1962
1972
persister,
1963
1973
event_handler,
@@ -1986,7 +1996,7 @@ mod tests {
1986
1996
let data_dir = nodes[0].kv_store.get_data_dir();
1987
1997
let persister =
1988
1998
Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
1989
- let event_handler = |_: _| {} ;
1999
+ let event_handler = |_: _| Ok(()) ;
1990
2000
let bg_processor = BackgroundProcessor::start(
1991
2001
persister,
1992
2002
event_handler,
@@ -2021,13 +2031,16 @@ mod tests {
2021
2031
// Set up a background event handler for FundingGenerationReady events.
2022
2032
let (funding_generation_send, funding_generation_recv) = std::sync::mpsc::sync_channel(1);
2023
2033
let (channel_pending_send, channel_pending_recv) = std::sync::mpsc::sync_channel(1);
2024
- let event_handler = move |event: Event| match event {
2025
- Event::FundingGenerationReady { .. } => funding_generation_send
2026
- .send(handle_funding_generation_ready!(event, channel_value))
2027
- .unwrap(),
2028
- Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
2029
- Event::ChannelReady { .. } => {},
2030
- _ => panic!("Unexpected event: {:?}", event),
2034
+ let event_handler = move |event: Event| {
2035
+ match event {
2036
+ Event::FundingGenerationReady { .. } => funding_generation_send
2037
+ .send(handle_funding_generation_ready!(event, channel_value))
2038
+ .unwrap(),
2039
+ Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
2040
+ Event::ChannelReady { .. } => {},
2041
+ _ => panic!("Unexpected event: {:?}", event),
2042
+ }
2043
+ Ok(())
2031
2044
};
2032
2045
2033
2046
let bg_processor = BackgroundProcessor::start(
@@ -2082,11 +2095,14 @@ mod tests {
2082
2095
2083
2096
// Set up a background event handler for SpendableOutputs events.
2084
2097
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
2085
- let event_handler = move |event: Event| match event {
2086
- Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
2087
- Event::ChannelReady { .. } => {},
2088
- Event::ChannelClosed { .. } => {},
2089
- _ => panic!("Unexpected event: {:?}", event),
2098
+ let event_handler = move |event: Event| {
2099
+ match event {
2100
+ Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
2101
+ Event::ChannelReady { .. } => {},
2102
+ Event::ChannelClosed { .. } => {},
2103
+ _ => panic!("Unexpected event: {:?}", event),
2104
+ }
2105
+ Ok(())
2090
2106
};
2091
2107
let persister = Arc::new(Persister::new(data_dir));
2092
2108
let bg_processor = BackgroundProcessor::start(
@@ -2215,12 +2231,60 @@ mod tests {
2215
2231
}
2216
2232
}
2217
2233
2234
+ #[test]
2235
+ fn test_event_handling_failures_are_replayed() {
2236
+ let (_, nodes) = create_nodes(2, "test_event_handling_failures_are_replayed");
2237
+ let channel_value = 100000;
2238
+ let data_dir = nodes[0].kv_store.get_data_dir();
2239
+ let persister = Arc::new(Persister::new(data_dir.clone()));
2240
+
2241
+ let (first_event_send, first_event_recv) = std::sync::mpsc::sync_channel(1);
2242
+ let (second_event_send, second_event_recv) = std::sync::mpsc::sync_channel(1);
2243
+ let should_fail_event_handling = Arc::new(AtomicBool::new(true));
2244
+ let event_handler = move |event: Event| {
2245
+ if let Ok(true) = should_fail_event_handling.compare_exchange(
2246
+ true,
2247
+ false,
2248
+ Ordering::Acquire,
2249
+ Ordering::Relaxed,
2250
+ ) {
2251
+ first_event_send.send(event).unwrap();
2252
+ return Err(ReplayEvent());
2253
+ }
2254
+
2255
+ second_event_send.send(event).unwrap();
2256
+ Ok(())
2257
+ };
2258
+
2259
+ let bg_processor = BackgroundProcessor::start(
2260
+ persister,
2261
+ event_handler,
2262
+ nodes[0].chain_monitor.clone(),
2263
+ nodes[0].node.clone(),
2264
+ Some(nodes[0].messenger.clone()),
2265
+ nodes[0].no_gossip_sync(),
2266
+ nodes[0].peer_manager.clone(),
2267
+ nodes[0].logger.clone(),
2268
+ Some(nodes[0].scorer.clone()),
2269
+ );
2270
+
2271
+ begin_open_channel!(nodes[0], nodes[1], channel_value);
2272
+ assert_eq!(
2273
+ first_event_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE)),
2274
+ second_event_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
2275
+ );
2276
+
2277
+ if !std::thread::panicking() {
2278
+ bg_processor.stop().unwrap();
2279
+ }
2280
+ }
2281
+
2218
2282
#[test]
2219
2283
fn test_scorer_persistence() {
2220
2284
let (_, nodes) = create_nodes(2, "test_scorer_persistence");
2221
2285
let data_dir = nodes[0].kv_store.get_data_dir();
2222
2286
let persister = Arc::new(Persister::new(data_dir));
2223
- let event_handler = |_: _| {} ;
2287
+ let event_handler = |_: _| Ok(()) ;
2224
2288
let bg_processor = BackgroundProcessor::start(
2225
2289
persister,
2226
2290
event_handler,
@@ -2315,7 +2379,7 @@ mod tests {
2315
2379
let data_dir = nodes[0].kv_store.get_data_dir();
2316
2380
let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
2317
2381
2318
- let event_handler = |_: _| {} ;
2382
+ let event_handler = |_: _| Ok(()) ;
2319
2383
let background_processor = BackgroundProcessor::start(
2320
2384
persister,
2321
2385
event_handler,
@@ -2350,7 +2414,7 @@ mod tests {
2350
2414
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
2351
2415
let bp_future = super::process_events_async(
2352
2416
persister,
2353
- |_: _| async {},
2417
+ |_: _| async { Ok(()) },
2354
2418
nodes[0].chain_monitor.clone(),
2355
2419
nodes[0].node.clone(),
2356
2420
Some(nodes[0].messenger.clone()),
@@ -2492,12 +2556,15 @@ mod tests {
2492
2556
#[test]
2493
2557
fn test_payment_path_scoring() {
2494
2558
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
2495
- let event_handler = move |event: Event| match event {
2496
- Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
2497
- Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
2498
- Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
2499
- Event::ProbeFailed { .. } => sender.send(event).unwrap(),
2500
- _ => panic!("Unexpected event: {:?}", event),
2559
+ let event_handler = move |event: Event| {
2560
+ match event {
2561
+ Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
2562
+ Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
2563
+ Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
2564
+ Event::ProbeFailed { .. } => sender.send(event).unwrap(),
2565
+ _ => panic!("Unexpected event: {:?}", event),
2566
+ }
2567
+ Ok(())
2501
2568
};
2502
2569
2503
2570
let (_, nodes) = create_nodes(1, "test_payment_path_scoring");
@@ -2543,6 +2610,7 @@ mod tests {
2543
2610
Event::ProbeFailed { .. } => sender_ref.send(event).await.unwrap(),
2544
2611
_ => panic!("Unexpected event: {:?}", event),
2545
2612
}
2613
+ Ok(())
2546
2614
}
2547
2615
};
2548
2616
0 commit comments