@@ -97,6 +97,7 @@ WHERE thread_id = ?
9797 /// Query behavior:
9898 /// - starts from `threads` filtered to active threads and allowed sources
9999 /// (`push_thread_filters`)
100+ /// - excludes threads with `memory_mode != 'enabled'`
100101 /// - excludes the current thread id
101102 /// - keeps only threads in the age window:
102103 /// `updated_at >= now - max_age_days` and `updated_at <= now - min_rollout_idle_hours`
@@ -174,6 +175,7 @@ LEFT JOIN jobs
174175 SortKey :: UpdatedAt ,
175176 None ,
176177 ) ;
178+ builder. push ( " AND threads.memory_mode = 'enabled'" ) ;
177179 builder
178180 . push ( " AND id != " )
179181 . push_bind ( current_thread_id. as_str ( ) ) ;
@@ -1591,6 +1593,77 @@ mod tests {
15911593 let _ = tokio:: fs:: remove_dir_all ( codex_home) . await ;
15921594 }
15931595
1596+ #[ tokio:: test]
1597+ async fn claim_stage1_jobs_skips_threads_with_disabled_memory_mode ( ) {
1598+ let codex_home = unique_temp_dir ( ) ;
1599+ let runtime = StateRuntime :: init ( codex_home. clone ( ) , "test-provider" . to_string ( ) , None )
1600+ . await
1601+ . expect ( "initialize runtime" ) ;
1602+
1603+ let now = Utc :: now ( ) ;
1604+ let eligible_at = now - Duration :: hours ( 13 ) ;
1605+
1606+ let current_thread_id =
1607+ ThreadId :: from_string ( & Uuid :: new_v4 ( ) . to_string ( ) ) . expect ( "current thread id" ) ;
1608+ let disabled_thread_id =
1609+ ThreadId :: from_string ( & Uuid :: new_v4 ( ) . to_string ( ) ) . expect ( "disabled thread id" ) ;
1610+ let enabled_thread_id =
1611+ ThreadId :: from_string ( & Uuid :: new_v4 ( ) . to_string ( ) ) . expect ( "enabled thread id" ) ;
1612+
1613+ let mut current =
1614+ test_thread_metadata ( & codex_home, current_thread_id, codex_home. join ( "current" ) ) ;
1615+ current. created_at = now;
1616+ current. updated_at = now;
1617+ runtime
1618+ . upsert_thread ( & current)
1619+ . await
1620+ . expect ( "upsert current thread" ) ;
1621+
1622+ let mut disabled =
1623+ test_thread_metadata ( & codex_home, disabled_thread_id, codex_home. join ( "disabled" ) ) ;
1624+ disabled. created_at = eligible_at;
1625+ disabled. updated_at = eligible_at;
1626+ runtime
1627+ . upsert_thread ( & disabled)
1628+ . await
1629+ . expect ( "upsert disabled thread" ) ;
1630+ sqlx:: query ( "UPDATE threads SET memory_mode = 'disabled' WHERE id = ?" )
1631+ . bind ( disabled_thread_id. to_string ( ) )
1632+ . execute ( runtime. pool . as_ref ( ) )
1633+ . await
1634+ . expect ( "disable thread memory mode" ) ;
1635+
1636+ let mut enabled =
1637+ test_thread_metadata ( & codex_home, enabled_thread_id, codex_home. join ( "enabled" ) ) ;
1638+ enabled. created_at = eligible_at;
1639+ enabled. updated_at = eligible_at;
1640+ runtime
1641+ . upsert_thread ( & enabled)
1642+ . await
1643+ . expect ( "upsert enabled thread" ) ;
1644+
1645+ let allowed_sources = vec ! [ "cli" . to_string( ) ] ;
1646+ let claims = runtime
1647+ . claim_stage1_jobs_for_startup (
1648+ current_thread_id,
1649+ Stage1StartupClaimParams {
1650+ scan_limit : 10 ,
1651+ max_claimed : 10 ,
1652+ max_age_days : 30 ,
1653+ min_rollout_idle_hours : 12 ,
1654+ allowed_sources : allowed_sources. as_slice ( ) ,
1655+ lease_seconds : 3600 ,
1656+ } ,
1657+ )
1658+ . await
1659+ . expect ( "claim stage1 startup jobs" ) ;
1660+
1661+ assert_eq ! ( claims. len( ) , 1 ) ;
1662+ assert_eq ! ( claims[ 0 ] . thread. id, enabled_thread_id) ;
1663+
1664+ let _ = tokio:: fs:: remove_dir_all ( codex_home) . await ;
1665+ }
1666+
15941667 #[ tokio:: test]
15951668 async fn claim_stage1_jobs_enforces_global_running_cap ( ) {
15961669 let codex_home = unique_temp_dir ( ) ;
0 commit comments