Skip to content

Commit a97648d

Browse files
committed
Improve LLM streaming response handler.
1 parent ececc90 commit a97648d

File tree

5 files changed

+74
-21
lines changed

5 files changed

+74
-21
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
- Support custom API urls for OpenAI and Anthropic
99
- Add `--log-level <level>` option for better debugging.
1010
- Add support for global config file.
11+
- Improve MCP response handling.
12+
- Improve LLM streaming response handler.
1113

1214
## 0.0.3
1315

src/eca/features/mcp.clj

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -74,14 +74,14 @@
7474
(defn call-tool! [^String name ^Map arguments db]
7575
(let [result (.callTool ^McpSyncClient (get-in db [:mcp-tools name :mcp-client])
7676
(McpSchema$CallToolRequest. name arguments))]
77-
(if (.isError result)
78-
{:error (.content result)}
79-
{:contents (map (fn [content]
80-
(case (.type ^McpSchema$Content content)
81-
"text" {:type :text
82-
:content (.text ^McpSchema$TextContent content)}
83-
nil))
84-
(.content result))})))
77+
(logger/debug logger-tag "ToolCall result: " result)
78+
{:contents (map (fn [content]
79+
(case (.type ^McpSchema$Content content)
80+
"text" {:type :text
81+
:error (.isError result)
82+
:content (.text ^McpSchema$TextContent content)}
83+
nil))
84+
(.content result))}))
8585

8686
(defn shutdown! [db*]
8787
(doseq [[_name {:keys [_client]}] (:mcp-clients @db*)]

src/eca/llm_providers/openai.clj

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
(def ^:private logger-tag "[OPENAI]")
1212

13-
(def ^:private openai-url "https://api.openai.com/")
13+
(def ^:private openai-url "https://api.openai.com")
1414
(def ^:private responses-path "/v1/responses")
1515

1616
(defn ^:private url [path]
@@ -21,13 +21,15 @@
2121

2222
(defn ^:private base-completion-request! [{:keys [body api-key on-error on-response]}]
2323
(let [api-key (or api-key
24-
(System/getenv "OPENAI_API_KEY"))]
25-
(logger/debug logger-tag (format "Sending input: '%s' instructions: '%s' tools: '%s'"
24+
(System/getenv "OPENAI_API_KEY"))
25+
url (url responses-path)]
26+
(logger/debug logger-tag (format "Sending input: '%s' instructions: '%s' tools: '%s' url: '%s'"
2627
(:input body)
2728
(:instructions body)
28-
(:tools body)))
29+
(:tools body)
30+
url))
2931
(http/post
30-
(url responses-path)
32+
url
3133
{:headers {"Authorization" (str "Bearer " api-key)
3234
"Content-Type" "application/json"}
3335
:body (json/generate-string body)

src/eca/llm_util.clj

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,25 @@
77
[java.io BufferedReader]))
88

99
(defn event-data-seq [^BufferedReader rdr]
10-
(when-let [event (.readLine rdr)]
11-
(when (string/starts-with? event "event:")
12-
(when-let [data (.readLine rdr)]
13-
(.readLine rdr) ;; blank line
14-
(when (string/starts-with? data "data:")
15-
(cons [(subs event 7)
16-
(json/parse-string (subs data 6) true)]
17-
(lazy-seq (event-data-seq rdr))))))))
10+
(letfn [(next-group []
11+
(loop [event-line nil]
12+
(let [line (.readLine rdr)]
13+
(cond
14+
(nil? line) nil ; EOF
15+
(string/blank? line) (recur event-line) ; skip blank lines
16+
(string/starts-with? line "event:") (recur line)
17+
(string/starts-with? line "data:")
18+
(let [data-str (subs line 6)]
19+
(if (= data-str "[DONE]")
20+
(recur event-line) ; skip [DONE]
21+
(let [event-type (if event-line
22+
(subs event-line 7)
23+
(-> (json/parse-string data-str true)
24+
:type))]
25+
(cons [event-type (json/parse-string data-str true)]
26+
(lazy-seq (next-group))))))
27+
:else (recur event-line)))))]
28+
(next-group)))
1829

1930
(defn log-response [tag event data]
2031
(logger/debug tag event data))

test/eca/llm_util_test.clj

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
(ns eca.llm-util-test
2+
(:require
3+
[clojure.java.io :as io]
4+
[clojure.test :refer [deftest is testing]]
5+
[eca.llm-util :as llm-util]
6+
[matcher-combinators.test :refer [match?]])
7+
(:import
8+
[java.io ByteArrayInputStream]))
9+
10+
(deftest event-data-seq-test
11+
(testing "when there is a event line and another data line"
12+
(with-open [r (io/reader (ByteArrayInputStream. (.getBytes (str "event: foo.bar\n"
13+
"data: {\"type\": \"foo.bar\"}\n"
14+
"\n"
15+
"event: foo.baz\n"
16+
"data: {\"type\": \"foo.baz\"}"))))]
17+
(is (match?
18+
[["foo.bar" {:type "foo.bar"}]
19+
["foo.baz" {:type "foo.baz"}]]
20+
(llm-util/event-data-seq r)))))
21+
(testing "when there is no event line, only a data line"
22+
(with-open [r (io/reader (ByteArrayInputStream. (.getBytes (str "data: {\"type\": \"foo.bar\"}\n"
23+
"\n"
24+
"data: {\"type\": \"foo.baz\"}"))))]
25+
(is (match?
26+
[["foo.bar" {:type "foo.bar"}]
27+
["foo.baz" {:type "foo.baz"}]]
28+
(llm-util/event-data-seq r)))))
29+
(testing "Ignore [DONE] when exists"
30+
(with-open [r (io/reader (ByteArrayInputStream. (.getBytes (str "data: {\"type\": \"foo.bar\"}\n"
31+
"\n"
32+
"data: {\"type\": \"foo.baz\"}\n"
33+
"\n"
34+
"data: [DONE]\n"))))]
35+
(is (match?
36+
[["foo.bar" {:type "foo.bar"}]
37+
["foo.baz" {:type "foo.baz"}]]
38+
(llm-util/event-data-seq r))))))

0 commit comments

Comments
 (0)