Skip to content

Commit a016e70

Browse files
apollo_network: propeller
1 parent e8247a0 commit a016e70

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+6695
-26
lines changed

Cargo.lock

Lines changed: 79 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -67,6 +67,7 @@ members = [
6767
"crates/apollo_p2p_sync_config",
6868
"crates/apollo_proc_macros",
6969
"crates/apollo_proc_macros_tests",
70+
"crates/apollo_propeller",
7071
"crates/apollo_protobuf",
7172
"crates/apollo_reverts",
7273
"crates/apollo_rpc",
@@ -184,6 +185,7 @@ apollo_p2p_sync.path = "crates/apollo_p2p_sync"
184185
apollo_p2p_sync_config.path = "crates/apollo_p2p_sync_config"
185186
apollo_proc_macros = { path = "crates/apollo_proc_macros", version = "0.0.0" }
186187
apollo_proc_macros_tests.path = "crates/apollo_proc_macros_tests"
188+
apollo_propeller.path = "crates/apollo_propeller"
187189
apollo_protobuf.path = "crates/apollo_protobuf"
188190
apollo_reverts.path = "crates/apollo_reverts"
189191
apollo_rpc.path = "crates/apollo_rpc"
@@ -216,6 +218,7 @@ assert_matches = "1.5.0"
216218
async-recursion = "1.1.0"
217219
async-stream = "0.3.3"
218220
async-trait = "0.1.79"
221+
asynchronous-codec = "0.7.0"
219222
atomic_refcell = "0.1.13"
220223
axum = "0.6.12"
221224
base64 = "0.13.0"
@@ -286,6 +289,7 @@ libp2p = "0.56.0"
286289
libp2p-swarm-test = "0.6.0"
287290
log = "0.4"
288291
lru = "0.12.0"
292+
lru_time_cache = "0.11.11"
289293
memmap2 = "0.8.0"
290294
mempool_test_utils.path = "crates/mempool_test_utils"
291295
metrics = "0.24.1"
@@ -320,10 +324,15 @@ prost-types = "0.12.1"
320324
protoc-prebuilt = "0.3.0"
321325
pyo3 = "0.19.1"
322326
pyo3-log = "0.8.1"
327+
quick-protobuf = "0.8.1"
328+
quick-protobuf-codec = "0.3.1"
329+
quickcheck = "1.0.3"
323330
quote = "1.0.26"
324331
rand = "0.8.5"
325332
rand_chacha = "0.3.1"
326333
rand_distr = "0.4.3"
334+
rayon = "1.10"
335+
reed-solomon-simd = "3.1.0"
327336
regex = "1.10.4"
328337
replace_with = "0.1.7"
329338
reqwest = "0.12"

crates/apollo_consensus_manager/src/consensus_manager.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -174,6 +174,7 @@ impl ConsensusManager {
174174
sqmr_metrics: None,
175175
event_metrics: Some(EventMetrics { event_counter: CONSENSUS_NETWORK_EVENTS }),
176176
latency_metrics: Some(LatencyMetrics { ping_latency_seconds: CONSENSUS_PING_LATENCY }),
177+
propeller_metrics: None,
177178
});
178179

179180
NetworkManager::new(self.config.network_config.clone(), None, network_manager_metrics)

