Skip to content

Commit 7a2d029

Browse files
committed
More tests for elevenlabs
1 parent 9c06a51 commit 7a2d029

File tree

2 files changed

+241
-95
lines changed

2 files changed

+241
-95
lines changed

src/simulflow/processors/elevenlabs.clj

Lines changed: 150 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,12 @@
3939

4040
(comment
4141
(make-elevenlabs-ws-url
42-
{:elevenlabs/api-key (secrets/secret [:elevenlabs :api-key])
43-
:elevenlabs/model-id "eleven_flash_v2_5"
44-
:elevenlabs/voice-id (secrets/secret [:elevenlabs :voice-id])
45-
:voice/stability 0.5
46-
:voice/similarity-boost 0.8
47-
:voice/use-speaker-boost? true}))
42+
{:elevenlabs/api-key (secrets/secret [:elevenlabs :api-key])
43+
:elevenlabs/model-id "eleven_flash_v2_5"
44+
:elevenlabs/voice-id (secrets/secret [:elevenlabs :voice-id])
45+
:voice/stability 0.5
46+
:voice/similarity-boost 0.8
47+
:voice/use-speaker-boost? true}))
4848

4949
(defn begin-stream-message
5050
[{:voice/keys [stability similarity-boost use-speaker-boost?]
@@ -77,14 +77,14 @@
7777
:description "ElevenLabs TTS configuration"}
7878
[:elevenlabs/api-key
7979
[:string
80-
{:min 32 ;; ElevenLabs API keys are typically long
80+
{:min 32 ;; ElevenLabs API keys are typically long
8181
:secret true ;; Marks this as sensitive data
8282
:description "ElevenLabs API key"}]]
8383
[:elevenlabs/model-id
8484
(schema/flex-enum
85-
{:default "eleven_flash_v2_5"
86-
:description "ElevenLabs model identifier"}
87-
["eleven_multilingual_v2" "eleven_turbo_v2_5" "eleven_turbo_v2" "eleven_monolingual_v1" "eleven_multilingual_v1" "eleven_multilingual_sts_v2" "eleven_flash_v2" "eleven_flash_v2_5" "eleven_english_sts_v2"])]
85+
{:default "eleven_flash_v2_5"
86+
:description "ElevenLabs model identifier"}
87+
["eleven_multilingual_v2" "eleven_turbo_v2_5" "eleven_turbo_v2" "eleven_monolingual_v1" "eleven_multilingual_v1" "eleven_multilingual_sts_v2" "eleven_flash_v2" "eleven_flash_v2_5" "eleven_english_sts_v2"])]
8888
[:elevenlabs/voice-id
8989
[:string
9090
{:min 20 ;; ElevenLabs voice IDs are fixed length
@@ -107,86 +107,145 @@
107107
{:default true
108108
:description "Whether to enable speaker beoost enhancement"}]]])
109109

