Skip to content

Commit 870b27b

Browse files
committed
Push Rx to its own thread
1 parent a324f93 commit 870b27b

File tree

2 files changed

+43
-34
lines changed

2 files changed

+43
-34
lines changed

Cargo.toml

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "net_hex"
3-
version = "0.3.0"
3+
version = "0.4.1"
44
authors = ["Jack Newman jacknewman12@gmail.com"]
55
edition = "2018"
66
license = "MIT"
@@ -11,11 +11,8 @@ license = "MIT"
1111
hexplay = "*"
1212
hex = "0.3.2"
1313
structopt = "0.2"
14-
nom = "*"
1514
crossbeam-channel = "0.3.8"
16-
tui = "0.6.1"
17-
termion = "1.5"
18-
rand = "*"
15+
crossbeam-utils = "0.6"
1916

2017
[dependencies.pnet]
2118
version = "0.22.0"

src/main.rs

Lines changed: 41 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
extern crate crossbeam_channel;
2+
extern crate crossbeam_utils;
23
extern crate hex;
34
extern crate hexplay;
45
extern crate pnet;
@@ -32,11 +33,11 @@ fn print_interfaces() {
3233
struct Opt {
3334
/// Number of packet to receive before exiting
3435
#[structopt(short = "c", long = "count", default_value = "-1")]
35-
count: i64,
36+
rx_count: i64,
3637

3738
/// Number of packet to receive before exiting
3839
#[structopt(short = "t", long = "timeout")]
39-
timeout: Option<u64>,
40+
rx_timeout: Option<u64>,
4041

4142
/// Number of packet to transmit before exiting
4243
#[structopt(short = "s", long = "send", default_value = "1")]
@@ -66,9 +67,6 @@ fn main() {
6667
std::process::exit(0);
6768
};
6869

69-
let rx_timeout = opt.timeout.map(Duration::from_secs);
70-
let mut rx_countlimit = opt.count;
71-
7270
// Find the network interface with the provided name
7371
let interfaces = datalink::interfaces();
7472
let interface = interfaces
@@ -88,6 +86,9 @@ fn main() {
8886
Err(e) => panic!("Error while creating datalink channel: {:?}", e),
8987
};
9088

89+
// Progressing to rx and tx steps. Add a waitgroup for them
90+
let wg = crossbeam_utils::sync::WaitGroup::new();
91+
9192
// Decode the hex input if the user specified one
9293
if let Some(arg) = opt.bytes {
9394
let bytes = match Vec::from_hex(arg) {
@@ -100,6 +101,7 @@ fn main() {
100101

101102
let rate = opt.tx_rate;
102103
let count = opt.tx_send;
104+
let wg = wg.clone();
103105
thread::spawn(move || {
104106
// Transmit those bytes
105107
let ticker = rate
@@ -117,33 +119,43 @@ fn main() {
117119
tick.recv().expect("Ticker died?");
118120
}
119121
}
122+
drop(wg);
120123
});
121124
}
122125

123-
// Now do the Rx part
124-
let now = Instant::now();
125-
while rx_countlimit != 0 {
126-
match rx.next() {
127-
Ok(packet) => {
128-
println!("----- Recv Packet -----");
129-
use hexplay::HexViewBuilder;
130-
let view = HexViewBuilder::new(packet).row_width(16).finish();
131-
println!("{}", view);
132-
rx_countlimit -= 1;
133-
}
134-
Err(ref e) if e.kind() == io::ErrorKind::TimedOut => {
135-
// Timeout errors are fine. Ignore.
136-
}
137-
Err(e) => {
138-
// If any other error occurs, we can handle it here
139-
panic!("An error occurred while reading: {:?}", e);
140-
}
141-
}
142-
if let Some(rx_timeout) = rx_timeout {
143-
// If there is a timeout enabled. Check it
144-
if now.elapsed() > rx_timeout {
145-
std::process::exit(0);
126+
{
127+
let rx_timeout = opt.rx_timeout.map(Duration::from_secs);
128+
let mut rx_countlimit = opt.rx_count;
129+
let wg = wg.clone();
130+
// Now do the Rx part
131+
thread::spawn(move || {
132+
let now = Instant::now();
133+
while rx_countlimit != 0 {
134+
match rx.next() {
135+
Ok(packet) => {
136+
println!("Recv Packet");
137+
use hexplay::HexViewBuilder;
138+
let view = HexViewBuilder::new(packet).row_width(16).finish();
139+
println!("{}", view);
140+
rx_countlimit -= 1;
141+
}
142+
Err(ref e) if e.kind() == io::ErrorKind::TimedOut => {
143+
// Timeout errors are fine. Ignore.
144+
}
145+
Err(e) => {
146+
// If any other error occurs, we can handle it here
147+
panic!("An error occurred while reading: {:?}", e);
148+
}
149+
}
150+
if let Some(rx_timeout) = rx_timeout {
151+
// If there is a timeout enabled. Check it
152+
if now.elapsed() > rx_timeout {
153+
std::process::exit(0);
154+
}
155+
}
146156
}
147-
}
157+
drop(wg);
158+
});
148159
}
160+
wg.wait();
149161
}

0 commit comments

Comments
 (0)