Skip to content

Commit 41603a4

Browse files
apollo_network: propeller
1 parent bec3b66 commit 41603a4

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+6697
-26
lines changed

Cargo.lock

Lines changed: 80 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ members = [
6868
"crates/apollo_p2p_sync_config",
6969
"crates/apollo_proc_macros",
7070
"crates/apollo_proc_macros_tests",
71+
"crates/apollo_propeller",
7172
"crates/apollo_protobuf",
7273
"crates/apollo_reverts",
7374
"crates/apollo_rpc",
@@ -186,6 +187,7 @@ apollo_p2p_sync.path = "crates/apollo_p2p_sync"
186187
apollo_p2p_sync_config.path = "crates/apollo_p2p_sync_config"
187188
apollo_proc_macros = { path = "crates/apollo_proc_macros", version = "0.0.0" }
188189
apollo_proc_macros_tests.path = "crates/apollo_proc_macros_tests"
190+
apollo_propeller.path = "crates/apollo_propeller"
189191
apollo_protobuf.path = "crates/apollo_protobuf"
190192
apollo_reverts.path = "crates/apollo_reverts"
191193
apollo_rpc.path = "crates/apollo_rpc"
@@ -218,6 +220,7 @@ assert_matches = "1.5.0"
218220
async-recursion = "1.1.0"
219221
async-stream = "0.3.3"
220222
async-trait = "0.1.79"
223+
asynchronous-codec = "0.7.0"
221224
atomic_refcell = "0.1.13"
222225
axum = "0.6.12"
223226
base64 = "0.13.0"
@@ -288,6 +291,7 @@ libp2p = "0.56.0"
288291
libp2p-swarm-test = "0.6.0"
289292
log = "0.4"
290293
lru = "0.12.0"
294+
lru_time_cache = "0.11.11"
291295
memmap2 = "0.8.0"
292296
mempool_test_utils.path = "crates/mempool_test_utils"
293297
metrics = "0.24.1"
@@ -322,10 +326,15 @@ prost-types = "0.12.1"
322326
protoc-prebuilt = "0.3.0"
323327
pyo3 = "0.19.1"
324328
pyo3-log = "0.8.1"
329+
quick-protobuf = "0.8.1"
330+
quick-protobuf-codec = "0.3.1"
331+
quickcheck = "1.0.3"
325332
quote = "1.0.26"
326333
rand = "0.8.5"
327334
rand_chacha = "0.3.1"
328335
rand_distr = "0.4.3"
336+
rayon = "1.10"
337+
reed-solomon-simd = "3.1.0"
329338
regex = "1.10.4"
330339
replace_with = "0.1.7"
331340
reqwest = "0.12"

crates/apollo_consensus_manager/src/consensus_manager.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ impl ConsensusManager {
174174
sqmr_metrics: None,
175175
event_metrics: Some(EventMetrics { event_counter: CONSENSUS_NETWORK_EVENTS }),
176176
latency_metrics: Some(LatencyMetrics { ping_latency_seconds: CONSENSUS_PING_LATENCY }),
177+
propeller_metrics: None,
177178
});
178179

179180
NetworkManager::new(self.config.network_config.clone(), None, network_manager_metrics)

