Skip to content

Commit d025046

Browse files
committed
Transform malli schema to core.async.flow :describe format
1 parent 7ed7fc2 commit d025046

File tree

2 files changed

+69
-42
lines changed

2 files changed

+69
-42
lines changed

src/simulflow/processors/openai.clj

Lines changed: 35 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
[;; GPT-4 Models
2626
"gpt-4"
2727
"gpt-4-32k"
28-
"gpt-4-1106-preview" ;; GPT-4 Turbo
29-
"gpt-4-vision-preview" ;; GPT-4 Vision
28+
"gpt-4-1106-preview" ;; GPT-4 Turbo
29+
"gpt-4-vision-preview" ;; GPT-4 Vision
3030
;; GPT-3.5 Models
3131
"gpt-3.5-turbo"
3232
"gpt-3.5-turbo-16k"
@@ -135,43 +135,36 @@
135135

136136
(def openai-llm-process
137137
(flow/process
138-
(flow/map->step {:describe (fn [] {:ins {:in "Channel for incoming context aggregations"}
139-
:outs {:out "Channel where streaming responses will go"}
140-
:params {:llm/model "Openai model used"
141-
:openai/api-key "OpenAI Api key"
142-
:llm/temperature "Optional temperature parameter for the llm inference"
143-
:llm/max-tokens "Optional max tokens to generate"
144-
:llm/presence-penalty "Optional (-2.0 to 2.0)"
145-
:llm/top-p "Optional nucleus sampling threshold"
146-
:llm/seed "Optional seed used for deterministic sampling"
147-
:llm/max-completion-tokens "Optional Max tokens in completion"
148-
:llm/extra "Optional extra model parameters"}
149-
:workload :io})
150-
151-
:transition (fn [{::flow/keys [in-ports out-ports]} transition]
152-
(when (= transition ::flow/stop)
153-
(doseq [port (concat (vals in-ports) (vals out-ports))]
154-
(a/close! port))))
155-
:init (fn [params]
156-
(let [state (m/decode OpenAILLMConfigSchema params mt/default-value-transformer)
157-
llm-write (a/chan 100)
158-
llm-read (a/chan 1024)]
159-
(vthread-loop []
160-
(if-let [frame (a/<!! llm-write)]
161-
(do
162-
(t/log! :info ["AI REQUEST" (:frame/data frame)])
163-
(assert (or (frame/llm-context? frame)
164-
(frame/control-interrupt-start? frame)) "Invalid frame sent to LLM. Only llm-context or interrupt-start")
165-
(flow-do-completion! state llm-read (:frame/data frame))
166-
(recur))
167-
(t/log! {:level :info :id :llm} "Closing llm loop")))
168-
169-
{::flow/in-ports {:llm-read llm-read}
170-
::flow/out-ports {:llm-write llm-write}}))
171-
172-
:transform (fn [state in msg]
173-
(if (= in :llm-read)
174-
[state {:out [msg]}]
175-
(cond
176-
(frame/llm-context? msg)
177-
[state {:llm-write [msg]}])))})))
138+
(fn
139+
([] {:ins {:in "Channel for incoming context aggregations"}
140+
:outs {:out "Channel where streaming responses will go"}
141+
:params (schema/->flow-describe-parameters OpenAILLMConfigSchema)
142+
:workload :io})
143+
([params]
144+
(let [state (m/decode OpenAILLMConfigSchema params mt/default-value-transformer)
145+
llm-write (a/chan 100)
146+
llm-read (a/chan 1024)]
147+
(vthread-loop []
148+
(if-let [frame (a/<!! llm-write)]
149+
(do
150+
(t/log! :info ["AI REQUEST" (:frame/data frame)])
151+
(assert (or (frame/llm-context? frame)
152+
(frame/control-interrupt-start? frame)) "Invalid frame sent to LLM. Only llm-context or interrupt-start")
153+
(flow-do-completion! state llm-read (:frame/data frame))
154+
(recur))
155+
(t/log! {:level :info :id :llm} "Closing llm loop")))
156+
157+
{::flow/in-ports {:llm-read llm-read}
158+
::flow/out-ports {:llm-write llm-write}}))
159+
160+
([{::flow/keys [in-ports out-ports]} transition]
161+
(when (= transition ::flow/stop)
162+
(doseq [port (concat (vals in-ports) (vals out-ports))]
163+
(a/close! port))))
164+
165+
([state in msg]
166+
(if (= in :llm-read)
167+
[state {:out [msg]}]
168+
(cond
169+
(frame/llm-context? msg)
170+
[state {:llm-write [msg]}]))))))

src/simulflow/schema.clj

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,40 @@
55
[malli.error :as me]
66
[malli.util :as mu]))
77

8+
(defn parse-schema
9+
[s]
10+
(cond
11+
(keyword? s) {:type s
12+
:data {}
13+
:children []}
14+
:else
15+
(let [[type & [maybe-arg :as args]] s
16+
[data childs] (if (or (vector? maybe-arg)
17+
(and (sequential? maybe-arg)
18+
(sequential? (first maybe-arg)))
19+
(nil? maybe-arg))
20+
[{} args]
21+
[maybe-arg (rest args)])]
22+
{:type type
23+
:data data
24+
:children (if (= 1 (count childs)) (first childs) (vec childs))})))
25+
26+
(defn ->flow-describe-parameters
27+
"Take a malli schema and transform to core.async.flow processor :describe key format"
28+
[s]
29+
(let [{:keys [children type]} (parse-schema s)]
30+
(assert (= type :map) "Can only transform :map schemas to :describe parameter format ")
31+
(reduce
32+
(fn [acc child]
33+
(let [{:keys [type data children]} (parse-schema child)
34+
key-name type
35+
optional? (:optional data false)
36+
{:keys [type data]} (parse-schema children)
37+
description (:description data)]
38+
(assoc acc key-name (apply str (remove nil? ["Type: " type (when optional? "; Optional ") (when description (str "; Description: " description))])))))
39+
{}
40+
children)))
41+
842
(defn flex-enum
943
"Creates a flexible enum that accepts both keywords and their string versions.
1044
Input: coll of keywords or string

0 commit comments

Comments
 (0)