Skip to content

Commit f246716

Browse files
committed
Add support for vthreads via vfuturize
1 parent cb48337 commit f246716

File tree

3 files changed

+96
-65
lines changed

3 files changed

+96
-65
lines changed

src/simulflow/async.clj

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
(ns simulflow.async
2+
(:require
3+
[clojure.core.async.flow :as flow])
4+
(:import
5+
(java.util.concurrent Executors)))
6+
7+
(def virtual-threads-supported?
8+
(try
9+
(Class/forName "java.lang.Thread$Builder$OfVirtual")
10+
true
11+
(catch ClassNotFoundException _
12+
false)))
13+
14+
(def ^:private virtual-executor
15+
(when virtual-threads-supported?
16+
(Executors/newVirtualThreadPerTaskExecutor)))
17+
18+
(defn vfuturize
19+
"Like flow/futurize but uses virtual threads when available (Java 21+),
20+
otherwise falls back to the specified executor type (default :mixed)"
21+
([f & {:keys [exec]
22+
:or {exec :mixed}}]
23+
(if virtual-threads-supported?
24+
(flow/futurize f :exec virtual-executor)
25+
(flow/futurize f :exec exec))))

src/simulflow/processors/openai.clj

Lines changed: 8 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -13,44 +13,13 @@
1313

1414
(def openai-completions-url "https://api.openai.com/v1/chat/completions")
1515