crates/apollo_mempool_p2p/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -60,6 +60,7 @@ pub fn create_p2p_propagator_and_runner(
6060
sqmr_metrics: None,
6161
event_metrics: Some(EventMetrics { event_counter: MEMPOOL_P2P_NETWORK_EVENTS }),
6262
latency_metrics: Some(LatencyMetrics { ping_latency_seconds: MEMPOOL_P2P_PING_LATENCY }),
63+
propeller_metrics: None,
6364
});
6465
let mut network_manager = NetworkManager::new(
6566
mempool_p2p_config.network_config,

crates/apollo_network/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,7 @@ testing = []
1212
apollo_config.workspace = true
1313
apollo_metrics.workspace = true
1414
apollo_network_types.workspace = true
15+
apollo_propeller.workspace = true
1516
async-stream.workspace = true
1617
async-trait.workspace = true
1718
bytes.workspace = true

crates/apollo_network/src/bin/broadcast_network_stress_test_node/args.rs

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,9 @@ pub enum NetworkProtocol {
3333
/// Use Reversed SQMR where receivers initiate requests to broadcasters
3434
#[value(name = "reversed-sqmr")]
3535
ReveresedSqmr,
36+
/// Use Propeller for leader-based erasure-coded broadcasting
37+
#[value(name = "propeller")]
38+
Propeller,
3639
}
3740

3841
impl Display for Mode {

crates/apollo_network/src/bin/broadcast_network_stress_test_node/message_handling.rs

Lines changed: 46 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,10 @@ use apollo_network::network_manager::{
22
BroadcastTopicClient,
33
BroadcastTopicClientTrait,
44
BroadcastTopicServer,
5+
PropellerClient,
6+
PropellerClientTrait,
7+
PropellerMessageServer,
8+
ReceivedPropellerMessage,
59
SqmrClientSender,
610
SqmrServerReceiver,
711
};
@@ -16,6 +20,26 @@ pub enum MessageSender {
1620
Gossipsub(BroadcastTopicClient<TopicType>),
1721
Sqmr(SqmrClientSender<TopicType, TopicType>),
1822
ReveresedSqmr(ReveresedSqmrSender),
23+
Propeller(PropellerSender),
24+
}
25+
26+
/// Wrapper around the Propeller client, used by the `MessageSender::Propeller` variant.
///
/// NOTE(review): the previous summary said this wrapper "handles message ID generation", but no
/// ID generation is visible here — presumably it happens inside `PropellerClient`; confirm.
27+
pub struct PropellerSender {
28+
// Underlying Propeller client that `send_message` delegates to.
client: PropellerClient<TopicType>,
29+
}
30+
31+
impl PropellerSender {
32+
/// Creates a sender that wraps the given Propeller client.
pub fn new(client: PropellerClient<TopicType>) -> Self {
33+
Self { client }
34+
}
35+
36+
/// Sends `message` through the wrapped client and awaits the result.
///
/// The outcome is only logged: a failure is reported with `error!` and is not returned to the
/// caller, and a success is reported with `trace!`.
async fn send_message(&mut self, message: TopicType) {
37+
if let Err(e) = self.client.send_message(message).await {
38+
error!("Failed to send Propeller message: {:?}", e);
39+
} else {
40+
trace!("Sent Propeller message");
41+
}
42+
}
1943
}
2044

2145
/// Wrapper for ReveresedSqmr that maintains the last active query
@@ -88,6 +112,9 @@ impl MessageSender {
88112
// Then broadcast the message to all active queries
89113
sender.broadcast_to_queries(message).await;
90114
}
115+
MessageSender::Propeller(sender) => {
116+
sender.send_message(message).await;
117+
}
91118
}
92119
}
93120
}
@@ -96,6 +123,7 @@ pub enum MessageReceiver {
96123
Gossipsub(BroadcastTopicServer<TopicType>),
97124
Sqmr(SqmrServerReceiver<TopicType, TopicType>),
98125
ReveresedSqmr(SqmrClientSender<TopicType, TopicType>),
126+
Propeller(PropellerMessageServer<TopicType>),
99127
}
100128

101129
impl MessageReceiver {
@@ -147,6 +175,24 @@ impl MessageReceiver {
147175
}
148176
}
149177
},
178+
MessageReceiver::Propeller(receiver) => receiver
179+
.for_each(
180+
|(sender, message_root, result): ReceivedPropellerMessage<TopicType>| async move {
181+
match result {
182+
Ok(message) => {
183+
trace!("Received Propeller message with ID: {}", message_root);
184+
f(message, Some(sender));
185+
}
186+
Err(e) => {
187+
error!(
188+
"Failed to deserialize Propeller message {}: {:?}",
189+
message_root, e
190+
);
191+
}
192+
}
193+
},
194+
)
195+
.await,
150196
}
151197
}
152198
}

crates/apollo_network/src/bin/broadcast_network_stress_test_node/metrics.rs