crates/apollo_mempool_p2p/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ pub fn create_p2p_propagator_and_runner(
6060
sqmr_metrics: None,
6161
event_metrics: Some(EventMetrics { event_counter: MEMPOOL_P2P_NETWORK_EVENTS }),
6262
latency_metrics: Some(LatencyMetrics { ping_latency_seconds: MEMPOOL_P2P_PING_LATENCY }),
63+
propeller_metrics: None,
6364
});
6465
let mut network_manager = NetworkManager::new(
6566
mempool_p2p_config.network_config,

crates/apollo_network/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ testing = []
1212
apollo_config.workspace = true
1313
apollo_metrics.workspace = true
1414
apollo_network_types.workspace = true
15+
apollo_propeller.workspace = true
1516
async-stream.workspace = true
1617
async-trait.workspace = true
1718
bytes.workspace = true

crates/apollo_network/src/discovery/flow_test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ impl DiscoveryMixedBehaviour {
3333
PeerManagerConfig::default(),
3434
None, // No event tracker for tests
3535
None, // No latency metrics for tests
36+
None,
3637
key,
3738
bootstrap_peer_multiaddr,
3839
ChainId::Mainnet,

crates/apollo_network/src/e2e_broadcast_test.rs

Lines changed: 75 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,25 @@ use std::time::Duration;
33
use futures::{FutureExt, StreamExt};
44
use libp2p::core::multiaddr::Protocol;
55
use libp2p::swarm::SwarmEvent;
6-
use libp2p::{Multiaddr, Swarm};
6+
use libp2p::{Multiaddr, PeerId, Swarm};
77
use libp2p_swarm_test::SwarmExt;
88
use starknet_api::core::ChainId;
99

1010
use crate::discovery::DiscoveryConfig;
1111
use crate::gossipsub_impl::Topic;
1212
use crate::mixed_behaviour::MixedBehaviour;
13-
use crate::network_manager::{BroadcastTopicClientTrait, GenericNetworkManager};
13+
use crate::network_manager::{
14+
BroadcastTopicClientTrait,
15+
GenericNetworkManager,
16+
PropellerClientTrait,
17+
};
1418
use crate::peer_manager::PeerManagerConfig;
1519
use crate::prune_dead_connections::{DEFAULT_PING_INTERVAL, DEFAULT_PING_TIMEOUT};
1620
use crate::{sqmr, Bytes};
1721

1822
const TIMEOUT: Duration = Duration::from_secs(5);
1923

20-
async fn create_swarm(bootstrap_peer_multiaddr: Option<Multiaddr>) -> Swarm<MixedBehaviour> {
24+
async fn create_swarm(bootstrap_peer_multiaddr: Option<Vec<Multiaddr>>) -> Swarm<MixedBehaviour> {
2125
let mut swarm = Swarm::new_ephemeral_tokio(|keypair| {
2226
MixedBehaviour::new(
2327
sqmr::Config::default(),
@@ -26,7 +30,7 @@ async fn create_swarm(bootstrap_peer_multiaddr: Option<Multiaddr>) -> Swarm<Mixe
2630
None,
2731
None,
2832
keypair.clone(),
29-
bootstrap_peer_multiaddr.map(|multiaddr| vec![multiaddr]),
33+
bootstrap_peer_multiaddr,
3034
ChainId::Mainnet,
3135
None,
3236
DEFAULT_PING_INTERVAL,
@@ -64,6 +68,28 @@ fn create_network_manager(
6468
)
6569
}
6670

71+
async fn create_network_managers(
72+
num: usize,
73+
) -> Vec<(PeerId, GenericNetworkManager<Swarm<MixedBehaviour>>)> {
74+
let mut bootstrap_addresses = vec![];
75+
let mut network_managers = vec![];
76+
for _ in 0..num {
77+
let swarm = create_swarm(if bootstrap_addresses.is_empty() {
78+
None
79+
} else {
80+
Some(bootstrap_addresses.clone())
81+
})
82+
.await;
83+
let local_peer_id = swarm.local_peer_id();
84+
let address = swarm.external_addresses().next().unwrap().clone();
85+
// Add peer ID to the multiaddr for bootstrap addresses
86+
let bootstrap_address = address.with_p2p(*local_peer_id).unwrap();
87+
bootstrap_addresses.push(bootstrap_address);
88+
network_managers.push((*local_peer_id, create_network_manager(swarm)));
89+
}
90+
network_managers
91+
}
92+
6793
const BUFFER_SIZE: usize = 100;
6894

6995
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -96,9 +122,9 @@ async fn broadcast_subscriber_end_to_end_test() {
96122
bootstrap_peer_multiaddr.with_p2p(*bootstrap_swarm.local_peer_id()).unwrap();
97123
let bootstrap_network_manager = create_network_manager(bootstrap_swarm);
98124
let mut network_manager1 =
99-
create_network_manager(create_swarm(Some(bootstrap_peer_multiaddr.clone())).await);
125+
create_network_manager(create_swarm(Some(vec![bootstrap_peer_multiaddr.clone()])).await);
100126
let mut network_manager2 =
101-
create_network_manager(create_swarm(Some(bootstrap_peer_multiaddr)).await);
127+
create_network_manager(create_swarm(Some(vec![bootstrap_peer_multiaddr])).await);
102128

103129
let mut subscriber_channels1_1 =
104130
network_manager1.register_broadcast_topic::<Number>(topic1.clone(), BUFFER_SIZE).unwrap();
@@ -141,3 +167,46 @@ async fn broadcast_subscriber_end_to_end_test() {
141167
}
142168
}
143169
}
170+
171+
#[tokio::test]
172+
async fn propeller_message_forwarding_test() {
173+
// This test verifies that propeller messages are properly forwarded through channels
174+
// when received by the network manager, similar to how gossipsub messages work.
175+
176+
let mut network_managers = create_network_managers(2).await;
177+
let (peer_1, mut nm_1) = network_managers.remove(0);
178+
let (peer_2, mut nm_2) = network_managers.remove(0);
179+
180+
let peers = vec![(peer_1, 1000), (peer_2, 500)];
181+
182+
let mut channels_1 =
183+
nm_1.register_propeller_channels::<Vec<u8>>(BUFFER_SIZE, peers.clone()).unwrap();
184+
let mut channels_2 = nm_2.register_propeller_channels::<Vec<u8>>(BUFFER_SIZE, peers).unwrap();
185+
186+
let message = vec![123; 64]; // must be a multiple of 64
187+
188+
tokio::select! {
189+
_ = nm_1.run() => panic!("network manager ended"),
190+
_ = nm_2.run() => panic!("network manager ended"),
191+
result = tokio::time::timeout(
192+
TIMEOUT, async move {
193+
tokio::time::sleep(Duration::from_secs(1)).await;
194+
195+
println!("Sending message");
196+
channels_1.propeller_client.send_message(message.clone()).await.unwrap();
197+
198+
println!("Receiving message");
199+
let (publisher, _, received_message) =
200+
channels_2.propeller_messages_receiver.next().await.unwrap();
201+
202+
assert_eq!(received_message.unwrap(), message);
203+
assert_eq!(publisher, peer_1);
204+
205+
assert!(channels_1.propeller_messages_receiver.next().now_or_never().is_none());
206+
assert!(channels_2.propeller_messages_receiver.next().now_or_never().is_none());
207+
}
208+
) => {
209+
result.unwrap()
210+
}
211+
}
212+
}

crates/apollo_network/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ pub mod misconduct_score;
205205
mod mixed_behaviour;
206206
pub mod network_manager;
207207
pub mod peer_manager;
208+
pub mod propeller_impl;
208209
mod prune_dead_connections;
209210
mod sqmr;
210211
#[cfg(test)]

crates/apollo_network/src/metrics.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::collections::HashMap;
22

33
use apollo_metrics::generate_permutation_labels;
44
use apollo_metrics::metrics::{LabeledMetricCounter, MetricCounter, MetricGauge, MetricHistogram};
5+
use apollo_propeller::metrics::PropellerMetrics;
56
use libp2p::gossipsub::{PublishError, TopicHash};
67
use strum::{IntoStaticStr, VariantNames};
78
use strum_macros::EnumVariantNames;
@@ -147,6 +148,7 @@ pub struct NetworkMetrics {
147148
pub sqmr_metrics: Option<SqmrNetworkMetrics>,
148149
pub event_metrics: Option<EventMetrics>,
149150
pub latency_metrics: Option<LatencyMetrics>,
151+
pub propeller_metrics: Option<PropellerMetrics>,
150152
}
151153

152154
impl NetworkMetrics {
@@ -169,5 +171,8 @@ impl NetworkMetrics {
169171
if let Some(latency_metrics) = self.latency_metrics.as_ref() {
170172
latency_metrics.register();
171173
}
174+
if let Some(propeller_metrics) = self.propeller_metrics.as_ref() {
175+
propeller_metrics.register();
176+
}
172177
}
173178
}

0 commit comments

Comments
 (0)