Skip to content

Commit e1ec471

Browse files
committed
Move common openai logic to utils.openai
1 parent 9607e92 commit e1ec471

File tree

2 files changed

+38
-32
lines changed

2 files changed

+38
-32
lines changed

src/simulflow/processors/openai.clj

Lines changed: 10 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
[simulflow.frame :as frame]
99
[simulflow.schema :as schema]
1010
[simulflow.utils.core :as u]
11+
[simulflow.utils.openai :as uai]
1112
[simulflow.utils.request :as request]
1213
[taoensso.telemere :as t]))
1314

@@ -103,36 +104,6 @@
103104

104105
,)
105106

106-
(def delta (comp :delta first :choices))
107-
108-
(defn flow-do-completion!
109-
"Handle completion requests for OpenAI LLM models"
110-
[state out-c context]
111-
(let [{:llm/keys [model] :openai/keys [api-key]} state]
112-
;; Start request only when the last message in the context is by the user
113-
114-
(a/>!! out-c (frame/llm-full-response-start true))
115-
(let [stream-ch (try (request/stream-chat-completion (merge {:model model
116-
:api-key api-key
117-
:messages (:messages context)
118-
:tools (mapv u/->tool-fn (:tools context))}))
119-
(catch Exception e
120-
(t/log! :error e)))]
121-
122-
(vthread-loop []
123-
(when-let [chunk (a/<!! stream-ch)]
124-
(let [d (delta chunk)]
125-
(if (= chunk :done)
126-
(a/>!! out-c (frame/llm-full-response-end true))
127-
(do
128-
(if-let [tool-call (first (:tool_calls d))]
129-
(do
130-
(t/log! ["Sending tool call" tool-call])
131-
(a/>!! out-c (frame/llm-tool-call-chunk tool-call)))
132-
(when-let [c (:content d)]
133-
(a/>!! out-c (frame/llm-text-chunk c))))
134-
(recur)))))))))
135-
136107
(def openai-llm-process
137108
(flow/process
138109
(fn
@@ -141,7 +112,7 @@
141112
:params (schema/->flow-describe-parameters OpenAILLMConfigSchema)
142113
:workload :io})
143114
([params]
144-
(let [state (m/decode OpenAILLMConfigSchema params mt/default-value-transformer)
115+
(let [{:llm/keys [model] :openai/keys [api-key]} (m/decode OpenAILLMConfigSchema params mt/default-value-transformer)
145116
llm-write (a/chan 100)
146117
llm-read (a/chan 1024)]
147118
(vthread-loop []
@@ -150,7 +121,14 @@
150121
(t/log! :info ["AI REQUEST" (:frame/data frame)])
151122
(assert (or (frame/llm-context? frame)
152123
(frame/control-interrupt-start? frame)) "Invalid frame sent to LLM. Only llm-context or interrupt-start")
153-
(flow-do-completion! state llm-read (:frame/data frame))
124+
(let [context (:frame/data frame)
125+
_ (a/>!! llm-read (frame/llm-full-response-start true))
126+
stream-ch (request/stream-chat-completion {:model model
127+
:api-key api-key
128+
:messages (:messages context)
129+
:tools (mapv u/->tool-fn (:tools context))})]
130+
(uai/handle-completion-request! stream-ch llm-read))
131+
154132
(recur))
155133
(t/log! {:level :info :id :llm} "Closing llm loop")))
156134

src/simulflow/utils/openai.clj

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
(ns simulflow.utils.openai
2+
"Common logic for openai format requests. Many LLM providers use openai format
3+
for their APIs. This NS keeps common logic for those providers."
4+
(:require
5+
[clojure.core.async :as a]
6+
[simulflow.async :refer [vthread-loop]]
7+
[simulflow.frame :as frame]))
8+
9+
(def response-chunk-delta
10+
"Retrieve the delta part of a streaming completion response"
11+
(comp :delta first :choices))
12+
13+
(defn handle-completion-request!
14+
"Handle completion requests for OpenAI LLM models"
15+
[in-c out-c]
16+
17+
(vthread-loop []
18+
(when-let [chunk (a/<!! in-c)]
19+
(let [d (response-chunk-delta chunk)]
20+
(if (= chunk :done)
21+
(a/>!! out-c (frame/llm-full-response-end true))
22+
(do
23+
(if-let [tool-call (first (:tool_calls d))]
24+
(a/>!! out-c (frame/llm-tool-call-chunk tool-call))
25+
;; normal text completion
26+
(when-let [c (:content d)]
27+
(a/>!! out-c (frame/llm-text-chunk c))))
28+
(recur)))))))

0 commit comments

Comments
 (0)