|
1 | 1 | use std::error::Error; |
| 2 | +use std::future::Future; |
2 | 3 | use std::io; |
3 | 4 | use std::net::{Ipv4Addr, SocketAddr}; |
4 | 5 | use std::sync::Arc; |
| 6 | +use std::thread; |
5 | 7 |
|
| 8 | +use socket2::{Domain, SockAddr, Socket}; |
6 | 9 | use hyper::server::conn::http1::Builder; |
7 | 10 | use hyper_util::rt::TokioIo; |
8 | | -use tokio::net::{TcpListener, TcpSocket}; |
| 11 | +use tokio::{net::TcpListener, runtime}; |
9 | 12 | use viz::{Responder, Router, Tree}; |
10 | 13 |
|
11 | 14 | pub async fn serve(router: Router) -> Result<(), Box<dyn Error + Send + Sync>> { |
12 | | - let tree = Arc::<Tree>::new(router.into()); |
13 | 15 | let addr = SocketAddr::from((Ipv4Addr::UNSPECIFIED, 8080)); |
14 | | - let listener = reuse_listener(addr).expect("couldn't bind to addr"); |
| 16 | + let socket = create_socket(addr).expect("couldn't bind to addr"); |
| 17 | + let listener = TcpListener::from_std(socket.into())?; |
| 18 | + |
| 19 | + let tree = Arc::<Tree>::new(router.into()); |
| 20 | + |
| 21 | + let mut http = Builder::new(); |
| 22 | + http.pipeline_flush(true); |
15 | 23 |
|
16 | 24 | println!("Started viz server at 8080"); |
17 | 25 |
|
18 | 26 | loop { |
19 | 27 | let (tcp, _) = listener.accept().await?; |
20 | | - let io = TokioIo::new(tcp); |
| 28 | + tcp.set_nodelay(true).expect("couldn't set TCP_NODELAY!"); |
| 29 | + |
| 30 | + let http = http.clone(); |
21 | 31 | let tree = tree.clone(); |
22 | 32 |
|
23 | | - tokio::task::spawn(async move { |
24 | | - Builder::new() |
25 | | - .pipeline_flush(true) |
26 | | - .serve_connection(io, Responder::<Arc<SocketAddr>>::new(tree, None)) |
27 | | - .with_upgrades() |
| 33 | + tokio::spawn(async move { |
| 34 | + http |
| 35 | + .serve_connection( |
| 36 | + TokioIo::new(tcp), |
| 37 | + Responder::<Arc<SocketAddr>>::new(tree, None), |
| 38 | + ) |
28 | 39 | .await |
29 | 40 | }); |
30 | 41 | } |
31 | 42 | } |
32 | 43 |
|
33 | | -fn reuse_listener(addr: SocketAddr) -> io::Result<TcpListener> { |
34 | | - let socket = match addr { |
35 | | - SocketAddr::V4(_) => TcpSocket::new_v4()?, |
36 | | - SocketAddr::V6(_) => TcpSocket::new_v6()?, |
| 44 | +fn create_socket(addr: SocketAddr) -> Result<Socket, io::Error> { |
| 45 | + let domain = match addr { |
| 46 | + SocketAddr::V4(_) => Domain::IPV4, |
| 47 | + SocketAddr::V6(_) => Domain::IPV6, |
37 | 48 | }; |
38 | | - |
| 49 | + let addr = SockAddr::from(addr); |
| 50 | + let socket = Socket::new(domain, socket2::Type::STREAM, None)?; |
| 51 | + let backlog = 4096; |
39 | 52 | #[cfg(unix)] |
40 | | - { |
41 | | - if let Err(e) = socket.set_reuseport(true) { |
42 | | - eprintln!("error setting SO_REUSEPORT: {e}"); |
43 | | - } |
| 53 | + socket.set_reuse_port(true)?; |
| 54 | + socket.set_reuse_address(true)?; |
| 55 | + socket.set_nodelay(true)?; |
| 56 | + socket.set_nonblocking(true)?; |
| 57 | + socket.bind(&addr)?; |
| 58 | + socket.listen(backlog)?; |
| 59 | + |
| 60 | + Ok(socket) |
| 61 | +} |
| 62 | + |
| 63 | +pub fn run<Fut>(f: fn() -> Fut) |
| 64 | +where |
| 65 | + Fut: Future + Send + 'static, |
| 66 | +{ |
| 67 | + for _ in 1..num_cpus::get() { |
| 68 | + let runtime = runtime::Builder::new_current_thread() |
| 69 | + .enable_all() |
| 70 | + .build() |
| 71 | + .unwrap(); |
| 72 | + thread::spawn(move || { |
| 73 | + runtime.block_on(f()); |
| 74 | + }); |
44 | 75 | } |
45 | 76 |
|
46 | | - socket.set_reuseaddr(true)?; |
47 | | - socket.bind(addr)?; |
48 | | - socket.listen(1024) |
| 77 | + let runtime = runtime::Builder::new_current_thread() |
| 78 | + .enable_all() |
| 79 | + .build() |
| 80 | + .unwrap(); |
| 81 | + runtime.block_on(f()); |
49 | 82 | } |
0 commit comments