Skip to content

Commit e727137

Browse files
MoritzFluMoritz Flüchter
andauthored
Fix TCP Kernel Sockets for DB Writer (#3)
* Add first tcp_sock_tracer * First working tcp kernel sockets * Fix IP read * Add arrayref to cargo * Add db/Cargo.lock to gitignore * Working concept * Add tcp_sock tracing fields * Fix struct (de-) serialization for files * Add sock_trace_entry to db writer * Fix byte alignment errors for trace entries * Add port filter mechanism * Speedup UI refresh rate * Fix endianness of dport * Add missing HandlerConstraint --------- Co-authored-by: Moritz Flüchter <moritz.fluechter@uni-tuebingen.de>
1 parent 5a97792 commit e727137

File tree

20 files changed

+345
-124
lines changed

20 files changed

+345
-124
lines changed

db/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ indicatif = "0.17.11"
1818
env_logger = "0.11.6"
1919
aya-ebpf = "0.1.1"
2020
arrayref = "0.3.9"
21+
bincode = "1.3.3"
2122

2223
[net]
2324
net.git-fetch-with-cli= true

db/src/bindings/sock.rs

Lines changed: 94 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
22

3+
use serde::Deserialize;
34
use ts_storage::{DataValue, IpTuple};
45

56
use crate::{db_writer::DBOperation, flow_tracker::{EventIndexer, AF_INET}, reader::FromBuffer};
67

78
use arrayref::array_ref;
8-
99
#[repr(C)]
10-
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Default)]
10+
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Default, Deserialize)]
1111
pub struct sock_trace_entry {
1212
pub time: u64,
1313
pub addr_v4: u64,
14-
pub src_v6: [u8; 16],
15-
pub dst_v6: [u8; 16],
14+
pub src_v6: [u8; 16usize],
15+
pub dst_v6: [u8; 16usize],
1616
pub ports: u32,
1717
pub family: u16,
1818
// SOCK Stats
@@ -45,29 +45,113 @@ pub struct sock_trace_entry {
4545
// TCP_SOCK -> tcp_options_received
4646
pub snd_wscale: u16,
4747
pub rcv_wscale: u16,
48-
pub div: u32
48+
pub div: [u8; 4usize],
4949
}
5050

5151
impl FromBuffer for sock_trace_entry {
5252
fn from_buffer(buf: &Vec<u8>) -> Self {
53-
unsafe { *(buf.as_ptr() as *const sock_trace_entry) }
53+
//unsafe { *(buf.as_ptr() as *const sock_trace_entry) }
54+
55+
let try_deserialize = bincode::deserialize::<'_, sock_trace_entry>(buf);
56+
57+
if try_deserialize.is_err() {
58+
sock_trace_entry::default()
59+
} else {
60+
try_deserialize.unwrap()
61+
}
5462

5563
}
64+
const ENTRY_SIZE: usize = 160;
5665
}
5766

5867
impl EventIndexer for sock_trace_entry {
5968
fn get_field(&self, index: usize) -> Option<DataValue> {
6069
match index {
70+
0 => if self.pacing_rate > 0 {Some(DataValue::Int(self.pacing_rate as i64))} else {None},
71+
1 => if self.max_pacing_rate > 0 {Some(DataValue::Int(self.max_pacing_rate as i64))} else {None},
72+
2 => if self.backoff > 0 {Some(DataValue::Int(self.backoff as i64))} else {None},
73+
3 => if self.rto > 0 {Some(DataValue::Int(self.rto as i64))} else {None},
74+
4 => if self.ato > 0 {Some(DataValue::Int(self.ato as i64))} else {None},
75+
5 => if self.rcv_mss > 0 {Some(DataValue::Int(self.rcv_mss as i64))} else {None},
76+
6 => if self.snd_cwnd > 0 {Some(DataValue::Int(self.snd_cwnd as i64))} else {None},
77+
7 => if self.bytes_acked > 0 {Some(DataValue::Int(self.bytes_acked as i64))} else {None},
78+
8 => if self.snd_ssthresh > 0 {Some(DataValue::Int(self.snd_ssthresh as i64))} else {None},
79+
9 => if self.total_retrans > 0 {Some(DataValue::Int(self.total_retrans as i64))} else {None},
80+
10 => if self.probes > 0 {Some(DataValue::Int(self.probes as i64))} else {None},
81+
11 => if self.lost > 0 {Some(DataValue::Int(self.lost as i64))} else {None},
82+
12 => if self.sacked_out > 0 {Some(DataValue::Int(self.sacked_out as i64))} else {None},
83+
13 => if self.retrans > 0 {Some(DataValue::Int(self.retrans as i64))} else {None},
84+
14 => if self.rcv_ssthresh > 0 {Some(DataValue::Int(self.rcv_ssthresh as i64))} else {None},
85+
15 => if self.rttvar > 0 {Some(DataValue::Int(self.rttvar as i64))} else {None},
86+
16 => if self.advmss > 0 {Some(DataValue::Int(self.advmss as i64))} else {None},
87+
17 => if self.reordering > 0 {Some(DataValue::Int(self.reordering as i64))} else {None},
88+
18 => if self.rcv_rtt > 0 {Some(DataValue::Int(self.rcv_rtt as i64))} else {None},
89+
19 => if self.rcv_space > 0 {Some(DataValue::Int(self.rcv_space as i64))} else {None},
90+
20 => if self.bytes_received > 0 {Some(DataValue::Int(self.bytes_received as i64))} else {None},
91+
21 => if self.segs_out> 0 {Some(DataValue::Int(self.segs_out as i64))} else {None},
92+
22 => if self.segs_in > 0 {Some(DataValue::Int(self.segs_in as i64))} else {None},
93+
23 => if self.snd_wscale > 0 {Some(DataValue::Int(self.snd_wscale as i64))} else {None},
94+
24 => if self.rcv_wscale > 0 {Some(DataValue::Int(self.rcv_wscale as i64))} else {None},
6195
_ => None, // TODO: better error handling
6296
}
6397
}
6498
fn get_default_field(&self, index: usize) -> DataValue {
6599
match index {
100+
0 => DataValue::Int(0),
101+
1 => DataValue::Int(0),
102+
2 => DataValue::Int(0),
103+
3 => DataValue::Int(0),
104+
4 => DataValue::Int(0),
105+
5 => DataValue::Int(0),
106+
6 => DataValue::Int(0),
107+
7 => DataValue::Int(0),
108+
8 => DataValue::Int(0),
109+
9 => DataValue::Int(0),
110+
10 => DataValue::Int(0),
111+
11 => DataValue::Int(0),
112+
12 => DataValue::Int(0),
113+
13 => DataValue::Int(0),
114+
14 => DataValue::Int(0),
115+
15 => DataValue::Int(0),
116+
16 => DataValue::Int(0),
117+
17 => DataValue::Int(0),
118+
18 => DataValue::Int(0),
119+
19 => DataValue::Int(0),
120+
20 => DataValue::Int(0),
121+
21 => DataValue::Int(0),
122+
22 => DataValue::Int(0),
123+
23 => DataValue::Int(0),
124+
24 => DataValue::Int(0),
66125
_ => panic!("Tried to access out of bounds index!"), // TODO: better error handling
67126
}
68127
}
69128
fn get_field_name(&self, index: usize) -> &str {
70129
match index {
130+
0 => "pacing_rate",
131+
1 => "max_pacing_rate",
132+
2 => "backoff",
133+
3 => "rto",
134+
4 => "ato",
135+
5 => "rcv_mss",
136+
6 => "snd_cwnd",
137+
7 => "bytes_acked",
138+
8 => "snd_ssthresh",
139+
9 => "total_retrans",
140+
10 => "probes",
141+
11 => "lost",
142+
12 => "sacked_out",
143+
13 => "retrans",
144+
14 => "rcv_ssthresh",
145+
15 => "rttvar",
146+
16 => "advmss",
147+
17 => "reordering",
148+
18 => "rcv_rtt",
149+
19 => "rcv_space",
150+
20 => "bytes_received",
151+
21 => "segs_out",
152+
22 => "segs_in",
153+
23 => "snd_wscale",
154+
24 => "rcv_wscale",
71155
_ => panic!("Tried to access out of bounds index!"), // TODO: better error handling
72156
}
73157
}
@@ -114,12 +198,15 @@ impl EventIndexer for sock_trace_entry {
114198
}
115199
}
116200
fn get_max_index(&self) -> usize {
117-
0
201+
24
118202
}
119203
fn get_timestamp(&self) -> f64 {
120204
self.time as f64
121205
}
122206
fn as_db_op(self) -> DBOperation {
123207
DBOperation::Socket(self)
124208
}
209+
fn get_struct_length(&self) -> usize {
210+
160
211+
}
125212
}

