Skip to content

Commit 8aca4fc

Browse files
committed
Stage 2
1 parent 2331008 commit 8aca4fc

File tree

10 files changed

+283
-25
lines changed

10 files changed

+283
-25
lines changed

codetracer-python-recorder/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

codetracer-python-recorder/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ default = ["extension-module"]
2323
[dependencies]
2424
pyo3 = { version = "0.25.1" }
2525
runtime_tracing = "0.14.0"
26+
base64 = "0.22"
2627
bitflags = "2.4"
2728
once_cell = "1.19"
2829
dashmap = "5.5"

codetracer-python-recorder/src/runtime/io_capture/mod.rs

Lines changed: 177 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
1-
#![allow(dead_code)]
2-
31
//! Cross-platform IO capture workers for stdout, stderr, and stdin.
42
53
use std::thread::JoinHandle;
64
use std::time::Instant;
75

6+
use base64::engine::general_purpose::STANDARD as BASE64_ENGINE;
7+
use base64::Engine;
88
use crossbeam_channel::{bounded, Receiver, Sender};
9-
109
use recorder_errors::{bug, enverr, ErrorCode, RecorderResult};
10+
use runtime_tracing::{EventLogKind, RecordEvent, TraceLowLevelEvent, TraceWriter};
11+
use serde::Serialize;
1112

1213
use crate::errors::Result;
1314

15+
use super::thread_snapshots::{SnapshotEntry, ThreadSnapshotStore};
16+
use super::trace_writer_host::TraceWriterHost;
1417
use super::IoDrain;
1518

1619
#[cfg(unix)]
@@ -44,7 +47,6 @@ impl StreamKind {
4447
}
4548
}
4649

47-
#[allow(dead_code)]
4850
#[derive(Debug)]
4951
pub struct IoChunk {
5052
pub stream: StreamKind,
@@ -105,12 +107,6 @@ impl IoCapture {
105107
}
106108
}
107109

