Skip to content

Commit 218721a

Browse files
committed
Stage 3
codetracer-python-recorder/Cargo.lock: codetracer-python-recorder/Cargo.toml: codetracer-python-recorder/src/ffi.rs: codetracer-python-recorder/src/policy.rs: codetracer-python-recorder/src/runtime/io_capture/events.rs: codetracer-python-recorder/src/runtime/io_capture/install.rs: codetracer-python-recorder/src/runtime/io_capture/mod.rs: codetracer-python-recorder/src/runtime/io_capture/mute.rs: codetracer-python-recorder/src/runtime/io_capture/proxies.rs: codetracer-python-recorder/src/runtime/io_capture/sink.rs: codetracer-python-recorder/src/runtime/logging.rs: codetracer-python-recorder/src/runtime/mod.rs: codetracer-python-recorder/src/session.rs: design-docs/io-capture-line-proxy-implementation-plan.status.md: Signed-off-by: Tzanko Matev <[email protected]>
1 parent d380cdd commit 218721a

File tree

14 files changed

+818
-63
lines changed

14 files changed

+818
-63
lines changed

codetracer-python-recorder/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

codetracer-python-recorder/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ serde = { version = "1.0", features = ["derive"] }
3131
serde_json = "1.0"
3232
uuid = { version = "1.10", features = ["v4"] }
3333
recorder-errors = { version = "0.1.0", path = "crates/recorder-errors" }
34+
base64 = "0.22"
3435

3536
[dev-dependencies]
3637
pyo3 = { version = "0.25.1", features = ["auto-initialize"] }

