Skip to content

Commit 7fdd266

Browse files
committed
chore: introduce CodexServer as a clearinghouse for all conversations
1 parent e8670ad commit 7fdd266

29 files changed

+368
-645
lines changed

codex-rs/cli/src/proto.rs

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
11
use std::io::IsTerminal;
2-
use std::sync::Arc;
32

43
use clap::Parser;
54
use codex_common::CliConfigOverrides;
6-
use codex_core::Codex;
7-
use codex_core::CodexSpawnOk;
5+
use codex_core::ConversationManager;
6+
use codex_core::NewConversation;
87
use codex_core::config::Config;
98
use codex_core::config::ConfigOverrides;
109
use codex_core::protocol::Submission;
11-
use codex_core::util::notify_on_sigint;
12-
use codex_login::CodexAuth;
1310
use tokio::io::AsyncBufReadExt;
1411
use tokio::io::BufReader;
1512
use tracing::error;
@@ -36,22 +33,23 @@ pub async fn run_main(opts: ProtoCli) -> anyhow::Result<()> {
3633
.map_err(anyhow::Error::msg)?;
3734

3835
let config = Config::load_with_cli_overrides(overrides_vec, ConfigOverrides::default())?;
39-
let auth = CodexAuth::from_codex_home(&config.codex_home)?;
40-
let ctrl_c = notify_on_sigint();
41-
let CodexSpawnOk { codex, .. } = Codex::spawn(config, auth, ctrl_c.clone()).await?;
42-
let codex = Arc::new(codex);
36+
// Use conversation_manager API to start a conversation
37+
let conversation_manager = ConversationManager::default();
38+
let NewConversation {
39+
conversation_id: _,
40+
conversation,
41+
session_configured: _,
42+
} = conversation_manager.new_conversation(config).await?;
4343

4444
// Task that reads JSON lines from stdin and forwards to Submission Queue
4545
let sq_fut = {
46-
let codex = codex.clone();
47-
let ctrl_c = ctrl_c.clone();
46+
let conversation = conversation.clone();
4847
async move {
4948
let stdin = BufReader::new(tokio::io::stdin());
5049
let mut lines = stdin.lines();
5150
loop {
5251
let result = tokio::select! {
53-
_ = ctrl_c.notified() => {
54-
info!("Interrupted, exiting");
52+
_ = tokio::signal::ctrl_c() => {
5553
break
5654
},
5755
res = lines.next_line() => res,
@@ -65,7 +63,7 @@ pub async fn run_main(opts: ProtoCli) -> anyhow::Result<()> {
6563
}
6664
match serde_json::from_str::<Submission>(line) {
6765
Ok(sub) => {
68-
if let Err(e) = codex.submit_with_id(sub).await {
66+
if let Err(e) = conversation.submit_with_id(sub).await {
6967
error!("{e:#}");
7068
break;
7169
}
@@ -88,8 +86,8 @@ pub async fn run_main(opts: ProtoCli) -> anyhow::Result<()> {
8886
let eq_fut = async move {
8987
loop {
9088
let event = tokio::select! {
91-
_ = ctrl_c.notified() => break,
92-
event = codex.next_event() => event,
89+
_ = tokio::signal::ctrl_c() => break,
90+
event = conversation.next_event() => event,
9391
};
9492
match event {
9593
Ok(event) => {

codex-rs/core/src/codex.rs

Lines changed: 4 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ use futures::prelude::*;
2020
use mcp_types::CallToolResult;
2121
use serde::Serialize;
2222
use serde_json;
23-
use tokio::sync::Notify;
2423
use tokio::sync::oneshot;
2524
use tokio::task::AbortHandle;
2625
use tracing::debug;
@@ -123,11 +122,7 @@ pub struct CodexSpawnOk {
123122

124123
impl Codex {
125124
/// Spawn a new [`Codex`] and initialize the session.
126-
pub async fn spawn(
127-
config: Config,
128-
auth: Option<CodexAuth>,
129-
ctrl_c: Arc<Notify>,
130-
) -> CodexResult<CodexSpawnOk> {
125+
pub async fn spawn(config: Config, auth: Option<CodexAuth>) -> CodexResult<CodexSpawnOk> {
131126
// experimental resume path (undocumented)
132127
let resume_path = config.experimental_resume.clone();
133128
info!("resume_path: {resume_path:?}");
@@ -155,9 +150,7 @@ impl Codex {
155150

156151
// Generate a unique ID for the lifetime of this Codex session.
157152
let session_id = Uuid::new_v4();
158-
tokio::spawn(submission_loop(
159-
session_id, config, auth, rx_sub, tx_event, ctrl_c,
160-
));
153+
tokio::spawn(submission_loop(session_id, config, auth, rx_sub, tx_event));
161154
let codex = Codex {
162155
next_id: AtomicU64::new(0),
163156
tx_sub,
@@ -209,7 +202,6 @@ impl Codex {
209202
pub(crate) struct Session {
210203
client: ModelClient,
211204
pub(crate) tx_event: Sender<Event>,
212-
ctrl_c: Arc<Notify>,
213205

214206
/// The session's current working directory. All relative paths provided by
215207
/// the model as well as sandbox policies are resolved against this path
@@ -492,7 +484,6 @@ impl Session {
492484
let result = process_exec_tool_call(
493485
exec_args.params,
494486
exec_args.sandbox_type,
495-
exec_args.ctrl_c,
496487
exec_args.sandbox_policy,
497488
exec_args.codex_linux_sandbox_exe,
498489
exec_args.stdout_stream,
@@ -577,7 +568,7 @@ impl Session {
577568
.await
578569
}
579570

580-
pub fn abort(&self) {
571+
fn abort(&self) {
581572
info!("Aborting existing session");
582573
let mut state = self.state.lock().unwrap();
583574
state.pending_approvals.clear();
@@ -708,7 +699,6 @@ async fn submission_loop(
708699
auth: Option<CodexAuth>,
709700
rx_sub: Receiver<Submission>,
710701
tx_event: Sender<Event>,
711-
ctrl_c: Arc<Notify>,
712702
) {
713703
let mut sess: Option<Arc<Session>> = None;
714704
// shorthand - send an event when there is no active session
@@ -723,21 +713,7 @@ async fn submission_loop(
723713
tx_event.send(event).await.ok();
724714
};
725715

726-
loop {
727-
let interrupted = ctrl_c.notified();
728-
let sub = tokio::select! {
729-
res = rx_sub.recv() => match res {
730-
Ok(sub) => sub,
731-
Err(_) => break,
732-
},
733-
_ = interrupted => {
734-
if let Some(sess) = sess.as_ref(){
735-
sess.abort();
736-
}
737-
continue;
738-
},
739-
};
740-
716+
while let Ok(sub) = rx_sub.recv().await {
741717
debug!(?sub, "Submission");
742718
match sub.op {
743719
Op::Interrupt => {
@@ -876,7 +852,6 @@ async fn submission_loop(
876852
config.include_plan_tool,
877853
),
878854
tx_event: tx_event.clone(),
879-
ctrl_c: Arc::clone(&ctrl_c),
880855
user_instructions,
881856
base_instructions,
882857
approval_policy,
@@ -1779,7 +1754,6 @@ fn parse_container_exec_arguments(
17791754
pub struct ExecInvokeArgs<'a> {
17801755
pub params: ExecParams,
17811756
pub sandbox_type: SandboxType,
1782-
pub ctrl_c: Arc<Notify>,
17831757
pub sandbox_policy: &'a SandboxPolicy,
17841758
pub codex_linux_sandbox_exe: &'a Option<PathBuf>,
17851759
pub stdout_stream: Option<StdoutStream>,
@@ -1964,7 +1938,6 @@ async fn handle_container_exec_with_params(
19641938
ExecInvokeArgs {
19651939
params: params.clone(),
19661940
sandbox_type,
1967-
ctrl_c: sess.ctrl_c.clone(),
19681941
sandbox_policy: &sess.sandbox_policy,
19691942
codex_linux_sandbox_exe: &sess.codex_linux_sandbox_exe,
19701943
stdout_stream: Some(StdoutStream {
@@ -2096,7 +2069,6 @@ async fn handle_sandbox_error(
20962069
ExecInvokeArgs {
20972070
params,
20982071
sandbox_type: SandboxType::None,
2099-
ctrl_c: sess.ctrl_c.clone(),
21002072
sandbox_policy: &sess.sandbox_policy,
21012073
codex_linux_sandbox_exe: &sess.codex_linux_sandbox_exe,
21022074
stdout_stream: Some(StdoutStream {
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
use crate::codex::Codex;
2+
use crate::error::Result as CodexResult;
3+
use crate::protocol::Event;
4+
use crate::protocol::Op;
5+
use crate::protocol::Submission;
6+
7+
pub struct CodexConversation {
8+
codex: Codex,
9+
}
10+
11+
impl CodexConversation {
12+
pub(crate) fn new(codex: Codex) -> Self {
13+
Self { codex }
14+
}
15+
16+
pub async fn submit(&self, op: Op) -> CodexResult<String> {
17+
self.codex.submit(op).await
18+
}
19+
20+
pub async fn submit_with_id(&self, sub: Submission) -> CodexResult<()> {
21+
self.codex.submit_with_id(sub).await
22+
}
23+
24+
pub async fn next_event(&self) -> CodexResult<Event> {
25+
self.codex.next_event().await
26+
}
27+
}

codex-rs/core/src/codex_wrapper.rs

Lines changed: 0 additions & 59 deletions
This file was deleted.
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
use std::collections::HashMap;
2+
use std::sync::Arc;
3+
4+
use codex_login::CodexAuth;
5+
use tokio::sync::RwLock;
6+
use uuid::Uuid;
7+
8+
use crate::codex::Codex;
9+
use crate::codex::CodexSpawnOk;
10+
use crate::codex_conversation::CodexConversation;
11+
use crate::config::Config;
12+
use crate::error::CodexErr;
13+
use crate::error::Result as CodexResult;
14+
use crate::protocol::Event;
15+
use crate::protocol::EventMsg;
16+
use crate::protocol::SessionConfiguredEvent;
17+
18+
/// Represents a newly created Codex conversation, including the first event
19+
/// (which is [`EventMsg::SessionConfigured`]).
20+
pub struct NewConversation {
21+
pub conversation_id: Uuid,
22+
pub conversation: Arc<CodexConversation>,
23+
pub session_configured: SessionConfiguredEvent,
24+
}
25+
26+
/// [`ConversationManager`] is responsible for creating conversations and
27+
/// maintaining them in memory.
28+
pub struct ConversationManager {
29+
conversations: Arc<RwLock<HashMap<Uuid, Arc<CodexConversation>>>>,
30+
}
31+
32+
impl Default for ConversationManager {
33+
fn default() -> Self {
34+
Self {
35+
conversations: Arc::new(RwLock::new(HashMap::new())),
36+
}
37+
}
38+
}
39+
40+
impl ConversationManager {
41+
pub async fn new_conversation(&self, config: Config) -> CodexResult<NewConversation> {
42+
let auth = CodexAuth::from_codex_home(&config.codex_home)?;
43+
self.new_conversation_with_auth(config, auth).await
44+
}
45+
46+
/// Used for integration tests: should not be used by ordinary business
47+
/// logic.
48+
pub async fn new_conversation_with_auth(
49+
&self,
50+
config: Config,
51+
auth: Option<CodexAuth>,
52+
) -> CodexResult<NewConversation> {
53+
let CodexSpawnOk {
54+
codex,
55+
init_id,
56+
session_id: conversation_id,
57+
} = Codex::spawn(config, auth).await?;
58+
59+
// The first event must be `SessionInitialized`. Validate and forward it
60+
// to the caller so that they can display it in the conversation
61+
// history.
62+
let event = codex.next_event().await?;
63+
let session_configured = match event {
64+
Event {
65+
id,
66+
msg: EventMsg::SessionConfigured(session_configured),
67+
} if id == init_id => session_configured,
68+
_ => {
69+
return Err(CodexErr::SessionConfiguredNotFirstEvent);
70+
}
71+
};
72+
73+
let conversation = Arc::new(CodexConversation::new(codex));
74+
self.conversations
75+
.write()
76+
.await
77+
.insert(conversation_id, conversation.clone());
78+
79+
Ok(NewConversation {
80+
conversation_id,
81+
conversation,
82+
session_configured,
83+
})
84+
}
85+
86+
pub async fn get_conversation(
87+
&self,
88+
conversation_id: Uuid,
89+
) -> CodexResult<Arc<CodexConversation>> {
90+
let conversations = self.conversations.read().await;
91+
conversations
92+
.get(&conversation_id)
93+
.cloned()
94+
.ok_or_else(|| CodexErr::ConversationNotFound(conversation_id))
95+
}
96+
}

codex-rs/core/src/error.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use serde_json;
33
use std::io;
44
use thiserror::Error;
55
use tokio::task::JoinError;
6+
use uuid::Uuid;
67

78
pub type Result<T> = std::result::Result<T, CodexErr>;
89

@@ -44,6 +45,12 @@ pub enum CodexErr {
4445
#[error("stream disconnected before completion: {0}")]
4546
Stream(String),
4647

48+
#[error("no conversation with id: {0}")]
49+
ConversationNotFound(Uuid),
50+
51+
#[error("session configured event was not the first event in the stream")]
52+
SessionConfiguredNotFirstEvent,
53+
4754
/// Returned by run_command_stream when the spawned child process timed out (10s).
4855
#[error("timeout waiting for child process to exit")]
4956
Timeout,

0 commit comments

Comments
 (0)