@@ -420,6 +420,127 @@ async fn emits_inbound_connection_closed_if_channel_is_dropped() {
420
420
) ) ;
421
421
}
422
422
423
+ /// Send multiple requests concurrently.
424
+ #[ async_std:: test]
425
+ #[ cfg( feature = "cbor" ) ]
426
+ async fn concurrent_ping_protocol ( ) {
427
+ use std:: { collections:: HashMap , num:: NonZero } ;
428
+
429
+ use libp2p_core:: ConnectedPoint ;
430
+ use libp2p_swarm:: { dial_opts:: PeerCondition , DialError } ;
431
+
432
+ let protocols = iter:: once ( ( StreamProtocol :: new ( "/ping/1" ) , ProtocolSupport :: Full ) ) ;
433
+ let cfg = request_response:: Config :: default ( ) ;
434
+
435
+ let mut swarm1 = Swarm :: new_ephemeral ( |_| {
436
+ request_response:: cbor:: Behaviour :: < Ping , Pong > :: new ( protocols. clone ( ) , cfg. clone ( ) )
437
+ } ) ;
438
+ let peer1_id = * swarm1. local_peer_id ( ) ;
439
+ let mut swarm2 = Swarm :: new_ephemeral ( |_| {
440
+ request_response:: cbor:: Behaviour :: < Ping , Pong > :: new ( protocols, cfg)
441
+ } ) ;
442
+ let peer2_id = * swarm2. local_peer_id ( ) ;
443
+
444
+ let ( peer1_listen_addr, _) = swarm1. listen ( ) . with_memory_addr_external ( ) . await ;
445
+ swarm2. add_peer_address ( peer1_id, peer1_listen_addr) ;
446
+
447
+ let peer1 = async move {
448
+ loop {
449
+ match swarm1. next_swarm_event ( ) . await . try_into_behaviour_event ( ) {
450
+ Ok ( request_response:: Event :: Message {
451
+ peer,
452
+ message :
453
+ request_response:: Message :: Request {
454
+ request, channel, ..
455
+ } ,
456
+ ..
457
+ } ) => {
458
+ assert_eq ! ( & peer, & peer2_id) ;
459
+ swarm1
460
+ . behaviour_mut ( )
461
+ . send_response ( channel, Pong ( request. 0 ) )
462
+ . unwrap ( ) ;
463
+ }
464
+ Ok ( request_response:: Event :: ResponseSent { peer, .. } ) => {
465
+ assert_eq ! ( & peer, & peer2_id) ;
466
+ }
467
+ Ok ( e) => {
468
+ panic ! ( "Peer1: Unexpected event: {e:?}" )
469
+ }
470
+ Err ( ..) => { }
471
+ }
472
+ }
473
+ } ;
474
+
475
+ let peer2 = async {
476
+ let mut count = 0 ;
477
+ let num_pings: u8 = rand:: thread_rng ( ) . gen_range ( 1 ..100 ) ;
478
+ let mut expected_pongs = HashMap :: new ( ) ;
479
+ for i in 0 ..num_pings {
480
+ let ping_bytes = vec ! [ i] ;
481
+ let req_id = swarm2
482
+ . behaviour_mut ( )
483
+ . send_request ( & peer1_id, Ping ( ping_bytes. clone ( ) ) ) ;
484
+ assert ! ( swarm2. behaviour( ) . is_pending_outbound( & peer1_id, & req_id) ) ;
485
+ assert ! ( expected_pongs. insert( req_id, ping_bytes) . is_none( ) ) ;
486
+ }
487
+
488
+ let mut started_dialing = false ;
489
+ let mut is_connected = false ;
490
+
491
+ loop {
492
+ match swarm2. next_swarm_event ( ) . await {
493
+ SwarmEvent :: Behaviour ( request_response:: Event :: Message {
494
+ peer,
495
+ message :
496
+ request_response:: Message :: Response {
497
+ request_id,
498
+ response,
499
+ } ,
500
+ ..
501
+ } ) => {
502
+ count += 1 ;
503
+ let expected_pong = expected_pongs. remove ( & request_id) . unwrap ( ) ;
504
+ assert_eq ! ( response, Pong ( expected_pong) ) ;
505
+ assert_eq ! ( & peer, & peer1_id) ;
506
+ if count >= num_pings {
507
+ break ;
508
+ }
509
+ }
510
+ SwarmEvent :: Dialing { peer_id, .. } => {
511
+ assert_eq ! ( & peer_id. unwrap( ) , & peer1_id) ;
512
+ assert ! ( !started_dialing) ;
513
+ started_dialing = true ;
514
+ }
515
+ SwarmEvent :: ConnectionEstablished {
516
+ peer_id,
517
+ endpoint : ConnectedPoint :: Dialer { .. } ,
518
+ num_established,
519
+ ..
520
+ } => {
521
+ assert_eq ! ( & peer_id, & peer1_id) ;
522
+ assert_eq ! ( num_established, NonZero :: new( 1 ) . unwrap( ) ) ;
523
+ assert ! ( !is_connected) ;
524
+ is_connected = true ;
525
+ }
526
+ SwarmEvent :: OutgoingConnectionError { peer_id, error, .. } if started_dialing => {
527
+ assert_eq ! ( & peer_id. unwrap( ) , & peer1_id) ;
528
+ assert ! ( started_dialing) ;
529
+ assert ! ( matches!(
530
+ error,
531
+ DialError :: DialPeerConditionFalse ( PeerCondition :: DisconnectedAndNotDialing )
532
+ ) ) ;
533
+ }
534
+ e => panic ! ( "Peer2: Unexpected event: {e:?}" ) ,
535
+ }
536
+ }
537
+ assert_eq ! ( count, num_pings) ;
538
+ } ;
539
+
540
+ async_std:: task:: spawn ( Box :: pin ( peer1) ) ;
541
+ peer2. await ;
542
+ }
543
+
423
544
// Simple Ping-Pong Protocol
424
545
#[ derive( Debug , Clone , PartialEq , Eq , Serialize , Deserialize ) ]
425
546
struct Ping ( Vec < u8 > ) ;
0 commit comments