Skip to content

Commit 2029f21

Browse files
committed
Add flow output transport
1 parent a77d633 commit 2029f21

File tree

2 files changed

+120
-11
lines changed

2 files changed

+120
-11
lines changed

TODO.org

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ CLOSED: [2025-01-20 Lun 07:43]
106106

107107
* TODO add core.async.flow support
108108
:LOGBOOK:
109-
CLOCK: [2025-01-25 Sat 11:14]
109+
CLOCK: [2025-01-25 Sat 15:18]--[2025-01-25 Sat 15:43] => 0:25
110+
CLOCK: [2025-01-25 Sat 11:14]--[2025-01-25 Sat 11:39] => 0:25
110111
CLOCK: [2025-01-25 Sat 09:50]--[2025-01-25 Sat 10:15] => 0:25
111112
:END:

core/src/voice_fn/experiments/flow.clj

Lines changed: 118 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,24 @@
1313
[voice-fn.processors.llm-context-aggregator :as ca]
1414
[voice-fn.processors.openai :as openai :refer [OpenAILLMConfigSchema]]
1515
[voice-fn.secrets :refer [secret]]
16+
[voice-fn.transport.async :refer [mono-time]]
17+
[voice-fn.transport.protocols :as tp]
1618
[voice-fn.transport.serializers :refer [make-twilio-serializer]]
19+
[voice-fn.utils.audio :as au]
1720
[voice-fn.utils.core :as u])
1821
(:import
1922
(java.nio HeapCharBuffer)))
2023

2124
(t/set-min-level! :debug)
2225

23-
(def transport-in
26+
(def twilio-transport-in
2427
(flow/process
25-
{:describe (fn [] {:ins {:in "Channel for audio input "}
26-
:outs {:sys-out "Channel for system messages that have priority"
27-
:out "Channel on which audio frames are put"}})
28+
{:describe (fn [] {:outs {:sys-out "Channel for system messages that have priority"
29+
:out "Channel on which audio frames are put"}
30+
:params {:transport/in-ch "Channel from which input comes"}})
31+
32+
:init (fn [{:transport/keys [in-ch]}]
33+
{::flow/in-ports {:twilio-in in-ch}})
2834

2935
:transform (fn [state _ input]
3036
(let [data (u/parse-if-json input)]
@@ -101,15 +107,17 @@
101107
::flow/out-ports {:ws-write ws-write-chan}}))
102108

103109
;; Close ws when pipeline stops
104-
:transition (fn [{:websocket/keys [conn] :as state} transition]
110+
:transition (fn [{:websocket/keys [conn]
111+
::flow/keys [in-ports out-ports] :as state} transition]
105112
(t/log! {:level :debug} ["TRANSITION" transition])
106113
(when (= transition ::flow/stop)
107114
(t/log! {:id :deepgram-transcriptor :level :info} "Closing transcription websocket connection")
108115
(reset! (:websocket/alive? state) false)
109116
(when conn
110117
(ws/send! conn deepgram/close-connection-payload)
111118
(ws/close! conn))
112-
119+
(doseq [port (concat (vals in-ports) (vals out-ports))]
120+
(a/close! port))
113121
state)
114122
state)
115123

@@ -181,13 +189,17 @@
181189
:websocket/alive? alive?
182190
::flow/in-ports {:ws-read ws-read}
183191
::flow/out-ports {:ws-write ws-write}}))
184-
:transition (fn [{:websocket/keys [conn] :as state} transition]
192+
:transition (fn [{:websocket/keys [conn]
193+
::flow/keys [in-ports out-ports]
194+
:as state} transition]
185195
(when (= transition ::flow/stop)
186196
(t/log! {:id :elevenlabs :level :info} "Closing tts websocket connection")
187197
(reset! (:websocket/alive? state) false)
188198
(when conn
189199
(ws/send! conn xi/close-stream-message)
190-
(ws/close! conn)))
200+
(ws/close! conn))
201+
(doseq [port (concat (vals in-ports) (vals out-ports))]
202+
(a/close! port)))
191203
state)
192204

193205
:transform (fn [{:audio/keys [acc] :as state} in-name msg]
@@ -235,6 +247,10 @@
235247
:llm/max-completion-tokens "Optional Max tokens in completion"
236248
:llm/extra "Optional extra model parameters"}
237249
:workload :io
250+
:transition (fn [{::flow/keys [in-ports out-ports]} transition]
251+
(when (= transition ::flow/stop)
252+
(doseq [port (concat (vals in-ports) (vals out-ports))]
253+
(a/close! port))))
238254
:init (fn [params]
239255
(let [state (m/decode OpenAILLMConfigSchema params mt/default-value-transformer)
240256
llm-write (a/chan 100)
@@ -269,9 +285,93 @@
269285
[{:acc accumulator} {:out [(frame/speak-frame sentence)]}]
270286
[{:acc accumulator}])))))
271287

