Skip to content

Commit 83067e3

Browse files
committed
Add malli config for transport in, add predefined VAD analyser support
1 parent b9d75e1 commit 83067e3

File tree

5 files changed

+119
-54
lines changed

5 files changed

+119
-54
lines changed

TODO.org

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
11
#+TITLE: Tasks to do for simulflow
22
#+startup: indent content
33

4+
* DONE Add standard =vad= keywords that are supported by simulflow by default (currently just =silero/vad=)
5+
CLOSED: [2025-08-27 Wed 13:23]
6+
:LOGBOOK:
7+
CLOCK: [2025-08-27 Wed 10:08]--[2025-08-27 Wed 13:23] => 3:15
8+
:END:
9+
10+
411
* TODO Change audio utils for conversion into specific encoding change (ulaw->pcm, pcm->ulaw etc) and resampling (same encoding, different sample rate)
512
* TODO Standardize serializers/deserializers to have within the pipeline 16kHz PCM audio
613
* TODO [#A] Add TTFT metric

src/simulflow/schema.clj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -771,7 +771,7 @@
771771
:max 1.0
772772
:error/message "Confidence must be between 0.0 and 1.0"}])
773773

774-
(def VADAnalyzerProtocol
774+
(def VADAnalyserProtocol
775775
"Schema for objects implementing the VADAnalyzer protocol"
776776
[:fn
777777
{:description "Object implementing simulflow.vad.core/VADAnalyzer protocol"

src/simulflow/transport/in.clj

Lines changed: 101 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@
22
(:require
33
[clojure.core.async :as a :refer [close!]]
44
[clojure.core.async.flow :as flow]
5+
[malli.util :as mu]
56
[simulflow.async :refer [vthread-loop]]
67
[simulflow.frame :as frame]
8+
[simulflow.schema :as schema]
79
[simulflow.transport.codecs :refer [make-twilio-serializer]]
810
[simulflow.utils.audio :as audio]
911
[simulflow.utils.core :as u]
1012
[simulflow.vad.core :as vad]
13+
[simulflow.vad.factory :as vad-factory]
1114
[taoensso.telemere :as t]
1215
[uncomplicate.clojure-sound.core :as sound]
1316
[uncomplicate.clojure-sound.sampled :as sampled])
@@ -16,9 +19,27 @@
1619

1720
;; Base logic for all input transport
1821

22+
(def CommonTransportInputConfig
23+
[:map
24+
[:vad/analyser {:description "An object implementing the simulflow.vad.core/VADAnalyzer protocol, or the keyword of one of the standard VAD processors supported by simulflow, to be used on new audio."
25+
:optional true}
26+
[:or schema/VADAnalyserProtocol (into [:enum] (keys vad-factory/factory))]]
27+
[:vad/args {:description "If `:vad/analyser` is a standard simulflow vad (like silero), these args are used as args to the vad factory"
28+
:optional true} [:map]]
29+
[:pipeline/supports-interrupt? {:description "Whether the pipeline supports or not interruptions."
30+
:default false
31+
:optional true} :boolean]])
32+
33+
(def BaseTransportInputConfig
34+
(mu/merge
35+
CommonTransportInputConfig
36+
[:map
37+
[:transport/in-ch
38+
{:description "Core async channel to take audio data from. The data is raw byte array or serialized if a deserializer is provided"}
39+
schema/CoreAsyncChannel]]))
40+
1941
(def base-input-params
20-
{:vad/analyser "An instance of simulflow.vad.core/VADAnalyser protocol to be used on new audio."
21-
:pipeline/supports-interrupt? "Whether the pipeline supports or not interruptions."})
42+
(schema/->describe-parameters CommonTransportInputConfig))
2243

2344
(def base-transport-outs {:sys-out "Channel for system messages that have priority"
2445
:out "Channel on which audio frames are put"})
@@ -32,6 +53,49 @@
3253
:data {:vad/prev-state prev-vad-state
3354
:vad/state vad-state}})))
3455

