@@ -833,12 +833,14 @@ where
833
833
mod tests {
834
834
use super :: * ;
835
835
use lightning:: util:: test_utils:: { TestLogger , TestStore } ;
836
+ use std:: sync:: atomic:: { AtomicU16 , Ordering } ;
837
+ use std:: time:: Duration ;
836
838
837
- #[ test]
838
- fn event_queue_persistence ( ) {
839
+ #[ tokio :: test]
840
+ async fn event_queue_persistence ( ) {
839
841
let store = Arc :: new ( TestStore :: new ( false ) ) ;
840
842
let logger = Arc :: new ( TestLogger :: new ( ) ) ;
841
- let event_queue = EventQueue :: new ( Arc :: clone ( & store) , Arc :: clone ( & logger) ) ;
843
+ let event_queue = Arc :: new ( EventQueue :: new ( Arc :: clone ( & store) , Arc :: clone ( & logger) ) ) ;
842
844
assert_eq ! ( event_queue. next_event( ) , None ) ;
843
845
844
846
let expected_event = Event :: ChannelReady {
@@ -851,6 +853,7 @@ mod tests {
851
853
// Check we get the expected event and that it is returned until we mark it handled.
852
854
for _ in 0 ..5 {
853
855
assert_eq ! ( event_queue. wait_next_event( ) , expected_event) ;
856
+ assert_eq ! ( event_queue. next_event_async( ) . await , expected_event) ;
854
857
assert_eq ! ( event_queue. next_event( ) , Some ( expected_event. clone( ) ) ) ;
855
858
}
856
859
@@ -869,4 +872,96 @@ mod tests {
869
872
event_queue. event_handled ( ) . unwrap ( ) ;
870
873
assert_eq ! ( event_queue. next_event( ) , None ) ;
871
874
}
875
+
876
+ #[ tokio:: test]
877
+ async fn event_queue_concurrency ( ) {
878
+ let store = Arc :: new ( TestStore :: new ( false ) ) ;
879
+ let logger = Arc :: new ( TestLogger :: new ( ) ) ;
880
+ let event_queue = Arc :: new ( EventQueue :: new ( Arc :: clone ( & store) , Arc :: clone ( & logger) ) ) ;
881
+ assert_eq ! ( event_queue. next_event( ) , None ) ;
882
+
883
+ let expected_event = Event :: ChannelReady {
884
+ channel_id : ChannelId ( [ 23u8 ; 32 ] ) ,
885
+ user_channel_id : UserChannelId ( 2323 ) ,
886
+ counterparty_node_id : None ,
887
+ } ;
888
+
889
+ // Check `next_event_async` won't return if the queue is empty and always rather timeout.
890
+ tokio:: select! {
891
+ _ = tokio:: time:: sleep( Duration :: from_secs( 1 ) ) => {
892
+ // Timeout
893
+ }
894
+ _ = event_queue. next_event_async( ) => {
895
+ panic!( ) ;
896
+ }
897
+ }
898
+
899
+ assert_eq ! ( event_queue. next_event( ) , None ) ;
900
+ // Check we get the expected number of events when polling/enqueuing concurrently.
901
+ let enqueued_events = AtomicU16 :: new ( 0 ) ;
902
+ let received_events = AtomicU16 :: new ( 0 ) ;
903
+ let mut delayed_enqueue = false ;
904
+
905
+ for _ in 0 ..25 {
906
+ event_queue. add_event ( expected_event. clone ( ) ) . unwrap ( ) ;
907
+ enqueued_events. fetch_add ( 1 , Ordering :: SeqCst ) ;
908
+ }
909
+
910
+ loop {
911
+ tokio:: select! {
912
+ _ = tokio:: time:: sleep( Duration :: from_millis( 10 ) ) , if !delayed_enqueue => {
913
+ event_queue. add_event( expected_event. clone( ) ) . unwrap( ) ;
914
+ enqueued_events. fetch_add( 1 , Ordering :: SeqCst ) ;
915
+ delayed_enqueue = true ;
916
+ }
917
+ e = event_queue. next_event_async( ) => {
918
+ assert_eq!( e, expected_event) ;
919
+ event_queue. event_handled( ) . unwrap( ) ;
920
+ received_events. fetch_add( 1 , Ordering :: SeqCst ) ;
921
+
922
+ event_queue. add_event( expected_event. clone( ) ) . unwrap( ) ;
923
+ enqueued_events. fetch_add( 1 , Ordering :: SeqCst ) ;
924
+ }
925
+ e = event_queue. next_event_async( ) => {
926
+ assert_eq!( e, expected_event) ;
927
+ event_queue. event_handled( ) . unwrap( ) ;
928
+ received_events. fetch_add( 1 , Ordering :: SeqCst ) ;
929
+ }
930
+ }
931
+
932
+ if delayed_enqueue
933
+ && received_events. load ( Ordering :: SeqCst ) == enqueued_events. load ( Ordering :: SeqCst )
934
+ {
935
+ break ;
936
+ }
937
+ }
938
+ assert_eq ! ( event_queue. next_event( ) , None ) ;
939
+
940
+ // Check we operate correctly, even when mixing and matching blocking and async API calls.
941
+ let ( tx, mut rx) = tokio:: sync:: watch:: channel ( ( ) ) ;
942
+ let thread_queue = Arc :: clone ( & event_queue) ;
943
+ let thread_event = expected_event. clone ( ) ;
944
+ std:: thread:: spawn ( move || {
945
+ let e = thread_queue. wait_next_event ( ) ;
946
+ assert_eq ! ( e, thread_event) ;
947
+ thread_queue. event_handled ( ) . unwrap ( ) ;
948
+ tx. send ( ( ) ) . unwrap ( ) ;
949
+ } ) ;
950
+
951
+ let thread_queue = Arc :: clone ( & event_queue) ;
952
+ let thread_event = expected_event. clone ( ) ;
953
+ std:: thread:: spawn ( move || {
954
+ // Sleep a bit before we enqueue the events everybody is waiting for.
955
+ std:: thread:: sleep ( Duration :: from_millis ( 20 ) ) ;
956
+ thread_queue. add_event ( thread_event. clone ( ) ) . unwrap ( ) ;
957
+ thread_queue. add_event ( thread_event. clone ( ) ) . unwrap ( ) ;
958
+ } ) ;
959
+
960
+ let e = event_queue. next_event_async ( ) . await ;
961
+ assert_eq ! ( e, expected_event. clone( ) ) ;
962
+ event_queue. event_handled ( ) . unwrap ( ) ;
963
+
964
+ rx. changed ( ) . await . unwrap ( ) ;
965
+ assert_eq ! ( event_queue. next_event( ) , None ) ;
966
+ }
872
967
}
0 commit comments