108-
impl IoDrain for IoCapture {
109-
fn drain(&mut self, _py: pyo3::Python<'_>) -> RecorderResult<()> {
110-
self.shutdown()
111-
}
112-
}
113-
114110
impl Drop for IoCapture {
115111
fn drop(&mut self) {
116112
if self.platform.is_some() {
@@ -121,6 +117,177 @@ impl Drop for IoCapture {
121117
}
122118
}
123119

120+
#[derive(Clone)]
121+
struct IoEventSink {
122+
writer: TraceWriterHost,
123+
snapshots: ThreadSnapshotStore,
124+
start: Instant,
125+
}
126+
127+
impl IoEventSink {
128+
fn new(writer: TraceWriterHost, snapshots: ThreadSnapshotStore) -> Self {
129+
Self {
130+
writer,
131+
snapshots,
132+
start: Instant::now(),
133+
}
134+
}
135+
136+
fn pump(self, receiver: IoChunkReceiver) -> Result<()> {
137+
let mut sink = self;
138+
while let Ok(chunk) = receiver.recv() {
139+
sink.record_chunk(chunk)?;
140+
}
141+
Ok(())
142+
}
143+
144+
fn record_chunk(&mut self, chunk: IoChunk) -> Result<()> {
145+
let event = TraceLowLevelEvent::Event(RecordEvent {
146+
kind: map_stream_to_event_kind(chunk.stream),
147+
metadata: self.serialize_metadata(&chunk)?,
148+
content: BASE64_ENGINE.encode(&chunk.bytes),
149+
});
150+
151+
let mut writer = self.writer.lock()?;
152+
writer.add_event(event);
153+
Ok(())
154+
}
155+
156+
fn serialize_metadata(&self, chunk: &IoChunk) -> Result<String> {
157+
let metadata = IoChunkMetadata {
158+
stream: chunk.stream.as_str(),
159+
timestamp_ns: self.relative_timestamp_ns(chunk.timestamp),
160+
thread_id: format!("{:?}", chunk.producer_thread),
161+
byte_len: chunk.bytes.len(),
162+
snapshot: self.snapshot_metadata(chunk.producer_thread),
163+
};
164+
165+
serde_json::to_string(&metadata).map_err(|err| {
166+
bug!(
167+
ErrorCode::Unknown,
168+
"failed to encode IO chunk metadata as JSON"
169+
)
170+
.with_context("error", err.to_string())
171+
})
172+
}
173+
174+
fn relative_timestamp_ns(&self, timestamp: Instant) -> u128 {
175+
match timestamp.checked_duration_since(self.start) {
176+
Some(duration) => duration.as_nanos(),
177+
None => 0,
178+
}
179+
}
180+
181+
fn snapshot_metadata(&self, thread_id: std::thread::ThreadId) -> Option<IoSnapshotMetadata> {
182+
let snapshot = self
183+
.snapshots
184+
.snapshot_for(thread_id)
185+
.or_else(|| self.snapshots.latest());
186+
snapshot.map(IoSnapshotMetadata::from)
187+
}
188+
}
189+
190+
#[derive(Serialize)]
191+
struct IoChunkMetadata<'a> {
192+
stream: &'a str,
193+
timestamp_ns: u128,
194+
thread_id: String,
195+
byte_len: usize,
196+
#[serde(skip_serializing_if = "Option::is_none")]
197+
snapshot: Option<IoSnapshotMetadata>,
198+
}
199+
200+
#[derive(Serialize)]
201+
struct IoSnapshotMetadata {
202+
path_id: usize,
203+
line: i64,
204+
frame_id: usize,
205+
}
206+
207+
impl From<SnapshotEntry> for IoSnapshotMetadata {
208+
fn from(entry: SnapshotEntry) -> Self {
209+
IoSnapshotMetadata {
210+
path_id: entry.path_id.0,
211+
line: entry.line.0,
212+
frame_id: entry.frame_id,
213+
}
214+
}
215+
}
216+
217+
fn map_stream_to_event_kind(stream: StreamKind) -> EventLogKind {
218+
match stream {
219+
StreamKind::Stdout => EventLogKind::Write,
220+
StreamKind::Stderr => EventLogKind::WriteOther,
221+
StreamKind::Stdin => EventLogKind::Read,
222+
}
223+
}
224+
225+
pub struct ActiveCapture {
226+
capture: IoCapture,
227+
sink: Option<WorkerHandle>,
228+
}
229+
230+
impl ActiveCapture {
231+
pub fn start(writer: TraceWriterHost, snapshots: ThreadSnapshotStore) -> Result<Self> {
232+
let mut capture = IoCapture::start()?;
233+
let receiver = capture.take_receiver().ok_or_else(|| {
234+
bug!(
235+
ErrorCode::Unknown,
236+
"IO capture receiver already taken before sink start"
237+
)
238+
})?;
239+
240+
let sink_worker = spawn_reader_thread("sink", move || {
241+
IoEventSink::new(writer, snapshots).pump(receiver)
242+
})?;
243+
244+
Ok(Self {
245+
capture,
246+
sink: Some(sink_worker),
247+
})
248+
}
249+
250+
fn join_sink(&mut self) -> Result<()> {
251+
if let Some(handle) = self.sink.take() {
252+
match handle.join() {
253+
Ok(result) => result,
254+
Err(panic) => {
255+
let message = if let Some(msg) = panic.downcast_ref::<&'static str>() {
256+
(*msg).to_string()
257+
} else if let Some(msg) = panic.downcast_ref::<String>() {
258+
msg.clone()
259+
} else {
260+
"unknown panic".to_string()
261+
};
262+
Err(bug!(ErrorCode::Unknown, "IO capture sink worker panicked")
263+
.with_context("details", message))
264+
}
265+
}
266+
} else {
267+
Ok(())
268+
}
269+
}
270+
}
271+
272+
impl IoDrain for ActiveCapture {
273+
fn drain(&mut self, _py: pyo3::Python<'_>) -> RecorderResult<()> {
274+
let shutdown_result = self.capture.shutdown();
275+
let sink_result = self.join_sink();
276+
shutdown_result.and(sink_result)
277+
}
278+
}
279+
280+
impl Drop for ActiveCapture {
281+
fn drop(&mut self) {
282+
if let Err(err) = self.capture.shutdown() {
283+
log::error!("failed to shutdown IO capture cleanly: {}", err);
284+
}
285+
if let Err(err) = self.join_sink() {
286+
log::error!("failed to join IO sink worker: {}", err);
287+
}
288+
}
289+
}
290+
124291
pub(super) fn spawn_reader_thread<F>(name: &'static str, worker: F) -> Result<WorkerHandle>
125292
where
126293
F: FnOnce() -> Result<()> + Send + 'static,

codetracer-python-recorder/src/runtime/io_capture/unix.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
#![allow(dead_code)]
2-
31
use std::fs::File;
42
use std::io::{Read, Write};
53
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};

codetracer-python-recorder/src/runtime/io_capture/windows.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
#![allow(dead_code)]
2-
31
use std::io::ErrorKind;
42

53
use crossbeam_channel::Sender;

codetracer-python-recorder/src/runtime/mod.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ pub use output_paths::TraceOutputPaths;
1414

1515
use activation::ActivationController;
1616
use frame_inspector::capture_frame;
17+
use io_capture::ActiveCapture;
1718
use logging::log_event;
1819
use thread_snapshots::{SnapshotEntry, ThreadSnapshotStore};
1920
use trace_writer_host::{register_step_with_guard, TraceWriterHost};
@@ -263,7 +264,7 @@ impl RuntimeTracer {
263264
self.encountered_failure = false;
264265
set_active_trace_id(Some(self.trace_id.clone()));
265266
self.thread_snapshots.reset();
266-
self.io_drain = Box::new(NoopIoDrain::default());
267+
self.configure_io_capture()?;
267268
Ok(())
268269
}
269270

