Skip to content

Commit 5a97792

Browse files
MoritzFluMoritz Flüchter
andauthored
Add feature to read kernel tcp socket metrics (#2)
* Add first tcp_sock_tracer * First working tcp kernel sockets * Fix IP read * Add arrayref to cargo * Add db/Cargo.lock to gitignore * Working concept * Add tcp_sock tracing fields --------- Co-authored-by: Moritz Flüchter <moritz.fluechter@uni-tuebingen.de>
1 parent f4d4661 commit 5a97792

File tree

22 files changed

+66227
-38
lines changed

22 files changed

+66227
-38
lines changed

db/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
target/*
22
*.sqlite
3+
Cargo.lock

db/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ sqlite = { version = "0.36.1", features = ["bundled"] }
1717
indicatif = "0.17.11"
1818
env_logger = "0.11.6"
1919
aya-ebpf = "0.1.1"
20+
arrayref = "0.3.9"
2021

2122
[net]
2223
net.git-fetch-with-cli= true

db/src/bindings/sock.rs

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
2+
3+
use ts_storage::{DataValue, IpTuple};
4+
5+
use crate::{db_writer::DBOperation, flow_tracker::{EventIndexer, AF_INET}, reader::FromBuffer};
6+
7+
use arrayref::array_ref;
8+
9+
#[repr(C)]
10+
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Default)]
11+
pub struct sock_trace_entry {
12+
pub time: u64,
13+
pub addr_v4: u64,
14+
pub src_v6: [u8; 16],
15+
pub dst_v6: [u8; 16],
16+
pub ports: u32,
17+
pub family: u16,
18+
// SOCK Stats
19+
pub pacing_rate: u64,
20+
pub max_pacing_rate: u64,
21+
// INET_CONN Stats
22+
pub backoff: u8,
23+
pub rto: u32,
24+
// INET_CONN -> icsk_ack
25+
pub ato: u32,
26+
pub rcv_mss: u16,
27+
// TCP_SOCK Stats
28+
pub snd_cwnd: u32,
29+
pub bytes_acked: u64,
30+
pub snd_ssthresh: u32,
31+
pub total_retrans: u32,
32+
pub probes: u8,
33+
pub lost: u32,
34+
pub sacked_out: u32,
35+
pub retrans: u32,
36+
pub rcv_ssthresh: u32,
37+
pub rttvar: u32,
38+
pub advmss: u16,
39+
pub reordering: u32,
40+
pub rcv_rtt: u32,
41+
pub rcv_space: u32,
42+
pub bytes_received: u64,
43+
pub segs_out: u32,
44+
pub segs_in: u32,
45+
// TCP_SOCK -> tcp_options_received
46+
pub snd_wscale: u16,
47+
pub rcv_wscale: u16,
48+
pub div: u32
49+
}
50+
51+
impl FromBuffer for sock_trace_entry {
52+
fn from_buffer(buf: &Vec<u8>) -> Self {
53+
unsafe { *(buf.as_ptr() as *const sock_trace_entry) }
54+
55+
}
56+
}
57+
58+
impl EventIndexer for sock_trace_entry {
59+
fn get_field(&self, index: usize) -> Option<DataValue> {
60+
match index {
61+
_ => None, // TODO: better error handling
62+
}
63+
}
64+
fn get_default_field(&self, index: usize) -> DataValue {
65+
match index {
66+
_ => panic!("Tried to access out of bounds index!"), // TODO: better error handling
67+
}
68+
}
69+
fn get_field_name(&self, index: usize) -> &str {
70+
match index {
71+
_ => panic!("Tried to access out of bounds index!"), // TODO: better error handling
72+
}
73+
}
74+
fn get_ip_tuple(&self) -> IpTuple {
75+
let src: IpAddr;
76+
let dst: IpAddr;
77+
78+
//print!("Family: {}",self.family);
79+
80+
81+
if self.family == AF_INET {
82+
// TODO: check offsets
83+
let mut bytes = self.addr_v4.to_be_bytes();
84+
85+
let mut srcbytes = array_ref![bytes,0,4].clone();
86+
let mut dstbytes = array_ref![bytes,4,4].clone();
87+
//srcbytes.reverse();
88+
89+
srcbytes.reverse();
90+
dstbytes.reverse();
91+
src = IpAddr::V4(Ipv4Addr::from(srcbytes));
92+
dst = IpAddr::V4(Ipv4Addr::from(dstbytes));
93+
} else {
94+
src = IpAddr::V6(Ipv6Addr::from(self.src_v6));
95+
dst = IpAddr::V6(Ipv6Addr::from(self.dst_v6));
96+
}
97+
98+
let port_bytes = self.ports.to_be_bytes();
99+
100+
let srcbytes = array_ref![port_bytes,0,2].clone();
101+
let dstbytes = array_ref![port_bytes,2,2].clone();
102+
103+
// TODO: check byte order if ports are correct
104+
// Dport could be be bytes
105+
let sport = u16::from_le_bytes(srcbytes);
106+
let dport = u16::from_le_bytes(dstbytes);
107+
108+
IpTuple {
109+
src: src,
110+
dst: dst,
111+
sport: sport as i64,
112+
dport: dport as i64,
113+
l4proto: 6,
114+
}
115+
}
116+
fn get_max_index(&self) -> usize {
117+
0
118+
}
119+
fn get_timestamp(&self) -> f64 {
120+
self.time as f64
121+
}
122+
fn as_db_op(self) -> DBOperation {
123+
DBOperation::Socket(self)
124+
}
125+
}

db/src/bindings/tcp_probe.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ impl EventIndexer for TcpProbe {
8383
fn get_ip_tuple(&self) -> IpTuple {
8484
let src: IpAddr;
8585
let dst: IpAddr;
86-
86+
8787
if self.family == AF_INET {
8888
//IPv4
8989
src = IpAddr::V4(Ipv4Addr::from(shorten_to_ipv4(self.saddr)));

db/src/db_writer.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@ use log::error;
44
use ts_storage::{database_factory, sqlite::SQLiteTSDB, DBBackend, IpTuple, TSDBInterface};
55
use tokio::sync::mpsc::Receiver;
66

7-
use crate::{bindings::{tcp_packet::TcpPacket, tcp_probe::TcpProbe}, flow_tracker::{EventIndexer, EventType, FlowTracker}};
7+
use crate::{bindings::{sock::sock_trace_entry, tcp_packet::TcpPacket, tcp_probe::TcpProbe}, flow_tracker::{EventIndexer, EventType, FlowTracker}};
88

99
#[derive(Debug)]
1010
pub enum DBOperation {
1111
Packet(TcpPacket),
12-
Probe(TcpProbe)
12+
Probe(TcpProbe),
13+
Socket(sock_trace_entry)
1314
}
1415

1516

@@ -94,7 +95,24 @@ impl DBWriter {
9495

9596
//println!("Probe {:?}",data.ssthresh);
9697

98+
},
99+
DBOperation::Socket(sock) => {
100+
if sock.div != 0xffffffff {
101+
println!(
102+
"Malformed!: {:?},div: {}",sock,sock.div
103+
);
104+
panic!("Malformed packet!");
105+
} else {
106+
if sock.family == 2 && sock.snd_cwnd > 0 {
107+
println!("Time: {}, Stream: {:?} CWND: {:?}",sock.time,sock.get_ip_tuple(),sock.snd_cwnd.to_le_bytes());
108+
}
109+
//println!(
110+
// "Packet!: {:?}, CWND: {}",sock.get_ip_tuple(),sock.snd_cwnd );
111+
}
112+
//let a = sock.get_ip_tuple();
113+
//println!("Socket: {:?}",sock.get_ip_tuple());
97114
}
115+
98116
}
99117
}
100118

db/src/main.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@ mod bindings {
66
pub mod tcp_packet;
77
pub mod tcp_probe;
88
pub mod ctypes;
9+
pub mod sock;
910
}
1011

11-
use bindings::{tcp_packet::TcpPacket, tcp_probe::TcpProbe};
12+
use bindings::{sock::sock_trace_entry, tcp_packet::TcpPacket, tcp_probe::TcpProbe};
1213
use db_writer::{DBOperation, DBWriter};
1314
use flow_tracker::{EventIndexer, EventType, FlowTracker, TsTracker};
1415
use log::{error, info};
@@ -99,9 +100,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
99100

100101
// Start all tasks
101102
let threads = vec![
102-
start_file_reader::<TcpPacket>("/tmp/xdp.tcp", tx.clone(), stop_token.clone()).await,
103-
start_file_reader::<TcpPacket>("/tmp/tc.tcp", tx.clone(), stop_token.clone()).await,
104-
start_file_reader::<TcpProbe>("/tmp/probe.tcp", tx.clone(), stop_token.clone()).await,
103+
//start_file_reader::<TcpPacket>("/tmp/xdp.tcp", tx.clone(), stop_token.clone()).await,
104+
//start_file_reader::<TcpPacket>("/tmp/tc.tcp", tx.clone(), stop_token.clone()).await,
105+
//start_file_reader::<TcpProbe>("/tmp/probe.tcp", tx.clone(), stop_token.clone()).await,
106+
start_file_reader::<sock_trace_entry>("/tmp/sock.tcp", tx.clone(), stop_token.clone()).await
105107
];
106108

107109
// Wait for file threads to finish!

db/src/reader.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ impl<T: EventIndexer + Debug + FromBuffer> FileReader<T> {
5454
// TODO: I DONT KNOW WHY THIS 4 BYTE MISALIGNMENT HAPPENS, IT JUST DOES
5555
// FIX THIS!
5656
let entry_size = core::mem::size_of::<T>() - 4;
57+
//let entry_size = 68;
5758

5859
debug!("Entry size: {} bytes for {}", entry_size, self.path);
5960

tcbee/Cargo.lock

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tcbee/debu_run.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
#!/bin/bash
2-
RUST_LOG=debug cargo run --release --config 'target."cfg(all())".runner="sudo -E"' -- -q lo
2+
RUST_LOG=info cargo run --release --config 'target."cfg(all())".runner="sudo -E"' -- -q lo

tcbee/tcbee-common/src/bindings/mod.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@ pub mod eth_header;
66
pub mod ip4_header;
77
pub mod ip6_header;
88
pub mod flow;
9+
pub mod tcp_sock;
910

1011
#[cfg(feature = "user")]
1112
use aya::Pod;
1213
use tcp_bad_csum::tcp_bad_csum_entry;
1314
use tcp_probe::tcp_probe_entry;
1415
use tcp_retransmit_synack::tcp_retransmit_synack_entry;
1516
use tcp_header::tcp_packet_trace;
17+
use tcp_sock::sock_trace_entry;
1618

1719
#[repr(C)]
1820
#[derive(Default)]
@@ -82,10 +84,18 @@ impl EBPFTracePointType for tcp_bad_csum_entry {
8284
const NAME: &'static str = "tcp_bad_csum";
8385
const CATEGORY: &'static str = "tcp";
8486
}
87+
88+
impl EBPFTracePointType for sock_trace_entry {
89+
const QUEUE_NAME: &'static str = "TCP_SOCK";
90+
const NAME: &'static str = "tcp_sock";
91+
const CATEGORY: &'static str = "tcp";
92+
}
8593
#[cfg(feature = "user")]
8694
// Needed to be able to parse as queue entry from eBPF queue
8795
unsafe impl Pod for tcp_probe_entry {}
8896
#[cfg(feature = "user")]
8997
unsafe impl Pod for tcp_retransmit_synack_entry {}
9098
#[cfg(feature = "user")]
91-
unsafe impl Pod for tcp_bad_csum_entry {}
99+
unsafe impl Pod for tcp_bad_csum_entry {}
100+
#[cfg(feature = "user")]
101+
unsafe impl Pod for sock_trace_entry {}

0 commit comments

Comments
 (0)