Skip to content

Commit cf32618

Browse files
MoritzFlu (Moritz Flüchter)
and authored
Add support for DuckDB Backend, some fixes to recording (#9)
* Implement DuckDB Cursor * Storing current state, not working * It works but values written are wrong for time series * Change sequence creation to IF EXISTS * Fix kernel trace file names in process * Add duckdb to viz tool * Add duckdb to path selector * Improve flow selection * Fix dport mapping for sock recording * Update * Fix DataValue parsing from DuckDB Union * Update README.md * Move queries to extra file * Add test db files to gitignore * Add duckdb dependency to github workflow * Switch to bundled version --------- Co-authored-by: Moritz Flüchter <moritz.fluechter@uni-tuebingen.de>
1 parent 5b1396d commit cf32618

File tree

23 files changed

+1268
-150
lines changed

23 files changed

+1268
-150
lines changed

.github/workflows/tcbee.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ jobs:
1919
- name: Install Dependencies
2020
run: |
2121
sudo apt update
22-
sudo apt install -y llvm clang libelf-dev libclang-dev pkg-config fontconfig libfontconfig1-dev
22+
sudo apt install -y llvm clang libelf-dev libclang-dev pkg-config fontconfig libfontconfig1-dev
2323
cargo install bpf-linker
2424
2525
- name: TCBee-Record Build

README.md

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,9 @@ The current Todo-List includes
4040

4141
- Documentation for the tools and interfaces
4242
- Add plugins for the calculation of common TCP congestion metrics
43-
- Implement InfluxDB interface for faster processing
43+
- Add full IPv6 support for kernel hooks
4444
- Test and benchmark bottlenecks (eBPF Ringbuf size, File writer, etc.)
4545
- Cleanup of eBPF and user space code
46-
- Change UI based on selected traces
4746
- ...
4847

4948
The current version is tested for linux kernel 6.13.6 and may not work on older or newer kernel versions.
@@ -56,7 +55,7 @@ TCBee
5655

5756
* provides a command-line program to record flows and track current data rates
5857
* monitors both packet headers for incoming and outgoing packets
59-
* hooks onto the linux kernel functions `tcp_sendmsg` and `tcp_recvmsg` to read kernel metrics
58+
* hooks onto the linux kernel tcp sending and receiving functions to read kernel metrics **per packet**
6059
* stores recorded data in a structured flow database
6160
* provides a simple plugin interface to calculate metrics from recorded data and save the results
6261
* comes with a visualization tool to analyse and compare TCP flow metrics

tcbee-process/run.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
#!/bin/bash
22
# Runs the rust program as sudo, needed privileges
3-
RUST_LOG=debug cargo run --release --config 'target."cfg(all())".runner="sudo -E"'
3+
RUST_LOG=debug cargo run --release -- --output ./db.duck -t

tcbee-process/src/bindings/sock.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ impl EventIndexer for sock_trace_entry {
186186

187187
// TODO: check byte order if ports are correct
188188
// Dport could be be bytes
189-
let sport = u16::from_le_bytes(srcbytes);
189+
let sport = u16::from_be_bytes(srcbytes);
190190
let dport = u16::from_le_bytes(dstbytes);
191191

192192
IpTuple {

tcbee-process/src/db_writer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,12 @@ pub struct DBWriter {
2828

2929
impl DBWriter {
3030
pub fn new(
31-
file: &str,
31+
backend: DBBackend,
3232
rx: Receiver<DBOperation>,
3333
status: ProgressBar,
3434
) -> Result<DBWriter, Box<dyn Error>> {
3535
let db: Box<dyn TSDBInterface + Send> =
36-
database_factory::<SQLiteTSDB>(DBBackend::SQLite(file.to_string()))?;
36+
database_factory::<SQLiteTSDB>(backend)?;
3737

3838
let streams: HashMap<IpTuple, FlowTracker> = HashMap::new();
3939

tcbee-process/src/main.rs

Lines changed: 57 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ mod bindings {
1010
pub mod cwnd;
1111
}
1212

13-
use argparse::{ArgumentParser, Store};
13+
use argparse::{ArgumentParser, Store, StoreOption, StoreTrue};
1414
use bindings::{sock::sock_trace_entry, cwnd::cwnd_trace_entry, tcp_packet::TcpPacket, tcp_probe::TcpProbe};
1515
use db_writer::{DBOperation, DBWriter};
1616
use flow_tracker::{EventIndexer, EventType, FlowTracker, TsTracker};
@@ -24,6 +24,7 @@ use tokio::{
2424
task::{self, JoinHandle},
2525
};
2626
use tokio_util::sync::CancellationToken;
27+
use ts_storage::DBBackend;
2728

2829
use core::num;
2930
use std::{
@@ -95,17 +96,60 @@ async fn start_file_reader<
9596
async fn main() -> Result<(), Box<dyn Error>> {
9697
env_logger::init();
9798

98-
let mut dir: String = "/tmp/".to_string();
99+
let mut source: String = "/tmp/".to_string();
100+
let mut output: String = "".to_string();
101+
let mut sqlite: bool = false;
102+
let mut duckdb: bool = false;
103+
99104
{
100105
let mut argparser = ArgumentParser::new();
101-
argparser.refer(&mut dir).add_option(
102-
&["-d", "--dir"],
106+
argparser.refer(&mut source).add_option(
107+
&["-s", "--source"],
108+
Store,
109+
"Directory to search for TCBee recording *.tcp files!",
110+
);
111+
argparser.refer(&mut output).add_option(
112+
&["-o", "--output"],
103113
Store,
104-
"Directory to read recording results from. Defaults to /tmp/",
114+
"Path for outoput database file",
115+
);
116+
argparser.refer(&mut sqlite).add_option(
117+
&["-q", "--sqlite"],
118+
StoreTrue,
119+
"Store result to SQLITE",
105120
);
121+
argparser.refer(&mut duckdb).add_option(
122+
&["-d", "--duckdb"],
123+
StoreTrue,
124+
"Store result to DuckDB, better performance",
125+
);
126+
106127
argparser.parse_args_or_exit();
107128
}
108129

130+
if !sqlite && !duckdb {
131+
print!("Please select either --sqlite or --duckdb");
132+
return Ok(());
133+
}
134+
if sqlite && duckdb {
135+
print!("Please select either --sqlite or --duckdb");
136+
return Ok(());
137+
}
138+
139+
if output.is_empty() {
140+
if sqlite {
141+
output = "/tmp/db.sqlite".to_string();
142+
}
143+
if duckdb {
144+
output = "/tmp/db.duck".to_string();
145+
}
146+
}
147+
148+
let mut backend = DBBackend::SQLite(output.clone());
149+
if duckdb {
150+
backend = DBBackend::DuckDB(output);
151+
}
152+
109153
let progress_bars = MultiProgress::new();
110154

111155
let status = progress_bars.add(ProgressBar::new(5));
@@ -123,7 +167,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
123167
println!("Starting readers, initial processing may be slow due to setup of streams!");
124168

125169
// Create DB Backend handler
126-
let db_res = DBWriter::new("db.sqlite", rx,status);
170+
let db_res = DBWriter::new(backend, rx,status);
127171
if db_res.is_err() {
128172
panic!("Could not open Database! Error: {}", db_res.err().unwrap())
129173
}
@@ -146,49 +190,49 @@ async fn main() -> Result<(), Box<dyn Error>> {
146190

147191
let threads = vec![
148192
start_file_reader::<TcpPacket>(
149-
prepend_string("xdp.tcp".to_string(),&dir),
193+
prepend_string("xdp.tcp".to_string(),&source),
150194
tx.clone(),
151195
stop_token.clone(),
152196
&progress_bars,
153197
)
154198
.await,
155199
start_file_reader::<TcpPacket>(
156-
prepend_string("tc.tcp".to_string(),&dir),
200+
prepend_string("tc.tcp".to_string(),&source),
157201
tx.clone(),
158202
stop_token.clone(),
159203
&progress_bars,
160204
)
161205
.await,
162206
start_file_reader::<TcpProbe>(
163-
prepend_string("probe.tcp".to_string(),&dir),
207+
prepend_string("probe.tcp".to_string(),&source),
164208
tx.clone(),
165209
stop_token.clone(),
166210
&progress_bars,
167211
)
168212
.await,
169213
start_file_reader::<sock_trace_entry>(
170-
prepend_string("sock_send.tcp".to_string(),&dir),
214+
prepend_string("send_sock.tcp".to_string(),&source),
171215
tx.clone(),
172216
stop_token.clone(),
173217
&progress_bars,
174218
)
175219
.await,
176220
start_file_reader::<sock_trace_entry>(
177-
prepend_string("sock_recv.tcp".to_string(),&dir),
221+
prepend_string("recv_sock.tcp".to_string(),&source),
178222
tx.clone(),
179223
stop_token.clone(),
180224
&progress_bars,
181225
)
182226
.await,
183227
start_file_reader::<cwnd_trace_entry>(
184-
prepend_string("recv_cwnd.tcp".to_string(),&dir),
228+
prepend_string("recv_cwnd.tcp".to_string(),&source),
185229
tx.clone(),
186230
stop_token.clone(),
187231
&progress_bars,
188232
)
189233
.await,
190234
start_file_reader::<cwnd_trace_entry>(
191-
prepend_string("send_cwnd.tcp".to_string(),&dir),
235+
prepend_string("send_cwnd.tcp".to_string(),&source),
192236
tx.clone(),
193237
stop_token.clone(),
194238
&progress_bars,

tcbee-viz/src/modules/backend/intermediate_backend.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use crate::modules::{
1616
use crate::TSDBInterface;
1717
use iced::widget::canvas::Cache;
1818
use plotters::style::RGBAColor;
19-
use ts_storage::{database_factory, sqlite::SQLiteTSDB, DBBackend, DataValue, Flow};
19+
use ts_storage::{database_factory, duckdb::DuckDBTSDB, sqlite::SQLiteTSDB, DBBackend, DataValue, Flow};
2020
use ts_storage::{DataPoint, TimeSeries};
2121
use std::{cell::RefCell, f64::{MAX, MIN}, path::PathBuf, slice::Iter, sync::RwLock};
2222

@@ -29,6 +29,7 @@ use super::{app_settings::ApplicationSettings, struct_tcp_flow_wrapper::TcpFlowW
2929
pub enum DataSource {
3030
Influx,
3131
Sqllite,
32+
DuckDB,
3233
None,
3334
}
3435

@@ -45,6 +46,7 @@ impl ToString for DataSource {
4546
match self {
4647
DataSource::Influx => String::from("Influx"),
4748
DataSource::Sqllite => String::from("Sqllite"),
49+
DataSource::DuckDB => String::from("DuckDB"),
4850
DataSource::None => String::from("Nothing selected"),
4951
}
5052
}
@@ -245,9 +247,10 @@ impl IntermediateBackend {
245247
// DEBUG
246248
pub fn receive_flow_formatted(&self, flow: &Flow) -> String {
247249
format!(
248-
"ID:{:?} Port: (src: {:?}) IP(src: {:?} : dst: {:?}) ",
250+
"ID:{:?} Port: {:?}-{:?} IP: {:?}-{:?}",
249251
flow.get_id().unwrap(),
250252
flow.tuple.sport,
253+
flow.tuple.dport,
251254
flow.tuple.src,
252255
flow.tuple.dst
253256
)
@@ -276,7 +279,19 @@ impl IntermediateBackend {
276279
database_factory::<SQLiteTSDB>(DBBackend::SQLite(path_db.clone()))
277280
.expect("could not parse database"),
278281
);
279-
println!("initialized database of time {:?}", source);
282+
println!("initialized SQLITE database of time {:?}", source);
283+
IntermediateBackend {
284+
source_type: source.clone(),
285+
database_interface: Some(db_interface),
286+
database_path: Some(PathBuf::from(path_db))
287+
}
288+
},
289+
DataSource::DuckDB => {
290+
let db_interface: Arc<Box<dyn TSDBInterface>> = Arc::new(
291+
database_factory::<DuckDBTSDB>(DBBackend::DuckDB(path_db.clone()))
292+
.expect("could not parse database"),
293+
);
294+
println!("initialized DUCKDB database of time {:?}", source);
280295
IntermediateBackend {
281296
source_type: source.clone(),
282297
database_interface: Some(db_interface),

tcbee-viz/src/modules/backend/lib_system_io.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ pub fn receive_source_from_path(path: &PathBuf) -> Option<DataSource> {
1010
match extension {
1111
"sqlite" => Some(DataSource::Sqllite),
1212
"influx" => Some(DataSource::Influx),
13+
"duck" => Some(DataSource::DuckDB),
1314
_ => None,
1415
}
1516
}

tcbee-viz/src/modules/ui/lib_screens/screen_home.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ impl ScreenHome {
7171
let headline = text("Selecting Database").size(TEXT_HEADLINE_0_SIZE);
7272
let description = text(
7373
"
74-
This program aims to replace tcptrace+xplot for analyzing PCAP-Files containing TCP-Flows.\n
74+
This program aims to visualize recorded metrics TCP-Flows for an explorative analysis.\n
7575
Its able to visualize single and multiple flows with their corresponding timeseries information.\n
7676
Modification of the database is supplied via an extendable feature-system.\n
7777
",
@@ -149,7 +149,7 @@ impl ScreenHome {
149149
// FIXME maybe async this operation?
150150

151151
let file_selection = FileDialog::new()
152-
.add_filter("text", &["sqlite"])
152+
.add_filter("*.sqlite or *.duck", &["sqlite","duck"])
153153
.set_directory("~/")
154154
.pick_file();
155155
// FIXME improve error handling

tcbee-viz/src/modules/ui/lib_widgets/app_widgets.rs

Lines changed: 35 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,31 +6,29 @@
66

77
use std::sync::{Arc, RwLock};
88

9-
use crate::modules::{
9+
use crate::{modules::{
1010
backend::{
1111
app_settings::ApplicationSettings, intermediate_backend::IntermediateBackend, lib_system_io::receive_file_metadata, plot_data_preprocessing::convert_rgba_to_iced_color, struct_tcp_flow_wrapper::TcpFlowWrapper
1212
},
1313
ui::{
1414
lib_styling::app_style_settings::{
15-
HORIZONTAL_LINE_SECONDARY_HEIGHT, PADDING_AROUND_CONTENT, SLIDER_STEP_SIZE,
16-
SPACE_BETWEEN_ELEMENTS, SPLIT_CHART_MAX_HEIGHT, SPLIT_CHART_MIN_HEIGHT,
17-
TEXT_HEADLINE_0_SIZE, TEXT_HEADLINE_1_SIZE, TEXT_HEADLINE_2_SIZE,
15+
HORIZONTAL_LINE_PRIMARY_HEIGHT, HORIZONTAL_LINE_SECONDARY_HEIGHT, PADDING_AROUND_CONTENT, SLIDER_STEP_SIZE, SPACE_BETWEEN_ELEMENTS, SPLIT_CHART_MAX_HEIGHT, SPLIT_CHART_MIN_HEIGHT, TEXT_HEADLINE_0_SIZE, TEXT_HEADLINE_1_SIZE, TEXT_HEADLINE_2_SIZE
1816
},
1917
lib_widgets::lib_graphs::{
2018
struct_processed_plot_data::ProcessedPlotData,
2119
struct_string_series_wrapper::{view_wrapper, StringSeriesWrapper},
2220
struct_zoom_bounds::ZoomBound2D,
2321
},
2422
},
25-
};
23+
}, Message};
2624
use iced::{
27-
widget::{
25+
theme::palette::Background, widget::{
2826
button, checkbox, radio, scrollable, slider, text, Button, Checkbox, Column, Row, Rule,
29-
Space,
30-
},
31-
Alignment, Element, Length,
27+
Space, Text,
28+
}, Alignment, Element, Length
3229
};
3330

31+
/// OTHER FUNCTIONS
3432
pub fn display_current_mouse_position<'a, Message: 'a>(
3533
maybe_position: Option<(f64, f64)>,
3634
) -> Column<'a, Message> {
@@ -82,18 +80,38 @@ fn generate_selections_for_flows<'a, Message: 'a + Clone>(
8280
let interface = &ref_backend.database_interface;
8381

8482
if let Some(db_connection) = interface {
83+
84+
let header_1: Row<'_, _> = Row::<Message>::new()
85+
.push(Text::new("ID").width(Length::FillPortion(1))) // Space for radio button
86+
.push(Text::new("Source").width(Length::FillPortion(3)))
87+
.push(Text::new("Destination").width(Length::FillPortion(3)));
88+
89+
90+
new_col = new_col.push(header_1);
91+
8592
let all_flows = db_connection.list_flows().expect("could not find flows");
8693
for entry in all_flows {
87-
new_col = new_col.push(
88-
radio(
89-
ref_backend.receive_flow_formatted(&entry),
90-
// self.backend_interface.receive_flow_formatted(&entry),
91-
entry.get_id().expect("no flow id").clone(),
94+
95+
let tuple = &entry.tuple;
96+
let first_flow_row = Row::<Message>::new()
97+
.push(Text::new(entry.get_id().unwrap()).width(Length::FillPortion(1)))
98+
.push(Text::new(tuple.src.to_string()).width(Length::FillPortion(3)))
99+
.push(Text::new(tuple.dst.to_string()).width(Length::FillPortion(3)));
100+
let second_flow_row = Row::<Message>::new()
101+
.push(
102+
radio(
103+
"",
104+
entry.get_id().expect("no flow id"),
92105
focused_flow.flow_id,
93-
// self.backend_interface.receive_active_flow_id(),
94106
message_on_click.clone(),
95-
), // text(format!("flow: {:?}",entry))
96-
);
107+
)
108+
.width(Length::FillPortion(1))
109+
)
110+
.push(Text::new(tuple.sport.to_string()).width(Length::FillPortion(3)))
111+
.push(Text::new(tuple.dport.to_string()).width(Length::FillPortion(3)));
112+
113+
114+
new_col = new_col.push(Rule::horizontal(HORIZONTAL_LINE_SECONDARY_HEIGHT)).push(first_flow_row).push(second_flow_row);
97115
}
98116
}
99117
new_col

0 commit comments

Comments
 (0)