Skip to content

Commit a188f58

Browse files
committed
Replace futurize with vthreads
1 parent 2c8b077 commit a188f58

File tree

3 files changed

+58
-61
lines changed

3 files changed

+58
-61
lines changed

src/simulflow/processors/deepgram.clj

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
[clojure.core.async :as a]
44
[clojure.core.async.flow :as flow]
55
[hato.websocket :as ws]
6+
[simulflow.async :refer [vthread-loop]]
67
[simulflow.frame :as frame]
78
[simulflow.schema :as schema :refer [flex-enum]]
89
[simulflow.utils.core :as u]
@@ -159,23 +160,19 @@ https://developers.deepgram.com/docs/understanding-end-of-speech-detection#using
159160
_ (t/log! {:level :info :id :deepgram-transcriptor} ["Connecting to transcription websocket" websocket-url])
160161
ws-conn @(ws/websocket
161162
websocket-url
162-
conn-config)
163-
164-
write-to-ws #(loop []
165-
(when @alive?
166-
(when-let [msg (a/<!! ws-write-chan)]
167-
(when (and (frame/audio-input-raw? msg) @alive?)
168-
(ws/send! ws-conn (:frame/data msg))
169-
(recur)))))
170-
keep-alive #(loop []
171-
(when @alive?
172-
(a/<!! (a/timeout 3000))
173-
(t/log! {:level :trace :id :deepgram} "Sending keep-alive message")
174-
(ws/send! ws-conn keep-alive-payload)
175-
(recur)))]
176-
((flow/futurize write-to-ws :exec :io))
177-
((flow/futurize keep-alive :exec :io))
178-
163+
conn-config)]
164+
(vthread-loop []
165+
(when @alive?
166+
(when-let [msg (a/<!! ws-write-chan)]
167+
(when (and (frame/audio-input-raw? msg) @alive?)
168+
(ws/send! ws-conn (:frame/data msg))
169+
(recur)))))
170+
(vthread-loop []
171+
(when @alive?
172+
(a/<!! (a/timeout 3000))
173+
(t/log! {:level :trace :id :deepgram} "Sending keep-alive message")
174+
(ws/send! ws-conn keep-alive-payload)
175+
(recur)))
179176
{:websocket/conn ws-conn
180177
:websocket/alive? alive?
181178
::flow/in-ports {:ws-read ws-read-chan}

src/simulflow/processors/openai.clj

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
[clojure.core.async.flow :as flow]
55
[malli.core :as m]
66
[malli.transform :as mt]
7-
[simulflow.async :as async]
7+
[simulflow.async :refer [vthread-loop]]
88
[simulflow.frame :as frame]
99
[simulflow.schema :as schema]
1010
[simulflow.utils.core :as u]
@@ -119,20 +119,19 @@
119119
(catch Exception e
120120
(t/log! :error e)))]
121121

122-
((async/vfuturize
123-
#(loop []
124-
(when-let [chunk (a/<!! stream-ch)]
125-
(let [d (delta chunk)]
126-
(if (= chunk :done)
127-
(a/>!! out-c (frame/llm-full-response-end true))
122+
(vthread-loop []
123+
(when-let [chunk (a/<!! stream-ch)]
124+
(let [d (delta chunk)]
125+
(if (= chunk :done)
126+
(a/>!! out-c (frame/llm-full-response-end true))
127+
(do
128+
(if-let [tool-call (first (:tool_calls d))]
128129
(do
129-
(if-let [tool-call (first (:tool_calls d))]
130-
(do
131-
(t/log! ["SENDING TOOL CALL" tool-call])
132-
(a/>!! out-c (frame/llm-tool-call-chunk tool-call)))
133-
(when-let [c (:content d)]
134-
(a/>!! out-c (frame/llm-text-chunk c))))
135-
(recur)))))))))))
130+
(t/log! ["SENDING TOOL CALL" tool-call])
131+
(a/>!! out-c (frame/llm-tool-call-chunk tool-call)))
132+
(when-let [c (:content d)]
133+
(a/>!! out-c (frame/llm-text-chunk c))))
134+
(recur)))))))))
136135

