Skip to content

Commit ade7f07

Browse files
committed
Refactor process-text-think-aware
1 parent 1b1aa11 commit ade7f07

File tree

1 file changed

+85
-95
lines changed

1 file changed

+85
-95
lines changed

src/eca/llm_providers/openai_chat.clj

Lines changed: 85 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -85,23 +85,23 @@
8585
"Transform a single ECA message to OpenAI format. Returns nil for unsupported roles."
8686
[{:keys [role content] :as _msg} supports-image? thinking-start-block thinking-end-block]
8787
(case role
88-
"tool_call" {:type :tool-call ; Special marker for accumulation
89-
:data {:id (:id content)
90-
:type "function"
91-
:function {:name (:name content)
92-
:arguments (json/generate-string (:arguments content))}}}
88+
"tool_call" {:type :tool-call ; Special marker for accumulation
89+
:data {:id (:id content)
90+
:type "function"
91+
:function {:name (:name content)
92+
:arguments (json/generate-string (:arguments content))}}}
9393
"tool_call_output" {:role "tool"
9494
:tool_call_id (:id content)
9595
:content (llm-util/stringfy-tool-result content)}
96-
"user" {:role "user"
97-
:content (extract-content content supports-image?)}
98-
"reason" {:role "assistant"
99-
:content [{:type "text"
100-
:text (str thinking-start-block (:text content) thinking-end-block)}]}
101-
"assistant" {:role "assistant"
102-
:content (extract-content content supports-image?)}
103-
"system" {:role "system"
104-
:content (extract-content content supports-image?)}
96+
"user" {:role "user"
97+
:content (extract-content content supports-image?)}
98+
"reason" {:role "assistant"
99+
:content [{:type "text"
100+
:text (str thinking-start-block (:text content) thinking-end-block)}]}
101+
"assistant" {:role "assistant"
102+
:content (extract-content content supports-image?)}
103+
"system" {:role "system"
104+
:content (extract-content content supports-image?)}
105105
nil))
106106

107107
(defn ^:private accumulate-tool-calls
@@ -131,8 +131,8 @@
131131
"Check if a message should be included in the final output."
132132
[{:keys [role content tool_calls] :as msg}]
133133
(and msg
134-
(or (= role "tool") ; Never remove tool messages
135-
(seq tool_calls) ; Keep messages with tool calls
134+
(or (= role "tool") ; Never remove tool messages
135+
(seq tool_calls) ; Keep messages with tool calls
136136
(and (string? content)
137137
(not (string/blank? content)))
138138
(sequential? content))))
@@ -194,77 +194,67 @@
194194
nil)))
195195

