Skip to content

Commit f183203

Browse files
committed
[rust/viz]: use multiple current thread runtimes in json
1 parent a0ebdee commit f183203

File tree

6 files changed

+84
-56
lines changed

6 files changed

+84
-56
lines changed

frameworks/Rust/viz/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ mime = "0.3"
3535
rand = { version = "0.9", features = ["small_rng"] }
3636
thiserror = "2.0"
3737
futures-util = "0.3"
38+
socket2 = "0.5.8"
39+
num_cpus = "1.16.0"
3840

3941
[target.'cfg(not(unix))'.dependencies]
4042
nanorand = { version = "0.7" }

frameworks/Rust/viz/src/main.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use serde::Serialize;
44
use viz::{
55
header::{HeaderValue, CONTENT_TYPE, SERVER},
6-
Bytes, Error, Request, Response, ResponseExt, Result, Router,
6+
Bytes, Request, Response, ResponseExt, Result, Router,
77
};
88

99
mod server;
@@ -14,15 +14,17 @@ struct Message {
1414
message: &'static str,
1515
}
1616

17+
#[inline(always)]
1718
async fn plaintext(_: Request) -> Result<Response> {
1819
let mut res = Response::text("Hello, World!");
1920
res.headers_mut()
2021
.insert(SERVER, HeaderValue::from_static("Viz"));
2122
Ok(res)
2223
}
2324