110-
(defn tts-transform [{::keys [accumulator] :as state} in-name msg]
111-
(if (= in-name ::ws-read)
112-
;; xi sends one json response in multiple events so it needs
113-
;; to be concattenated until the final json can be parsed
114-
(let [attempt (u/parse-if-json (str accumulator msg))]
115-
(if (map? attempt)
116-
[(assoc state ::accumulator "")
117-
(when-let [audio (:audio attempt)]
118-
{:out [(frame/audio-output-raw (u/decode-base64 audio) {:timestamp (:now state)})
119-
(frame/xi-audio-out attempt {:timestamp (:now state)})]})]
120-
121-
;; continue concatenating
122-
[(assoc state ::accumulator attempt)]))
123-
(cond
124-
(frame/speak-frame? msg)
125-
[state {::ws-write [(text-message (:frame/data msg))]}]
126-
:else [state])))
110+
;; ============================================================================
111+
;; Pure Functions for JSON Processing
112+
;; ============================================================================
113+
114+
(defn accumulate-json-response
115+
"Pure function to accumulate JSON response fragments.
116+
Returns [new-accumulator parsed-json-or-nil]"
117+
[current-accumulator new-fragment]
118+
(let [combined (str current-accumulator new-fragment)
119+
parsed (u/parse-if-json combined)]
120+
(if (map? parsed)
121+
["" parsed] ; Successfully parsed, reset accumulator
122+
[combined nil]))) ; Still accumulating
123+
124+
(defn process-completed-json
125+
"Pure function to process completed JSON response into frames.
126+
Returns seq of frames or nil if no audio data."
127+
[json-response timestamp]
128+
(when-let [audio (:audio json-response)]
129+
[(frame/audio-output-raw (u/decode-base64 audio) {:timestamp timestamp})
130+
(frame/xi-audio-out json-response {:timestamp timestamp})]))
131+
132+
(defn process-speak-frame
133+
"Pure function to process speak frame into WebSocket message.
134+
Returns WebSocket message string."
135+
[speak-frame]
136+
(text-message (:frame/data speak-frame)))
137+
138+
(defn process-websocket-message
139+
"Pure function to process WebSocket message and update accumulator state.
140+
Returns [new-state output-map]"
141+
[state message timestamp]
142+
(let [current-accumulator (::accumulator state)
143+
[new-accumulator parsed-json] (accumulate-json-response current-accumulator message)]
144+
(if parsed-json
145+
;; JSON parsing complete
146+
(let [new-state (assoc state ::accumulator new-accumulator)
147+
frames (process-completed-json parsed-json timestamp)]
148+
[new-state (when frames {:out frames})])
149+
;; Still accumulating
150+
[(assoc state ::accumulator new-accumulator) {}])))
151+
152+
;; ============================================================================
153+
;; WebSocket Configuration
154+
;; ============================================================================
155+
156+
(defn create-websocket-config
157+
"Pure function to create WebSocket configuration map"
158+
[args ws-read ws-write]
159+
(let [configuration-msg (begin-stream-message args)]
160+
{:on-open (fn [ws]
161+
(t/log! :debug ["Elevenlabs websocket connection open. Sending configuration message" configuration-msg])
162+
(ws/send! ws configuration-msg))
163+
:on-message (fn [_ws ^HeapCharBuffer data _last?]
164+
(a/put! ws-read (str data)))
165+
:on-error (fn [_ e]
166+
(t/log! :error ["Elevenlabs websocket error" (ex-message e)]))
167+
:on-close (fn [_ws code reason]
168+
(t/log! :debug ["Elevenlabs websocket connection closed" "Code:" code "Reason:" reason]))}))
169+
170+
(defn elevenlabs-tts-transform
171+
"Modular transform function using pure helper functions"
172+
[state in-name msg]
173+
(cond
174+
;; Handle WebSocket messages (JSON accumulation and parsing)
175+
(= in-name ::ws-read)
176+
(process-websocket-message state msg (:now state))
177+
178+
;; Handle speak frames (convert to WebSocket messages)
179+
(frame/speak-frame? msg)
180+
[state {::ws-write [(process-speak-frame msg)]}]
181+
182+
;; Default case - no action
183+
:else
184+
[state]))
185+
186+
(def elevenlabs-tts-describe
187+
{:ins {:sys-in "Channel for system messages that take priority"
188+
:in "Channel for audio input frames (from transport-in) "}
189+
:outs {:sys-out "Channel for system messages that have priority"
190+
:out "Channel on which transcription frames are put"}
191+
:params {:elevenlabs/api-key "Api key required for 11labs connection"
192+
:elevenlabs/model-id "Model used for voice generation"
193+
:elevenlabs/voice-id "Voice id"
194+
:voice/stability "Optional voice stability factor (0.0 to 1.0)"
195+
:voice/similarity-boost "Optional voice similarity boost factor (0.0 to 1.0)"
196+
:voice/use-speaker-boost? "Wether to enable speaker boost enchancement"
197+
:flow/language "Language to use"
198+
:audio.out/encoding "Encoding for the audio generated"
199+
:audio.out/sample-rate "Sample rate for the audio generated"}
200+
:workload :io})
201+
202+
(defn elevenlabs-tts-init! [args]
203+
(let [url (make-elevenlabs-ws-url args)
204+
ws-read (a/chan 100)
205+
ws-write (a/chan 100)
206+
alive? (atom true)
207+
conf (assoc (create-websocket-config args ws-read ws-write)
208+
:on-close (fn [_ws code reason]
209+
(reset! alive? false)
210+
(t/log! :debug ["Elevenlabs websocket connection closed" "Code:" code "Reason:" reason])))
211+
_ (t/log! {:level :debug :id :elevenlabs} "Connecting to transcription websocket")
212+
ws-conn @(ws/websocket url conf)]
213+
(vthread-loop []
214+
(when @alive?
215+
(when-let [msg (a/<!! ws-write)]
216+
(when @alive?
217+
(ws/send! ws-conn msg))
218+
(recur))))
219+
(vthread-loop []
220+
(when @alive?
221+
(a/<!! (a/timeout 3000))
222+
(t/log! {:level :trace :id :elevenlabs} "Sending keep-alive message")
223+
(ws/send! ws-conn keep-alive-message)
224+
(recur)))
225+
{:websocket/conn ws-conn
226+
:websocket/alive? alive?
227+
::flow/in-ports {::ws-read ws-read}
228+
::flow/out-ports {::ws-write ws-write}
229+
::accumulator ""}))
230+
231+
(defn elevenlabs-tts-transition
232+
[{:websocket/keys [conn] ::flow/keys [in-ports out-ports] :as state} transition]
233+
(when (= transition ::flow/stop)
234+
(t/log! {:id :elevenlabs :level :info} "Closing tts websocket connection")
235+
(reset! (:websocket/alive? state) false)
236+
(when conn
237+
(ws/send! conn close-stream-message)
238+
(ws/close! conn))
239+
(doseq [port (concat (vals in-ports) (vals out-ports))]
240+
(a/close! port)))
241+
state)
242+
243+
(defn elevenlabs-tts-process-fn
244+
([] elevenlabs-tts-describe)
245+
([params] (elevenlabs-tts-init! params))
246+
([state transition] (elevenlabs-tts-transition state transition))
247+
([state in msg]
248+
(elevenlabs-tts-transform state in msg)))
127249

