Skip to content

Commit d078293

Browse files
authored
ntex optimizations (#9406)
* cpu affinity * ntex optimizations * remove smallvec * wip
1 parent bb5e72a commit d078293

File tree

9 files changed

+149
-82
lines changed

9 files changed

+149
-82
lines changed

frameworks/Rust/ntex/Cargo.toml

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,11 @@ default = []
3434
tokio = ["ntex/tokio"]
3535

3636
# compio runtime
37-
compio = ["ntex/compio", ]
37+
compio = ["ntex/compio"]
3838

3939
[dependencies]
40-
ntex = "2.4"
41-
ntex-compio = "0.1.2"
40+
ntex = "2.8"
41+
ntex-compio = "0.2"
4242
ntex-bytes = { version = "0.1.21", features=["simd"] }
4343
mimalloc = { version = "0.1.25", default-features = false }
4444
snmalloc-rs = { version = "0.3.3", features = ["native-cpu"] }
@@ -47,16 +47,18 @@ buf-min = { version = "0.7", features = ["ntex-bytes"] }
4747
env_logger = "0.11"
4848
nanorand = { version = "0.7", default-features = false, features = ["std", "wyrand", "tls"] }
4949
atoi = "2.0"
50-
num_cpus = "1.16"
51-
smallvec = "1.13"
50+
core_affinity = "0.8"
5251
futures = "0.3"
53-
serde = { version = "1.0", features = ["derive"] }
54-
serde_json = "1.0"
52+
sonic-rs = "0.3.16"
53+
serde = { version = "1", features = ["derive"] }
54+
serde_json = "1"
5555
log = { version = "0.4", features = ["release_max_level_off"] }
56-
compio-driver = { version = "0.4", features = ["io-uring", "io-uring-socket"]}
5756
tok_io = {version = "1", package = "tokio" }
5857
tokio-postgres = { git="https://github.com/fafhrd91/postgres.git", branch="ntex-2" }
5958

59+
[target.'cfg(target_os = "linux")'.dependencies]
60+
compio-driver = { version = "*", features = ["io-uring"]}
61+
6062
[profile.release]
6163
opt-level = 3
6264
codegen-units = 1

frameworks/Rust/ntex/src/db.rs

Lines changed: 36 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,26 +2,31 @@
22
use std::{borrow::Cow, cell::RefCell, fmt::Write as FmtWrite};
33

44
use nanorand::{Rng, WyRand};
5-
use ntex::util::{BufMut, Bytes, BytesMut};
6-
use smallvec::SmallVec;
5+
use ntex::util::{Bytes, BytesMut};
76
use tokio_postgres::types::ToSql;
87
use tokio_postgres::{connect, Client, Statement};
9-
use yarte::{ywrite_html, Serialize};
8+
use yarte::TemplateBytesTrait;
109

1110
use super::utils;
1211

13-
#[derive(Copy, Clone, Serialize, Debug)]
12+
#[derive(Copy, Clone, Debug, sonic_rs::Serialize)]
1413
pub struct World {
1514
pub id: i32,
1615
pub randomnumber: i32,
1716
}
1817

19-
#[derive(Serialize, Debug)]
18+
#[derive(Debug, sonic_rs::Serialize)]
2019
pub struct Fortune {
2120
pub id: i32,
2221
pub message: Cow<'static, str>,
2322
}
2423

24+
#[derive(yarte::TemplateBytes)]
25+
#[template(path = "fortune.hbs")]
26+
pub struct FortunesTemplate<'a> {
27+
pub fortunes: &'a Vec<Fortune>,
28+
}
29+
2530
/// Postgres interface
2631
pub struct PgConnection {
2732
cl: Client,
@@ -30,6 +35,7 @@ pub struct PgConnection {
3035
rng: WyRand,
3136
updates: Vec<Statement>,
3237
buf: RefCell<BytesMut>,
38+
fbuf: RefCell<Vec<Fortune>>,
3339
}
3440

3541
impl PgConnection {
@@ -69,6 +75,7 @@ impl PgConnection {
6975
updates,
7076
rng: WyRand::new(),
7177
buf: RefCell::new(BytesMut::with_capacity(10 * 1024 * 1024)),
78+
fbuf: RefCell::new(Vec::with_capacity(64)),
7279
}
7380
}
7481
}
@@ -81,23 +88,26 @@ impl PgConnection {
8188

8289
let mut body = self.buf.borrow_mut();
8390
utils::reserve(&mut body, 1024);
84-
World {
85-
id: row.get(0),
86-
randomnumber: row.get(1),
87-
}
88-
.to_bytes_mut(&mut *body);
91+
sonic_rs::to_writer(
92+
utils::BytesWriter(&mut body),
93+
&World {
94+
id: row.get(0),
95+
randomnumber: row.get(1),
96+
},
97+
)
98+
.unwrap();
8999
body.split().freeze()
90100
}
91101