196196
(defn ^:private process-text-think-aware
197-
"Incremental thinking/content parser that supports tag splits across chunks
198-
Strategy:
199-
- Maintain a rolling buffer across chunks
200-
- Outside a thinking block: search for thinking-start-tag; if not found, emit all but a small tail
201-
of length (thinking-start-len - 1) to allow tags split across chunks (e.g., \"<th\", \"ink\", \">\", ...).
202-
- Inside a thinking block: search for thinking-end-tag; if not found, emit all but a small tail
203-
of length (thinking-end-len - 1) to allow end tags split across chunks.
204-
- On stream end, flush remaining buffer and close any open reasoning block."
197+
"Incremental parser that splits streamed content into user text and thinking blocks.
198+
- Maintains a rolling buffer across chunks to handle tags that may be split across chunks
199+
- Outside thinking: emit user text up to <think> and keep a small tail to detect split tags
200+
- Inside thinking: emit reasoning up to </think> and keep a small tail to detect split tags
201+
- When a tag boundary is found, open/close the reasoning block accordingly"
205202
[text content-buffer* reasoning-type* current-reason-id*
206203
reasoning-started* thinking-start-tag thinking-end-tag on-message-received on-reason]
207-
(let [thinking-start-len (count thinking-start-tag)
208-
thinking-end-len (count thinking-end-tag)]
204+
(let [start-len (count thinking-start-tag)
205+
end-len (count thinking-end-tag)
206+
;; Keep a small tail to detect tags split across chunk boundaries.
207+
start-tail (max 0 (dec start-len))
208+
end-tail (max 0 (dec end-len))
209+
emit-text! (fn [^String s]
210+
(when (pos? (count s))
211+
(on-message-received {:type :text :text s})))
212+
emit-think! (fn [^String s]
213+
(when (pos? (count s))
214+
(on-reason {:status :thinking :id @current-reason-id* :text s})))
215+
start-think! (fn []
216+
(when-not @reasoning-started*
217+
(let [new-id (str (random-uuid))]
218+
(reset! current-reason-id* new-id)
219+
(reset! reasoning-started* true)
220+
(reset! reasoning-type* :tag)
221+
(on-reason {:status :started :id new-id}))))
222+
finish-think! (fn []
223+
(when @reasoning-started*
224+
(on-reason {:status :finished :id @current-reason-id*})
225+
(reset! reasoning-started* false)
226+
(reset! reasoning-type* nil)))]
209227
(when (seq text)
210228
(swap! content-buffer* str text)
211229
(loop []
212-
(let [buf @content-buffer*
213-
inside-tag? (= @reasoning-type* :tag)
214-
;; Keep a small tail to detect tags split across chunk boundaries.
215-
start-tail (max 0 (dec thinking-start-len))
216-
end-tail (max 0 (dec thinking-end-len))]
217-
(if inside-tag?
218-
;; We are inside a thinking block; look for end tag
219-
(let [idx (.indexOf ^String buf ^String thinking-end-tag)]
230+
(let [^String buf @content-buffer*]
231+
(if (= @reasoning-type* :tag)
232+
;; Inside a thinking block; look for end tag
233+
(let [idx (.indexOf buf ^String thinking-end-tag)]
220234
(if (>= idx 0)
221-
(let [before (.substring ^String buf 0 idx)
222-
after (.substring ^String buf (+ idx thinking-end-len))]
223-
(when (pos? (count before))
224-
(on-reason {:status :thinking
225-
:id @current-reason-id*
226-
:text before}))
227-
;; Close the thinking block
235+
(let [before (.substring buf 0 idx)
236+
after (.substring buf (+ idx end-len))]
237+
(emit-think! before)
228238
(reset! content-buffer* after)
229-
(when @reasoning-started*
230-
(on-reason {:status :finished :id @current-reason-id*})
231-
(reset! reasoning-started* false)
232-
(reset! reasoning-type* nil))
239+
(finish-think!)
233240
(recur))
234-
;; No end tag yet: emit most, keep small tail to match possible split tag
235241
(let [emit-len (max 0 (- (count buf) end-tail))]
236242
(when (pos? emit-len)
237-
(let [to-emit (.substring ^String buf 0 emit-len)
238-
tail (.substring ^String buf emit-len)]
239-
(when (pos? (count to-emit))
240-
(on-reason {:status :thinking
241-
:id @current-reason-id*
242-
:text to-emit}))
243-
(reset! content-buffer* tail))))))
244-
;; We are outside a thinking block; look for start tag
245-
(let [idx (.indexOf ^String buf ^String thinking-start-tag)]
243+
(emit-think! (.substring buf 0 emit-len))
244+
(reset! content-buffer* (.substring buf emit-len))))))
245+
;; Outside a thinking block; look for start tag
246+
(let [idx (.indexOf buf ^String thinking-start-tag)]
246247
(if (>= idx 0)
247-
(let [before (.substring ^String buf 0 idx)
248-
after (.substring ^String buf (+ idx thinking-start-len))]
249-
(when (pos? (count before))
250-
(on-message-received {:type :text :text before}))
251-
;; Open a new thinking block
252-
(when-not @reasoning-started*
253-
(let [new-id (str (random-uuid))]
254-
(reset! current-reason-id* new-id)
255-
(reset! reasoning-started* true)
256-
(reset! reasoning-type* :tag)
257-
(on-reason {:status :started :id new-id})))
248+
(let [before (.substring buf 0 idx)
249+
after (.substring buf (+ idx start-len))]
250+
(emit-text! before)
251+
(start-think!)
258252
(reset! content-buffer* after)
259253
(recur))
260-
;; No start tag yet: emit most, keep small tail for possible split tag
261254
(let [emit-len (max 0 (- (count buf) start-tail))]
262255
(when (pos? emit-len)
263-
(let [to-emit (.substring ^String buf 0 emit-len)
264-
tail (.substring ^String buf emit-len)]
265-
(when (pos? (count to-emit))
266-
(on-message-received {:type :text :text to-emit}))
267-
(reset! content-buffer* tail))))))))))))
256+
(emit-text! (.substring buf 0 emit-len))
257+
(reset! content-buffer* (.substring buf emit-len))))))))))))
268258