db/src/bindings/tcp_packet.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
22

3+
use serde::Deserialize;
34
use ts_storage::{DataValue, IpTuple};
45

56
use crate::{db_writer::DBOperation, flow_tracker::EventIndexer, reader::FromBuffer};
67

78
#[repr(C)]
8-
#[derive(Debug, Clone, Copy, Default)]
9+
#[derive(Debug, Clone, Copy, Default, Deserialize)]
910
pub struct TcpPacket {
1011
pub time: u64,
1112
pub saddr: u32,
@@ -24,14 +25,21 @@ pub struct TcpPacket {
2425
pub flag_syn: bool,
2526
pub flag_fin: bool,
2627
pub checksum: u16,
27-
pub div: u32,
28+
pub div: [u8; 4usize],
2829
}
2930

3031
impl FromBuffer for TcpPacket {
3132
fn from_buffer(buf: &Vec<u8>) -> Self {
32-
unsafe { *(buf.as_ptr() as *const TcpPacket) }
33+
let try_deserialize = bincode::deserialize::<'_, TcpPacket>(buf);
34+
35+
if try_deserialize.is_err() {
36+
TcpPacket::default()
37+
} else {
38+
try_deserialize.unwrap()
39+
}
3340

3441
}
42+
const ENTRY_SIZE: usize = 74;
3543
}
3644

3745
impl EventIndexer for TcpPacket {
@@ -110,4 +118,7 @@ impl EventIndexer for TcpPacket {
110118
fn as_db_op(self) -> DBOperation {
111119
DBOperation::Packet(self)
112120
}
121+
fn get_struct_length(&self) -> usize {
122+
74
123+
}
113124
}

db/src/bindings/tcp_probe.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
22

3+
use serde::Deserialize;
34
use ts_storage::{DataValue, IpTuple};
45

56
use crate::{
67
db_writer::DBOperation, flow_tracker::{EventIndexer, AF_INET}, reader::FromBuffer, shorten_to_ipv4, shorten_to_ipv6
78
};
89

910
#[repr(C)]
10-
#[derive(Debug, Clone, Copy, Default)]
11+
#[derive(Debug, Clone, Copy, Default, Deserialize)]
1112
pub struct TcpProbe {
1213
pub time: u64,
1314
pub saddr: [u8; 28usize],
@@ -25,13 +26,20 @@ pub struct TcpProbe {
2526
pub srtt: u32,
2627
pub rcv_wnd: u32,
2728
pub sock_cookie: u64,
28-
pub div: u32,
29+
pub div: [u8; 4usize],
2930
}
3031

3132
impl FromBuffer for TcpProbe {
3233
fn from_buffer(buf: &Vec<u8>) -> Self {
33-
unsafe { *(buf.as_ptr() as *const TcpProbe) }
34+
let try_deserialize = bincode::deserialize::<'_, TcpProbe>(buf);
35+
36+
if try_deserialize.is_err() {
37+
TcpProbe::default()
38+
} else {
39+
try_deserialize.unwrap()
40+
}
3441
}
42+
const ENTRY_SIZE: usize = 116;
3543
}
3644

3745
impl EventIndexer for TcpProbe {
@@ -109,4 +117,7 @@ impl EventIndexer for TcpProbe {
109117
fn as_db_op(self) -> DBOperation {
110118
DBOperation::Probe(self)
111119
}
120+
fn get_struct_length(&self) -> usize {
121+
116
122+
}
112123
}

0 commit comments

Comments
 (0)