Skip to content

Commit 8473096

Browse files
authored
Add app-server transport layer with websocket support (openai#10693)
- Adds --listen <URL> to codex app-server with two listen modes: - stdio:// (default, existing behavior) - ws://IP:PORT (new websocket transport) - Refactors message routing to be connection-aware: - Tracks per-connection session state (initialize/experimental capability) - Routes responses/errors to the originating connection - Broadcasts server notifications/requests to initialized connections - Updates initialization semantics to be per connection (not process-global), and updates app-server docs accordingly. - Adds websocket accept/read/write handling (JSON-RPC per text frame, ping/pong handling, connection lifecycle events). Testing - Unit tests for transport URL parsing and targeted response/error routing. - New websocket integration test validating: - per-connection initialization requirements - no cross-connection response leakage - same request IDs on different connections route independently.
1 parent 428a9f6 commit 8473096

File tree

13 files changed

+1404
-309
lines changed

13 files changed

+1404
-309
lines changed

codex-rs/Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

codex-rs/app-server/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ codex-rmcp-client = { workspace = true }
3333
codex-utils-absolute-path = { workspace = true }
3434
codex-utils-json-to-toml = { workspace = true }
3535
chrono = { workspace = true }
36+
clap = { workspace = true, features = ["derive"] }
37+
futures = { workspace = true }
3638
serde = { workspace = true, features = ["derive"] }
3739
serde_json = { workspace = true }
3840
tempfile = { workspace = true }
@@ -45,6 +47,7 @@ tokio = { workspace = true, features = [
4547
"rt-multi-thread",
4648
"signal",
4749
] }
50+
tokio-tungstenite = { workspace = true }
4851
tracing = { workspace = true, features = ["log"] }
4952
tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] }
5053
uuid = { workspace = true, features = ["serde", "v7"] }
@@ -59,12 +62,14 @@ axum = { workspace = true, default-features = false, features = [
5962
base64 = { workspace = true }
6063
codex-execpolicy = { workspace = true }
6164
core_test_support = { workspace = true }
65+
codex-utils-cargo-bin = { workspace = true }
6266
os_info = { workspace = true }
6367
pretty_assertions = { workspace = true }
6468
rmcp = { workspace = true, default-features = false, features = [
6569
"server",
6670
"transport-streamable-http-server",
6771
] }
6872
serial_test = { workspace = true }
73+
tokio-tungstenite = { workspace = true }
6974
wiremock = { workspace = true }
7075
shlex = { workspace = true }

codex-rs/app-server/README.md

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,14 @@
1919

2020
## Protocol
2121

22-
Similar to [MCP](https://modelcontextprotocol.io/), `codex app-server` supports bidirectional communication, streaming JSONL over stdio. The protocol is JSON-RPC 2.0, though the `"jsonrpc":"2.0"` header is omitted.
22+
Similar to [MCP](https://modelcontextprotocol.io/), `codex app-server` supports bidirectional communication using JSON-RPC 2.0 messages (with the `"jsonrpc":"2.0"` header omitted on the wire).
23+
24+
Supported transports:
25+
26+
- stdio (`--listen stdio://`, default): newline-delimited JSON (JSONL)
27+
- websocket (`--listen ws://IP:PORT`): one JSON-RPC message per websocket text frame (**experimental / unsupported**)
28+
29+
Websocket transport is currently experimental and unsupported. Do not rely on it for production workloads.
2330

2431
## Message Schema
2532

@@ -42,15 +49,15 @@ Use the thread APIs to create, list, or archive conversations. Drive a conversat
4249

4350
## Lifecycle Overview
4451

45-
- Initialize once: Immediately after launching the codex app-server process, send an `initialize` request with your client metadata, then emit an `initialized` notification. Any other request before this handshake gets rejected.
52+
- Initialize once per connection: Immediately after opening a transport connection, send an `initialize` request with your client metadata, then emit an `initialized` notification. Any other request on that connection before this handshake gets rejected.
4653
- Start (or resume) a thread: Call `thread/start` to open a fresh conversation. The response returns the thread object and you’ll also get a `thread/started` notification. If you’re continuing an existing conversation, call `thread/resume` with its ID instead. If you want to branch from an existing conversation, call `thread/fork` to create a new thread id with copied history.
4754
- Begin a turn: To send user input, call `turn/start` with the target `threadId` and the user's input. Optional fields let you override model, cwd, sandbox policy, etc. This immediately returns the new turn object and triggers a `turn/started` notification.
4855
- Stream events: After `turn/start`, keep reading JSON-RPC notifications on stdout. You’ll see `item/started`, `item/completed`, deltas like `item/agentMessage/delta`, tool progress, etc. These represent streaming model output plus any side effects (commands, tool calls, reasoning notes).
4956
- Finish the turn: When the model is done (or the turn is interrupted via making the `turn/interrupt` call), the server sends `turn/completed` with the final turn state and token usage.
5057

5158
## Initialization
5259

53-
Clients must send a single `initialize` request before invoking any other method, then acknowledge with an `initialized` notification. The server returns the user agent string it will present to upstream services; subsequent requests issued before initialization receive a `"Not initialized"` error, and repeated `initialize` calls receive an `"Already initialized"` error.
60+
Clients must send a single `initialize` request per transport connection before invoking any other method on that connection, then acknowledge with an `initialized` notification. The server returns the user agent string it will present to upstream services; subsequent requests issued before initialization receive a `"Not initialized"` error, and repeated `initialize` calls on the same connection receive an `"Already initialized"` error.
5461

5562
Applications building on top of `codex app-server` should identify themselves via the `clientInfo` parameter.
5663

codex-rs/app-server/src/bespoke_event_handling.rs

Lines changed: 28 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1093,7 +1093,7 @@ pub(crate) async fn apply_bespoke_event_handling(
10931093
),
10941094
data: None,
10951095
};
1096-
outgoing.send_error(request_id, error).await;
1096+
outgoing.send_error(request_id.clone(), error).await;
10971097
return;
10981098
}
10991099
}
@@ -1107,7 +1107,7 @@ pub(crate) async fn apply_bespoke_event_handling(
11071107
),
11081108
data: None,
11091109
};
1110-
outgoing.send_error(request_id, error).await;
1110+
outgoing.send_error(request_id.clone(), error).await;
11111111
return;
11121112
}
11131113
};
@@ -1831,6 +1831,7 @@ async fn construct_mcp_tool_call_end_notification(
18311831
mod tests {
18321832
use super::*;
18331833
use crate::CHANNEL_CAPACITY;
1834+
use crate::outgoing_message::OutgoingEnvelope;
18341835
use crate::outgoing_message::OutgoingMessage;
18351836
use crate::outgoing_message::OutgoingMessageSender;
18361837
use anyhow::Result;
@@ -1858,6 +1859,21 @@ mod tests {
18581859
Arc::new(Mutex::new(HashMap::new()))
18591860
}
18601861

1862+
async fn recv_broadcast_message(
1863+
rx: &mut mpsc::Receiver<OutgoingEnvelope>,
1864+
) -> Result<OutgoingMessage> {
1865+
let envelope = rx
1866+
.recv()
1867+
.await
1868+
.ok_or_else(|| anyhow!("should send one message"))?;
1869+
match envelope {
1870+
OutgoingEnvelope::Broadcast { message } => Ok(message),
1871+
OutgoingEnvelope::ToConnection { connection_id, .. } => {
1872+
bail!("unexpected targeted message for connection {connection_id:?}")
1873+
}
1874+
}
1875+
}
1876+
18611877
#[test]
18621878
fn file_change_accept_for_session_maps_to_approved_for_session() {
18631879
let (decision, completion_status) =
@@ -1910,10 +1926,7 @@ mod tests {
19101926
)
19111927
.await;
19121928

1913-
let msg = rx
1914-
.recv()
1915-
.await
1916-
.ok_or_else(|| anyhow!("should send one notification"))?;
1929+
let msg = recv_broadcast_message(&mut rx).await?;
19171930
match msg {
19181931
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
19191932
assert_eq!(n.turn.id, event_turn_id);
@@ -1952,10 +1965,7 @@ mod tests {
19521965
)
19531966
.await;
19541967

1955-
let msg = rx
1956-
.recv()
1957-
.await
1958-
.ok_or_else(|| anyhow!("should send one notification"))?;
1968+
let msg = recv_broadcast_message(&mut rx).await?;
19591969
match msg {
19601970
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
19611971
assert_eq!(n.turn.id, event_turn_id);
@@ -1994,10 +2004,7 @@ mod tests {
19942004
)
19952005
.await;
19962006

1997-
let msg = rx
1998-
.recv()
1999-
.await
2000-
.ok_or_else(|| anyhow!("should send one notification"))?;
2007+
let msg = recv_broadcast_message(&mut rx).await?;
20012008
match msg {
20022009
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
20032010
assert_eq!(n.turn.id, event_turn_id);
@@ -2046,10 +2053,7 @@ mod tests {
20462053
)
20472054
.await;
20482055

2049-
let msg = rx
2050-
.recv()
2051-
.await
2052-
.ok_or_else(|| anyhow!("should send one notification"))?;
2056+
let msg = recv_broadcast_message(&mut rx).await?;
20532057
match msg {
20542058
OutgoingMessage::AppServerNotification(ServerNotification::TurnPlanUpdated(n)) => {
20552059
assert_eq!(n.thread_id, conversation_id.to_string());
@@ -2117,10 +2121,7 @@ mod tests {
21172121
)
21182122
.await;
21192123

2120-
let first = rx
2121-
.recv()
2122-
.await
2123-
.ok_or_else(|| anyhow!("expected usage notification"))?;
2124+
let first = recv_broadcast_message(&mut rx).await?;
21242125
match first {
21252126
OutgoingMessage::AppServerNotification(
21262127
ServerNotification::ThreadTokenUsageUpdated(payload),
@@ -2136,10 +2137,7 @@ mod tests {
21362137
other => bail!("unexpected notification: {other:?}"),
21372138
}
21382139

2139-
let second = rx
2140-
.recv()
2141-
.await
2142-
.ok_or_else(|| anyhow!("expected rate limit notification"))?;
2140+
let second = recv_broadcast_message(&mut rx).await?;
21432141
match second {
21442142
OutgoingMessage::AppServerNotification(
21452143
ServerNotification::AccountRateLimitsUpdated(payload),
@@ -2276,10 +2274,7 @@ mod tests {
22762274
.await;
22772275

22782276
// Verify: A turn 1
2279-
let msg = rx
2280-
.recv()
2281-
.await
2282-
.ok_or_else(|| anyhow!("should send first notification"))?;
2277+
let msg = recv_broadcast_message(&mut rx).await?;
22832278
match msg {
22842279
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
22852280
assert_eq!(n.turn.id, a_turn1);
@@ -2297,10 +2292,7 @@ mod tests {
22972292
}
22982293

22992294
// Verify: B turn 1
2300-
let msg = rx
2301-
.recv()
2302-
.await
2303-
.ok_or_else(|| anyhow!("should send second notification"))?;
2295+
let msg = recv_broadcast_message(&mut rx).await?;
23042296
match msg {
23052297
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
23062298
assert_eq!(n.turn.id, b_turn1);
@@ -2318,10 +2310,7 @@ mod tests {
23182310
}
23192311

23202312
// Verify: A turn 2
2321-
let msg = rx
2322-
.recv()
2323-
.await
2324-
.ok_or_else(|| anyhow!("should send third notification"))?;
2313+
let msg = recv_broadcast_message(&mut rx).await?;
23252314
match msg {
23262315
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
23272316
assert_eq!(n.turn.id, a_turn2);
@@ -2487,10 +2476,7 @@ mod tests {
24872476
)
24882477
.await;
24892478

2490-
let msg = rx
2491-
.recv()
2492-
.await
2493-
.ok_or_else(|| anyhow!("should send one notification"))?;
2479+
let msg = recv_broadcast_message(&mut rx).await?;
24942480
match msg {
24952481
OutgoingMessage::AppServerNotification(ServerNotification::TurnDiffUpdated(
24962482
notification,

0 commit comments

Comments
 (0)