Skip to content

Commit 6fd3054

Browse files
committed
fix: use just one socket, not two sockets as Nikola suggested
i(alexander) wasn't sure if it can work well bidirectionally, but it seems it's ok
1 parent 6282334 commit 6fd3054

File tree

1 file changed

+9
-29
lines changed

1 file changed

+9
-29
lines changed

src/db-backend/src/rr_dispatcher.rs

Lines changed: 9 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@ pub struct CtRRWorker {
2626
pub ct_rr_worker_exe: PathBuf,
2727
pub rr_trace_folder: PathBuf,
2828
process: Option<Child>,
29-
sending_stream: Option<UnixStream>,
30-
receiving_stream: Option<UnixStream>,
29+
stream: Option<UnixStream>,
3130
}
3231

3332
#[derive(Default)]
@@ -44,8 +43,7 @@ impl CtRRWorker {
4443
ct_rr_worker_exe: PathBuf::from(ct_rr_worker_exe),
4544
rr_trace_folder: PathBuf::from(rr_trace_folder),
4645
process: None,
47-
sending_stream: None,
48-
receiving_stream: None,
46+
stream: None,
4947
}
5048
}
5149

@@ -78,29 +76,11 @@ impl CtRRWorker {
7876

7977
let run_id = std::process::id() as usize;
8078

81-
// sending socket:
82-
let sending_socket_path = ct_rr_worker_socket_path("backend", &self.name, run_id)?;
83-
info!(
84-
"try to connect to worker with sending socket in {}",
85-
sending_socket_path.display()
86-
);
87-
loop {
88-
if let Ok(sending_stream) = UnixStream::connect(&sending_socket_path) {
89-
self.sending_stream = Some(sending_stream);
90-
break;
91-
}
92-
thread::sleep(Duration::from_millis(1));
93-
// TODO: handle different kinds of errors
94-
}
95-
// receiving socket:
96-
let receiving_socket_path = ct_rr_worker_socket_path("worker", &self.name, run_id)?;
97-
info!(
98-
"try to connect to worker with receiving socket in {}",
99-
receiving_socket_path.display()
100-
);
79+
let socket_path = ct_rr_worker_socket_path("", &self.name, run_id)?;
80+
info!("try to connect to worker with socket in {}", socket_path.display());
10181
loop {
102-
if let Ok(receiving_stream) = UnixStream::connect(&receiving_socket_path) {
103-
self.receiving_stream = Some(receiving_stream);
82+
if let Ok(stream) = UnixStream::connect(&socket_path) {
83+
self.stream = Some(stream);
10484
break;
10585
}
10686
thread::sleep(Duration::from_millis(1));
@@ -115,16 +95,16 @@ impl CtRRWorker {
11595
let raw_json = serde_json::to_string(&query)?;
11696

11797
info!("send to worker {raw_json}\n");
118-
self.sending_stream
98+
self.stream
11999
.as_mut()
120100
.expect("valid sending stream")
121101
.write(&format!("{raw_json}\n").into_bytes())?;
122102

123103
let mut res = "".to_string();
124104
info!("wait to read");
125105

126-
let mut reader = BufReader::new(self.receiving_stream.as_mut().expect("valid receiving stream"));
127-
reader.read_line(&mut res)?;
106+
let mut reader = BufReader::new(self.stream.as_mut().expect("valid receiving stream"));
107+
reader.read_line(&mut res)?; // TODO: more robust reading/read all
128108

129109
res = String::from(res.trim()); // trim newlines/whitespace!
130110

0 commit comments

Comments
 (0)