Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 31 additions & 16 deletions codex-rs/cli/src/proto.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
use std::io::IsTerminal;
use std::sync::Arc;

use clap::Parser;
use codex_common::CliConfigOverrides;
use codex_core::Codex;
use codex_core::CodexSpawnOk;
use codex_core::ConversationManager;
use codex_core::NewConversation;
use codex_core::config::Config;
use codex_core::config::ConfigOverrides;
use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
use codex_core::protocol::Submission;
use codex_core::util::notify_on_sigint;
use codex_login::CodexAuth;
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tracing::error;
Expand All @@ -36,22 +35,38 @@ pub async fn run_main(opts: ProtoCli) -> anyhow::Result<()> {
.map_err(anyhow::Error::msg)?;

let config = Config::load_with_cli_overrides(overrides_vec, ConfigOverrides::default())?;
let auth = CodexAuth::from_codex_home(&config.codex_home)?;
let ctrl_c = notify_on_sigint();
let CodexSpawnOk { codex, .. } = Codex::spawn(config, auth, ctrl_c.clone()).await?;
let codex = Arc::new(codex);
// Use conversation_manager API to start a conversation
let conversation_manager = ConversationManager::default();
let NewConversation {
conversation_id: _,
conversation,
session_configured,
} = conversation_manager.new_conversation(config).await?;

// Simulate streaming the session_configured event.
let synthetic_event = Event {
// Fake id value.
id: "".to_string(),
msg: EventMsg::SessionConfigured(session_configured),
};
let session_configured_event = match serde_json::to_string(&synthetic_event) {
Ok(s) => s,
Err(e) => {
error!("Failed to serialize session_configured: {e}");
return Err(anyhow::Error::from(e));
}
};
println!("{session_configured_event}");

// Task that reads JSON lines from stdin and forwards to Submission Queue
let sq_fut = {
let codex = codex.clone();
let ctrl_c = ctrl_c.clone();
let conversation = conversation.clone();
async move {
let stdin = BufReader::new(tokio::io::stdin());
let mut lines = stdin.lines();
loop {
let result = tokio::select! {
_ = ctrl_c.notified() => {
info!("Interrupted, exiting");
_ = tokio::signal::ctrl_c() => {
break
},
res = lines.next_line() => res,
Expand All @@ -65,7 +80,7 @@ pub async fn run_main(opts: ProtoCli) -> anyhow::Result<()> {
}
match serde_json::from_str::<Submission>(line) {
Ok(sub) => {
if let Err(e) = codex.submit_with_id(sub).await {
if let Err(e) = conversation.submit_with_id(sub).await {
error!("{e:#}");
break;
}
Expand All @@ -88,8 +103,8 @@ pub async fn run_main(opts: ProtoCli) -> anyhow::Result<()> {
let eq_fut = async move {
loop {
let event = tokio::select! {
_ = ctrl_c.notified() => break,
event = codex.next_event() => event,
_ = tokio::signal::ctrl_c() => break,
event = conversation.next_event() => event,
};
match event {
Ok(event) => {
Expand Down
39 changes: 7 additions & 32 deletions codex-rs/core/src/codex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use futures::prelude::*;
use mcp_types::CallToolResult;
use serde::Serialize;
use serde_json;
use tokio::sync::Notify;
use tokio::sync::oneshot;
use tokio::task::AbortHandle;
use tracing::debug;
Expand Down Expand Up @@ -124,11 +123,7 @@ pub struct CodexSpawnOk {

impl Codex {
/// Spawn a new [`Codex`] and initialize the session.
pub async fn spawn(
config: Config,
auth: Option<CodexAuth>,
ctrl_c: Arc<Notify>,
) -> CodexResult<CodexSpawnOk> {
pub async fn spawn(config: Config, auth: Option<CodexAuth>) -> CodexResult<CodexSpawnOk> {
// experimental resume path (undocumented)
let resume_path = config.experimental_resume.clone();
info!("resume_path: {resume_path:?}");
Expand Down Expand Up @@ -156,9 +151,9 @@ impl Codex {

// Generate a unique ID for the lifetime of this Codex session.
let session_id = Uuid::new_v4();
tokio::spawn(submission_loop(
session_id, config, auth, rx_sub, tx_event, ctrl_c,
));

// This task will run until Op::Shutdown is received.
tokio::spawn(submission_loop(session_id, config, auth, rx_sub, tx_event));
let codex = Codex {
next_id: AtomicU64::new(0),
tx_sub,
Expand Down Expand Up @@ -210,7 +205,6 @@ impl Codex {
pub(crate) struct Session {
client: ModelClient,
pub(crate) tx_event: Sender<Event>,
ctrl_c: Arc<Notify>,

/// The session's current working directory. All relative paths provided by
/// the model as well as sandbox policies are resolved against this path
Expand Down Expand Up @@ -493,7 +487,6 @@ impl Session {
let result = process_exec_tool_call(
exec_args.params,
exec_args.sandbox_type,
exec_args.ctrl_c,
exec_args.sandbox_policy,
exec_args.codex_linux_sandbox_exe,
exec_args.stdout_stream,
Expand Down Expand Up @@ -578,7 +571,7 @@ impl Session {
.await
}

pub fn abort(&self) {
fn abort(&self) {
info!("Aborting existing session");
let mut state = self.state.lock().unwrap();
state.pending_approvals.clear();
Expand Down Expand Up @@ -709,7 +702,6 @@ async fn submission_loop(
auth: Option<CodexAuth>,
rx_sub: Receiver<Submission>,
tx_event: Sender<Event>,
ctrl_c: Arc<Notify>,
) {
let mut sess: Option<Arc<Session>> = None;
// shorthand - send an event when there is no active session
Expand All @@ -724,21 +716,8 @@ async fn submission_loop(
tx_event.send(event).await.ok();
};

loop {
let interrupted = ctrl_c.notified();
let sub = tokio::select! {
res = rx_sub.recv() => match res {
Ok(sub) => sub,
Err(_) => break,
},
_ = interrupted => {
if let Some(sess) = sess.as_ref(){
sess.abort();
}
continue;
},
};

// To break out of this loop, send Op::Shutdown.
while let Ok(sub) = rx_sub.recv().await {
debug!(?sub, "Submission");
match sub.op {
Op::Interrupt => {
Expand Down Expand Up @@ -877,7 +856,6 @@ async fn submission_loop(
config.include_plan_tool,
),
tx_event: tx_event.clone(),
ctrl_c: Arc::clone(&ctrl_c),
user_instructions,
base_instructions,
approval_policy,
Expand Down Expand Up @@ -1787,7 +1765,6 @@ fn parse_container_exec_arguments(
pub struct ExecInvokeArgs<'a> {
pub params: ExecParams,
pub sandbox_type: SandboxType,
pub ctrl_c: Arc<Notify>,
pub sandbox_policy: &'a SandboxPolicy,
pub codex_linux_sandbox_exe: &'a Option<PathBuf>,
pub stdout_stream: Option<StdoutStream>,
Expand Down Expand Up @@ -1972,7 +1949,6 @@ async fn handle_container_exec_with_params(
ExecInvokeArgs {
params: params.clone(),
sandbox_type,
ctrl_c: sess.ctrl_c.clone(),
sandbox_policy: &sess.sandbox_policy,
codex_linux_sandbox_exe: &sess.codex_linux_sandbox_exe,
stdout_stream: Some(StdoutStream {
Expand Down Expand Up @@ -2104,7 +2080,6 @@ async fn handle_sandbox_error(
ExecInvokeArgs {
params,
sandbox_type: SandboxType::None,
ctrl_c: sess.ctrl_c.clone(),
sandbox_policy: &sess.sandbox_policy,
codex_linux_sandbox_exe: &sess.codex_linux_sandbox_exe,
stdout_stream: Some(StdoutStream {
Expand Down
30 changes: 30 additions & 0 deletions codex-rs/core/src/codex_conversation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use crate::codex::Codex;
use crate::error::Result as CodexResult;
use crate::protocol::Event;
use crate::protocol::Op;
use crate::protocol::Submission;

pub struct CodexConversation {
codex: Codex,
}

/// Conduit for the bidirectional stream of messages that compose a conversation
/// in Codex.
impl CodexConversation {
pub(crate) fn new(codex: Codex) -> Self {
Self { codex }
}

pub async fn submit(&self, op: Op) -> CodexResult<String> {
self.codex.submit(op).await
}

/// Use sparingly: this is intended to be removed soon.
pub async fn submit_with_id(&self, sub: Submission) -> CodexResult<()> {
self.codex.submit_with_id(sub).await
}

pub async fn next_event(&self) -> CodexResult<Event> {
self.codex.next_event().await
}
}
59 changes: 0 additions & 59 deletions codex-rs/core/src/codex_wrapper.rs

This file was deleted.

96 changes: 96 additions & 0 deletions codex-rs/core/src/conversation_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use std::collections::HashMap;
use std::sync::Arc;

use codex_login::CodexAuth;
use tokio::sync::RwLock;
use uuid::Uuid;

use crate::codex::Codex;
use crate::codex::CodexSpawnOk;
use crate::codex_conversation::CodexConversation;
use crate::config::Config;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::protocol::Event;
use crate::protocol::EventMsg;
use crate::protocol::SessionConfiguredEvent;

/// Represents a newly created Codex conversation, including the first event
/// (which is [`EventMsg::SessionConfigured`]).
pub struct NewConversation {
pub conversation_id: Uuid,
pub conversation: Arc<CodexConversation>,
pub session_configured: SessionConfiguredEvent,
}

/// [`ConversationManager`] is responsible for creating conversations and
/// maintaining them in memory.
pub struct ConversationManager {
conversations: Arc<RwLock<HashMap<Uuid, Arc<CodexConversation>>>>,
}

impl Default for ConversationManager {
fn default() -> Self {
Self {
conversations: Arc::new(RwLock::new(HashMap::new())),
}
}
}

impl ConversationManager {
pub async fn new_conversation(&self, config: Config) -> CodexResult<NewConversation> {
let auth = CodexAuth::from_codex_home(&config.codex_home)?;
self.new_conversation_with_auth(config, auth).await
}

/// Used for integration tests: should not be used by ordinary business
/// logic.
pub async fn new_conversation_with_auth(
&self,
config: Config,
auth: Option<CodexAuth>,
) -> CodexResult<NewConversation> {
let CodexSpawnOk {
codex,
init_id,
session_id: conversation_id,
} = Codex::spawn(config, auth).await?;

// The first event must be `SessionInitialized`. Validate and forward it
// to the caller so that they can display it in the conversation
// history.
let event = codex.next_event().await?;
let session_configured = match event {
Event {
id,
msg: EventMsg::SessionConfigured(session_configured),
} if id == init_id => session_configured,
_ => {
return Err(CodexErr::SessionConfiguredNotFirstEvent);
}
};

let conversation = Arc::new(CodexConversation::new(codex));
self.conversations
.write()
.await
.insert(conversation_id, conversation.clone());

Ok(NewConversation {
conversation_id,
conversation,
session_configured,
})
}

pub async fn get_conversation(
&self,
conversation_id: Uuid,
) -> CodexResult<Arc<CodexConversation>> {
let conversations = self.conversations.read().await;
conversations
.get(&conversation_id)
.cloned()
.ok_or_else(|| CodexErr::ConversationNotFound(conversation_id))
}
}
Loading
Loading