Skip to content

Commit 57ec0c1

Browse files
committed
Fix llm sse requests and add debugging
1 parent d9417a8 commit 57ec0c1

File tree

4 files changed

+68
-38
lines changed

4 files changed

+68
-38
lines changed

src/simulflow/processors/google.clj

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,7 @@
6767
:params (schema/->describe-parameters GoogleLLMConfigSchema)
6868
:workload :io})
6969

70-
(defn init!
71-
[params]
72-
(uai/init-llm-processor! GoogleLLMConfigSchema params :gemini))
70+
(def init! (partial uai/init-llm-processor! GoogleLLMConfigSchema :gemini))
7371

7472
(defn transition
7573
[state transition]

src/simulflow/processors/openai.clj

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,9 +141,7 @@
141141
:params (schema/->describe-parameters OpenAILLMConfigSchema)
142142
:workload :io})
143143

144-
(defn init!
145-
[params]
146-
(uai/init-llm-processor! OpenAILLMConfigSchema params :openai))
144+
(def init! (partial uai/init-llm-processor! OpenAILLMConfigSchema :openai))
147145

148146
(defn transition
149147
[state transition]

src/simulflow/utils/openai.clj

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -71,19 +71,27 @@
7171
;; Common processor functions
7272
(defn vthread-pipe-response-with-interrupt
7373
"Common function to handle streaming response from LLM"
74-
[{:keys [in-ch out-ch interrupt-chan on-end]}]
74+
[{:keys [in-ch out-ch interrupt-ch on-end] :as data}]
75+
(t/log! {:msg "Piping out result" :data data :level :trace :id :llm-processor})
7576
(vthread-loop []
76-
(if-let [[chunk c] (a/alts!! [in-ch interrupt-chan])]
77-
(when (= c in-ch)
78-
(a/>!! out-ch chunk)
79-
(recur))
80-
;; No more data or interruption, call on-end and exit
81-
(when (fn? on-end)
82-
(on-end)))))
77+
(let [[val port] (a/alts!! [interrupt-ch in-ch] :priority true)]
78+
(t/log! {:level :trace
79+
:msg "Piping current val"
80+
:data {:val val
81+
:chan (if (= port in-ch) :in-ch :interrupt-ch)}
82+
:id :llm-processor})
83+
84+
(if (and (= port in-ch) val)
85+
(do
86+
(a/>!! out-ch val)
87+
(recur))
88+
;; No more data or interruption, call on-end and exit
89+
(when (fn? on-end)
90+
(on-end))))))
8391

8492
(defn init-llm-processor!
8593
"Common initialization function for OpenAI-compatible LLM processors"
86-
[schema params log-id]
94+
[schema log-id params]
8795
(let [parsed-config (schema/parse-with-defaults schema params)
8896
llm-write (a/chan 100)
8997
llm-read (a/chan 1024)
@@ -92,22 +100,26 @@
92100
(vthread-loop []
93101
(when-let [command (a/<!! llm-write)]
94102
(try
95-
(t/log! {:level :debug :id log-id :data command} "Processing request command")
96-
(assert (#{:command/sse-request :command/interrupt-request} command)
103+
(t/log! {:level :debug :id log-id :data command} "Processing command")
104+
(assert (#{:command/sse-request :command/interrupt-request} (:command/kind command))
97105
"LLM processor only supports SSE request or interrupt request commands")
98106
;; Execute the command and handle the streaming response
99-
(cond
100-
(= command :command/sse-request)
107+
(case (:command/kind command)
108+
109+
:command/sse-request
101110
(do
111+
(t/log! {:level :debug :id log-id :msg "Making SSE request"})
102112
(reset! request-in-progress? true)
103113
(vthread-pipe-response-with-interrupt {:in-ch (command/handle-command command)
104114
:out-ch llm-read
105115
:interrupt-ch interrupt-ch
106116
:on-end #(reset! request-in-progress? false)}))
107117

108-
(= command :command/interrupt-request)
118+
:command/interrupt-request
109119
(when @request-in-progress?
110-
(a/>!! interrupt-ch command)))
120+
(a/>!! interrupt-ch command))
121+
122+
nil)
111123
(catch Exception e
112124
(t/log! {:level :error :id log-id :error e} "Error processing command")))
113125
(recur)))

src/simulflow/utils/request.clj

Lines changed: 39 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
[hato.client :as http]
88
[hato.middleware :as hm]
99
[simulflow.async :refer [vthread vthread-loop]]
10-
[simulflow.utils.core :as u])
10+
[simulflow.utils.core :as u]
11+
[taoensso.telemere :as t])
1112
(:import
1213
(java.io InputStream)))
1314

@@ -23,12 +24,19 @@
2324
(recur))))))
2425