56+
(defn init-vad!
57+
[{:vad/keys [analyser] :as params}]
58+
(when analyser
59+
(cond
60+
(satisfies? vad/VADAnalyzer analyser)
61+
analyser
62+
63+
(keyword? analyser)
64+
(if-let [make-vad (get vad-factory/factory analyser)]
65+
(if (contains? params :vad/args)
66+
(make-vad (:vad/args params))
67+
(make-vad))
68+
(throw (ex-info "Something went wrong initiating :vad/analyser for transport in"
69+
{:params params
70+
:cause ::unknown-vad})))
71+
72+
:else
73+
(throw (ex-info "Something went wrong initiating :vad/analyser for transport in"
74+
{:params params
75+
:cause ::unknown-vad})))))
76+
77+
(defn base-transport-in-init!
78+
[schema params]
79+
(let [{:transport/keys [in-ch] :as parsed-params} (schema/parse-with-defaults schema params)
80+
vad-analyser (init-vad! parsed-params)]
81+
(into parsed-params {::flow/in-ports {::in in-ch}
82+
:vad/analyser vad-analyser})))
83+
84+
(defn base-transport-in-transition!
85+
[{::flow/keys [in-ports out-ports] :as state} transition]
86+
(when (= transition ::flow/stop)
87+
(doseq [port (remove nil? (concat (vals in-ports) (vals out-ports)))]
88+
(a/close! port))
89+
(when-let [close-fn (::close state)]
90+
(when (fn? close-fn)
91+
(t/log! {:level :info
92+
:id :transport-in} "Closing input")
93+
(close-fn)))
94+
(when-let [analyser (:vad/analyser state)]
95+
(t/log! {:level :debug :id :transport-in :msg "Cleaning up vad analyser"})
96+
(vad/cleanup analyser)))
97+
state)
98+
3599
(defn base-input-transport-transform
36100
"Base input transport logic that is used by most transport input processors.
37101
Assumes audio-input-raw frames that come in are 16kHz PCM mono. Conversion to
@@ -71,12 +135,35 @@
71135

72136
;; Twilio transport in
73137

138+
(def TwilioTransportInConfig
139+
(mu/merge
140+
BaseTransportInputConfig
141+
[:map
142+
[:twilio/handle-event
143+
{:description "[DEPRECATED] Optional function to be called when a new twilio event is received. Return a map like {cid [frame1 frame2]} to put new frames on the pipeline"
144+
:optional true}
145+
[:=> [:cat :map] :map]]
146+
[:serializer/convert-audio?
147+
{:description "If the serializer that is created should convert audio to 8kHz ULAW or not."
148+
:optional true
149+
:default false} :boolean]
150+
[:transport/send-twilio-serializer?
151+
{:description "Whether to send a `::frame/system-config-change` with a `twilio-frame-serializer` when a twilio start frame is received. Default true"
152+
:optional true
153+
:default true} :boolean]]))
154+
155+
(def twilio-transport-in-describe
156+
{:outs base-transport-outs
157+
:params (schema/->describe-parameters TwilioTransportInConfig)})
158+
159+
(def twilio-transport-in-init! (partial base-transport-in-init! TwilioTransportInConfig))
160+
74161
(defn twilio-transport-in-transform
75162
[{:twilio/keys [handle-event]
76163
:transport/keys [send-twilio-serializer?]
77164
:or {send-twilio-serializer? true}
78165
:as state} in input]
79-
(if (= in ::twilio-in)
166+
(if (= in ::in)
80167
(let [data (u/parse-if-json input)
81168
output (if (fn? handle-event)
82169
(do
@@ -109,23 +196,10 @@
109196
[state]))
110197
(base-input-transport-transform state in input)))
111198

112-
(defn twilio-transport-in-init!
113-
[{:transport/keys [in-ch] :as state}]
114-
(into state
115-
{::flow/in-ports {::twilio-in in-ch}}))
116-
117-
(def twilio-transport-in-describe
118-
{:outs base-transport-outs
119-
:params (into base-input-params
120-
{:transport/in-ch "Channel from which input comes"
121-
:twilio/handle-event "[DEPRECATED] Optional function to be called when a new twilio event is received. Return a map like {cid [frame1 frame2]} to put new frames on the pipeline"
122-
:serializer/convert-audio? "If the serializer that is created should convert audio to 8kHz ULAW or not."
123-
:transport/send-twilio-serializer? "Whether to send a `::frame/system-config-change` with a `twilio-frame-serializer` when a twilio start frame is received. Default true"})})
124-
125199
(defn twilio-transport-in-fn
126200
([] twilio-transport-in-describe)
127201
([params] (twilio-transport-in-init! params))
128-
([state _] state)
202+
([state trs] (base-transport-in-transition! state trs))
129203
([state in msg] (twilio-transport-in-transform state in msg)))
130204

131205
(def twilio-transport-in
@@ -155,8 +229,10 @@
155229
:params base-input-params})
156230

157231
(defn mic-transport-in-init!
158-
[state]
159-
(let [{:keys [buffer-size audio-format channel-size]} mic-resource-config
232+
[params]
233+
(let [parsed-params (schema/parse-with-defaults CommonTransportInputConfig params)
234+
vad-analyser (init-vad! parsed-params)
235+
{:keys [buffer-size audio-format channel-size]} mic-resource-config
160236
line (audio/open-line! :target audio-format)
161237
mic-in-ch (a/chan channel-size)
162238
buffer (byte-array buffer-size)
@@ -178,24 +254,11 @@
178254
;; Brief pause before retrying to prevent tight error loop
179255
(Thread/sleep 100)))
180256
(recur)))
181-
(into state
182-
{::flow/in-ports {::mic-in mic-in-ch}
257+
(into parsed-params
258+
{::flow/in-ports {::in mic-in-ch}
259+
:vad/analyser vad-analyser
183260
::close close})))
184261

185-
(defn mic-transport-in-transition
186-
[state transition]
187-
(when (= transition ::flow/stop)
188-
(when-let [close-fn (::close state)]
189-
(when (fn? close-fn)
190-
(t/log! {:level :info
191-
:id :transport-in} "Closing input")
192-
(close-fn)))
193-
(when-let [analyser (:vad/analyser state)]
194-
(t/log! {:level :debug :id :transport-in :msg "Cleaning up vad analyser"})
195-
(vad/cleanup analyser)))
196-
197-
state)
198-
199262
(defn mic-transport-in-transform
200263
[state in {:keys [audio-data timestamp]}]
201264
(base-input-transport-transform state in (frame/audio-input-raw audio-data {:timestamp timestamp})))
@@ -205,7 +268,7 @@
205268
([] (mic-transport-in-describe))
206269
([params] (mic-transport-in-init! params))
207270
([state transition]
208-
(mic-transport-in-transition state transition))
271+
(base-transport-in-transition! state transition))
209272
([state in msg]
210273
(mic-transport-in-transform state in msg)))
211274

@@ -217,24 +280,12 @@
217280
{:outs base-transport-outs
218281
:params (into base-input-params {:transport/in-ch "Channel from which input comes. Input should be byte array"})})
219282

220-
(defn async-transport-in-transition
221-
[{::flow/keys [in-ports out-ports] :as state} transition]
222-
(when (= transition ::flow/stop)
223-
(doseq [port (remove nil? (concat (vals in-ports) (vals out-ports)))]
224-
(a/close! port))
225-
(when-let [analyser (:vad/analyser state)]
226-
(t/log! {:level :debug :id :transport-in :msg "Cleaning up vad analyser"})
227-
(vad/cleanup analyser))
228-
state))
229-
230-
(defn async-transport-in-init!
231-
[{:transport/keys [in-ch] :as state}]
232-
(into state {::flow/in-ports {:in in-ch}}))
283+
(def async-transport-in-init! (partial base-transport-in-init! BaseTransportInputConfig))
233284

234285
(defn async-transport-in-fn
235286
([] async-transport-in-describe)
236287
([state] (async-transport-in-init! state))
237-
([state transition] (async-transport-in-transition state transition))
288+
([state transition] (base-transport-in-transition! state transition))
238289
([state in msg] (base-input-transport-transform state in msg)))
239290

240291
(def async-transport-in-process (flow/process async-transport-in-fn))

src/simulflow/vad/factory.clj

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
(ns simulflow.vad.factory
2+
"Standard vad processors supported by simulflow"
3+
(:require
4+
[simulflow.vad.silero :as silero]))
5+
6+
(def factory
7+
{:vad.analyser/silero silero/create-silero-vad})

test/simulflow/transport/in_test.clj

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -134,13 +134,13 @@
134134
(is (= [state {:sys-out [test-config-change-frame]}]
135135
(in/twilio-transport-in-transform
136136
state
137-
::in/twilio-in
137+
::in/in
138138
(u/json-str {:event "start"}))))
139139

140140
(testing "Merges frames in the same array if more are generated"
141141
(let [[new-state {:keys [sys-out]}] (in/twilio-transport-in-transform
142142
state
143-
::in/twilio-in
143+
::in/in
144144
(u/json-str {:event "start"
145145
:streamSid "hello"}))]
146146
(is (= state new-state))
@@ -151,7 +151,7 @@
151151
(let [state {:transport/send-twilio-serializer? false}
152152
[new-state out] (in/twilio-transport-in-transform
153153
state
154-
::in/twilio-in
154+
::in/in
155155
(u/json-str {:event "start"
156156
:streamSid "hello"}))
157157
config-change-frame (-> out :sys-out first)

0 commit comments

Comments
 (0)