@@ -297,6 +298,13 @@ impl RuntimeTracer {
297298
self.thread_snapshots.clone()
298299
}
299300

301+
fn configure_io_capture(&mut self) -> PyResult<()> {
302+
let capture = ActiveCapture::start(self.writer.clone(), self.thread_snapshots.clone())
303+
.map_err(ffi::map_recorder_error)?;
304+
self.io_drain = Box::new(capture);
305+
Ok(())
306+
}
307+
300308
fn cleanup_partial_outputs(&self) -> RecorderResult<()> {
301309
if let Some(outputs) = &self.output_paths {
302310
for path in [outputs.events(), outputs.metadata(), outputs.paths()] {

codetracer-python-recorder/src/runtime/thread_snapshots.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ use log::warn;
88
use runtime_tracing::{Line, PathId};
99

1010
/// Snapshot of the last recorded step for a thread.
11-
#[allow(dead_code)]
1211
#[derive(Clone, Debug)]
1312
pub struct SnapshotEntry {
1413
pub path_id: PathId,
@@ -46,14 +45,12 @@ impl ThreadSnapshotStore {
4645
}
4746

4847
/// Return the latest snapshot for a given thread id.
49-
#[allow(dead_code)]
5048
pub fn snapshot_for(&self, thread_id: ThreadId) -> Option<SnapshotEntry> {
5149
let store = lock(&self.inner);
5250
store.get(&thread_id).cloned()
5351
}
5452

5553
/// Return the most recent snapshot observed across all threads.
56-
#[allow(dead_code)]
5754
pub fn latest(&self) -> Option<SnapshotEntry> {
5855
let global = lock(&self.last_global);
5956
global.clone()
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
"""Integration tests for runtime IO capture."""
2+
3+
from __future__ import annotations
4+
5+
import base64
6+
import json
7+
import os
8+
import subprocess
9+
import sys
10+
import textwrap
11+
from collections import defaultdict
12+
from pathlib import Path
13+
from typing import DefaultDict, Iterable, Tuple
14+
15+
16+
def _load_io_events(trace_file: Path) -> Iterable[Tuple[int, dict[str, object], bytes]]:
17+
events = json.loads(trace_file.read_text())
18+
for entry in events:
19+
payload = entry.get("Event")
20+
if not payload:
21+
continue
22+
metadata = json.loads(payload["metadata"])
23+
chunk = base64.b64decode(payload["content"])
24+
yield payload["kind"], metadata, chunk
25+
26+
27+
def test_io_capture_records_all_streams(tmp_path: Path) -> None:
28+
script = textwrap.dedent(
29+
"""
30+
import sys
31+
import codetracer_python_recorder as codetracer
32+
33+
sys.stdout.write("hello stdout\\n")
34+
sys.stdout.flush()
35+
sys.stderr.write("warning\\n")
36+
sys.stderr.flush()
37+
data = sys.stdin.readline()
38+
sys.stdout.write(f"input={data}")
39+
sys.stdout.flush()
40+
codetracer.stop()
41+
"""
42+
)
43+
44+
env = os.environ.copy()
45+
env["CODETRACER_TRACE"] = str(tmp_path)
46+
env["CODETRACER_FORMAT"] = "json"
47+
48+
completed = subprocess.run(
49+
[sys.executable, "-c", script],
50+
input="feed\n",
51+
text=True,
52+
capture_output=True,
53+
env=env,
54+
check=True,
55+
)
56+
57+
assert completed.stdout == "hello stdout\ninput=feed\n"
58+
assert completed.stderr == "warning\n"
59+
60+
trace_file = tmp_path / "trace.json"
61+
assert trace_file.exists(), "expected trace artefact"
62+
63+
buffers: DefaultDict[str, bytearray] = defaultdict(bytearray)
64+
65+
for kind, metadata, chunk in _load_io_events(trace_file):
66+
stream = metadata["stream"]
67+
buffers[stream].extend(chunk)
68+
69+
assert metadata["byte_len"] == len(chunk)
70+
assert metadata["timestamp_ns"] >= 0
71+
assert isinstance(metadata["thread_id"], str)
72+
snapshot = metadata.get("snapshot")
73+
if snapshot is not None:
74+
assert {"path_id", "line", "frame_id"}.issubset(snapshot)
75+
76+
if stream == "stdout":
77+
assert kind == 0 # EventLogKind::Write
78+
elif stream == "stderr":
79+
assert kind == 2 # EventLogKind::WriteOther
80+
elif stream == "stdin":
81+
assert kind == 3 # EventLogKind::Read
82+
83+
assert buffers["stdout"].decode() == "hello stdout\ninput=feed\n"
84+
assert buffers["stderr"].decode() == "warning\n"
85+
assert buffers["stdin"].decode() == "feed\n"

0 commit comments

Comments
 (0)