Skip to content

Commit d172334

Browse files
authored
TCP/IP Analysis support (#35)
1 parent 4653075 commit d172334

File tree

8 files changed

+668
-17
lines changed

8 files changed

+668
-17
lines changed

Cargo.toml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,16 @@ rpmalloc = { version = "0.2.2", optional = true }
5656
jemallocator = { package = "tikv-jemallocator", version = "0.6", optional = true }
5757
mimalloc = { version = "0.1.39", default-features = false, optional = true }
5858

59+
[target.'cfg(target_os = "linux")'.dependencies]
60+
pcap = "2.0"
61+
pnet = "0.35"
62+
serde_json = "1.0"
63+
pktparse = { version = "0.7.1", features = ["serde"] }
64+
5965
[target.'cfg(target_family = "unix")'.dependencies]
6066
daemonize = "0.5.0"
6167
nix = { version = "0.30.1", features = ["user", "signal"] }
62-
sysinfo = { version = "0.37", default-features = false, features = ["system"] }
68+
sysinfo = "0.30"
6369

6470
[dev-dependencies]
6571
serde_json = "1.0"

src/main.rs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
22

3-
pub mod alloc;
3+
mod alloc;
44
#[cfg(target_family = "unix")]
55
mod daemon;
66
mod error;
@@ -22,24 +22,38 @@ pub struct Opt {
2222
#[derive(clap::Args, Clone)]
2323
pub struct Args {
2424
/// Debug mode
25-
#[clap(long, default_value = "info", env = "PINGLY_LOG")]
25+
#[arg(long, default_value = "info", env = "PINGLY_LOG")]
2626
pub log: String,
2727

2828
/// Bind address
29-
#[clap(short, long, default_value = "0.0.0.0:8181")]
29+
#[arg(short, long, default_value = "0.0.0.0:8181")]
3030
pub bind: SocketAddr,
3131

3232
/// Concurrent connections
33-
#[clap(short, long, default_value = "1024")]
33+
#[arg(short, long, default_value = "1024")]
3434
pub concurrent: usize,
3535

36+
/// Keep alive timeout (seconds)
37+
#[arg(short, long, default_value = "60")]
38+
pub keep_alive_timeout: u64,
39+
3640
/// TLS certificate file path
37-
#[clap(short = 'C', long)]
41+
#[arg(short = 'C', long)]
3842
pub tls_cert: Option<PathBuf>,
3943

4044
/// TLS private key file path (EC/PKCS8/RSA)
41-
#[clap(short = 'K', long)]
45+
#[arg(short = 'K', long)]
4246
pub tls_key: Option<PathBuf>,
47+
48+
/// Enable packet capture for TCP/IP analysis (requires root privileges)
49+
#[cfg(target_os = "linux")]
50+
#[arg(long, short = 'T')]
51+
pub tcp_capture_packet: bool,
52+
53+
/// Network interface to capture packets from (default: auto-detect)
54+
#[cfg(target_os = "linux")]
55+
#[arg(long, short = 'I')]
56+
pub tcp_capture_interface: Option<String>,
4357
}
4458

4559
#[derive(Subcommand)]
@@ -70,6 +84,7 @@ pub enum Commands {
7084

7185
fn main() -> Result<()> {
7286
let opt = Opt::parse();
87+
#[cfg(target_family = "unix")]
7388
let daemon = daemon::Daemon::default();
7489
match opt.commands {
7590
Commands::Run(config) => server::run(config),

src/server/mod.rs

Lines changed: 97 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ use axum::{
1515
use axum_extra::response::ErasedJson;
1616
use axum_server::Handle;
1717
use hyper_util::rt::TokioTimer;
18-
use tower::limit::ConcurrencyLimitLayer;
18+
use tower::{limit::ConcurrencyLimitLayer, ServiceBuilder};
19+
1920
use tower_http::{
2021
cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer},
2122
trace::{DefaultMakeSpan, DefaultOnFailure, DefaultOnResponse, TraceLayer},
@@ -27,6 +28,9 @@ use tracker::{
2728
info::{ConnectionTrack, Track, TrackInfo},
2829
};
2930

31+
#[cfg(target_os = "linux")]
32+
use tracker::capture::TcpCaptureTrack;
33+
3034
use crate::{error::Error, Args, Result};
3135

3236
#[tokio::main]
@@ -44,8 +48,8 @@ pub async fn run(args: Args) -> Result<()> {
4448
tracing::info!("Concurrent limit: {}", args.concurrent);
4549
tracing::info!("Bind address: {}", args.bind);
4650

47-
// init global layer provider
48-
let global_layer = tower::ServiceBuilder::new()
51+
// Init global layer
52+
let global_layer = ServiceBuilder::new()
4953
.layer(
5054
TraceLayer::new_for_http()
5155
.make_span_with(DefaultMakeSpan::new().level(Level::INFO))
@@ -61,17 +65,50 @@ pub async fn run(args: Args) -> Result<()> {
6165
)
6266
.layer(ConcurrencyLimitLayer::new(args.concurrent));
6367

64-
let router = Router::new()
68+
// Create the router with the tracking endpoints
69+
#[cfg_attr(not(target_os = "linux"), allow(unused_mut))]
70+
let mut router = Router::new()
6571
.route("/api/all", any(track))
6672
.route("/api/tls", any(tls_track))
6773
.route("/api/http1", any(http1_track))
68-
.route("/api/http2", any(http2_track))
69-
.layer(global_layer);
74+
.route("/api/http2", any(http2_track));
7075

7176
// Signal the server to shutdown using Handle.
7277
let handle = Handle::new();
7378

79+
// Add TCP tracking layer
80+
#[cfg(target_os = "linux")]
81+
{
82+
let mut tcp_capture_track: Option<TcpCaptureTrack> = None;
83+
if args.tcp_capture_packet {
84+
tracing::info!("Enabling TCP/IP packet capture (requires root)");
85+
let capture = TcpCaptureTrack::new(128, args.bind.port());
86+
if let Err(err) = capture.start_capture(args.tcp_capture_interface.clone()) {
87+
tracing::error!("Failed to start TCP/IP packet capture: {err}");
88+
} else {
89+
if let Some(iface) = args.tcp_capture_interface {
90+
tracing::info!(
91+
"TCP/IP packet capture started successfully on interface {iface}"
92+
);
93+
}
94+
tcp_capture_track = Some(capture);
95+
}
96+
}
97+
98+
if let Some(capture) = tcp_capture_track.clone() {
99+
router = router
100+
.route("/api/tcp", any(tcp_track))
101+
.layer(Extension(capture));
102+
}
103+
104+
tokio::spawn(signal::graceful_shutdown(
105+
handle.clone(),
106+
tcp_capture_track.clone(),
107+
));
108+
}
109+
74110
// Spawn a task to gracefully shutdown server.
111+
#[cfg(not(target_os = "linux"))]
75112
tokio::spawn(signal::graceful_shutdown(handle.clone()));
76113

77114
// Load TLS configuration with HTTP/2 ALPN preference
@@ -99,7 +136,11 @@ pub async fn run(args: Args) -> Result<()> {
99136
server
100137
.handle(handle)
101138
.map(TrackAcceptor::new)
102-
.serve(router.into_make_service_with_connect_info::<SocketAddr>())
139+
.serve(
140+
router
141+
.layer(global_layer)
142+
.into_make_service_with_connect_info::<SocketAddr>(),
143+
)
103144
.await
104145
.map_err(Into::into)
105146
}
@@ -115,12 +156,42 @@ impl IntoResponse for Error {
115156
pub async fn track(
116157
Extension(ConnectInfo(addr)): Extension<ConnectInfo<SocketAddr>>,
117158
Extension(track): Extension<ConnectionTrack>,
159+
#[cfg(target_os = "linux")] tcp_capture: Option<Extension<TcpCaptureTrack>>,
118160
req: Request<Body>,
119161
) -> Result<ErasedJson> {
120-
tokio::task::spawn_blocking(move || TrackInfo::new(Track::All, addr, req, track))
162+
// get TCP packets if capture is available
163+
#[cfg(target_os = "linux")]
164+
let tcp_packets = if let Some(Extension(capture)) = tcp_capture {
165+
// small delay to capture packets
166+
tokio::time::sleep(Duration::from_millis(100)).await;
167+
168+
let client_ip = addr.ip().to_string();
169+
let client_port = addr.port();
170+
171+
let packets = capture.get_packets_for_client(&client_ip, client_port);
172+
capture.clear_packets_for_client(&client_ip, client_port);
173+
packets
174+
} else {
175+
Vec::new()
176+
};
177+
178+
#[cfg(target_os = "linux")]
179+
{
180+
tokio::task::spawn_blocking(move || {
181+
TrackInfo::new_with_tcp(Track::All, addr, req, track, tcp_packets)
182+
})
121183
.await
122184
.map(ErasedJson::pretty)
123185
.map_err(Error::from)
186+
}
187+
188+
#[cfg(not(target_os = "linux"))]
189+
{
190+
tokio::task::spawn_blocking(move || TrackInfo::new(Track::All, addr, req, track))
191+
.await
192+
.map(ErasedJson::pretty)
193+
.map_err(Error::from)
194+
}
124195
}
125196

126197
#[inline]
@@ -158,3 +229,21 @@ pub async fn http2_track(
158229
.map(ErasedJson::pretty)
159230
.map_err(Error::from)
160231
}
232+
233+
#[inline]
234+
#[cfg(target_os = "linux")]
235+
pub async fn tcp_track(
236+
Extension(ConnectInfo(addr)): Extension<ConnectInfo<SocketAddr>>,
237+
Extension(capture): Extension<TcpCaptureTrack>,
238+
) -> Result<ErasedJson> {
239+
tokio::time::sleep(Duration::from_millis(100)).await;
240+
241+
let client_ip = addr.ip().to_string();
242+
let client_port = addr.port();
243+
244+
let packets = capture.get_packets_for_client(&client_ip, client_port);
245+
246+
capture.clear_packets_for_client(&client_ip, client_port);
247+
248+
Ok(ErasedJson::pretty(&packets))
249+
}

src/server/signal.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,37 @@ use axum_server::Handle;
44
use tokio::time::sleep;
55
use tracing::info;
66

7-
pub(super) async fn graceful_shutdown(handle: Handle) {
7+
#[cfg(target_os = "linux")]
8+
use super::tracker::capture::TcpCaptureTrack;
9+
10+
pub(super) async fn graceful_shutdown(
11+
handle: Handle,
12+
#[cfg(target_os = "linux")] capture: Option<TcpCaptureTrack>,
13+
) {
814
tokio::signal::ctrl_c()
915
.await
1016
.expect("Ctrl+C signal hanlde error");
1117

1218
info!("Ctrl+C signal received: starting graceful shutdown");
1319

20+
#[cfg(target_os = "linux")]
21+
if let Some(capture) = capture {
22+
capture.shutdown();
23+
}
24+
1425
// Signal the server to shutdown using Handle.
1526
handle.graceful_shutdown(Some(Duration::from_secs(1)));
1627

1728
// Print alive connection count every second.
1829
loop {
1930
sleep(Duration::from_secs(1)).await;
20-
info!("Alive connections: {}", handle.connection_count());
31+
let connections = handle.connection_count();
32+
info!("Alive connections: {}", connections);
33+
34+
// Exit the loop when all connections are closed
35+
if connections == 0 {
36+
info!("All connections closed, exiting gracefully");
37+
break;
38+
}
2139
}
2240
}

0 commit comments

Comments
 (0)