Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/apollo_consensus_manager/src/consensus_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,15 @@ impl ConsensusManager {
.register_broadcast_topic::<StreamMessage<ProposalPart, HeightAndRound>>(
Topic::new(self.config.proposals_topic.clone()),
self.config.broadcast_buffer_size,
self.config.broadcast_buffer_size,
)
.expect("Failed to register broadcast topic");

let votes_broadcast_channels = network_manager
.register_broadcast_topic::<Vote>(
Topic::new(self.config.votes_topic.clone()),
self.config.broadcast_buffer_size,
self.config.broadcast_buffer_size,
)
.expect("Failed to register broadcast topic");

Expand Down
1 change: 1 addition & 0 deletions crates/apollo_mempool_p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub fn create_p2p_propagator_and_runner(
.register_broadcast_topic(
Topic::new(MEMPOOL_TOPIC),
mempool_p2p_config.network_buffer_size,
mempool_p2p_config.network_buffer_size,
)
.expect("Failed to register broadcast topic");
let network_future = network_manager.run().instrument(info_span!("[Mempool network]"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ async fn main() {
info!("My PeerId: {peer_id}");

let network_channels = network_manager
.register_broadcast_topic::<StressTestMessage>(TOPIC.clone(), args.buffer_size)
.register_broadcast_topic::<StressTestMessage>(TOPIC.clone(), args.buffer_size, args.buffer_size)
.unwrap();
let BroadcastTopicChannels { broadcasted_messages_receiver, broadcast_topic_client } =
network_channels;
Expand Down
8 changes: 4 additions & 4 deletions crates/apollo_network/src/e2e_broadcast_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,14 @@ async fn broadcast_subscriber_end_to_end_test() {
create_network_manager(create_swarm(Some(bootstrap_peer_multiaddr)).await);

let mut subscriber_channels1_1 =
network_manager1.register_broadcast_topic::<Number>(topic1.clone(), BUFFER_SIZE).unwrap();
network_manager1.register_broadcast_topic::<Number>(topic1.clone(), BUFFER_SIZE, BUFFER_SIZE).unwrap();
let mut subscriber_channels1_2 =
network_manager1.register_broadcast_topic::<Number>(topic2.clone(), BUFFER_SIZE).unwrap();
network_manager1.register_broadcast_topic::<Number>(topic2.clone(), BUFFER_SIZE, BUFFER_SIZE).unwrap();

let subscriber_channels2_1 =
network_manager2.register_broadcast_topic::<Number>(topic1.clone(), BUFFER_SIZE).unwrap();
network_manager2.register_broadcast_topic::<Number>(topic1.clone(), BUFFER_SIZE, BUFFER_SIZE).unwrap();
let subscriber_channels2_2 =
network_manager2.register_broadcast_topic::<Number>(topic2.clone(), BUFFER_SIZE).unwrap();
network_manager2.register_broadcast_topic::<Number>(topic2.clone(), BUFFER_SIZE, BUFFER_SIZE).unwrap();

tokio::select! {
_ = network_manager1.run() => panic!("network manager ended"),
Expand Down
7 changes: 4 additions & 3 deletions crates/apollo_network/src/network_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
pub fn register_broadcast_topic<T>(
&mut self,
topic: Topic,
buffer_size: usize,
sending_buffer_size: usize,
receiving_buffer_size: usize,
) -> Result<BroadcastTopicChannels<T>, SubscriptionError>
where
T: TryFrom<Bytes> + 'static,
Expand All @@ -215,9 +216,9 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
let topic_hash = topic.hash();

let (messages_to_broadcast_sender, messages_to_broadcast_receiver) =
futures::channel::mpsc::channel(buffer_size);
futures::channel::mpsc::channel(sending_buffer_size);
let (broadcasted_messages_sender, broadcasted_messages_receiver) =
futures::channel::mpsc::channel(buffer_size);
futures::channel::mpsc::channel(receiving_buffer_size);

let insert_result = self
.messages_to_broadcast_receivers
Expand Down
4 changes: 2 additions & 2 deletions crates/apollo_network/src/network_manager/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ async fn broadcast_message() {
);

let mut broadcast_topic_client = network_manager
.register_broadcast_topic(topic.clone(), BUFFER_SIZE)
.register_broadcast_topic(topic.clone(), BUFFER_SIZE, BUFFER_SIZE)
.unwrap()
.broadcast_topic_client;
broadcast_topic_client.broadcast_message(message.clone()).await.unwrap();
Expand Down Expand Up @@ -383,7 +383,7 @@ async fn receive_broadcasted_message_and_report_it() {
mut broadcast_topic_client,
mut broadcasted_messages_receiver,
..
} = network_manager.register_broadcast_topic::<Bytes>(topic.clone(), BUFFER_SIZE).unwrap();
} = network_manager.register_broadcast_topic::<Bytes>(topic.clone(), BUFFER_SIZE, BUFFER_SIZE).unwrap();

tokio::select! {
_ = network_manager.run() => panic!("network manager ended"),
Expand Down
2 changes: 1 addition & 1 deletion crates/apollo_network/src/network_manager/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ where

let mut network_manager = NetworkManager::new(network_config, None, None);
let broadcast_channels =
network_manager.register_broadcast_topic(topic.clone(), BUFFER_SIZE).unwrap();
network_manager.register_broadcast_topic(topic.clone(), BUFFER_SIZE, BUFFER_SIZE).unwrap();

tokio::task::spawn(async move {
let result = network_manager.run().await;
Expand Down
Loading