Skip to content

Commit b0525ec

Browse files
committed
fix: Clippy and format
1 parent d68dc37 commit b0525ec

File tree

4 files changed

+86
-70
lines changed

4 files changed

+86
-70
lines changed

src/main.rs

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,15 @@ extern crate core_affinity;
1919

2020
use config::Config;
2121
use core_affinity::CoreId;
22-
use fork::{fork, Fork};
22+
use fork::{Fork, fork};
2323
use itertools::iproduct;
24-
use nix::sys::signal::{kill, Signal};
24+
use nix::sys::signal::{Signal, kill};
2525
use nix::sys::wait::waitpid;
2626
use nix::unistd::Pid;
2727
use std::time::SystemTime;
2828
use std::{env, thread, time};
2929

30-
use berserker::{worker::new_worker, WorkloadConfig};
30+
use berserker::{WorkloadConfig, worker::new_worker};
3131

3232
fn main() {
3333
let args: Vec<String> = env::args().collect();
@@ -70,8 +70,13 @@ fn main() {
7070

7171
let handles: Vec<_> = iproduct!(core_ids.into_iter(), 0..config.workers)
7272
.map(|(cpu, process)| {
73-
let worker =
74-
new_worker(config, cpu, process, &mut lower, &mut upper);
73+
let worker = new_worker(
74+
config.clone(),
75+
cpu,
76+
process,
77+
&mut lower,
78+
&mut upper,
79+
);
7580

7681
match fork() {
7782
Ok(Fork::Parent(child)) => {
@@ -102,17 +107,20 @@ fn main() {
102107
thread::scope(|s| {
103108
if config.duration != 0 {
104109
// Spin a watcher thread
105-
s.spawn(move || loop {
106-
thread::sleep(time::Duration::from_secs(1));
107-
let elapsed = duration_timer.elapsed().unwrap().as_secs();
108-
109-
if elapsed > config.duration {
110-
for handle in processes.iter().flatten() {
111-
info!("Terminating: {}", *handle);
112-
let _ = kill(Pid::from_raw(*handle), Signal::SIGTERM);
110+
s.spawn(move || {
111+
loop {
112+
thread::sleep(time::Duration::from_secs(1));
113+
let elapsed = duration_timer.elapsed().unwrap().as_secs();
114+
115+
if elapsed > config.duration {
116+
for handle in processes.iter().flatten() {
117+
info!("Terminating: {}", *handle);
118+
let _ =
119+
kill(Pid::from_raw(*handle), Signal::SIGTERM);
120+
}
121+
122+
break;
113123
}
114-
115-
break;
116124
}
117125
});
118126
}

src/worker/bpf.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::{
22
cmp,
3-
ffi::{c_char, CString},
3+
ffi::{CString, c_char},
44
fmt::Display,
55
mem, slice, thread,
66
};
@@ -17,7 +17,7 @@ use aya_obj::generated::{
1717

1818
use crate::{BaseConfig, Worker, WorkerError, Workload, WorkloadConfig};
1919

20-
#[derive(Debug, Clone, Copy)]
20+
#[derive(Debug, Clone)]
2121
pub struct BpfWorker {
2222
config: BaseConfig,
2323
workload: WorkloadConfig,

src/worker/network.rs

Lines changed: 29 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
use core_affinity::CoreId;
22
use log::{debug, info, trace};
3-
use rand::{thread_rng, Rng};
3+
use rand::{Rng, thread_rng};
44
use rand_distr::Exp;
55
use std::collections::HashMap;
66
use std::os::unix::io::AsRawFd;
77
use std::str;
88
use std::time::{SystemTime, UNIX_EPOCH};
99
use std::{
1010
fmt::Display,
11-
io::{prelude::*, BufReader},
11+
io::{BufReader, prelude::*},
1212
net::TcpListener,
1313
thread,
1414
};
@@ -17,10 +17,10 @@ use crate::{BaseConfig, Worker, WorkerError, Workload, WorkloadConfig};
1717

1818
use smoltcp::iface::{Config, Interface, SocketSet};
1919
use smoltcp::phy::{
20-
wait as phy_wait, Device, FaultInjector, Medium, Tracer, TunTapInterface,
20+
Device, FaultInjector, Medium, Tracer, TunTapInterface, wait as phy_wait,
2121
};
22-
use smoltcp::socket::tcp;
2322
use smoltcp::socket::AnySocket;
23+
use smoltcp::socket::tcp;
2424
use smoltcp::time::Instant;
2525
use smoltcp::wire::{EthernetAddress, IpAddress, IpCidr, Ipv4Address};
2626

@@ -61,31 +61,33 @@ impl NetworkWorker {
6161
// through streams and replies. This way the connections will have
6262
// high latency, but for the purpose of networking workload it
6363
// doesn't matter.
64-
thread::spawn(move || loop {
65-
let mut buf_reader = BufReader::new(&stream);
66-
let mut buffer = String::new();
67-
68-
match buf_reader.read_line(&mut buffer) {
69-
Ok(0) => {
70-
// EOF, exit
71-
break;
72-
}
73-
Ok(_n) => {
74-
trace!("Received {:?}", buffer);
75-
76-
let response = "hello\n";
77-
match stream.write_all(response.as_bytes()) {
78-
Ok(_) => {
79-
// Response is sent, handle the next one
80-
}
81-
Err(e) => {
82-
trace!("ERROR: sending response, {}", e);
83-
break;
64+
thread::spawn(move || {
65+
loop {
66+
let mut buf_reader = BufReader::new(&stream);
67+
let mut buffer = String::new();
68+
69+
match buf_reader.read_line(&mut buffer) {
70+
Ok(0) => {
71+
// EOF, exit
72+
break;
73+
}
74+
Ok(_n) => {
75+
trace!("Received {:?}", buffer);
76+
77+
let response = "hello\n";
78+
match stream.write_all(response.as_bytes()) {
79+
Ok(_) => {
80+
// Response is sent, handle the next one
81+
}
82+
Err(e) => {
83+
trace!("ERROR: sending response, {}", e);
84+
break;
85+
}
8486
}
8587
}
86-
}
87-
Err(e) => {
88-
trace!("ERROR: reading a line, {}", e)
88+
Err(e) => {
89+
trace!("ERROR: reading a line, {}", e)
90+
}
8991
}
9092
}
9193
});

src/worker/processes.rs

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
use std::{fmt::Display, process::Command, thread, time};
22

33
use core_affinity::CoreId;
4-
use fork::{fork, Fork};
4+
use fork::{Fork, fork};
55
use log::{info, warn};
66
use nix::{sys::wait::waitpid, unistd::Pid};
7-
use rand::{distributions::Alphanumeric, thread_rng, Rng};
7+
use rand::{Rng, distributions::Alphanumeric, thread_rng};
88
use rand_distr::Exp;
99

1010
use crate::{BaseConfig, Worker, WorkerError, Workload, WorkloadConfig};
1111

12-
#[derive(Debug, Clone, Copy)]
12+
#[derive(Debug, Clone)]
1313
pub struct ProcessesWorker {
1414
config: BaseConfig,
1515
workload: WorkloadConfig,
@@ -77,31 +77,37 @@ impl Worker for ProcessesWorker {
7777
unreachable!()
7878
};
7979

80-
loop {
81-
let lifetime: f64 =
82-
thread_rng().sample(Exp::new(departure_rate).unwrap());
80+
thread::scope(|s| {
81+
loop {
82+
let lifetime: f64 =
83+
thread_rng().sample(Exp::new(departure_rate).unwrap());
8384

84-
let worker = *self;
85-
thread::spawn(move || {
86-
worker.spawn_process((lifetime * 1000.0).round() as u64)
87-
});
85+
let worker = self;
8886

89-
let interval: f64 =
90-
thread_rng().sample(Exp::new(arrival_rate).unwrap());
91-
info!(
92-
"{}-{}: Interval {}, rounded {}, lifetime {}, rounded {}",
93-
self.config.cpu.id,
94-
self.config.process,
95-
interval,
96-
(interval * 1000.0).round() as u64,
97-
lifetime,
98-
(lifetime * 1000.0).round() as u64
99-
);
100-
thread::sleep(time::Duration::from_millis(
101-
(interval * 1000.0).round() as u64,
102-
));
103-
info!("{}-{}: Continue", self.config.cpu.id, self.config.process);
104-
}
87+
s.spawn(move || {
88+
worker.spawn_process((lifetime * 1000.0).round() as u64)
89+
});
90+
91+
let interval: f64 =
92+
thread_rng().sample(Exp::new(arrival_rate).unwrap());
93+
info!(
94+
"{}-{}: Interval {}, rounded {}, lifetime {}, rounded {}",
95+
self.config.cpu.id,
96+
self.config.process,
97+
interval,
98+
(interval * 1000.0).round() as u64,
99+
lifetime,
100+
(lifetime * 1000.0).round() as u64
101+
);
102+
thread::sleep(time::Duration::from_millis(
103+
(interval * 1000.0).round() as u64,
104+
));
105+
info!(
106+
"{}-{}: Continue",
107+
self.config.cpu.id, self.config.process
108+
);
109+
}
110+
})
105111
}
106112
}
107113

0 commit comments

Comments
 (0)