diff --git a/crates/apollo_consensus_manager/src/consensus_manager.rs b/crates/apollo_consensus_manager/src/consensus_manager.rs index 09a0437e465..e918cbc6079 100644 --- a/crates/apollo_consensus_manager/src/consensus_manager.rs +++ b/crates/apollo_consensus_manager/src/consensus_manager.rs @@ -106,6 +106,7 @@ impl ConsensusManager { .register_broadcast_topic::>( Topic::new(self.config.proposals_topic.clone()), self.config.broadcast_buffer_size, + self.config.broadcast_buffer_size, ) .expect("Failed to register broadcast topic"); @@ -113,6 +114,7 @@ impl ConsensusManager { .register_broadcast_topic::( Topic::new(self.config.votes_topic.clone()), self.config.broadcast_buffer_size, + self.config.broadcast_buffer_size, ) .expect("Failed to register broadcast topic"); diff --git a/crates/apollo_mempool_p2p/src/lib.rs b/crates/apollo_mempool_p2p/src/lib.rs index 1e27f04b2e6..50ab31a4506 100644 --- a/crates/apollo_mempool_p2p/src/lib.rs +++ b/crates/apollo_mempool_p2p/src/lib.rs @@ -62,6 +62,7 @@ pub fn create_p2p_propagator_and_runner( .register_broadcast_topic( Topic::new(MEMPOOL_TOPIC), mempool_p2p_config.network_buffer_size, + mempool_p2p_config.network_buffer_size, ) .expect("Failed to register broadcast topic"); let network_future = network_manager.run().instrument(info_span!("[Mempool network]")); diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/main.rs b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/main.rs index 8ca817367f9..849c67c933d 100644 --- a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/main.rs +++ b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/main.rs @@ -200,7 +200,7 @@ async fn main() { info!("My PeerId: {peer_id}"); let network_channels = network_manager - .register_broadcast_topic::(TOPIC.clone(), args.buffer_size) + .register_broadcast_topic::(TOPIC.clone(), args.buffer_size, args.buffer_size) .unwrap(); let BroadcastTopicChannels { broadcasted_messages_receiver, broadcast_topic_client } = network_channels; diff --git a/crates/apollo_network/src/e2e_broadcast_test.rs b/crates/apollo_network/src/e2e_broadcast_test.rs index 941238499ec..833a05b6c31 100644 --- a/crates/apollo_network/src/e2e_broadcast_test.rs +++ b/crates/apollo_network/src/e2e_broadcast_test.rs @@ -96,14 +96,14 @@ async fn broadcast_subscriber_end_to_end_test() { create_network_manager(create_swarm(Some(bootstrap_peer_multiaddr)).await); let mut subscriber_channels1_1 = - network_manager1.register_broadcast_topic::(topic1.clone(), BUFFER_SIZE).unwrap(); + network_manager1.register_broadcast_topic::(topic1.clone(), BUFFER_SIZE, BUFFER_SIZE).unwrap(); let mut subscriber_channels1_2 = - network_manager1.register_broadcast_topic::(topic2.clone(), BUFFER_SIZE).unwrap(); + network_manager1.register_broadcast_topic::(topic2.clone(), BUFFER_SIZE, BUFFER_SIZE).unwrap(); let subscriber_channels2_1 = - network_manager2.register_broadcast_topic::(topic1.clone(), BUFFER_SIZE).unwrap(); + network_manager2.register_broadcast_topic::(topic1.clone(), BUFFER_SIZE, BUFFER_SIZE).unwrap(); let subscriber_channels2_2 = - network_manager2.register_broadcast_topic::(topic2.clone(), BUFFER_SIZE).unwrap(); + network_manager2.register_broadcast_topic::(topic2.clone(), BUFFER_SIZE, BUFFER_SIZE).unwrap(); tokio::select! { _ = network_manager1.run() => panic!("network manager ended"), diff --git a/crates/apollo_network/src/network_manager/mod.rs b/crates/apollo_network/src/network_manager/mod.rs index 65c06beb49c..568ff6af7a8 100644 --- a/crates/apollo_network/src/network_manager/mod.rs +++ b/crates/apollo_network/src/network_manager/mod.rs @@ -204,7 +204,8 @@ impl GenericNetworkManager { pub fn register_broadcast_topic( &mut self, topic: Topic, - buffer_size: usize, + sending_buffer_size: usize, + receiving_buffer_size: usize, ) -> Result, SubscriptionError> where T: TryFrom + 'static, @@ -215,9 +216,9 @@ impl GenericNetworkManager { let topic_hash = topic.hash(); let (messages_to_broadcast_sender, messages_to_broadcast_receiver) = - futures::channel::mpsc::channel(buffer_size); + futures::channel::mpsc::channel(sending_buffer_size); let (broadcasted_messages_sender, broadcasted_messages_receiver) = - futures::channel::mpsc::channel(buffer_size); + futures::channel::mpsc::channel(receiving_buffer_size); let insert_result = self .messages_to_broadcast_receivers diff --git a/crates/apollo_network/src/network_manager/test.rs b/crates/apollo_network/src/network_manager/test.rs index 75930ca5bef..46a0795c1a2 100644 --- a/crates/apollo_network/src/network_manager/test.rs +++ b/crates/apollo_network/src/network_manager/test.rs @@ -338,7 +338,7 @@ async fn broadcast_message() { ); let mut broadcast_topic_client = network_manager - .register_broadcast_topic(topic.clone(), BUFFER_SIZE) + .register_broadcast_topic(topic.clone(), BUFFER_SIZE, BUFFER_SIZE) .unwrap() .broadcast_topic_client; broadcast_topic_client.broadcast_message(message.clone()).await.unwrap(); @@ -383,7 +383,7 @@ async fn receive_broadcasted_message_and_report_it() { mut broadcast_topic_client, mut broadcasted_messages_receiver, .. - } = network_manager.register_broadcast_topic::(topic.clone(), BUFFER_SIZE).unwrap(); + } = network_manager.register_broadcast_topic::(topic.clone(), BUFFER_SIZE, BUFFER_SIZE).unwrap(); tokio::select! { _ = network_manager.run() => panic!("network manager ended"), diff --git a/crates/apollo_network/src/network_manager/test_utils.rs b/crates/apollo_network/src/network_manager/test_utils.rs index 358c54ef7b6..fd5132fc3b6 100644 --- a/crates/apollo_network/src/network_manager/test_utils.rs +++ b/crates/apollo_network/src/network_manager/test_utils.rs @@ -202,7 +202,7 @@ where let mut network_manager = NetworkManager::new(network_config, None, None); let broadcast_channels = - network_manager.register_broadcast_topic(topic.clone(), BUFFER_SIZE).unwrap(); + network_manager.register_broadcast_topic(topic.clone(), BUFFER_SIZE, BUFFER_SIZE).unwrap(); tokio::task::spawn(async move { let result = network_manager.run().await;