Skip to content

Commit 1b1aa11

Browse files
committed
Refactor openai-chat process-content adding tests
1 parent 774daf8 commit 1b1aa11

File tree

3 files changed

+200
-102
lines changed

3 files changed

+200
-102
lines changed

src/eca/llm_api.clj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@
180180
:supports-image? supports-image?
181181
:past-messages past-messages
182182
:tools tools
183-
:thinking-block "thought"
183+
:thinking-tag "thought"
184184
:extra-payload (merge {}
185185
(when reason?
186186
{:extra_body {:google {:thinking_config {:include_thoughts true}}}})

src/eca/llm_providers/openai_chat.clj

Lines changed: 96 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,79 @@
193193
;; No completed tools at all - let the streaming response provide the actual finish_reason
194194
nil)))
195195

196+
(defn ^:private process-text-think-aware
  "Incremental thinking/content parser that supports tags split across chunks.

  Arguments:
  - text:                the newly received content chunk (nil/empty is a no-op).
  - content-buffer*:     atom holding the rolling, not-yet-emitted text.
  - reasoning-type*:     atom; `:tag` while inside a tag-delimited thinking block.
  - current-reason-id*:  atom holding the id of the current reasoning block.
  - reasoning-started*:  atom; true while a reasoning block is open.
  - thinking-start-tag / thinking-end-tag: the literal opening/closing tags.
  - on-message-received: callback invoked with {:type :text :text ...}.
  - on-reason:           callback invoked with {:status ... :id ... [:text ...]};
                         may be nil, in which case `text` is forwarded unparsed.

  Strategy:
  - Maintain a rolling buffer across chunks.
  - Outside a thinking block: search for thinking-start-tag; if not found, emit all
    but a small tail of length (start tag length - 1) so a tag split across chunks
    (e.g., \"<th\", \"ink\", \">\", ...) can still be matched on the next call.
  - Inside a thinking block: search for thinking-end-tag; if not found, emit all
    but a small tail of length (end tag length - 1) for the same reason.
  - Whatever stays in content-buffer* is NOT flushed here; the caller flushes it
    (and closes any open reasoning block) when the stream ends."
  [text content-buffer* reasoning-type* current-reason-id*
   reasoning-started* thinking-start-tag thinking-end-tag on-message-received on-reason]
  (cond
    ;; Nothing to process.
    (not (seq text))
    nil

    ;; No reasoning callback: there is nowhere to route thinking text, so skip
    ;; tag parsing and buffering and hand the chunk straight to the message
    ;; callback instead of failing on a nil function call.
    (nil? on-reason)
    (on-message-received {:type :text :text text})

    :else
    (let [thinking-start-len (count thinking-start-tag)
          thinking-end-len (count thinking-end-tag)
          ;; Tail sizes are loop-invariant: the longest proper prefix of each tag.
          start-tail (max 0 (dec thinking-start-len))
          end-tail (max 0 (dec thinking-end-len))]
      (swap! content-buffer* str text)
      (loop []
        (let [^String buf @content-buffer*]
          (if (= @reasoning-type* :tag)
            ;; Inside a thinking block: look for the end tag.
            (let [idx (.indexOf buf ^String thinking-end-tag)]
              (if (>= idx 0)
                (let [before (.substring buf 0 idx)
                      after (.substring buf (+ idx thinking-end-len))]
                  (when (seq before)
                    (on-reason {:status :thinking
                                :id @current-reason-id*
                                :text before}))
                  ;; Close the thinking block and keep parsing what follows it.
                  (reset! content-buffer* after)
                  (when @reasoning-started*
                    (on-reason {:status :finished :id @current-reason-id*})
                    (reset! reasoning-started* false)
                    (reset! reasoning-type* nil))
                  (recur))
                ;; No end tag yet: emit most, keep a small tail for a split tag.
                (let [emit-len (- (count buf) end-tail)]
                  (when (pos? emit-len)
                    (on-reason {:status :thinking
                                :id @current-reason-id*
                                :text (.substring buf 0 emit-len)})
                    (reset! content-buffer* (.substring buf emit-len))))))
            ;; Outside a thinking block: look for the start tag.
            (let [idx (.indexOf buf ^String thinking-start-tag)]
              (if (>= idx 0)
                (let [before (.substring buf 0 idx)
                      after (.substring buf (+ idx thinking-start-len))]
                  (when (seq before)
                    (on-message-received {:type :text :text before}))
                  ;; Open a new thinking block unless one is already in progress.
                  (when-not @reasoning-started*
                    (let [new-id (str (random-uuid))]
                      (reset! current-reason-id* new-id)
                      (reset! reasoning-started* true)
                      (reset! reasoning-type* :tag)
                      (on-reason {:status :started :id new-id})))
                  (reset! content-buffer* after)
                  (recur))
                ;; No start tag yet: emit most, keep a small tail for a split tag.
                (let [emit-len (- (count buf) start-tail)]
                  (when (pos? emit-len)
                    (on-message-received {:type :text
                                          :text (.substring buf 0 emit-len)})
                    (reset! content-buffer* (.substring buf emit-len))))))))))))
268+
196269
(defn completion!
197270
"Primary entry point for OpenAI chat completions with streaming support.
198271
@@ -201,19 +274,17 @@
201274
Compatible with OpenRouter and other OpenAI-compatible providers."
202275
[{:keys [model user-messages instructions temperature api-key api-url max-output-tokens
203276
past-messages tools extra-payload extra-headers supports-image? parallel-tool-calls?
204-
thinking-block]
277+
thinking-tag]
205278
:or {temperature 1.0
206279
parallel-tool-calls? true
207-
thinking-block "think"}}
280+
thinking-tag "think"}}
208281
{:keys [on-message-received on-error on-prepare-tool-call on-tools-called on-reason]}]
209-
(let [thinking-start-block (str "<" thinking-block ">")
210-
thinking-end-block (str "</" thinking-block ">")
211-
thinking-start-len (count thinking-start-block)
212-
thinking-end-len (count thinking-end-block)
282+
(let [thinking-start-tag (str "<" thinking-tag ">")
283+
thinking-end-tag (str "</" thinking-tag ">")
213284
messages (vec (concat
214285
(when instructions [{:role "system" :content instructions}])
215-
(normalize-messages past-messages supports-image? thinking-start-block thinking-end-block)
216-
(normalize-messages user-messages supports-image? thinking-start-block thinking-end-block)))
286+
(normalize-messages past-messages supports-image? thinking-start-tag thinking-end-tag)
287+
(normalize-messages user-messages supports-image? thinking-start-tag thinking-end-tag)))
217288

218289
body (merge (assoc-some
219290
{:model model
@@ -239,78 +310,17 @@
239310
reasoning-type* (atom nil)
240311
;; Incremental parser buffer for content to detect thinking tags across chunks
241312
content-buffer* (atom "")
242-
243-
;; Incremental thinking/content parser that supports tag splits across chunks
244-
process-content (fn [text]
245-
(when (seq text)
246-
(swap! content-buffer* str text)
247-
(loop []
248-
(let [buf @content-buffer*
249-
inside-tag? (= @reasoning-type* :tag)]
250-
(if inside-tag?
251-
;; We are inside a thinking block; look for end tag
252-
(let [idx (.indexOf ^String buf ^String thinking-end-block)]
253-
(if (>= idx 0)
254-
(let [before (.substring ^String buf 0 idx)
255-
after (.substring ^String buf (+ idx thinking-end-len))]
256-
(when (and on-reason (pos? (count before)))
257-
(on-reason {:status :thinking
258-
:id @current-reason-id*
259-
:text before}))
260-
;; Close the thinking block
261-
(reset! content-buffer* after)
262-
(when (and on-reason @reasoning-started*)
263-
(on-reason {:status :finished :id @current-reason-id*})
264-
(reset! reasoning-started* false)
265-
(reset! reasoning-type* nil))
266-
(recur))
267-
;; No end tag yet: emit most, keep small tail to match possible split tag
268-
(let [emit-len (max 0 (- (count buf) (dec thinking-end-len)))]
269-
(when (pos? emit-len)
270-
(let [to-emit (.substring ^String buf 0 emit-len)
271-
tail (.substring ^String buf emit-len)]
272-
(when (and on-reason (pos? (count to-emit)))
273-
(on-reason {:status :thinking
274-
:id @current-reason-id*
275-
:text to-emit}))
276-
(reset! content-buffer* tail))))))
277-
;; We are outside a thinking block; look for start tag
278-
(let [idx (.indexOf ^String buf ^String thinking-start-block)]
279-
(if (>= idx 0)
280-
(let [before (.substring ^String buf 0 idx)
281-
after (.substring ^String buf (+ idx thinking-start-len))]
282-
(when (pos? (count before))
283-
(on-message-received {:type :text :text before}))
284-
;; Open a new thinking block
285-
(when on-reason
286-
(when-not @reasoning-started*
287-
(let [new-id (str (random-uuid))]
288-
(reset! current-reason-id* new-id)
289-
(reset! reasoning-started* true)
290-
(reset! reasoning-type* :tag)
291-
(on-reason {:status :started :id new-id}))))
292-
(reset! content-buffer* after)
293-
(recur))
294-
;; No start tag yet: emit most, keep small tail for possible split tag
295-
(let [emit-len (max 0 (- (count buf) (dec thinking-start-len)))]
296-
(when (pos? emit-len)
297-
(let [to-emit (.substring ^String buf 0 emit-len)
298-
tail (.substring ^String buf emit-len)]
299-
(when (pos? (count to-emit))
300-
(on-message-received {:type :text :text to-emit}))
301-
(reset! content-buffer* tail)))))))))))
302313
handle-response (fn handle-response [event data tool-calls-atom rid]
303314
(if (= event "stream-end")
304315
(do
305316
;; Flush any leftover buffered content and finish reasoning if needed
306317
(let [buf @content-buffer*]
307318
(when (pos? (count buf))
308319
(if (= @reasoning-type* :tag)
309-
(when on-reason
310-
(on-reason {:status :thinking :id @current-reason-id* :text buf}))
320+
(on-reason {:status :thinking :id @current-reason-id* :text buf})
311321
(on-message-received {:type :text :text buf}))
312322
(reset! content-buffer* "")))
313-
(when (and @reasoning-started* on-reason)
323+
(when @reasoning-started*
314324
(on-reason {:status :finished :id @current-reason-id*})
315325
(reset! reasoning-started* false)
316326
(reset! reasoning-type* nil))
@@ -324,33 +334,31 @@
324334
:api-key api-key
325335
:on-tools-called on-tools-called
326336
:on-error on-error
327-
:thinking-start-block thinking-start-block
328-
:thinking-end-block thinking-end-block
337+
:thinking-start-block thinking-start-tag
338+
:thinking-end-block thinking-end-tag
329339
:handle-response handle-response}))
330340
(when (seq (:choices data))
331341
(doseq [choice (:choices data)]
332342
(let [delta (:delta choice)
333343
finish-reason (:finish_reason choice)]
334344
;; Process content if present (with thinking blocks support)
335345
(when-let [ct (:content delta)]
336-
(if on-reason
337-
(process-content ct)
338-
(on-message-received {:type :text :text ct})))
346+
(process-text-think-aware ct content-buffer* reasoning-type* current-reason-id* reasoning-started*
347+
thinking-start-tag thinking-end-tag on-message-received on-reason))
339348

340349
;; Process reasoning if present (o1 models and compatible providers)
341350
(when-let [reasoning-text (or (:reasoning delta)
342351
(:reasoning_content delta))]
343-
(when on-reason
344-
(when-not @reasoning-started*
352+
(when-not @reasoning-started*
345353
;; Generate new reason-id for each thinking block
346-
(let [new-reason-id (str (random-uuid))]
347-
(reset! current-reason-id* new-reason-id)
348-
(reset! reasoning-started* true)
349-
(reset! reasoning-type* :delta)
350-
(on-reason {:status :started :id new-reason-id})))
351-
(on-reason {:status :thinking
352-
:id @current-reason-id*
353-
:text reasoning-text})))
354+
(let [new-reason-id (str (random-uuid))]
355+
(reset! current-reason-id* new-reason-id)
356+
(reset! reasoning-started* true)
357+
(reset! reasoning-type* :delta)
358+
(on-reason {:status :started :id new-reason-id})))
359+
(on-reason {:status :thinking
360+
:id @current-reason-id*
361+
:text reasoning-text}))
354362

355363
;; Check if reasoning just stopped (delta-based)
356364
(when (and (= @reasoning-type* :delta)
@@ -390,12 +398,11 @@
390398
(let [buf @content-buffer*]
391399
(when (pos? (count buf))
392400
(if (= @reasoning-type* :tag)
393-
(when on-reason
394-
(on-reason {:status :thinking :id @current-reason-id* :text buf}))
401+
(on-reason {:status :thinking :id @current-reason-id* :text buf})
395402
(on-message-received {:type :text :text buf}))
396403
(reset! content-buffer* "")))
397404
;; Handle reasoning completion
398-
(when (and @reasoning-started* on-reason)
405+
(when @reasoning-started*
399406
(on-reason {:status :finished :id @current-reason-id*})
400407
(reset! reasoning-started* false)
401408
(reset! reasoning-type* nil))

0 commit comments

Comments
 (0)