288+
(def audio-splitter
  "Flow process that splits each raw audio output frame into chunks of at most
  `:audio.out/chunk-size` bytes, emitting one raw audio output frame per chunk
  on `:out`. Frames of any other type are dropped.

  The chunk size is either given directly through `:audio.out/chunk-size` or
  computed by `au/audio-chunk-size` from the sample rate, sample size, channel
  count and chunk duration."
  (flow/process
    {:describe (fn [] {:ins {:in "Channel for raw audio frames"}
                       :outs {:out "Channel for audio frames split by chunk size"}
                       ;; :params belongs to the map returned by :describe (same
                       ;; layout as twilio-transport-in), not to the process map.
                       :params {:audio.out/chunk-size "The chunk size by which to split each audio
frame. Specify either this or the other parameters so that chunk size can be computed"

                                :audio.out/sample-rate "Sample rate of the output audio"
                                :audio.out/sample-size-bits "Size in bits for each sample"
                                :audio.out/channels "Number of channels. 1 or 2 (mono or stereo audio)"
                                :audio.out/duration-ms "Duration in ms of each chunk that will be streamed to output"}})
     :init (fn [{:audio.out/keys [chunk-size sample-rate sample-size-bits channels duration-ms]}]
             (assert (or chunk-size (and sample-rate sample-size-bits channels duration-ms))
                     "Either provide :audio.out/chunk-size or sample-rate, sample-size-bits, channels and chunk duration for the size to be computed")
             (let [size (or chunk-size (au/audio-chunk-size {:sample-rate sample-rate
                                                             :sample-size-bits sample-size-bits
                                                             :channels channels
                                                             :duration-ms duration-ms}))]
               ;; A zero chunk size would make the splitting below never terminate.
               (assert (pos? size) "Audio chunk size must be a positive number of bytes")
               {:audio.out/chunk-size size}))
     :transform (fn [{:audio.out/keys [chunk-size] :as state} _ frame]
                  (if (frame/audio-output-raw? frame)
                    (let [^bytes audio (:frame/data frame) ; assumes a byte array, as the original System/arraycopy did
                          total (alength audio)
                          step (long chunk-size)
                          ;; Slice by offset instead of repeatedly copying the
                          ;; remaining tail. `(max total 1)` keeps the previous
                          ;; behaviour of emitting one empty chunk for an empty frame.
                          chunks (mapv (fn [start]
                                         (frame/audio-output-raw
                                           (java.util.Arrays/copyOfRange ^bytes audio
                                                                         (int start)
                                                                         (int (min total (+ start step))))))
                                       (range 0 (max total 1) step))]
                      [state {:out chunks}])
                    [state]))}))
