Skip to content

Commit 471d594

Browse files
committed
Milestone 5 - Step 2
codetracer-python-recorder/src/runtime/tracer/io.rs: codetracer-python-recorder/src/runtime/tracer/runtime_tracer.rs: design-docs/codetracer-architecture-refactor-implementation-plan.status.md: Signed-off-by: Tzanko Matev <[email protected]>
1 parent 49d1eb9 commit 471d594

File tree

3 files changed

+236
-152
lines changed

3 files changed

+236
-152
lines changed
Lines changed: 214 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,216 @@
11
//! IO capture coordination for `RuntimeTracer`.
22
3-
// Placeholder module; implementations will arrive during Milestone 5.
3+
use crate::runtime::io_capture::{
4+
IoCapturePipeline, IoCaptureSettings, IoChunk, IoChunkFlags, IoStream, ScopedMuteIoCapture,
5+
};
6+
use crate::runtime::line_snapshots::{FrameId, LineSnapshotStore};
7+
use pyo3::prelude::*;
8+
use runtime_tracing::{
9+
EventLogKind, Line, NonStreamingTraceWriter, PathId, RecordEvent, TraceLowLevelEvent,
10+
TraceWriter,
11+
};
12+
use serde::Serialize;
13+
use std::path::Path;
14+
use std::sync::Arc;
15+
use std::thread::ThreadId;
16+
17+
/// Coordinates installation, flushing, and teardown of the IO capture pipeline.
18+
pub struct IoCoordinator {
19+
snapshots: Arc<LineSnapshotStore>,
20+
pipeline: Option<IoCapturePipeline>,
21+
}
22+
23+
impl IoCoordinator {
24+
/// Create a coordinator with a fresh snapshot store and no active pipeline.
25+
pub fn new() -> Self {
26+
Self {
27+
snapshots: Arc::new(LineSnapshotStore::new()),
28+
pipeline: None,
29+
}
30+
}
31+
32+
/// Expose the shared snapshot store for collaborators (tests, IO capture).
33+
pub fn snapshot_store(&self) -> Arc<LineSnapshotStore> {
34+
Arc::clone(&self.snapshots)
35+
}
36+
37+
/// Install the IO capture pipeline using the provided settings.
38+
pub fn install(&mut self, py: Python<'_>, settings: IoCaptureSettings) -> PyResult<()> {
39+
self.pipeline = IoCapturePipeline::install(py, Arc::clone(&self.snapshots), settings)?;
40+
Ok(())
41+
}
42+
43+
/// Flush buffered output for the active thread before emitting a step event.
44+
pub fn flush_before_step(
45+
&self,
46+
thread_id: ThreadId,
47+
writer: &mut NonStreamingTraceWriter,
48+
) -> bool {
49+
let Some(pipeline) = self.pipeline.as_ref() else {
50+
return false;
51+
};
52+
53+
pipeline.flush_before_step(thread_id);
54+
self.drain_chunks(pipeline, writer)
55+
}
56+
57+
/// Flush every buffered chunk regardless of thread affinity.
58+
pub fn flush_all(&self, writer: &mut NonStreamingTraceWriter) -> bool {
59+
let Some(pipeline) = self.pipeline.as_ref() else {
60+
return false;
61+
};
62+
63+
pipeline.flush_all();
64+
self.drain_chunks(pipeline, writer)
65+
}
66+
67+
/// Drain remaining chunks and uninstall the capture pipeline.
68+
pub fn teardown(&mut self, py: Python<'_>, writer: &mut NonStreamingTraceWriter) -> bool {
69+
let Some(mut pipeline) = self.pipeline.take() else {
70+
return false;
71+
};
72+
73+
pipeline.flush_all();
74+
let mut recorded = false;
75+
76+
for chunk in pipeline.drain_chunks() {
77+
recorded |= self.record_chunk(writer, chunk);
78+
}
79+
80+
pipeline.uninstall(py);
81+
82+
for chunk in pipeline.drain_chunks() {
83+
recorded |= self.record_chunk(writer, chunk);
84+
}
85+
86+
recorded
87+
}
88+
89+
/// Clear the snapshot cache once tracing concludes.
90+
pub fn clear_snapshots(&self) {
91+
self.snapshots.clear();
92+
}
93+
94+
/// Record the latest frame snapshot for the active thread.
95+
pub fn record_snapshot(
96+
&self,
97+
thread_id: ThreadId,
98+
path_id: PathId,
99+
line: Line,
100+
frame_id: FrameId,
101+
) {
102+
self.snapshots.record(thread_id, path_id, line, frame_id);
103+
}
104+
105+
fn drain_chunks(
106+
&self,
107+
pipeline: &IoCapturePipeline,
108+
writer: &mut NonStreamingTraceWriter,
109+
) -> bool {
110+
let mut recorded = false;
111+
for chunk in pipeline.drain_chunks() {
112+
recorded |= self.record_chunk(writer, chunk);
113+
}
114+
recorded
115+
}
116+
117+
fn record_chunk(&self, writer: &mut NonStreamingTraceWriter, mut chunk: IoChunk) -> bool {
118+
if chunk.path_id.is_none() {
119+
if let Some(path) = chunk.path.as_deref() {
120+
let path_id = TraceWriter::ensure_path_id(writer, Path::new(path));
121+
chunk.path_id = Some(path_id);
122+
}
123+
}
124+
125+
let kind = match chunk.stream {
126+
IoStream::Stdout => EventLogKind::Write,
127+
IoStream::Stderr => EventLogKind::WriteOther,
128+
IoStream::Stdin => EventLogKind::Read,
129+
};
130+
131+
let metadata = self.build_metadata(&chunk);
132+
let content = String::from_utf8_lossy(&chunk.payload).into_owned();
133+
134+
TraceWriter::add_event(
135+
writer,
136+
TraceLowLevelEvent::Event(RecordEvent {
137+
kind,
138+
metadata,
139+
content,
140+
}),
141+
);
142+
143+
true
144+
}
145+
146+
fn build_metadata(&self, chunk: &IoChunk) -> String {
147+
#[derive(Serialize)]
148+
struct IoEventMetadata<'a> {
149+
stream: &'a str,
150+
thread: String,
151+
path_id: Option<usize>,
152+
line: Option<i64>,
153+
frame_id: Option<u64>,
154+
flags: Vec<&'a str>,
155+
}
156+
157+
let snapshot = self.snapshots.snapshot_for_thread(chunk.thread_id);
158+
let path_id = chunk
159+
.path_id
160+
.map(|id| id.0)
161+
.or_else(|| snapshot.as_ref().map(|snap| snap.path_id().0));
162+
let line = chunk
163+
.line
164+
.map(|line| line.0)
165+
.or_else(|| snapshot.as_ref().map(|snap| snap.line().0));
166+
let frame_id = chunk
167+
.frame_id
168+
.or_else(|| snapshot.as_ref().map(|snap| snap.frame_id()));
169+
170+
let metadata = IoEventMetadata {
171+
stream: match chunk.stream {
172+
IoStream::Stdout => "stdout",
173+
IoStream::Stderr => "stderr",
174+
IoStream::Stdin => "stdin",
175+
},
176+
thread: format!("{:?}", chunk.thread_id),
177+
path_id,
178+
line,
179+
frame_id: frame_id.map(|id| id.as_raw()),
180+
flags: flag_labels(chunk.flags),
181+
};
182+
183+
match serde_json::to_string(&metadata) {
184+
Ok(json) => json,
185+
Err(err) => {
186+
let _mute = ScopedMuteIoCapture::new();
187+
log::error!("failed to serialise IO metadata: {err}");
188+
"{}".to_string()
189+
}
190+
}
191+
}
192+
}
193+
194+
/// Translate chunk flags into telemetry labels.
195+
pub fn flag_labels(flags: IoChunkFlags) -> Vec<&'static str> {
196+
let mut labels = Vec::new();
197+
if flags.contains(IoChunkFlags::NEWLINE_TERMINATED) {
198+
labels.push("newline");
199+
}
200+
if flags.contains(IoChunkFlags::EXPLICIT_FLUSH) {
201+
labels.push("flush");
202+
}
203+
if flags.contains(IoChunkFlags::STEP_BOUNDARY) {
204+
labels.push("step_boundary");
205+
}
206+
if flags.contains(IoChunkFlags::TIME_SPLIT) {
207+
labels.push("time_split");
208+
}
209+
if flags.contains(IoChunkFlags::INPUT_CHUNK) {
210+
labels.push("input");
211+
}
212+
if flags.contains(IoChunkFlags::FD_MIRROR) {
213+
labels.push("mirror");
214+
}
215+
labels
216+
}

0 commit comments

Comments
 (0)