diff --git a/CHANGELOG.md b/CHANGELOG.md index 02e877c29..1d74edc06 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## Unreleased +- Remote mode: `ask_user` now reaches both the editor and connected SSE/web clients simultaneously, with the first answer from either winning, instead of only the web client when one is connected. + - Remote REST API: `GET /api/v1/chats/:id` `pendingToolCalls` now includes `ask_user` tool calls while waiting for an answer, with a `requestId` field for `POST /api/v1/answer`. - Add OpenAI Responses API for GitHub Copilot models that require it (gpt-5.5, gpt-5.4-mini). diff --git a/src/eca/remote/messenger.clj b/src/eca/remote/messenger.clj index 058b775db..139bef582 100644 --- a/src/eca/remote/messenger.clj +++ b/src/eca/remote/messenger.clj @@ -68,18 +68,26 @@ (editor-diagnostics [_this uri] (messenger/editor-diagnostics inner uri)) (ask-question [_this params] - ;; No SSE clients: fall back to inner so JSON-RPC editor sessions work - ;; unchanged. Otherwise reuse the caller-supplied :request-id (set by - ;; ask_user to match the id in tool-call state) and route the answer via - ;; SSE + /api/v1/answer. + ;; No SSE clients: delegate to inner. Otherwise ask both the editor (inner) + ;; and SSE clients; the first answer wins. (if (empty? @sse-connections*) (messenger/ask-question inner params) (let [request-id (or (:request-id params) (str (random-uuid))) - p (promise) - wire-params (-> params (dissoc :request-id) (assoc :requestId request-id))] - (swap! pending-questions* assoc request-id p) + result (promise) + inner-result (messenger/ask-question inner params) + wire-params (-> params (dissoc :request-id) (assoc :requestId request-id)) + ;; Watch the editor's answer and forward it; deliver is idempotent + ;; so the first answer (editor or SSE) wins. + watcher (future + (try + (deliver result (deref inner-result)) + (catch Throwable _ nil) + (finally + (swap! pending-questions* dissoc request-id))))] + (swap! pending-questions* assoc request-id + {:promise result :inner-result inner-result :watcher watcher}) (sse/broadcast! sse-connections* "chat:ask-question" (->camel wire-params)) - p)))) + result)))) (defn make-broadcast-messenger "Creates a BroadcastMessenger with a fresh pending-questions registry. @@ -89,17 +97,18 @@ (->BroadcastMessenger inner sse-connections* (atom {}))) (defn answer-question! - "Resolves a pending question (previously registered by `ask-question`) by - request-id. Delivers `{:answer answer :cancelled (boolean cancelled)}` to - the registered promise and removes the entry from the registry. - Returns true if a pending question was found and delivered, nil otherwise - (e.g. unknown or already-answered request-id). - - Uses `swap-vals!` so that claiming the entry is a single atomic op: - under concurrent answer-question! calls for the same request-id only the - caller that wins the swap observes the entry in `old` and delivers." + "Resolves a pending question by request-id: delivers + `{:answer answer :cancelled (boolean cancelled)}` to the registered promise, + cancels the editor's pending request (sending `$/cancelRequest`) and its + watcher, and removes the entry. Returns true when a pending question was + found and delivered, nil otherwise. + + Uses `swap-vals!` so claiming the entry is a single atomic op: under + concurrent calls for the same request-id only the swap winner delivers." [{:keys [pending-questions*]} request-id answer cancelled] (let [[old _new] (swap-vals! pending-questions* dissoc request-id)] - (when-let [p (get old request-id)] - (deliver p {:answer answer :cancelled (boolean cancelled)}) + (when-let [{:keys [promise inner-result watcher]} (get old request-id)] + (deliver promise {:answer answer :cancelled (boolean cancelled)}) + (when (future? inner-result) (future-cancel inner-result)) + (when watcher (future-cancel watcher)) true))) diff --git a/test/eca/remote/handlers_test.clj b/test/eca/remote/handlers_test.clj index 68ccb1e73..dbbf2cce5 100644 --- a/test/eca/remote/handlers_test.clj +++ b/test/eca/remote/handlers_test.clj @@ -259,6 +259,8 @@ (deftest handle-answer-question-test (let [inner (h/messenger) + ;; Editor doesn't answer; isolates the SSE path (inner is also asked). + _ (reset! (:ask-question-response* inner) :block) sse-connections* (sse/create-connections) broadcast-messenger (remote.messenger/make-broadcast-messenger inner sse-connections*) os (java.io.ByteArrayOutputStream.) diff --git a/test/eca/remote/messenger_test.clj b/test/eca/remote/messenger_test.clj index fa81651b0..0598db25e 100644 --- a/test/eca/remote/messenger_test.clj +++ b/test/eca/remote/messenger_test.clj @@ -64,6 +64,8 @@ (deftest ask-question-broadcasts-and-resolves-via-answer-test (testing "ask-question registers a promise, broadcasts SSE, and answer-question! resolves it" (let [inner (h/messenger) + ;; Editor doesn't answer; isolates the SSE path (inner is also asked). + _ (reset! (:ask-question-response* inner) :block) sse-connections* (sse/create-connections) broadcast-messenger (remote.messenger/make-broadcast-messenger inner sse-connections*) os (java.io.ByteArrayOutputStream.) @@ -89,6 +91,7 @@ (deftest ask-question-uses-caller-supplied-request-id-test (testing "caller-supplied :request-id is used as the SSE requestId and pending-questions* key" (let [inner (h/messenger) + _ (reset! (:ask-question-response* inner) :block) sse-connections* (sse/create-connections) broadcast-messenger (remote.messenger/make-broadcast-messenger inner sse-connections*) os (java.io.ByteArrayOutputStream.) @@ -123,3 +126,84 @@ sse-connections* (sse/create-connections) broadcast-messenger (remote.messenger/make-broadcast-messenger inner sse-connections*)] (is (nil? (remote.messenger/answer-question! broadcast-messenger "nonexistent" "x" false)))))) + +;;; ask-question dual-dispatch: with both an SSE client and the editor (inner) +;;; connected, the question reaches both and the first answer wins. + +(deftest ask-question-reaches-editor-and-sse-when-both-connected-test + (testing "with an SSE client connected, inner (editor) still receives chat/askQuestion" + (let [inner-params* (atom nil) + inner (reify messenger/IMessenger + (ask-question [_ params] + (reset! inner-params* params) + (promise))) + sse-connections* (sse/create-connections) + broadcast-messenger (remote.messenger/make-broadcast-messenger inner sse-connections*) + os (java.io.ByteArrayOutputStream.) + _client (sse/add-client! sse-connections* os)] + (messenger/ask-question broadcast-messenger + {:chat-id "c1" :question "Why?" :request-id "req-1"}) + (Thread/sleep 100) + (is (some? @inner-params*) + "editor must receive the question even when an SSE client is connected") + (is (.contains (.toString os "UTF-8") "chat:ask-question") + "SSE clients must also receive the question")))) + +(deftest ask-question-sse-answer-wins-test + (testing "an SSE answer resolves the call when both transports are connected" + (let [inner (reify messenger/IMessenger + ;; editor never answers + (ask-question [_ _params] (promise))) + sse-connections* (sse/create-connections) + broadcast-messenger (remote.messenger/make-broadcast-messenger inner sse-connections*) + os (java.io.ByteArrayOutputStream.) + _client (sse/add-client! sse-connections* os) + result (messenger/ask-question broadcast-messenger + {:chat-id "c1" :question "Q?" :request-id "req-1"}) + watcher (:watcher (get @(:pending-questions* broadcast-messenger) "req-1"))] + (Thread/sleep 50) + (is (= :pending (deref result 1 :pending)) + "result should block until someone answers") + (remote.messenger/answer-question! broadcast-messenger "req-1" "via-sse" false) + (is (= {:answer "via-sse" :cancelled false} (deref result 1000 :timeout))) + ;; The editor watcher must be cancelled so it doesn't park forever on an + ;; editor that may never answer. + (is (future-cancelled? watcher))))) + +(deftest ask-question-sse-answer-retracts-editor-request-test + (testing "an SSE answer cancels the editor's outstanding request (→ $/cancelRequest)" + ;; A CompletableFuture models the jsonrpc PendingRequest: future-cancellable, + ;; and cancelling it is what fires $/cancelRequest in the real ServerMessenger. + (let [inner-result (java.util.concurrent.CompletableFuture.) + inner (reify messenger/IMessenger + (ask-question [_ _params] inner-result)) + sse-connections* (sse/create-connections) + broadcast-messenger (remote.messenger/make-broadcast-messenger inner sse-connections*) + os (java.io.ByteArrayOutputStream.) + _client (sse/add-client! sse-connections* os) + result (messenger/ask-question broadcast-messenger + {:chat-id "c1" :question "Q?" :request-id "req-1"})] + (Thread/sleep 50) + (remote.messenger/answer-question! broadcast-messenger "req-1" "via-sse" false) + (is (= {:answer "via-sse" :cancelled false} (deref result 1000 :timeout))) + (is (future-cancelled? inner-result) + "the editor's pending request must be cancelled so the server retracts it")))) + +(deftest ask-question-editor-answer-wins-test + (testing "an editor answer resolves the call and cleans up the SSE pending entry" + (let [inner-promise (promise) + inner (reify messenger/IMessenger + (ask-question [_ _params] inner-promise)) + sse-connections* (sse/create-connections) + broadcast-messenger (remote.messenger/make-broadcast-messenger inner sse-connections*) + os (java.io.ByteArrayOutputStream.) + _client (sse/add-client! sse-connections* os) + result (messenger/ask-question broadcast-messenger + {:chat-id "c1" :question "Q?" :request-id "req-1"})] + (Thread/sleep 50) + (deliver inner-promise {:answer "via-editor" :cancelled false}) + (is (= {:answer "via-editor" :cancelled false} (deref result 1000 :timeout)) + "editor's answer must resolve the call") + (Thread/sleep 50) + (is (empty? @(:pending-questions* broadcast-messenger)) + "answering via the editor must clear the SSE pending entry so a late /answer is a no-op"))))