325+
326+
(def realtime-transport-out-processor
  "Processor that streams audio out in real time so we can account for
  interruptions.

  Raw audio output frames are serialized (when a serializer has been received
  through a system config change frame) and queued on an internal channel. A
  background loop drains that channel and puts each message on
  `:transport/out-chan`, spacing sends by half the chunk duration."
  (flow/process
    {:describe (fn [] {:ins {:in "Channel for audio output frames "}
                       :outs {:out "Channel on which serialized buffered output is put"}
                       ;; :params belongs to the map returned by :describe (same
                       ;; layout as twilio-transport-in), not to the process map.
                       ;; NOTE(review): :transport/supports-interrupt? is described
                       ;; but not read anywhere in this process yet.
                       :params {:transport/out-chan "Channel on which to put buffered serialized audio"
                                :audio.out/duration-ms "Duration of each audio chunk. Defaults to 20ms"
                                :transport/supports-interrupt? "Whether the processor supports interrupt or not"}})
     ;; The value returned here becomes the new process state, so always hand the
     ;; state back. Returning nil (as `when`/`doseq` do) would drop the state,
     ;; including the ports that must be closed on stop.
     :transition (fn [{::flow/keys [in-ports out-ports] :as state} transition]
                   (when (= transition ::flow/stop)
                     (doseq [port (concat (vals in-ports) (vals out-ports))]
                       (a/close! port)))
                   state)
     :init (fn [{:audio.out/keys [duration-ms]
                 :transport/keys [out-chan]}]
             (assert out-chan "Required :transport/out-chan for sending output")
             (let [duration (or duration-ms 20)
                   ;; Send at twice real time (every 10ms for the default 20ms
                   ;; chunks) to account for network latency.
                   sending-interval (/ duration 2)

                   audio-write-c (a/chan 1024)
                   ;; Exits when audio-write-c is closed (take returns nil).
                   ;; Assumes mono-time is in milliseconds, since the difference
                   ;; is handed to a/timeout.
                   realtime-loop #(loop [next-send-time (mono-time)]
                                    (when-let [msg (a/<!! audio-write-c)]
                                      (let [wait-ms (- next-send-time (mono-time))]
                                        (when (pos? wait-ms)
                                          (a/<!! (a/timeout wait-ms)))
                                        (a/put! out-chan msg)
                                        ;; Schedule from the actual send time. Using
                                        ;; the pre-wait timestamp lets every second
                                        ;; message through without any delay.
                                        (recur (+ (mono-time) sending-interval)))))]
               ((flow/futurize realtime-loop :exec :io))
               {::flow/out-ports {:audio-write audio-write-c}}))

     :transform (fn [{:transport/keys [serializer] :as state} _ msg]
                  (cond
                    ;; Queue audio for the paced sender, serialized if possible.
                    (frame/audio-output-raw? msg)
                    [state {:audio-write [(if serializer
                                            (tp/serialize-frame serializer msg)
                                            msg)]}]

                    ;; Pick up a serializer announced through a config change.
                    (frame/system-config-change? msg)
                    (if-let [serializer (:transport/serializer (:frame/data msg))]
                      [(assoc state :transport/serializer serializer)]
                      [state])

                    :else [state]))}))
371+
272372
(def gdef
273373
{:procs
274-
{:transport-in {:proc transport-in}
374+
{:transport-in {:proc twilio-transport-in}
275375
:deepgram-transcriptor {:proc deepgram-processor
276376
:args {:transcription/api-key (secret [:deepgram :api-key])
277377
:transcription/interim-results? true
@@ -312,6 +412,13 @@
312412
:flow/language :en
313413
:audio.out/encoding :ulaw
314414
:audio.out/sample-rate 8000}}
415+
:audio-splitter {:proc audio-splitter
416+
:args {:audio.out/sample-rate 8000
417+
:audio.out/sample-size-bits 8
418+
:audio.out/channels 1
419+
:audio.out/duration-ms 20}}
420+
:realtime-out {:proc realtime-transport-out-processor
421+
:args {:transport/out-chan (a/chan 1024)}}
315422

316423
:print-sink {:proc (flow/process
317424
{:describe (fn [] {:ins {:in "Channel for receiving transcriptions"}})
@@ -332,7 +439,8 @@
332439
[[:llm :out] [:llm-sentence-assembler :in]]
333440
[[:llm-sentence-assembler :out] [:tts :in]]
334441

335-
[[:tts :out] [:print-sink :in]]]})
442+
[[:tts :out] [:audio-splitter :in]]
443+
[[:audio-splitter :out] [:realtime-out :in]]]})
336444

337445
(comment
338446
(datafy (:proc (:deepgram-transcriptor (:procs gdef))))

0 commit comments

Comments
 (0)