diff --git a/codex-rs/tui/src/session_log.rs b/codex-rs/tui/src/session_log.rs index b2858e8f210..8d3f11e4334 100644 --- a/codex-rs/tui/src/session_log.rs +++ b/codex-rs/tui/src/session_log.rs @@ -1,10 +1,12 @@ -use std::fs::File; use std::fs::OpenOptions; +use std::io::BufWriter; use std::io::Write; use std::path::PathBuf; use std::sync::LazyLock; -use std::sync::Mutex; use std::sync::OnceLock; +use std::sync::mpsc; +use std::thread; +use std::time::Duration; use codex_core::config::Config; use codex_core::protocol::Op; @@ -14,19 +16,28 @@ use serde_json::json; use crate::app_event::AppEvent; static LOGGER: LazyLock = LazyLock::new(SessionLogger::new); +const FLUSH_INTERVAL: Duration = if cfg!(test) { + Duration::from_millis(10) +} else { + Duration::from_millis(500) +}; struct SessionLogger { - file: OnceLock>, + worker: OnceLock, } impl SessionLogger { fn new() -> Self { Self { - file: OnceLock::new(), + worker: OnceLock::new(), } } fn open(&self, path: PathBuf) -> std::io::Result<()> { + if self.worker.get().is_some() { + return Ok(()); + } + let mut opts = OpenOptions::new(); opts.create(true).truncate(true).write(true); @@ -37,41 +48,100 @@ impl SessionLogger { } let file = opts.open(path)?; - self.file.get_or_init(|| Mutex::new(file)); + let (tx, rx) = mpsc::channel::(); + let handle = thread::Builder::new() + .name("tui-session-log".to_string()) + .spawn(move || { + let mut writer = BufWriter::with_capacity(64 * 1024, file); + let mut dirty = false; + loop { + match rx.recv_timeout(FLUSH_INTERVAL) { + Ok(entry) => match entry { + LogEntry::Line(serialized) => { + if let Err(e) = writer.write_all(serialized.as_bytes()) { + tracing::warn!("session log write error: {e}"); + continue; + } + if let Err(e) = writer.write_all(b"\n") { + tracing::warn!("session log write error: {e}"); + continue; + } + dirty = true; + } + LogEntry::Flush(done_tx) => { + if let Err(e) = writer.flush() { + tracing::warn!("session log flush error: {e}"); + } else { + dirty = false; + } + let _ = done_tx.send(()); + } + }, + Err(mpsc::RecvTimeoutError::Timeout) => { + if dirty { + if let Err(e) = writer.flush() { + tracing::warn!("session log flush error: {e}"); + } else { + dirty = false; + } + } + } + Err(mpsc::RecvTimeoutError::Disconnected) => break, + } + } + if dirty { + let _ = writer.flush(); + } + })?; + + let _ = self.worker.set(LoggerWorker { + tx, + _handle: handle, + }); Ok(()) } fn write_json_line(&self, value: serde_json::Value) { - let Some(mutex) = self.file.get() else { + let Some(worker) = self.worker.get() else { return; }; - let mut guard = match mutex.lock() { - Ok(g) => g, - Err(poisoned) => poisoned.into_inner(), - }; match serde_json::to_string(&value) { Ok(serialized) => { - if let Err(e) = guard.write_all(serialized.as_bytes()) { - tracing::warn!("session log write error: {}", e); - return; - } - if let Err(e) = guard.write_all(b"\n") { - tracing::warn!("session log write error: {}", e); - return; - } - if let Err(e) = guard.flush() { - tracing::warn!("session log flush error: {}", e); + if let Err(e) = worker.tx.send(LogEntry::Line(serialized)) { + tracing::warn!("session log write error: {e}"); } } - Err(e) => tracing::warn!("session log serialize error: {}", e), + Err(e) => tracing::warn!("session log serialize error: {e}"), } } fn is_enabled(&self) -> bool { - self.file.get().is_some() + self.worker.get().is_some() + } + + fn flush(&self) { + let Some(worker) = self.worker.get() else { + return; + }; + let (done_tx, done_rx) = mpsc::channel(); + if let Err(e) = worker.tx.send(LogEntry::Flush(done_tx)) { + tracing::warn!("session log flush error: {e}"); + return; + } + let _ = done_rx.recv(); } } +struct LoggerWorker { + tx: mpsc::Sender, + _handle: thread::JoinHandle<()>, +} + +enum LogEntry { + Line(String), + Flush(mpsc::Sender<()>), +} + fn now_ts() -> String { // RFC3339 for readability; consumers can parse as needed. chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true) @@ -194,6 +264,7 @@ pub(crate) fn log_session_end() { "kind": "session_end", }); LOGGER.write_json_line(value); + LOGGER.flush(); } fn write_record(dir: &str, kind: &str, obj: &T) @@ -208,3 +279,58 @@ where }); LOGGER.write_json_line(value); } + +#[cfg(test)] +mod tests { + use pretty_assertions::assert_eq; + use serde_json::json; + use std::thread; + use std::time::Duration; + use tempfile::tempdir; + + use super::SessionLogger; + + #[test] + fn flush_writes_line_before_returning() { + let dir = tempdir().expect("tempdir"); + let path = dir.path().join("session.jsonl"); + let logger = SessionLogger::new(); + logger.open(path.clone()).expect("open session log"); + + let record = json!({ + "ts": "T", + "dir": "meta", + "kind": "session_end", + }); + logger.write_json_line(record.clone()); + logger.flush(); + + let contents = std::fs::read_to_string(path).expect("read log"); + let lines: Vec<&str> = contents.lines().collect(); + assert_eq!(lines.len(), 1); + let parsed: serde_json::Value = serde_json::from_str(lines[0]).expect("parse json"); + assert_eq!(parsed, record); + } + + #[test] + fn periodic_flush_persists_lines() { + let dir = tempdir().expect("tempdir"); + let path = dir.path().join("session.jsonl"); + let logger = SessionLogger::new(); + logger.open(path.clone()).expect("open session log"); + + let record = json!({ + "ts": "T2", + "dir": "meta", + "kind": "session_start", + }); + logger.write_json_line(record.clone()); + thread::sleep(Duration::from_millis(50)); + + let contents = std::fs::read_to_string(path).expect("read log"); + let lines: Vec<&str> = contents.lines().collect(); + assert_eq!(lines.len(), 1); + let parsed: serde_json::Value = serde_json::from_str(lines[0]).expect("parse json"); + assert_eq!(parsed, record); + } +} diff --git a/codex-rs/tui2/src/session_log.rs b/codex-rs/tui2/src/session_log.rs index b2858e8f210..8d3f11e4334 100644 --- a/codex-rs/tui2/src/session_log.rs +++ b/codex-rs/tui2/src/session_log.rs @@ -1,10 +1,12 @@ -use std::fs::File; use std::fs::OpenOptions; +use std::io::BufWriter; use std::io::Write; use std::path::PathBuf; use std::sync::LazyLock; -use std::sync::Mutex; use std::sync::OnceLock; +use std::sync::mpsc; +use std::thread; +use std::time::Duration; use codex_core::config::Config; use codex_core::protocol::Op; @@ -14,19 +16,28 @@ use serde_json::json; use crate::app_event::AppEvent; static LOGGER: LazyLock = LazyLock::new(SessionLogger::new); +const FLUSH_INTERVAL: Duration = if cfg!(test) { + Duration::from_millis(10) +} else { + Duration::from_millis(500) +}; struct SessionLogger { - file: OnceLock>, + worker: OnceLock, } impl SessionLogger { fn new() -> Self { Self { - file: OnceLock::new(), + worker: OnceLock::new(), } } fn open(&self, path: PathBuf) -> std::io::Result<()> { + if self.worker.get().is_some() { + return Ok(()); + } + let mut opts = OpenOptions::new(); opts.create(true).truncate(true).write(true); @@ -37,41 +48,100 @@ impl SessionLogger { } let file = opts.open(path)?; - self.file.get_or_init(|| Mutex::new(file)); + let (tx, rx) = mpsc::channel::(); + let handle = thread::Builder::new() + .name("tui-session-log".to_string()) + .spawn(move || { + let mut writer = BufWriter::with_capacity(64 * 1024, file); + let mut dirty = false; + loop { + match rx.recv_timeout(FLUSH_INTERVAL) { + Ok(entry) => match entry { + LogEntry::Line(serialized) => { + if let Err(e) = writer.write_all(serialized.as_bytes()) { + tracing::warn!("session log write error: {e}"); + continue; + } + if let Err(e) = writer.write_all(b"\n") { + tracing::warn!("session log write error: {e}"); + continue; + } + dirty = true; + } + LogEntry::Flush(done_tx) => { + if let Err(e) = writer.flush() { + tracing::warn!("session log flush error: {e}"); + } else { + dirty = false; + } + let _ = done_tx.send(()); + } + }, + Err(mpsc::RecvTimeoutError::Timeout) => { + if dirty { + if let Err(e) = writer.flush() { + tracing::warn!("session log flush error: {e}"); + } else { + dirty = false; + } + } + } + Err(mpsc::RecvTimeoutError::Disconnected) => break, + } + } + if dirty { + let _ = writer.flush(); + } + })?; + + let _ = self.worker.set(LoggerWorker { + tx, + _handle: handle, + }); Ok(()) } fn write_json_line(&self, value: serde_json::Value) { - let Some(mutex) = self.file.get() else { + let Some(worker) = self.worker.get() else { return; }; - let mut guard = match mutex.lock() { - Ok(g) => g, - Err(poisoned) => poisoned.into_inner(), - }; match serde_json::to_string(&value) { Ok(serialized) => { - if let Err(e) = guard.write_all(serialized.as_bytes()) { - tracing::warn!("session log write error: {}", e); - return; - } - if let Err(e) = guard.write_all(b"\n") { - tracing::warn!("session log write error: {}", e); - return; - } - if let Err(e) = guard.flush() { - tracing::warn!("session log flush error: {}", e); + if let Err(e) = worker.tx.send(LogEntry::Line(serialized)) { + tracing::warn!("session log write error: {e}"); } } - Err(e) => tracing::warn!("session log serialize error: {}", e), + Err(e) => tracing::warn!("session log serialize error: {e}"), } } fn is_enabled(&self) -> bool { - self.file.get().is_some() + self.worker.get().is_some() + } + + fn flush(&self) { + let Some(worker) = self.worker.get() else { + return; + }; + let (done_tx, done_rx) = mpsc::channel(); + if let Err(e) = worker.tx.send(LogEntry::Flush(done_tx)) { + tracing::warn!("session log flush error: {e}"); + return; + } + let _ = done_rx.recv(); } } +struct LoggerWorker { + tx: mpsc::Sender, + _handle: thread::JoinHandle<()>, +} + +enum LogEntry { + Line(String), + Flush(mpsc::Sender<()>), +} + fn now_ts() -> String { // RFC3339 for readability; consumers can parse as needed. chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true) @@ -194,6 +264,7 @@ pub(crate) fn log_session_end() { "kind": "session_end", }); LOGGER.write_json_line(value); + LOGGER.flush(); } fn write_record(dir: &str, kind: &str, obj: &T) @@ -208,3 +279,58 @@ where }); LOGGER.write_json_line(value); } + +#[cfg(test)] +mod tests { + use pretty_assertions::assert_eq; + use serde_json::json; + use std::thread; + use std::time::Duration; + use tempfile::tempdir; + + use super::SessionLogger; + + #[test] + fn flush_writes_line_before_returning() { + let dir = tempdir().expect("tempdir"); + let path = dir.path().join("session.jsonl"); + let logger = SessionLogger::new(); + logger.open(path.clone()).expect("open session log"); + + let record = json!({ + "ts": "T", + "dir": "meta", + "kind": "session_end", + }); + logger.write_json_line(record.clone()); + logger.flush(); + + let contents = std::fs::read_to_string(path).expect("read log"); + let lines: Vec<&str> = contents.lines().collect(); + assert_eq!(lines.len(), 1); + let parsed: serde_json::Value = serde_json::from_str(lines[0]).expect("parse json"); + assert_eq!(parsed, record); + } + + #[test] + fn periodic_flush_persists_lines() { + let dir = tempdir().expect("tempdir"); + let path = dir.path().join("session.jsonl"); + let logger = SessionLogger::new(); + logger.open(path.clone()).expect("open session log"); + + let record = json!({ + "ts": "T2", + "dir": "meta", + "kind": "session_start", + }); + logger.write_json_line(record.clone()); + thread::sleep(Duration::from_millis(50)); + + let contents = std::fs::read_to_string(path).expect("read log"); + let lines: Vec<&str> = contents.lines().collect(); + assert_eq!(lines.len(), 1); + let parsed: serde_json::Value = serde_json::from_str(lines[0]).expect("parse json"); + assert_eq!(parsed, record); + } +}