Skip to content

Commit bb9e75c

Browse files
committed
chore: introduce CodexServer as a clearinghouse for all conversations
1 parent e8670ad commit bb9e75c

25 files changed

+393
-589
lines changed

codex-rs/cli/src/proto.rs

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
11
use std::io::IsTerminal;
2-
use std::sync::Arc;
32

43
use clap::Parser;
54
use codex_common::CliConfigOverrides;
6-
use codex_core::Codex;
7-
use codex_core::CodexSpawnOk;
5+
use codex_core::ConversationManager;
6+
use codex_core::NewConversation;
87
use codex_core::config::Config;
98
use codex_core::config::ConfigOverrides;
109
use codex_core::protocol::Submission;
11-
use codex_core::util::notify_on_sigint;
12-
use codex_login::CodexAuth;
1310
use tokio::io::AsyncBufReadExt;
1411
use tokio::io::BufReader;
1512
use tracing::error;
@@ -36,21 +33,23 @@ pub async fn run_main(opts: ProtoCli) -> anyhow::Result<()> {
3633
.map_err(anyhow::Error::msg)?;
3734

3835
let config = Config::load_with_cli_overrides(overrides_vec, ConfigOverrides::default())?;
39-
let auth = CodexAuth::from_codex_home(&config.codex_home)?;
40-
let ctrl_c = notify_on_sigint();
41-
let CodexSpawnOk { codex, .. } = Codex::spawn(config, auth, ctrl_c.clone()).await?;
42-
let codex = Arc::new(codex);
36+
// Use conversation_manager API to start a conversation
37+
let conversation_manager = ConversationManager::default();
38+
let NewConversation {
39+
conversation_id: _,
40+
conversation,
41+
session_configured: _,
42+
} = conversation_manager.new_conversation(config).await?;
4343

4444
// Task that reads JSON lines from stdin and forwards to Submission Queue
4545
let sq_fut = {
46-
let codex = codex.clone();
47-
let ctrl_c = ctrl_c.clone();
46+
let conversation = conversation.clone();
4847
async move {
4948
let stdin = BufReader::new(tokio::io::stdin());
5049
let mut lines = stdin.lines();
5150
loop {
5251
let result = tokio::select! {
53-
_ = ctrl_c.notified() => {
52+
_ = conversation.on_abort() => {
5453
info!("Interrupted, exiting");
5554
break
5655
},
@@ -65,7 +64,7 @@ pub async fn run_main(opts: ProtoCli) -> anyhow::Result<()> {
6564
}
6665
match serde_json::from_str::<Submission>(line) {
6766
Ok(sub) => {
68-
if let Err(e) = codex.submit_with_id(sub).await {
67+
if let Err(e) = conversation.submit_with_id(sub).await {
6968
error!("{e:#}");
7069
break;
7170
}
@@ -88,8 +87,8 @@ pub async fn run_main(opts: ProtoCli) -> anyhow::Result<()> {
8887
let eq_fut = async move {
8988
loop {
9089
let event = tokio::select! {
91-
_ = ctrl_c.notified() => break,
92-
event = codex.next_event() => event,
90+
_ = conversation.on_abort() => break,
91+
event = conversation.next_event() => event,
9392
};
9493
match event {
9594
Ok(event) => {

codex-rs/core/src/codex.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ impl Codex {
126126
pub async fn spawn(
127127
config: Config,
128128
auth: Option<CodexAuth>,
129-
ctrl_c: Arc<Notify>,
129+
on_abort: Arc<Notify>,
130130
) -> CodexResult<CodexSpawnOk> {
131131
// experimental resume path (undocumented)
132132
let resume_path = config.experimental_resume.clone();
@@ -156,7 +156,7 @@ impl Codex {
156156
// Generate a unique ID for the lifetime of this Codex session.
157157
let session_id = Uuid::new_v4();
158158
tokio::spawn(submission_loop(
159-
session_id, config, auth, rx_sub, tx_event, ctrl_c,
159+
session_id, config, auth, rx_sub, tx_event, on_abort,
160160
));
161161
let codex = Codex {
162162
next_id: AtomicU64::new(0),
@@ -577,7 +577,7 @@ impl Session {
577577
.await
578578
}
579579

580-
pub fn abort(&self) {
580+
fn abort(&self) {
581581
info!("Aborting existing session");
582582
let mut state = self.state.lock().unwrap();
583583
state.pending_approvals.clear();
@@ -708,7 +708,7 @@ async fn submission_loop(
708708
auth: Option<CodexAuth>,
709709
rx_sub: Receiver<Submission>,
710710
tx_event: Sender<Event>,
711-
ctrl_c: Arc<Notify>,
711+
on_abort: Arc<Notify>,
712712
) {
713713
let mut sess: Option<Arc<Session>> = None;
714714
// shorthand - send an event when there is no active session
@@ -724,13 +724,13 @@ async fn submission_loop(
724724
};
725725

726726
loop {
727-
let interrupted = ctrl_c.notified();
727+
let aborted = on_abort.notified();
728728
let sub = tokio::select! {
729729
res = rx_sub.recv() => match res {
730730
Ok(sub) => sub,
731731
Err(_) => break,
732732
},
733-
_ = interrupted => {
733+
_ = aborted => {
734734
if let Some(sess) = sess.as_ref(){
735735
sess.abort();
736736
}
@@ -876,7 +876,7 @@ async fn submission_loop(
876876
config.include_plan_tool,
877877
),
878878
tx_event: tx_event.clone(),
879-
ctrl_c: Arc::clone(&ctrl_c),
879+
ctrl_c: Arc::clone(&on_abort),
880880
user_instructions,
881881
base_instructions,
882882
approval_policy,
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
use std::sync::Arc;
2+
3+
use tokio::sync::Notify;
4+
use tokio::sync::futures::Notified;
5+
6+
use crate::codex::Codex;
7+
use crate::error::Result as CodexResult;
8+
use crate::protocol::Event;
9+
use crate::protocol::Op;
10+
use crate::protocol::Submission;
11+
12+
pub struct CodexConversation {
13+
codex: Codex,
14+
on_abort: Arc<Notify>,
15+
}
16+
17+
impl CodexConversation {
18+
pub(crate) fn new(codex: Codex, on_abort: Arc<Notify>) -> Self {
19+
Self { codex, on_abort }
20+
}
21+
22+
pub async fn submit(&self, op: Op) -> CodexResult<String> {
23+
self.codex.submit(op).await
24+
}
25+
26+
pub async fn submit_with_id(&self, sub: Submission) -> CodexResult<()> {
27+
self.codex.submit_with_id(sub).await
28+
}
29+
30+
pub async fn next_event(&self) -> CodexResult<Event> {
31+
self.codex.next_event().await
32+
}
33+
34+
pub fn abort(&self) {
35+
self.on_abort.notify_waiters();
36+
}
37+
38+
/// await this to get notified when the user _aborts_ the conversation.
39+
pub fn on_abort(&self) -> Notified {
40+
self.on_abort.notified()
41+
}
42+
}

codex-rs/core/src/codex_wrapper.rs

Lines changed: 0 additions & 59 deletions
This file was deleted.
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
use std::collections::HashMap;
2+
use std::sync::Arc;
3+
4+
use codex_login::CodexAuth;
5+
use tokio::sync::Notify;
6+
use tokio::sync::RwLock;
7+
use uuid::Uuid;
8+
9+
use crate::codex::Codex;
10+
use crate::codex::CodexSpawnOk;
11+
use crate::codex_conversation::CodexConversation;
12+
use crate::config::Config;
13+
use crate::error::CodexErr;
14+
use crate::error::Result as CodexResult;
15+
use crate::protocol::Event;
16+
use crate::protocol::EventMsg;
17+
use crate::protocol::SessionConfiguredEvent;
18+
19+
/// Represents a newly created Codex conversation, including the first event
20+
/// (which is [`EventMsg::SessionConfigured`]).
21+
pub struct NewConversation {
22+
pub conversation_id: Uuid,
23+
pub conversation: Arc<CodexConversation>,
24+
pub session_configured: SessionConfiguredEvent,
25+
}
26+
27+
/// [`ConversationManager`] is responsible for creating conversations and
28+
/// maintaining them in memory.
29+
pub struct ConversationManager {
30+
conversations: Arc<RwLock<HashMap<Uuid, Arc<CodexConversation>>>>,
31+
}
32+
33+
impl Default for ConversationManager {
34+
fn default() -> Self {
35+
Self {
36+
conversations: Arc::new(RwLock::new(HashMap::new())),
37+
}
38+
}
39+
}
40+
41+
impl ConversationManager {
42+
pub async fn new_conversation(&self, config: Config) -> CodexResult<NewConversation> {
43+
let auth = CodexAuth::from_codex_home(&config.codex_home)?;
44+
self.new_conversation_with_auth(config, auth).await
45+
}
46+
47+
/// Used for integration tests: should not be used by ordinary business
48+
/// logic.
49+
pub async fn new_conversation_with_auth(
50+
&self,
51+
config: Config,
52+
auth: Option<CodexAuth>,
53+
) -> CodexResult<NewConversation> {
54+
let on_abort = Arc::new(Notify::new());
55+
56+
let CodexSpawnOk {
57+
codex,
58+
init_id,
59+
session_id: conversation_id,
60+
} = Codex::spawn(config, auth, on_abort.clone()).await?;
61+
62+
// The first event must be `SessionInitialized`. Validate and forward it
63+
// to the caller so that they can display it in the conversation
64+
// history.
65+
let event = codex.next_event().await?;
66+
let session_configured = match event {
67+
Event {
68+
id,
69+
msg: EventMsg::SessionConfigured(session_configured),
70+
} if id == init_id => session_configured,
71+
_ => {
72+
return Err(CodexErr::SessionConfiguredNotFirstEvent);
73+
}
74+
};
75+
76+
let conversation = Arc::new(CodexConversation::new(codex, on_abort));
77+
self.conversations
78+
.write()
79+
.await
80+
.insert(conversation_id, conversation.clone());
81+
82+
Ok(NewConversation {
83+
conversation_id,
84+
conversation,
85+
session_configured,
86+
})
87+
}
88+
89+
pub async fn get_conversation(
90+
&self,
91+
conversation_id: Uuid,
92+
) -> CodexResult<Arc<CodexConversation>> {
93+
let conversations = self.conversations.read().await;
94+
conversations
95+
.get(&conversation_id)
96+
.cloned()
97+
.ok_or_else(|| CodexErr::ConversationNotFound(conversation_id))
98+
}
99+
}

codex-rs/core/src/error.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use serde_json;
33
use std::io;
44
use thiserror::Error;
55
use tokio::task::JoinError;
6+
use uuid::Uuid;
67

78
pub type Result<T> = std::result::Result<T, CodexErr>;
89

@@ -44,6 +45,12 @@ pub enum CodexErr {
4445
#[error("stream disconnected before completion: {0}")]
4546
Stream(String),
4647

48+
#[error("no conversation with id: {0}")]
49+
ConversationNotFound(Uuid),
50+
51+
#[error("session configured event was not the first event in the stream")]
52+
SessionConfiguredNotFirstEvent,
53+
4754
/// Returned by run_command_stream when the spawned child process timed out (10s).
4855
#[error("timeout waiting for child process to exit")]
4956
Timeout,

codex-rs/core/src/lib.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,8 @@ mod chat_completions;
1111
mod client;
1212
mod client_common;
1313
pub mod codex;
14-
pub use codex::Codex;
15-
pub use codex::CodexSpawnOk;
16-
pub mod codex_wrapper;
14+
mod codex_conversation;
15+
pub use codex_conversation::CodexConversation;
1716
pub mod config;
1817
pub mod config_profile;
1918
pub mod config_types;
@@ -34,6 +33,9 @@ pub use model_provider_info::ModelProviderInfo;
3433
pub use model_provider_info::WireApi;
3534
pub use model_provider_info::built_in_model_providers;
3635
pub use model_provider_info::create_oss_provider_with_base_url;
36+
mod conversation_manager;
37+
pub use conversation_manager::ConversationManager;
38+
pub use conversation_manager::NewConversation;
3739
pub mod model_family;
3840
mod models;
3941
mod openai_model_info;

0 commit comments

Comments
 (0)