codetracer-python-recorder/src/ffi.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -145,12 +145,9 @@ mod tests {
145145
#[test]
146146
fn map_recorder_error_sets_python_attributes() {
147147
Python::with_gil(|py| {
148-
let err = usage!(
149-
ErrorCode::UnsupportedFormat,
150-
"invalid trace format"
151-
)
152-
.with_context("format", "yaml")
153-
.with_source(std::io::Error::new(std::io::ErrorKind::Other, "boom"));
148+
let err = usage!(ErrorCode::UnsupportedFormat, "invalid trace format")
149+
.with_context("format", "yaml")
150+
.with_source(std::io::Error::new(std::io::ErrorKind::Other, "boom"));
154151
let pyerr = map_recorder_error(err);
155152
let ty = pyerr.get_type(py);
156153
assert!(ty.is(py.get_type::<PyUsageError>()));
@@ -191,9 +188,8 @@ mod tests {
191188
#[test]
192189
fn dispatch_converts_recorder_error_to_pyerr() {
193190
Python::with_gil(|py| {
194-
let result: PyResult<()> = dispatch("dispatch_env", || {
195-
Err(enverr!(ErrorCode::Io, "disk full"))
196-
});
191+
let result: PyResult<()> =
192+
dispatch("dispatch_env", || Err(enverr!(ErrorCode::Io, "disk full")));
197193
let err = result.expect_err("expected PyErr");
198194
let ty = err.get_type(py);
199195
assert!(ty.is(py.get_type::<PyEnvironmentError>()));

codetracer-python-recorder/src/policy.rs

Lines changed: 124 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ pub const ENV_LOG_LEVEL: &str = "CODETRACER_LOG_LEVEL";
2020
pub const ENV_LOG_FILE: &str = "CODETRACER_LOG_FILE";
2121
/// Environment variable enabling JSON error trailers on stderr.
2222
pub const ENV_JSON_ERRORS: &str = "CODETRACER_JSON_ERRORS";
23+
/// Environment variable toggling IO capture strategies.
24+
pub const ENV_CAPTURE_IO: &str = "CODETRACER_CAPTURE_IO";
2325

2426
static POLICY: OnceCell<RwLock<RecorderPolicy>> = OnceCell::new();
2527

@@ -61,6 +63,21 @@ impl FromStr for OnRecorderError {
6163
}
6264
}
6365

66+
#[derive(Debug, Clone, PartialEq, Eq)]
67+
pub struct IoCapturePolicy {
68+
pub line_proxies: bool,
69+
pub fd_fallback: bool,
70+
}
71+
72+
impl Default for IoCapturePolicy {
73+
fn default() -> Self {
74+
Self {
75+
line_proxies: true,
76+
fd_fallback: false,
77+
}
78+
}
79+
}
80+
6481
/// Recorder-wide runtime configuration.
6582
#[derive(Debug, Clone, PartialEq, Eq)]
6683
pub struct RecorderPolicy {
@@ -70,6 +87,7 @@ pub struct RecorderPolicy {
7087
pub log_level: Option<String>,
7188
pub log_file: Option<PathBuf>,
7289
pub json_errors: bool,
90+
pub io_capture: IoCapturePolicy,
7391
}
7492

7593
impl Default for RecorderPolicy {
@@ -81,6 +99,7 @@ impl Default for RecorderPolicy {
8199
log_level: None,
82100
log_file: None,
83101
json_errors: false,
102+
io_capture: IoCapturePolicy::default(),
84103
}
85104
}
86105
}
@@ -111,6 +130,16 @@ impl RecorderPolicy {
111130
if let Some(json_errors) = update.json_errors {
112131
self.json_errors = json_errors;
113132
}
133+
if let Some(line_proxies) = update.io_capture_line_proxies {
134+
self.io_capture.line_proxies = line_proxies;
135+
if !self.io_capture.line_proxies {
136+
self.io_capture.fd_fallback = false;
137+
}
138+
}
139+
if let Some(fd_fallback) = update.io_capture_fd_fallback {
140+
// fd fallback requires proxies to be on.
141+
self.io_capture.fd_fallback = fd_fallback && self.io_capture.line_proxies;
142+
}
114143
}
115144
}
116145

@@ -130,6 +159,8 @@ struct PolicyUpdate {
130159
log_level: Option<String>,
131160
log_file: Option<PolicyPath>,
132161
json_errors: Option<bool>,
162+
io_capture_line_proxies: Option<bool>,
163+
io_capture_fd_fallback: Option<bool>,
133164
}
134165

135166
/// Snapshot the current policy.
@@ -178,6 +209,12 @@ pub fn configure_policy_from_env() -> RecorderResult<()> {
178209
update.json_errors = Some(parse_bool(&value)?);
179210
}
180211

212+
if let Ok(value) = env::var(ENV_CAPTURE_IO) {
213+
let (line_proxies, fd_fallback) = parse_capture_io(&value)?;
214+
update.io_capture_line_proxies = Some(line_proxies);
215+
update.io_capture_fd_fallback = Some(fd_fallback);
216+
}
217+
181218
apply_policy_update(update);
182219
Ok(())
183220
}
@@ -194,6 +231,54 @@ fn parse_bool(value: &str) -> RecorderResult<bool> {
194231
}
195232
}
196233

234+
fn parse_capture_io(value: &str) -> RecorderResult<(bool, bool)> {
235+
let trimmed = value.trim();
236+
if trimmed.is_empty() {
237+
let default = IoCapturePolicy::default();
238+
return Ok((default.line_proxies, default.fd_fallback));
239+
}
240+
241+
let lower = trimmed.to_ascii_lowercase();
242+
if matches!(
243+
lower.as_str(),
244+
"0" | "off" | "false" | "disable" | "disabled" | "none"
245+
) {
246+
return Ok((false, false));
247+
}
248+
if matches!(lower.as_str(), "1" | "on" | "true" | "enable" | "enabled") {
249+
return Ok((true, false));
250+
}
251+
252+
let mut line_proxies = false;
253+
let mut fd_fallback = false;
254+
for token in lower.split(',') {
255+
match token.trim() {
256+
"" => {}
257+
"proxies" | "proxy" => line_proxies = true,
258+
"fd" | "mirror" | "fallback" => {
259+
line_proxies = true;
260+
fd_fallback = true;
261+
}
262+
other => {
263+
return Err(usage!(
264+
ErrorCode::InvalidPolicyValue,
265+
"invalid CODETRACER_CAPTURE_IO value '{}'",
266+
other
267+
));
268+
}
269+
}
270+
}
271+
272+
if !line_proxies && !fd_fallback {
273+
return Err(usage!(
274+
ErrorCode::InvalidPolicyValue,
275+
"CODETRACER_CAPTURE_IO must enable at least 'proxies' or 'fd'"
276+
));
277+
}
278+
279+
Ok((line_proxies, fd_fallback))
280+
}
281+
197282
// === PyO3 helpers ===
198283

199284
use pyo3::prelude::*;
@@ -202,14 +287,16 @@ use pyo3::types::PyDict;
202287
use crate::ffi;
203288

204289
#[pyfunction(name = "configure_policy")]
205-
#[pyo3(signature = (on_recorder_error=None, require_trace=None, keep_partial_trace=None, log_level=None, log_file=None, json_errors=None))]
290+
#[pyo3(signature = (on_recorder_error=None, require_trace=None, keep_partial_trace=None, log_level=None, log_file=None, json_errors=None, io_capture_line_proxies=None, io_capture_fd_fallback=None))]
206291
pub fn configure_policy_py(
207292
on_recorder_error: Option<&str>,
208293
require_trace: Option<bool>,
209294
keep_partial_trace: Option<bool>,
210295
log_level: Option<&str>,
211296
log_file: Option<&str>,
212297
json_errors: Option<bool>,
298+
io_capture_line_proxies: Option<bool>,
299+
io_capture_fd_fallback: Option<bool>,
213300
) -> PyResult<()> {
214301
let mut update = PolicyUpdate::default();
215302

@@ -245,6 +332,14 @@ pub fn configure_policy_py(
245332
update.json_errors = Some(value);
246333
}
247334

335+
if let Some(value) = io_capture_line_proxies {
336+
update.io_capture_line_proxies = Some(value);
337+
}
338+
339+
if let Some(value) = io_capture_fd_fallback {
340+
update.io_capture_fd_fallback = Some(value);
341+
}
342+
248343
apply_policy_update(update);
249344
Ok(())
250345
}
@@ -278,6 +373,11 @@ pub fn py_policy_snapshot(py: Python<'_>) -> PyResult<PyObject> {
278373
dict.set_item("log_file", py.None())?;
279374
}
280375
dict.set_item("json_errors", snapshot.json_errors)?;
376+
377+
let io_dict = PyDict::new(py);
378+
io_dict.set_item("line_proxies", snapshot.io_capture.line_proxies)?;
379+
io_dict.set_item("fd_fallback", snapshot.io_capture.fd_fallback)?;
380+
dict.set_item("io_capture", io_dict)?;
281381
Ok(dict.into())
282382
}
283383

@@ -301,6 +401,8 @@ mod tests {
301401
assert!(!snap.json_errors);
302402
assert!(snap.log_level.is_none());
303403
assert!(snap.log_file.is_none());
404+
assert!(snap.io_capture.line_proxies);
405+
assert!(!snap.io_capture.fd_fallback);
304406
}
305407

306408
#[test]
@@ -313,6 +415,8 @@ mod tests {
313415
update.log_level = Some("debug".to_string());
314416
update.log_file = Some(PolicyPath::Value(PathBuf::from("/tmp/log.txt")));
315417
update.json_errors = Some(true);
418+
update.io_capture_line_proxies = Some(true);
419+
update.io_capture_fd_fallback = Some(true);
316420

317421
apply_policy_update(update);
318422

@@ -323,6 +427,8 @@ mod tests {
323427
assert_eq!(snap.log_level.as_deref(), Some("debug"));
324428
assert_eq!(snap.log_file.as_deref(), Some(Path::new("/tmp/log.txt")));
325429
assert!(snap.json_errors);
430+
assert!(snap.io_capture.line_proxies);
431+
assert!(snap.io_capture.fd_fallback);
326432
reset_policy();
327433
}
328434

@@ -336,6 +442,7 @@ mod tests {
336442
env::set_var(ENV_LOG_LEVEL, "info");
337443
env::set_var(ENV_LOG_FILE, "/tmp/out.log");
338444
env::set_var(ENV_JSON_ERRORS, "yes");
445+
env::set_var(ENV_CAPTURE_IO, "proxies,fd");
339446

340447
configure_policy_from_env().expect("configure from env");
341448

@@ -348,6 +455,8 @@ mod tests {
348455
assert_eq!(snap.log_level.as_deref(), Some("info"));
349456
assert_eq!(snap.log_file.as_deref(), Some(Path::new("/tmp/out.log")));
350457
assert!(snap.json_errors);
458+
assert!(snap.io_capture.line_proxies);
459+
assert!(snap.io_capture.fd_fallback);
351460
reset_policy();
352461
}
353462

@@ -364,6 +473,19 @@ mod tests {
364473
reset_policy();
365474
}
366475

476+
#[test]
477+
fn configure_policy_from_env_rejects_invalid_capture_io() {
478+
reset_policy();
479+
let env_guard = env_lock();
480+
env::set_var(ENV_CAPTURE_IO, "invalid-token");
481+
482+
let err = configure_policy_from_env().expect_err("invalid capture io should error");
483+
assert_eq!(err.code, ErrorCode::InvalidPolicyValue);
484+
485+
drop(env_guard);
486+
reset_policy();
487+
}
488+
367489
fn env_lock() -> EnvGuard {
368490
EnvGuard
369491
}
@@ -379,6 +501,7 @@ mod tests {
379501
ENV_LOG_LEVEL,
380502
ENV_LOG_FILE,
381503
ENV_JSON_ERRORS,
504+
ENV_CAPTURE_IO,
382505
] {
383506
env::remove_var(key);
384507
}

codetracer-python-recorder/src/runtime/io_capture/events.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::runtime::line_snapshots::FrameId;
22
use pyo3::Python;
3+
use runtime_tracing::{Line, PathId};
34
use std::fmt;
45
use std::thread::ThreadId;
56
use std::time::Instant;
@@ -43,6 +44,9 @@ pub struct ProxyEvent {
4344
pub thread_id: ThreadId,
4445
pub timestamp: Instant,
4546
pub frame_id: Option<FrameId>,
47+
pub path_id: Option<PathId>,
48+
pub line: Option<Line>,
49+
pub path: Option<String>,
4650
}
4751

4852
/// Sink for proxy events. Later stages swap in a real writer-backed implementation.

codetracer-python-recorder/src/runtime/io_capture/install.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use std::sync::Arc;
66
#[cfg_attr(not(test), allow(dead_code))]
77
/// Controller that installs the proxies and restores the original streams.
88
pub struct IoStreamProxies {
9-
_sink: Arc<dyn ProxySink>,
109
_stdout_proxy: Py<LineAwareStdout>,
1110
_stderr_proxy: Py<LineAwareStderr>,
1211
_stdin_proxy: Py<LineAwareStdin>,
@@ -24,19 +23,24 @@ impl IoStreamProxies {
2423
let stderr_original = sys.getattr("stderr")?.unbind();
2524
let stdin_original = sys.getattr("stdin")?.unbind();
2625

27-
let stdout_proxy =
28-
Py::new(py, LineAwareStdout::new(stdout_original.clone_ref(py), sink.clone()))?;
29-
let stderr_proxy =
30-
Py::new(py, LineAwareStderr::new(stderr_original.clone_ref(py), sink.clone()))?;
31-
let stdin_proxy =
32-
Py::new(py, LineAwareStdin::new(stdin_original.clone_ref(py), sink.clone()))?;
26+
let stdout_proxy = Py::new(
27+
py,
28+
LineAwareStdout::new(stdout_original.clone_ref(py), sink.clone()),
29+
)?;
30+
let stderr_proxy = Py::new(
31+
py,
32+
LineAwareStderr::new(stderr_original.clone_ref(py), sink.clone()),
33+
)?;
34+
let stdin_proxy = Py::new(
35+
py,
36+
LineAwareStdin::new(stdin_original.clone_ref(py), sink.clone()),
37+
)?;
3338

3439
sys.setattr("stdout", stdout_proxy.clone_ref(py))?;
3540
sys.setattr("stderr", stderr_proxy.clone_ref(py))?;
3641
sys.setattr("stdin", stdin_proxy.clone_ref(py))?;
3742

3843
Ok(Self {
39-
_sink: sink,
4044
_stdout_proxy: stdout_proxy,
4145
_stderr_proxy: stderr_proxy,
4246
_stdin_proxy: stdin_proxy,
@@ -58,7 +62,6 @@ impl IoStreamProxies {
5862
self.installed = false;
5963
Ok(())
6064
}
61-
6265
}
6366

6467
impl Drop for IoStreamProxies {
Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
pub mod events;
2-
pub mod sink;
3-
pub mod proxies;
42
pub mod install;
3+
pub mod mute;
4+
pub mod proxies;
5+
pub mod sink;
56

67
#[allow(unused_imports)]
78
pub use events::{IoOperation, IoStream, NullSink, ProxyEvent, ProxySink};
89
#[allow(unused_imports)]
9-
pub use sink::{IoChunk, IoChunkConsumer, IoChunkFlags, IoEventSink};
10+
pub use install::IoStreamProxies;
11+
#[allow(unused_imports)]
12+
pub use mute::{is_io_capture_muted, ScopedMuteIoCapture};
1013
#[allow(unused_imports)]
1114
pub use proxies::{LineAwareStderr, LineAwareStdin, LineAwareStdout};
1215
#[allow(unused_imports)]
13-
pub use install::IoStreamProxies;
16+
pub use sink::{IoChunk, IoChunkConsumer, IoChunkFlags, IoEventSink};

0 commit comments

Comments
 (0)