Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 148 additions & 22 deletions codex-rs/tui/src/session_log.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::fs::File;
use std::fs::OpenOptions;
use std::io::BufWriter;
use std::io::Write;
use std::path::PathBuf;
use std::sync::LazyLock;
use std::sync::Mutex;
use std::sync::OnceLock;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

use codex_core::config::Config;
use codex_core::protocol::Op;
Expand All @@ -14,19 +16,28 @@ use serde_json::json;
use crate::app_event::AppEvent;

static LOGGER: LazyLock<SessionLogger> = LazyLock::new(SessionLogger::new);
const FLUSH_INTERVAL: Duration = if cfg!(test) {
Duration::from_millis(10)
} else {
Duration::from_millis(500)
};

struct SessionLogger {
file: OnceLock<Mutex<File>>,
worker: OnceLock<LoggerWorker>,
}

impl SessionLogger {
fn new() -> Self {
Self {
file: OnceLock::new(),
worker: OnceLock::new(),
}
}

fn open(&self, path: PathBuf) -> std::io::Result<()> {
if self.worker.get().is_some() {
return Ok(());
}

let mut opts = OpenOptions::new();
opts.create(true).truncate(true).write(true);

Expand All @@ -37,41 +48,100 @@ impl SessionLogger {
}

let file = opts.open(path)?;
self.file.get_or_init(|| Mutex::new(file));
let (tx, rx) = mpsc::channel::<LogEntry>();
let handle = thread::Builder::new()
.name("tui-session-log".to_string())
.spawn(move || {
let mut writer = BufWriter::with_capacity(64 * 1024, file);
let mut dirty = false;
loop {
match rx.recv_timeout(FLUSH_INTERVAL) {
Ok(entry) => match entry {
LogEntry::Line(serialized) => {
if let Err(e) = writer.write_all(serialized.as_bytes()) {
tracing::warn!("session log write error: {e}");
continue;
}
if let Err(e) = writer.write_all(b"\n") {
tracing::warn!("session log write error: {e}");
continue;
}
dirty = true;
}
LogEntry::Flush(done_tx) => {
if let Err(e) = writer.flush() {
tracing::warn!("session log flush error: {e}");
} else {
dirty = false;
}
let _ = done_tx.send(());
}
},
Err(mpsc::RecvTimeoutError::Timeout) => {
if dirty {
if let Err(e) = writer.flush() {
tracing::warn!("session log flush error: {e}");
} else {
dirty = false;
}
}
}
Err(mpsc::RecvTimeoutError::Disconnected) => break,
}
}
if dirty {
let _ = writer.flush();
}
})?;

let _ = self.worker.set(LoggerWorker {
tx,
_handle: handle,
});
Ok(())
}

fn write_json_line(&self, value: serde_json::Value) {
let Some(mutex) = self.file.get() else {
let Some(worker) = self.worker.get() else {
return;
};
let mut guard = match mutex.lock() {
Ok(g) => g,
Err(poisoned) => poisoned.into_inner(),
};
match serde_json::to_string(&value) {
Ok(serialized) => {
if let Err(e) = guard.write_all(serialized.as_bytes()) {
tracing::warn!("session log write error: {}", e);
return;
}
if let Err(e) = guard.write_all(b"\n") {
tracing::warn!("session log write error: {}", e);
return;
}
if let Err(e) = guard.flush() {
tracing::warn!("session log flush error: {}", e);
if let Err(e) = worker.tx.send(LogEntry::Line(serialized)) {
tracing::warn!("session log write error: {e}");
}
}
Err(e) => tracing::warn!("session log serialize error: {}", e),
Err(e) => tracing::warn!("session log serialize error: {e}"),
}
}

fn is_enabled(&self) -> bool {
self.file.get().is_some()
self.worker.get().is_some()
}

fn flush(&self) {
let Some(worker) = self.worker.get() else {
return;
};
let (done_tx, done_rx) = mpsc::channel();
if let Err(e) = worker.tx.send(LogEntry::Flush(done_tx)) {
tracing::warn!("session log flush error: {e}");
return;
}
let _ = done_rx.recv();
}
}

struct LoggerWorker {
tx: mpsc::Sender<LogEntry>,
_handle: thread::JoinHandle<()>,
}

enum LogEntry {
Line(String),
Flush(mpsc::Sender<()>),
}

fn now_ts() -> String {
// RFC3339 for readability; consumers can parse as needed.
chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
Expand Down Expand Up @@ -194,6 +264,7 @@ pub(crate) fn log_session_end() {
"kind": "session_end",
});
LOGGER.write_json_line(value);
LOGGER.flush();
}

fn write_record<T>(dir: &str, kind: &str, obj: &T)
Expand All @@ -208,3 +279,58 @@ where
});
LOGGER.write_json_line(value);
}

#[cfg(test)]
mod tests {
use pretty_assertions::assert_eq;
use serde_json::json;
use std::thread;
use std::time::Duration;
use tempfile::tempdir;

use super::SessionLogger;

#[test]
fn flush_writes_line_before_returning() {
let dir = tempdir().expect("tempdir");
let path = dir.path().join("session.jsonl");
let logger = SessionLogger::new();
logger.open(path.clone()).expect("open session log");

let record = json!({
"ts": "T",
"dir": "meta",
"kind": "session_end",
});
logger.write_json_line(record.clone());
logger.flush();

let contents = std::fs::read_to_string(path).expect("read log");
let lines: Vec<&str> = contents.lines().collect();
assert_eq!(lines.len(), 1);
let parsed: serde_json::Value = serde_json::from_str(lines[0]).expect("parse json");
assert_eq!(parsed, record);
}

#[test]
fn periodic_flush_persists_lines() {
let dir = tempdir().expect("tempdir");
let path = dir.path().join("session.jsonl");
let logger = SessionLogger::new();
logger.open(path.clone()).expect("open session log");

let record = json!({
"ts": "T2",
"dir": "meta",
"kind": "session_start",
});
logger.write_json_line(record.clone());
thread::sleep(Duration::from_millis(50));

let contents = std::fs::read_to_string(path).expect("read log");
let lines: Vec<&str> = contents.lines().collect();
assert_eq!(lines.len(), 1);
let parsed: serde_json::Value = serde_json::from_str(lines[0]).expect("parse json");
assert_eq!(parsed, record);
}
}
Loading
Loading