@@ -3,28 +3,32 @@ use std::time::Duration;
33use futures:: { FutureExt , StreamExt } ;
44use libp2p:: core:: multiaddr:: Protocol ;
55use libp2p:: swarm:: SwarmEvent ;
6- use libp2p:: { Multiaddr , Swarm } ;
6+ use libp2p:: { Multiaddr , PeerId , Swarm } ;
77use libp2p_swarm_test:: SwarmExt ;
88use starknet_api:: core:: ChainId ;
99
1010use crate :: discovery:: DiscoveryConfig ;
1111use crate :: gossipsub_impl:: Topic ;
1212use crate :: mixed_behaviour:: MixedBehaviour ;
13- use crate :: network_manager:: { BroadcastTopicClientTrait , GenericNetworkManager } ;
13+ use crate :: network_manager:: {
14+ BroadcastTopicClientTrait ,
15+ GenericNetworkManager ,
16+ PropellerClientTrait ,
17+ } ;
1418use crate :: peer_manager:: PeerManagerConfig ;
1519use crate :: { sqmr, Bytes } ;
1620
1721const TIMEOUT : Duration = Duration :: from_secs ( 5 ) ;
1822
19- async fn create_swarm ( bootstrap_peer_multiaddr : Option < Multiaddr > ) -> Swarm < MixedBehaviour > {
23+ async fn create_swarm ( bootstrap_peer_multiaddr : Option < Vec < Multiaddr > > ) -> Swarm < MixedBehaviour > {
2024 let mut swarm = Swarm :: new_ephemeral_tokio ( |keypair| {
2125 MixedBehaviour :: new (
2226 sqmr:: Config :: default ( ) ,
2327 DiscoveryConfig :: default ( ) ,
2428 PeerManagerConfig :: default ( ) ,
2529 None ,
2630 keypair. clone ( ) ,
27- bootstrap_peer_multiaddr. map ( |multiaddr| vec ! [ multiaddr ] ) ,
31+ bootstrap_peer_multiaddr,
2832 ChainId :: Mainnet ,
2933 None ,
3034 )
@@ -60,6 +64,28 @@ fn create_network_manager(
6064 )
6165}
6266
67+ async fn create_network_managers (
68+ num : usize ,
69+ ) -> Vec < ( PeerId , GenericNetworkManager < Swarm < MixedBehaviour > > ) > {
70+ let mut bootstrap_addresses = vec ! [ ] ;
71+ let mut network_managers = vec ! [ ] ;
72+ for _ in 0 ..num {
73+ let swarm = create_swarm ( if bootstrap_addresses. is_empty ( ) {
74+ None
75+ } else {
76+ Some ( bootstrap_addresses. clone ( ) )
77+ } )
78+ . await ;
79+ let local_peer_id = swarm. local_peer_id ( ) ;
80+ let address = swarm. external_addresses ( ) . next ( ) . unwrap ( ) . clone ( ) ;
81+ // Add peer ID to the multiaddr for bootstrap addresses
82+ let bootstrap_address = address. with_p2p ( * local_peer_id) . unwrap ( ) ;
83+ bootstrap_addresses. push ( bootstrap_address) ;
84+ network_managers. push ( ( * local_peer_id, create_network_manager ( swarm) ) ) ;
85+ }
86+ network_managers
87+ }
88+
6389const BUFFER_SIZE : usize = 100 ;
6490
6591#[ derive( Debug , Clone , Copy , PartialEq , Eq ) ]
@@ -92,9 +118,9 @@ async fn broadcast_subscriber_end_to_end_test() {
92118 bootstrap_peer_multiaddr. with_p2p ( * bootstrap_swarm. local_peer_id ( ) ) . unwrap ( ) ;
93119 let bootstrap_network_manager = create_network_manager ( bootstrap_swarm) ;
94120 let mut network_manager1 =
95- create_network_manager ( create_swarm ( Some ( bootstrap_peer_multiaddr. clone ( ) ) ) . await ) ;
121+ create_network_manager ( create_swarm ( Some ( vec ! [ bootstrap_peer_multiaddr. clone( ) ] ) ) . await ) ;
96122 let mut network_manager2 =
97- create_network_manager ( create_swarm ( Some ( bootstrap_peer_multiaddr) ) . await ) ;
123+ create_network_manager ( create_swarm ( Some ( vec ! [ bootstrap_peer_multiaddr] ) ) . await ) ;
98124
99125 let mut subscriber_channels1_1 =
100126 network_manager1. register_broadcast_topic :: < Number > ( topic1. clone ( ) , BUFFER_SIZE ) . unwrap ( ) ;
@@ -137,3 +163,47 @@ async fn broadcast_subscriber_end_to_end_test() {
137163 }
138164 }
139165}
166+
167+ #[ tokio:: test]
168+ async fn propeller_message_forwarding_test ( ) {
169+ // This test verifies that propeller messages are properly forwarded through channels
170+ // when received by the network manager, similar to how gossipsub messages work.
171+
172+ let mut network_managers = create_network_managers ( 2 ) . await ;
173+ let ( peer_1, mut nm_1) = network_managers. remove ( 0 ) ;
174+ let ( peer_2, mut nm_2) = network_managers. remove ( 0 ) ;
175+
176+ let peers = vec ! [ ( peer_1, 1000 ) , ( peer_2, 500 ) ] ;
177+
178+ let mut channels_1 =
179+ nm_1. register_propeller_channels :: < Vec < u8 > > ( BUFFER_SIZE , peers. clone ( ) ) . unwrap ( ) ;
180+ let mut channels_2 = nm_2. register_propeller_channels :: < Vec < u8 > > ( BUFFER_SIZE , peers) . unwrap ( ) ;
181+
182+ let message = vec ! [ 123 ; 64 ] ; // must be a multiple of 64
183+ let message_id = 1 ;
184+
185+ tokio:: select! {
186+ _ = nm_1. run( ) => panic!( "network manager ended" ) ,
187+ _ = nm_2. run( ) => panic!( "network manager ended" ) ,
188+ result = tokio:: time:: timeout(
189+ TIMEOUT , async move {
190+ tokio:: time:: sleep( Duration :: from_secs( 1 ) ) . await ;
191+
192+ println!( "Sending message" ) ;
193+ channels_1. propeller_client. send_message( message. clone( ) , message_id) . await . unwrap( ) ;
194+
195+ println!( "Receiving message" ) ;
196+ let ( received_message_id, received_message) =
197+ channels_2. propeller_messages_receiver. next( ) . await . unwrap( ) ;
198+
199+ assert_eq!( received_message. unwrap( ) , message) ;
200+ assert_eq!( received_message_id, message_id) ;
201+
202+ assert!( channels_1. propeller_messages_receiver. next( ) . now_or_never( ) . is_none( ) ) ;
203+ assert!( channels_2. propeller_messages_receiver. next( ) . now_or_never( ) . is_none( ) ) ;
204+ }
205+ ) => {
206+ result. unwrap( )
207+ }
208+ }
209+ }
0 commit comments