128250
(def elevenlabs-tts-process
129-
(flow/process
130-
(flow/map->step
131-
{:describe (fn [] {:ins {:sys-in "Channel for system messages that take priority"
132-
:in "Channel for audio input frames (from transport-in) "}
133-
:outs {:sys-out "Channel for system messages that have priority"
134-
:out "Channel on which transcription frames are put"}
135-
:params {:elevenlabs/api-key "Api key required for 11labs connection"
136-
:elevenlabs/model-id "Model used for voice generation"
137-
:elevenlabs/voice-id "Voice id"
138-
:voice/stability "Optional voice stability factor (0.0 to 1.0)"
139-
:voice/similarity-boost "Optional voice similarity boost factor (0.0 to 1.0)"
140-
:voice/use-speaker-boost? "Wether to enable speaker boost enchancement"
141-
:flow/language "Language to use"
142-
:audio.out/encoding "Encoding for the audio generated"
143-
:audio.out/sample-rate "Sample rate for the audio generated"}
144-
:workload :io})
145-
:init (fn [args]
146-
(let [url (make-elevenlabs-ws-url args)
147-
ws-read (a/chan 100)
148-
ws-write (a/chan 100)
149-
configuration-msg (begin-stream-message args)
150-
alive? (atom true)
151-
conf {:on-open (fn [ws]
152-
(t/log! :debug ["Elevenlabs websocket connection open. Sending configuration message" configuration-msg])
153-
(ws/send! ws configuration-msg))
154-
:on-message (fn [_ws ^HeapCharBuffer data _last?]
155-
(a/put! ws-read (str data)))
156-
:on-error (fn [_ e]
157-
(t/log! :error ["Elevenlabs websocket error" (ex-message e)]))
158-
:on-close (fn [_ws code reason]
159-
(reset! alive? false)
160-
(t/log! :debug ["Elevenlabs websocket connection closed" "Code:" code "Reason:" reason]))}
161-
_ (t/log! {:level :debug :id :elevenlabs} "Connecting to transcription websocket")
162-
ws-conn @(ws/websocket url conf)]
163-
(vthread-loop []
164-
(when @alive?
165-
(when-let [msg (a/<!! ws-write)]
166-
(when @alive?
167-
(ws/send! ws-conn msg))
168-
(recur))))
169-
(vthread-loop []
170-
(when @alive?
171-
(a/<!! (a/timeout 3000))
172-
(t/log! {:level :trace :id :elevenlabs} "Sending keep-alive message")
173-
(ws/send! ws-conn keep-alive-message)
174-
(recur)))
175-
{:websocket/conn ws-conn
176-
:websocket/alive? alive?
177-
::flow/in-ports {::ws-read ws-read}
178-
::flow/out-ports {::ws-write ws-write}}))
179-
:transition (fn [{:websocket/keys [conn]
180-
::flow/keys [in-ports out-ports]
181-
:as state} transition]
182-
(when (= transition ::flow/stop)
183-
(t/log! {:id :elevenlabs :level :info} "Closing tts websocket connection")
184-
(reset! (:websocket/alive? state) false)
185-
(when conn
186-
(ws/send! conn close-stream-message)
187-
(ws/close! conn))
188-
(doseq [port (concat (vals in-ports) (vals out-ports))]
189-
(a/close! port)))
190-
state)
191-
192-
:transform tts-transform})))
251+
(flow/process elevenlabs-tts-process-fn))

