apollo_propeller: implemented MessageProcessor#12780
apollo_propeller: implemented MessageProcessor#12780sirandreww-starkware wants to merge 1 commit intomain-v0.14.2from
Conversation
b098818 to
7769109
Compare
guy-starkware
left a comment
There was a problem hiding this comment.
@guy-starkware reviewed 4 files and all commit messages.
Reviewable status: all files reviewed, 2 unresolved discussions (waiting on noamsp-starkware, ShahakShama, and sirandreww-starkware).
7769109 to
2c7c3cf
Compare
5428236 to
6ee3b2a
Compare
ShahakShama
left a comment
There was a problem hiding this comment.
@ShahakShama made 12 comments.
Reviewable status: 0 of 4 files reviewed, 13 unresolved discussions (waiting on guy-starkware, noamsp-starkware, and sirandreww-starkware).
crates/apollo_propeller/src/message_processor.rs line 41 at r1 (raw file):
signature: Option<Vec<u8>>, reconstructed_message: Option<Vec<u8>>, count_at_reconstruction: usize,
This is unclear. rename and/or document
crates/apollo_propeller/src/message_processor.rs line 42 at r1 (raw file):
reconstructed_message: Option<Vec<u8>>, count_at_reconstruction: usize, additional_shards_after_reconstruction: usize,
Same here
crates/apollo_propeller/src/message_processor.rs line 48 at r1 (raw file):
fn new() -> Self { Self { received_shards: Vec::new(),
Where is this vector filled up?
crates/apollo_propeller/src/message_processor.rs line 71 at r1 (raw file):
fn capture_signature(&mut self, unit: &PropellerUnit) { if self.signature.is_none() { self.signature = Some(unit.signature().to_vec());
Shouldn't you check it's the same signature if it's some and return an error if it's not?
crates/apollo_propeller/src/message_processor.rs line 76 at r1 (raw file):
/// Total shard count used for the access-threshold check. fn access_count(&self) -> usize {
access is an unclear word here. Try renaming
crates/apollo_propeller/src/message_processor.rs line 135 at r1 (raw file):
let (result, returned_validator, unit) = Self::validate_on_rayon(validator, sender, unit).await;
Consider renaming to capture not the fact you use rayon but the thing you're getting from using rayon
crates/apollo_propeller/src/message_processor.rs line 139 at r1 (raw file):
if let Err(err) = result { trace!("[MSG_PROC] Validation failed for index={:?}: {:?}", unit.index(), err);
Add TODO to penalize sender of bad shard (we'll do this TODO way later on, but still)
crates/apollo_propeller/src/message_processor.rs line 170 at r1 (raw file):
unit: PropellerUnit, ) -> ValidationResult { let (tx, rx) = oneshot::channel();
No need to create a oneshot channel for this. Just return the result in the rayon closure and await the rayon spawn
crates/apollo_propeller/src/message_processor.rs line 186 at r1 (raw file):
} fn broadcast_shard(&self, unit: &PropellerUnit) {
Consider erasing this function and putting its code inside maybe_broadcast_my_shard
crates/apollo_propeller/src/message_processor.rs line 197 at r1 (raw file):
trace!("[MSG_PROC] Broadcasting unit index={:?} to {} peers", unit.index(), peers.len()); self.engine_tx .send(EventStateManagerToEngine::BroadcastUnit { unit: unit.clone(), peers })
Consider renaming BroadcastUnit to SendUnit or SendUnitToPeers since you're giving a specific set of peers
crates/apollo_propeller/src/message_processor.rs line 201 at r1 (raw file):
} // --- Reconstruction ----------------------------------------------------
Remove this
6ee3b2a to
2dbf031
Compare
2c7c3cf to
08dab67
Compare
sirandreww-starkware
left a comment
There was a problem hiding this comment.
@sirandreww-starkware made 13 comments and resolved 3 discussions.
Reviewable status: 0 of 4 files reviewed, 10 unresolved discussions (waiting on guy-starkware, noamsp-starkware, and ShahakShama).
crates/apollo_propeller/src/message_processor.rs line 41 at r1 (raw file):
Previously, ShahakShama wrote…
This is unclear. rename and/or document
renamed
crates/apollo_propeller/src/message_processor.rs line 42 at r1 (raw file):
Previously, ShahakShama wrote…
Same here
renamed
crates/apollo_propeller/src/message_processor.rs line 48 at r1 (raw file):
Previously, ShahakShama wrote…
Where is this vector filled up?
Are you asking me to re-name or to add a comment or genuinely where in the code is this pushed to?
crates/apollo_propeller/src/message_processor.rs line 71 at r1 (raw file):
Previously, ShahakShama wrote…
Shouldn't you check it's the same signature if it's some and return an error if it's not?
These units passed validation, they all have the same signature. I can add an assert just in case? or a comment?
crates/apollo_propeller/src/message_processor.rs line 76 at r1 (raw file):
Previously, ShahakShama wrote…
access is an unclear word here. Try renaming
Done
crates/apollo_propeller/src/message_processor.rs line 135 at r1 (raw file):
Previously, ShahakShama wrote…
Consider renaming to capture not the fact you use rayon but the thing you're getting from using rayon
Done
not even using rayon now
crates/apollo_propeller/src/message_processor.rs line 139 at r1 (raw file):
Previously, ShahakShama wrote…
Add TODO to penalize sender of bad shard (we'll do this TODO way later on, but still)
Done
crates/apollo_propeller/src/message_processor.rs line 170 at r1 (raw file):
Previously, ShahakShama wrote…
No need to create a oneshot channel for this. Just return the result in the rayon closure and await the rayon spawn
Not part of rayon API (rayon::spawn returns ())
Changed this now that we're using tokio::spawn_blocking
crates/apollo_propeller/src/message_processor.rs line 186 at r1 (raw file):
Previously, ShahakShama wrote…
Consider erasing this function and putting its code inside maybe_broadcast_my_shard
it is used in 2 locations
crates/apollo_propeller/src/message_processor.rs line 197 at r1 (raw file):
Previously, ShahakShama wrote…
Consider renaming BroadcastUnit to SendUnit or SendUnitToPeers since you're giving a specific set of peers
Done
crates/apollo_propeller/src/message_processor.rs line 201 at r1 (raw file):
Previously, ShahakShama wrote…
Remove this
Done
08dab67 to
e09b2bf
Compare
e09b2bf to
d2aa768
Compare
ShahakShama
left a comment
There was a problem hiding this comment.
@ShahakShama reviewed 3 files and all commit messages, made 14 comments, and resolved 7 discussions.
Reviewable status: 3 of 4 files reviewed, 14 unresolved discussions (waiting on guy-starkware, noamsp-starkware, and sirandreww-starkware).
crates/apollo_propeller/src/message_processor.rs line 71 at r1 (raw file):
Previously, sirandreww-starkware (Andrew Luka) wrote…
These units passed validation, they all have the same signature. I can add an assert just in case? or a comment?
You can add a comment
crates/apollo_propeller/src/message_processor.rs line 186 at r1 (raw file):
Previously, sirandreww-starkware (Andrew Luka) wrote…
it is used in 2 locations
Then a different consideration: remove maybe_broadcast_my_shard and put the 2 lines of code it has inside the main loop
crates/apollo_propeller/src/message_processor.rs line 29 at r5 (raw file):
#[derive(Debug)] struct ReconstructionSuccess {
Rename to ReconstructionOutput
crates/apollo_propeller/src/message_processor.rs line 39 at r5 (raw file):
PreConstruction { received_shards: Vec<PropellerUnit>, broadcast_my_shard: bool,
Rename to did_broadcast_my_shard or broadcasted_my_shard
crates/apollo_propeller/src/message_processor.rs line 43 at r5 (raw file):
}, // No need to track the unit indices after reconstruction (unit duplication already validated) PostConstruction {
Add a short documentation saying that the message was constructed but not yet delivered to the application (I forgot that was the algorithm on initial review)
crates/apollo_propeller/src/message_processor.rs line 45 at r5 (raw file):
PostConstruction { reconstructed_message: Option<Vec<u8>>, broadcast_my_shard: bool,
Why do you need this PostConstruction? once you perform the construction you broadcast your shard if needed
crates/apollo_propeller/src/message_processor.rs line 46 at r5 (raw file):
reconstructed_message: Option<Vec<u8>>, broadcast_my_shard: bool, shard_count_at_reconstruction: usize,
Why not just one counter of num_received_shards?
crates/apollo_propeller/src/message_processor.rs line 71 at r5 (raw file):
} fn set_broadcast_my_shard(&mut self) {
Can you erase this function and use record_shard? I think it will be clearer
crates/apollo_propeller/src/message_processor.rs line 197 at r5 (raw file):
let (result, returned_validator, unit) = Self::validate_blocking(validator, sender, unit).await;
Don't we want to process multiple shards simultaneously? same for update_state
If the answer is yes, just add a TODO to fix this for now
crates/apollo_propeller/src/message_processor.rs line 206 at r5 (raw file):
} self.maybe_broadcast_my_shard(&unit, &state);
I think all these can be one function of state called add_unit
It should return an enum called AddUnitOutput with variants: NoOp, BroadcastUnit(unit), EmitMessage(message)
maybe another variant of ShouldReconstruct(Vec<Shard>, bool (should_broadcast_my_shard))
crates/apollo_propeller/src/message_processor.rs line 240 at r5 (raw file):
fn maybe_broadcast_my_shard(&self, unit: &PropellerUnit, state: &ReconstructionState) { if unit.index() == self.my_shard_index && !state.broadcast_my_shard() {
Why is the right most condition required if we validated no duplications?
crates/apollo_propeller/src/message_processor.rs line 266 at r5 (raw file):
state: &mut ReconstructionState, ) -> ControlFlow<()> { if state.is_reconstructed() {
I don't like how this function does 2 completely different things depending on the state
If you accept my suggestion above to move all this logic into the state, I think it will solve some if not all of this issue
crates/apollo_propeller/src/message_processor.rs line 280 at r5 (raw file):
match self.reconstruct_blocking(state).await { Ok(success) => self.handle_reconstruction_success(success, shard_count, state),
If you accept my rename suggestion, don't forget to rename success as well
crates/apollo_propeller/src/message_processor.rs line 318 at r5 (raw file):
} fn handle_reconstruction_success(
If you accept my rename suggestion, don't forget to rename this as well
2dbf031 to
fafe1a0
Compare
d2aa768 to
67a32fd
Compare
67a32fd to
1af42ad
Compare
sirandreww-starkware
left a comment
There was a problem hiding this comment.
@sirandreww-starkware made 14 comments.
Reviewable status: 2 of 4 files reviewed, 14 unresolved discussions (waiting on guy-starkware, noamsp-starkware, and ShahakShama).
crates/apollo_propeller/src/message_processor.rs line 71 at r1 (raw file):
Previously, ShahakShama wrote…
You can add a comment
I did where the else would be
crates/apollo_propeller/src/message_processor.rs line 186 at r1 (raw file):
Previously, ShahakShama wrote…
Then a different consideration: remove maybe_broadcast_my_shard and put the 2 lines of code it has inside the main loop
Why? this isn't better just different
crates/apollo_propeller/src/message_processor.rs line 29 at r5 (raw file):
Previously, ShahakShama wrote…
Rename to ReconstructionOutput
Done
crates/apollo_propeller/src/message_processor.rs line 39 at r5 (raw file):
Previously, ShahakShama wrote…
Rename to did_broadcast_my_shard or broadcasted_my_shard
Done
crates/apollo_propeller/src/message_processor.rs line 43 at r5 (raw file):
Previously, ShahakShama wrote…
Add a short documentation saying that the message was constructed but not yet delivered to the application (I forgot that was the algorithm on initial review)
Done
crates/apollo_propeller/src/message_processor.rs line 45 at r5 (raw file):
Previously, ShahakShama wrote…
Why do you need this PostConstruction? once you perform the construction you broadcast your shard if needed
Removed
crates/apollo_propeller/src/message_processor.rs line 46 at r5 (raw file):
Previously, ShahakShama wrote…
Why not just one counter of num_received_shards?
Removed
crates/apollo_propeller/src/message_processor.rs line 71 at r5 (raw file):
Previously, ShahakShama wrote…
Can you erase this function and use record_shard? I think it will be clearer
Done
crates/apollo_propeller/src/message_processor.rs line 197 at r5 (raw file):
Previously, ShahakShama wrote…
Don't we want to process multiple shards simultaneously? same for update_state
If the answer is yes, just add a TODO to fix this for now
No for both, the validation is more efficient when done sequentially and the update state is efficient enough. Plus each message is being handled in parallel. Added a TODO so we can have this conversation later.
crates/apollo_propeller/src/message_processor.rs line 206 at r5 (raw file):
Previously, ShahakShama wrote…
I think all these can be one function of state called add_unit
It should return an enum called AddUnitOutput with variants: NoOp, BroadcastUnit(unit), EmitMessage(message)
maybe another variant ofShouldReconstruct(Vec<Shard>, bool (should_broadcast_my_shard))
We talked about this over zoom and we agreed to keep the current design. Feel free to add other notes
crates/apollo_propeller/src/message_processor.rs line 240 at r5 (raw file):
Previously, ShahakShama wrote…
Why is the right most condition required if we validated no duplications?
Because if we're in post construction we would have already broadcast our state.
crates/apollo_propeller/src/message_processor.rs line 266 at r5 (raw file):
Previously, ShahakShama wrote…
I don't like how this function does 2 completely different things depending on the state
If you accept my suggestion above to move all this logic into the state, I think it will solve some if not all of this issue
Not sure what to do here, any Ideas?
crates/apollo_propeller/src/message_processor.rs line 280 at r5 (raw file):
Previously, ShahakShama wrote…
If you accept my rename suggestion, don't forget to rename success as well
Done
crates/apollo_propeller/src/message_processor.rs line 318 at r5 (raw file):
Previously, ShahakShama wrote…
If you accept my rename suggestion, don't forget to rename this as well
Done
sirandreww-starkware
left a comment
There was a problem hiding this comment.
@sirandreww-starkware reviewed 4 files and all commit messages.
Reviewable status: all files reviewed, 14 unresolved discussions (waiting on noamsp-starkware and ShahakShama).
Note
Medium Risk
Touches core message validation/reconstruction and asynchronous control flow, so logic bugs could cause missed delivery or premature finalization; changes are contained to
apollo_propellerbut affect correctness under load/timeouts.Overview
MessageProcessoris expanded from a timeout-only stub into a full shard-processing loop: it validates incomingPropellerUnits viaUnitValidatoron blocking threads, tracks reconstruction progress, and triggers erasure-code reconstruction oncePropellerScheduleManager’s build threshold is met.After reconstruction it may synthesize and broadcast the node’s own shard (if not already seen), keeps collecting shards until the access threshold is reached, then emits
Event::MessageReceived; it also emitsMessageReconstructionFailedon reconstruction errors andMessageTimeouton overall timeout, and standardizes the engine command toSendUnitToPeerswith randomized peer order (addsrand).Written by Cursor Bugbot for commit 1af42ad. This will update automatically on new commits. Configure here.