137136
(def openai-llm-process
138137
(flow/process
@@ -156,17 +155,17 @@
156155
:init (fn [params]
157156
(let [state (m/decode OpenAILLMConfigSchema params mt/default-value-transformer)
158157
llm-write (a/chan 100)
159-
llm-read (a/chan 1024)
160-
write-to-llm #(loop []
161-
(if-let [frame (a/<!! llm-write)]
162-
(do
163-
(t/log! :info ["AI REQUEST" (:frame/data frame)])
164-
(assert (or (frame/llm-context? frame)
165-
(frame/control-interrupt-start? frame)) "Invalid frame sent to LLM. Only llm-context or interrupt-start")
166-
(flow-do-completion! state llm-read (:frame/data frame))
167-
(recur))
168-
(t/log! {:level :info :id :llm} "Closing llm loop")))]
169-
((flow/futurize write-to-llm :exec :io))
158+
llm-read (a/chan 1024)]
159+
(vthread-loop []
160+
(if-let [frame (a/<!! llm-write)]
161+
(do
162+
(t/log! :info ["AI REQUEST" (:frame/data frame)])
163+
(assert (or (frame/llm-context? frame)
164+
(frame/control-interrupt-start? frame)) "Invalid frame sent to LLM. Only llm-context or interrupt-start")
165+
(flow-do-completion! state llm-read (:frame/data frame))
166+
(recur))
167+
(t/log! {:level :info :id :llm} "Closing llm loop")))
168+
170169
{::flow/in-ports {:llm-read llm-read}
171170
::flow/out-ports {:llm-write llm-write}}))
172171

src/simulflow/processors/silence_monitor.clj

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
(:require
33
[clojure.core.async :as a]
44
[clojure.core.async.flow :as flow]
5+
[simulflow.async :refer [vthread-loop]]
56
[simulflow.frame :as frame]))
67

78
(defn speech-start?
@@ -44,25 +45,25 @@
4445
(let [input-ch (a/chan 1024)
4546
speak-ch (a/chan 1024)
4647
speaking? (atom true)
47-
silence-message-count (atom 0)
48-
silence-detection-loop #(loop []
49-
(when-let [[frame c] (a/alts!! [(a/timeout timeout-ms) input-ch])]
50-
(if (and (= c input-ch)
51-
(voice-activity-frame? frame))
52-
(do
53-
(when (user-msg? frame) (reset! silence-message-count 0))
54-
(when (speech-start? frame) (reset! speaking? true))
55-
(when (speech-stop? frame) (reset! speaking? false))
56-
(recur))
57-
(do
58-
(when-not @speaking?
59-
(swap! silence-message-count inc)
60-
;; End the call if we prompted for activity 3 times
61-
(if (>= @silence-message-count 3)
62-
(a/>!! speak-ch (frame/speak-frame (rand-nth (vec end-call-prompts))))
63-
(a/>!! speak-ch (frame/speak-frame (rand-nth (vec prompts))))))
64-
(recur)))))]
65-
((flow/futurize silence-detection-loop :exec :io))
48+
silence-message-count (atom 0)]
49+
(vthread-loop []
50+
(when-let [[frame c] (a/alts!! [(a/timeout timeout-ms) input-ch])]
51+
(if (and (= c input-ch)
52+
(voice-activity-frame? frame))
53+
(do
54+
(when (user-msg? frame) (reset! silence-message-count 0))
55+
(when (speech-start? frame) (reset! speaking? true))
56+
(when (speech-stop? frame) (reset! speaking? false))
57+
(recur))
58+
(do
59+
(when-not @speaking?
60+
(swap! silence-message-count inc)
61+
;; End the call if we prompted for activity 3 times
62+
(if (>= @silence-message-count 3)
63+
(a/>!! speak-ch (frame/speak-frame (rand-nth (vec end-call-prompts))))
64+
(a/>!! speak-ch (frame/speak-frame (rand-nth (vec prompts))))))
65+
(recur)))))
66+
6667
{::flow/out-ports {:input input-ch}
6768
::flow/in-ports {:speak-out speak-ch}}))
6869
:transition (fn [{::flow/keys [out-ports]} transition]

0 commit comments

Comments
 (0)