Skip to content

Commit 13e1d03

Browse files
authored
Delegate review to codex instance (#5572)
In this PR, I am exploring migrating the task kind to an invocation of Codex. The main reason would be getting rid of multiple `ConversationHistory` states and streamlining our context/history management. This approach depends on opening a channel between the sub-codex and codex. This channel is responsible for forwarding `interactive` (`approvals`) and `non-interactive` events. The `task` is responsible for handling those events. This opens the door for implementing `codex as a tool`, replacing `compact` and `review`, and potentially subagents. One consideration is that this code is very similar to `app-server`, especially in the approval part. If in the future we wanted an interactive `sub-codex`, we should consider using `codex-mcp`.
1 parent db31f69 commit 13e1d03

28 files changed

+805
-302
lines changed

codex-rs/core/src/apply_patch.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,13 @@ pub(crate) async fn apply_patch(
6161
// that similar patches can be auto-approved in the future during
6262
// this session.
6363
let rx_approve = sess
64-
.request_patch_approval(turn_context, call_id.to_owned(), &action, None, None)
64+
.request_patch_approval(
65+
turn_context,
66+
call_id.to_owned(),
67+
convert_apply_patch_to_protocol(&action),
68+
None,
69+
None,
70+
)
6571
.await;
6672
match rx_approve.await.unwrap_or_default() {
6773
ReviewDecision::Approved | ReviewDecision::ApprovedForSession => {

codex-rs/core/src/chat_completions.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use codex_protocol::models::ContentItem;
2020
use codex_protocol::models::FunctionCallOutputContentItem;
2121
use codex_protocol::models::ReasoningItemContent;
2222
use codex_protocol::models::ResponseItem;
23+
use codex_protocol::protocol::SessionSource;
2324
use eventsource_stream::Eventsource;
2425
use futures::Stream;
2526
use futures::StreamExt;
@@ -41,6 +42,7 @@ pub(crate) async fn stream_chat_completions(
4142
client: &CodexHttpClient,
4243
provider: &ModelProviderInfo,
4344
otel_event_manager: &OtelEventManager,
45+
session_source: &SessionSource,
4446
) -> Result<ResponseStream> {
4547
if prompt.output_schema.is_some() {
4648
return Err(CodexErr::UnsupportedOperation(
@@ -343,7 +345,15 @@ pub(crate) async fn stream_chat_completions(
343345
loop {
344346
attempt += 1;
345347

346-
let req_builder = provider.create_request_builder(client, &None).await?;
348+
let mut req_builder = provider.create_request_builder(client, &None).await?;
349+
350+
// Include session source for backend telemetry and routing.
351+
let task_type = match serde_json::to_value(session_source) {
352+
Ok(serde_json::Value::String(s)) => s,
353+
Ok(other) => other.to_string(),
354+
Err(_) => "unknown".to_string(),
355+
};
356+
req_builder = req_builder.header("Codex-Task-Type", task_type);
347357

348358
let res = otel_event_manager
349359
.log_request(attempt, || {

codex-rs/core/src/client.rs

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use codex_protocol::ConversationId;
1313
use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig;
1414
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
1515
use codex_protocol::models::ResponseItem;
16+
use codex_protocol::protocol::SessionSource;
1617
use eventsource_stream::Eventsource;
1718
use futures::prelude::*;
1819
use regex_lite::Regex;
@@ -56,7 +57,6 @@ use crate::openai_model_info::get_model_info;
5657
use crate::protocol::RateLimitSnapshot;
5758
use crate::protocol::RateLimitWindow;
5859
use crate::protocol::TokenUsage;
59-
use crate::state::TaskKind;
6060
use crate::token_data::PlanType;
6161
use crate::tools::spec::create_tools_json_for_responses_api;
6262
use crate::util::backoff;
@@ -87,8 +87,10 @@ pub struct ModelClient {
8787
conversation_id: ConversationId,
8888
effort: Option<ReasoningEffortConfig>,
8989
summary: ReasoningSummaryConfig,
90+
session_source: SessionSource,
9091
}
9192

93+
#[allow(clippy::too_many_arguments)]
9294
impl ModelClient {
9395
pub fn new(
9496
config: Arc<Config>,
@@ -98,6 +100,7 @@ impl ModelClient {
98100
effort: Option<ReasoningEffortConfig>,
99101
summary: ReasoningSummaryConfig,
100102
conversation_id: ConversationId,
103+
session_source: SessionSource,
101104
) -> Self {
102105
let client = create_client();
103106

@@ -110,6 +113,7 @@ impl ModelClient {
110113
conversation_id,
111114
effort,
112115
summary,
116+
session_source,
113117
}
114118
}
115119

@@ -127,13 +131,6 @@ impl ModelClient {
127131
})
128132
}
129133

130-
/// Dispatches to either the Responses or Chat implementation depending on
131-
/// the provider config. Public callers always invoke `stream()` – the
132-
/// specialised helpers are private to avoid accidental misuse.
133-
pub async fn stream(&self, prompt: &Prompt) -> Result<ResponseStream> {
134-
self.stream_with_task_kind(prompt, TaskKind::Regular).await
135-
}
136-
137134
pub fn config(&self) -> Arc<Config> {
138135
Arc::clone(&self.config)
139136
}
@@ -142,13 +139,9 @@ impl ModelClient {
142139
&self.provider
143140
}
144141

145-
pub(crate) async fn stream_with_task_kind(
146-
&self,
147-
prompt: &Prompt,
148-
task_kind: TaskKind,
149-
) -> Result<ResponseStream> {
142+
pub async fn stream(&self, prompt: &Prompt) -> Result<ResponseStream> {
150143
match self.provider.wire_api {
151-
WireApi::Responses => self.stream_responses(prompt, task_kind).await,
144+
WireApi::Responses => self.stream_responses(prompt).await,
152145
WireApi::Chat => {
153146
// Create the raw streaming connection first.
154147
let response_stream = stream_chat_completions(
@@ -157,6 +150,7 @@ impl ModelClient {
157150
&self.client,
158151
&self.provider,
159152
&self.otel_event_manager,
153+
&self.session_source,
160154
)
161155
.await?;
162156

@@ -189,11 +183,7 @@ impl ModelClient {
189183
}
190184

191185
/// Implementation for the OpenAI *Responses* experimental API.
192-
async fn stream_responses(
193-
&self,
194-
prompt: &Prompt,
195-
task_kind: TaskKind,
196-
) -> Result<ResponseStream> {
186+
async fn stream_responses(&self, prompt: &Prompt) -> Result<ResponseStream> {
197187
if let Some(path) = &*CODEX_RS_SSE_FIXTURE {
198188
// short circuit for tests
199189
warn!(path, "Streaming from fixture");
@@ -268,7 +258,7 @@ impl ModelClient {
268258
let max_attempts = self.provider.request_max_retries();
269259
for attempt in 0..=max_attempts {
270260
match self
271-
.attempt_stream_responses(attempt, &payload_json, &auth_manager, task_kind)
261+
.attempt_stream_responses(attempt, &payload_json, &auth_manager)
272262
.await
273263
{
274264
Ok(stream) => {
@@ -296,7 +286,6 @@ impl ModelClient {
296286
attempt: u64,
297287
payload_json: &Value,
298288
auth_manager: &Option<Arc<AuthManager>>,
299-
task_kind: TaskKind,
300289
) -> std::result::Result<ResponseStream, StreamAttemptError> {
301290
// Always fetch the latest auth in case a prior attempt refreshed the token.
302291
let auth = auth_manager.as_ref().and_then(|m| m.auth());
@@ -314,12 +303,19 @@ impl ModelClient {
314303
.await
315304
.map_err(StreamAttemptError::Fatal)?;
316305

306+
// Include session source for backend telemetry and routing.
307+
let task_type = match serde_json::to_value(&self.session_source) {
308+
Ok(serde_json::Value::String(s)) => s,
309+
Ok(other) => other.to_string(),
310+
Err(_) => "unknown".to_string(),
311+
};
312+
req_builder = req_builder.header("Codex-Task-Type", task_type);
313+
317314
req_builder = req_builder
318315
// Send session_id for compatibility.
319316
.header("conversation_id", self.conversation_id.to_string())
320317
.header("session_id", self.conversation_id.to_string())
321318
.header(reqwest::header::ACCEPT, "text/event-stream")
322-
.header("Codex-Task-Type", task_kind.header_value())
323319
.json(payload_json);
324320

325321
if let Some(auth) = auth.as_ref()
@@ -462,6 +458,10 @@ impl ModelClient {
462458
self.otel_event_manager.clone()
463459
}
464460

461+
pub fn get_session_source(&self) -> SessionSource {
462+
self.session_source.clone()
463+
}
464+
465465
/// Returns the currently configured model slug.
466466
pub fn get_model(&self) -> String {
467467
self.config.model.clone()

codex-rs/core/src/client_common.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ use tokio::sync::mpsc;
2323
/// Review thread system prompt. Edit `core/src/review_prompt.md` to customize.
2424
pub const REVIEW_PROMPT: &str = include_str!("../review_prompt.md");
2525

26+
// Centralized templates for review-related user messages
27+
pub const REVIEW_EXIT_SUCCESS_TMPL: &str = include_str!("../templates/review/exit_success.xml");
28+
pub const REVIEW_EXIT_INTERRUPTED_TMPL: &str =
29+
include_str!("../templates/review/exit_interrupted.xml");
30+
2631
/// API request payload for a single model turn
2732
#[derive(Default, Debug, Clone)]
2833
pub struct Prompt {

0 commit comments

Comments
 (0)