16-
(defn stream-openai-chat-completion
17-
[{:keys [api-key messages tools model response-format]
18-
:or {model "gpt-4o-mini"}}]
19-
(:body (request/sse-request {:request {:url openai-completions-url
20-
:headers {"Authorization" (str "Bearer " api-key)
21-
"Content-Type" "application/json"}
22-
23-
:method :post
24-
:body (u/json-str (cond-> {:messages messages
25-
:stream true
26-
:response_format response-format
27-
:model model}
28-
(pos? (count tools)) (assoc :tools tools)))}
29-
:params {:stream/close? true}})))
30-
31-
(defn normal-chat-completion
32-
[{:keys [api-key messages tools model response-format stream]
33-
:or {model "gpt-4o-mini"
34-
stream false}}]
35-
(http/request {:url openai-completions-url
36-
:headers {"Authorization" (str "Bearer " api-key)
37-
"Content-Type" "application/json"}
38-
39-
:throw-on-error? false
40-
:method :post
41-
:body (u/json-str (cond-> {:messages messages
42-
:stream stream
43-
:response_format response-format
44-
:model model}
45-
(pos? (count tools)) (assoc :tools tools)))}))
46-
4716
(comment
4817
(require '[simulflow.secrets :refer [secret]])
4918
(def context {:messages [{:role "system", :content "You are a helpful assistant "} {:role :system, :content "You are a restaurant reservation assistant for La Maison, an upscale French restaurant. You must ALWAYS use one of the available functions to progress the conversation. This is a phone conversations and your responses will be converted to audio. Avoid outputting special characters and emojis. Be casual and friendly."} {:role :system, :content "Warmly greet the customer and ask how many people are in their party."} {:role "user", :content "It's gonna be 5 people."} {:role :assistant, :tool_calls [{:id "call_CBFGmrwUrrzcmsgvI9kvqdjc", :type :function, :function {:name "record_party_size", :arguments "{\"size\":5}"}}]} {:role :tool, :content [{:type :text, :text "5"}], :tool_call_id "call_CBFGmrwUrrzcmsgvI9kvqdjc"} {:role :system, :content "Ask what time they'd like to dine. Restaurant is open 5 PM to 10 PM. After they provide a time, confirm it's within operating hours before recording. Use 24-hour format for internal recording (e.g., 17:00 for 5 PM)."} {:role "user", :content "At around 4 o'clock in the afternoon."}], :tools [{:type :function, :function {:name "record_time" :description "Record the requested time", :parameters {:type :object, :properties {:time {:type :string, :pattern "^(17|18|19|20|21|22):([0-5][0-9])$", :description "Reservation time in 24-hour format (17:00-22:00)"}}, :required [:time]}, :transition_to :confirm}}]})
50-
(a/<!! (a/into [] (stream-openai-chat-completion {:messages (:messages context)
51-
:tools (mapv u/->tool-fn (:tools context))
52-
:api-key (secret [:openai :new-api-sk])
53-
:model "gpt-4o-mini"})))
19+
(a/<!! (a/into [] (request/stream-chat-completion {:messages (:messages context)
20+
:tools (mapv u/->tool-fn (:tools context))
21+
:api-key (secret [:openai :new-api-sk])
22+
:model "gpt-4o-mini"})))
5423

5524
,)
5625

@@ -153,10 +122,10 @@
153122
;; Start request only when the last message in the context is by the user
154123

155124
(a/>!! out-c (frame/llm-full-response-start true))
156-
(let [stream-ch (try (stream-openai-chat-completion (merge {:model model
157-
:api-key api-key
158-
:messages (:messages context)
159-
:tools (mapv u/->tool-fn (:tools context))}))
125+
(let [stream-ch (try (request/stream-chat-completion (merge {:model model
126+
:api-key api-key
127+
:messages (:messages context)
128+
:tools (mapv u/->tool-fn (:tools context))}))
160129
(catch Exception e
161130
(t/log! :error e)))]
162131

src/simulflow/utils/request.clj

Lines changed: 63 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
[clojure.string :as string]
77
[hato.client :as http]
88
[hato.middleware :as hm]
9+
[simulflow.async :as async]
910
[simulflow.utils.core :as u])
1011
(:import
1112
(java.io InputStream)))
@@ -53,32 +54,34 @@
5354
{:as :stream})))
5455
buffer-size (calc-buffer-size params)
5556
events (a/chan (a/buffer buffer-size) (map parse-event))]
56-
(a/thread
57-
(try
58-
(loop [byte-coll []]
59-
(let [byte-arr (byte-array (max 1 (.available event-stream)))
60-
bytes-read (.read event-stream byte-arr)]
61-
62-
(if (neg? bytes-read)
63-
64-
;; Input stream closed, exiting read-loop
65-
nil
66-
67-
(let [next-byte-coll (concat byte-coll (seq byte-arr))
68-
data (slurp (byte-array next-byte-coll))]
69-
(if-let [es (not-empty (re-seq event-mask data))]
70-
(if (every? true? (map #(a/>!! events %) es))
71-
(recur (drop (apply + (map #(count (.getBytes ^String %)) es))
72-
next-byte-coll))
73-
74-
;; Output stream closed, exiting read-loop
75-
nil)
76-
77-
(recur next-byte-coll))))))
78-
(finally
79-
(when close?
80-
(a/close! events))
81-
(.close event-stream))))
57+
((async/vfuturize
58+
(fn []
59+
(try
60+
(loop [byte-coll []]
61+
(let [byte-arr (byte-array (max 1 (.available event-stream)))
62+
bytes-read (.read event-stream byte-arr)]
63+
64+
(if (neg? bytes-read)
65+
66+
;; Input stream closed, exiting read-loop
67+
nil
68+
69+
(let [next-byte-coll (concat byte-coll (seq byte-arr))
70+
data (slurp (byte-array next-byte-coll))]
71+
(if-let [es (not-empty (re-seq event-mask data))]
72+
(if (every? true? (map #(a/>!! events %) es))
73+
(recur (drop (apply + (map #(count (.getBytes ^String %)) es))
74+
next-byte-coll))
75+
76+
;; Output stream closed, exiting read-loop
77+
nil)
78+
79+
(recur next-byte-coll))))))
80+
(finally
81+
(when close?
82+
(a/close! events))
83+
(.close event-stream))))))
84+
8285
events))
8386

8487
(defn sse-request
@@ -135,6 +138,40 @@
135138
hm/wrap-nested-params
136139
hm/wrap-method])
137140

141+
(def openai-completions-url "https://api.openai.com/v1/chat/completions")
142+
143+
(defn stream-chat-completion
144+
[{:keys [api-key messages tools model response-format completions-url]
145+
:or {model "gpt-4o-mini"
146+
completions-url openai-completions-url}}]
147+
(:body (sse-request {:request {:url completions-url
148+
:headers {"Authorization" (str "Bearer " api-key)
149+
"Content-Type" "application/json"}
150+
151+
:method :post
152+
:body (u/json-str (cond-> {:messages messages
153+
:stream true
154+
:response_format response-format
155+
:model model}
156+
(pos? (count tools)) (assoc :tools tools)))}
157+
:params {:stream/close? true}})))
158+
159+
(defn normal-chat-completion
160+
[{:keys [api-key messages tools model response-format stream completions-url]
161+
:or {model "gpt-4o-mini"
162+
stream false}}]
163+
(http/request {:url openai-completions-url
164+
:headers {"Authorization" (str "Bearer " api-key)
165+
"Content-Type" "application/json"}
166+
167+
:throw-on-error? false
168+
:method :post
169+
:body (u/json-str (cond-> {:messages messages
170+
:stream stream
171+
:response_format response-format
172+
:model model}
173+
(pos? (count tools)) (assoc :tools tools)))}))
174+
138175
(comment
139176
(require '[simulflow.secrets :refer [secret]])
140177

0 commit comments

Comments
 (0)