Skip to content

Commit 6282334

Browse files
committed
feat: finish initial ct rr worker communication: now with sockets
there were problems with stdio, this is just ok for now some help from Nikola for some bugs
1 parent 33507cb commit 6282334

File tree

3 files changed

+96
-34
lines changed

3 files changed

+96
-34
lines changed

src/db-backend/src/main.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -126,18 +126,25 @@ fn main() -> Result<(), Box<dyn Error>> {
126126
match cli.cmd {
127127
Commands::DapServer { socket_path, stdio } => {
128128
if stdio {
129-
let _ = db_backend::dap_server::run_stdio();
129+
// thread::spawn(move || {
130+
let res = db_backend::dap_server::run_stdio();
131+
if let Err(e) = res {
132+
error!("dap server run error: {e:?}");
133+
}
134+
// })
130135
} else {
131136
let socket_path = if let Some(p) = socket_path {
132137
p
133138
} else {
134139
let pid = std::process::id() as usize;
135140
db_backend::dap_server::socket_path_for(pid)
136141
};
137-
138-
info!("dap_server::run {:?}", socket_path);
139-
140-
let _ = db_backend::dap_server::run(&socket_path);
142+
// thread::spawn(move || {
143+
let res = db_backend::dap_server::run(&socket_path);
144+
if let Err(e) = res {
145+
error!("dap server run error: {e:?}");
146+
}
147+
// })
141148
};
142149
}
143150
Commands::IndexDiff {

src/db-backend/src/paths.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
1+
// copied from Stan's paths.rs in src/db-backend in the public codetracer repo
2+
// added run_dir/ct_rr_worker_socket_path (and copied to db-backend too)
3+
14
use std::env;
2-
use std::path::PathBuf;
5+
use std::error::Error;
6+
use std::path::{Path, PathBuf};
37
use std::sync::{LazyLock, Mutex};
48

59
pub struct Paths {
@@ -24,3 +28,24 @@ impl Default for Paths {
2428
}
2529

2630
pub static CODETRACER_PATHS: LazyLock<Mutex<Paths>> = LazyLock::new(|| Mutex::new(Paths::default()));
31+
32+
pub fn run_dir_for(tmp_path: &Path, run_id: usize) -> Result<PathBuf, Box<dyn Error>> {
33+
let run_dir = tmp_path.join(format!("run-{run_id}"));
34+
std::fs::create_dir_all(&run_dir)?;
35+
Ok(run_dir)
36+
}
37+
38+
pub fn ct_rr_worker_socket_path(from: &str, worker_name: &str, run_id: usize) -> Result<PathBuf, Box<dyn Error>> {
39+
let tmp_path: PathBuf = { CODETRACER_PATHS.lock()?.tmp_path.clone() };
40+
let run_dir = run_dir_for(&tmp_path, run_id)?;
41+
// eventually: TODO: unique index or better cleanup
42+
// if worker with the same name started/restarted multiple times
43+
// by the same backend instance
44+
Ok(run_dir.join(format!("ct_rr_worker_{worker_name}_from_{from}.sock")))
45+
46+
// TODO: decide if we need to check/eventually remove or the unique run folder/paths are enough:
47+
//
48+
// if std::fs::metadata(&receiving_socket_path).is_ok() {
49+
// let _ = std::fs::remove_file(&receiving_socket_path); // try to remove if existing: ignore error
50+
// }
51+
}

src/db-backend/src/rr_dispatcher.rs

Lines changed: 58 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
use std::error::Error;
22
use std::io::Write;
3-
use std::io::{BufRead, BufReader, Read};
3+
use std::io::{BufRead, BufReader};
4+
use std::os::unix::net::UnixStream;
45
use std::path::{Path, PathBuf};
56
use std::process::{Child, Command, Stdio};
67
use std::thread;
8+
use std::time::Duration;
79

810
use log::info;
911

12+
use crate::paths::ct_rr_worker_socket_path;
1013
use crate::query::CtRRQuery;
1114

1215
#[derive(Debug)]
@@ -23,6 +26,8 @@ pub struct CtRRWorker {
2326
pub ct_rr_worker_exe: PathBuf,
2427
pub rr_trace_folder: PathBuf,
2528
process: Option<Child>,
29+
sending_stream: Option<UnixStream>,
30+
receiving_stream: Option<UnixStream>,
2631
}
2732

2833
#[derive(Default)]
@@ -39,6 +44,8 @@ impl CtRRWorker {
3944
ct_rr_worker_exe: PathBuf::from(ct_rr_worker_exe),
4045
rr_trace_folder: PathBuf::from(rr_trace_folder),
4146
process: None,
47+
sending_stream: None,
48+
receiving_stream: None,
4249
}
4350
}
4451

@@ -59,47 +66,70 @@ impl CtRRWorker {
5966
.spawn()?;
6067

6168
self.process = Some(ct_worker);
62-
thread::sleep_ms(1_000);
69+
self.setup_worker_sockets()?;
6370
self.active = true;
6471
Ok(())
6572
}
6673

74+
fn setup_worker_sockets(&mut self) -> Result<(), Box<dyn Error>> {
75+
// assuming that the ct rr worker creates the sockets!
76+
// code copied and adapted from `connect_socket_with_backend_and_loop` in ct-rr-worker
77+
// which is itself copied/adapted/written from/based on https://emmanuelbosquet.com/2022/whatsaunixsocket/
78+
79+
let run_id = std::process::id() as usize;
80+
81+
// sending socket:
82+
let sending_socket_path = ct_rr_worker_socket_path("backend", &self.name, run_id)?;
83+
info!(
84+
"try to connect to worker with sending socket in {}",
85+
sending_socket_path.display()
86+
);
87+
loop {
88+
if let Ok(sending_stream) = UnixStream::connect(&sending_socket_path) {
89+
self.sending_stream = Some(sending_stream);
90+
break;
91+
}
92+
thread::sleep(Duration::from_millis(1));
93+
// TODO: handle different kinds of errors
94+
}
95+
// receiving socket:
96+
let receiving_socket_path = ct_rr_worker_socket_path("worker", &self.name, run_id)?;
97+
info!(
98+
"try to connect to worker with receiving socket in {}",
99+
receiving_socket_path.display()
100+
);
101+
loop {
102+
if let Ok(receiving_stream) = UnixStream::connect(&receiving_socket_path) {
103+
self.receiving_stream = Some(receiving_stream);
104+
break;
105+
}
106+
thread::sleep(Duration::from_millis(1));
107+
// TODO: handle different kinds of errors
108+
}
109+
110+
Ok(())
111+
}
112+
67113
// for now: don't return a typed value here, only ok or an error
68114
pub fn run_query(&mut self, query: CtRRQuery) -> Result<(), Box<dyn Error>> {
69-
let mut stdin = self
70-
.process
71-
.as_mut()
72-
.expect("valid process")
73-
.stdin
74-
.take()
75-
.expect("stdin: TODO error");
76-
let mut stdout = self
77-
.process
78-
.as_mut()
79-
.expect("valid process")
80-
.stdout
81-
.take()
82-
.expect("stdout: TODO error");
83-
84115
let raw_json = serde_json::to_string(&query)?;
85-
let reader = BufReader::new(stdout);
86116

87117
info!("send to worker {raw_json}\n");
88-
write!(stdin, "{}\n", raw_json)?;
118+
self.sending_stream
119+
.as_mut()
120+
.expect("valid sending stream")
121+
.write(&format!("{raw_json}\n").into_bytes())?;
89122

90123
let mut res = "".to_string();
91124
info!("wait to read");
92125

93-
for line_result in reader.lines() {
94-
info!("line_result {line_result:?}");
95-
if let Ok(line) = line_result {
96-
res.push_str(&line);
97-
res.push_str("\n");
98-
} else {
99-
continue;
100-
}
101-
}
126+
let mut reader = BufReader::new(self.receiving_stream.as_mut().expect("valid receiving stream"));
127+
reader.read_line(&mut res)?;
128+
129+
res = String::from(res.trim()); // trim newlines/whitespace!
130+
102131
info!("res {res}");
132+
103133
if res == "ok" {
104134
Ok(())
105135
} else {

0 commit comments

Comments
 (0)