269259
(defn completion!
270260
"Primary entry point for OpenAI chat completions with streaming support.
@@ -287,10 +277,10 @@
287277
(normalize-messages user-messages supports-image? thinking-start-tag thinking-end-tag)))
288278

289279
body (merge (assoc-some
290-
{:model model
291-
:messages messages
292-
:temperature temperature
293-
:stream true
280+
{:model model
281+
:messages messages
282+
:temperature temperature
283+
:stream true
294284
:parallel_tool_calls parallel-tool-calls?}
295285
:max_tokens max-output-tokens
296286
:tools (when (seq tools) (->tools tools)))
@@ -326,20 +316,20 @@
326316
(reset! reasoning-type* nil))
327317
(execute-accumulated-tools!
328318
{:tool-calls-atom tool-calls-atom
329-
:instructions instructions
319+
:instructions instructions
330320
:supports-image? supports-image?
331-
:extra-headers extra-headers
332-
:body body
333-
:api-url api-url
334-
:api-key api-key
321+
:extra-headers extra-headers
322+
:body body
323+
:api-url api-url
324+
:api-key api-key
335325
:on-tools-called on-tools-called
336-
:on-error on-error
326+
:on-error on-error
337327
:thinking-start-block thinking-start-tag
338328
:thinking-end-block thinking-end-tag
339329
:handle-response handle-response}))
340330
(when (seq (:choices data))
341331
(doseq [choice (:choices data)]
342-
(let [delta (:delta choice)
332+
(let [delta (:delta choice)
343333
finish-reason (:finish_reason choice)]
344334
;; Process content if present (with thinking blocks support)
345335
(when-let [ct (:content delta)]
@@ -357,8 +347,8 @@
357347
(reset! reasoning-type* :delta)
358348
(on-reason {:status :started :id new-reason-id})))
359349
(on-reason {:status :thinking
360-
:id @current-reason-id*
361-
:text reasoning-text}))
350+
:id @current-reason-id*
351+
:text reasoning-text}))
362352

363353
;; Check if reasoning just stopped (delta-based)
364354
(when (and (= @reasoning-type* :delta)
@@ -374,20 +364,20 @@
374364
;; Process tool calls if present
375365
(when (:tool_calls delta)
376366
(doseq [tool-call (:tool_calls delta)]
377-
(let [{:keys [index id function]} tool-call
367+
(let [{:keys [index id function]} tool-call
378368
{name :name args :arguments} function
379369
;; Use RID as key to avoid collisions between API requests
380-
tool-key (str rid "-" index)
370+
tool-key (str rid "-" index)
381371
;; Create globally unique tool call ID for client
382-
unique-id (when id (str rid "-" id))]
372+
unique-id (when id (str rid "-" id))]
383373
(when (and name unique-id)
384374
(on-prepare-tool-call {:id unique-id :name name :arguments-text ""}))
385375
(swap! tool-calls-atom update tool-key
386376
(fn [existing]
387377
(cond-> (or existing {:index index})
388378
unique-id (assoc :id unique-id)
389-
name (assoc :name name)
390-
args (update :arguments-text (fnil str "") args))))
379+
name (assoc :name name)
380+
args (update :arguments-text (fnil str "") args))))
391381
(when-let [updated-tool-call (get @tool-calls-atom tool-key)]
392382
(when (and (:id updated-tool-call) (:name updated-tool-call)
393383
(not (string/blank? (:arguments-text updated-tool-call))))
@@ -411,11 +401,11 @@
411401
(on-message-received {:type :finish :finish-reason finish-reason}))))))))
412402
rid (llm-util/gen-rid)]
413403
(base-request!
414-
{:rid rid
415-
:body body
404+
{:rid rid
405+
:body body
416406
:extra-headers extra-headers
417-
:api-url api-url
418-
:api-key api-key
407+
:api-url api-url
408+
:api-key api-key
419409
:tool-calls* tool-calls*
420-
:on-error on-error
410+
:on-error on-error
421411
:on-response (fn [event data] (handle-response event data tool-calls* rid))})))

0 commit comments

Comments
 (0)