@@ -1419,12 +1419,100 @@ mod tests {
14191419 assert_eq ! ( network_graph. read_only( ) . channels( ) . len( ) , 0 ) ;
14201420 }
14211421
1422+ macro_rules! do_test_payment_path_scoring {
1423+ ( $nodes: expr, $receive: expr) => {
1424+ // Ensure that we update the scorer when relevant events are processed. In this case, we ensure
1425+ // that we update the scorer upon a payment path succeeding (note that the channel must be
1426+ // public or else we won't score it).
1427+ // A background event handler for FundingGenerationReady events must be hooked up to a
1428+ // running background processor.
1429+ let scored_scid = 4242 ;
1430+ let secp_ctx = Secp256k1 :: new( ) ;
1431+ let node_1_privkey = SecretKey :: from_slice( & [ 42 ; 32 ] ) . unwrap( ) ;
1432+ let node_1_id = PublicKey :: from_secret_key( & secp_ctx, & node_1_privkey) ;
1433+
1434+ let path = vec![ RouteHop {
1435+ pubkey: node_1_id,
1436+ node_features: NodeFeatures :: empty( ) ,
1437+ short_channel_id: scored_scid,
1438+ channel_features: ChannelFeatures :: empty( ) ,
1439+ fee_msat: 0 ,
1440+ cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32 ,
1441+ } ] ;
1442+
1443+ $nodes[ 0 ] . scorer. lock( ) . unwrap( ) . expect( TestResult :: PaymentFailure { path: path. clone( ) , short_channel_id: scored_scid } ) ;
1444+ $nodes[ 0 ] . node. push_pending_event( Event :: PaymentPathFailed {
1445+ payment_id: None ,
1446+ payment_hash: PaymentHash ( [ 42 ; 32 ] ) ,
1447+ payment_failed_permanently: false ,
1448+ failure: PathFailure :: OnPath { network_update: None } ,
1449+ path: path. clone( ) ,
1450+ short_channel_id: Some ( scored_scid) ,
1451+ } ) ;
1452+ let event = $receive. expect( "PaymentPathFailed not handled within deadline" ) ;
1453+ match event {
1454+ Event :: PaymentPathFailed { .. } => { } ,
1455+ _ => panic!( "Unexpected event" ) ,
1456+ }
1457+
1458+ // Ensure we'll score payments that were explicitly failed back by the destination as
1459+ // ProbeSuccess.
1460+ $nodes[ 0 ] . scorer. lock( ) . unwrap( ) . expect( TestResult :: ProbeSuccess { path: path. clone( ) } ) ;
1461+ $nodes[ 0 ] . node. push_pending_event( Event :: PaymentPathFailed {
1462+ payment_id: None ,
1463+ payment_hash: PaymentHash ( [ 42 ; 32 ] ) ,
1464+ payment_failed_permanently: true ,
1465+ failure: PathFailure :: OnPath { network_update: None } ,
1466+ path: path. clone( ) ,
1467+ short_channel_id: None ,
1468+ } ) ;
1469+ let event = $receive. expect( "PaymentPathFailed not handled within deadline" ) ;
1470+ match event {
1471+ Event :: PaymentPathFailed { .. } => { } ,
1472+ _ => panic!( "Unexpected event" ) ,
1473+ }
1474+
1475+ $nodes[ 0 ] . scorer. lock( ) . unwrap( ) . expect( TestResult :: PaymentSuccess { path: path. clone( ) } ) ;
1476+ $nodes[ 0 ] . node. push_pending_event( Event :: PaymentPathSuccessful {
1477+ payment_id: PaymentId ( [ 42 ; 32 ] ) ,
1478+ payment_hash: None ,
1479+ path: path. clone( ) ,
1480+ } ) ;
1481+ let event = $receive. expect( "PaymentPathSuccessful not handled within deadline" ) ;
1482+ match event {
1483+ Event :: PaymentPathSuccessful { .. } => { } ,
1484+ _ => panic!( "Unexpected event" ) ,
1485+ }
1486+
1487+ $nodes[ 0 ] . scorer. lock( ) . unwrap( ) . expect( TestResult :: ProbeSuccess { path: path. clone( ) } ) ;
1488+ $nodes[ 0 ] . node. push_pending_event( Event :: ProbeSuccessful {
1489+ payment_id: PaymentId ( [ 42 ; 32 ] ) ,
1490+ payment_hash: PaymentHash ( [ 42 ; 32 ] ) ,
1491+ path: path. clone( ) ,
1492+ } ) ;
1493+ let event = $receive. expect( "ProbeSuccessful not handled within deadline" ) ;
1494+ match event {
1495+ Event :: ProbeSuccessful { .. } => { } ,
1496+ _ => panic!( "Unexpected event" ) ,
1497+ }
1498+
1499+ $nodes[ 0 ] . scorer. lock( ) . unwrap( ) . expect( TestResult :: ProbeFailure { path: path. clone( ) } ) ;
1500+ $nodes[ 0 ] . node. push_pending_event( Event :: ProbeFailed {
1501+ payment_id: PaymentId ( [ 42 ; 32 ] ) ,
1502+ payment_hash: PaymentHash ( [ 42 ; 32 ] ) ,
1503+ path,
1504+ short_channel_id: Some ( scored_scid) ,
1505+ } ) ;
1506+ let event = $receive. expect( "ProbeFailure not handled within deadline" ) ;
1507+ match event {
1508+ Event :: ProbeFailed { .. } => { } ,
1509+ _ => panic!( "Unexpected event" ) ,
1510+ }
1511+ }
1512+ }
1513+
14221514 #[ test]
14231515 fn test_payment_path_scoring ( ) {
1424- // Ensure that we update the scorer when relevant events are processed. In this case, we ensure
1425- // that we update the scorer upon a payment path succeeding (note that the channel must be
1426- // public or else we won't score it).
1427- // Set up a background event handler for FundingGenerationReady events.
14281516 let ( sender, receiver) = std:: sync:: mpsc:: sync_channel ( 1 ) ;
14291517 let event_handler = move |event : Event | match event {
14301518 Event :: PaymentPathFailed { .. } => sender. send ( event) . unwrap ( ) ,
@@ -1439,101 +1527,56 @@ mod tests {
14391527 let persister = Arc :: new ( Persister :: new ( data_dir) ) ;
14401528 let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . chain_monitor . clone ( ) , nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . no_gossip_sync ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) , Some ( nodes[ 0 ] . scorer . clone ( ) ) ) ;
14411529
1442- let scored_scid = 4242 ;
1443- let secp_ctx = Secp256k1 :: new ( ) ;
1444- let node_1_privkey = SecretKey :: from_slice ( & [ 42 ; 32 ] ) . unwrap ( ) ;
1445- let node_1_id = PublicKey :: from_secret_key ( & secp_ctx, & node_1_privkey) ;
1446-
1447- let path = vec ! [ RouteHop {
1448- pubkey: node_1_id,
1449- node_features: NodeFeatures :: empty( ) ,
1450- short_channel_id: scored_scid,
1451- channel_features: ChannelFeatures :: empty( ) ,
1452- fee_msat: 0 ,
1453- cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32 ,
1454- } ] ;
1455-
1456- nodes[ 0 ] . scorer . lock ( ) . unwrap ( ) . expect ( TestResult :: PaymentFailure { path : path. clone ( ) , short_channel_id : scored_scid } ) ;
1457- nodes[ 0 ] . node . push_pending_event ( Event :: PaymentPathFailed {
1458- payment_id : None ,
1459- payment_hash : PaymentHash ( [ 42 ; 32 ] ) ,
1460- payment_failed_permanently : false ,
1461- failure : PathFailure :: OnPath { network_update : None } ,
1462- path : path. clone ( ) ,
1463- short_channel_id : Some ( scored_scid) ,
1464- } ) ;
1465- let event = receiver
1466- . recv_timeout ( Duration :: from_secs ( EVENT_DEADLINE ) )
1467- . expect ( "PaymentPathFailed not handled within deadline" ) ;
1468- match event {
1469- Event :: PaymentPathFailed { .. } => { } ,
1470- _ => panic ! ( "Unexpected event" ) ,
1471- }
1530+ do_test_payment_path_scoring ! ( nodes, receiver. recv_timeout( Duration :: from_secs( EVENT_DEADLINE ) ) ) ;
14721531
1473- // Ensure we'll score payments that were explicitly failed back by the destination as
1474- // ProbeSuccess.
1475- nodes[ 0 ] . scorer . lock ( ) . unwrap ( ) . expect ( TestResult :: ProbeSuccess { path : path. clone ( ) } ) ;
1476- nodes[ 0 ] . node . push_pending_event ( Event :: PaymentPathFailed {
1477- payment_id : None ,
1478- payment_hash : PaymentHash ( [ 42 ; 32 ] ) ,
1479- payment_failed_permanently : true ,
1480- failure : PathFailure :: OnPath { network_update : None } ,
1481- path : path. clone ( ) ,
1482- short_channel_id : None ,
1483- } ) ;
1484- let event = receiver
1485- . recv_timeout ( Duration :: from_secs ( EVENT_DEADLINE ) )
1486- . expect ( "PaymentPathFailed not handled within deadline" ) ;
1487- match event {
1488- Event :: PaymentPathFailed { .. } => { } ,
1489- _ => panic ! ( "Unexpected event" ) ,
1532+ if !std:: thread:: panicking ( ) {
1533+ bg_processor. stop ( ) . unwrap ( ) ;
14901534 }
1535+ }
14911536
1492- nodes[ 0 ] . scorer . lock ( ) . unwrap ( ) . expect ( TestResult :: PaymentSuccess { path : path. clone ( ) } ) ;
1493- nodes[ 0 ] . node . push_pending_event ( Event :: PaymentPathSuccessful {
1494- payment_id : PaymentId ( [ 42 ; 32 ] ) ,
1495- payment_hash : None ,
1496- path : path. clone ( ) ,
1497- } ) ;
1498- let event = receiver
1499- . recv_timeout ( Duration :: from_secs ( EVENT_DEADLINE ) )
1500- . expect ( "PaymentPathSuccessful not handled within deadline" ) ;
1501- match event {
1502- Event :: PaymentPathSuccessful { .. } => { } ,
1503- _ => panic ! ( "Unexpected event" ) ,
1504- }
1537+ #[ tokio:: test]
1538+ #[ cfg( feature = "futures" ) ]
1539+ async fn test_payment_path_scoring_async ( ) {
1540+ let ( sender, mut receiver) = tokio:: sync:: mpsc:: channel ( 1 ) ;
1541+ let event_handler = move |event : Event | {
1542+ let sender_ref = sender. clone ( ) ;
1543+ async move {
1544+ match event {
1545+ Event :: PaymentPathFailed { .. } => { sender_ref. send ( event) . await . unwrap ( ) } ,
1546+ Event :: PaymentPathSuccessful { .. } => { sender_ref. send ( event) . await . unwrap ( ) } ,
1547+ Event :: ProbeSuccessful { .. } => { sender_ref. send ( event) . await . unwrap ( ) } ,
1548+ Event :: ProbeFailed { .. } => { sender_ref. send ( event) . await . unwrap ( ) } ,
1549+ _ => panic ! ( "Unexpected event: {:?}" , event) ,
1550+ }
1551+ }
1552+ } ;
15051553
1506- nodes[ 0 ] . scorer . lock ( ) . unwrap ( ) . expect ( TestResult :: ProbeSuccess { path : path. clone ( ) } ) ;
1507- nodes[ 0 ] . node . push_pending_event ( Event :: ProbeSuccessful {
1508- payment_id : PaymentId ( [ 42 ; 32 ] ) ,
1509- payment_hash : PaymentHash ( [ 42 ; 32 ] ) ,
1510- path : path. clone ( ) ,
1511- } ) ;
1512- let event = receiver
1513- . recv_timeout ( Duration :: from_secs ( EVENT_DEADLINE ) )
1514- . expect ( "ProbeSuccessful not handled within deadline" ) ;
1515- match event {
1516- Event :: ProbeSuccessful { .. } => { } ,
1517- _ => panic ! ( "Unexpected event" ) ,
1518- }
1554+ let nodes = create_nodes ( 1 , "test_payment_path_scoring_async" . to_string ( ) ) ;
1555+ let data_dir = nodes[ 0 ] . persister . get_data_dir ( ) ;
1556+ let persister = Arc :: new ( Persister :: new ( data_dir) ) ;
15191557
1520- nodes[ 0 ] . scorer . lock ( ) . unwrap ( ) . expect ( TestResult :: ProbeFailure { path : path. clone ( ) } ) ;
1521- nodes[ 0 ] . node . push_pending_event ( Event :: ProbeFailed {
1522- payment_id : PaymentId ( [ 42 ; 32 ] ) ,
1523- payment_hash : PaymentHash ( [ 42 ; 32 ] ) ,
1524- path,
1525- short_channel_id : Some ( scored_scid) ,
1558+ let ( exit_sender, exit_receiver) = tokio:: sync:: watch:: channel ( ( ) ) ;
1559+
1560+ let bp_future = super :: process_events_async (
1561+ persister, event_handler, nodes[ 0 ] . chain_monitor . clone ( ) , nodes[ 0 ] . node . clone ( ) ,
1562+ nodes[ 0 ] . no_gossip_sync ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ,
1563+ Some ( nodes[ 0 ] . scorer . clone ( ) ) , move |dur : Duration | {
1564+ let mut exit_receiver = exit_receiver. clone ( ) ;
1565+ Box :: pin ( async move {
1566+ tokio:: select! {
1567+ _ = tokio:: time:: sleep( dur) => false ,
1568+ _ = exit_receiver. changed( ) => true ,
1569+ }
1570+ } )
1571+ } , false ,
1572+ ) ;
1573+ // TODO: Drop _local and simply spawn after #2003
1574+ let local_set = tokio:: task:: LocalSet :: new ( ) ;
1575+ local_set. spawn_local ( bp_future) ;
1576+ local_set. spawn_local ( async move {
1577+ do_test_payment_path_scoring ! ( nodes, receiver. recv( ) . await ) ;
1578+ exit_sender. send ( ( ) ) . unwrap ( ) ;
15261579 } ) ;
1527- let event = receiver
1528- . recv_timeout ( Duration :: from_secs ( EVENT_DEADLINE ) )
1529- . expect ( "ProbeFailure not handled within deadline" ) ;
1530- match event {
1531- Event :: ProbeFailed { .. } => { } ,
1532- _ => panic ! ( "Unexpected event" ) ,
1533- }
1534-
1535- if !std:: thread:: panicking ( ) {
1536- bg_processor. stop ( ) . unwrap ( ) ;
1537- }
1580+ local_set. await ;
15381581 }
15391582}
0 commit comments