Skip to content

Commit 5428236

Browse files
apollo_propeller: add MessageProcessor scaffolding
1 parent 3e54bba commit 5428236

File tree

4 files changed

+101
-0
lines changed

4 files changed

+101
-0
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/apollo_propeller/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ prost.workspace = true
1919
reed-solomon-simd.workspace = true
2020
sha2.workspace = true
2121
thiserror.workspace = true
22+
tokio.workspace = true
2223
tracing.workspace = true
2324
unsigned-varint.workspace = true
2425

crates/apollo_propeller/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ pub mod padding;
99
mod padding_test;
1010
pub mod protocol;
1111
// TODO(AndrewL): Consider renaming this to `erasure_coding` or `error_correction_code`.
12+
pub mod message_processor;
1213
pub mod reed_solomon;
1314
#[cfg(test)]
1415
mod reed_solomon_test;
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
use std::ops::ControlFlow;
2+
use std::sync::Arc;
3+
use std::time::Duration;
4+
5+
use libp2p::identity::{PeerId, PublicKey};
6+
use tokio::sync::mpsc;
7+
use tokio::time::sleep_until;
8+
use tracing::{debug, trace};
9+
10+
use crate::tree::PropellerScheduleManager;
11+
use crate::types::{Channel, Event, MessageRoot};
12+
use crate::unit::PropellerUnit;
13+
use crate::ShardIndex;
14+
15+
pub type UnitToValidate = (PeerId, PropellerUnit);
16+
17+
#[derive(Debug)]
18+
pub enum EventStateManagerToEngine {
19+
BehaviourEvent(Event),
20+
Finalized { channel: Channel, publisher: PeerId, message_root: MessageRoot },
21+
BroadcastUnit { unit: PropellerUnit, peers: Vec<PeerId> },
22+
}
23+
24+
/// Message processor that handles validation and state management for a single message.
25+
pub struct MessageProcessor {
26+
pub channel: Channel,
27+
pub publisher: PeerId,
28+
pub message_root: MessageRoot,
29+
pub my_shard_index: ShardIndex,
30+
31+
pub publisher_public_key: PublicKey,
32+
pub tree_manager: Arc<PropellerScheduleManager>,
33+
pub local_peer_id: PeerId,
34+
35+
// Unbounded because these bridge sync -> async contexts and shard messages from the network
36+
// must not be dropped or delayed.
37+
pub unit_rx: mpsc::UnboundedReceiver<UnitToValidate>,
38+
pub engine_tx: mpsc::UnboundedSender<EventStateManagerToEngine>,
39+
40+
pub timeout: Duration,
41+
}
42+
43+
impl MessageProcessor {
44+
pub async fn run(mut self) {
45+
debug!(
46+
"[MSG_PROC] Started for channel={:?} publisher={:?} root={:?}",
47+
self.channel, self.publisher, self.message_root
48+
);
49+
50+
// Local state variables
51+
let deadline = tokio::time::Instant::now() + self.timeout;
52+
53+
// TODO(AndrewL): remove this
54+
#[allow(clippy::never_loop)]
55+
loop {
56+
tokio::select! {
57+
_ = sleep_until(deadline) => {
58+
let _ = self.emit_timeout_and_finalize().await;
59+
break;
60+
}
61+
}
62+
}
63+
64+
debug!(
65+
"[MSG_PROC] Stopped for channel={:?} publisher={:?} root={:?}",
66+
self.channel, self.publisher, self.message_root
67+
);
68+
}
69+
70+
async fn emit_timeout_and_finalize(&mut self) -> ControlFlow<()> {
71+
trace!(
72+
"[MSG_PROC] Timeout reached for channel={:?} publisher={:?} root={:?}",
73+
self.channel,
74+
self.publisher,
75+
self.message_root
76+
);
77+
78+
self.emit_and_finalize(Event::MessageTimeout {
79+
channel: self.channel,
80+
publisher: self.publisher,
81+
message_root: self.message_root,
82+
})
83+
}
84+
85+
fn emit_and_finalize(&self, event: Event) -> ControlFlow<()> {
86+
self.engine_tx
87+
.send(EventStateManagerToEngine::BehaviourEvent(event))
88+
.expect("Engine task has exited");
89+
self.engine_tx
90+
.send(EventStateManagerToEngine::Finalized {
91+
channel: self.channel,
92+
publisher: self.publisher,
93+
message_root: self.message_root,
94+
})
95+
.expect("Engine task has exited");
96+
ControlFlow::Break(())
97+
}
98+
}

0 commit comments

Comments
 (0)