Lines changed: 44 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,12 @@ use apollo_network::metrics::{
1313
EVENT_TYPE_LABELS,
1414
NETWORK_BROADCAST_DROP_LABELS,
1515
};
16+
use apollo_propeller::metrics::{
17+
PropellerMetrics,
18+
COLLECTION_LENGTH_LABELS,
19+
SHARD_SEND_FAILURE_LABELS,
20+
SHARD_VALIDATION_FAILURE_LABELS,
21+
};
1622
use libp2p::gossipsub::{Sha256Topic, Topic};
1723
use libp2p::PeerId;
1824
use sysinfo::{Networks, System};
@@ -66,6 +72,24 @@ define_metrics!(
6672
LabeledMetricCounter { NETWORK_EVENT_COUNTER, "network_event_counter", "Network events counter by type", init = 0, labels = EVENT_TYPE_LABELS },
6773

6874
MetricHistogram { PING_LATENCY_SECONDS, "ping_latency_seconds", "Ping latency in seconds" },
75+
76+
// Propeller Protocol Metrics
77+
MetricCounter { PROPELLER_SHARDS_PUBLISHED, "propeller_shards_published", "Total number of shards published (created) by this node", init = 0 },
78+
MetricCounter { PROPELLER_SHARDS_SENT, "propeller_shards_sent", "Total number of shards sent to peers (includes forwarding)", init = 0 },
79+
LabeledMetricCounter { PROPELLER_SHARDS_SEND_FAILED, "propeller_shards_send_failed", "Total number of shard send failures, labeled by reason", init = 0, labels = SHARD_SEND_FAILURE_LABELS },
80+
MetricCounter { PROPELLER_SHARD_BYTES_SENT, "propeller_shard_bytes_sent", "Total bytes sent in shard data (payload only)", init = 0 },
81+
MetricCounter { PROPELLER_SHARDS_RECEIVED, "propeller_shards_received", "Total number of shards received from peers", init = 0 },
82+
MetricCounter { PROPELLER_SHARDS_VALIDATED, "propeller_shards_validated", "Total number of shards successfully validated", init = 0 },
83+
LabeledMetricCounter { PROPELLER_SHARDS_VALIDATION_FAILED, "propeller_shards_validation_failed", "Total number of shards that failed validation, labeled by reason", init = 0, labels = SHARD_VALIDATION_FAILURE_LABELS },
84+
MetricCounter { PROPELLER_SHARDS_FORWARDED, "propeller_shards_forwarded", "Total number of shards forwarded to children in tree", init = 0 },
85+
MetricCounter { PROPELLER_SHARD_BYTES_RECEIVED, "propeller_shard_bytes_received", "Total bytes received in shard data (payload only)", init = 0 },
86+
MetricCounter { PROPELLER_MESSAGES_RECONSTRUCTED, "propeller_messages_reconstructed", "Total number of messages successfully reconstructed from shards", init = 0 },
87+
MetricCounter { PROPELLER_MESSAGES_RECONSTRUCTION_FAILED, "propeller_messages_reconstruction_failed", "Total number of message reconstruction failures", init = 0 },
88+
MetricCounter { PROPELLER_TREES_GENERATED, "propeller_trees_generated", "Total number of tree generation operations", init = 0 },
89+
LabeledMetricGauge { PROPELLER_COLLECTION_LENGTHS, "propeller_collection_lengths", "Length of various collections (queues, sets, caches) tracked by label", labels = COLLECTION_LENGTH_LABELS },
90+
MetricHistogram { PROPELLER_SHARD_VALIDATION_DURATION, "propeller_shard_validation_duration", "Time to validate a single shard (seconds)" },
91+
MetricHistogram { PROPELLER_MESSAGE_RECONSTRUCTION_DURATION, "propeller_message_reconstruction_duration", "Time to reconstruct a message from shards (seconds)" },
92+
MetricHistogram { PROPELLER_MESSAGE_END_TO_END_LATENCY, "propeller_message_end_to_end_latency", "End-to-end latency from first shard received to message reconstructed (seconds)" },
6993
},
7094
);
7195

@@ -156,13 +180,33 @@ pub fn create_network_metrics() -> NetworkMetrics {
156180
// Create latency metrics for ping monitoring
157181
let latency_metrics = LatencyMetrics { ping_latency_seconds: PING_LATENCY_SECONDS };
158182

183+
let propeller_metrics = PropellerMetrics {
184+
shards_published: PROPELLER_SHARDS_PUBLISHED,
185+
shards_sent: PROPELLER_SHARDS_SENT,
186+
shards_send_failed: PROPELLER_SHARDS_SEND_FAILED,
187+
shard_bytes_sent: PROPELLER_SHARD_BYTES_SENT,
188+
shards_received: PROPELLER_SHARDS_RECEIVED,
189+
shards_validated: PROPELLER_SHARDS_VALIDATED,
190+
shards_validation_failed: PROPELLER_SHARDS_VALIDATION_FAILED,
191+
shards_forwarded: PROPELLER_SHARDS_FORWARDED,
192+
shard_bytes_received: PROPELLER_SHARD_BYTES_RECEIVED,
193+
messages_reconstructed: PROPELLER_MESSAGES_RECONSTRUCTED,
194+
messages_reconstruction_failed: PROPELLER_MESSAGES_RECONSTRUCTION_FAILED,
195+
trees_generated: PROPELLER_TREES_GENERATED,
196+
collection_lengths: PROPELLER_COLLECTION_LENGTHS,
197+
shard_validation_duration: PROPELLER_SHARD_VALIDATION_DURATION,
198+
message_reconstruction_duration: PROPELLER_MESSAGE_RECONSTRUCTION_DURATION,
199+
message_end_to_end_latency: PROPELLER_MESSAGE_END_TO_END_LATENCY,
200+
};
201+
159202
NetworkMetrics {
160203
num_connected_peers: NETWORK_CONNECTED_PEERS,
161204
num_blacklisted_peers: NETWORK_BLACKLISTED_PEERS,
162205
broadcast_metrics_by_topic: Some(broadcast_metrics_by_topic),
163206
sqmr_metrics: Some(sqmr_metrics),
164207
event_metrics: Some(event_metrics),
165208
latency_metrics: Some(latency_metrics),
209+
propeller_metrics: Some(propeller_metrics),
166210
}
167211
}
168212

0 commit comments

Comments
 (0)