Skip to content

Commit d380cdd

Browse files
committed
Refactor io_lines.rs -> mod io_capture
codetracer-python-recorder/src/runtime/io_capture/events.rs: codetracer-python-recorder/src/runtime/io_capture/install.rs: codetracer-python-recorder/src/runtime/io_capture/mod.rs: codetracer-python-recorder/src/runtime/io_capture/proxies.rs: codetracer-python-recorder/src/runtime/io_capture/sink.rs: codetracer-python-recorder/src/runtime/io_lines.rs: codetracer-python-recorder/src/runtime/mod.rs: design-docs/adr/0008-line-aware-io-capture.md: design-docs/io-capture-line-proxy-implementation-plan.md: design-docs/io-capture-line-proxy-implementation-plan.status.md: Signed-off-by: Tzanko Matev <[email protected]>
1 parent 8da1046 commit d380cdd

File tree

10 files changed

+1196
-1171
lines changed

10 files changed

+1196
-1171
lines changed
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
use crate::runtime::line_snapshots::FrameId;
2+
use pyo3::Python;
3+
use std::fmt;
4+
use std::thread::ThreadId;
5+
use std::time::Instant;
6+
7+
/// Distinguishes the proxied streams.
8+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
9+
pub enum IoStream {
10+
Stdout,
11+
Stderr,
12+
Stdin,
13+
}
14+
15+
impl fmt::Display for IoStream {
16+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
17+
match self {
18+
IoStream::Stdout => write!(f, "stdout"),
19+
IoStream::Stderr => write!(f, "stderr"),
20+
IoStream::Stdin => write!(f, "stdin"),
21+
}
22+
}
23+
}
24+
25+
/// Operations surfaced by the proxies.
26+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
27+
pub enum IoOperation {
28+
Write,
29+
Writelines,
30+
Flush,
31+
Read,
32+
ReadLine,
33+
ReadLines,
34+
ReadInto,
35+
}
36+
37+
/// Raw proxy payload collected during Stage 1.
38+
#[derive(Clone, Debug)]
39+
pub struct ProxyEvent {
40+
pub stream: IoStream,
41+
pub operation: IoOperation,
42+
pub payload: Vec<u8>,
43+
pub thread_id: ThreadId,
44+
pub timestamp: Instant,
45+
pub frame_id: Option<FrameId>,
46+
}
47+
48+
/// Sink for proxy events. Later stages swap in a real writer-backed implementation.
49+
pub trait ProxySink: Send + Sync + 'static {
50+
fn record(&self, py: Python<'_>, event: ProxyEvent);
51+
}
52+
53+
/// No-op sink for scenarios where IO capture is disabled but proxies must install.
54+
pub struct NullSink;
55+
56+
impl ProxySink for NullSink {
57+
fn record(&self, _py: Python<'_>, _event: ProxyEvent) {}
58+
}
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
use crate::runtime::io_capture::events::ProxySink;
2+
use crate::runtime::io_capture::proxies::{LineAwareStderr, LineAwareStdin, LineAwareStdout};
3+
use pyo3::prelude::*;
4+
use std::sync::Arc;
5+
6+
#[cfg_attr(not(test), allow(dead_code))]
7+
/// Controller that installs the proxies and restores the original streams.
8+
pub struct IoStreamProxies {
9+
_sink: Arc<dyn ProxySink>,
10+
_stdout_proxy: Py<LineAwareStdout>,
11+
_stderr_proxy: Py<LineAwareStderr>,
12+
_stdin_proxy: Py<LineAwareStdin>,
13+
original_stdout: PyObject,
14+
original_stderr: PyObject,
15+
original_stdin: PyObject,
16+
installed: bool,
17+
}
18+
19+
#[cfg_attr(not(test), allow(dead_code))]
20+
impl IoStreamProxies {
21+
pub fn install(py: Python<'_>, sink: Arc<dyn ProxySink>) -> PyResult<Self> {
22+
let sys = py.import("sys")?;
23+
let stdout_original = sys.getattr("stdout")?.unbind();
24+
let stderr_original = sys.getattr("stderr")?.unbind();
25+
let stdin_original = sys.getattr("stdin")?.unbind();
26+
27+
let stdout_proxy =
28+
Py::new(py, LineAwareStdout::new(stdout_original.clone_ref(py), sink.clone()))?;
29+
let stderr_proxy =
30+
Py::new(py, LineAwareStderr::new(stderr_original.clone_ref(py), sink.clone()))?;
31+
let stdin_proxy =
32+
Py::new(py, LineAwareStdin::new(stdin_original.clone_ref(py), sink.clone()))?;
33+
34+
sys.setattr("stdout", stdout_proxy.clone_ref(py))?;
35+
sys.setattr("stderr", stderr_proxy.clone_ref(py))?;
36+
sys.setattr("stdin", stdin_proxy.clone_ref(py))?;
37+
38+
Ok(Self {
39+
_sink: sink,
40+
_stdout_proxy: stdout_proxy,
41+
_stderr_proxy: stderr_proxy,
42+
_stdin_proxy: stdin_proxy,
43+
original_stdout: stdout_original,
44+
original_stderr: stderr_original,
45+
original_stdin: stdin_original,
46+
installed: true,
47+
})
48+
}
49+
50+
pub fn uninstall(&mut self, py: Python<'_>) -> PyResult<()> {
51+
if !self.installed {
52+
return Ok(());
53+
}
54+
let sys = py.import("sys")?;
55+
sys.setattr("stdout", &self.original_stdout)?;
56+
sys.setattr("stderr", &self.original_stderr)?;
57+
sys.setattr("stdin", &self.original_stdin)?;
58+
self.installed = false;
59+
Ok(())
60+
}
61+
62+
}
63+
64+
impl Drop for IoStreamProxies {
65+
fn drop(&mut self) {
66+
Python::with_gil(|py| {
67+
if let Err(err) = self.uninstall(py) {
68+
err.print(py);
69+
}
70+
});
71+
}
72+
}
73+
74+
#[cfg(test)]
75+
mod tests {
76+
use super::*;
77+
use crate::runtime::io_capture::events::{IoOperation, IoStream, ProxyEvent};
78+
use pyo3::Python;
79+
use std::ffi::CString;
80+
use std::sync::Mutex;
81+
82+
#[derive(Default)]
83+
struct RecordingSink {
84+
events: Mutex<Vec<ProxyEvent>>,
85+
}
86+
87+
impl RecordingSink {
88+
fn new() -> Self {
89+
Self {
90+
events: Mutex::new(Vec::new()),
91+
}
92+
}
93+
94+
fn events(&self) -> Vec<ProxyEvent> {
95+
self.events.lock().expect("lock poisoned").clone()
96+
}
97+
}
98+
99+
impl ProxySink for RecordingSink {
100+
fn record(&self, _py: Python<'_>, event: ProxyEvent) {
101+
self.events.lock().expect("lock poisoned").push(event);
102+
}
103+
}
104+
105+
fn with_string_io<F, R>(py: Python<'_>, sink: Arc<dyn ProxySink>, func: F) -> PyResult<R>
106+
where
107+
F: FnOnce(&mut IoStreamProxies) -> PyResult<R>,
108+
{
109+
let sys = py.import("sys")?;
110+
let io = py.import("io")?;
111+
let stdout_buf = io.call_method0("StringIO")?;
112+
let stderr_buf = io.call_method0("StringIO")?;
113+
let stdin_buf = io.call_method1("StringIO", ("line1\nline2\n",))?;
114+
sys.setattr("stdout", stdout_buf)?;
115+
sys.setattr("stderr", stderr_buf)?;
116+
sys.setattr("stdin", stdin_buf)?;
117+
118+
let mut proxies = IoStreamProxies::install(py, sink)?;
119+
let result = func(&mut proxies)?;
120+
proxies.uninstall(py)?;
121+
Ok(result)
122+
}
123+
124+
#[test]
125+
fn stdout_write_is_captured() {
126+
Python::with_gil(|py| {
127+
let sink = Arc::new(RecordingSink::new());
128+
with_string_io(py, sink.clone(), |_| {
129+
let code = CString::new("print('hello', end='')").unwrap();
130+
py.run(code.as_c_str(), None, None)?;
131+
Ok(())
132+
})
133+
.unwrap();
134+
let events = sink.events();
135+
assert!(!events.is_empty());
136+
assert_eq!(events[0].stream, IoStream::Stdout);
137+
assert_eq!(events[0].operation, IoOperation::Write);
138+
assert_eq!(std::str::from_utf8(&events[0].payload).unwrap(), "hello");
139+
});
140+
}
141+
142+
#[test]
143+
fn stderr_write_is_captured() {
144+
Python::with_gil(|py| {
145+
let sink = Arc::new(RecordingSink::new());
146+
with_string_io(py, sink.clone(), |_| {
147+
let code = CString::new("import sys\nsys.stderr.write('oops')").unwrap();
148+
py.run(code.as_c_str(), None, None)?;
149+
Ok(())
150+
})
151+
.unwrap();
152+
let events = sink.events();
153+
assert!(!events.is_empty());
154+
assert_eq!(events[0].stream, IoStream::Stderr);
155+
assert_eq!(events[0].operation, IoOperation::Write);
156+
assert_eq!(std::str::from_utf8(&events[0].payload).unwrap(), "oops");
157+
});
158+
}
159+
160+
#[test]
161+
fn stdin_read_is_captured() {
162+
Python::with_gil(|py| {
163+
let sink = Arc::new(RecordingSink::new());
164+
with_string_io(py, sink.clone(), |_| {
165+
let code = CString::new("import sys\n_ = sys.stdin.readline()").unwrap();
166+
py.run(code.as_c_str(), None, None)?;
167+
Ok(())
168+
})
169+
.unwrap();
170+
let events = sink.events();
171+
assert!(!events.is_empty());
172+
let latest = events.last().unwrap();
173+
assert_eq!(latest.stream, IoStream::Stdin);
174+
assert_eq!(latest.operation, IoOperation::ReadLine);
175+
assert_eq!(std::str::from_utf8(&latest.payload).unwrap(), "line1\n");
176+
});
177+
}
178+
179+
#[test]
180+
fn reentrant_sink_does_not_loop() {
181+
#[derive(Default)]
182+
struct Reentrant {
183+
inner: RecordingSink,
184+
}
185+
186+
impl ProxySink for Reentrant {
187+
fn record(&self, py: Python<'_>, event: ProxyEvent) {
188+
self.inner.record(py, event.clone());
189+
let _ = py
190+
.import("sys")
191+
.and_then(|sys| sys.getattr("stdout"))
192+
.and_then(|stdout| stdout.call_method1("write", ("[sink]",)));
193+
}
194+
}
195+
196+
Python::with_gil(|py| {
197+
let sink = Arc::new(Reentrant::default());
198+
with_string_io(py, sink.clone(), |_| {
199+
let code = CString::new("print('loop')").unwrap();
200+
py.run(code.as_c_str(), None, None)?;
201+
Ok(())
202+
})
203+
.unwrap();
204+
let events = sink.inner.events();
205+
let payloads: Vec<&[u8]> = events
206+
.iter()
207+
.map(|event| event.payload.as_slice())
208+
.filter(|payload| !payload.is_empty() && *payload != b"\n")
209+
.collect();
210+
assert_eq!(payloads.len(), 1);
211+
assert_eq!(payloads[0], b"loop");
212+
});
213+
}
214+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
pub mod events;
2+
pub mod sink;
3+
pub mod proxies;
4+
pub mod install;
5+
6+
#[allow(unused_imports)]
7+
pub use events::{IoOperation, IoStream, NullSink, ProxyEvent, ProxySink};
8+
#[allow(unused_imports)]
9+
pub use sink::{IoChunk, IoChunkConsumer, IoChunkFlags, IoEventSink};
10+
#[allow(unused_imports)]
11+
pub use proxies::{LineAwareStderr, LineAwareStdin, LineAwareStdout};
12+
#[allow(unused_imports)]
13+
pub use install::IoStreamProxies;

0 commit comments

Comments
 (0)