diff --git a/codex-rs/state/migrations/0011_logs_partition_prune_indexes.sql b/codex-rs/state/migrations/0011_logs_partition_prune_indexes.sql new file mode 100644 index 00000000000..06a4cde563a --- /dev/null +++ b/codex-rs/state/migrations/0011_logs_partition_prune_indexes.sql @@ -0,0 +1,4 @@ +CREATE INDEX idx_logs_thread_id_ts ON logs(thread_id, ts DESC, ts_nanos DESC, id DESC); + +CREATE INDEX idx_logs_process_uuid_threadless_ts ON logs(process_uuid, ts DESC, ts_nanos DESC, id DESC) +WHERE thread_id IS NULL; diff --git a/codex-rs/state/migrations/0012_logs_estimated_bytes.sql b/codex-rs/state/migrations/0012_logs_estimated_bytes.sql new file mode 100644 index 00000000000..a3fb3495b01 --- /dev/null +++ b/codex-rs/state/migrations/0012_logs_estimated_bytes.sql @@ -0,0 +1,9 @@ +ALTER TABLE logs ADD COLUMN estimated_bytes INTEGER NOT NULL DEFAULT 0; + +UPDATE logs +SET estimated_bytes = + LENGTH(CAST(COALESCE(message, '') AS BLOB)) + + LENGTH(CAST(level AS BLOB)) + + LENGTH(CAST(target AS BLOB)) + + LENGTH(CAST(COALESCE(module_path, '') AS BLOB)) + + LENGTH(CAST(COALESCE(file, '') AS BLOB)); diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs index f9beacdae8a..4570532e693 100644 --- a/codex-rs/state/src/runtime.rs +++ b/codex-rs/state/src/runtime.rs @@ -27,11 +27,13 @@ use sqlx::ConnectOptions; use sqlx::QueryBuilder; use sqlx::Row; use sqlx::Sqlite; +use sqlx::SqliteConnection; use sqlx::SqlitePool; use sqlx::sqlite::SqliteConnectOptions; use sqlx::sqlite::SqliteJournalMode; use sqlx::sqlite::SqlitePoolOptions; use sqlx::sqlite::SqliteSynchronous; +use std::collections::BTreeSet; use std::path::Path; use std::path::PathBuf; use std::sync::Arc; @@ -42,6 +44,12 @@ use uuid::Uuid; mod memories; // Memory-specific CRUD and phase job lifecycle methods live in `runtime/memories.rs`. 
+// "Partition" is the retention bucket we cap at 10 MiB: +// - one bucket per non-null thread_id +// - one bucket per threadless (thread_id IS NULL) non-null process_uuid +// - one bucket for threadless rows with process_uuid IS NULL +const LOG_PARTITION_SIZE_LIMIT_BYTES: i64 = 10 * 1024 * 1024; + #[derive(Clone)] pub struct StateRuntime { codex_home: PathBuf, @@ -359,10 +367,16 @@ FROM threads return Ok(()); } + let mut tx = self.pool.begin().await?; let mut builder = QueryBuilder::::new( - "INSERT INTO logs (ts, ts_nanos, level, target, message, thread_id, process_uuid, module_path, file, line) ", + "INSERT INTO logs (ts, ts_nanos, level, target, message, thread_id, process_uuid, module_path, file, line, estimated_bytes) ", ); builder.push_values(entries, |mut row, entry| { + let estimated_bytes = entry.message.as_ref().map_or(0, String::len) as i64 + + entry.level.len() as i64 + + entry.target.len() as i64 + + entry.module_path.as_ref().map_or(0, String::len) as i64 + + entry.file.as_ref().map_or(0, String::len) as i64; row.push_bind(entry.ts) .push_bind(entry.ts_nanos) .push_bind(&entry.level) @@ -372,9 +386,228 @@ FROM threads .push_bind(&entry.process_uuid) .push_bind(&entry.module_path) .push_bind(&entry.file) - .push_bind(entry.line); + .push_bind(entry.line) + .push_bind(estimated_bytes); }); - builder.build().execute(self.pool.as_ref()).await?; + builder.build().execute(&mut *tx).await?; + self.prune_logs_after_insert(entries, &mut tx).await?; + tx.commit().await?; + Ok(()) + } + + /// Enforce per-partition log size caps after a successful batch insert. + /// + /// We maintain two independent budgets: + /// - Thread logs: rows with `thread_id IS NOT NULL`, capped per `thread_id`. + /// - Threadless process logs: rows with `thread_id IS NULL` ("threadless"), + /// capped per `process_uuid` (including `process_uuid IS NULL` as its own + /// threadless partition). 
+ /// + /// "Threadless" means the log row is not associated with any conversation + /// thread, so retention is keyed by process identity instead. + /// + /// This runs inside the same transaction as the insert so callers never + /// observe "inserted but not yet pruned" rows. + async fn prune_logs_after_insert( + &self, + entries: &[LogEntry], + tx: &mut SqliteConnection, + ) -> anyhow::Result<()> { + let thread_ids: BTreeSet<&str> = entries + .iter() + .filter_map(|entry| entry.thread_id.as_deref()) + .collect(); + if !thread_ids.is_empty() { + // Cheap precheck: only run the heavier window-function prune for + // threads that are currently above the cap. + let mut over_limit_threads_query = + QueryBuilder::<Sqlite>::new("SELECT thread_id FROM logs WHERE thread_id IN ("); + { + let mut separated = over_limit_threads_query.separated(", "); + for thread_id in &thread_ids { + separated.push_bind(*thread_id); + } + } + over_limit_threads_query.push(") GROUP BY thread_id HAVING SUM("); + over_limit_threads_query.push("estimated_bytes"); + over_limit_threads_query.push(") > "); + over_limit_threads_query.push_bind(LOG_PARTITION_SIZE_LIMIT_BYTES); + let over_limit_thread_ids: Vec<String> = over_limit_threads_query + .build() + .fetch_all(&mut *tx) + .await? + .into_iter() + .map(|row| row.try_get("thread_id")) + .collect::<Result<_, _>>()?; + if !over_limit_thread_ids.is_empty() { + // Enforce a strict per-thread cap by deleting every row whose + // newest-first cumulative bytes exceed the partition budget. 
+ let mut prune_threads = QueryBuilder::<Sqlite>::new( + r#" +DELETE FROM logs +WHERE id IN ( + SELECT id + FROM ( + SELECT + id, + SUM( +"#, + ); + prune_threads.push("estimated_bytes"); + prune_threads.push( + r#" + ) OVER ( + PARTITION BY thread_id + ORDER BY ts DESC, ts_nanos DESC, id DESC + ) AS cumulative_bytes + FROM logs + WHERE thread_id IN ( +"#, + ); + { + let mut separated = prune_threads.separated(", "); + for thread_id in &over_limit_thread_ids { + separated.push_bind(thread_id); + } + } + prune_threads.push( + r#" + ) + ) + WHERE cumulative_bytes > +"#, + ); + prune_threads.push_bind(LOG_PARTITION_SIZE_LIMIT_BYTES); + prune_threads.push("\n)"); + prune_threads.build().execute(&mut *tx).await?; + } + } + + let threadless_process_uuids: BTreeSet<&str> = entries + .iter() + .filter(|entry| entry.thread_id.is_none()) + .filter_map(|entry| entry.process_uuid.as_deref()) + .collect(); + let has_threadless_null_process_uuid = entries + .iter() + .any(|entry| entry.thread_id.is_none() && entry.process_uuid.is_none()); + if !threadless_process_uuids.is_empty() { + // Threadless logs are budgeted separately per process UUID. + let mut over_limit_processes_query = QueryBuilder::<Sqlite>::new( + "SELECT process_uuid FROM logs WHERE thread_id IS NULL AND process_uuid IN (", + ); + { + let mut separated = over_limit_processes_query.separated(", "); + for process_uuid in &threadless_process_uuids { + separated.push_bind(*process_uuid); + } + } + over_limit_processes_query.push(") GROUP BY process_uuid HAVING SUM("); + over_limit_processes_query.push("estimated_bytes"); + over_limit_processes_query.push(") > "); + over_limit_processes_query.push_bind(LOG_PARTITION_SIZE_LIMIT_BYTES); + let over_limit_process_uuids: Vec<String> = over_limit_processes_query + .build() + .fetch_all(&mut *tx) + .await? 
+ .into_iter() + .map(|row| row.try_get("process_uuid")) + .collect::<Result<_, _>>()?; + if !over_limit_process_uuids.is_empty() { + // Same strict cap policy as thread pruning, but only for + // threadless rows in the affected process UUIDs. + let mut prune_threadless_process_logs = QueryBuilder::<Sqlite>::new( + r#" +DELETE FROM logs +WHERE id IN ( + SELECT id + FROM ( + SELECT + id, + SUM( +"#, + ); + prune_threadless_process_logs.push("estimated_bytes"); + prune_threadless_process_logs.push( + r#" + ) OVER ( + PARTITION BY process_uuid + ORDER BY ts DESC, ts_nanos DESC, id DESC + ) AS cumulative_bytes + FROM logs + WHERE thread_id IS NULL + AND process_uuid IN ( +"#, + ); + { + let mut separated = prune_threadless_process_logs.separated(", "); + for process_uuid in &over_limit_process_uuids { + separated.push_bind(process_uuid); + } + } + prune_threadless_process_logs.push( + r#" + ) + ) + WHERE cumulative_bytes > +"#, + ); + prune_threadless_process_logs.push_bind(LOG_PARTITION_SIZE_LIMIT_BYTES); + prune_threadless_process_logs.push("\n)"); + prune_threadless_process_logs + .build() + .execute(&mut *tx) + .await?; + } + } + if has_threadless_null_process_uuid { + // Rows without a process UUID still need a cap; treat NULL as its + // own threadless partition. + let mut null_process_usage_query = QueryBuilder::<Sqlite>::new("SELECT SUM("); + null_process_usage_query.push("estimated_bytes"); + null_process_usage_query.push( + ") AS total_bytes FROM logs WHERE thread_id IS NULL AND process_uuid IS NULL", + ); + let total_null_process_bytes: Option<i64> = null_process_usage_query + .build() + .fetch_one(&mut *tx) + .await? 
+ .try_get("total_bytes")?; + + if total_null_process_bytes.unwrap_or(0) > LOG_PARTITION_SIZE_LIMIT_BYTES { + let mut prune_threadless_null_process_logs = QueryBuilder::::new( + r#" +DELETE FROM logs +WHERE id IN ( + SELECT id + FROM ( + SELECT + id, + SUM( +"#, + ); + prune_threadless_null_process_logs.push("estimated_bytes"); + prune_threadless_null_process_logs.push( + r#" + ) OVER ( + PARTITION BY process_uuid + ORDER BY ts DESC, ts_nanos DESC, id DESC + ) AS cumulative_bytes + FROM logs + WHERE thread_id IS NULL + AND process_uuid IS NULL + ) + WHERE cumulative_bytes > +"#, + ); + prune_threadless_null_process_logs.push_bind(LOG_PARTITION_SIZE_LIMIT_BYTES); + prune_threadless_null_process_logs.push("\n)"); + prune_threadless_null_process_logs + .build() + .execute(&mut *tx) + .await?; + } + } Ok(()) } @@ -2553,6 +2786,300 @@ VALUES (?, ?, ?, ?, ?) let _ = tokio::fs::remove_dir_all(codex_home).await; } + #[tokio::test] + async fn insert_logs_prunes_old_rows_when_thread_exceeds_size_limit() { + let codex_home = unique_temp_dir(); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + .await + .expect("initialize runtime"); + + let six_mebibytes = "a".repeat(6 * 1024 * 1024); + runtime + .insert_logs(&[ + LogEntry { + ts: 1, + ts_nanos: 0, + level: "INFO".to_string(), + target: "cli".to_string(), + message: Some(six_mebibytes.clone()), + thread_id: Some("thread-1".to_string()), + process_uuid: Some("proc-1".to_string()), + file: Some("main.rs".to_string()), + line: Some(1), + module_path: Some("mod".to_string()), + }, + LogEntry { + ts: 2, + ts_nanos: 0, + level: "INFO".to_string(), + target: "cli".to_string(), + message: Some(six_mebibytes.clone()), + thread_id: Some("thread-1".to_string()), + process_uuid: Some("proc-1".to_string()), + file: Some("main.rs".to_string()), + line: Some(2), + module_path: Some("mod".to_string()), + }, + ]) + .await + .expect("insert test logs"); + + let rows = runtime + .query_logs(&LogQuery { + 
thread_ids: vec!["thread-1".to_string()], + ..Default::default() + }) + .await + .expect("query thread logs"); + + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].ts, 2); + + let _ = tokio::fs::remove_dir_all(codex_home).await; + } + + #[tokio::test] + async fn insert_logs_prunes_single_thread_row_when_it_exceeds_size_limit() { + let codex_home = unique_temp_dir(); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + .await + .expect("initialize runtime"); + + let eleven_mebibytes = "d".repeat(11 * 1024 * 1024); + runtime + .insert_logs(&[LogEntry { + ts: 1, + ts_nanos: 0, + level: "INFO".to_string(), + target: "cli".to_string(), + message: Some(eleven_mebibytes), + thread_id: Some("thread-oversized".to_string()), + process_uuid: Some("proc-1".to_string()), + file: Some("main.rs".to_string()), + line: Some(1), + module_path: Some("mod".to_string()), + }]) + .await + .expect("insert test log"); + + let rows = runtime + .query_logs(&LogQuery { + thread_ids: vec!["thread-oversized".to_string()], + ..Default::default() + }) + .await + .expect("query thread logs"); + + assert!(rows.is_empty()); + + let _ = tokio::fs::remove_dir_all(codex_home).await; + } + + #[tokio::test] + async fn insert_logs_prunes_threadless_rows_per_process_uuid_only() { + let codex_home = unique_temp_dir(); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + .await + .expect("initialize runtime"); + + let six_mebibytes = "b".repeat(6 * 1024 * 1024); + runtime + .insert_logs(&[ + LogEntry { + ts: 1, + ts_nanos: 0, + level: "INFO".to_string(), + target: "cli".to_string(), + message: Some(six_mebibytes.clone()), + thread_id: None, + process_uuid: Some("proc-1".to_string()), + file: Some("main.rs".to_string()), + line: Some(1), + module_path: Some("mod".to_string()), + }, + LogEntry { + ts: 2, + ts_nanos: 0, + level: "INFO".to_string(), + target: "cli".to_string(), + message: Some(six_mebibytes.clone()), + thread_id: None, 
+ process_uuid: Some("proc-1".to_string()), + file: Some("main.rs".to_string()), + line: Some(2), + module_path: Some("mod".to_string()), + }, + LogEntry { + ts: 3, + ts_nanos: 0, + level: "INFO".to_string(), + target: "cli".to_string(), + message: Some(six_mebibytes), + thread_id: Some("thread-1".to_string()), + process_uuid: Some("proc-1".to_string()), + file: Some("main.rs".to_string()), + line: Some(3), + module_path: Some("mod".to_string()), + }, + ]) + .await + .expect("insert test logs"); + + let rows = runtime + .query_logs(&LogQuery { + thread_ids: vec!["thread-1".to_string()], + include_threadless: true, + ..Default::default() + }) + .await + .expect("query thread and threadless logs"); + + let mut timestamps: Vec = rows.into_iter().map(|row| row.ts).collect(); + timestamps.sort_unstable(); + assert_eq!(timestamps, vec![2, 3]); + + let _ = tokio::fs::remove_dir_all(codex_home).await; + } + + #[tokio::test] + async fn insert_logs_prunes_single_threadless_process_row_when_it_exceeds_size_limit() { + let codex_home = unique_temp_dir(); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + .await + .expect("initialize runtime"); + + let eleven_mebibytes = "e".repeat(11 * 1024 * 1024); + runtime + .insert_logs(&[LogEntry { + ts: 1, + ts_nanos: 0, + level: "INFO".to_string(), + target: "cli".to_string(), + message: Some(eleven_mebibytes), + thread_id: None, + process_uuid: Some("proc-oversized".to_string()), + file: Some("main.rs".to_string()), + line: Some(1), + module_path: Some("mod".to_string()), + }]) + .await + .expect("insert test log"); + + let rows = runtime + .query_logs(&LogQuery { + include_threadless: true, + ..Default::default() + }) + .await + .expect("query threadless logs"); + + assert!(rows.is_empty()); + + let _ = tokio::fs::remove_dir_all(codex_home).await; + } + + #[tokio::test] + async fn insert_logs_prunes_threadless_rows_with_null_process_uuid() { + let codex_home = unique_temp_dir(); + let runtime = 
StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + .await + .expect("initialize runtime"); + + let six_mebibytes = "c".repeat(6 * 1024 * 1024); + runtime + .insert_logs(&[ + LogEntry { + ts: 1, + ts_nanos: 0, + level: "INFO".to_string(), + target: "cli".to_string(), + message: Some(six_mebibytes.clone()), + thread_id: None, + process_uuid: None, + file: Some("main.rs".to_string()), + line: Some(1), + module_path: Some("mod".to_string()), + }, + LogEntry { + ts: 2, + ts_nanos: 0, + level: "INFO".to_string(), + target: "cli".to_string(), + message: Some(six_mebibytes), + thread_id: None, + process_uuid: None, + file: Some("main.rs".to_string()), + line: Some(2), + module_path: Some("mod".to_string()), + }, + LogEntry { + ts: 3, + ts_nanos: 0, + level: "INFO".to_string(), + target: "cli".to_string(), + message: Some("small".to_string()), + thread_id: None, + process_uuid: Some("proc-1".to_string()), + file: Some("main.rs".to_string()), + line: Some(3), + module_path: Some("mod".to_string()), + }, + ]) + .await + .expect("insert test logs"); + + let rows = runtime + .query_logs(&LogQuery { + include_threadless: true, + ..Default::default() + }) + .await + .expect("query threadless logs"); + + let mut timestamps: Vec<i64> = rows.into_iter().map(|row| row.ts).collect(); + timestamps.sort_unstable(); + assert_eq!(timestamps, vec![2, 3]); + + let _ = tokio::fs::remove_dir_all(codex_home).await; + } + + #[tokio::test] + async fn insert_logs_prunes_single_threadless_null_process_row_when_it_exceeds_limit() { + let codex_home = unique_temp_dir(); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + .await + .expect("initialize runtime"); + + let eleven_mebibytes = "f".repeat(11 * 1024 * 1024); + runtime + .insert_logs(&[LogEntry { + ts: 1, + ts_nanos: 0, + level: "INFO".to_string(), + target: "cli".to_string(), + message: Some(eleven_mebibytes), + thread_id: None, + process_uuid: None, + file: 
Some("main.rs".to_string()), + line: Some(1), + module_path: Some("mod".to_string()), + }]) + .await + .expect("insert test log"); + + let rows = runtime + .query_logs(&LogQuery { + include_threadless: true, + ..Default::default() + }) + .await + .expect("query threadless logs"); + + assert!(rows.is_empty()); + + let _ = tokio::fs::remove_dir_all(codex_home).await; + } + fn test_thread_metadata( codex_home: &Path, thread_id: ThreadId,