Skip to content

Commit 428a9f6

Browse files
authored
feat: wait for backfill to be ready (openai#10790)
1 parent 529b539 commit 428a9f6

File tree

3 files changed

+43
-19
lines changed

3 files changed

+43
-19
lines changed

codex-rs/core/src/rollout/tests.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ async fn insert_state_db_thread(
6262
codex_state::StateRuntime::init(home.to_path_buf(), TEST_PROVIDER.to_string(), None)
6363
.await
6464
.expect("state db should initialize");
65+
runtime
66+
.mark_backfill_complete(None)
67+
.await
68+
.expect("backfill should be complete");
6569
let created_at = chrono::Utc
6670
.with_ymd_and_hms(2025, 1, 3, 12, 0, 0)
6771
.single()
@@ -280,6 +284,10 @@ async fn find_thread_path_repairs_missing_db_row_after_filesystem_fallback() {
280284
codex_state::StateRuntime::init(home.to_path_buf(), TEST_PROVIDER.to_string(), None)
281285
.await
282286
.expect("state db should initialize");
287+
_runtime
288+
.mark_backfill_complete(None)
289+
.await
290+
.expect("backfill should be complete");
283291

284292
let found = crate::rollout::find_thread_path_by_id_str(home, &uuid.to_string())
285293
.await

codex-rs/core/src/state_db.rs

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -54,30 +54,20 @@ pub(crate) async fn init_if_enabled(
5454
return None;
5555
}
5656
};
57-
let should_backfill = match runtime.get_backfill_state().await {
58-
Ok(state) => state.status != codex_state::BackfillStatus::Complete,
57+
let backfill_state = match runtime.get_backfill_state().await {
58+
Ok(state) => state,
5959
Err(err) => {
6060
warn!(
6161
"failed to read backfill state at {}: {err}",
6262
config.codex_home.display()
6363
);
64-
true
64+
return None;
6565
}
6666
};
67-
if should_backfill {
68-
let runtime_for_backfill = Arc::clone(&runtime);
69-
let config_for_backfill = config.clone();
70-
let otel_for_backfill = otel.cloned();
71-
tokio::task::spawn(async move {
72-
metadata::backfill_sessions(
73-
runtime_for_backfill.as_ref(),
74-
&config_for_backfill,
75-
otel_for_backfill.as_ref(),
76-
)
77-
.await;
78-
});
67+
if backfill_state.status != codex_state::BackfillStatus::Complete {
68+
metadata::backfill_sessions(runtime.as_ref(), config, otel).await;
7969
}
80-
Some(runtime)
70+
require_backfill_complete(runtime, config.codex_home.as_path()).await
8171
}
8272

8373
/// Get the DB if the feature is enabled and the DB exists.
@@ -88,13 +78,14 @@ pub async fn get_state_db(config: &Config, otel: Option<&OtelManager>) -> Option
8878
{
8979
return None;
9080
}
91-
codex_state::StateRuntime::init(
81+
let runtime = codex_state::StateRuntime::init(
9282
config.codex_home.clone(),
9383
config.model_provider_id.clone(),
9484
otel.cloned(),
9585
)
9686
.await
97-
.ok()
87+
.ok()?;
88+
require_backfill_complete(runtime, config.codex_home.as_path()).await
9889
}
9990

10091
/// Open the state runtime when the SQLite file exists, without feature gating.
@@ -112,7 +103,31 @@ pub async fn open_if_present(codex_home: &Path, default_provider: &str) -> Optio
112103
)
113104
.await
114105
.ok()?;
115-
Some(runtime)
106+
require_backfill_complete(runtime, codex_home).await
107+
}
108+
109+
async fn require_backfill_complete(
110+
runtime: StateDbHandle,
111+
codex_home: &Path,
112+
) -> Option<StateDbHandle> {
113+
match runtime.get_backfill_state().await {
114+
Ok(state) if state.status == codex_state::BackfillStatus::Complete => Some(runtime),
115+
Ok(state) => {
116+
warn!(
117+
"state db backfill not complete at {} (status: {})",
118+
codex_home.display(),
119+
state.status.as_str()
120+
);
121+
None
122+
}
123+
Err(err) => {
124+
warn!(
125+
"failed to read backfill state at {}: {err}",
126+
codex_home.display()
127+
);
128+
None
129+
}
130+
}
116131
}
117132

118133
fn cursor_to_anchor(cursor: Option<&Cursor>) -> Option<codex_state::Anchor> {

codex-rs/core/tests/suite/rollout_list_find.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ async fn upsert_thread_metadata(codex_home: &Path, thread_id: ThreadId, rollout_
5959
let runtime = StateRuntime::init(codex_home.to_path_buf(), "test-provider".to_string(), None)
6060
.await
6161
.unwrap();
62+
runtime.mark_backfill_complete(None).await.unwrap();
6263
let mut builder = ThreadMetadataBuilder::new(
6364
thread_id,
6465
rollout_path,

0 commit comments

Comments
 (0)