Skip to content

Commit a04084b

Browse files
committed
implement SimUdp
1 parent 2648575 commit a04084b

File tree

2 files changed

+83
-1
lines changed

2 files changed

+83
-1
lines changed

pulsebeam-runtime/src/net.rs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ pub struct SendPacket {
4141
/// UnifiedSocket enum for different transport types
4242
pub enum UnifiedSocket<'a> {
4343
Udp(UdpTransport<'a>),
44+
SimUdp(SimUdpTransport),
4445
}
4546

4647
impl<'a> UnifiedSocket<'a> {
@@ -52,6 +53,7 @@ impl<'a> UnifiedSocket<'a> {
5253
) -> io::Result<Self> {
5354
let sock = match transport {
5455
Transport::Udp => Self::Udp(UdpTransport::bind(addr, external_addr)?),
56+
Transport::SimUdp => Self::SimUdp(SimUdpTransport::bind(addr, external_addr).await?),
5557
_ => todo!(),
5658
};
5759
tracing::debug!("bound to {addr} ({transport:?})");
@@ -61,6 +63,7 @@ impl<'a> UnifiedSocket<'a> {
6163
pub fn local_addr(&self) -> SocketAddr {
6264
match self {
6365
Self::Udp(inner) => inner.local_addr(),
66+
Self::SimUdp(inner) => inner.local_addr(),
6467
}
6568
}
6669

@@ -69,6 +72,7 @@ impl<'a> UnifiedSocket<'a> {
6972
pub async fn readable(&self) -> io::Result<()> {
7073
match self {
7174
Self::Udp(inner) => inner.readable().await?,
75+
Self::SimUdp(inner) => inner.readable().await?,
7276
}
7377
Ok(())
7478
}
@@ -78,6 +82,7 @@ impl<'a> UnifiedSocket<'a> {
7882
pub async fn writable(&self) -> io::Result<()> {
7983
match self {
8084
Self::Udp(inner) => inner.writable().await?,
85+
Self::SimUdp(inner) => inner.writable().await?,
8186
}
8287
Ok(())
8388
}
@@ -93,6 +98,7 @@ impl<'a> UnifiedSocket<'a> {
9398
) -> std::io::Result<usize> {
9499
match self {
95100
Self::Udp(inner) => inner.try_recv_batch(packets, batch_size),
101+
Self::SimUdp(inner) => inner.try_recv_batch(packets, batch_size),
96102
}
97103
}
98104

@@ -103,6 +109,7 @@ impl<'a> UnifiedSocket<'a> {
103109
pub fn try_send_batch(&self, packets: &[SendPacket]) -> std::io::Result<usize> {
104110
match self {
105111
Self::Udp(inner) => inner.try_send_batch(packets),
112+
Self::SimUdp(inner) => inner.try_send_batch(packets),
106113
}
107114
}
108115
}
@@ -217,6 +224,81 @@ impl<'a> UdpTransport<'a> {
217224
}
218225
}
219226

227+
pub struct SimUdpTransport {
228+
sock: turmoil::net::UdpSocket,
229+
local_addr: SocketAddr,
230+
}
231+
232+
impl SimUdpTransport {
233+
pub const MTU: usize = 1500;
234+
235+
pub async fn bind(addr: SocketAddr, external_addr: Option<SocketAddr>) -> io::Result<Self> {
236+
let sock = turmoil::net::UdpSocket::bind(addr).await?;
237+
let local_addr = external_addr.unwrap_or(sock.local_addr()?);
238+
239+
Ok(SimUdpTransport { sock, local_addr })
240+
}
241+
242+
pub fn local_addr(&self) -> SocketAddr {
243+
self.local_addr
244+
}
245+
246+
#[inline]
247+
pub async fn readable(&self) -> io::Result<()> {
248+
self.sock.readable().await
249+
}
250+
251+
#[inline]
252+
pub async fn writable(&self) -> io::Result<()> {
253+
self.sock.writable().await
254+
}
255+
256+
#[inline]
257+
pub fn try_recv_batch(
258+
&self,
259+
packets: &mut Vec<RecvPacket>,
260+
batch_size: usize,
261+
) -> std::io::Result<usize> {
262+
let mut count = 0;
263+
let mut buf = [0u8; Self::MTU];
264+
265+
while count < batch_size {
266+
match self.sock.try_recv_from(&mut buf) {
267+
Ok((len, src)) => {
268+
packets.push(RecvPacket {
269+
buf: Bytes::copy_from_slice(&buf[..len]),
270+
src,
271+
dst: self.local_addr,
272+
});
273+
count += 1;
274+
}
275+
Err(e) if e.kind() == ErrorKind::WouldBlock => break,
276+
Err(e) => {
277+
tracing::warn!("Receive packet failed: {}", e);
278+
return Err(e);
279+
}
280+
}
281+
}
282+
Ok(count)
283+
}
284+
285+
#[inline]
286+
pub fn try_send_batch(&self, packets: &[SendPacket]) -> std::io::Result<usize> {
287+
let mut count = 0;
288+
for packet in packets.iter() {
289+
match self.sock.try_send_to(&packet.buf, packet.dst) {
290+
Ok(_) => count += 1,
291+
Err(e) if e.kind() == ErrorKind::WouldBlock => break,
292+
Err(e) => {
293+
tracing::warn!("Send packet to {} failed: {}", packet.dst, e);
294+
continue;
295+
}
296+
}
297+
}
298+
Ok(count)
299+
}
300+
}
301+
220302
#[cfg(test)]
221303
mod test {
222304
use super::*;

pulsebeam/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ pub async fn run(cpu_rt: rt::Runtime) {
7777
external_ip,
7878
);
7979

80-
node::run(cpu_rt, external_addr, unified_socket, http_socket, true)
80+
node::run(cpu_rt, external_addr, unified_socket, http_socket, false)
8181
.await
8282
.unwrap();
8383
}

0 commit comments

Comments
 (0)