Skip to content

Commit 18ecca4

Browse files
committed
chore: introduce CodexServer as a clearinghouse for all conversations
1 parent e6dc5a6 commit 18ecca4

32 files changed

+379
-654
lines changed

codex-rs/cli/src/proto.rs

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
11
use std::io::IsTerminal;
2-
use std::sync::Arc;
32

43
use clap::Parser;
54
use codex_common::CliConfigOverrides;
6-
use codex_core::Codex;
7-
use codex_core::CodexSpawnOk;
5+
use codex_core::ConversationManager;
6+
use codex_core::NewConversation;
87
use codex_core::config::Config;
98
use codex_core::config::ConfigOverrides;
109
use codex_core::protocol::Submission;
11-
use codex_core::util::notify_on_sigint;
12-
use codex_login::CodexAuth;
1310
use tokio::io::AsyncBufReadExt;
1411
use tokio::io::BufReader;
1512
use tracing::error;
@@ -36,22 +33,23 @@ pub async fn run_main(opts: ProtoCli) -> anyhow::Result<()> {
3633
.map_err(anyhow::Error::msg)?;
3734

3835
let config = Config::load_with_cli_overrides(overrides_vec, ConfigOverrides::default())?;
39-
let auth = CodexAuth::from_codex_home(&config.codex_home)?;
40-
let ctrl_c = notify_on_sigint();
41-
let CodexSpawnOk { codex, .. } = Codex::spawn(config, auth, ctrl_c.clone()).await?;
42-
let codex = Arc::new(codex);
36+
// Use conversation_manager API to start a conversation
37+
let conversation_manager = ConversationManager::default();
38+
let NewConversation {
39+
conversation_id: _,
40+
conversation,
41+
session_configured: _,
42+
} = conversation_manager.new_conversation(config).await?;
4343

4444
// Task that reads JSON lines from stdin and forwards to Submission Queue
4545
let sq_fut = {
46-
let codex = codex.clone();
47-
let ctrl_c = ctrl_c.clone();
46+
let conversation = conversation.clone();
4847
async move {
4948
let stdin = BufReader::new(tokio::io::stdin());
5049
let mut lines = stdin.lines();
5150
loop {
5251
let result = tokio::select! {
53-
_ = ctrl_c.notified() => {
54-
info!("Interrupted, exiting");
52+
_ = tokio::signal::ctrl_c() => {
5553
break
5654
},
5755
res = lines.next_line() => res,
@@ -65,7 +63,7 @@ pub async fn run_main(opts: ProtoCli) -> anyhow::Result<()> {
6563
}
6664
match serde_json::from_str::<Submission>(line) {
6765
Ok(sub) => {
68-
if let Err(e) = codex.submit_with_id(sub).await {
66+
if let Err(e) = conversation.submit_with_id(sub).await {
6967
error!("{e:#}");
7068
break;
7169
}
@@ -88,8 +86,8 @@ pub async fn run_main(opts: ProtoCli) -> anyhow::Result<()> {
8886
let eq_fut = async move {
8987
loop {
9088
let event = tokio::select! {
91-
_ = ctrl_c.notified() => break,
92-
event = codex.next_event() => event,
89+
_ = tokio::signal::ctrl_c() => break,
90+
event = conversation.next_event() => event,
9391
};
9492
match event {
9593
Ok(event) => {

codex-rs/core/src/codex.rs

Lines changed: 4 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ use futures::prelude::*;
2020
use mcp_types::CallToolResult;
2121
use serde::Serialize;
2222
use serde_json;
23-
use tokio::sync::Notify;
2423
use tokio::sync::oneshot;
2524
use tokio::task::AbortHandle;
2625
use tracing::debug;
@@ -124,11 +123,7 @@ pub struct CodexSpawnOk {
124123

125124
impl Codex {
126125
/// Spawn a new [`Codex`] and initialize the session.
127-
pub async fn spawn(
128-
config: Config,
129-
auth: Option<CodexAuth>,
130-
ctrl_c: Arc<Notify>,
131-
) -> CodexResult<CodexSpawnOk> {
126+
pub async fn spawn(config: Config, auth: Option<CodexAuth>) -> CodexResult<CodexSpawnOk> {
132127
// experimental resume path (undocumented)
133128
let resume_path = config.experimental_resume.clone();
134129
info!("resume_path: {resume_path:?}");
@@ -156,9 +151,7 @@ impl Codex {
156151

157152
// Generate a unique ID for the lifetime of this Codex session.
158153
let session_id = Uuid::new_v4();
159-
tokio::spawn(submission_loop(
160-
session_id, config, auth, rx_sub, tx_event, ctrl_c,
161-
));
154+
tokio::spawn(submission_loop(session_id, config, auth, rx_sub, tx_event));
162155
let codex = Codex {
163156
next_id: AtomicU64::new(0),
164157
tx_sub,
@@ -210,7 +203,6 @@ impl Codex {
210203
pub(crate) struct Session {
211204
client: ModelClient,
212205
pub(crate) tx_event: Sender<Event>,
213-
ctrl_c: Arc<Notify>,
214206

215207
/// The session's current working directory. All relative paths provided by
216208
/// the model as well as sandbox policies are resolved against this path
@@ -493,7 +485,6 @@ impl Session {
493485
let result = process_exec_tool_call(
494486
exec_args.params,
495487
exec_args.sandbox_type,
496-
exec_args.ctrl_c,
497488
exec_args.sandbox_policy,
498489
exec_args.codex_linux_sandbox_exe,
499490
exec_args.stdout_stream,
@@ -578,7 +569,7 @@ impl Session {
578569
.await
579570
}
580571

581-
pub fn abort(&self) {
572+
fn abort(&self) {
582573
info!("Aborting existing session");
583574
let mut state = self.state.lock().unwrap();
584575
state.pending_approvals.clear();
@@ -709,7 +700,6 @@ async fn submission_loop(
709700
auth: Option<CodexAuth>,
710701
rx_sub: Receiver<Submission>,
711702
tx_event: Sender<Event>,
712-
ctrl_c: Arc<Notify>,
713703
) {
714704
let mut sess: Option<Arc<Session>> = None;
715705
// shorthand - send an event when there is no active session
@@ -724,21 +714,7 @@ async fn submission_loop(
724714
tx_event.send(event).await.ok();
725715
};
726716

727-
loop {
728-
let interrupted = ctrl_c.notified();
729-
let sub = tokio::select! {
730-
res = rx_sub.recv() => match res {
731-
Ok(sub) => sub,
732-
Err(_) => break,
733-
},
734-
_ = interrupted => {
735-
if let Some(sess) = sess.as_ref(){
736-
sess.abort();
737-
}
738-
continue;
739-
},
740-
};
741-
717+
while let Ok(sub) = rx_sub.recv().await {
742718
debug!(?sub, "Submission");
743719
match sub.op {
744720
Op::Interrupt => {
@@ -877,7 +853,6 @@ async fn submission_loop(
877853
config.include_plan_tool,
878854
),
879855
tx_event: tx_event.clone(),
880-
ctrl_c: Arc::clone(&ctrl_c),
881856
user_instructions,
882857
base_instructions,
883858
approval_policy,
@@ -1787,7 +1762,6 @@ fn parse_container_exec_arguments(
17871762
pub struct ExecInvokeArgs<'a> {
17881763
pub params: ExecParams,
17891764
pub sandbox_type: SandboxType,
1790-
pub ctrl_c: Arc<Notify>,
17911765
pub sandbox_policy: &'a SandboxPolicy,
17921766
pub codex_linux_sandbox_exe: &'a Option<PathBuf>,
17931767
pub stdout_stream: Option<StdoutStream>,
@@ -1972,7 +1946,6 @@ async fn handle_container_exec_with_params(
19721946
ExecInvokeArgs {
19731947
params: params.clone(),
19741948
sandbox_type,
1975-
ctrl_c: sess.ctrl_c.clone(),
19761949
sandbox_policy: &sess.sandbox_policy,
19771950
codex_linux_sandbox_exe: &sess.codex_linux_sandbox_exe,
19781951
stdout_stream: Some(StdoutStream {
@@ -2104,7 +2077,6 @@ async fn handle_sandbox_error(
21042077
ExecInvokeArgs {
21052078
params,
21062079
sandbox_type: SandboxType::None,
2107-
ctrl_c: sess.ctrl_c.clone(),
21082080
sandbox_policy: &sess.sandbox_policy,
21092081
codex_linux_sandbox_exe: &sess.codex_linux_sandbox_exe,
21102082
stdout_stream: Some(StdoutStream {
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
use crate::codex::Codex;
2+
use crate::error::Result as CodexResult;
3+
use crate::protocol::Event;
4+
use crate::protocol::Op;
5+
use crate::protocol::Submission;
6+
7+
pub struct CodexConversation {
8+
codex: Codex,
9+
}
10+
11+
impl CodexConversation {
12+
pub(crate) fn new(codex: Codex) -> Self {
13+
Self { codex }
14+
}
15+
16+
pub async fn submit(&self, op: Op) -> CodexResult<String> {
17+
self.codex.submit(op).await
18+
}
19+
20+
pub async fn submit_with_id(&self, sub: Submission) -> CodexResult<()> {
21+
self.codex.submit_with_id(sub).await
22+
}
23+
24+
pub async fn next_event(&self) -> CodexResult<Event> {
25+
self.codex.next_event().await
26+
}
27+
}

codex-rs/core/src/codex_wrapper.rs

Lines changed: 0 additions & 59 deletions
This file was deleted.
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
use std::collections::HashMap;
2+
use std::sync::Arc;
3+
4+
use codex_login::CodexAuth;
5+
use tokio::sync::RwLock;
6+
use uuid::Uuid;
7+
8+
use crate::codex::Codex;
9+
use crate::codex::CodexSpawnOk;
10+
use crate::codex_conversation::CodexConversation;
11+
use crate::config::Config;
12+
use crate::error::CodexErr;
13+
use crate::error::Result as CodexResult;
14+
use crate::protocol::Event;
15+
use crate::protocol::EventMsg;
16+
use crate::protocol::SessionConfiguredEvent;
17+
18+
/// Represents a newly created Codex conversation, including the first event
19+
/// (which is [`EventMsg::SessionConfigured`]).
20+
pub struct NewConversation {
21+
pub conversation_id: Uuid,
22+
pub conversation: Arc<CodexConversation>,
23+
pub session_configured: SessionConfiguredEvent,
24+
}
25+
26+
/// [`ConversationManager`] is responsible for creating conversations and
27+
/// maintaining them in memory.
28+
pub struct ConversationManager {
29+
conversations: Arc<RwLock<HashMap<Uuid, Arc<CodexConversation>>>>,
30+
}
31+
32+
impl Default for ConversationManager {
33+
fn default() -> Self {
34+
Self {
35+
conversations: Arc::new(RwLock::new(HashMap::new())),
36+
}
37+
}
38+
}
39+
40+
impl ConversationManager {
41+
pub async fn new_conversation(&self, config: Config) -> CodexResult<NewConversation> {
42+
let auth = CodexAuth::from_codex_home(&config.codex_home)?;
43+
self.new_conversation_with_auth(config, auth).await
44+
}
45+
46+
/// Used for integration tests: should not be used by ordinary business
47+
/// logic.
48+
pub async fn new_conversation_with_auth(
49+
&self,
50+
config: Config,
51+
auth: Option<CodexAuth>,
52+
) -> CodexResult<NewConversation> {
53+
let CodexSpawnOk {
54+
codex,
55+
init_id,
56+
session_id: conversation_id,
57+
} = Codex::spawn(config, auth).await?;
58+
59+
// The first event must be `SessionInitialized`. Validate and forward it
60+
// to the caller so that they can display it in the conversation
61+
// history.
62+
let event = codex.next_event().await?;
63+
let session_configured = match event {
64+
Event {
65+
id,
66+
msg: EventMsg::SessionConfigured(session_configured),
67+
} if id == init_id => session_configured,
68+
_ => {
69+
return Err(CodexErr::SessionConfiguredNotFirstEvent);
70+
}
71+
};
72+
73+
let conversation = Arc::new(CodexConversation::new(codex));
74+
self.conversations
75+
.write()
76+
.await
77+
.insert(conversation_id, conversation.clone());
78+
79+
Ok(NewConversation {
80+
conversation_id,
81+
conversation,
82+
session_configured,
83+
})
84+
}
85+
86+
pub async fn get_conversation(
87+
&self,
88+
conversation_id: Uuid,
89+
) -> CodexResult<Arc<CodexConversation>> {
90+
let conversations = self.conversations.read().await;
91+
conversations
92+
.get(&conversation_id)
93+
.cloned()
94+
.ok_or_else(|| CodexErr::ConversationNotFound(conversation_id))
95+
}
96+
}

codex-rs/core/src/error.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use serde_json;
33
use std::io;
44
use thiserror::Error;
55
use tokio::task::JoinError;
6+
use uuid::Uuid;
67

78
pub type Result<T> = std::result::Result<T, CodexErr>;
89

@@ -44,6 +45,12 @@ pub enum CodexErr {
4445
#[error("stream disconnected before completion: {0}")]
4546
Stream(String),
4647

48+
#[error("no conversation with id: {0}")]
49+
ConversationNotFound(Uuid),
50+
51+
#[error("session configured event was not the first event in the stream")]
52+
SessionConfiguredNotFirstEvent,
53+
4754
/// Returned by run_command_stream when the spawned child process timed out (10s).
4855
#[error("timeout waiting for child process to exit")]
4956
Timeout,

0 commit comments

Comments
 (0)