Skip to content

Commit e09be2a

Browse files
committed
raptorcast: expose priority api for publishing to validators
this change doesn't use priority api yet, also we never publish to secondary nodes with priority, therefore no similar api is added for them
1 parent e4277e6 commit e09be2a

File tree

14 files changed

+347
-155
lines changed

14 files changed

+347
-155
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

monad-dataplane/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ edition = "2021"
99
bench = false
1010

1111
[dependencies]
12+
monad-types = { workspace = true }
13+
1214
bytes = { workspace = true }
1315
env_logger = { workspace = true }
1416
futures = { workspace = true }

monad-dataplane/src/lib.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use std::{
2727
use addrlist::Addrlist;
2828
use bytes::Bytes;
2929
use futures::channel::oneshot;
30+
use monad_types::UdpPriority;
3031
use monoio::{spawn, time::Instant, IoUringDriver, RuntimeBuilder};
3132
use tcp::{TcpConfig, TcpControl, TcpRateLimit};
3233
use tokio::sync::mpsc::{self, error::TrySendError};
@@ -40,13 +41,6 @@ pub mod udp;
4041

4142
pub(crate) use udp::UdpMessageType;
4243

43-
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
44-
#[repr(usize)]
45-
pub enum UdpPriority {
46-
High = 0,
47-
Regular = 1,
48-
}
49-
5044
pub struct DataplaneBuilder {
5145
local_addr: SocketAddr,
5246
trusted_addresses: Vec<IpAddr>,

monad-dataplane/src/udp.rs

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use monoio::{net::udp::UdpSocket, spawn, time};
2727
use tokio::sync::mpsc;
2828
use tracing::{debug, error, trace, warn};
2929

30-
use super::{RecvUdpMsg, UdpMsg, UdpPriority};
30+
use super::{RecvUdpMsg, UdpMsg};
3131
use crate::buffer_ext::SocketBufferExt;
3232

3333
#[derive(Debug, Clone, Copy)]
@@ -36,18 +36,6 @@ pub(crate) enum UdpMessageType {
3636
Direct,
3737
}
3838

39-
impl TryFrom<usize> for UdpPriority {
40-
type Error = &'static str;
41-
42-
fn try_from(value: usize) -> Result<Self, Self::Error> {
43-
match value {
44-
0 => Ok(UdpPriority::High),
45-
1 => Ok(UdpPriority::Regular),
46-
_ => Err("invalid priority index"),
47-
}
48-
}
49-
}
50-
5139
struct PriorityQueues {
5240
queues: [VecDeque<UdpMsg>; 2],
5341
}

monad-dataplane/tests/tests.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@ use futures::{channel::oneshot, executor, FutureExt};
2626
use monad_dataplane::{
2727
tcp::tx::{MSG_WAIT_TIMEOUT, QUEUED_MESSAGE_LIMIT},
2828
udp::DEFAULT_SEGMENT_SIZE,
29-
BroadcastMsg, DataplaneBuilder, RecvUdpMsg, TcpMsg, UdpPriority, UnicastMsg,
29+
BroadcastMsg, DataplaneBuilder, RecvUdpMsg, TcpMsg, UnicastMsg,
3030
};
31+
use monad_types::UdpPriority;
3132
use ntest::timeout;
3233
use rand::Rng;
3334
use rstest::*;

monad-executor-glue/src/lib.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ pub enum RouterCommand<ST: CertificateSignatureRecoverable, OM> {
6060
target: RouterTarget<CertificateSignaturePubKey<ST>>,
6161
message: OM,
6262
},
63+
PublishWithPriority {
64+
// NOTE(dshulyak) priority for tcp messages is ignored
65+
target: RouterTarget<CertificateSignaturePubKey<ST>>,
66+
message: OM,
67+
priority: monad_types::UdpPriority,
68+
},
6369
PublishToFullNodes {
6470
epoch: Epoch, // Epoch gets embedded into the raptorcast message
6571
message: OM,
@@ -87,6 +93,15 @@ impl<ST: CertificateSignatureRecoverable, OM> Debug for RouterCommand<ST, OM> {
8793
Self::Publish { target, message: _ } => {
8894
f.debug_struct("Publish").field("target", target).finish()
8995
}
96+
Self::PublishWithPriority {
97+
target,
98+
message: _,
99+
priority,
100+
} => f
101+
.debug_struct("PublishWithPriority")
102+
.field("target", target)
103+
.field("priority", priority)
104+
.finish(),
90105
Self::PublishToFullNodes { epoch, message: _ } => f
91106
.debug_struct("PublishToFullNodes")
92107
.field("epoch", epoch)

monad-mock-swarm/src/mock.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,13 @@ impl<S: SwarmRelation> Executor for MockExecutor<S> {
400400
RouterCommand::PublishToFullNodes { .. } => {
401401
// TODO
402402
}
403+
RouterCommand::PublishWithPriority {
404+
target,
405+
message,
406+
priority: _,
407+
} => {
408+
self.router.send_outbound(self.tick, target, message);
409+
}
403410
}
404411
}
405412
}

monad-peer-disc-swarm/src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,11 @@ impl<S: PeerDiscSwarmRelation> Executor for MockPeerDiscExecutor<S> {
148148
RouterCommand::Publish { target, message } => {
149149
self.router.send_outbound(self.tick, target, message)
150150
}
151+
RouterCommand::PublishWithPriority {
152+
target,
153+
message,
154+
priority: _,
155+
} => self.router.send_outbound(self.tick, target, message),
151156
RouterCommand::AddEpochValidatorSet { .. } => {}
152157
RouterCommand::UpdateCurrentRound(..) => {}
153158
RouterCommand::GetPeers => {}

0 commit comments

Comments
 (0)