diff --git a/CHANGELOG.md b/CHANGELOG.md index df51596d..ed6728ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ - Improved `eca_edit_file` to automatically handle whitespace and indentation differences in single-occurrence edits. - Fix contexts in user prompts (not system contexts) not parsing lines ranges properly. +- Support non-stream providers on openai-chat API. #174 ## 0.73.5 diff --git a/src/eca/features/chat.clj b/src/eca/features/chat.clj index 99dcdb89..bd13fd81 100644 --- a/src/eca/features/chat.clj +++ b/src/eca/features/chat.clj @@ -561,19 +561,19 @@ (when-not (get-in db [:chats chat-id :title]) (future* config - (when-let [{:keys [result]} (llm-api/sync-prompt! - {:provider provider - :model model - :model-capabilities (assoc model-capabilities - :reason? false - :tools false - :web-search false) - :instructions (f.prompt/title-prompt) - :user-messages user-messages - :config config - :provider-auth provider-auth})] - (when result - (let [title (subs result 0 (min (count result) 30))] + (when-let [{:keys [output-text]} (llm-api/sync-prompt! + {:provider provider + :model model + :model-capabilities (assoc model-capabilities + :reason? false + :tools false + :web-search false) + :instructions (f.prompt/title-prompt) + :user-messages user-messages + :config config + :provider-auth provider-auth})] + (when output-text + (let [title (subs output-text 0 (min (count output-text) 30))] (swap! db* assoc-in [:chats chat-id :title] title) (send-content! chat-ctx :system (assoc-some {:type :metadata} @@ -584,281 +584,284 @@ (send-content! chat-ctx :system {:type :progress :state :running :text "Waiting model"}) - (llm-api/async-prompt! - {:model model - :provider provider - :model-capabilities model-capabilities - :user-messages user-messages - :instructions instructions - :past-messages past-messages - :config config - :tools all-tools - :provider-auth provider-auth - :on-first-response-received (fn [& _] - (assert-chat-not-stopped! chat-ctx) - (doseq [message user-messages] - (add-to-history! message)) - (send-content! chat-ctx :system {:type :progress - :state :running - :text "Generating"})) - :on-usage-updated on-usage-updated - :on-message-received (fn [{:keys [type] :as msg}] - (assert-chat-not-stopped! chat-ctx) - (case type - :text (do - (swap! received-msgs* str (:text msg)) - (send-content! chat-ctx :assistant {:type :text - :text (:text msg)})) - :url (send-content! chat-ctx :assistant {:type :url - :title (:title msg) - :url (:url msg)}) - :limit-reached (do - (send-content! chat-ctx :system - {:type :text - :text (str "API limit reached. Tokens: " (json/generate-string (:tokens msg)))}) - - (finish-chat-prompt! :idle chat-ctx)) - :finish (do - (add-to-history! {:role "assistant" :content [{:type :text :text @received-msgs*}]}) - (finish-chat-prompt! :idle chat-ctx)))) - :on-prepare-tool-call (fn [{:keys [id name arguments-text]}] - (assert-chat-not-stopped! chat-ctx) - (transition-tool-call! db* chat-ctx id :tool-prepare - {:name name - :server (tool-name->server name all-tools) - :origin (tool-name->origin name all-tools) - :arguments-text arguments-text - :summary (f.tools/tool-call-summary all-tools name nil config)})) - :on-tools-called (fn [tool-calls] - ;; If there are multiple tool calls, they are allowed to execute concurrently. - (assert-chat-not-stopped! chat-ctx) - ;; Flush any pending assistant text once before processing multiple tool calls - (when-not (string/blank? @received-msgs*) - (add-to-history! {:role "assistant" :content [{:type :text :text @received-msgs*}]}) - (reset! received-msgs* "")) - (let [any-rejected-tool-call?* (atom nil)] - (run! (fn do-tool-call [{:keys [id name arguments] :as tool-call}] - (let [approved?* (promise) ; created here, stored in the state. - db @db* - hook-approved?* (atom true) - origin (tool-name->origin name all-tools) - server (tool-name->server name all-tools) - server-name (:name server) - approval (f.tools/approval all-tools name arguments db config behavior) - ask? (= :ask approval) - details (f.tools/tool-call-details-before-invocation name arguments server db ask?) - summary (f.tools/tool-call-summary all-tools name arguments config)] - ;; assert: In :preparing or :stopping or :cleanup - ;; Inform client the tool is about to run and store approval promise - (when-not (#{:stopping :cleanup} (:status (get-tool-call-state db chat-id id))) - (transition-tool-call! db* chat-ctx id :tool-run - {:approved?* approved?* - :future-cleanup-complete?* (promise) - :name name - :server server-name - :origin origin - :arguments arguments - :manual-approval ask? - :details details - :summary summary})) - ;; assert: In: :check-approval or :stopping or :cleanup or :rejected - (when-not (#{:stopping :cleanup :rejected} (:status (get-tool-call-state db chat-id id))) - (case approval - :ask (transition-tool-call! db* chat-ctx id :approval-ask - {:progress-text "Waiting for tool call approval"}) - :allow (transition-tool-call! db* chat-ctx id :approval-allow - {:reason {:code :user-config-allow - :text "Tool call allowed by user config"}}) - :deny (transition-tool-call! db* chat-ctx id :approval-deny - {:reason {:code :user-config-deny - :text "Tool call rejected by user config"}}) - (logger/warn logger-tag "Unknown value of approval in config" - {:approval approval :tool-call-id id}))) - (f.hooks/trigger-if-matches! :preToolCall - {:chat-id chat-id - :tool-name name - :server server-name - :arguments arguments} - {:on-before-action (partial notify-before-hook-action! chat-ctx) - :on-after-action (fn [result] - (when (= 2 (:status result)) - (transition-tool-call! db* chat-ctx id :hook-rejected - {:reason {:code :hook-rejected - :text (str "Tool call rejected by hook, output: " (:output result))}}) - (reset! hook-approved?* false)) - (notify-after-hook-action! chat-ctx result))} - db - config) - (if (and @approved?* @hook-approved?*) - ;; assert: In :execution-approved or :stopping or :cleanup - (when-not (#{:stopping :cleanup} (:status (get-tool-call-state @db* chat-id id))) - (assert-chat-not-stopped! chat-ctx) - (let [;; Since a future starts executing immediately, - ;; we need to delay the future so that the add-future action, - ;; used implicitly in the transition-tool-call! on the :execution-start event, - ;; can activate the future only *after* the state transition to :executing. - delayed-future - (delay - (future - ;; assert: In :executing - (let [result (f.tools/call-tool! name arguments chat-id id behavior db* config messenger metrics - (partial get-tool-call-state @db* chat-id id) - (partial transition-tool-call! db* chat-ctx id)) - details (f.tools/tool-call-details-after-invocation name arguments details result) - {:keys [start-time]} (get-tool-call-state @db* chat-id id) - total-time-ms (- (System/currentTimeMillis) start-time)] - (add-to-history! {:role "tool_call" - :content (assoc tool-call - :details details - :summary summary - :origin origin - :server server-name)}) - (add-to-history! {:role "tool_call_output" - :content (assoc tool-call - :error (:error result) - :output result - :total-time-ms total-time-ms - :details details - :summary summary - :origin origin - :server server-name)}) - ;; assert: In :executing or :stopping - (let [state (get-tool-call-state @db* chat-id id) - status (:status state)] - (case status - :executing (transition-tool-call! db* chat-ctx id :execution-end - {:origin origin - :name name - :server server-name - :arguments arguments - :error (:error result) - :outputs (:contents result) - :total-time-ms total-time-ms - :progress-text "Generating" - :details details - :summary summary}) - :stopping (transition-tool-call! db* chat-ctx id :stop-attempted - {:origin origin - :name name - :server server-name - :arguments arguments - :error (:error result) - :outputs (:contents result) - :total-time-ms total-time-ms - :reason :user-stop - :details details - :summary summary}) - (logger/warn logger-tag "Unexpected value of :status in tool call" {:status status}))))))] - (transition-tool-call! db* chat-ctx id :execution-start - {:delayed-future delayed-future - :origin origin - :name name - :server server-name - :arguments arguments - :start-time (System/currentTimeMillis) - :details details - :summary summary - :progress-text "Calling tool"}))) - ;; assert: In :rejected state - (let [tool-call-state (get-tool-call-state @db* chat-id id) - {:keys [code text]} (:decision-reason tool-call-state)] - (add-to-history! {:role "tool_call" :content tool-call}) - (add-to-history! {:role "tool_call_output" - :content (assoc tool-call :output {:error true - :contents [{:text text - :type :text}]})}) - (reset! any-rejected-tool-call?* code) - (transition-tool-call! db* chat-ctx id :send-reject - {:origin origin + ;; We spawn a new future to not block the lsp4clj thread + ;; in case a tool call approval is needed + (future* config + (llm-api/sync-or-async-prompt! + {:model model + :provider provider + :model-capabilities model-capabilities + :user-messages user-messages + :instructions instructions + :past-messages past-messages + :config config + :tools all-tools + :provider-auth provider-auth + :on-first-response-received (fn [& _] + (assert-chat-not-stopped! chat-ctx) + (doseq [message user-messages] + (add-to-history! message)) + (send-content! chat-ctx :system {:type :progress + :state :running + :text "Generating"})) + :on-usage-updated on-usage-updated + :on-message-received (fn [{:keys [type] :as msg}] + (assert-chat-not-stopped! chat-ctx) + (case type + :text (do + (swap! received-msgs* str (:text msg)) + (send-content! chat-ctx :assistant {:type :text + :text (:text msg)})) + :url (send-content! chat-ctx :assistant {:type :url + :title (:title msg) + :url (:url msg)}) + :limit-reached (do + (send-content! chat-ctx :system + {:type :text + :text (str "API limit reached. Tokens: " (json/generate-string (:tokens msg)))}) + + (finish-chat-prompt! :idle chat-ctx)) + :finish (do + (add-to-history! {:role "assistant" :content [{:type :text :text @received-msgs*}]}) + (finish-chat-prompt! :idle chat-ctx)))) + :on-prepare-tool-call (fn [{:keys [id name arguments-text]}] + (assert-chat-not-stopped! chat-ctx) + (transition-tool-call! db* chat-ctx id :tool-prepare + {:name name + :server (tool-name->server name all-tools) + :origin (tool-name->origin name all-tools) + :arguments-text arguments-text + :summary (f.tools/tool-call-summary all-tools name nil config)})) + :on-tools-called (fn [tool-calls] + ;; If there are multiple tool calls, they are allowed to execute concurrently. + (assert-chat-not-stopped! chat-ctx) + ;; Flush any pending assistant text once before processing multiple tool calls + (when-not (string/blank? @received-msgs*) + (add-to-history! {:role "assistant" :content [{:type :text :text @received-msgs*}]}) + (reset! received-msgs* "")) + (let [any-rejected-tool-call?* (atom nil)] + (run! (fn do-tool-call [{:keys [id name arguments] :as tool-call}] + (let [approved?* (promise) ; created here, stored in the state. + db @db* + hook-approved?* (atom true) + origin (tool-name->origin name all-tools) + server (tool-name->server name all-tools) + server-name (:name server) + approval (f.tools/approval all-tools name arguments db config behavior) + ask? (= :ask approval) + details (f.tools/tool-call-details-before-invocation name arguments server db ask?) + summary (f.tools/tool-call-summary all-tools name arguments config)] + ;; assert: In :preparing or :stopping or :cleanup + ;; Inform client the tool is about to run and store approval promise + (when-not (#{:stopping :cleanup} (:status (get-tool-call-state db chat-id id))) + (transition-tool-call! db* chat-ctx id :tool-run + {:approved?* approved?* + :future-cleanup-complete?* (promise) :name name :server server-name + :origin origin :arguments arguments - :reason code + :manual-approval ask? :details details - :summary summary}))))) - tool-calls) - (assert-chat-not-stopped! chat-ctx) - ;; assert: In :cleanup - ;; assert: Only those tool calls that have reached :executing have futures. - ;; Before we handle interrupts, we will wait for all tool calls with futures to complete naturally. - ;; Since a deref of a cancelled future *immediately* results in a CancellationException without waiting for the future to cleanup, - ;; we have to use another promise and deref that to know when the tool call is finished cleaning up. - (doseq [[tool-call-id state] (get-active-tool-calls @db* chat-id)] - (when-let [f (:future state)] - (try - (deref f) ; TODO: A timeout would be useful for tools that get into an infinite loop. - (catch java.util.concurrent.CancellationException _ - ;; The future was cancelled - ;; TODO: Why not just wait for the promise and not bother about the future? - ;; If future was cancelled, wait for the future's cleanup to finish. - (when-let [p (:future-cleanup-complete?* state)] - (logger/debug logger-tag "Caught CancellationException. Waiting for future to finish cleanup." + :summary summary})) + ;; assert: In: :check-approval or :stopping or :cleanup or :rejected + (when-not (#{:stopping :cleanup :rejected} (:status (get-tool-call-state db chat-id id))) + (case approval + :ask (transition-tool-call! db* chat-ctx id :approval-ask + {:progress-text "Waiting for tool call approval"}) + :allow (transition-tool-call! db* chat-ctx id :approval-allow + {:reason {:code :user-config-allow + :text "Tool call allowed by user config"}}) + :deny (transition-tool-call! db* chat-ctx id :approval-deny + {:reason {:code :user-config-deny + :text "Tool call rejected by user config"}}) + (logger/warn logger-tag "Unknown value of approval in config" + {:approval approval :tool-call-id id}))) + (f.hooks/trigger-if-matches! :preToolCall + {:chat-id chat-id + :tool-name name + :server server-name + :arguments arguments} + {:on-before-action (partial notify-before-hook-action! chat-ctx) + :on-after-action (fn [result] + (when (= 2 (:status result)) + (transition-tool-call! db* chat-ctx id :hook-rejected + {:reason {:code :hook-rejected + :text (str "Tool call rejected by hook, output: " (:output result))}}) + (reset! hook-approved?* false)) + (notify-after-hook-action! chat-ctx result))} + db + config) + (if (and @approved?* @hook-approved?*) + ;; assert: In :execution-approved or :stopping or :cleanup + (when-not (#{:stopping :cleanup} (:status (get-tool-call-state @db* chat-id id))) + (assert-chat-not-stopped! chat-ctx) + (let [;; Since a future starts executing immediately, + ;; we need to delay the future so that the add-future action, + ;; used implicitly in the transition-tool-call! on the :execution-start event, + ;; can activate the future only *after* the state transition to :executing. + delayed-future + (delay + (future + ;; assert: In :executing + (let [result (f.tools/call-tool! name arguments chat-id id behavior db* config messenger metrics + (partial get-tool-call-state @db* chat-id id) + (partial transition-tool-call! db* chat-ctx id)) + details (f.tools/tool-call-details-after-invocation name arguments details result) + {:keys [start-time]} (get-tool-call-state @db* chat-id id) + total-time-ms (- (System/currentTimeMillis) start-time)] + (add-to-history! {:role "tool_call" + :content (assoc tool-call + :details details + :summary summary + :origin origin + :server server-name)}) + (add-to-history! {:role "tool_call_output" + :content (assoc tool-call + :error (:error result) + :output result + :total-time-ms total-time-ms + :details details + :summary summary + :origin origin + :server server-name)}) + ;; assert: In :executing or :stopping + (let [state (get-tool-call-state @db* chat-id id) + status (:status state)] + (case status + :executing (transition-tool-call! db* chat-ctx id :execution-end + {:origin origin + :name name + :server server-name + :arguments arguments + :error (:error result) + :outputs (:contents result) + :total-time-ms total-time-ms + :progress-text "Generating" + :details details + :summary summary}) + :stopping (transition-tool-call! db* chat-ctx id :stop-attempted + {:origin origin + :name name + :server server-name + :arguments arguments + :error (:error result) + :outputs (:contents result) + :total-time-ms total-time-ms + :reason :user-stop + :details details + :summary summary}) + (logger/warn logger-tag "Unexpected value of :status in tool call" {:status status}))))))] + (transition-tool-call! db* chat-ctx id :execution-start + {:delayed-future delayed-future + :origin origin + :name name + :server server-name + :arguments arguments + :start-time (System/currentTimeMillis) + :details details + :summary summary + :progress-text "Calling tool"}))) + ;; assert: In :rejected state + (let [tool-call-state (get-tool-call-state @db* chat-id id) + {:keys [code text]} (:decision-reason tool-call-state)] + (add-to-history! {:role "tool_call" :content tool-call}) + (add-to-history! {:role "tool_call_output" + :content (assoc tool-call :output {:error true + :contents [{:text text + :type :text}]})}) + (reset! any-rejected-tool-call?* code) + (transition-tool-call! db* chat-ctx id :send-reject + {:origin origin + :name name + :server server-name + :arguments arguments + :reason code + :details details + :summary summary}))))) + tool-calls) + (assert-chat-not-stopped! chat-ctx) + ;; assert: In :cleanup + ;; assert: Only those tool calls that have reached :executing have futures. + ;; Before we handle interrupts, we will wait for all tool calls with futures to complete naturally. + ;; Since a deref of a cancelled future *immediately* results in a CancellationException without waiting for the future to cleanup, + ;; we have to use another promise and deref that to know when the tool call is finished cleaning up. + (doseq [[tool-call-id state] (get-active-tool-calls @db* chat-id)] + (when-let [f (:future state)] + (try + (deref f) ; TODO: A timeout would be useful for tools that get into an infinite loop. + (catch java.util.concurrent.CancellationException _ + ;; The future was cancelled + ;; TODO: Why not just wait for the promise and not bother about the future? + ;; If future was cancelled, wait for the future's cleanup to finish. + (when-let [p (:future-cleanup-complete?* state)] + (logger/debug logger-tag "Caught CancellationException. Waiting for future to finish cleanup." + {:tool-call-id tool-call-id + :promise p}) + (deref p) ; TODO: May need a timeout here too, in case the tool does not clean up. + )) + (catch Throwable t + (logger/debug logger-tag "Ignoring a Throwable while deref'ing a tool call future" {:tool-call-id tool-call-id - :promise p}) - (deref p) ; TODO: May need a timeout here too, in case the tool does not clean up. - )) - (catch Throwable t - (logger/debug logger-tag "Ignoring a Throwable while deref'ing a tool call future" - {:tool-call-id tool-call-id - :ex-data (ex-data t) - :message (.getMessage t) - :cause (.getCause t)})) - (finally (try - (transition-tool-call! db* chat-ctx tool-call-id :cleanup-finished - {:name name}) - (catch Throwable t - (logger/debug logger-tag "Ignoring an exception while finishing tool call" - {:tool-call-id tool-call-id - :ex-data (ex-data t) - :message (.getMessage t) - :cause (.getCause t)}))))))) - (if-let [reason-code @any-rejected-tool-call?*] - (do - (if (= :hook-rejected reason-code) - (do - (send-content! chat-ctx :system - {:type :text - :text "Tool rejected by hook"}) - (add-to-history! {:role "user" :content [{:type :text - :text "A user hook rejected one or more tool calls with the following reason"}]})) - (do - (send-content! chat-ctx :system - {:type :text - :text "Tell ECA what to do differently for the rejected tool(s)"}) - (add-to-history! {:role "user" :content [{:type :text - :text "I rejected one or more tool calls with the following reason"}]}))) - (finish-chat-prompt! :idle chat-ctx) - nil) - {:new-messages (get-in @db* [:chats chat-id :messages])}))) - :on-reason (fn [{:keys [status id text external-id]}] - (assert-chat-not-stopped! chat-ctx) - (case status - :started (do - (swap! reasonings* assoc-in [id :start-time] (System/currentTimeMillis)) - (send-content! chat-ctx :assistant - {:type :reasonStarted - :id id})) - :thinking (do - (swap! reasonings* update-in [id :text] str text) - (send-content! chat-ctx :assistant - {:type :reasonText - :id id - :text text})) - :finished (let [total-time-ms (- (System/currentTimeMillis) (get-in @reasonings* [id :start-time]))] - (add-to-history! {:role "reason" :content {:id id - :external-id external-id - :total-time-ms total-time-ms - :text (get-in @reasonings* [id :text])}}) - (send-content! chat-ctx :assistant - {:type :reasonFinished - :total-time-ms total-time-ms - :id id})) - nil)) - :on-error (fn [{:keys [message exception]}] - (send-content! chat-ctx :system - {:type :text - :text (or message (str "Error: " (ex-message exception)))}) - (finish-chat-prompt! :idle chat-ctx))}))) + :ex-data (ex-data t) + :message (.getMessage t) + :cause (.getCause t)})) + (finally (try + (transition-tool-call! db* chat-ctx tool-call-id :cleanup-finished + {:name name}) + (catch Throwable t + (logger/debug logger-tag "Ignoring an exception while finishing tool call" + {:tool-call-id tool-call-id + :ex-data (ex-data t) + :message (.getMessage t) + :cause (.getCause t)}))))))) + (if-let [reason-code @any-rejected-tool-call?*] + (do + (if (= :hook-rejected reason-code) + (do + (send-content! chat-ctx :system + {:type :text + :text "Tool rejected by hook"}) + (add-to-history! {:role "user" :content [{:type :text + :text "A user hook rejected one or more tool calls with the following reason"}]})) + (do + (send-content! chat-ctx :system + {:type :text + :text "Tell ECA what to do differently for the rejected tool(s)"}) + (add-to-history! {:role "user" :content [{:type :text + :text "I rejected one or more tool calls with the following reason"}]}))) + (finish-chat-prompt! :idle chat-ctx) + nil) + {:new-messages (get-in @db* [:chats chat-id :messages])}))) + :on-reason (fn [{:keys [status id text external-id]}] + (assert-chat-not-stopped! chat-ctx) + (case status + :started (do + (swap! reasonings* assoc-in [id :start-time] (System/currentTimeMillis)) + (send-content! chat-ctx :assistant + {:type :reasonStarted + :id id})) + :thinking (do + (swap! reasonings* update-in [id :text] str text) + (send-content! chat-ctx :assistant + {:type :reasonText + :id id + :text text})) + :finished (let [total-time-ms (- (System/currentTimeMillis) (get-in @reasonings* [id :start-time]))] + (add-to-history! {:role "reason" :content {:id id + :external-id external-id + :total-time-ms total-time-ms + :text (get-in @reasonings* [id :text])}}) + (send-content! chat-ctx :assistant + {:type :reasonFinished + :total-time-ms total-time-ms + :id id})) + nil)) + :on-error (fn [{:keys [message exception]}] + (send-content! chat-ctx :system + {:type :text + :text (or message (str "Error: " (ex-message exception)))}) + (finish-chat-prompt! :idle chat-ctx))})))) (defn ^:private send-mcp-prompt! [{:keys [prompt args]} diff --git a/src/eca/llm_api.clj b/src/eca/llm_api.clj index f4d40170..79eff422 100644 --- a/src/eca/llm_api.clj +++ b/src/eca/llm_api.clj @@ -93,140 +93,138 @@ :on-tools-called on-tools-called :on-reason on-reason :on-usage-updated on-usage-updated})] - ;; We spawn a new future to not block the lsp4clj thread - ;; in case a tool call approval is needed - (future - (try - (when-not api-url (throw (ex-info (format "API url not found.\nMake sure you have provider '%s' configured properly." provider) {}))) - (cond - (= "openai" provider) - (llm-providers.openai/create-response! - {:model real-model - :instructions instructions - :user-messages user-messages - :max-output-tokens max-output-tokens - :reason? reason? - :supports-image? supports-image? - :past-messages past-messages - :tools tools - :web-search web-search - :extra-payload (merge {:parallel_tool_calls true} - extra-payload) - :api-url api-url - :api-key api-key - :auth-type auth-type} - callbacks) + (try + (when-not api-url (throw (ex-info (format "API url not found.\nMake sure you have provider '%s' configured properly." provider) {}))) + (cond + (= "openai" provider) + (llm-providers.openai/create-response! + {:model real-model + :instructions instructions + :user-messages user-messages + :max-output-tokens max-output-tokens + :reason? reason? + :supports-image? supports-image? + :past-messages past-messages + :tools tools + :web-search web-search + :extra-payload (merge {:parallel_tool_calls true} + extra-payload) + :api-url api-url + :api-key api-key + :auth-type auth-type} + callbacks) - (= "anthropic" provider) - (llm-providers.anthropic/chat! - {:model real-model - :instructions instructions - :user-messages user-messages - :max-output-tokens max-output-tokens - :reason? reason? - :supports-image? supports-image? - :past-messages past-messages - :tools tools - :web-search web-search - :extra-payload extra-payload - :api-url api-url - :api-key api-key - :auth-type auth-type} - callbacks) + (= "anthropic" provider) + (llm-providers.anthropic/chat! + {:model real-model + :instructions instructions + :user-messages user-messages + :max-output-tokens max-output-tokens + :reason? reason? + :supports-image? supports-image? + :past-messages past-messages + :tools tools + :web-search web-search + :extra-payload extra-payload + :api-url api-url + :api-key api-key + :auth-type auth-type} + callbacks) - (= "github-copilot" provider) - (llm-providers.openai-chat/chat-completion! - {:model real-model - :instructions instructions - :user-messages user-messages - :max-output-tokens max-output-tokens - :reason? reason? - :supports-image? supports-image? - :past-messages past-messages - :tools tools - :extra-payload (merge {:parallel_tool_calls true} - extra-payload) - :api-url api-url - :api-key api-key - :extra-headers {"openai-intent" "conversation-panel" - "x-request-id" (str (random-uuid)) - "vscode-sessionid" "" - "vscode-machineid" "" - "Copilot-Vision-Request" "true" - "copilot-integration-id" "vscode-chat"}} - callbacks) + (= "github-copilot" provider) + (llm-providers.openai-chat/chat-completion! + {:model real-model + :instructions instructions + :user-messages user-messages + :max-output-tokens max-output-tokens + :reason? reason? + :supports-image? supports-image? + :past-messages past-messages + :tools tools + :extra-payload (merge {:parallel_tool_calls true} + extra-payload) + :api-url api-url + :api-key api-key + :extra-headers {"openai-intent" "conversation-panel" + "x-request-id" (str (random-uuid)) + "vscode-sessionid" "" + "vscode-machineid" "" + "Copilot-Vision-Request" "true" + "copilot-integration-id" "vscode-chat"}} + callbacks) + + (= "google" provider) + (llm-providers.openai-chat/chat-completion! + {:model real-model + :instructions instructions + :user-messages user-messages + :max-output-tokens max-output-tokens + :reason? reason? + :supports-image? supports-image? + :past-messages past-messages + :tools tools + :thinking-tag "thought" + :extra-payload (merge {:parallel_tool_calls false} + (when reason? + {:extra_body {:google {:thinking_config {:include_thoughts true}}}}) + extra-payload) + :api-url api-url + :api-key api-key} + callbacks) + + (= "ollama" provider) + (llm-providers.ollama/chat! + {:api-url api-url + :reason? (:reason? model-capabilities) + :supports-image? supports-image? + :model real-model + :instructions instructions + :user-messages user-messages + :past-messages past-messages + :tools tools + :extra-payload extra-payload} + callbacks) - (= "google" provider) - (llm-providers.openai-chat/chat-completion! + model-config + (let [provider-fn (case (:api provider-config) + ("openai-responses" + "openai") llm-providers.openai/create-response! + "anthropic" llm-providers.anthropic/chat! + "openai-chat" llm-providers.openai-chat/chat-completion! + (on-error {:message (format "Unknown model %s for provider %s" (:api provider-config) provider)})) + url-relative-path (:completionUrlRelativePath provider-config)] + (provider-fn {:model real-model :instructions instructions :user-messages user-messages :max-output-tokens max-output-tokens + :web-search web-search :reason? reason? :supports-image? supports-image? :past-messages past-messages :tools tools - :thinking-tag "thought" - :extra-payload (merge {:parallel_tool_calls false} - (when reason? - {:extra_body {:google {:thinking_config {:include_thoughts true}}}}) - extra-payload) + :extra-payload extra-payload + :url-relative-path url-relative-path :api-url api-url :api-key api-key} - callbacks) + callbacks)) - (= "ollama" provider) - (llm-providers.ollama/chat! - {:api-url api-url - :reason? (:reason? model-capabilities) - :supports-image? supports-image? - :model real-model - :instructions instructions - :user-messages user-messages - :past-messages past-messages - :tools tools - :extra-payload extra-payload} - callbacks) - - model-config - (let [provider-fn (case (:api provider-config) - ("openai-responses" - "openai") llm-providers.openai/create-response! - "anthropic" llm-providers.anthropic/chat! - "openai-chat" llm-providers.openai-chat/chat-completion! - (on-error {:message (format "Unknown model %s for provider %s" (:api provider-config) provider)})) - url-relative-path (:completionUrlRelativePath provider-config)] - (provider-fn - {:model real-model - :instructions instructions - :user-messages user-messages - :max-output-tokens max-output-tokens - :web-search web-search - :reason? reason? - :supports-image? supports-image? - :past-messages past-messages - :tools tools - :extra-payload extra-payload - :url-relative-path url-relative-path - :api-url api-url - :api-key api-key} - callbacks)) + :else + (on-error {:message (format "ECA Unsupported model %s for provider %s" real-model provider)})) + (catch Exception e + (on-error {:exception e}))))) - :else - (on-error {:message (format "ECA Unsupported model %s for provider %s" real-model provider)})) - (catch Exception e - (on-error {:exception e})))))) - -(defn async-prompt! [{:keys [provider model model-capabilities instructions user-messages config on-first-response-received - on-message-received on-error on-prepare-tool-call on-tools-called on-reason on-usage-updated - past-messages tools provider-auth] - :or {on-first-response-received identity - on-message-received identity - on-error identity - on-prepare-tool-call identity - on-tools-called identity - on-reason identity - on-usage-updated identity}}] +(defn sync-or-async-prompt! + [{:keys [provider model model-capabilities instructions user-messages config on-first-response-received + on-message-received on-error on-prepare-tool-call on-tools-called on-reason on-usage-updated + past-messages tools provider-auth] + :or {on-first-response-received identity + on-message-received identity + on-error identity + on-prepare-tool-call identity + on-tools-called identity + on-reason identity + on-usage-updated identity}}] (let [first-response-received* (atom false) emit-first-message-fn (fn [& args] (when-not @first-response-received* @@ -244,38 +242,73 @@ on-error-wrapper (fn [{:keys [exception] :as args}] (when-not (:silent? (ex-data exception)) (logger/error args) - (on-error args)))] - (prompt! - {:sync? false - :provider provider - :model model - :model-capabilities model-capabilities - :instructions instructions - :tools tools - :provider-auth provider-auth - :past-messages past-messages - :user-messages user-messages - :on-message-received on-message-received-wrapper - :on-prepare-tool-call on-prepare-tool-call-wrapper - :on-tools-called on-tools-called - :on-usage-updated on-usage-updated - :on-reason on-reason-wrapper - :on-error on-error-wrapper - :config config}))) + (on-error args))) + provider-config (get-in config [:providers provider]) + model-config (get-in provider-config [:models model]) + extra-payload (:extraPayload model-config) + stream? (if (not (nil? (:stream extra-payload))) + (:stream extra-payload) + true)] + (if (not stream?) + (loop [result (prompt! + {:sync? true + :provider provider + :model model + :model-capabilities model-capabilities + :instructions instructions + :tools tools + :provider-auth provider-auth + :past-messages past-messages + :user-messages user-messages + :on-error on-error-wrapper + :config config})] + (let [{:keys [error output-text reason-text tools-to-call call-tools-fn reason-id usage]} result] + (if error + (on-error-wrapper error) + (do + (when reason-text + (on-reason-wrapper {:status :started :id reason-id}) + (on-reason-wrapper {:status :thinking :id reason-id :text reason-text}) + (on-reason-wrapper {:status :finished :id reason-id})) + (on-message-received-wrapper {:type :text :text output-text}) + (some-> usage (on-usage-updated)) + (if-let [new-result (when (seq tools-to-call) + (doseq [tool-to-call tools-to-call] + (on-prepare-tool-call tool-to-call)) + (call-tools-fn on-tools-called))] + (recur new-result) + (on-message-received-wrapper {:type :finish :finish-reason "stop"})))))) + (prompt! + {:sync? false + :provider provider + :model model + :model-capabilities model-capabilities + :instructions instructions + :tools tools + :provider-auth provider-auth + :past-messages past-messages + :user-messages user-messages + :on-message-received on-message-received-wrapper + :on-prepare-tool-call on-prepare-tool-call-wrapper + :on-tools-called on-tools-called + :on-usage-updated on-usage-updated + :on-reason on-reason-wrapper + :on-error on-error-wrapper + :config config})))) (defn sync-prompt! [{:keys [provider model model-capabilities instructions prompt past-messages user-messages config tools provider-auth]}] - @(prompt! - {:sync? true - :provider provider - :model model - :model-capabilities model-capabilities - :instructions instructions - :tools tools - :provider-auth provider-auth - :past-messages past-messages - :user-messages (or user-messages - [{:role "user" :content [{:type :text :text prompt}]}]) - :config config - :on-error (fn [error] {:error error})})) + (prompt! + {:sync? true + :provider provider + :model model + :model-capabilities model-capabilities + :instructions instructions + :tools tools + :provider-auth provider-auth + :past-messages past-messages + :user-messages (or user-messages + [{:role "user" :content [{:type :text :text prompt}]}]) + :config config + :on-error (fn [error] {:error error})})) diff --git a/src/eca/llm_providers/anthropic.clj b/src/eca/llm_providers/anthropic.clj index 92ece6b6..fe2c5f65 100644 --- a/src/eca/llm_providers/anthropic.clj +++ b/src/eca/llm_providers/anthropic.clj @@ -103,7 +103,7 @@ (do (llm-util/log-response logger-tag rid "response" body) (reset! response* - {:result (:text (last (:content body)))})))) + {:output-text (:text (last (:content body)))})))) (catch Exception e (on-error {:exception e})))) (fn [e] diff --git a/src/eca/llm_providers/ollama.clj b/src/eca/llm_providers/ollama.clj index 4f25bd5d..3aeb6861 100644 --- a/src/eca/llm_providers/ollama.clj +++ b/src/eca/llm_providers/ollama.clj @@ -81,7 +81,7 @@ (do (llm-util/log-response logger-tag rid "response" body) (reset! response* - {:result (:content (:message body))})))) + {:output-text (:content (:message body))})))) (catch Exception e (on-error {:exception e})))) (fn [e] diff --git a/src/eca/llm_providers/openai.clj b/src/eca/llm_providers/openai.clj index dabf80ce..7a137daa 100644 --- a/src/eca/llm_providers/openai.clj +++ b/src/eca/llm_providers/openai.clj @@ -65,10 +65,10 @@ (do (llm-util/log-response logger-tag rid "response" body) (reset! response* - {:result (reduce - #(str %1 (:text %2)) - "" - (:content (last (:output body))))})))) + {:output-text (reduce + #(str %1 (:text %2)) + "" + (:content (last (:output body))))})))) (catch Exception e (on-error {:exception e})))) (fn [e] diff --git a/src/eca/llm_providers/openai_chat.clj b/src/eca/llm_providers/openai_chat.clj index 540d7dcb..11a57ea3 100644 --- a/src/eca/llm_providers/openai_chat.clj +++ b/src/eca/llm_providers/openai_chat.clj @@ -14,6 +14,14 @@ (def ^:private chat-completions-path "/chat/completions") +(defn ^:private parse-usage [usage] + (let [input-cache-read-tokens (-> usage :prompt_tokens_details :cached_tokens)] + {:input-tokens (if input-cache-read-tokens + (- (:prompt_tokens usage) input-cache-read-tokens) + (:prompt_tokens usage)) + :output-tokens (:completion_tokens usage) + :input-cache-read-tokens input-cache-read-tokens})) + (defn ^:private extract-content "Extract text content from various message content formats. Handles: strings (legacy eca), nested arrays from chat.clj, and fallback." @@ -57,15 +65,35 @@ :function (select-keys tool [:name :description :parameters])}) tools)) -(defn ^:private base-chat-request! [{:keys [rid extra-headers body url-relative-path api-url api-key on-error on-stream]}] +(defn ^:private response-body->result [body on-tools-called-wrapper] + (let [tools-to-call (->> (:choices body) + (mapcat (comp :tool_calls :message)) + (map (fn [tool-call] + {:id (:id tool-call) + :name (:name (:function tool-call)) + :arguments (json/parse-string (:arguments (:function tool-call)))})))] + {:usage (parse-usage (:usage body)) + :reason-id (str (random-uuid)) + :tools-to-call tools-to-call + :call-tools-fn (fn [on-tools-called] + (on-tools-called-wrapper tools-to-call on-tools-called nil)) + :reason-text (->> (:choices body) + (map (comp :reasoning :message)) + (string/join "\n") + not-empty) + :output-text (->> (:choices body) + (map (comp :content :message)) + (string/join "\n") + not-empty)})) + +(defn ^:private base-chat-request! + [{:keys [rid extra-headers body url-relative-path api-url api-key on-error on-stream on-tools-called-wrapper]}] (let [url (str api-url (or url-relative-path chat-completions-path)) - response* (atom nil) on-error (if on-stream on-error (fn [error-data] - (llm-util/log-response logger-tag rid "response-error" body) - (reset! response* error-data)))] - + (llm-util/log-response logger-tag rid "response-error" error-data) + {:error error-data}))] (llm-util/log-request logger-tag rid url body) @(http/post url @@ -90,17 +118,15 @@ (on-stream "stream-end" {})) (do (llm-util/log-response logger-tag rid "full-response" body) - (reset! response* - {:result (:content (:message (last (:choices body))))})))) + (response-body->result body on-tools-called-wrapper)))) (catch Exception e (on-error {:exception e})))) (fn [e] - (on-error {:exception e}))) - @response*)) + (on-error {:exception e}))))) (defn ^:private transform-message "Transform a single ECA message to OpenAI format. Returns nil for unsupported roles." - [{:keys [role content] :as _msg} supports-image? thinking-start-block thinking-end-block] + [{:keys [role content] :as _msg} supports-image? thinking-start-tag thinking-end-tag] (case role "tool_call" {:type :tool-call ; Special marker for accumulation :data {:id (:id content) @@ -114,7 +140,7 @@ :content (extract-content content supports-image?)} "reason" {:role "assistant" :content [{:type "text" - :text (str thinking-start-block (:text content) thinking-end-block)}]} + :text (str thinking-start-tag (:text content) thinking-end-tag)}]} "assistant" {:role "assistant" :content (extract-content content supports-image?)} "system" {:role "system" @@ -167,18 +193,16 @@ 'assistant' role message, not as separate messages. This function ensures compliance with that requirement by accumulating tool calls and flushing them into assistant messages when a non-tool_call message is encountered." - [messages supports-image? thinking-start-block thinking-end-block] + [messages supports-image? thinking-start-tag thinking-end-tag] (->> messages - (map #(transform-message % supports-image? thinking-start-block thinking-end-block)) + (map #(transform-message % supports-image? thinking-start-tag thinking-end-tag)) (remove nil?) accumulate-tool-calls (filter valid-message?))) (defn ^:private execute-accumulated-tools! - [{:keys [tool-calls-atom instructions extra-headers body api-url api-key url-relative-path - on-tools-called on-error handle-response supports-image? - thinking-start-block thinking-end-block]}] - (let [all-accumulated (vals @tool-calls-atom) + [tool-calls* on-tools-called-wrapper on-tools-called handle-response] + (let [all-accumulated (vals @tool-calls*) completed-tools (->> all-accumulated (filter #(every? % [:id :name :arguments-text])) (map (fn [{:keys [arguments-text name] :as tool-call}] @@ -193,21 +217,7 @@ valid-tools (remove :parse-error completed-tools)] (if (seq completed-tools) ;; We have some completed tools (valid or with errors), so continue the conversation - (when-let [{:keys [new-messages]} (on-tools-called valid-tools)] - (let [new-messages-list (vec (concat - (when instructions [{:role "system" :content instructions}]) - (normalize-messages new-messages supports-image? thinking-start-block thinking-end-block)))] - (reset! tool-calls-atom {}) - (let [new-rid (llm-util/gen-rid)] - (base-chat-request! - {:rid new-rid - :body (assoc body :messages new-messages-list) - :extra-headers extra-headers - :api-url api-url - :api-key api-key - :url-relative-path url-relative-path - :on-error on-error - :on-stream (fn [event data] (handle-response event data tool-calls-atom new-rid))})))) + (on-tools-called-wrapper valid-tools on-tools-called handle-response) ;; No completed tools at all - let the streaming response provide the actual finish_reason nil))) @@ -297,9 +307,9 @@ (assoc-some {:model model :messages messages - :temperature temperature :stream stream? :max_completion_tokens 32000} + :temperature temperature :parallel_tool_calls (:parallel_tool_calls extra-payload) :tools (when (seq tools) (->tools tools))) extra-payload) @@ -318,7 +328,24 @@ reasoning-type* (atom nil) ;; Incremental parser buffer for content to detect thinking tags across chunks content-buffer* (atom "") - handle-response (fn handle-response [event data tool-calls-atom rid] + on-tools-called-wrapper (fn on-tools-called-wrapper [tools-to-call on-tools-called handle-response] + (when-let [{:keys [new-messages]} (on-tools-called tools-to-call)] + (let [new-messages-list (vec (concat + (when instructions [{:role "system" :content instructions}]) + (normalize-messages new-messages supports-image? thinking-start-tag thinking-end-tag))) + new-rid (llm-util/gen-rid)] + (reset! tool-calls* {}) + (base-chat-request! + {:rid new-rid + :body (assoc body :messages new-messages-list) + :on-tools-called-wrapper on-tools-called-wrapper + :extra-headers extra-headers + :api-url api-url + :api-key api-key + :url-relative-path url-relative-path + :on-error on-error + :on-stream (when stream? (fn [event data] (handle-response event data tool-calls* (llm-util/gen-rid))))})))) + handle-response (fn handle-response [event data tool-calls* rid] (if (= event "stream-end") (do ;; Flush any leftover buffered content and finish reasoning if needed @@ -332,20 +359,7 @@ (on-reason {:status :finished :id @current-reason-id*}) (reset! reasoning-started* false) (reset! reasoning-type* nil)) - (execute-accumulated-tools! - {:tool-calls-atom tool-calls-atom - :instructions instructions - :supports-image? supports-image? - :extra-headers extra-headers - :body body - :api-url api-url - :api-key api-key - :url-relative-path url-relative-path - :on-tools-called on-tools-called - :on-error on-error - :thinking-start-block thinking-start-tag - :thinking-end-block thinking-end-tag - :handle-response handle-response})) + (execute-accumulated-tools! tool-calls* on-tools-called-wrapper on-tools-called handle-response)) (when (seq (:choices data)) (doseq [choice (:choices data)] (let [delta (:delta choice) @@ -398,13 +412,13 @@ unique-id (when id (str rid "-" id))] (when (and name unique-id) (on-prepare-tool-call {:id unique-id :name name :arguments-text ""})) - (swap! tool-calls-atom update tool-key + (swap! tool-calls* update tool-key (fn [existing] (cond-> (or existing {:index index}) unique-id (assoc :id unique-id) name (assoc :name name) args (update :arguments-text (fnil str "") args)))) - (when-let [updated-tool-call (get @tool-calls-atom tool-key)] + (when-let [updated-tool-call (get @tool-calls* tool-key)] (when (and (:id updated-tool-call) (:name updated-tool-call) args) @@ -427,12 +441,7 @@ (when (not= finish-reason "tool_calls") (on-message-received {:type :finish :finish-reason finish-reason}))))))) (when-let [usage (:usage data)] - (on-usage-updated (let [input-cache-read-tokens (-> usage :prompt_tokens_details :cached_tokens)] - {:input-tokens (if input-cache-read-tokens - (- (:prompt_tokens usage) input-cache-read-tokens) - (:prompt_tokens usage)) - :output-tokens (:completion_tokens usage) - :input-cache-read-tokens input-cache-read-tokens})))) + (on-usage-updated (parse-usage usage)))) rid (llm-util/gen-rid)] (base-chat-request! {:rid rid @@ -442,5 +451,7 @@ :api-key api-key :url-relative-path url-relative-path :tool-calls* tool-calls* + :on-tools-called-wrapper on-tools-called-wrapper :on-error on-error - :on-stream (when stream? (fn [event data] (handle-response event data tool-calls* rid)))}))) + :on-stream (when stream? + (fn [event data] (handle-response event data tool-calls* rid)))}))) diff --git a/src/eca/shared.clj b/src/eca/shared.clj index 2ff688cb..15aa8f8e 100644 --- a/src/eca/shared.clj +++ b/src/eca/shared.clj @@ -184,8 +184,18 @@ (DateTimeFormatter/ofPattern pattern)))) (defmacro future* - "Wrapper for future unless in tests" + "Wrapper for future unless in tests. In non-test envs we spawn a Thread and + return a promise (derefable) to avoid relying on clojure.core/future which + can behave differently in some REPL tooling environments." [config & body] `(if (= "test" (:env ~config)) ~@body - (future ~@body))) + (let [p# (promise) + t# (Thread. (fn [] + (try + (deliver p# (do ~@body)) + (catch Throwable e# + ;; deliver the Throwable so deref can inspect it if needed + (deliver p# e#)))))] + (.start t#) + p#))) diff --git a/test/eca/features/chat_test.clj b/test/eca/features/chat_test.clj index 81af3aeb..01bae0d1 100644 --- a/test/eca/features/chat_test.clj +++ b/test/eca/features/chat_test.clj @@ -15,9 +15,11 @@ (defn ^:private prompt! [params mocks] (let [{:keys [chat-id] :as resp} - (with-redefs [llm-api/async-prompt! (:api-mock mocks) + (with-redefs [llm-api/sync-or-async-prompt! (:api-mock mocks) + llm-api/sync-prompt! (constantly nil) f.tools/call-tool! (:call-tool-mock mocks) f.tools/approval (constantly :allow)] + (h/config! {:env "test"}) (f.chat/prompt params (h/db*) (h/messenger) (h/config) (h/metrics)))] (is (match? {:chat-id string? :status :prompting} resp)) {:chat-id chat-id})) diff --git a/test/eca/shared_test.clj b/test/eca/shared_test.clj index 73ed8bb0..ef46039b 100644 --- a/test/eca/shared_test.clj +++ b/test/eca/shared_test.clj @@ -91,3 +91,13 @@ (is (= {:a {:b 1}} (shared/deep-merge {:a 2} {:a {:b 1}}))))) + +(deftest future*-test + (testing "on test env we run on same thread" + (is (= 1 + (shared/future* {:env "test"} + 1)))) + (testing "on prod env we run a normal future" + (is (= 1 + @(shared/future* {:env "prod"} + 1)))))