diff --git a/codex-rs/app-server-protocol/schema/json/ServerNotification.json b/codex-rs/app-server-protocol/schema/json/ServerNotification.json index eca77d2bfa2..a7853d5f5ec 100644 --- a/codex-rs/app-server-protocol/schema/json/ServerNotification.json +++ b/codex-rs/app-server-protocol/schema/json/ServerNotification.json @@ -653,6 +653,21 @@ } ] }, + "CommandExecutionApprovalResolvedNotification": { + "properties": { + "requestId": { + "$ref": "#/definitions/RequestId" + }, + "threadId": { + "type": "string" + } + }, + "required": [ + "requestId", + "threadId" + ], + "type": "object" + }, "CommandExecutionOutputDeltaNotification": { "properties": { "delta": { @@ -853,6 +868,21 @@ ], "type": "object" }, + "FileChangeApprovalResolvedNotification": { + "properties": { + "requestId": { + "$ref": "#/definitions/RequestId" + }, + "threadId": { + "type": "string" + } + }, + "required": [ + "requestId", + "threadId" + ], + "type": "object" + }, "FileChangeOutputDeltaNotification": { "properties": { "delta": { @@ -1422,6 +1452,17 @@ ], "type": "object" }, + "RequestId": { + "anyOf": [ + { + "type": "string" + }, + { + "format": "int64", + "type": "integer" + } + ] + }, "SessionSource": { "oneOf": [ { @@ -2592,6 +2633,21 @@ ], "type": "object" }, + "ToolRequestUserInputResolvedNotification": { + "properties": { + "requestId": { + "$ref": "#/definitions/RequestId" + }, + "threadId": { + "type": "string" + } + }, + "required": [ + "requestId", + "threadId" + ], + "type": "object" + }, "Turn": { "properties": { "error": { @@ -3402,6 +3458,26 @@ "title": "Item/commandExecution/terminalInteractionNotification", "type": "object" }, + { + "properties": { + "method": { + "enum": [ + "item/commandExecution/approvalResolved" + ], + "title": "Item/commandExecution/approvalResolvedNotificationMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/CommandExecutionApprovalResolvedNotification" + } + }, + "required": [ + "method", + "params" + ], + "title": "Item/commandExecution/approvalResolvedNotification", + "type": "object" + }, { "properties": { "method": { @@ -3422,6 +3498,46 @@ "title": "Item/fileChange/outputDeltaNotification", "type": "object" }, + { + "properties": { + "method": { + "enum": [ + "item/fileChange/approvalResolved" + ], + "title": "Item/fileChange/approvalResolvedNotificationMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/FileChangeApprovalResolvedNotification" + } + }, + "required": [ + "method", + "params" + ], + "title": "Item/fileChange/approvalResolvedNotification", + "type": "object" + }, + { + "properties": { + "method": { + "enum": [ + "item/tool/requestUserInputResolved" + ], + "title": "Item/tool/requestUserInputResolvedNotificationMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/ToolRequestUserInputResolvedNotification" + } + }, + "required": [ + "method", + "params" + ], + "title": "Item/tool/requestUserInputResolvedNotification", + "type": "object" + }, { "properties": { "method": { diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index 62850442fc3..d6daf54eeb9 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -357,7 +357,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -382,7 +382,7 @@ "description": "NEW APIs", "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -406,7 +406,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -430,7 +430,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -454,7 +454,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -478,7 +478,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -502,7 +502,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -526,7 +526,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -550,7 +550,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -574,7 +574,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -598,7 +598,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -622,7 +622,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -646,7 +646,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -670,7 +670,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -694,7 +694,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -718,7 +718,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -742,7 +742,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -766,7 +766,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -790,7 +790,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -814,7 +814,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -838,7 +838,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -862,7 +862,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -886,7 +886,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -910,7 +910,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -934,7 +934,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -958,7 +958,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -981,7 +981,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -1005,7 +1005,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -1029,7 +1029,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -1053,7 +1053,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -1077,7 +1077,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -1100,7 +1100,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -1123,7 +1123,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -1148,7 +1148,7 @@ "description": "Execute a command (argv vector) under the server's sandbox.", "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -1172,7 +1172,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -1196,7 +1196,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -1220,7 +1220,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -1244,7 +1244,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -1268,7 +1268,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -1292,7 +1292,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -1315,7 +1315,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -1339,7 +1339,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -3065,7 +3065,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "message": { "type": "string" @@ -4943,7 +4943,7 @@ "$ref": "#/definitions/JSONRPCErrorError" }, "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" } }, "required": [ @@ -5011,7 +5011,7 @@ "description": "A request that expects a response.", "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "type": "string" @@ -5030,7 +5030,7 @@ "description": "A successful (non-error) response to a request.", "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "result": true }, @@ -5544,6 +5544,7 @@ "type": "object" }, "RequestId": { + "$schema": "http://json-schema.org/draft-07/schema#", "anyOf": [ { "type": "string" @@ -5553,7 +5554,7 @@ "type": "integer" } ], - "description": "ID of a request, which can be either a string or an integer." + "title": "RequestId" }, "RequestUserInputQuestion": { "properties": { @@ -6174,6 +6175,26 @@ "title": "Item/commandExecution/terminalInteractionNotification", "type": "object" }, + { + "properties": { + "method": { + "enum": [ + "item/commandExecution/approvalResolved" + ], + "title": "Item/commandExecution/approvalResolvedNotificationMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/v2/CommandExecutionApprovalResolvedNotification" + } + }, + "required": [ + "method", + "params" + ], + "title": "Item/commandExecution/approvalResolvedNotification", + "type": "object" + }, { "properties": { "method": { @@ -6194,6 +6215,46 @@ "title": "Item/fileChange/outputDeltaNotification", "type": "object" }, + { + "properties": { + "method": { + "enum": [ + "item/fileChange/approvalResolved" + ], + "title": "Item/fileChange/approvalResolvedNotificationMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/v2/FileChangeApprovalResolvedNotification" + } + }, + "required": [ + "method", + "params" + ], + "title": "Item/fileChange/approvalResolvedNotification", + "type": "object" + }, + { + "properties": { + "method": { + "enum": [ + "item/tool/requestUserInputResolved" + ], + "title": "Item/tool/requestUserInputResolvedNotificationMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/v2/ToolRequestUserInputResolvedNotification" + } + }, + "required": [ + "method", + "params" + ], + "title": "Item/tool/requestUserInputResolvedNotification", + "type": "object" + }, { "properties": { "method": { @@ -6647,7 +6708,7 @@ "description": "NEW APIs Sent when approval is requested for a specific command execution. This request is used for Turns started via turn/start.", "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -6672,7 +6733,7 @@ "description": "Sent when approval is requested for a specific file change. This request is used for Turns started via turn/start.", "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -6697,7 +6758,7 @@ "description": "EXPERIMENTAL - Request input from the user for a tool call.", "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -6722,7 +6783,7 @@ "description": "Execute a dynamic tool call on the client.", "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -6746,7 +6807,7 @@ { "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -6771,7 +6832,7 @@ "description": "DEPRECATED APIs below Request to approve a patch. This request is used for Turns started via the legacy APIs (i.e. SendUserTurn, SendUserMessage).", "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -6796,7 +6857,7 @@ "description": "Request to exec a command. This request is used for Turns started via the legacy APIs (i.e. SendUserTurn, SendUserMessage).", "properties": { "id": { - "$ref": "#/definitions/RequestId" + "$ref": "#/definitions/v2/RequestId" }, "method": { "enum": [ @@ -8290,6 +8351,23 @@ "title": "CommandExecResponse", "type": "object" }, + "CommandExecutionApprovalResolvedNotification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "properties": { + "requestId": { + "$ref": "#/definitions/v2/RequestId" + }, + "threadId": { + "type": "string" + } + }, + "required": [ + "requestId", + "threadId" + ], + "title": "CommandExecutionApprovalResolvedNotification", + "type": "object" + }, "CommandExecutionOutputDeltaNotification": { "$schema": "http://json-schema.org/draft-07/schema#", "properties": { @@ -9448,6 +9526,23 @@ "title": "FeedbackUploadResponse", "type": "object" }, + "FileChangeApprovalResolvedNotification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "properties": { + "requestId": { + "$ref": "#/definitions/v2/RequestId" + }, + "threadId": { + "type": "string" + } + }, + "required": [ + "requestId", + "threadId" + ], + "title": "FileChangeApprovalResolvedNotification", + "type": "object" + }, "FileChangeOutputDeltaNotification": { "$schema": "http://json-schema.org/draft-07/schema#", "properties": { @@ -11050,6 +11145,17 @@ ], "type": "object" }, + "RequestId": { + "anyOf": [ + { + "type": "string" + }, + { + "format": "int64", + "type": "integer" + } + ] + }, "ResidencyRequirement": { "enum": [ "us" @@ -14135,6 +14241,23 @@ ], "type": "object" }, + "ToolRequestUserInputResolvedNotification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "properties": { + "requestId": { + "$ref": "#/definitions/v2/RequestId" + }, + "threadId": { + "type": "string" + } + }, + "required": [ + "requestId", + "threadId" + ], + "title": "ToolRequestUserInputResolvedNotification", + "type": "object" + }, "ToolsV2": { "properties": { "view_image": { diff --git a/codex-rs/app-server-protocol/schema/json/v2/CommandExecutionApprovalResolvedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/CommandExecutionApprovalResolvedNotification.json new file mode 100644 index 00000000000..14977ddab66 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/json/v2/CommandExecutionApprovalResolvedNotification.json @@ -0,0 +1,30 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "definitions": { + "RequestId": { + "anyOf": [ + { + "type": "string" + }, + { + "format": "int64", + "type": "integer" + } + ] + } + }, + "properties": { + "requestId": { + "$ref": "#/definitions/RequestId" + }, + "threadId": { + "type": "string" + } + }, + "required": [ + "requestId", + "threadId" + ], + "title": "CommandExecutionApprovalResolvedNotification", + "type": "object" +} \ No newline at end of file diff --git a/codex-rs/app-server-protocol/schema/json/v2/FileChangeApprovalResolvedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/FileChangeApprovalResolvedNotification.json new file mode 100644 index 00000000000..a19bd9b8748 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/json/v2/FileChangeApprovalResolvedNotification.json @@ -0,0 +1,30 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "definitions": { + "RequestId": { + "anyOf": [ + { + "type": "string" + }, + { + "format": "int64", + "type": "integer" + } + ] + } + }, + "properties": { + "requestId": { + "$ref": "#/definitions/RequestId" + }, + "threadId": { + "type": "string" + } + }, + "required": [ + "requestId", + "threadId" + ], + "title": "FileChangeApprovalResolvedNotification", + "type": "object" +} \ No newline at end of file diff --git a/codex-rs/app-server-protocol/schema/json/v2/ToolRequestUserInputResolvedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/ToolRequestUserInputResolvedNotification.json new file mode 100644 index 00000000000..854f0baabf9 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/json/v2/ToolRequestUserInputResolvedNotification.json @@ -0,0 +1,30 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "definitions": { + "RequestId": { + "anyOf": [ + { + "type": "string" + }, + { + "format": "int64", + "type": "integer" + } + ] + } + }, + "properties": { + "requestId": { + "$ref": "#/definitions/RequestId" + }, + "threadId": { + "type": "string" + } + }, + "required": [ + "requestId", + "threadId" + ], + "title": "ToolRequestUserInputResolvedNotification", + "type": "object" +} \ No newline at end of file diff --git a/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts b/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts index 9b082158c52..425771c6030 100644 --- a/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts +++ b/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts @@ -11,11 +11,13 @@ import type { AccountRateLimitsUpdatedNotification } from "./v2/AccountRateLimit import type { AccountUpdatedNotification } from "./v2/AccountUpdatedNotification"; import type { AgentMessageDeltaNotification } from "./v2/AgentMessageDeltaNotification"; import type { AppListUpdatedNotification } from "./v2/AppListUpdatedNotification"; +import type { CommandExecutionApprovalResolvedNotification } from "./v2/CommandExecutionApprovalResolvedNotification"; import type { CommandExecutionOutputDeltaNotification } from "./v2/CommandExecutionOutputDeltaNotification"; import type { ConfigWarningNotification } from "./v2/ConfigWarningNotification"; import type { ContextCompactedNotification } from "./v2/ContextCompactedNotification"; import type { DeprecationNoticeNotification } from "./v2/DeprecationNoticeNotification"; import type { ErrorNotification } from "./v2/ErrorNotification"; +import type { FileChangeApprovalResolvedNotification } from "./v2/FileChangeApprovalResolvedNotification"; import type { FileChangeOutputDeltaNotification } from "./v2/FileChangeOutputDeltaNotification"; import type { ItemCompletedNotification } from "./v2/ItemCompletedNotification"; import type { ItemStartedNotification } from "./v2/ItemStartedNotification"; @@ -40,6 +42,7 @@ import type { ThreadStartedNotification } from "./v2/ThreadStartedNotification"; import type { ThreadStatusChangedNotification } from "./v2/ThreadStatusChangedNotification"; import type { ThreadTokenUsageUpdatedNotification } from "./v2/ThreadTokenUsageUpdatedNotification"; import type { ThreadUnarchivedNotification } from "./v2/ThreadUnarchivedNotification"; +import type { ToolRequestUserInputResolvedNotification } from "./v2/ToolRequestUserInputResolvedNotification"; import type { TurnCompletedNotification } from "./v2/TurnCompletedNotification"; import type { TurnDiffUpdatedNotification } from "./v2/TurnDiffUpdatedNotification"; import type { TurnPlanUpdatedNotification } from "./v2/TurnPlanUpdatedNotification"; @@ -50,4 +53,4 @@ import type { WindowsWorldWritableWarningNotification } from "./v2/WindowsWorldW /** * Notification sent from the server to the client. */ -export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/status/changed", "params": ThreadStatusChangedNotification } | { "method": "thread/archived", "params": ThreadArchivedNotification } | { "method": "thread/unarchived", "params": ThreadUnarchivedNotification } | { "method": "thread/closed", "params": ThreadClosedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "app/list/updated", "params": AppListUpdatedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "model/rerouted", "params": ModelReroutedNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "fuzzyFileSearch/sessionUpdated", "params": FuzzyFileSearchSessionUpdatedNotification } | { "method": "fuzzyFileSearch/sessionCompleted", "params": FuzzyFileSearchSessionCompletedNotification } | { "method": "thread/realtime/started", "params": ThreadRealtimeStartedNotification } | { "method": "thread/realtime/itemAdded", "params": ThreadRealtimeItemAddedNotification } | { "method": "thread/realtime/outputAudio/delta", "params": ThreadRealtimeOutputAudioDeltaNotification } | { "method": "thread/realtime/error", "params": ThreadRealtimeErrorNotification } | { "method": "thread/realtime/closed", "params": ThreadRealtimeClosedNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "windowsSandbox/setupCompleted", "params": WindowsSandboxSetupCompletedNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification } | { "method": "authStatusChange", "params": AuthStatusChangeNotification } | { "method": "loginChatGptComplete", "params": LoginChatGptCompleteNotification } | { "method": "sessionConfigured", "params": SessionConfiguredNotification }; +export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/status/changed", "params": ThreadStatusChangedNotification } | { "method": "thread/archived", "params": ThreadArchivedNotification } | { "method": "thread/unarchived", "params": ThreadUnarchivedNotification } | { "method": "thread/closed", "params": ThreadClosedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/commandExecution/approvalResolved", "params": CommandExecutionApprovalResolvedNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/fileChange/approvalResolved", "params": FileChangeApprovalResolvedNotification } | { "method": "item/tool/requestUserInputResolved", "params": ToolRequestUserInputResolvedNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "app/list/updated", "params": AppListUpdatedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "model/rerouted", "params": ModelReroutedNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "fuzzyFileSearch/sessionUpdated", "params": FuzzyFileSearchSessionUpdatedNotification } | { "method": "fuzzyFileSearch/sessionCompleted", "params": FuzzyFileSearchSessionCompletedNotification } | { "method": "thread/realtime/started", "params": ThreadRealtimeStartedNotification } | { "method": "thread/realtime/itemAdded", "params": ThreadRealtimeItemAddedNotification } | { "method": "thread/realtime/outputAudio/delta", "params": ThreadRealtimeOutputAudioDeltaNotification } | { "method": "thread/realtime/error", "params": ThreadRealtimeErrorNotification } | { "method": "thread/realtime/closed", "params": ThreadRealtimeClosedNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "windowsSandbox/setupCompleted", "params": WindowsSandboxSetupCompletedNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification } | { "method": "authStatusChange", "params": AuthStatusChangeNotification } | { "method": "loginChatGptComplete", "params": LoginChatGptCompleteNotification } | { "method": "sessionConfigured", "params": SessionConfiguredNotification }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/CommandExecutionApprovalResolvedNotification.ts b/codex-rs/app-server-protocol/schema/typescript/v2/CommandExecutionApprovalResolvedNotification.ts new file mode 100644 index 00000000000..3288e5baadb --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/v2/CommandExecutionApprovalResolvedNotification.ts @@ -0,0 +1,6 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { RequestId } from "../RequestId"; + +export type CommandExecutionApprovalResolvedNotification = { threadId: string, requestId: RequestId, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/FileChangeApprovalResolvedNotification.ts b/codex-rs/app-server-protocol/schema/typescript/v2/FileChangeApprovalResolvedNotification.ts new file mode 100644 index 00000000000..7693a2018dd --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/v2/FileChangeApprovalResolvedNotification.ts @@ -0,0 +1,6 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { RequestId } from "../RequestId"; + +export type FileChangeApprovalResolvedNotification = { threadId: string, requestId: RequestId, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ToolRequestUserInputResolvedNotification.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ToolRequestUserInputResolvedNotification.ts new file mode 100644 index 00000000000..a6c419628c2 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ToolRequestUserInputResolvedNotification.ts @@ -0,0 +1,6 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { RequestId } from "../RequestId"; + +export type ToolRequestUserInputResolvedNotification = { threadId: string, requestId: RequestId, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts index 4c638c7d30d..9e753beda24 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts @@ -39,6 +39,7 @@ export type { CommandAction } from "./CommandAction"; export type { CommandExecParams } from "./CommandExecParams"; export type { CommandExecResponse } from "./CommandExecResponse"; export type { CommandExecutionApprovalDecision } from "./CommandExecutionApprovalDecision"; +export type { CommandExecutionApprovalResolvedNotification } from "./CommandExecutionApprovalResolvedNotification"; export type { CommandExecutionOutputDeltaNotification } from "./CommandExecutionOutputDeltaNotification"; export type { CommandExecutionRequestApprovalParams } from "./CommandExecutionRequestApprovalParams"; export type { CommandExecutionRequestApprovalResponse } from "./CommandExecutionRequestApprovalResponse"; @@ -79,6 +80,7 @@ export type { ExternalAgentConfigMigrationItemType } from "./ExternalAgentConfig export type { FeedbackUploadParams } from "./FeedbackUploadParams"; export type { FeedbackUploadResponse } from "./FeedbackUploadResponse"; export type { FileChangeApprovalDecision } from "./FileChangeApprovalDecision"; +export type { FileChangeApprovalResolvedNotification } from "./FileChangeApprovalResolvedNotification"; export type { FileChangeOutputDeltaNotification } from "./FileChangeOutputDeltaNotification"; export type { FileChangeRequestApprovalParams } from "./FileChangeRequestApprovalParams"; export type { FileChangeRequestApprovalResponse } from "./FileChangeRequestApprovalResponse"; @@ -211,6 +213,7 @@ export type { ToolRequestUserInputAnswer } from "./ToolRequestUserInputAnswer"; export type { ToolRequestUserInputOption } from "./ToolRequestUserInputOption"; export type { ToolRequestUserInputParams } from "./ToolRequestUserInputParams"; export type { ToolRequestUserInputQuestion } from "./ToolRequestUserInputQuestion"; +export type { ToolRequestUserInputResolvedNotification } from "./ToolRequestUserInputResolvedNotification"; export type { ToolRequestUserInputResponse } from "./ToolRequestUserInputResponse"; export type { ToolsV2 } from "./ToolsV2"; export type { Turn } from "./Turn"; diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index 3af29761062..669b8fe877a 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -548,6 +548,14 @@ macro_rules! server_request_definitions { )* } + impl ServerRequest { + pub fn id(&self) -> &RequestId { + match self { + $(Self::$variant { request_id, .. } => request_id,)* + } + } + } + #[derive(Debug, Clone, PartialEq, JsonSchema)] #[allow(clippy::large_enum_variant)] pub enum ServerRequestPayload { @@ -837,7 +845,10 @@ server_notification_definitions! { PlanDelta => "item/plan/delta" (v2::PlanDeltaNotification), CommandExecutionOutputDelta => "item/commandExecution/outputDelta" (v2::CommandExecutionOutputDeltaNotification), TerminalInteraction => "item/commandExecution/terminalInteraction" (v2::TerminalInteractionNotification), + CommandExecutionApprovalResolved => "item/commandExecution/approvalResolved" (v2::CommandExecutionApprovalResolvedNotification), FileChangeOutputDelta => "item/fileChange/outputDelta" (v2::FileChangeOutputDeltaNotification), + FileChangeApprovalResolved => "item/fileChange/approvalResolved" (v2::FileChangeApprovalResolvedNotification), + ToolRequestUserInputResolved => "item/tool/requestUserInputResolved" (v2::ToolRequestUserInputResolvedNotification), McpToolCallProgress => "item/mcpToolCall/progress" (v2::McpToolCallProgressNotification), McpServerOauthLoginCompleted => "mcpServer/oauthLogin/completed" (v2::McpServerOauthLoginCompletedNotification), AccountUpdated => "account/updated" (v2::AccountUpdatedNotification), @@ -1101,6 +1112,7 @@ mod tests { ); let payload = ServerRequestPayload::ExecCommandApproval(params); + assert_eq!(request.id(), &RequestId::Integer(7)); assert_eq!(payload.request_with_id(RequestId::Integer(7)), request); Ok(()) } diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index f7c4eec7a7e..17e552eb5db 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use std::path::PathBuf; +use crate::RequestId; use crate::protocol::common::AuthMode; use codex_experimental_api_macros::ExperimentalApi; use codex_protocol::account::PlanType; @@ -3717,6 +3718,30 @@ pub struct FileChangeOutputDeltaNotification { pub delta: String, } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct CommandExecutionApprovalResolvedNotification { + pub thread_id: String, + pub request_id: RequestId, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct FileChangeApprovalResolvedNotification { + pub thread_id: String, + pub request_id: RequestId, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ToolRequestUserInputResolvedNotification { + pub thread_id: String, + pub request_id: RequestId, +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index 20b2dcf2c7b..196c75d62b7 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -712,7 +712,8 @@ Order of messages: 1. `item/started` — shows the pending `commandExecution` item with `command`, `cwd`, and other fields so you can render the proposed action. 2. `item/commandExecution/requestApproval` (request) — carries the same `itemId`, `threadId`, `turnId`, optionally `approvalId` (for subcommand callbacks), and `reason`. For normal command approvals, it also includes `command`, `cwd`, and `commandActions` for friendly display. When `initialize.params.capabilities.experimentalApi = true`, it may also include experimental `additionalPermissions` describing requested per-command sandbox access. For network-only approvals, those command fields may be omitted and `networkApprovalContext` is provided instead. Optional persistence hints may also be included via `proposedExecpolicyAmendment` and `proposedNetworkPolicyAmendments`. Clients can prefer `availableDecisions` when present to render the exact set of choices the server wants to expose, while still falling back to the older heuristics if it is omitted. 3. Client response — for example `{ "decision": "accept" }`, `{ "decision": "acceptForSession" }`, `{ "decision": { "acceptWithExecpolicyAmendment": { "execpolicy_amendment": [...] } } }`, `{ "decision": { "applyNetworkPolicyAmendment": { "network_policy_amendment": { "host": "example.com", "action": "allow" } } } }`, `{ "decision": "decline" }`, or `{ "decision": "cancel" }`. -4. `item/completed` — final `commandExecution` item with `status: "completed" | "failed" | "declined"` and execution output. Render this as the authoritative result. +4. `item/commandExecution/approvalResolved` — `{ threadId, requestId }` confirms the pending approval request has been resolved or cleared, including lifecycle cleanup on turn start/complete/interrupt. +5. `item/completed` — final `commandExecution` item with `status: "completed" | "failed" | "declined"` and execution output. Render this as the authoritative result. ### File change approvals @@ -721,10 +722,15 @@ Order of messages: 1. `item/started` — emits a `fileChange` item with `changes` (diff chunk summaries) and `status: "inProgress"`. Show the proposed edits and paths to the user. 2. `item/fileChange/requestApproval` (request) — includes `itemId`, `threadId`, `turnId`, and an optional `reason`. 3. Client response — `{ "decision": "accept" }` or `{ "decision": "decline" }`. -4. `item/completed` — returns the same `fileChange` item with `status` updated to `completed`, `failed`, or `declined` after the patch attempt. Rely on this to show success/failure and finalize the diff state in your UI. +4. `item/fileChange/approvalResolved` — `{ threadId, requestId }` confirms the pending approval request has been resolved or cleared, including lifecycle cleanup on turn start/complete/interrupt. +5. `item/completed` — returns the same `fileChange` item with `status` updated to `completed`, `failed`, or `declined` after the patch attempt. Rely on this to show success/failure and finalize the diff state in your UI. UI guidance for IDEs: surface an approval dialog as soon as the request arrives. The turn will proceed after the server receives a response to the approval request. The terminal `item/completed` notification will be sent with the appropriate status. +### request_user_input + +When the client responds to `item/tool/requestUserInput`, the server emits `item/tool/requestUserInputResolved` with `{ threadId, requestId }`. If the pending request is cleared by turn start, turn completion, or turn interruption before the client answers, the server emits the same notification for that cleanup. + ### Dynamic tool calls (experimental) `dynamicTools` on `thread/start` and the corresponding `item/tool/call` request/response flow are experimental APIs. To enable them, set `initialize.params.capabilities.experimentalApi = true`. diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 0bc2f1118f2..1f95175e138 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -1,4 +1,5 @@ use crate::codex_message_processor::ApiVersion; +use crate::codex_message_processor::abort_pending_client_requests; use crate::codex_message_processor::read_rollout_items_from_rollout; use crate::codex_message_processor::read_summary_from_rollout; use crate::codex_message_processor::summary_to_thread; @@ -6,6 +7,8 @@ use crate::error_code::INTERNAL_ERROR_CODE; use crate::error_code::INVALID_REQUEST_ERROR_CODE; use crate::outgoing_message::ClientRequestResult; use crate::outgoing_message::ThreadScopedOutgoingMessageSender; +use crate::thread_state::ServerRequestType; +use crate::thread_state::ThreadListenerCommand; use crate::thread_state::ThreadState; use crate::thread_state::TurnSummary; use crate::thread_status::ThreadWatchActiveGuard; @@ -56,6 +59,7 @@ use codex_app_server_protocol::RawResponseItemCompletedNotification; use codex_app_server_protocol::ReasoningSummaryPartAddedNotification; use codex_app_server_protocol::ReasoningSummaryTextDeltaNotification; use codex_app_server_protocol::ReasoningTextDeltaNotification; +use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ServerRequestPayload; use codex_app_server_protocol::TerminalInteractionNotification; @@ -132,6 +136,51 @@ struct CommandExecutionCompletionItem { command_actions: Vec, } +const TURN_TRANSITION_PENDING_REQUEST_ERROR_REASON: &str = "turnTransition"; + +fn is_turn_transition_tracked_request_error(error: &JSONRPCErrorError) -> bool { + error + .data + .as_ref() + .and_then(|data| data.get("reason")) + .and_then(serde_json::Value::as_str) + == Some(TURN_TRANSITION_PENDING_REQUEST_ERROR_REASON) +} + +async fn queue_tracked_client_request_removal( + thread_state: &Arc>, + request_type: ServerRequestType, + request_id: RequestId, +) { + let (completion_tx, completion_rx) = oneshot::channel(); + let listener_command_tx = { + let state = thread_state.lock().await; + state.listener_command_tx() + }; + let Some(listener_command_tx) = listener_command_tx else { + error!("failed to remove tracked client request: thread listener is not running"); + return; + }; + + if listener_command_tx + .send(ThreadListenerCommand::ResolveServerRequest { + request_type, + request_id, + completion_tx, + }) + .is_err() + { + error!( + "failed to remove tracked client request: thread listener command channel is closed" + ); + return; + } + + if let Err(err) = completion_rx.await { + error!("failed to remove tracked client request: {err}"); + } +} + #[allow(clippy::too_many_arguments)] pub(crate) async fn apply_bespoke_event_handling( event: Event, @@ -151,11 +200,13 @@ pub(crate) async fn apply_bespoke_event_handling( } = event; match msg { EventMsg::TurnStarted(_) => { + abort_pending_client_requests(&outgoing).await; thread_watch_manager .note_turn_started(&conversation_id.to_string()) .await; } EventMsg::TurnComplete(_ev) => { + abort_pending_client_requests(&outgoing).await; let turn_failed = thread_state.lock().await.turn_summary.last_error.is_some(); thread_watch_manager .note_turn_completed(&conversation_id.to_string(), turn_failed) @@ -282,7 +333,8 @@ pub(crate) async fn apply_bespoke_event_handling( state .turn_summary .file_change_started - .insert(item_id.clone()) + .insert(item_id.clone(), patch_changes.clone()) + .is_none() }; if first_start { let item = ThreadItem::FileChange { @@ -307,15 +359,19 @@ pub(crate) async fn apply_bespoke_event_handling( reason, grant_root, }; - let rx = outgoing - .send_request(ServerRequestPayload::FileChangeRequestApproval(params)) + let (request, rx) = outgoing + .send_request_with_server_request( + ServerRequestPayload::FileChangeRequestApproval(params), + ) .await; + let pending_request_id = request.id().clone(); tokio::spawn(async move { on_file_change_request_approval_response( event_turn_id, conversation_id, item_id, patch_changes, + pending_request_id, rx, conversation, outgoing, @@ -435,11 +491,12 @@ pub(crate) async fn apply_bespoke_event_handling( proposed_network_policy_amendments: proposed_network_policy_amendments_v2, available_decisions: Some(available_decisions), }; - let rx = outgoing - .send_request(ServerRequestPayload::CommandExecutionRequestApproval( - params, - )) + let (request, rx) = outgoing + .send_request_with_server_request( + ServerRequestPayload::CommandExecutionRequestApproval(params), + ) .await; + let pending_request_id = request.id().clone(); tokio::spawn(async move { on_command_execution_request_approval_response( event_turn_id, @@ -447,6 +504,7 @@ pub(crate) async fn apply_bespoke_event_handling( approval_id, call_id, completion_item, + pending_request_id, rx, conversation, outgoing, @@ -489,14 +547,19 @@ pub(crate) async fn apply_bespoke_event_handling( item_id: request.call_id, questions, }; - let rx = outgoing - .send_request(ServerRequestPayload::ToolRequestUserInput(params)) + let (request, rx) = outgoing + .send_request_with_server_request(ServerRequestPayload::ToolRequestUserInput( + params, + )) .await; + let pending_request_id = request.id().clone(); tokio::spawn(async move { on_request_user_input_response( event_turn_id, + pending_request_id, rx, conversation, + thread_state, user_input_guard, ) .await; @@ -1136,18 +1199,20 @@ pub(crate) async fn apply_bespoke_event_handling( // Until we migrate the core to be aware of a first class FileChangeItem // and emit the corresponding EventMsg, we repurpose the call_id as the item_id. let item_id = patch_begin_event.call_id.clone(); + let changes = convert_patch_changes(&patch_begin_event.changes); let first_start = { let mut state = thread_state.lock().await; state .turn_summary .file_change_started - .insert(item_id.clone()) + .insert(item_id.clone(), changes.clone()) + .is_none() }; if first_start { let item = ThreadItem::FileChange { id: item_id.clone(), - changes: convert_patch_changes(&patch_begin_event.changes), + changes, status: PatchApplyStatus::InProgress, }; let notification = ItemStartedNotification { @@ -1228,7 +1293,10 @@ pub(crate) async fn apply_bespoke_event_handling( // We already have state tracking FileChange items on item/started, so let's use that. let is_file_change = { let state = thread_state.lock().await; - state.turn_summary.file_change_started.contains(&item_id) + state + .turn_summary + .file_change_started + .contains_key(&item_id) }; if is_file_change { let notification = FileChangeOutputDeltaNotification { @@ -1329,6 +1397,7 @@ pub(crate) async fn apply_bespoke_event_handling( } // If this is a TurnAborted, reply to any pending interrupt requests. EventMsg::TurnAborted(turn_aborted_event) => { + abort_pending_client_requests(&outgoing).await; let pending = { let mut state = thread_state.lock().await; std::mem::take(&mut state.pending_interrupts) @@ -1816,14 +1885,23 @@ async fn on_exec_approval_response( async fn on_request_user_input_response( event_turn_id: String, + pending_request_id: RequestId, receiver: oneshot::Receiver, conversation: Arc, + thread_state: Arc>, user_input_guard: ThreadWatchActiveGuard, ) { let response = receiver.await; + queue_tracked_client_request_removal( + &thread_state, + ServerRequestType::ToolRequestUserInput, + pending_request_id, + ) + .await; drop(user_input_guard); let value = match response { Ok(Ok(value)) => value, + Ok(Err(err)) if is_turn_transition_tracked_request_error(&err) => return, Ok(Err(err)) => { error!("request failed with client error: {err:?}"); let empty = CoreRequestUserInputResponse { @@ -1934,6 +2012,7 @@ async fn on_file_change_request_approval_response( conversation_id: ThreadId, item_id: String, changes: Vec, + pending_request_id: RequestId, receiver: oneshot::Receiver, codex: Arc, outgoing: ThreadScopedOutgoingMessageSender, @@ -1941,6 +2020,12 @@ async fn on_file_change_request_approval_response( permission_guard: ThreadWatchActiveGuard, ) { let response = receiver.await; + queue_tracked_client_request_removal( + &thread_state, + ServerRequestType::FileChangeRequestApproval, + pending_request_id, + ) + .await; drop(permission_guard); let (decision, completion_status) = match response { Ok(Ok(value)) => { @@ -1958,6 +2043,7 @@ async fn on_file_change_request_approval_response( // Only short-circuit on declines/cancels/failures. (decision, completion_status) } + Ok(Err(err)) if is_turn_transition_tracked_request_error(&err) => return, Ok(Err(err)) => { error!("request failed with client error: {err:?}"); (ReviewDecision::Denied, Some(PatchApplyStatus::Failed)) @@ -1999,6 +2085,7 @@ async fn on_command_execution_request_approval_response( approval_id: Option, item_id: String, completion_item: Option, + pending_request_id: RequestId, receiver: oneshot::Receiver, conversation: Arc, outgoing: ThreadScopedOutgoingMessageSender, @@ -2006,6 +2093,12 @@ async fn on_command_execution_request_approval_response( permission_guard: ThreadWatchActiveGuard, ) { let response = receiver.await; + queue_tracked_client_request_removal( + &thread_state, + ServerRequestType::CommandExecutionRequestApproval, + pending_request_id, + ) + .await; drop(permission_guard); let (decision, completion_status) = match response { Ok(Ok(value)) => { @@ -2057,6 +2150,7 @@ async fn on_command_execution_request_approval_response( }; (decision, completion_status) } + Ok(Err(err)) if is_turn_transition_tracked_request_error(&err) => return, Ok(Err(err)) => { error!("request failed with client error: {err:?}"); (ReviewDecision::Denied, Some(CommandExecutionStatus::Failed)) diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 0d3b98f03cd..db1f7684679 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -12,6 +12,7 @@ use crate::outgoing_message::ConnectionRequestId; use crate::outgoing_message::OutgoingMessageSender; use crate::outgoing_message::OutgoingNotification; use crate::outgoing_message::ThreadScopedOutgoingMessageSender; +use crate::thread_state::ServerRequestType; use crate::thread_status::ThreadWatchManager; use crate::thread_status::resolve_thread_status; use chrono::DateTime; @@ -39,6 +40,7 @@ use codex_app_server_protocol::ClientRequest; use codex_app_server_protocol::CollaborationModeListParams; use codex_app_server_protocol::CollaborationModeListResponse; use codex_app_server_protocol::CommandExecParams; +use codex_app_server_protocol::CommandExecutionApprovalResolvedNotification; use codex_app_server_protocol::ConversationGitInfo; use codex_app_server_protocol::ConversationSummary; use codex_app_server_protocol::DynamicToolSpec as ApiDynamicToolSpec; @@ -49,6 +51,8 @@ use codex_app_server_protocol::ExperimentalFeatureListResponse; use codex_app_server_protocol::ExperimentalFeatureStage as ApiExperimentalFeatureStage; use codex_app_server_protocol::FeedbackUploadParams; use codex_app_server_protocol::FeedbackUploadResponse; +use codex_app_server_protocol::FileChangeApprovalResolvedNotification; +use codex_app_server_protocol::FileUpdateChange; use codex_app_server_protocol::ForkConversationParams; use codex_app_server_protocol::ForkConversationResponse; use codex_app_server_protocol::FuzzyFileSearchParams; @@ -73,6 +77,7 @@ use codex_app_server_protocol::GitInfo as ApiGitInfo; use codex_app_server_protocol::HazelnutScope as ApiHazelnutScope; use codex_app_server_protocol::InputItem as WireInputItem; use codex_app_server_protocol::InterruptConversationParams; +use codex_app_server_protocol::ItemStartedNotification; use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::ListConversationsParams; use codex_app_server_protocol::ListConversationsResponse; @@ -97,9 +102,11 @@ use codex_app_server_protocol::ModelListParams; use codex_app_server_protocol::ModelListResponse; use codex_app_server_protocol::NewConversationParams; use codex_app_server_protocol::NewConversationResponse; +use codex_app_server_protocol::PatchApplyStatus; use codex_app_server_protocol::ProductSurface as ApiProductSurface; use codex_app_server_protocol::RemoveConversationListenerParams; use codex_app_server_protocol::RemoveConversationSubscriptionResponse; +use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ResumeConversationParams; use codex_app_server_protocol::ResumeConversationResponse; use codex_app_server_protocol::ReviewDelivery as ApiReviewDelivery; @@ -112,6 +119,7 @@ use codex_app_server_protocol::SendUserMessageResponse; use codex_app_server_protocol::SendUserTurnParams; use codex_app_server_protocol::SendUserTurnResponse; use codex_app_server_protocol::ServerNotification; +use codex_app_server_protocol::ServerRequest; use codex_app_server_protocol::SessionConfiguredNotification; use codex_app_server_protocol::SetDefaultModelParams; use codex_app_server_protocol::SetDefaultModelResponse; @@ -166,6 +174,7 @@ use codex_app_server_protocol::ThreadUnarchivedNotification; use codex_app_server_protocol::ThreadUnsubscribeParams; use codex_app_server_protocol::ThreadUnsubscribeResponse; use codex_app_server_protocol::ThreadUnsubscribeStatus; +use codex_app_server_protocol::ToolRequestUserInputResolvedNotification; use codex_app_server_protocol::Turn; use codex_app_server_protocol::TurnInterruptParams; use codex_app_server_protocol::TurnStartParams; @@ -299,6 +308,7 @@ use uuid::Uuid; use crate::filters::compute_source_filters; use crate::filters::source_kind_matches; +use crate::thread_state::ThreadListenerCommand; use crate::thread_state::ThreadState; use crate::thread_state::ThreadStateManager; @@ -3204,11 +3214,11 @@ impl CodexMessageProcessor { }; let command = crate::thread_state::ThreadListenerCommand::SendThreadResumeResponse( - crate::thread_state::PendingThreadResumeRequest { + Box::new(crate::thread_state::PendingThreadResumeRequest { request_id: request_id.clone(), rollout_path, config_snapshot, - }, + }), ); if listener_command_tx.send(command).is_err() { let err = JSONRPCErrorError { @@ -4826,7 +4836,9 @@ impl CodexMessageProcessor { async fn finalize_thread_teardown(&mut self, thread_id: ThreadId) { self.pending_thread_unloads.lock().await.remove(&thread_id); - self.outgoing.cancel_requests_for_thread(thread_id).await; + self.outgoing + .cancel_requests_for_thread(thread_id, None) + .await; self.thread_state_manager .remove_thread_state(thread_id) .await; @@ -4887,7 +4899,9 @@ impl CodexMessageProcessor { self.pending_thread_unloads.lock().await.insert(thread_id); // Any pending app-server -> client requests for this thread can no longer be // answered; cancel their callbacks before shutdown/unload. - self.outgoing.cancel_requests_for_thread(thread_id).await; + self.outgoing + .cancel_requests_for_thread(thread_id, None) + .await; self.thread_state_manager .remove_thread_state(thread_id) .await; @@ -6416,21 +6430,15 @@ impl CodexMessageProcessor { let Some(listener_command) = listener_command else { break; }; - match listener_command { - crate::thread_state::ThreadListenerCommand::SendThreadResumeResponse( - resume_request, - ) => { - handle_pending_thread_resume_request( - conversation_id, - codex_home.as_path(), - &thread_state, - &thread_watch_manager, - &outgoing_for_task, - resume_request, - ) - .await; - } - } + handle_thread_listener_command( + conversation_id, + codex_home.as_path(), + &thread_state, + &thread_watch_manager, + &outgoing_for_task, + listener_command, + ) + .await; } } } @@ -6739,6 +6747,124 @@ impl CodexMessageProcessor { } } +fn pending_client_request_resume_notifications( + file_change_started: &HashMap>, + request: &ServerRequest, +) -> Vec { + match request { + ServerRequest::FileChangeRequestApproval { params, .. } => file_change_started + .get(¶ms.item_id) + .map(|changes| { + vec![ServerNotification::ItemStarted(ItemStartedNotification { + thread_id: params.thread_id.clone(), + turn_id: params.turn_id.clone(), + item: ThreadItem::FileChange { + id: params.item_id.clone(), + changes: changes.clone(), + status: PatchApplyStatus::InProgress, + }, + })] + }) + .unwrap_or_default(), + ServerRequest::CommandExecutionRequestApproval { .. } + | ServerRequest::ToolRequestUserInput { .. } + | ServerRequest::DynamicToolCall { .. } + | ServerRequest::ChatgptAuthTokensRefresh { .. } + | ServerRequest::ApplyPatchApproval { .. } + | ServerRequest::ExecCommandApproval { .. } => Vec::new(), + } +} + +async fn resolve_pending_server_request( + conversation_id: ThreadId, + thread_state: &Arc>, + outgoing: &Arc, + request_type: ServerRequestType, + request_id: RequestId, +) { + let thread_id = conversation_id.to_string(); + let subscribed_connection_ids = thread_state.lock().await.subscribed_connection_ids(); + let outgoing = ThreadScopedOutgoingMessageSender::new( + outgoing.clone(), + subscribed_connection_ids, + conversation_id, + ); + let notification = match request_type { + ServerRequestType::CommandExecutionRequestApproval => { + ServerNotification::CommandExecutionApprovalResolved( + CommandExecutionApprovalResolvedNotification { + thread_id, + request_id, + }, + ) + } + ServerRequestType::FileChangeRequestApproval => { + ServerNotification::FileChangeApprovalResolved(FileChangeApprovalResolvedNotification { + thread_id, + request_id, + }) + } + ServerRequestType::ToolRequestUserInput => { + ServerNotification::ToolRequestUserInputResolved( + ToolRequestUserInputResolvedNotification { + thread_id, + request_id, + }, + ) + } + }; + outgoing.send_server_notification(notification).await; +} + +pub(crate) async fn abort_pending_client_requests(outgoing: &ThreadScopedOutgoingMessageSender) { + outgoing + .cancel_tracked_requests_with_error(JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: "tracked client request resolved because the turn state was changed" + .to_string(), + data: Some(serde_json::json!({ "reason": "turnTransition" })), + }) + .await; +} + +async fn handle_thread_listener_command( + conversation_id: ThreadId, + codex_home: &Path, + thread_state: &Arc>, + thread_watch_manager: &ThreadWatchManager, + outgoing: &Arc, + listener_command: ThreadListenerCommand, +) { + match listener_command { + ThreadListenerCommand::SendThreadResumeResponse(resume_request) => { + handle_pending_thread_resume_request( + conversation_id, + codex_home, + thread_state, + thread_watch_manager, + outgoing, + *resume_request, + ) + .await; + } + ThreadListenerCommand::ResolveServerRequest { + request_type, + request_id, + completion_tx, + } => { + resolve_pending_server_request( + conversation_id, + thread_state, + outgoing, + request_type, + request_id, + ) + .await; + let _ = completion_tx.send(()); + } + } +} + async fn handle_pending_thread_resume_request( conversation_id: ThreadId, codex_home: &Path, @@ -6827,7 +6953,27 @@ async fn handle_pending_thread_resume_request( reasoning_effort, }; outgoing.send_response(request_id, response).await; - thread_state.lock().await.add_connection(connection_id); + + let pending_requests = outgoing.pending_requests_for_thread(conversation_id).await; + let file_change_started = { + let mut state = thread_state.lock().await; + state.add_connection(connection_id); + state.turn_summary.file_change_started.clone() + }; + + for request in pending_requests { + let notifications_before_request = + pending_client_request_resume_notifications(&file_change_started, &request); + for notification in notifications_before_request { + outgoing + .send_server_notification_to_connections(&[connection_id], notification.clone()) + .await; + } + + outgoing + .replay_request_to_connections(&[connection_id], request) + .await; + } } async fn load_thread_for_running_resume_response( @@ -7577,7 +7723,11 @@ pub(crate) fn summary_to_thread(summary: ConversationSummary) -> Thread { #[cfg(test)] mod tests { use super::*; + use crate::outgoing_message::OutgoingEnvelope; + use crate::outgoing_message::OutgoingMessage; use anyhow::Result; + use codex_app_server_protocol::ServerRequestPayload; + use codex_app_server_protocol::ToolRequestUserInputParams; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::SubAgentSource; use pretty_assertions::assert_eq; @@ -7771,6 +7921,68 @@ mod tests { Ok(()) } + #[tokio::test] + async fn aborting_tracked_request_clears_pending_state() -> Result<()> { + let thread_id = ThreadId::from_string("bfd12a78-5900-467b-9bc5-d3d35df08191")?; + let thread_state = Arc::new(Mutex::new(ThreadState::default())); + let connection_id = ConnectionId(7); + thread_state.lock().await.add_connection(connection_id); + + let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(8); + let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx)); + let thread_outgoing = ThreadScopedOutgoingMessageSender::new( + outgoing.clone(), + vec![connection_id], + thread_id, + ); + + let (request, client_request_rx) = thread_outgoing + .send_request_with_server_request(ServerRequestPayload::ToolRequestUserInput( + ToolRequestUserInputParams { + thread_id: thread_id.to_string(), + turn_id: "turn-1".to_string(), + item_id: "call-1".to_string(), + questions: vec![], + }, + )) + .await; + let request_id = request.id().clone(); + abort_pending_client_requests(&thread_outgoing).await; + + let request_message = outgoing_rx.recv().await.expect("request should be sent"); + let OutgoingEnvelope::ToConnection { + connection_id: request_connection_id, + message: + OutgoingMessage::Request(ServerRequest::ToolRequestUserInput { + request_id: sent_request_id, + .. + }), + } = request_message + else { + panic!("expected tool request to be sent to the subscribed connection"); + }; + assert_eq!(request_connection_id, connection_id); + assert_eq!(sent_request_id, request_id); + + let response = client_request_rx + .await + .expect("callback should be resolved"); + let error = response.expect_err("request should be aborted during cleanup"); + assert_eq!( + error.message, + "tracked client request resolved because the turn state was changed" + ); + assert_eq!(error.data, Some(json!({ "reason": "turnTransition" }))); + assert!( + outgoing + .pending_requests_for_thread(thread_id) + .await + .is_empty() + ); + assert!(outgoing_rx.try_recv().is_err()); + Ok(()) + } + #[test] fn summary_from_state_db_metadata_preserves_agent_nickname() -> Result<()> { let conversation_id = ThreadId::from_string("bfd12a78-5900-467b-9bc5-d3d35df08191")?; diff --git a/codex-rs/app-server/src/outgoing_message.rs b/codex-rs/app-server/src/outgoing_message.rs index 44de7834556..36ba916b7cb 100644 --- a/codex-rs/app-server/src/outgoing_message.rs +++ b/codex-rs/app-server/src/outgoing_message.rs @@ -1,3 +1,4 @@ +use std::cmp::Ordering as CmpOrdering; use std::collections::HashMap; use std::sync::Arc; use std::sync::atomic::AtomicI64; @@ -62,6 +63,8 @@ pub(crate) struct ThreadScopedOutgoingMessageSender { struct PendingCallbackEntry { callback: oneshot::Sender, thread_id: Option, + request: ServerRequest, + tracked_request: bool, } impl ThreadScopedOutgoingMessageSender { @@ -90,6 +93,23 @@ impl ThreadScopedOutgoingMessageSender { .await } + pub(crate) async fn send_request_with_server_request( + &self, + payload: ServerRequestPayload, + ) -> (ServerRequest, oneshot::Receiver) { + let payload_clone = payload.clone(); + let (request_id, rx) = self + .outgoing + .send_request_with_id_to_connections( + Some(self.connection_ids.as_slice()), + payload, + Some(self.thread_id), + true, + ) + .await; + (payload_clone.request_with_id(request_id), rx) + } + pub(crate) async fn send_server_notification(&self, notification: ServerNotification) { if self.connection_ids.is_empty() { return; @@ -99,6 +119,12 @@ impl ThreadScopedOutgoingMessageSender { .await; } + pub(crate) async fn cancel_tracked_requests_with_error(&self, error: JSONRPCErrorError) { + self.outgoing + .cancel_tracked_requests_for_thread(self.thread_id, Some(error)) + .await + } + pub(crate) async fn send_response( &self, request_id: ConnectionRequestId, @@ -129,7 +155,7 @@ impl OutgoingMessageSender { &self, request: ServerRequestPayload, ) -> (RequestId, oneshot::Receiver) { - self.send_request_with_id_to_connections(&[], request, None) + self.send_request_with_id_to_connections(None, request, None, false) .await } @@ -144,7 +170,12 @@ impl OutgoingMessageSender { return rx; } let (_request_id, receiver) = self - .send_request_with_id_to_connections(connection_ids, request, Some(thread_id)) + .send_request_with_id_to_connections( + Some(connection_ids), + request, + Some(thread_id), + false, + ) .await; receiver } @@ -155,12 +186,15 @@ impl OutgoingMessageSender { async fn send_request_with_id_to_connections( &self, - connection_ids: &[ConnectionId], + connection_ids: Option<&[ConnectionId]>, request: ServerRequestPayload, thread_id: Option, + tracked_request: bool, ) -> (RequestId, oneshot::Receiver) { let id = self.next_request_id(); let outgoing_message_id = id.clone(); + let request = request.request_with_id(outgoing_message_id.clone()); + let (tx_approve, rx_approve) = oneshot::channel(); { let mut request_id_to_callback = self.request_id_to_callback.lock().await; @@ -169,36 +203,40 @@ impl OutgoingMessageSender { PendingCallbackEntry { callback: tx_approve, thread_id, + request: request.clone(), + tracked_request, }, ); } - let outgoing_message = - OutgoingMessage::Request(request.request_with_id(outgoing_message_id.clone())); - let send_result = if connection_ids.is_empty() { - self.sender - .send(OutgoingEnvelope::Broadcast { - message: outgoing_message, - }) - .await - } else { - let mut send_error = None; - for connection_id in connection_ids { - if let Err(err) = self - .sender - .send(OutgoingEnvelope::ToConnection { - connection_id: *connection_id, - message: outgoing_message.clone(), + let outgoing_message = OutgoingMessage::Request(request); + let send_result = match connection_ids { + None => { + self.sender + .send(OutgoingEnvelope::Broadcast { + message: outgoing_message, }) .await - { - send_error = Some(err); - break; - } } - match send_error { - Some(err) => Err(err), - None => Ok(()), + Some(connection_ids) => { + let mut send_error = None; + for connection_id in connection_ids { + if let Err(err) = self + .sender + .send(OutgoingEnvelope::ToConnection { + connection_id: *connection_id, + message: outgoing_message.clone(), + }) + .await + { + send_error = Some(err); + break; + } + } + match send_error { + Some(err) => Err(err), + None => Ok(()), + } } }; @@ -210,11 +248,28 @@ impl OutgoingMessageSender { (outgoing_message_id, rx_approve) } + pub(crate) async fn replay_request_to_connections( + &self, + connection_ids: &[ConnectionId], + request: ServerRequest, + ) { + let outgoing_message = OutgoingMessage::Request(request); + for connection_id in connection_ids { + if let Err(err) = self + .sender + .send(OutgoingEnvelope::ToConnection { + connection_id: *connection_id, + message: outgoing_message.clone(), + }) + .await + { + warn!("failed to resend request to client: {err:?}"); + } + } + } + pub(crate) async fn notify_client_response(&self, id: RequestId, result: Result) { - let entry = { - let mut request_id_to_callback = self.request_id_to_callback.lock().await; - request_id_to_callback.remove_entry(&id) - }; + let entry = self.take_request_callback(&id).await; match entry { Some((id, entry)) => { @@ -229,10 +284,7 @@ impl OutgoingMessageSender { } pub(crate) async fn notify_client_error(&self, id: RequestId, error: JSONRPCErrorError) { - let entry = { - let mut request_id_to_callback = self.request_id_to_callback.lock().await; - request_id_to_callback.remove_entry(&id) - }; + let entry = self.take_request_callback(&id).await; match entry { Some((id, entry)) => { @@ -248,23 +300,83 @@ impl OutgoingMessageSender { } pub(crate) async fn cancel_request(&self, id: &RequestId) -> bool { - let entry = { - let mut request_id_to_callback = self.request_id_to_callback.lock().await; - request_id_to_callback.remove_entry(id) - }; - entry.is_some() + self.take_request_callback(id).await.is_some() } - pub(crate) async fn cancel_requests_for_thread(&self, thread_id: ThreadId) { + async fn take_request_callback( + &self, + id: &RequestId, + ) -> Option<(RequestId, PendingCallbackEntry)> { let mut request_id_to_callback = self.request_id_to_callback.lock().await; - let request_ids = request_id_to_callback + request_id_to_callback.remove_entry(id) + } + + pub(crate) async fn pending_requests_for_thread( + &self, + thread_id: ThreadId, + ) -> Vec { + let request_id_to_callback = self.request_id_to_callback.lock().await; + let mut requests = request_id_to_callback .iter() - .filter_map(|(request_id, entry)| { - (entry.thread_id == Some(thread_id)).then_some(request_id.clone()) + .filter_map(|(_, entry)| { + (entry.thread_id == Some(thread_id) && entry.tracked_request) + .then_some(entry.request.clone()) }) .collect::>(); - for request_id in request_ids { - request_id_to_callback.remove(&request_id); + sort_requests_by_id(&mut requests); + requests + } + + pub(crate) async fn cancel_requests_for_thread( + &self, + thread_id: ThreadId, + error: Option, + ) { + self.cancel_requests_for_thread_matching(thread_id, error, false) + .await + } + + pub(crate) async fn cancel_tracked_requests_for_thread( + &self, + thread_id: ThreadId, + error: Option, + ) { + self.cancel_requests_for_thread_matching(thread_id, error, true) + .await + } + + async fn cancel_requests_for_thread_matching( + &self, + thread_id: ThreadId, + error: Option, + tracked_only: bool, + ) { + let entries = { + let mut request_id_to_callback = self.request_id_to_callback.lock().await; + let request_ids = request_id_to_callback + .iter() + .filter_map(|(request_id, entry)| { + (entry.thread_id == Some(thread_id) && (!tracked_only || entry.tracked_request)) + .then_some(request_id.clone()) + }) + .collect::>(); + + let mut entries = Vec::with_capacity(request_ids.len()); + for request_id in request_ids { + if let Some(entry) = request_id_to_callback.remove(&request_id) { + entries.push(entry); + } + } + entries + }; + + if let Some(error) = error { + for entry in entries { + if let Err(err) = entry.callback.send(Err(error.clone())) { + let request_id = entry.request.id(); + warn!("could not notify callback for {request_id:?} due to: {err:?}",); + } + } } } @@ -399,6 +511,19 @@ impl OutgoingMessageSender { } } +fn compare_request_ids(left: &RequestId, right: &RequestId) -> CmpOrdering { + match (left, right) { + (RequestId::Integer(left), RequestId::Integer(right)) => left.cmp(right), + (RequestId::String(left), RequestId::String(right)) => left.cmp(right), + (RequestId::Integer(_), RequestId::String(_)) => CmpOrdering::Less, + (RequestId::String(_), RequestId::Integer(_)) => CmpOrdering::Greater, + } +} + +fn sort_requests_by_id(requests: &mut [ServerRequest]) { + requests.sort_by(|left, right| compare_request_ids(left.id(), right.id())); +} + /// Outgoing message from the server to the client. #[derive(Debug, Clone, Serialize)] #[serde(untagged)] @@ -441,14 +566,18 @@ mod tests { use codex_app_server_protocol::ApplyPatchApprovalParams; use codex_app_server_protocol::AuthMode; use codex_app_server_protocol::ConfigWarningNotification; + use codex_app_server_protocol::DynamicToolCallParams; + use codex_app_server_protocol::FileChangeRequestApprovalParams; use codex_app_server_protocol::LoginChatGptCompleteNotification; use codex_app_server_protocol::ModelRerouteReason; use codex_app_server_protocol::ModelReroutedNotification; use codex_app_server_protocol::RateLimitSnapshot; use codex_app_server_protocol::RateLimitWindow; + use codex_app_server_protocol::ToolRequestUserInputParams; use codex_protocol::ThreadId; use pretty_assertions::assert_eq; use serde_json::json; + use std::sync::Arc; use tokio::time::timeout; use uuid::Uuid; @@ -723,4 +852,115 @@ mod tests { .expect("waiter should receive a callback"); assert_eq!(result, Err(error)); } + + #[tokio::test] + async fn pending_requests_for_thread_only_returns_tracked_requests_in_request_id_order() { + let (tx, _rx) = mpsc::channel::(8); + let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let thread_id = ThreadId::new(); + let thread_outgoing = ThreadScopedOutgoingMessageSender::new( + outgoing.clone(), + vec![ConnectionId(1)], + thread_id, + ); + + let _dynamic_tool_waiter = thread_outgoing + .send_request(ServerRequestPayload::DynamicToolCall( + DynamicToolCallParams { + thread_id: thread_id.to_string(), + turn_id: "turn-1".to_string(), + call_id: "call-0".to_string(), + tool: "tool".to_string(), + arguments: json!({}), + }, + )) + .await; + let (first_tracked_request, _first_waiter) = thread_outgoing + .send_request_with_server_request(ServerRequestPayload::ToolRequestUserInput( + ToolRequestUserInputParams { + thread_id: thread_id.to_string(), + turn_id: "turn-1".to_string(), + item_id: "call-1".to_string(), + questions: vec![], + }, + )) + .await; + let (second_tracked_request, _second_waiter) = thread_outgoing + .send_request_with_server_request(ServerRequestPayload::FileChangeRequestApproval( + FileChangeRequestApprovalParams { + thread_id: thread_id.to_string(), + turn_id: "turn-1".to_string(), + item_id: "call-2".to_string(), + reason: None, + grant_root: None, + }, + )) + .await; + + let pending_requests = outgoing.pending_requests_for_thread(thread_id).await; + assert_eq!( + pending_requests, + vec![first_tracked_request, second_tracked_request] + ); + } + + #[tokio::test] + async fn cancel_tracked_requests_for_thread_skips_untracked_requests() { + let (tx, _rx) = mpsc::channel::(8); + let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let thread_id = ThreadId::new(); + let thread_outgoing = ThreadScopedOutgoingMessageSender::new( + outgoing.clone(), + vec![ConnectionId(1)], + thread_id, + ); + + let dynamic_tool_waiter = thread_outgoing + .send_request(ServerRequestPayload::DynamicToolCall( + DynamicToolCallParams { + thread_id: thread_id.to_string(), + turn_id: "turn-1".to_string(), + call_id: "call-0".to_string(), + tool: "tool".to_string(), + arguments: json!({}), + }, + )) + .await; + let (_tracked_request, tracked_waiter) = thread_outgoing + .send_request_with_server_request(ServerRequestPayload::ToolRequestUserInput( + ToolRequestUserInputParams { + thread_id: thread_id.to_string(), + turn_id: "turn-1".to_string(), + item_id: "call-1".to_string(), + questions: vec![], + }, + )) + .await; + let error = JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: "tracked request cancelled".to_string(), + data: None, + }; + + outgoing + .cancel_tracked_requests_for_thread(thread_id, Some(error.clone())) + .await; + + let tracked_result = timeout(Duration::from_secs(1), tracked_waiter) + .await + .expect("tracked waiter should resolve") + .expect("tracked waiter should receive a callback"); + assert_eq!(tracked_result, Err(error)); + assert!( + outgoing + .pending_requests_for_thread(thread_id) + .await + .is_empty() + ); + assert!( + timeout(Duration::from_millis(50), dynamic_tool_waiter) + .await + .is_err() + ); + } } diff --git a/codex-rs/app-server/src/thread_state.rs b/codex-rs/app-server/src/thread_state.rs index 04e2a8240ea..79ee47985bc 100644 --- a/codex-rs/app-server/src/thread_state.rs +++ b/codex-rs/app-server/src/thread_state.rs @@ -1,5 +1,7 @@ use crate::outgoing_message::ConnectionId; use crate::outgoing_message::ConnectionRequestId; +use codex_app_server_protocol::FileUpdateChange; +use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ThreadHistoryBuilder; use codex_app_server_protocol::Turn; use codex_app_server_protocol::TurnError; @@ -29,13 +31,24 @@ pub(crate) struct PendingThreadResumeRequest { } pub(crate) enum ThreadListenerCommand { - SendThreadResumeResponse(PendingThreadResumeRequest), + SendThreadResumeResponse(Box), + ResolveServerRequest { + request_id: RequestId, + request_type: ServerRequestType, + completion_tx: oneshot::Sender<()>, + }, +} + +pub(crate) enum ServerRequestType { + CommandExecutionRequestApproval, + FileChangeRequestApproval, + ToolRequestUserInput, } /// Per-conversation accumulation of the latest states e.g. error message while a turn runs. #[derive(Default, Clone)] pub(crate) struct TurnSummary { - pub(crate) file_change_started: HashSet, + pub(crate) file_change_started: HashMap>, pub(crate) command_execution_started: HashSet, pub(crate) last_error: Option, } diff --git a/codex-rs/app-server/tests/suite/v2/request_user_input.rs b/codex-rs/app-server/tests/suite/v2/request_user_input.rs index 926cb9bbc64..a3ebeb51319 100644 --- a/codex-rs/app-server/tests/suite/v2/request_user_input.rs +++ b/codex-rs/app-server/tests/suite/v2/request_user_input.rs @@ -4,11 +4,13 @@ use app_test_support::create_final_assistant_message_sse_response; use app_test_support::create_mock_responses_server_sequence; use app_test_support::create_request_user_input_sse_response; use app_test_support::to_response; +use codex_app_server_protocol::JSONRPCMessage; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ServerRequest; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; +use codex_app_server_protocol::ToolRequestUserInputResolvedNotification; use codex_app_server_protocol::TurnStartParams; use codex_app_server_protocol::TurnStartResponse; use codex_app_server_protocol::UserInput as V2UserInput; @@ -86,6 +88,7 @@ async fn request_user_input_round_trip() -> Result<()> { assert_eq!(params.turn_id, turn.id); assert_eq!(params.item_id, "call1"); assert_eq!(params.questions.len(), 1); + let resolved_request_id = request_id.clone(); mcp.send_response( request_id, @@ -96,17 +99,46 @@ async fn request_user_input_round_trip() -> Result<()> { }), ) .await?; - - timeout( - DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_notification_message("codex/event/task_complete"), - ) - .await??; - timeout( - DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_notification_message("turn/completed"), - ) - .await??; + let mut saw_resolved = false; + let mut saw_task_complete = false; + loop { + let message = timeout(DEFAULT_READ_TIMEOUT, mcp.read_next_message()).await??; + let JSONRPCMessage::Notification(notification) = message else { + continue; + }; + match notification.method.as_str() { + "item/tool/requestUserInputResolved" => { + let resolved: ToolRequestUserInputResolvedNotification = serde_json::from_value( + notification + .params + .clone() + .expect("item/tool/requestUserInputResolved params"), + )?; + assert_eq!(resolved.thread_id, thread.id); + assert_eq!(resolved.request_id, resolved_request_id); + saw_resolved = true; + } + "codex/event/task_complete" => { + assert!( + saw_resolved, + "item/tool/requestUserInputResolved should arrive first" + ); + saw_task_complete = true; + } + "turn/completed" => { + assert!( + saw_resolved, + "item/tool/requestUserInputResolved should arrive first" + ); + assert!( + saw_task_complete, + "task_complete should arrive before turn/completed" + ); + break; + } + _ => {} + } + } Ok(()) } diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs index 15bc4012eb7..55221db9c11 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_resume.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -1,13 +1,26 @@ use anyhow::Result; use app_test_support::McpProcess; +use app_test_support::create_apply_patch_sse_response; use app_test_support::create_fake_rollout_with_text_elements; +use app_test_support::create_final_assistant_message_sse_response; use app_test_support::create_mock_responses_server_repeating_assistant; +use app_test_support::create_mock_responses_server_sequence; +use app_test_support::create_shell_command_sse_response; use app_test_support::rollout_path; use app_test_support::to_response; use chrono::Utc; +use codex_app_server_protocol::AskForApproval; +use codex_app_server_protocol::CommandExecutionApprovalDecision; +use codex_app_server_protocol::CommandExecutionRequestApprovalResponse; +use codex_app_server_protocol::FileChangeApprovalDecision; +use codex_app_server_protocol::FileChangeRequestApprovalResponse; +use codex_app_server_protocol::ItemStartedNotification; use codex_app_server_protocol::JSONRPCError; use codex_app_server_protocol::JSONRPCResponse; +use codex_app_server_protocol::PatchApplyStatus; +use codex_app_server_protocol::PatchChangeKind; use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::ServerRequest; use codex_app_server_protocol::SessionSource; use codex_app_server_protocol::ThreadItem; use codex_app_server_protocol::ThreadResumeParams; @@ -639,6 +652,329 @@ async fn thread_resume_rejoins_running_thread_even_with_override_mismatch() -> R Ok(()) } +#[tokio::test] +async fn thread_resume_replays_pending_command_execution_request_approval() -> Result<()> { + let responses = vec![ + create_final_assistant_message_sse_response("seeded")?, + create_shell_command_sse_response( + vec![ + "python3".to_string(), + "-c".to_string(), + "print(42)".to_string(), + ], + None, + Some(5000), + "call-1", + )?, + create_final_assistant_message_sse_response("done")?, + ]; + let server = create_mock_responses_server_sequence(responses).await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let mut primary = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, primary.initialize()).await??; + + let start_id = primary + .send_thread_start_request(ThreadStartParams { + model: Some("gpt-5.1-codex-max".to_string()), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(start_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; + + let seed_turn_id = primary + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![UserInput::Text { + text: "seed history".to_string(), + text_elements: Vec::new(), + }], + ..Default::default() + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(seed_turn_id)), + ) + .await??; + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_notification_message("turn/completed"), + ) + .await??; + primary.clear_message_buffer(); + + let running_turn_id = primary + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![UserInput::Text { + text: "run command".to_string(), + text_elements: Vec::new(), + }], + approval_policy: Some(AskForApproval::UnlessTrusted), + ..Default::default() + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(running_turn_id)), + ) + .await??; + + let original_request = timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_request_message(), + ) + .await??; + let ServerRequest::CommandExecutionRequestApproval { .. } = &original_request else { + panic!("expected CommandExecutionRequestApproval request, got {original_request:?}"); + }; + + let resume_id = primary + .send_thread_resume_request(ThreadResumeParams { + thread_id: thread.id.clone(), + ..Default::default() + }) + .await?; + let resume_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(resume_id)), + ) + .await??; + let ThreadResumeResponse { + thread: resumed_thread, + .. + } = to_response::(resume_resp)?; + assert_eq!(resumed_thread.id, thread.id); + assert!( + resumed_thread + .turns + .iter() + .any(|turn| matches!(turn.status, TurnStatus::InProgress)) + ); + + let replayed_request = timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_request_message(), + ) + .await??; + pretty_assertions::assert_eq!(replayed_request, original_request); + + let ServerRequest::CommandExecutionRequestApproval { request_id, .. } = replayed_request else { + panic!("expected CommandExecutionRequestApproval request"); + }; + primary + .send_response( + request_id, + serde_json::to_value(CommandExecutionRequestApprovalResponse { + decision: CommandExecutionApprovalDecision::Accept, + })?, + ) + .await?; + + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_notification_message("turn/completed"), + ) + .await??; + + Ok(()) +} + +#[tokio::test] +async fn thread_resume_replays_pending_file_change_request_approval() -> Result<()> { + let tmp = TempDir::new()?; + let codex_home = tmp.path().join("codex_home"); + std::fs::create_dir(&codex_home)?; + let workspace = tmp.path().join("workspace"); + std::fs::create_dir(&workspace)?; + + let patch = r#"*** Begin Patch +*** Add File: README.md ++new line +*** End Patch +"#; + let responses = vec![ + create_final_assistant_message_sse_response("seeded")?, + create_apply_patch_sse_response(patch, "patch-call")?, + create_final_assistant_message_sse_response("done")?, + ]; + let server = create_mock_responses_server_sequence(responses).await; + create_config_toml(&codex_home, &server.uri())?; + + let mut primary = McpProcess::new(&codex_home).await?; + timeout(DEFAULT_READ_TIMEOUT, primary.initialize()).await??; + + let start_id = primary + .send_thread_start_request(ThreadStartParams { + model: Some("gpt-5.1-codex-max".to_string()), + cwd: Some(workspace.to_string_lossy().into_owned()), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(start_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; + + let seed_turn_id = primary + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![UserInput::Text { + text: "seed history".to_string(), + text_elements: Vec::new(), + }], + cwd: Some(workspace.clone()), + ..Default::default() + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(seed_turn_id)), + ) + .await??; + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_notification_message("turn/completed"), + ) + .await??; + primary.clear_message_buffer(); + + let running_turn_id = primary + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![UserInput::Text { + text: "apply patch".to_string(), + text_elements: Vec::new(), + }], + cwd: Some(workspace.clone()), + approval_policy: Some(AskForApproval::UnlessTrusted), + ..Default::default() + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(running_turn_id)), + ) + .await??; + + let original_started = timeout(DEFAULT_READ_TIMEOUT, async { + loop { + let notification = primary + .read_stream_until_notification_message("item/started") + .await?; + let started: ItemStartedNotification = + serde_json::from_value(notification.params.clone().expect("item/started params"))?; + if let ThreadItem::FileChange { .. } = started.item { + return Ok::(started.item); + } + } + }) + .await??; + let ThreadItem::FileChange { + ref id, + ref status, + ref changes, + } = original_started + else { + unreachable!("loop ensures we break on file change items"); + }; + assert_eq!(id, "patch-call"); + assert_eq!(status, &PatchApplyStatus::InProgress); + let expected_readme_path = workspace.join("README.md"); + assert_eq!( + changes, + &vec![codex_app_server_protocol::FileUpdateChange { + path: expected_readme_path.to_string_lossy().into_owned(), + kind: PatchChangeKind::Add, + diff: "new line\n".to_string(), + }] + ); + + let original_request = timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_request_message(), + ) + .await??; + let ServerRequest::FileChangeRequestApproval { .. } = &original_request else { + panic!("expected FileChangeRequestApproval request, got {original_request:?}"); + }; + primary.clear_message_buffer(); + + let resume_id = primary + .send_thread_resume_request(ThreadResumeParams { + thread_id: thread.id.clone(), + ..Default::default() + }) + .await?; + let resume_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(resume_id)), + ) + .await??; + let ThreadResumeResponse { + thread: resumed_thread, + .. + } = to_response::(resume_resp)?; + assert_eq!(resumed_thread.id, thread.id); + assert!( + resumed_thread + .turns + .iter() + .any(|turn| matches!(turn.status, TurnStatus::InProgress)) + ); + + let replayed_started = timeout(DEFAULT_READ_TIMEOUT, async { + loop { + let notification = primary + .read_stream_until_notification_message("item/started") + .await?; + let started: ItemStartedNotification = + serde_json::from_value(notification.params.clone().expect("item/started params"))?; + if let ThreadItem::FileChange { .. } = started.item { + return Ok::(started.item); + } + } + }) + .await??; + assert_eq!(replayed_started, original_started); + + let replayed_request = timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_request_message(), + ) + .await??; + assert_eq!(replayed_request, original_request); + + let ServerRequest::FileChangeRequestApproval { request_id, .. } = replayed_request else { + panic!("expected FileChangeRequestApproval request"); + }; + primary + .send_response( + request_id, + serde_json::to_value(FileChangeRequestApprovalResponse { + decision: FileChangeApprovalDecision::Accept, + })?, + ) + .await?; + + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_notification_message("turn/completed"), + ) + .await??; + + Ok(()) +} + #[tokio::test] async fn thread_resume_with_overrides_defers_updated_at_until_turn_start() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; diff --git a/codex-rs/app-server/tests/suite/v2/turn_interrupt.rs b/codex-rs/app-server/tests/suite/v2/turn_interrupt.rs index 486e915f6f6..e5fc7d2b76b 100644 --- a/codex-rs/app-server/tests/suite/v2/turn_interrupt.rs +++ b/codex-rs/app-server/tests/suite/v2/turn_interrupt.rs @@ -5,9 +5,11 @@ use app_test_support::McpProcess; use app_test_support::create_mock_responses_server_sequence; use app_test_support::create_shell_command_sse_response; use app_test_support::to_response; +use codex_app_server_protocol::CommandExecutionApprovalResolvedNotification; use codex_app_server_protocol::JSONRPCNotification; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::ServerRequest; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; use codex_app_server_protocol::TurnCompletedNotification; @@ -48,7 +50,7 @@ async fn turn_interrupt_aborts_running_turn() -> Result<()> { "call_sleep", )?]) .await; - create_config_toml(&codex_home, &server.uri())?; + create_config_toml(&codex_home, &server.uri(), "never")?; let mut mcp = McpProcess::new(&codex_home).await?; timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; @@ -120,15 +122,134 @@ async fn turn_interrupt_aborts_running_turn() -> Result<()> { Ok(()) } +#[tokio::test] +async fn turn_interrupt_resolves_pending_command_approval_request() -> Result<()> { + #[cfg(target_os = "windows")] + let shell_command = vec![ + "powershell".to_string(), + "-Command".to_string(), + "Start-Sleep -Seconds 10".to_string(), + ]; + #[cfg(not(target_os = "windows"))] + let shell_command = vec!["sleep".to_string(), "10".to_string()]; + + let tmp = TempDir::new()?; + let codex_home = tmp.path().join("codex_home"); + std::fs::create_dir(&codex_home)?; + let working_directory = tmp.path().join("workdir"); + std::fs::create_dir(&working_directory)?; + + let server = create_mock_responses_server_sequence(vec![create_shell_command_sse_response( + shell_command.clone(), + Some(&working_directory), + Some(10_000), + "call_sleep_approval", + )?]) + .await; + create_config_toml(&codex_home, &server.uri(), "untrusted")?; + + let mut mcp = McpProcess::new(&codex_home).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let thread_req = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + ..Default::default() + }) + .await?; + let thread_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(thread_req)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(thread_resp)?; + + let turn_req = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![V2UserInput::Text { + text: "run sleep".to_string(), + text_elements: Vec::new(), + }], + cwd: Some(working_directory), + ..Default::default() + }) + .await?; + let turn_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_req)), + ) + .await??; + let TurnStartResponse { turn } = to_response::(turn_resp)?; + + let request = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_request_message(), + ) + .await??; + let ServerRequest::CommandExecutionRequestApproval { request_id, params } = request else { + panic!("expected CommandExecutionRequestApproval request"); + }; + assert_eq!(params.item_id, "call_sleep_approval"); + assert_eq!(params.thread_id, thread.id); + assert_eq!(params.turn_id, turn.id); + + let interrupt_id = mcp + .send_turn_interrupt_request(TurnInterruptParams { + thread_id: thread.id.clone(), + turn_id: turn.id.clone(), + }) + .await?; + let interrupt_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(interrupt_id)), + ) + .await??; + let _resp: TurnInterruptResponse = to_response::(interrupt_resp)?; + + let resolved_notification = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("item/commandExecution/approvalResolved"), + ) + .await??; + let resolved: CommandExecutionApprovalResolvedNotification = serde_json::from_value( + resolved_notification + .params + .clone() + .expect("item/commandExecution/approvalResolved params must be present"), + )?; + assert_eq!(resolved.thread_id, thread.id); + assert_eq!(resolved.request_id, request_id); + + let completed_notif: JSONRPCNotification = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("turn/completed"), + ) + .await??; + let completed: TurnCompletedNotification = serde_json::from_value( + completed_notif + .params + .expect("turn/completed params must be present"), + )?; + assert_eq!(completed.thread_id, thread.id); + assert_eq!(completed.turn.status, TurnStatus::Interrupted); + + Ok(()) +} + // Helper to create a config.toml pointing at the mock model server. -fn create_config_toml(codex_home: &std::path::Path, server_uri: &str) -> std::io::Result<()> { +fn create_config_toml( + codex_home: &std::path::Path, + server_uri: &str, + approval_policy: &str, +) -> std::io::Result<()> { let config_toml = codex_home.join("config.toml"); std::fs::write( config_toml, format!( r#" model = "mock-model" -approval_policy = "never" +approval_policy = "{approval_policy}" sandbox_mode = "danger-full-access" model_provider = "mock_provider" diff --git a/codex-rs/app-server/tests/suite/v2/turn_start.rs b/codex-rs/app-server/tests/suite/v2/turn_start.rs index f396aa8c446..f53aa5f7fb8 100644 --- a/codex-rs/app-server/tests/suite/v2/turn_start.rs +++ b/codex-rs/app-server/tests/suite/v2/turn_start.rs @@ -14,14 +14,17 @@ use codex_app_server::INVALID_PARAMS_ERROR_CODE; use codex_app_server_protocol::ByteRange; use codex_app_server_protocol::ClientInfo; use codex_app_server_protocol::CommandExecutionApprovalDecision; +use codex_app_server_protocol::CommandExecutionApprovalResolvedNotification; use codex_app_server_protocol::CommandExecutionRequestApprovalResponse; use codex_app_server_protocol::CommandExecutionStatus; use codex_app_server_protocol::FileChangeApprovalDecision; +use codex_app_server_protocol::FileChangeApprovalResolvedNotification; use codex_app_server_protocol::FileChangeOutputDeltaNotification; use codex_app_server_protocol::FileChangeRequestApprovalResponse; use codex_app_server_protocol::ItemCompletedNotification; use codex_app_server_protocol::ItemStartedNotification; use codex_app_server_protocol::JSONRPCError; +use codex_app_server_protocol::JSONRPCMessage; use codex_app_server_protocol::JSONRPCNotification; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::PatchApplyStatus; @@ -1071,6 +1074,7 @@ async fn turn_start_exec_approval_toggle_v2() -> Result<()> { panic!("expected CommandExecutionRequestApproval request"); }; assert_eq!(params.item_id, "call1"); + let resolved_request_id = request_id.clone(); // Approve and wait for task completion mcp.send_response( @@ -1080,16 +1084,47 @@ async fn turn_start_exec_approval_toggle_v2() -> Result<()> { })?, ) .await?; - timeout( - DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_notification_message("codex/event/task_complete"), - ) - .await??; - timeout( - DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_notification_message("turn/completed"), - ) - .await??; + let mut saw_resolved = false; + let mut saw_task_complete = false; + loop { + let message = timeout(DEFAULT_READ_TIMEOUT, mcp.read_next_message()).await??; + let JSONRPCMessage::Notification(notification) = message else { + continue; + }; + match notification.method.as_str() { + "item/commandExecution/approvalResolved" => { + let resolved: CommandExecutionApprovalResolvedNotification = + serde_json::from_value( + notification + .params + .clone() + .expect("item/commandExecution/approvalResolved params"), + )?; + assert_eq!(resolved.thread_id, thread.id); + assert_eq!(resolved.request_id, resolved_request_id); + saw_resolved = true; + } + "codex/event/task_complete" => { + assert!( + saw_resolved, + "item/commandExecution/approvalResolved should arrive first" + ); + saw_task_complete = true; + } + "turn/completed" => { + assert!( + saw_resolved, + "item/commandExecution/approvalResolved should arrive first" + ); + assert!( + saw_task_complete, + "task_complete should arrive before turn/completed" + ); + break; + } + _ => {} + } + } // Second turn with approval_policy=never should not elicit approval let second_turn_id = mcp @@ -1527,6 +1562,7 @@ async fn turn_start_file_change_approval_v2() -> Result<()> { assert_eq!(params.item_id, "patch-call"); assert_eq!(params.thread_id, thread.id); assert_eq!(params.turn_id, turn.id); + let resolved_request_id = request_id.clone(); let expected_readme_path = workspace.join("README.md"); let expected_readme_path = expected_readme_path.to_string_lossy().into_owned(); pretty_assertions::assert_eq!( @@ -1545,18 +1581,63 @@ async fn turn_start_file_change_approval_v2() -> Result<()> { })?, ) .await?; - - let output_delta_notif = timeout( - DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_notification_message("item/fileChange/outputDelta"), - ) - .await??; - let output_delta: FileChangeOutputDeltaNotification = serde_json::from_value( - output_delta_notif - .params - .clone() - .expect("item/fileChange/outputDelta params"), - )?; + let mut saw_resolved = false; + let mut saw_task_complete = false; + let mut output_delta: Option = None; + let mut completed_file_change: Option = None; + while !(saw_task_complete && output_delta.is_some() && completed_file_change.is_some()) { + let message = timeout(DEFAULT_READ_TIMEOUT, mcp.read_next_message()).await??; + let JSONRPCMessage::Notification(notification) = message else { + continue; + }; + match notification.method.as_str() { + "item/fileChange/approvalResolved" => { + let resolved: FileChangeApprovalResolvedNotification = serde_json::from_value( + notification + .params + .clone() + .expect("item/fileChange/approvalResolved params"), + )?; + assert_eq!(resolved.thread_id, thread.id); + assert_eq!(resolved.request_id, resolved_request_id); + saw_resolved = true; + } + "item/fileChange/outputDelta" => { + assert!( + saw_resolved, + "item/fileChange/approvalResolved should arrive first" + ); + let notification: FileChangeOutputDeltaNotification = serde_json::from_value( + notification + .params + .clone() + .expect("item/fileChange/outputDelta params"), + )?; + output_delta = Some(notification); + } + "item/completed" => { + let completed: ItemCompletedNotification = serde_json::from_value( + notification.params.clone().expect("item/completed params"), + )?; + if let ThreadItem::FileChange { .. } = completed.item { + assert!( + saw_resolved, + "item/fileChange/approvalResolved should arrive first" + ); + completed_file_change = Some(completed.item); + } + } + "codex/event/task_complete" => { + assert!( + saw_resolved, + "item/fileChange/approvalResolved should arrive first" + ); + saw_task_complete = true; + } + _ => {} + } + } + let output_delta = output_delta.expect("file change output delta should be observed"); assert_eq!(output_delta.thread_id, thread.id); assert_eq!(output_delta.turn_id, turn.id); assert_eq!(output_delta.item_id, "patch-call"); @@ -1566,35 +1647,14 @@ async fn turn_start_file_change_approval_v2() -> Result<()> { output_delta.delta ); - let completed_file_change = timeout(DEFAULT_READ_TIMEOUT, async { - loop { - let completed_notif = mcp - .read_stream_until_notification_message("item/completed") - .await?; - let completed: ItemCompletedNotification = serde_json::from_value( - completed_notif - .params - .clone() - .expect("item/completed params"), - )?; - if let ThreadItem::FileChange { .. } = completed.item { - return Ok::(completed.item); - } - } - }) - .await??; + let completed_file_change = + completed_file_change.expect("file change completion should be observed"); let ThreadItem::FileChange { ref id, status, .. } = completed_file_change else { unreachable!("loop ensures we break on file change items"); }; assert_eq!(id, "patch-call"); assert_eq!(status, PatchApplyStatus::Completed); - timeout( - DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_notification_message("codex/event/task_complete"), - ) - .await??; - let readme_contents = std::fs::read_to_string(expected_readme_path)?; assert_eq!(readme_contents, "new line\n");