Skip to content

Commit 8541c15

Browse files
apollo_network: propeller
1 parent 9a54e77 commit 8541c15

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+7091
-13
lines changed

Cargo.lock

Lines changed: 80 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ members = [
6969
"crates/apollo_p2p_sync_config",
7070
"crates/apollo_proc_macros",
7171
"crates/apollo_proc_macros_tests",
72+
"crates/apollo_propeller",
7273
"crates/apollo_protobuf",
7374
"crates/apollo_reverts",
7475
"crates/apollo_rpc",
@@ -187,6 +188,7 @@ apollo_p2p_sync.path = "crates/apollo_p2p_sync"
187188
apollo_p2p_sync_config.path = "crates/apollo_p2p_sync_config"
188189
apollo_proc_macros = { path = "crates/apollo_proc_macros", version = "0.0.0" }
189190
apollo_proc_macros_tests.path = "crates/apollo_proc_macros_tests"
191+
apollo_propeller.path = "crates/apollo_propeller"
190192
apollo_protobuf.path = "crates/apollo_protobuf"
191193
apollo_reverts.path = "crates/apollo_reverts"
192194
apollo_rpc.path = "crates/apollo_rpc"
@@ -219,6 +221,7 @@ assert_matches = "1.5.0"
219221
async-recursion = "1.1.0"
220222
async-stream = "0.3.3"
221223
async-trait = "0.1.79"
224+
asynchronous-codec = "0.7.0"
222225
atomic_refcell = "0.1.13"
223226
axum = "0.6.12"
224227
base64 = "0.13.0"
@@ -290,6 +293,7 @@ libp2p = "0.56.0"
290293
libp2p-swarm-test = "0.6.0"
291294
log = "0.4"
292295
lru = "0.12.0"
296+
lru_time_cache = "0.11.11"
293297
memmap2 = "0.8.0"
294298
mempool_test_utils.path = "crates/mempool_test_utils"
295299
metrics = "0.24.1"
@@ -323,10 +327,15 @@ prost-types = "0.12.1"
323327
protoc-prebuilt = "0.3.0"
324328
pyo3 = "0.19.1"
325329
pyo3-log = "0.8.1"
330+
quick-protobuf = "0.8.1"
331+
quick-protobuf-codec = "0.3.1"
332+
quickcheck = "1.0.3"
326333
quote = "1.0.26"
327334
rand = "0.8.5"
328335
rand_chacha = "0.3.1"
329336
rand_distr = "0.4.3"
337+
rayon = "1.11.0"
338+
reed-solomon-simd = "3.1.0"
330339
regex = "1.10.4"
331340
replace_with = "0.1.7"
332341
reqwest = "0.12"

crates/apollo_consensus_manager/src/consensus_manager.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ impl ConsensusManager {
214214
sqmr_metrics: None,
215215
event_metrics: Some(EventMetrics { event_counter: CONSENSUS_NETWORK_EVENTS }),
216216
latency_metrics: Some(LatencyMetrics { ping_latency_seconds: CONSENSUS_PING_LATENCY }),
217+
propeller_metrics: None,
217218
});
218219

219220
NetworkManager::new(self.config.network_config.clone(), None, network_manager_metrics)