2526
(defn- parse-openai-event [raw-event]
26-
(let [data-idx (string/index-of raw-event "{")
27+
(let [_ (t/log! {:level :trace :msg "SSE parsing raw event" :data {:raw-event-preview (take 100 raw-event)}})
28+
data-idx (string/index-of raw-event "{")
2729
done-idx (string/index-of raw-event "[DONE]")]
30+
(t/log! {:level :trace :msg "SSE parsing indices" :data {:data-idx data-idx :done-idx done-idx}})
2831
(if done-idx
29-
:done
30-
(-> (subs raw-event data-idx)
31-
(u/parse-if-json :throw-on-error? true)))))
32+
(do
33+
(t/log! {:level :trace :msg "SSE parsing DONE event"})
34+
:done)
35+
(let [json-str (subs raw-event data-idx)
36+
_ (t/log! {:level :trace :msg "SSE parsing JSON string" :data {:json-preview (take 100 json-str)}})
37+
parsed (u/parse-if-json json-str :throw-on-error? true)]
38+
(t/log! {:level :trace :msg "SSE parsing result" :data {:parsed-preview (take 100 (str parsed))}})
39+
parsed))))
3240

3341
; Per this discussion: https://community.openai.com/t/clarification-for-max-tokens/19576
3442
; if the max_tokens is not provided, the response will try to use all the available
@@ -48,33 +56,47 @@
4856
[{:keys [request params]}]
4957
(let [close? (:stream/close? params)
5058
parse-event (or (:parse-event params) parse-openai-event)
51-
event-stream ^InputStream (:body (http/request (merge request
52-
params
53-
{:as :stream})))
59+
result (http/request (merge request
60+
params
61+
{:as :stream}))
62+
_ (t/log! {:level :trace :msg "SSE Request result" :data result})
63+
event-stream ^InputStream (:body result)
5464
buffer-size (calc-buffer-size params)
5565
events (a/chan (a/buffer buffer-size) (map parse-event))]
5666
(vthread
5767
(try
5868
(loop [byte-coll []]
5969
(let [byte-arr (byte-array (max 1 (.available event-stream)))
6070
bytes-read (.read event-stream byte-arr)]
71+
(t/log! {:level :trace :msg "SSE read bytes" :data {:bytes-read bytes-read :available (.available event-stream)}})
6172

6273
(if (neg? bytes-read)
6374

6475
;; Input stream closed, exiting read-loop
65-
nil
76+
(do
77+
(t/log! {:level :trace :msg "SSE stream closed"})
78+
nil)
6679

6780
(let [next-byte-coll (concat byte-coll (seq byte-arr))
6881
data (slurp (byte-array next-byte-coll))]
82+
(t/log! {:level :trace :msg "SSE data chunk" :data {:data-length (count data) :data-preview (take 200 data)}})
6983
(if-let [es (not-empty (re-seq event-mask data))]
70-
(if (every? true? (map #(a/>!! events %) es))
71-
(recur (drop (apply + (map #(count (.getBytes ^String %)) es))
72-
next-byte-coll))
73-
74-
;; Output stream closed, exiting read-loop
75-
nil)
76-
77-
(recur next-byte-coll))))))
84+
(do
85+
(t/log! {:level :trace :msg "SSE parsed events" :data {:event-count (count es) :events (map #(take 100 %) es)}})
86+
(if (every? true? (map #(do
87+
(t/log! {:level :trace :msg "SSE putting event in channel" :data {:event (take 100 %)}})
88+
(a/>!! events %)) es))
89+
(recur (drop (apply + (map #(count (.getBytes ^String %)) es))
90+
next-byte-coll))
91+
92+
;; Output stream closed, exiting read-loop
93+
(do
94+
(t/log! {:level :trace :msg "SSE output stream closed during event sending"})
95+
nil)))
96+
97+
(do
98+
(t/log! {:level :trace :msg "SSE no events found in data chunk"})
99+
(recur next-byte-coll)))))))
78100
(finally
79101
(when close?
80102
(a/close! events))

0 commit comments

Comments
 (0)