Skip to content

Commit 335c1af

Browse files
committed
chore: introduce CodexServer as a clearinghouse for all conversations
1 parent e6dc5a6 commit 335c1af

32 files changed

+403
-654
lines changed

codex-rs/cli/src/proto.rs

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
use std::io::IsTerminal;
2-
use std::sync::Arc;
32

43
use clap::Parser;
54
use codex_common::CliConfigOverrides;
6-
use codex_core::Codex;
7-
use codex_core::CodexSpawnOk;
5+
use codex_core::ConversationManager;
6+
use codex_core::NewConversation;
87
use codex_core::config::Config;
98
use codex_core::config::ConfigOverrides;
9+
use codex_core::protocol::Event;
10+
use codex_core::protocol::EventMsg;
1011
use codex_core::protocol::Submission;
11-
use codex_core::util::notify_on_sigint;
12-
use codex_login::CodexAuth;
1312
use tokio::io::AsyncBufReadExt;
1413
use tokio::io::BufReader;
1514
use tracing::error;
@@ -36,22 +35,38 @@ pub async fn run_main(opts: ProtoCli) -> anyhow::Result<()> {
3635
.map_err(anyhow::Error::msg)?;
3736

3837
let config = Config::load_with_cli_overrides(overrides_vec, ConfigOverrides::default())?;
39-
let auth = CodexAuth::from_codex_home(&config.codex_home)?;
40-
let ctrl_c = notify_on_sigint();
41-
let CodexSpawnOk { codex, .. } = Codex::spawn(config, auth, ctrl_c.clone()).await?;
42-
let codex = Arc::new(codex);
38+
// Use conversation_manager API to start a conversation
39+
let conversation_manager = ConversationManager::default();
40+
let NewConversation {
41+
conversation_id: _,
42+
conversation,
43+
session_configured,
44+
} = conversation_manager.new_conversation(config).await?;
45+
46+
// Simulate streaming the session_configured event.
47+
let synthetic_event = Event {
48+
// Fake id value.
49+
id: "".to_string(),
50+
msg: EventMsg::SessionConfigured(session_configured),
51+
};
52+
let session_configured_event = match serde_json::to_string(&synthetic_event) {
53+
Ok(s) => s,
54+
Err(e) => {
55+
error!("Failed to serialize session_configured: {e}");
56+
return Err(anyhow::Error::from(e));
57+
}
58+
};
59+
println!("{session_configured_event}");
4360

4461
// Task that reads JSON lines from stdin and forwards to Submission Queue
4562
let sq_fut = {
46-
let codex = codex.clone();
47-
let ctrl_c = ctrl_c.clone();
63+
let conversation = conversation.clone();
4864
async move {
4965
let stdin = BufReader::new(tokio::io::stdin());
5066
let mut lines = stdin.lines();
5167
loop {
5268
let result = tokio::select! {
53-
_ = ctrl_c.notified() => {
54-
info!("Interrupted, exiting");
69+
_ = tokio::signal::ctrl_c() => {
5570
break
5671
},
5772
res = lines.next_line() => res,
@@ -65,7 +80,7 @@ pub async fn run_main(opts: ProtoCli) -> anyhow::Result<()> {
6580
}
6681
match serde_json::from_str::<Submission>(line) {
6782
Ok(sub) => {
68-
if let Err(e) = codex.submit_with_id(sub).await {
83+
if let Err(e) = conversation.submit_with_id(sub).await {
6984
error!("{e:#}");
7085
break;
7186
}
@@ -88,8 +103,8 @@ pub async fn run_main(opts: ProtoCli) -> anyhow::Result<()> {
88103
let eq_fut = async move {
89104
loop {
90105
let event = tokio::select! {
91-
_ = ctrl_c.notified() => break,
92-
event = codex.next_event() => event,
106+
_ = tokio::signal::ctrl_c() => break,
107+
event = conversation.next_event() => event,
93108
};
94109
match event {
95110
Ok(event) => {

codex-rs/core/src/codex.rs

Lines changed: 7 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ use futures::prelude::*;
2020
use mcp_types::CallToolResult;
2121
use serde::Serialize;
2222
use serde_json;
23-
use tokio::sync::Notify;
2423
use tokio::sync::oneshot;
2524
use tokio::task::AbortHandle;
2625
use tracing::debug;
@@ -124,11 +123,7 @@ pub struct CodexSpawnOk {
124123

125124
impl Codex {
126125
/// Spawn a new [`Codex`] and initialize the session.
127-
pub async fn spawn(
128-
config: Config,
129-
auth: Option<CodexAuth>,
130-
ctrl_c: Arc<Notify>,
131-
) -> CodexResult<CodexSpawnOk> {
126+
pub async fn spawn(config: Config, auth: Option<CodexAuth>) -> CodexResult<CodexSpawnOk> {
132127
// experimental resume path (undocumented)
133128
let resume_path = config.experimental_resume.clone();
134129
info!("resume_path: {resume_path:?}");
@@ -156,9 +151,9 @@ impl Codex {
156151

157152
// Generate a unique ID for the lifetime of this Codex session.
158153
let session_id = Uuid::new_v4();
159-
tokio::spawn(submission_loop(
160-
session_id, config, auth, rx_sub, tx_event, ctrl_c,
161-
));
154+
155+
// This task will run until Op::Shutdown is received.
156+
tokio::spawn(submission_loop(session_id, config, auth, rx_sub, tx_event));
162157
let codex = Codex {
163158
next_id: AtomicU64::new(0),
164159
tx_sub,
@@ -210,7 +205,6 @@ impl Codex {
210205
pub(crate) struct Session {
211206
client: ModelClient,
212207
pub(crate) tx_event: Sender<Event>,
213-
ctrl_c: Arc<Notify>,
214208

215209
/// The session's current working directory. All relative paths provided by
216210
/// the model as well as sandbox policies are resolved against this path
@@ -493,7 +487,6 @@ impl Session {
493487
let result = process_exec_tool_call(
494488
exec_args.params,
495489
exec_args.sandbox_type,
496-
exec_args.ctrl_c,
497490
exec_args.sandbox_policy,
498491
exec_args.codex_linux_sandbox_exe,
499492
exec_args.stdout_stream,
@@ -578,7 +571,7 @@ impl Session {
578571
.await
579572
}
580573

581-
pub fn abort(&self) {
574+
fn abort(&self) {
582575
info!("Aborting existing session");
583576
let mut state = self.state.lock().unwrap();
584577
state.pending_approvals.clear();
@@ -709,7 +702,6 @@ async fn submission_loop(
709702
auth: Option<CodexAuth>,
710703
rx_sub: Receiver<Submission>,
711704
tx_event: Sender<Event>,
712-
ctrl_c: Arc<Notify>,
713705
) {
714706
let mut sess: Option<Arc<Session>> = None;
715707
// shorthand - send an event when there is no active session
@@ -724,21 +716,8 @@ async fn submission_loop(
724716
tx_event.send(event).await.ok();
725717
};
726718

727-
loop {
728-
let interrupted = ctrl_c.notified();
729-
let sub = tokio::select! {
730-
res = rx_sub.recv() => match res {
731-
Ok(sub) => sub,
732-
Err(_) => break,
733-
},
734-
_ = interrupted => {
735-
if let Some(sess) = sess.as_ref(){
736-
sess.abort();
737-
}
738-
continue;
739-
},
740-
};
741-
719+
// To break out of this loop, send Op::Shutdown.
720+
while let Ok(sub) = rx_sub.recv().await {
742721
debug!(?sub, "Submission");
743722
match sub.op {
744723
Op::Interrupt => {
@@ -877,7 +856,6 @@ async fn submission_loop(
877856
config.include_plan_tool,
878857
),
879858
tx_event: tx_event.clone(),
880-
ctrl_c: Arc::clone(&ctrl_c),
881859
user_instructions,
882860
base_instructions,
883861
approval_policy,
@@ -1787,7 +1765,6 @@ fn parse_container_exec_arguments(
17871765
pub struct ExecInvokeArgs<'a> {
17881766
pub params: ExecParams,
17891767
pub sandbox_type: SandboxType,
1790-
pub ctrl_c: Arc<Notify>,
17911768
pub sandbox_policy: &'a SandboxPolicy,
17921769
pub codex_linux_sandbox_exe: &'a Option<PathBuf>,
17931770
pub stdout_stream: Option<StdoutStream>,
@@ -1972,7 +1949,6 @@ async fn handle_container_exec_with_params(
19721949
ExecInvokeArgs {
19731950
params: params.clone(),
19741951
sandbox_type,
1975-
ctrl_c: sess.ctrl_c.clone(),
19761952
sandbox_policy: &sess.sandbox_policy,
19771953
codex_linux_sandbox_exe: &sess.codex_linux_sandbox_exe,
19781954
stdout_stream: Some(StdoutStream {
@@ -2104,7 +2080,6 @@ async fn handle_sandbox_error(
21042080
ExecInvokeArgs {
21052081
params,
21062082
sandbox_type: SandboxType::None,
2107-
ctrl_c: sess.ctrl_c.clone(),
21082083
sandbox_policy: &sess.sandbox_policy,
21092084
codex_linux_sandbox_exe: &sess.codex_linux_sandbox_exe,
21102085
stdout_stream: Some(StdoutStream {
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
use crate::codex::Codex;
2+
use crate::error::Result as CodexResult;
3+
use crate::protocol::Event;
4+
use crate::protocol::Op;
5+
use crate::protocol::Submission;
6+
7+
pub struct CodexConversation {
8+
codex: Codex,
9+
}
10+
11+
impl CodexConversation {
12+
pub(crate) fn new(codex: Codex) -> Self {
13+
Self { codex }
14+
}
15+
16+
pub async fn submit(&self, op: Op) -> CodexResult<String> {
17+
self.codex.submit(op).await
18+
}
19+
20+
pub async fn submit_with_id(&self, sub: Submission) -> CodexResult<()> {
21+
self.codex.submit_with_id(sub).await
22+
}
23+
24+
pub async fn next_event(&self) -> CodexResult<Event> {
25+
self.codex.next_event().await
26+
}
27+
}

codex-rs/core/src/codex_wrapper.rs

Lines changed: 0 additions & 59 deletions
This file was deleted.
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
use std::collections::HashMap;
2+
use std::sync::Arc;
3+
4+
use codex_login::CodexAuth;
5+
use tokio::sync::RwLock;
6+
use uuid::Uuid;
7+
8+
use crate::codex::Codex;
9+
use crate::codex::CodexSpawnOk;
10+
use crate::codex_conversation::CodexConversation;
11+
use crate::config::Config;
12+
use crate::error::CodexErr;
13+
use crate::error::Result as CodexResult;
14+
use crate::protocol::Event;
15+
use crate::protocol::EventMsg;
16+
use crate::protocol::SessionConfiguredEvent;
17+
18+
/// Represents a newly created Codex conversation, including the first event
19+
/// (which is [`EventMsg::SessionConfigured`]).
20+
pub struct NewConversation {
21+
pub conversation_id: Uuid,
22+
pub conversation: Arc<CodexConversation>,
23+
pub session_configured: SessionConfiguredEvent,
24+
}
25+
26+
/// [`ConversationManager`] is responsible for creating conversations and
27+
/// maintaining them in memory.
28+
pub struct ConversationManager {
29+
conversations: Arc<RwLock<HashMap<Uuid, Arc<CodexConversation>>>>,
30+
}
31+
32+
impl Default for ConversationManager {
33+
fn default() -> Self {
34+
Self {
35+
conversations: Arc::new(RwLock::new(HashMap::new())),
36+
}
37+
}
38+
}
39+
40+
impl ConversationManager {
41+
pub async fn new_conversation(&self, config: Config) -> CodexResult<NewConversation> {
42+
let auth = CodexAuth::from_codex_home(&config.codex_home)?;
43+
self.new_conversation_with_auth(config, auth).await
44+
}
45+
46+
/// Used for integration tests: should not be used by ordinary business
47+
/// logic.
48+
pub async fn new_conversation_with_auth(
49+
&self,
50+
config: Config,
51+
auth: Option<CodexAuth>,
52+
) -> CodexResult<NewConversation> {
53+
let CodexSpawnOk {
54+
codex,
55+
init_id,
56+
session_id: conversation_id,
57+
} = Codex::spawn(config, auth).await?;
58+
59+
// The first event must be `SessionInitialized`. Validate and forward it
60+
// to the caller so that they can display it in the conversation
61+
// history.
62+
let event = codex.next_event().await?;
63+
let session_configured = match event {
64+
Event {
65+
id,
66+
msg: EventMsg::SessionConfigured(session_configured),
67+
} if id == init_id => session_configured,
68+
_ => {
69+
return Err(CodexErr::SessionConfiguredNotFirstEvent);
70+
}
71+
};
72+
73+
let conversation = Arc::new(CodexConversation::new(codex));
74+
self.conversations
75+
.write()
76+
.await
77+
.insert(conversation_id, conversation.clone());
78+
79+
Ok(NewConversation {
80+
conversation_id,
81+
conversation,
82+
session_configured,
83+
})
84+
}
85+
86+
pub async fn get_conversation(
87+
&self,
88+
conversation_id: Uuid,
89+
) -> CodexResult<Arc<CodexConversation>> {
90+
let conversations = self.conversations.read().await;
91+
conversations
92+
.get(&conversation_id)
93+
.cloned()
94+
.ok_or_else(|| CodexErr::ConversationNotFound(conversation_id))
95+
}
96+
}

0 commit comments

Comments
 (0)