Skip to content

Commit 5fc3314

Browse files
authored
Read socket rework (#35)
* 1gbps throughput * warnings * tokio mpsc * migrate sender to mpsc * reducing acks sent * fix server accept logic * cleanup + minor speedup * codecov bring back bins --------- Co-authored-by: Frank Lee <>
1 parent 3424b6f commit 5fc3314

File tree

16 files changed

+772
-650
lines changed

16 files changed

+772
-650
lines changed

.github/workflows/bluefin.yml

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ jobs:
1414
runs-on: ${{ matrix.os }}
1515
strategy:
1616
matrix:
17-
os: [ubuntu-latest, macos-latest]
17+
os: [ ubuntu-latest, macos-latest ]
1818
include:
1919
- os: ubuntu-latest
2020
target: Linux
@@ -23,11 +23,11 @@ jobs:
2323
target: Macos
2424

2525
steps:
26-
- uses: actions/checkout@v3
27-
- name: Build
28-
run: cargo build --verbose
29-
- name: Run tests
30-
run: cargo test --verbose
26+
- uses: actions/checkout@v3
27+
- name: Build
28+
run: cargo build --verbose
29+
- name: Run tests
30+
run: cargo test --verbose
3131

3232
coverage:
3333
runs-on: ubuntu-latest
@@ -40,7 +40,7 @@ jobs:
4040
- name: Install cargo-llvm-cov
4141
uses: taiki-e/install-action@cargo-llvm-cov
4242
- name: Generate code coverage
43-
run: cargo llvm-cov --all-features --workspace --lcov --ignore-filename-regex "error.rs|*/bin/*.rs" --output-path lcov.info
43+
run: cargo llvm-cov --all-features --workspace --lcov --ignore-filename-regex "error.rs" --output-path lcov.info
4444
- name: Upload coverage to Codecov
4545
uses: codecov/codecov-action@v3
4646
with:

Cargo.toml

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,14 @@ repository = "https://github.com/franklee26/bluefin"
99
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
1010

1111
[dependencies]
12-
etherparse = "0.15.0"
1312
local-ip-address = "0.6.3"
1413
rand = "0.8.5"
1514
rstest = "0.23.0"
16-
thiserror = "2.0.3"
17-
tokio = { version = "1.41.1", features = ["full", "tracing"] }
15+
thiserror = "2.0.9"
16+
tokio = { version = "1.42.0", features = ["full", "tracing"] }
1817
console-subscriber = "0.4.1"
1918
libc = "0.2.164"
20-
sysctl = "0.6.0"
19+
socket2 = "0.5.8"
2120

2221
[dev-dependencies]
2322
local-ip-address = "0.6.3"
@@ -38,7 +37,7 @@ path = "src/bin/server.rs"
3837
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(coverage,coverage_nightly)', 'cfg(kani)'] }
3938

4039
[profile.release]
41-
opt-level = 3
40+
opt-level = 3
4241
codegen-units = 1
4342
lto = "fat"
4443
debug = true

src/bin/client.rs

Lines changed: 40 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -16,54 +16,56 @@ async fn main() -> BluefinResult<()> {
1616
let ports = [1320, 1322, 1323, 1324, 1325];
1717
let mut tasks = vec![];
1818
for ix in 0..2 {
19-
// sleep(Duration::from_secs(3)).await;
20-
let task = spawn(async move {
21-
let mut total_bytes = 0;
22-
let mut client = BluefinClient::new(std::net::SocketAddr::V4(SocketAddrV4::new(
19+
let mut client = BluefinClient::new(std::net::SocketAddr::V4(SocketAddrV4::new(
20+
Ipv4Addr::new(127, 0, 0, 1),
21+
ports[ix],
22+
)));
23+
if let Ok(mut conn) = client
24+
.connect(std::net::SocketAddr::V4(SocketAddrV4::new(
2325
Ipv4Addr::new(127, 0, 0, 1),
24-
ports[ix],
25-
)));
26-
let mut conn = client
27-
.connect(std::net::SocketAddr::V4(SocketAddrV4::new(
28-
Ipv4Addr::new(127, 0, 0, 1),
29-
1318,
30-
)))
31-
.await?;
26+
1318,
27+
)))
28+
.await
29+
{
30+
let task = spawn(async move {
31+
let mut total_bytes = 0;
3232

33-
let bytes = [1, 2, 3, 4, 5, 6, 7];
34-
let mut size = conn.send(&bytes).await?;
35-
total_bytes += size;
36-
println!("Sent {} bytes", size);
33+
let bytes = [1, 2, 3, 4, 5, 6, 7];
34+
let mut size = conn.send(&bytes)?;
35+
total_bytes += size;
36+
println!("Sent {} bytes", size);
3737

38-
size = conn.send(&[12, 12, 12, 12, 12, 12]).await?;
39-
total_bytes += size;
40-
println!("Sent {} bytes", size);
38+
size = conn.send(&[12, 12, 12, 12, 12, 12])?;
39+
total_bytes += size;
40+
println!("Sent {} bytes", size);
4141

42-
size = conn.send(&[13; 100]).await?;
43-
total_bytes += size;
44-
println!("Sent {} bytes", size);
42+
size = conn.send(&[13; 100])?;
43+
total_bytes += size;
44+
println!("Sent {} bytes", size);
4545

46-
sleep(Duration::from_secs(1)).await;
46+
sleep(Duration::from_secs(1)).await;
4747

48-
size = conn.send(&[14, 14, 14, 14, 14, 14]).await?;
49-
total_bytes += size;
50-
println!("Sent {} bytes", size);
48+
size = conn.send(&[14, 14, 14, 14, 14, 14])?;
49+
total_bytes += size;
50+
println!("Sent {} bytes", size);
5151

52-
for ix in 0..5000000 {
53-
// let my_array: [u8; 32] = rand::random();
5452
let my_array = [0u8; 1500];
55-
size = conn.send(&my_array).await?;
56-
total_bytes += size;
57-
if ix % 3000 == 0 {
58-
sleep(Duration::from_millis(3)).await;
53+
for ix in 0..10000000 {
54+
// let my_array: [u8; 32] = rand::random();
55+
size = conn.send(&my_array)?;
56+
total_bytes += size;
57+
if ix % 4000 == 0 {
58+
sleep(Duration::from_millis(1)).await;
59+
}
5960
}
60-
}
61-
println!("Sent {} bytes", total_bytes);
62-
sleep(Duration::from_secs(2)).await;
61+
println!("Sent {} bytes", total_bytes);
62+
sleep(Duration::from_secs(3)).await;
6363

64-
Ok::<(), BluefinError>(())
65-
});
66-
tasks.push(task);
64+
Ok::<(), BluefinError>(())
65+
});
66+
tasks.push(task);
67+
sleep(Duration::from_millis(1)).await;
68+
}
6769
}
6870

6971
for t in tasks {

src/bin/server.rs

Lines changed: 51 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
#![cfg_attr(coverage_nightly, feature(coverage_attribute))]
2+
use bluefin::{net::server::BluefinServer, utils::common::BluefinResult};
23
use std::{
34
cmp::{max, min},
45
net::{Ipv4Addr, SocketAddrV4},
56
time::Instant,
67
};
7-
8-
use bluefin::{net::server::BluefinServer, utils::common::BluefinResult};
98
use tokio::{spawn, task::JoinSet};
109

1110
#[cfg_attr(coverage_nightly, coverage(off))]
@@ -24,68 +23,93 @@ async fn run() -> BluefinResult<()> {
2423
Ipv4Addr::new(127, 0, 0, 1),
2524
1318,
2625
)));
27-
server.set_num_reader_workers(300)?;
26+
server.set_num_reader_workers(3)?;
2827
server.bind().await?;
2928
let mut join_set = JoinSet::new();
3029

31-
const MAX_NUM_CONNECTIONS: usize = 2;
32-
for conn_num in 0..MAX_NUM_CONNECTIONS {
33-
let mut s = server.clone();
34-
let _num = conn_num;
30+
let mut _num = 0;
31+
while let Ok(mut conn) = server.accept().await {
3532
let _ = join_set.spawn(async move {
36-
let _conn = s.accept().await;
37-
38-
match _conn {
39-
Ok(mut conn) => {
4033
let mut total_bytes = 0;
41-
let mut recv_bytes = [0u8; 80000];
34+
let mut recv_bytes = [0u8; 10000];
4235
let mut min_bytes = usize::MAX;
4336
let mut max_bytes = 0;
44-
let mut iteration = 1;
37+
let mut iteration: i64 = 1;
4538
let mut num_iterations_without_print = 0;
39+
let mut max_throughput = 0.0;
40+
let mut min_throughput = f64::MAX;
4641
let now = Instant::now();
4742
loop {
48-
let size = conn.recv(&mut recv_bytes, 80000).await.unwrap();
43+
let size = conn.recv(&mut recv_bytes, 10000).await.unwrap();
4944
total_bytes += size;
5045
min_bytes = min(size, min_bytes);
5146
max_bytes = max(size, max_bytes);
5247
// eprintln!("read {} bytes --- total bytes: {}", size, total_bytes);
5348

5449
/*
5550
println!(
56-
"({:x}_{:x}) >>> Received: {:?} (total: {})",
51+
"({:x}_{:x}) >>> Received: {} bytes",
5752
conn.src_conn_id,
5853
conn.dst_conn_id,
59-
&recv_bytes[..size],
6054
total_bytes
6155
);
6256
*/
6357
num_iterations_without_print += 1;
64-
if total_bytes >= 100000 && num_iterations_without_print == 200 {
58+
if total_bytes >= 1000000 && num_iterations_without_print == 3500 {
6559
let elapsed = now.elapsed().as_secs();
60+
if elapsed == 0 {
61+
eprintln!("(#{})Total bytes: {} (0s???)", _num, total_bytes);
62+
num_iterations_without_print = 0;
63+
continue;
64+
}
6665
let through_put = u64::try_from(total_bytes).unwrap() / elapsed;
66+
let through_put_mb = through_put as f64 / 1e6;
6767
let avg_recv_bytes: f64 = total_bytes as f64 / iteration as f64;
68-
eprintln!(
69-
"{} {:.1} kb/s or {:.1} mb/s (read {:.1} kb/iteration, min: {:.1} kb, max: {:.1} kb)",
68+
69+
if through_put_mb > max_throughput {
70+
max_throughput = through_put_mb;
71+
}
72+
73+
if through_put_mb < min_throughput {
74+
min_throughput = through_put_mb;
75+
}
76+
77+
if through_put_mb < 1000.0 {
78+
eprintln!(
79+
"{} {:.1} kb/s or {:.1} mb/s (read {:.1} kb/iteration, min: {:.1} kb, max: {:.1} kb) (max {:.1} mb/s, min {:.1} mb/s)",
7080
_num,
7181
through_put as f64 / 1e3,
72-
through_put as f64 / 1e6,
82+
through_put_mb,
7383
avg_recv_bytes / 1e3,
7484
min_bytes as f64 / 1e3,
75-
max_bytes as f64 / 1e3
85+
max_bytes as f64 / 1e3,
86+
max_throughput,
87+
min_throughput
7688
);
77-
num_iterations_without_print = 0;
89+
} else {
90+
eprintln!(
91+
"{} {:.2} gb/s (read {:.1} kb/iter, min: {:.1} kb, max: {:.1} kb) (max {:.2} gb/s, min {:.1} kb/s)",
92+
_num,
93+
through_put_mb / 1e3,
94+
avg_recv_bytes / 1e3,
95+
min_bytes as f64 / 1e3,
96+
max_bytes as f64 / 1e3,
97+
max_throughput / 1e3,
98+
min_throughput
99+
);
100+
}
101+
num_iterations_without_print = 0;
78102
// break;
79103
}
80104
iteration += 1;
81105
}
82-
}
83-
Err(e) => {
84-
eprintln!("Could not accept connection due to error: {:?}", e);
85-
}
86-
}
87106
});
107+
_num += 1;
108+
if _num >= 2 {
109+
break;
110+
}
88111
}
112+
89113
join_set.join_all().await;
90114
Ok(())
91115
}

src/core/mod.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,17 @@ pub mod error;
55
pub mod header;
66
pub mod packet;
77

8+
pub trait Extract: Default {
9+
/// Replace self with default and returns the initial value.
10+
fn extract(&mut self) -> Self;
11+
}
12+
13+
impl<T: Default> Extract for T {
14+
fn extract(&mut self) -> Self {
15+
std::mem::replace(self, T::default())
16+
}
17+
}
18+
819
pub trait Serialisable {
920
fn serialise(&self) -> Vec<u8>;
1021
fn deserialise(bytes: &[u8]) -> Result<Self, BluefinError>

src/core/packet.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,19 @@ impl Serialisable for BluefinPacket {
5151
}
5252
}
5353

54+
impl Default for BluefinPacket {
55+
#[allow(invalid_value)]
56+
#[inline]
57+
fn default() -> Self {
58+
// SAFETY
59+
// Actually, this isn't safe and access to this kind of zero'd value would result
60+
// in panics. There does not exist a 'default' bluefin packet. Therefore, the
61+
// purpose of this is to quickly instantiate a 'filler' bluefin packet BUT this
62+
// default value should NEVER be read/used.
63+
unsafe { std::mem::zeroed() }
64+
}
65+
}
66+
5467
impl BluefinPacket {
5568
#[inline]
5669
pub fn builder() -> BluefinPacketBuilder {

src/net/client.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ use std::{
77
use rand::Rng;
88
use tokio::{net::UdpSocket, sync::RwLock};
99

10+
use super::{
11+
connection::{BluefinConnection, ConnectionBuffer, ConnectionManager},
12+
AckBuffer, ConnectionManagedBuffers,
13+
};
14+
use crate::utils::get_udp_socket;
1015
use crate::{
1116
core::{context::BluefinHost, error::BluefinError, header::PacketType, Serialisable},
1217
net::{
@@ -15,12 +20,7 @@ use crate::{
1520
utils::common::BluefinResult,
1621
};
1722

18-
use super::{
19-
connection::{BluefinConnection, ConnectionBuffer, ConnectionManager},
20-
AckBuffer, ConnectionManagedBuffers,
21-
};
22-
23-
const NUM_TX_WORKERS_FOR_CLIENT_DEFAULT: u16 = 5;
23+
const NUM_TX_WORKERS_FOR_CLIENT_DEFAULT: u16 = 1;
2424

2525
pub struct BluefinClient {
2626
socket: Option<Arc<UdpSocket>>,
@@ -53,7 +53,7 @@ impl BluefinClient {
5353
}
5454

5555
pub async fn connect(&mut self, dst_addr: SocketAddr) -> BluefinResult<BluefinConnection> {
56-
let socket = Arc::new(UdpSocket::bind(self.src_addr).await?);
56+
let socket = Arc::new(get_udp_socket(self.src_addr)?);
5757
self.socket = Some(Arc::clone(&socket));
5858
self.dst_addr = Some(dst_addr);
5959

@@ -137,8 +137,8 @@ impl BluefinClient {
137137
packet_number + 2,
138138
Arc::clone(&conn_buffer),
139139
Arc::clone(&ack_buff),
140-
Arc::clone(self.socket.as_ref().unwrap()),
141140
self.dst_addr.unwrap(),
141+
self.src_addr,
142142
))
143143
}
144144
}

0 commit comments

Comments
 (0)