Skip to content

Commit 72eb131

Browse files
committed
tui: move session logging off the main thread
1 parent 7e5b3e0 commit 72eb131

File tree

2 files changed

+130
-44
lines changed

2 files changed

+130
-44
lines changed

codex-rs/tui/src/session_log.rs

Lines changed: 65 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1-
use std::fs::File;
21
use std::fs::OpenOptions;
2+
use std::io::BufWriter;
33
use std::io::Write;
44
use std::path::PathBuf;
55
use std::sync::LazyLock;
6-
use std::sync::Mutex;
76
use std::sync::OnceLock;
7+
use std::sync::mpsc;
8+
use std::thread;
89

910
use codex_core::config::Config;
1011
use codex_core::protocol::Op;
@@ -16,17 +17,21 @@ use crate::app_event::AppEvent;
1617
static LOGGER: LazyLock<SessionLogger> = LazyLock::new(SessionLogger::new);
1718

1819
struct SessionLogger {
19-
file: OnceLock<Mutex<File>>,
20+
worker: OnceLock<LoggerWorker>,
2021
}
2122

2223
impl SessionLogger {
2324
fn new() -> Self {
2425
Self {
25-
file: OnceLock::new(),
26+
worker: OnceLock::new(),
2627
}
2728
}
2829

2930
fn open(&self, path: PathBuf) -> std::io::Result<()> {
31+
if self.worker.get().is_some() {
32+
return Ok(());
33+
}
34+
3035
let mut opts = OpenOptions::new();
3136
opts.create(true).truncate(true).write(true);
3237

@@ -37,41 +42,78 @@ impl SessionLogger {
3742
}
3843

3944
let file = opts.open(path)?;
40-
self.file.get_or_init(|| Mutex::new(file));
45+
let (tx, rx) = mpsc::channel::<LogEntry>();
46+
let handle = thread::Builder::new()
47+
.name("tui-session-log".to_string())
48+
.spawn(move || {
49+
let mut writer = BufWriter::with_capacity(64 * 1024, file);
50+
for entry in rx {
51+
match entry {
52+
LogEntry::Line(serialized) => {
53+
if let Err(e) = writer.write_all(serialized.as_bytes()) {
54+
tracing::warn!("session log write error: {e}");
55+
continue;
56+
}
57+
if let Err(e) = writer.write_all(b"\n") {
58+
tracing::warn!("session log write error: {e}");
59+
continue;
60+
}
61+
}
62+
LogEntry::Flush => {
63+
if let Err(e) = writer.flush() {
64+
tracing::warn!("session log flush error: {e}");
65+
}
66+
}
67+
}
68+
}
69+
let _ = writer.flush();
70+
})?;
71+
72+
let _ = self.worker.set(LoggerWorker {
73+
tx,
74+
_handle: handle,
75+
});
4176
Ok(())
4277
}
4378

4479
fn write_json_line(&self, value: serde_json::Value) {
45-
let Some(mutex) = self.file.get() else {
80+
let Some(worker) = self.worker.get() else {
4681
return;
4782
};
48-
let mut guard = match mutex.lock() {
49-
Ok(g) => g,
50-
Err(poisoned) => poisoned.into_inner(),
51-
};
5283
match serde_json::to_string(&value) {
5384
Ok(serialized) => {
54-
if let Err(e) = guard.write_all(serialized.as_bytes()) {
55-
tracing::warn!("session log write error: {}", e);
56-
return;
57-
}
58-
if let Err(e) = guard.write_all(b"\n") {
59-
tracing::warn!("session log write error: {}", e);
60-
return;
61-
}
62-
if let Err(e) = guard.flush() {
63-
tracing::warn!("session log flush error: {}", e);
85+
if let Err(e) = worker.tx.send(LogEntry::Line(serialized)) {
86+
tracing::warn!("session log write error: {e}");
6487
}
6588
}
66-
Err(e) => tracing::warn!("session log serialize error: {}", e),
89+
Err(e) => tracing::warn!("session log serialize error: {e}"),
6790
}
6891
}
6992

7093
fn is_enabled(&self) -> bool {
71-
self.file.get().is_some()
94+
self.worker.get().is_some()
95+
}
96+
97+
fn flush(&self) {
98+
let Some(worker) = self.worker.get() else {
99+
return;
100+
};
101+
if let Err(e) = worker.tx.send(LogEntry::Flush) {
102+
tracing::warn!("session log flush error: {e}");
103+
}
72104
}
73105
}
74106

107+
struct LoggerWorker {
108+
tx: mpsc::Sender<LogEntry>,
109+
_handle: thread::JoinHandle<()>,
110+
}
111+
112+
enum LogEntry {
113+
Line(String),
114+
Flush,
115+
}
116+
75117
fn now_ts() -> String {
76118
// RFC3339 for readability; consumers can parse as needed.
77119
chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
@@ -194,6 +236,7 @@ pub(crate) fn log_session_end() {
194236
"kind": "session_end",
195237
});
196238
LOGGER.write_json_line(value);
239+
LOGGER.flush();
197240
}
198241

199242
fn write_record<T>(dir: &str, kind: &str, obj: &T)

codex-rs/tui2/src/session_log.rs

Lines changed: 65 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1-
use std::fs::File;
21
use std::fs::OpenOptions;
2+
use std::io::BufWriter;
33
use std::io::Write;
44
use std::path::PathBuf;
55
use std::sync::LazyLock;
6-
use std::sync::Mutex;
76
use std::sync::OnceLock;
7+
use std::sync::mpsc;
8+
use std::thread;
89

910
use codex_core::config::Config;
1011
use codex_core::protocol::Op;
@@ -16,17 +17,21 @@ use crate::app_event::AppEvent;
1617
static LOGGER: LazyLock<SessionLogger> = LazyLock::new(SessionLogger::new);
1718

1819
struct SessionLogger {
19-
file: OnceLock<Mutex<File>>,
20+
worker: OnceLock<LoggerWorker>,
2021
}
2122

2223
impl SessionLogger {
2324
fn new() -> Self {
2425
Self {
25-
file: OnceLock::new(),
26+
worker: OnceLock::new(),
2627
}
2728
}
2829

2930
fn open(&self, path: PathBuf) -> std::io::Result<()> {
31+
if self.worker.get().is_some() {
32+
return Ok(());
33+
}
34+
3035
let mut opts = OpenOptions::new();
3136
opts.create(true).truncate(true).write(true);
3237

@@ -37,41 +42,78 @@ impl SessionLogger {
3742
}
3843

3944
let file = opts.open(path)?;
40-
self.file.get_or_init(|| Mutex::new(file));
45+
let (tx, rx) = mpsc::channel::<LogEntry>();
46+
let handle = thread::Builder::new()
47+
.name("tui-session-log".to_string())
48+
.spawn(move || {
49+
let mut writer = BufWriter::with_capacity(64 * 1024, file);
50+
for entry in rx {
51+
match entry {
52+
LogEntry::Line(serialized) => {
53+
if let Err(e) = writer.write_all(serialized.as_bytes()) {
54+
tracing::warn!("session log write error: {e}");
55+
continue;
56+
}
57+
if let Err(e) = writer.write_all(b"\n") {
58+
tracing::warn!("session log write error: {e}");
59+
continue;
60+
}
61+
}
62+
LogEntry::Flush => {
63+
if let Err(e) = writer.flush() {
64+
tracing::warn!("session log flush error: {e}");
65+
}
66+
}
67+
}
68+
}
69+
let _ = writer.flush();
70+
})?;
71+
72+
let _ = self.worker.set(LoggerWorker {
73+
tx,
74+
_handle: handle,
75+
});
4176
Ok(())
4277
}
4378

4479
fn write_json_line(&self, value: serde_json::Value) {
45-
let Some(mutex) = self.file.get() else {
80+
let Some(worker) = self.worker.get() else {
4681
return;
4782
};
48-
let mut guard = match mutex.lock() {
49-
Ok(g) => g,
50-
Err(poisoned) => poisoned.into_inner(),
51-
};
5283
match serde_json::to_string(&value) {
5384
Ok(serialized) => {
54-
if let Err(e) = guard.write_all(serialized.as_bytes()) {
55-
tracing::warn!("session log write error: {}", e);
56-
return;
57-
}
58-
if let Err(e) = guard.write_all(b"\n") {
59-
tracing::warn!("session log write error: {}", e);
60-
return;
61-
}
62-
if let Err(e) = guard.flush() {
63-
tracing::warn!("session log flush error: {}", e);
85+
if let Err(e) = worker.tx.send(LogEntry::Line(serialized)) {
86+
tracing::warn!("session log write error: {e}");
6487
}
6588
}
66-
Err(e) => tracing::warn!("session log serialize error: {}", e),
89+
Err(e) => tracing::warn!("session log serialize error: {e}"),
6790
}
6891
}
6992

7093
fn is_enabled(&self) -> bool {
71-
self.file.get().is_some()
94+
self.worker.get().is_some()
95+
}
96+
97+
fn flush(&self) {
98+
let Some(worker) = self.worker.get() else {
99+
return;
100+
};
101+
if let Err(e) = worker.tx.send(LogEntry::Flush) {
102+
tracing::warn!("session log flush error: {e}");
103+
}
72104
}
73105
}
74106

107+
struct LoggerWorker {
108+
tx: mpsc::Sender<LogEntry>,
109+
_handle: thread::JoinHandle<()>,
110+
}
111+
112+
enum LogEntry {
113+
Line(String),
114+
Flush,
115+
}
116+
75117
fn now_ts() -> String {
76118
// RFC3339 for readability; consumers can parse as needed.
77119
chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
@@ -194,6 +236,7 @@ pub(crate) fn log_session_end() {
194236
"kind": "session_end",
195237
});
196238
LOGGER.write_json_line(value);
239+
LOGGER.flush();
197240
}
198241

199242
fn write_record<T>(dir: &str, kind: &str, obj: &T)

0 commit comments

Comments
 (0)