92102
pub async fn get_worlds(&self, num: usize) -> Bytes {
93103
let mut rng = self.rng.clone();
94-
let mut queries = SmallVec::<[_; 32]>::new();
104+
let mut queries = Vec::with_capacity(num);
95105
(0..num).for_each(|_| {
96106
let w_id = (rng.generate::<u32>() % 10_000 + 1) as i32;
97107
queries.push(self.cl.query_one(&self.world, &[&w_id]));
98108
});
99109

100-
let mut worlds = SmallVec::<[_; 32]>::new();
110+
let mut worlds = Vec::with_capacity(num);
101111
for fut in queries {
102112
let row = fut.await.unwrap();
103113
worlds.push(World {
@@ -108,25 +118,19 @@ impl PgConnection {
108118

109119
let mut body = self.buf.borrow_mut();
110120
utils::reserve(&mut body, 2 * 1024);
111-
body.put_u8(b'[');
112-
worlds.iter().for_each(|w| {
113-
w.to_bytes_mut(&mut *body);
114-
body.put_u8(b',');
115-
});
116-
let idx = body.len() - 1;
117-
body[idx] = b']';
121+
sonic_rs::to_writer(utils::BytesWriter(&mut body), &worlds[..]).unwrap();
118122
body.split().freeze()
119123
}
120124

121125
pub async fn update(&self, num: usize) -> Bytes {
122126
let mut rng = nanorand::tls_rng();
123-
let mut queries = SmallVec::<[_; 32]>::new();
127+
let mut queries = Vec::with_capacity(num);
124128
(0..num).for_each(|_| {
125129
let w_id = (rng.generate::<u32>() % 10_000 + 1) as i32;
126130
queries.push(self.cl.query_one(&self.world, &[&w_id]));
127131
});
128132

129-
let mut worlds = SmallVec::<[_; 32]>::new();
133+
let mut worlds = Vec::with_capacity(num);
130134
for fut in queries.into_iter() {
131135
let row = fut.await.unwrap();
132136
worlds.push(World {
@@ -147,23 +151,18 @@ impl PgConnection {
147151

148152
let mut body = self.buf.borrow_mut();
149153
utils::reserve(&mut body, 2 * 1024);
150-
body.put_u8(b'[');
151-
worlds.iter().for_each(|w| {
152-
w.to_bytes_mut(&mut *body);
153-
body.put_u8(b',');
154-
});
155-
let idx = body.len() - 1;
156-
body[idx] = b']';
154+
sonic_rs::to_writer(utils::BytesWriter(&mut body), &worlds[..]).unwrap();
157155
body.split().freeze()
158156
}
159157

160158
pub async fn tell_fortune(&self) -> Bytes {
161159
let rows = self.cl.query_raw(&self.fortune, &[]).await.unwrap();
162160

163-
let mut fortunes: SmallVec<[_; 32]> = smallvec::smallvec![Fortune {
161+
let mut fortunes = self.fbuf.borrow_mut();
162+
fortunes.push(Fortune {
164163
id: 0,
165164
message: Cow::Borrowed("Additional fortune added at request time."),
166-
}];
165+
});
167166
fortunes.extend(rows.iter().map(|row| Fortune {
168167
id: row.get(0),
169168
message: Cow::Owned(row.get(1)),
@@ -172,7 +171,13 @@ impl PgConnection {
172171

173172
let mut body = std::mem::replace(&mut *self.buf.borrow_mut(), BytesMut::new());
174173
utils::reserve(&mut body, 4 * 1024);
175-
ywrite_html!(body, "{{> fortune }}");
174+
175+
FortunesTemplate {
176+
fortunes: &*fortunes,
177+
}
178+
.write_call(&mut body);
179+
fortunes.clear();
180+
176181
let result = body.split().freeze();
177182
let _ = std::mem::replace(&mut *self.buf.borrow_mut(), body);
178183
result

frameworks/Rust/ntex/src/main.rs

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
33

44
use ntex::http::header::{CONTENT_TYPE, SERVER};
5-
use ntex::{http, time::Seconds, util::BytesMut, util::PoolId, web};
6-
use yarte::Serialize;
5+
use ntex::{http, time::Seconds, util::BytesMut, util::PoolId, util::Ready, web};
6+
use sonic_rs::Serialize;
77

88
mod utils;
99

@@ -15,10 +15,13 @@ pub struct Message {
1515
#[web::get("/json")]
1616
async fn json() -> web::HttpResponse {
1717
let mut body = BytesMut::with_capacity(utils::SIZE);
18-
Message {
19-
message: "Hello, World!",
20-
}
21-
.to_bytes_mut(&mut body);
18+
sonic_rs::to_writer(
19+
utils::BytesWriter(&mut body),
20+
&Message {
21+
message: "Hello, World!",
22+
},
23+
)
24+
.unwrap();
2225

2326
let mut response = web::HttpResponse::with_body(http::StatusCode::OK, body.into());
2427
response.headers_mut().insert(SERVER, utils::HDR_SERVER);
@@ -45,6 +48,10 @@ async fn plaintext() -> web::HttpResponse {
4548
async fn main() -> std::io::Result<()> {
4649
println!("Started http server: 127.0.0.1:8080");
4750

51+
let cores = core_affinity::get_core_ids().unwrap();
52+
let total_cores = cores.len();
53+
let cores = std::sync::Arc::new(std::sync::Mutex::new(cores));
54+
4855
// start http server
4956
ntex::server::build()
5057
.backlog(1024)
@@ -60,7 +67,17 @@ async fn main() -> std::io::Result<()> {
6067
.payload_read_rate(Seconds::ZERO, Seconds::ZERO, 0)
6168
.h1(web::App::new().service(json).service(plaintext).finish())
6269
})?
63-
.workers(num_cpus::get())
70+
.configure(move |cfg| {
71+
let cores = cores.clone();
72+
cfg.on_worker_start(move |_| {
73+
if let Some(core) = cores.lock().unwrap().pop() {
74+
core_affinity::set_for_current(core);
75+
}
76+
Ready::<_, &str>::Ok(())
77+
});
78+
Ok(())
79+
})?
80+
.workers(total_cores)
6481
.run()
6582
.await
6683
}

frameworks/Rust/ntex/src/main_db.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
#[cfg(not(target_os = "macos"))]
22
#[global_allocator]
33
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
4-
// static GLOBAL: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
54

65
use ntex::http::header::{CONTENT_TYPE, SERVER};
76
use ntex::http::{HttpService, KeepAlive, Request, Response, StatusCode};
87
use ntex::service::{Service, ServiceCtx, ServiceFactory};
98
use ntex::web::{Error, HttpResponse};
10-
use ntex::{time::Seconds, util::PoolId};
9+
use ntex::{time::Seconds, util::PoolId, util::Ready};
1110

1211
mod db;
1312
mod utils;
@@ -83,6 +82,10 @@ impl ServiceFactory<Request> for AppFactory {
8382
async fn main() -> std::io::Result<()> {
8483
println!("Starting http server: 127.0.0.1:8080");
8584

85+
let cores = core_affinity::get_core_ids().unwrap();
86+
let total_cores = cores.len();
87+
let cores = std::sync::Arc::new(std::sync::Mutex::new(cores));
88+
8689
ntex::server::build()
8790
.backlog(1024)
8891
.bind("techempower", "0.0.0.0:8080", |cfg| {
@@ -97,7 +100,17 @@ async fn main() -> std::io::Result<()> {
97100
.payload_read_rate(Seconds::ZERO, Seconds::ZERO, 0)
98101
.h1(AppFactory)
99102
})?
100-
.workers(num_cpus::get())
103+
.configure(move |cfg| {
104+
let cores = cores.clone();
105+
cfg.on_worker_start(move |_| {
106+
if let Some(core) = cores.lock().unwrap().pop() {
107+
core_affinity::set_for_current(core);
108+
}
109+
Ready::<_, &str>::Ok(())
110+
});
111+
Ok(())
112+
})?
113+
.workers(total_cores)
101114
.run()
102115
.await
103116
}

frameworks/Rust/ntex/src/main_plt.rs

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
33

44
use std::{future::Future, io, pin::Pin, task::Context, task::Poll};
55

6-
use ntex::{fn_service, http::h1, io::Io, io::RecvError, util::ready, util::PoolId};
7-
use yarte::Serialize;
6+
use ntex::util::{ready, PoolId, Ready};
7+
use ntex::{fn_service, http::h1, io::Io, io::RecvError};
8+
use sonic_rs::Serialize;
89

910
mod utils;
1011

@@ -42,10 +43,13 @@ impl Future for App {
4243
buf.extend_from_slice(JSON);
4344
this.codec.set_date_header(buf);
4445

45-
Message {
46-
message: "Hello, World!",
47-
}
48-
.to_bytes_mut(buf);
46+
sonic_rs::to_writer(
47+
utils::BytesWriter(buf),
48+
&Message {
49+
message: "Hello, World!",
50+
},
51+
)
52+
.unwrap();
4953
}
5054
"/plaintext" => {
5155
buf.extend_from_slice(PLAIN);
@@ -75,6 +79,10 @@ impl Future for App {
7579
async fn main() -> io::Result<()> {
7680
println!("Started http server: 127.0.0.1:8080");
7781

82+
let cores = core_affinity::get_core_ids().unwrap();
83+
let total_cores = cores.len();
84+
let cores = std::sync::Arc::new(std::sync::Mutex::new(cores));
85+
7886
// start http server
7987
ntex::server::build()
8088
.backlog(1024)
@@ -88,7 +96,17 @@ async fn main() -> io::Result<()> {
8896
codec: h1::Codec::default(),
8997
})
9098
})?
91-
.workers(num_cpus::get())
99+
.configure(move |cfg| {
100+
let cores = cores.clone();
101+
cfg.on_worker_start(move |_| {
102+
if let Some(core) = cores.lock().unwrap().pop() {
103+
core_affinity::set_for_current(core);
104+
}
105+
Ready::<_, &str>::Ok(())
106+
});
107+
Ok(())
108+
})?
109+
.workers(total_cores)
92110
.run()
93111
.await
94112
}

0 commit comments

Comments
 (0)