crates/apollo_mempool_p2p/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ pub fn create_p2p_propagator_and_runner(
7474
sqmr_metrics: None,
7575
event_metrics: Some(EventMetrics { event_counter: MEMPOOL_P2P_NETWORK_EVENTS }),
7676
latency_metrics: Some(LatencyMetrics { ping_latency_seconds: MEMPOOL_P2P_PING_LATENCY }),
77+
propeller_metrics: None,
7778
});
7879
let mut network_manager = NetworkManager::new(
7980
mempool_p2p_config.network_config,

crates/apollo_network/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ testing = []
1212
apollo_config.workspace = true
1313
apollo_metrics.workspace = true
1414
apollo_network_types.workspace = true
15+
apollo_propeller.workspace = true
1516
async-stream.workspace = true
1617
async-trait.workspace = true
1718
bytes.workspace = true

crates/apollo_network/src/discovery/flow_test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ impl DiscoveryMixedBehaviour {
3333
PeerManagerConfig::default(),
3434
None, // No event tracker for tests
3535
None, // No latency metrics for tests
36+
None,
3637
key,
3738
bootstrap_peer_multiaddr,
3839
ChainId::Mainnet,

crates/apollo_network/src/e2e_broadcast_test.rs

Lines changed: 77 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,30 +3,35 @@ use std::time::Duration;
33
use futures::{FutureExt, StreamExt};
44
use libp2p::core::multiaddr::Protocol;
55
use libp2p::swarm::SwarmEvent;
6-
use libp2p::{Multiaddr, Swarm};
6+
use libp2p::{Multiaddr, PeerId, Swarm};
77
use libp2p_swarm_test::SwarmExt;
88
use starknet_api::core::ChainId;
99

1010
use crate::discovery::DiscoveryConfig;
1111
use crate::gossipsub_impl::Topic;
1212
use crate::mixed_behaviour::MixedBehaviour;
13-
use crate::network_manager::{BroadcastTopicClientTrait, GenericNetworkManager};
13+
use crate::network_manager::{
14+
BroadcastTopicClientTrait,
15+
GenericNetworkManager,
16+
PropellerClientTrait,
17+
};
1418
use crate::peer_manager::PeerManagerConfig;
1519
use crate::prune_dead_connections::{DEFAULT_PING_INTERVAL, DEFAULT_PING_TIMEOUT};
1620
use crate::{sqmr, Bytes};
1721

1822
const TIMEOUT: Duration = Duration::from_secs(5);
1923

20-
async fn create_swarm(bootstrap_peer_multiaddr: Option<Multiaddr>) -> Swarm<MixedBehaviour> {
24+
async fn create_swarm(bootstrap_peer_multiaddr: Option<Vec<Multiaddr>>) -> Swarm<MixedBehaviour> {
2125
let mut swarm = Swarm::new_ephemeral_tokio(|keypair| {
2226
MixedBehaviour::new(
2327
sqmr::Config::default(),
2428
DiscoveryConfig::default(),
2529
PeerManagerConfig::default(),
2630
None,
2731
None,
32+
None,
2833
keypair.clone(),
29-
bootstrap_peer_multiaddr.map(|multiaddr| vec![multiaddr]),
34+
bootstrap_peer_multiaddr,
3035
ChainId::Mainnet,
3136
None,
3237
DEFAULT_PING_INTERVAL,
@@ -64,6 +69,28 @@ fn create_network_manager(
6469
)
6570
}
6671

72+
async fn create_network_managers(
73+
num: usize,
74+
) -> Vec<(PeerId, GenericNetworkManager<Swarm<MixedBehaviour>>)> {
75+
let mut bootstrap_addresses = vec![];
76+
let mut network_managers = vec![];
77+
for _ in 0..num {
78+
let swarm = create_swarm(if bootstrap_addresses.is_empty() {
79+
None
80+
} else {
81+
Some(bootstrap_addresses.clone())
82+
})
83+
.await;
84+
let local_peer_id = swarm.local_peer_id();
85+
let address = swarm.external_addresses().next().unwrap().clone();
86+
// Add peer ID to the multiaddr for bootstrap addresses
87+
let bootstrap_address = address.with_p2p(*local_peer_id).unwrap();
88+
bootstrap_addresses.push(bootstrap_address);
89+
network_managers.push((*local_peer_id, create_network_manager(swarm)));
90+
}
91+
network_managers
92+
}
93+
6794
const BUFFER_SIZE: usize = 100;
6895

6996
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -96,9 +123,9 @@ async fn broadcast_subscriber_end_to_end_test() {
96123
bootstrap_peer_multiaddr.with_p2p(*bootstrap_swarm.local_peer_id()).unwrap();
97124
let bootstrap_network_manager = create_network_manager(bootstrap_swarm);
98125
let mut network_manager1 =
99-
create_network_manager(create_swarm(Some(bootstrap_peer_multiaddr.clone())).await);
126+
create_network_manager(create_swarm(Some(vec![bootstrap_peer_multiaddr.clone()])).await);
100127
let mut network_manager2 =
101-
create_network_manager(create_swarm(Some(bootstrap_peer_multiaddr)).await);
128+
create_network_manager(create_swarm(Some(vec![bootstrap_peer_multiaddr])).await);
102129

103130
let mut subscriber_channels1_1 =
104131
network_manager1.register_broadcast_topic::<Number>(topic1.clone(), BUFFER_SIZE).unwrap();
@@ -141,3 +168,47 @@ async fn broadcast_subscriber_end_to_end_test() {
141168
}
142169
}
143170
}
171+
172+
#[tokio::test]
173+
async fn propeller_message_forwarding_test() {
174+
// This test verifies that propeller messages are properly forwarded through channels
175+
// when received by the network manager, similar to how gossipsub messages work.
176+
177+
let mut network_managers = create_network_managers(2).await;
178+
let (peer_1, mut nm_1) = network_managers.remove(0);
179+
let (peer_2, mut nm_2) = network_managers.remove(0);
180+
181+
let peers = vec![(peer_1, 1000), (peer_2, 500)];
182+
183+
let mut channels_1 =
184+
nm_1.register_propeller_channels::<Vec<u8>>(BUFFER_SIZE, peers.clone()).await.unwrap();
185+
let mut channels_2 =
186+
nm_2.register_propeller_channels::<Vec<u8>>(BUFFER_SIZE, peers).await.unwrap();
187+
188+
let message = vec![123; 64]; // must be a multiple of 64
189+
190+
tokio::select! {
191+
_ = nm_1.run() => panic!("network manager ended"),
192+
_ = nm_2.run() => panic!("network manager ended"),
193+
result = tokio::time::timeout(
194+
TIMEOUT, async move {
195+
tokio::time::sleep(Duration::from_secs(1)).await;
196+
197+
println!("Sending message");
198+
channels_1.propeller_client.send_message(message.clone()).await.unwrap();
199+
200+
println!("Receiving message");
201+
let (publisher, _, received_message) =
202+
channels_2.propeller_messages_receiver.next().await.unwrap();
203+
204+
assert_eq!(received_message.unwrap(), message);
205+
assert_eq!(publisher, peer_1);
206+
207+
assert!(channels_1.propeller_messages_receiver.next().now_or_never().is_none());
208+
assert!(channels_2.propeller_messages_receiver.next().now_or_never().is_none());
209+
}
210+
) => {
211+
result.unwrap()
212+
}
213+
}
214+
}

crates/apollo_network/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ pub mod misconduct_score;
205205
mod mixed_behaviour;
206206
pub mod network_manager;
207207
pub mod peer_manager;
208+
pub mod propeller_impl;
208209
mod prune_dead_connections;
209210
pub mod sqmr;
210211
#[cfg(test)]

crates/apollo_network/src/metrics.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use apollo_metrics::metrics::{
88
MetricGauge,
99
MetricHistogram,
1010
};
11+
use apollo_propeller::metrics::PropellerMetrics;
1112
use libp2p::gossipsub::{PublishError, TopicHash};
1213
use strum::{IntoStaticStr, VariantNames};
1314
use strum_macros::EnumVariantNames;
@@ -195,6 +196,7 @@ pub struct NetworkMetrics {
195196
pub sqmr_metrics: Option<SqmrNetworkMetrics>,
196197
pub event_metrics: Option<EventMetrics>,
197198
pub latency_metrics: Option<LatencyMetrics>,
199+
pub propeller_metrics: Option<PropellerMetrics>,
198200
}
199201

200202
impl NetworkMetrics {
@@ -217,5 +219,8 @@ impl NetworkMetrics {
217219
if let Some(latency_metrics) = self.latency_metrics.as_ref() {
218220
latency_metrics.register();
219221
}
222+
if let Some(propeller_metrics) = self.propeller_metrics.as_ref() {
223+
propeller_metrics.register();
224+
}
220225
}
221226
}

0 commit comments

Comments
 (0)