25+
#[inline(always)]
2426
async fn json(_: Request) -> Result<Response> {
25-
let mut resp = Response::builder()
27+
let mut res = Response::builder()
2628
.body(
2729
http_body_util::Full::new(Bytes::from(
2830
serde_json::to_vec(&Message {
@@ -33,20 +35,23 @@ async fn json(_: Request) -> Result<Response> {
3335
.into(),
3436
)
3537
.unwrap();
36-
let headers = resp.headers_mut();
38+
let headers = res.headers_mut();
3739
headers.insert(SERVER, HeaderValue::from_static("Viz"));
3840
headers.insert(
3941
CONTENT_TYPE,
4042
HeaderValue::from_static(mime::APPLICATION_JSON.as_ref()),
4143
);
42-
Ok(resp)
44+
Ok(res)
4345
}
4446

45-
#[tokio::main]
46-
async fn main() -> Result<()> {
47+
async fn app() {
4748
let app = Router::new()
4849
.get("/plaintext", plaintext)
4950
.get("/json", json);
5051

51-
server::serve(app).await.map_err(Error::Boxed)
52+
server::serve(app).await.unwrap();
53+
}
54+
55+
fn main() {
56+
server::run(app)
5257
}

frameworks/Rust/viz/src/main_diesel.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,7 @@ async fn updates(req: Request) -> Result<Response> {
7777
Ok(res)
7878
}
7979

80-
#[tokio::main]
81-
async fn main() {
80+
async fn app() {
8281
let max = available_parallelism().map(|n| n.get()).unwrap_or(16) as u32;
8382

8483
let pool = Pool::<AsyncPgConnection>::builder()
@@ -99,3 +98,7 @@ async fn main() {
9998

10099
server::serve(app).await.unwrap()
101100
}
101+
102+
fn main() {
103+
server::run(app)
104+
}

frameworks/Rust/viz/src/main_pg.rs

Lines changed: 6 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
1-
use std::{
2-
sync::Arc,
3-
thread::{available_parallelism, spawn},
4-
};
1+
use std::sync::Arc;
52

63
use viz::{
74
header::{HeaderValue, SERVER},
@@ -76,26 +73,7 @@ async fn updates(req: Request) -> Result<Response> {
7673
Ok(res)
7774
}
7875

79-
fn main() {
80-
let rt = tokio::runtime::Builder::new_current_thread()
81-
.enable_all()
82-
.build()
83-
.unwrap();
84-
85-
for _ in 1..available_parallelism().map(|n| n.get()).unwrap_or(16) {
86-
spawn(move || {
87-
let rt = tokio::runtime::Builder::new_current_thread()
88-
.enable_all()
89-
.build()
90-
.unwrap();
91-
rt.block_on(serve());
92-
});
93-
}
94-
95-
rt.block_on(serve());
96-
}
97-
98-
async fn serve() {
76+
async fn app() {
9977
let conn = PgConnection::connect(DB_URL).await;
10078

10179
let app = Router::new()
@@ -107,3 +85,7 @@ async fn serve() {
10785

10886
server::serve(app).await.unwrap()
10987
}
88+
89+
fn main() {
90+
server::run(app)
91+
}

frameworks/Rust/viz/src/main_sqlx.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,7 @@ async fn updates(mut req: Request) -> Result<Response> {
7979
Ok(res)
8080
}
8181

82-
#[tokio::main]
83-
async fn main() -> Result<()> {
82+
async fn app() -> Result<()> {
8483
let max = available_parallelism().map(|n| n.get()).unwrap_or(16) as u32;
8584

8685
let pool = PgPoolOptions::new()
@@ -103,6 +102,10 @@ async fn main() -> Result<()> {
103102
server::serve(app).await.map_err(Error::Boxed)
104103
}
105104

105+
fn main() {
106+
server::run(app)
107+
}
108+
106109
markup::define! {
107110
FortunesTemplate(items: Vec<Fortune>) {
108111
{markup::doctype()}

frameworks/Rust/viz/src/server.rs

Lines changed: 54 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,82 @@
11
use std::error::Error;
2+
use std::future::Future;
23
use std::io;
34
use std::net::{Ipv4Addr, SocketAddr};
45
use std::sync::Arc;
6+
use std::thread;
57

8+
use socket2::{Domain, SockAddr, Socket};
69
use hyper::server::conn::http1::Builder;
710
use hyper_util::rt::TokioIo;
8-
use tokio::net::{TcpListener, TcpSocket};
11+
use tokio::{net::TcpListener, runtime};
912
use viz::{Responder, Router, Tree};
1013

1114
pub async fn serve(router: Router) -> Result<(), Box<dyn Error + Send + Sync>> {
12-
let tree = Arc::<Tree>::new(router.into());
1315
let addr = SocketAddr::from((Ipv4Addr::UNSPECIFIED, 8080));
14-
let listener = reuse_listener(addr).expect("couldn't bind to addr");
16+
let socket = create_socket(addr).expect("couldn't bind to addr");
17+
let listener = TcpListener::from_std(socket.into())?;
18+
19+
let tree = Arc::<Tree>::new(router.into());
20+
21+
let mut http = Builder::new();
22+
http.pipeline_flush(true);
1523

1624
println!("Started viz server at 8080");
1725

1826
loop {
1927
let (tcp, _) = listener.accept().await?;
20-
let io = TokioIo::new(tcp);
28+
tcp.set_nodelay(true).expect("couldn't set TCP_NODELAY!");
29+
30+
let http = http.clone();
2131
let tree = tree.clone();
2232

23-
tokio::task::spawn(async move {
24-
Builder::new()
25-
.pipeline_flush(true)
26-
.serve_connection(io, Responder::<Arc<SocketAddr>>::new(tree, None))
27-
.with_upgrades()
33+
tokio::spawn(async move {
34+
http
35+
.serve_connection(
36+
TokioIo::new(tcp),
37+
Responder::<Arc<SocketAddr>>::new(tree, None),
38+
)
2839
.await
2940
});
3041
}
3142
}
3243

33-
fn reuse_listener(addr: SocketAddr) -> io::Result<TcpListener> {
34-
let socket = match addr {
35-
SocketAddr::V4(_) => TcpSocket::new_v4()?,
36-
SocketAddr::V6(_) => TcpSocket::new_v6()?,
44+
fn create_socket(addr: SocketAddr) -> Result<Socket, io::Error> {
45+
let domain = match addr {
46+
SocketAddr::V4(_) => Domain::IPV4,
47+
SocketAddr::V6(_) => Domain::IPV6,
3748
};
38-
49+
let addr = SockAddr::from(addr);
50+
let socket = Socket::new(domain, socket2::Type::STREAM, None)?;
51+
let backlog = 4096;
3952
#[cfg(unix)]
40-
{
41-
if let Err(e) = socket.set_reuseport(true) {
42-
eprintln!("error setting SO_REUSEPORT: {e}");
43-
}
53+
socket.set_reuse_port(true)?;
54+
socket.set_reuse_address(true)?;
55+
socket.set_nodelay(true)?;
56+
socket.set_nonblocking(true)?;
57+
socket.bind(&addr)?;
58+
socket.listen(backlog)?;
59+
60+
Ok(socket)
61+
}
62+
63+
pub fn run<Fut>(f: fn() -> Fut)
64+
where
65+
Fut: Future + Send + 'static,
66+
{
67+
for _ in 1..num_cpus::get() {
68+
let runtime = runtime::Builder::new_current_thread()
69+
.enable_all()
70+
.build()
71+
.unwrap();
72+
thread::spawn(move || {
73+
runtime.block_on(f());
74+
});
4475
}
4576

46-
socket.set_reuseaddr(true)?;
47-
socket.bind(addr)?;
48-
socket.listen(1024)
77+
let runtime = runtime::Builder::new_current_thread()
78+
.enable_all()
79+
.build()
80+
.unwrap();
81+
runtime.block_on(f());
4982
}

0 commit comments

Comments
 (0)