test/simulflow/processors/elevenlabs_test.clj

Lines changed: 91 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@
2222

2323
(deftest elevenlabs-tts
2424
(testing "Sends speak frames as json payloads to the websocket connection for generating speech"
25-
(is (= (elevenlabs/tts-transform {} :in (frame/speak-frame "Test speech"))
25+
(is (= (elevenlabs/elevenlabs-tts-transform {} :in (frame/speak-frame "Test speech"))
2626
[{} {::elevenlabs/ws-write ["{\"text\":\"Test speech \",\"flush\":true}"]}])))
2727
(testing "Keeps incomplete websocket results until they can be parsed"
28-
(is (= (elevenlabs/tts-transform {::elevenlabs/accumulator ""} ::elevenlabs/ws-read incomplete-result-json)
29-
[{::elevenlabs/accumulator incomplete-result-json}])))
28+
(is (= (elevenlabs/elevenlabs-tts-transform {::elevenlabs/accumulator ""} ::elevenlabs/ws-read incomplete-result-json)
29+
[{::elevenlabs/accumulator incomplete-result-json} {}])))
3030
(testing "Parses completed JSON results and sends results to out"
31-
(let [[state {[audio-out-frame xi-frame] :out}] (elevenlabs/tts-transform
31+
(let [[state {[audio-out-frame xi-frame] :out}] (elevenlabs/elevenlabs-tts-transform
3232
{::elevenlabs/accumulator incomplete-result-json
3333
:now current-time}
3434
::elevenlabs/ws-read
@@ -38,3 +38,90 @@
3838
(is (update-in audio-out-frame [:frame/data] vec) {:frame/type ::frame/audio-output-raw
3939
:frame/ts current-time
4040
:frame/data (vec (range 20))}))))
41+
42+
(deftest test-accumulate-json-response
43+
(testing "accumulates partial JSON correctly"
44+
(is (= (elevenlabs/accumulate-json-response "" "{\"partial\":")
45+
["{\"partial\":" nil]))
46+
(is (= (elevenlabs/accumulate-json-response "{\"partial\":" " \"data\": true}")
47+
["{\"partial\": \"data\": true}" nil])))
48+
49+
(testing "parses complete JSON and resets accumulator"
50+
(is (= (elevenlabs/accumulate-json-response "" "{\"audio\": \"test\"}")
51+
["" {:audio "test"}]))
52+
(is (= (elevenlabs/accumulate-json-response "{\"audio\": \"d" "GVzdA==\"}")
53+
["" {:audio "dGVzdA=="}])))
54+
55+
(testing "handles invalid JSON gracefully"
56+
(is (= (elevenlabs/accumulate-json-response "" "not json")
57+
["not json" nil]))))
58+
59+
(deftest test-process-completed-json
60+
(testing "creates audio frames from valid JSON"
61+
(let [json {:audio "dGVzdA==" :alignment nil :isFinal true}
62+
timestamp #inst "2025-01-01"
63+
[audio-frame xi-frame] (elevenlabs/process-completed-json json timestamp)]
64+
(is (= (:frame/type audio-frame) :simulflow.frame/audio-output-raw))
65+
(is (= (vec (:frame/data audio-frame)) [116 101 115 116])) ; "test" in bytes
66+
(is (= (:frame/ts audio-frame) timestamp))
67+
(is (= (:frame/type xi-frame) :simulflow.frame/xi-audio-out))
68+
(is (= (:frame/data xi-frame) json))))
69+
70+
(testing "returns nil for JSON without audio"
71+
(is (nil? (elevenlabs/process-completed-json {} #inst "2025-01-01")))
72+
(is (nil? (elevenlabs/process-completed-json {:other "data"} #inst "2025-01-01")))))
73+
74+
(deftest test-process-speak-frame
75+
(testing "converts speak frame to WebSocket message"
76+
(let [speak-frame (frame/speak-frame "Hello world")
77+
message (elevenlabs/process-speak-frame speak-frame)]
78+
(is (= message "{\"text\":\"Hello world \",\"flush\":true}"))))
79+
80+
(testing "handles empty speak frame"
81+
(let [speak-frame (frame/speak-frame "")
82+
message (elevenlabs/process-speak-frame speak-frame)]
83+
(is (= message "{\"text\":\" \",\"flush\":true}")))))
84+
85+
(deftest test-process-websocket-message
86+
(testing "accumulates partial messages"
87+
(let [state {::elevenlabs/accumulator ""}
88+
partial-msg "{\"audio\": \"dGVz"
89+
[new-state output] (elevenlabs/process-websocket-message state partial-msg #inst "2025-01-01")]
90+
(is (= (::elevenlabs/accumulator new-state) "{\"audio\": \"dGVz"))
91+
(is (= output {}))))
92+
93+
(testing "completes and processes full JSON"
94+
(let [state {::elevenlabs/accumulator "{\"audio\": \"dGVz"}
95+
completion "dA==\", \"alignment\": null, \"isFinal\": true}"
96+
timestamp #inst "2025-01-01"
97+
[new-state output] (elevenlabs/process-websocket-message state completion timestamp)]
98+
(is (= (::elevenlabs/accumulator new-state) ""))
99+
(is (= (count (:out output)) 2))
100+
(is (= (:frame/type (first (:out output))) :simulflow.frame/audio-output-raw))))
101+
102+
(testing "processes complete JSON in single message"
103+
(let [state {::elevenlabs/accumulator ""}
104+
complete-msg "{\"audio\": \"dGVzdA==\", \"alignment\": null, \"isFinal\": true}"
105+
timestamp #inst "2025-01-01"
106+
[new-state output] (elevenlabs/process-websocket-message state complete-msg timestamp)]
107+
(is (= (::elevenlabs/accumulator new-state) ""))
108+
(is (= (count (:out output)) 2)))))
109+
110+
(deftest test-elevenlabs-tts-transform
111+
(testing "delegates to correct pure functions"
112+
;; Test speak frame processing
113+
(let [speak-frame (frame/speak-frame "Test")
114+
[_ output] (elevenlabs/elevenlabs-tts-transform {} :in speak-frame)]
115+
(is (= output {::elevenlabs/ws-write ["{\"text\":\"Test \",\"flush\":true}"]})))
116+
117+
;; Test WebSocket message processing
118+
(let [ws-msg "{\"audio\": \"dGVzdA==\", \"alignment\": null, \"isFinal\": true}"
119+
state {::elevenlabs/accumulator "" :now #inst "2025-01-01"}
120+
[new-state output] (elevenlabs/elevenlabs-tts-transform state ::elevenlabs/ws-read ws-msg)]
121+
(is (= (::elevenlabs/accumulator new-state) ""))
122+
(is (= (count (:out output)) 2)))
123+
124+
;; Test unknown input port
125+
(let [[state output] (elevenlabs/elevenlabs-tts-transform {} :unknown-port "message")]
126+
(is (= state {}))
127+
(is (nil? output)))))

0 commit comments

Comments
 (0)