@@ -28,6 +28,7 @@ use async_std::net::Ipv4Addr;
2828use byteorder:: { BigEndian , ByteOrder } ;
2929use libp2p_core:: ConnectedPoint ;
3030use rand:: Rng ;
31+ use std:: future;
3132use std:: thread:: sleep;
3233
3334#[ derive( Default , Debug ) ]
@@ -5237,3 +5238,207 @@ fn test_graft_without_subscribe() {
52375238 // We unsubscribe from the topic.
52385239 let _ = gs. unsubscribe ( & Topic :: new ( topic) ) ;
52395240}
5241+
5242+ #[ test]
5243+ fn test_all_queues_full ( ) {
5244+ let gs_config = ConfigBuilder :: default ( )
5245+ . validation_mode ( ValidationMode :: Permissive )
5246+ . build ( )
5247+ . unwrap ( ) ;
5248+
5249+ let mut gs: Behaviour = Behaviour :: new ( MessageAuthenticity :: RandomAuthor , gs_config) . unwrap ( ) ;
5250+
5251+ let topic_hash = Topic :: new ( "Test" ) . hash ( ) ;
5252+ let mut peers = vec ! [ ] ;
5253+ let mut topics = BTreeSet :: new ( ) ;
5254+ topics. insert ( topic_hash. clone ( ) ) ;
5255+
5256+ let peer_id = PeerId :: random ( ) ;
5257+ peers. push ( peer_id) ;
5258+ gs. connected_peers . insert (
5259+ peer_id,
5260+ PeerConnections {
5261+ kind : PeerKind :: Gossipsubv1_1 ,
5262+ connections : vec ! [ ConnectionId :: new_unchecked( 0 ) ] ,
5263+ topics : topics. clone ( ) ,
5264+ sender : RpcSender :: new ( 2 ) ,
5265+ } ,
5266+ ) ;
5267+
5268+ let publish_data = vec ! [ 0 ; 42 ] ;
5269+ gs. publish ( topic_hash. clone ( ) , publish_data. clone ( ) )
5270+ . unwrap ( ) ;
5271+ let publish_data = vec ! [ 2 ; 59 ] ;
5272+ let err = gs. publish ( topic_hash, publish_data) . unwrap_err ( ) ;
5273+ assert ! ( matches!( err, PublishError :: AllQueuesFull ( f) if f == 1 ) ) ;
5274+ }
5275+
5276+ #[ test]
5277+ fn test_slow_peer_returns_swarm_event ( ) {
5278+ let gs_config = ConfigBuilder :: default ( )
5279+ . validation_mode ( ValidationMode :: Permissive )
5280+ . build ( )
5281+ . unwrap ( ) ;
5282+
5283+ let mut gs: Behaviour = Behaviour :: new ( MessageAuthenticity :: RandomAuthor , gs_config) . unwrap ( ) ;
5284+
5285+ let topic_hash = Topic :: new ( "Test" ) . hash ( ) ;
5286+ let mut peers = vec ! [ ] ;
5287+ let mut topics = BTreeSet :: new ( ) ;
5288+ topics. insert ( topic_hash. clone ( ) ) ;
5289+
5290+ let slow_peer_id = PeerId :: random ( ) ;
5291+ peers. push ( slow_peer_id) ;
5292+ gs. connected_peers . insert (
5293+ slow_peer_id,
5294+ PeerConnections {
5295+ kind : PeerKind :: Gossipsubv1_1 ,
5296+ connections : vec ! [ ConnectionId :: new_unchecked( 0 ) ] ,
5297+ topics : topics. clone ( ) ,
5298+ sender : RpcSender :: new ( 2 ) ,
5299+ } ,
5300+ ) ;
5301+ let peer_id = PeerId :: random ( ) ;
5302+ peers. push ( peer_id) ;
5303+ gs. connected_peers . insert (
5304+ peer_id,
5305+ PeerConnections {
5306+ kind : PeerKind :: Gossipsubv1_1 ,
5307+ connections : vec ! [ ConnectionId :: new_unchecked( 0 ) ] ,
5308+ topics : topics. clone ( ) ,
5309+ sender : RpcSender :: new ( gs. config . connection_handler_queue_len ( ) ) ,
5310+ } ,
5311+ ) ;
5312+
5313+ let publish_data = vec ! [ 0 ; 42 ] ;
5314+ gs. publish ( topic_hash. clone ( ) , publish_data. clone ( ) )
5315+ . unwrap ( ) ;
5316+ let publish_data = vec ! [ 2 ; 59 ] ;
5317+ gs. publish ( topic_hash. clone ( ) , publish_data) . unwrap ( ) ;
5318+ gs. heartbeat ( ) ;
5319+
5320+ // Forward message.
5321+ let publish_data = vec ! [ 2 ; 59 ] ;
5322+ let transformed = gs
5323+ . data_transform
5324+ . outbound_transform ( & topic_hash, publish_data. clone ( ) )
5325+ . unwrap ( ) ;
5326+ let raw_message = gs. build_raw_message ( topic_hash, transformed) . unwrap ( ) ;
5327+ let msg_id = gs. config . message_id ( & Message {
5328+ source : raw_message. source ,
5329+ data : publish_data,
5330+ sequence_number : raw_message. sequence_number ,
5331+ topic : raw_message. topic . clone ( ) ,
5332+ } ) ;
5333+
5334+ gs. forward_msg ( & msg_id, raw_message, None , HashSet :: new ( ) ) ;
5335+ gs. heartbeat ( ) ;
5336+
5337+ let slow_peer_failed_messages = match gs. events . pop_front ( ) . unwrap ( ) {
5338+ ToSwarm :: GenerateEvent ( Event :: SlowPeer {
5339+ peer_id,
5340+ failed_messages,
5341+ } ) if peer_id == slow_peer_id => failed_messages,
5342+ _ => panic ! ( "invalid event" ) ,
5343+ } ;
5344+
5345+ let failed_messages = FailedMessages {
5346+ publish : 1 ,
5347+ forward : 1 ,
5348+ priority : 1 ,
5349+ non_priority : 1 ,
5350+ } ;
5351+
5352+ assert_eq ! ( slow_peer_failed_messages. priority, failed_messages. priority) ;
5353+ assert_eq ! (
5354+ slow_peer_failed_messages. non_priority,
5355+ failed_messages. non_priority
5356+ ) ;
5357+ assert_eq ! ( slow_peer_failed_messages. publish, failed_messages. publish) ;
5358+ assert_eq ! ( slow_peer_failed_messages. forward, failed_messages. forward) ;
5359+ }
5360+
5361+ #[ test]
5362+ fn test_slow_peer_is_downscored_on_publish ( ) {
5363+ let gs_config = ConfigBuilder :: default ( )
5364+ . validation_mode ( ValidationMode :: Permissive )
5365+ . build ( )
5366+ . unwrap ( ) ;
5367+
5368+ let mut gs: Behaviour = Behaviour :: new ( MessageAuthenticity :: RandomAuthor , gs_config) . unwrap ( ) ;
5369+ let slow_peer_params = PeerScoreParams :: default ( ) ;
5370+ gs. with_peer_score ( slow_peer_params. clone ( ) , PeerScoreThresholds :: default ( ) )
5371+ . unwrap ( ) ;
5372+
5373+ let topic_hash = Topic :: new ( "Test" ) . hash ( ) ;
5374+ let mut peers = vec ! [ ] ;
5375+ let mut topics = BTreeSet :: new ( ) ;
5376+ topics. insert ( topic_hash. clone ( ) ) ;
5377+
5378+ let slow_peer_id = PeerId :: random ( ) ;
5379+ peers. push ( slow_peer_id) ;
5380+ let mesh = gs. mesh . entry ( topic_hash. clone ( ) ) . or_default ( ) ;
5381+ mesh. insert ( slow_peer_id) ;
5382+ gs. connected_peers . insert (
5383+ slow_peer_id,
5384+ PeerConnections {
5385+ kind : PeerKind :: Gossipsubv1_1 ,
5386+ connections : vec ! [ ConnectionId :: new_unchecked( 0 ) ] ,
5387+ topics : topics. clone ( ) ,
5388+ sender : RpcSender :: new ( 2 ) ,
5389+ } ,
5390+ ) ;
5391+ gs. peer_score . as_mut ( ) . unwrap ( ) . 0 . add_peer ( slow_peer_id) ;
5392+ let peer_id = PeerId :: random ( ) ;
5393+ peers. push ( peer_id) ;
5394+ gs. connected_peers . insert (
5395+ peer_id,
5396+ PeerConnections {
5397+ kind : PeerKind :: Gossipsubv1_1 ,
5398+ connections : vec ! [ ConnectionId :: new_unchecked( 0 ) ] ,
5399+ topics : topics. clone ( ) ,
5400+ sender : RpcSender :: new ( gs. config . connection_handler_queue_len ( ) ) ,
5401+ } ,
5402+ ) ;
5403+
5404+ let publish_data = vec ! [ 0 ; 42 ] ;
5405+ gs. publish ( topic_hash. clone ( ) , publish_data. clone ( ) )
5406+ . unwrap ( ) ;
5407+ let publish_data = vec ! [ 2 ; 59 ] ;
5408+ gs. publish ( topic_hash. clone ( ) , publish_data) . unwrap ( ) ;
5409+ gs. heartbeat ( ) ;
5410+ let slow_peer_score = gs. peer_score ( & slow_peer_id) . unwrap ( ) ;
5411+ assert_eq ! ( slow_peer_score, slow_peer_params. slow_peer_weight) ;
5412+ }
5413+
5414+ #[ async_std:: test]
5415+ async fn test_timedout_messages_are_reported ( ) {
5416+ let gs_config = ConfigBuilder :: default ( )
5417+ . validation_mode ( ValidationMode :: Permissive )
5418+ . build ( )
5419+ . unwrap ( ) ;
5420+
5421+ let mut gs: Behaviour = Behaviour :: new ( MessageAuthenticity :: RandomAuthor , gs_config) . unwrap ( ) ;
5422+
5423+ let mut sender = RpcSender :: new ( 2 ) ;
5424+ let topic_hash = Topic :: new ( "Test" ) . hash ( ) ;
5425+ let publish_data = vec ! [ 2 ; 59 ] ;
5426+ let raw_message = gs. build_raw_message ( topic_hash, publish_data) . unwrap ( ) ;
5427+
5428+ sender
5429+ . publish ( raw_message, Duration :: from_nanos ( 1 ) , None )
5430+ . unwrap ( ) ;
5431+ let mut receiver = sender. new_receiver ( ) ;
5432+ let stale = future:: poll_fn ( |cx| receiver. poll_stale ( cx) ) . await . unwrap ( ) ;
5433+ assert ! ( matches!( stale, RpcOut :: Publish { .. } ) ) ;
5434+ }
5435+
5436+ #[ test]
5437+ fn test_priority_messages_are_always_sent ( ) {
5438+ let mut sender = RpcSender :: new ( 2 ) ;
5439+ let topic_hash = Topic :: new ( "Test" ) . hash ( ) ;
5440+ // Fill the buffer with the first message.
5441+ sender. subscribe ( topic_hash. clone ( ) ) ;
5442+ sender. subscribe ( topic_hash. clone ( ) ) ;
5443+ sender. unsubscribe ( topic_hash. clone ( ) ) ;
5444+ }
0 commit comments