-
-
Notifications
You must be signed in to change notification settings - Fork 61
Expand file tree
/
Copy pathserver.rs
More file actions
136 lines (122 loc) · 5.18 KB
/
server.rs
File metadata and controls
136 lines (122 loc) · 5.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
mod auto;
mod context;
mod http;
mod socks;
use std::{net::SocketAddr, time::Duration};
use tokio::net::{TcpListener, TcpStream};
use tracing_subscriber::{EnvFilter, FmtSubscriber};
use self::{auto::AutoDetectServer, context::Context, http::HttpServer, socks::Socks5Server};
use crate::{AuthMode, BootArgs, Proxy, Result, connect::Connector};
/// A handler that takes ownership of a single accepted TCP connection.
pub trait Acceptor {
    /// Consumes the acceptor and processes `conn`: the accepted stream
    /// together with its associated socket address.
    async fn accept(self, conn: (TcpStream, SocketAddr));
}
/// Common entry point shared by the proxy server flavours in this module.
///
/// Implementors provide [`Server::start`]; the trait itself supplies
/// [`Server::incoming`] as a shared helper for accepting connections.
pub trait Server {
    /// Runs the server, consuming it.
    ///
    /// Resolves to `Ok(())` when the server finishes cleanly, or to the I/O
    /// error that stopped it.
    async fn start(self) -> std::io::Result<()>;

    /// Waits for the next inbound connection on `listener`.
    ///
    /// Accept errors are never handed back to the caller: every failure is
    /// logged at trace level and followed by a 50ms pause before the next
    /// attempt, so a listener that keeps failing does not turn into a busy
    /// loop.
    #[inline]
    async fn incoming(listener: &mut TcpListener) -> (TcpStream, SocketAddr) {
        loop {
            let err = match listener.accept().await {
                Ok(accepted) => break accepted,
                Err(err) => err,
            };
            tracing::trace!("Failed to accept connection: {err}");
            // Back off briefly before trying to accept again.
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    }
}
/// Runs the proxy server described by `args` and blocks until it stops.
///
/// Steps, in order:
/// 1. Installs the global `tracing` subscriber, using the environment filter
///    extended with `args.log` and a `netlink_proto=error` directive.
/// 2. Builds a multi-threaded Tokio runtime with `args.workers` worker
///    threads, falling back to the detected available parallelism only when
///    no explicit count was given.
/// 3. On Linux, when `args.cidr` is set, calls the `crate::route` sysctl and
///    route helpers for that CIDR.
/// 4. Starts the server variant selected by `args.proxy` and races it against
///    Ctrl-C.
///
/// # Errors
///
/// Returns an error if the log directive fails to parse, the global
/// subscriber cannot be installed, the available parallelism has to be
/// queried and that query fails, the runtime cannot be built, the server
/// cannot be constructed or stops with an error, or listening for Ctrl-C
/// fails.
pub fn run(args: BootArgs) -> Result<()> {
    // Cap `netlink_proto` at ERROR so its lower-severity logs are dropped.
    let filter = EnvFilter::from_default_env()
        .add_directive(args.log.into())
        .add_directive("netlink_proto=error".parse()?);

    tracing::subscriber::set_global_default(
        FmtSubscriber::builder()
            .with_max_level(args.log)
            .with_env_filter(filter)
            .finish(),
    )?;

    // Query the platform only when no explicit worker count was supplied: the
    // query is fallible, and its failure must not abort a run that never
    // needed its answer.
    let workers = match args.workers {
        Some(workers) => workers,
        None => std::thread::available_parallelism()?.get(),
    };

    tracing::info!("OS: {}", std::env::consts::OS);
    tracing::info!("Arch: {}", std::env::consts::ARCH);
    tracing::info!("Version: {}", env!("CARGO_PKG_VERSION"));
    tracing::info!("Concurrent: {}", args.concurrent);
    tracing::info!("Worker threads: {}", workers);
    tracing::info!("Connect timeout: {:?}s", args.connect_timeout);

    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(workers)
        .build()?
        .block_on(async {
            // Route/sysctl preparation for the configured CIDR (Linux only).
            #[cfg(target_os = "linux")]
            if let Some(cidr) = &args.cidr {
                crate::route::sysctl_ipv6_no_local_bind(cidr);
                crate::route::sysctl_ipv6_all_enable_ipv6(cidr);
                crate::route::sysctl_route_add_cidr(cidr).await;
            }

            // Builds the server `Context` for the auth mode of the selected
            // proxy variant; `move` hands the needed `args` fields to it.
            let context = move |auth: AuthMode| Context {
                auth,
                bind: args.bind,
                concurrent: args.concurrent,
                connect_timeout: args.connect_timeout,
                connector: Connector::new(
                    args.cidr,
                    args.cidr_range,
                    args.fallback,
                    args.connect_timeout,
                    #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
                    args.tcp_user_timeout,
                    args.reuseaddr,
                ),
            };

            // Whichever finishes first decides the outcome: the server
            // stopping on its own, or the Ctrl-C listener resolving.
            tokio::select! {
                result = async {
                    match args.proxy {
                        Proxy::Http { auth } => {
                            HttpServer::new(context(auth))?.start().await
                        }
                        Proxy::Https { auth, tls_cert, tls_key } => {
                            HttpServer::new(context(auth))?
                                .with_https(tls_cert, tls_key)?
                                .start()
                                .await
                        }
                        Proxy::Socks5 { auth } => {
                            Socks5Server::new(context(auth))?.start().await
                        }
                        Proxy::Auto { auth, tls_cert, tls_key } => {
                            AutoDetectServer::new(context(auth), tls_cert, tls_key)?
                                .start()
                                .await
                        }
                    }
                } => result,
                signal = tokio::signal::ctrl_c() => match signal {
                    Ok(()) => {
                        tracing::info!("Shutdown signal received, shutting down gracefully...");
                        Ok(())
                    }
                    // Failing to listen for the signal is not a shutdown
                    // request; surface it instead of reporting a clean exit.
                    Err(err) => Err(err),
                },
            }
        })?;

    Ok(())
}