Change audio utils to support conversion into a specific encoding (ulaw->pcm, pcm->ulaw, etc.) and resampling (same encoding, different sample rate)
[#A] Add LLM usage metrics based on chunk responses; see the API docs for usage reporting
[#A] Implement background noise filtering with krisp.ai
Our task for today is to finalize the interruption flow so that it works correctly. Don’t edit anything; we just want to plan the implementation. The interruption flow consists of the following:
- if the user speaks, i.e. a user-speech-start frame is emitted, the transport realtime output is interrupted from sending out realtime out frames. At the same time, transport-out (the realtime out process) discards new raw-audio-out frames until it receives a user-speech-stop frame
- when the llm process (openai.clj for example) receives a user-speech-start frame, if there is an in-flight request for generating responses, that request should be discarded. Here we need to plan ahead for request interruption, as that is not yet supported. We use hato, which is a Clojure wrapper over the Java 11 HTTP Client
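A minimal sketch of how request interruption could work with hato (this is not current simulflow code; the var and function names are illustrative). hato's async mode returns a java.util.concurrent.CompletableFuture, so keeping that future around and cancelling it is one plausible way to abandon an in-flight completion request:

```clojure
;; Hypothetical sketch: cancelling an in-flight hato request on interruption.
(require '[hato.client :as http])

;; Holds the CompletableFuture of the current in-flight LLM request, if any.
(defonce in-flight-request (atom nil))

(defn start-llm-request!
  "Fires the request asynchronously and remembers its CompletableFuture.
  `req` is a normal hato request map."
  [req]
  (let [cf (http/request (assoc req :async? true))]
    (reset! in-flight-request cf)
    cf))

(defn cancel-in-flight!
  "Called on :user-speech-start: cancels the pending request, if any,
  interrupting the underlying Java 11 HTTP client exchange."
  []
  (when-let [cf @in-flight-request]
    (.cancel ^java.util.concurrent.CompletableFuture cf true)
    (reset! in-flight-request nil)))
```

Cancelling the CompletableFuture with `mayInterruptIfRunning` set to true is the documented way to abandon an exchange on the Java 11 client; whether partial streamed tokens were already consumed still needs handling at the processor level.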
- the text-to-speech process needs to drop its audio buffer if it is
accumulating an audio response, and also maintain a :pipeline/interrupted?
state. New results from the websocket connection will be dropped while
:pipeline/interrupted? is true. Correction: this flag is toggled true on
::control-interrupt-start and back off on ::control-interrupt-stop frames,
not on :user-speech-start/:user-speech-stop frames.
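The flag handling above can be sketched as a pure transform, assuming a simulflow-style [state output-map] return shape (the frame type keys here are illustrative, not simulflow's exact ones):

```clojure
;; Hypothetical sketch of interruption handling in a TTS transform.
(defn tts-transform
  [state frame]
  (case (:frame/type frame)
    ::control-interrupt-start
    ;; raise the flag and drop any accumulated audio
    [(assoc state :pipeline/interrupted? true :audio-buffer []) {}]

    ::control-interrupt-stop
    [(assoc state :pipeline/interrupted? false) {}]

    ;; results arriving from the TTS websocket
    ::tts-audio-chunk
    (if (:pipeline/interrupted? state)
      [state {}] ; drop while interrupted
      [state {:out [frame]}])

    [state {:out [frame]}]))
```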
- simulflow uses a sentence assembler that assembles streaming llm tokens until it has a full sentence and sends it forward as a speak frame. Usually this goes to the TTS process and to the context aggregator. This process also needs to drop all of the accumulated buffer for a partial sentence and stop assembling while the pipeline is interrupted
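An interruption-aware sentence assembler could look like this sketch (names and frame shapes illustrative, not simulflow's actual API; it returns [new-state speak-frame-or-nil]):

```clojure
;; Hypothetical sketch of a sentence assembler that drops its partial
;; buffer and stops assembling while interrupted.
(defn assemble-token
  [{:keys [buffer interrupted?] :as state} token]
  (cond
    (= token ::interrupt-start) [(assoc state :buffer "" :interrupted? true) nil]
    (= token ::interrupt-stop)  [(assoc state :interrupted? false) nil]
    interrupted?                [state nil] ; drop tokens while interrupted
    :else
    (let [buf (str buffer token)]
      (if (re-find #"[.!?]\s*$" buf)
        ;; sentence boundary reached: emit a speak frame, reset buffer
        [(assoc state :buffer "") {:frame/type :speak :text buf}]
        [(assoc state :buffer buf) nil]))))
```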
- Context aggregator takes all speak frames and keeps a full ledger of the conversation so far. The trouble is that the context aggregator appends a new LLM sentence without any concern for whether the user heard it (i.e. whether the audio equivalent of that sentence was played back to the user). Text-to-speech processors like Elevenlabs also return char timing information. Simulflow maintains this info with [[file:src/simulflow/frame.clj::(def XiAlignment \[:map][vendor specific frames]]. Currently, these are not used in processing. As an example, they were used by a user of simulflow to track when a specific sentence was said. We could normalize the char alignment into a standard format and make audio chunking at least fit specific chars. This way, the transport out could emit a “heard” frame: a frame that denotes that a specific word was heard by the user, so it can be safely appended to the context even if the full sentence wasn’t heard.
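As a sketch of the normalization step, ElevenLabs-style alignment is three parallel sequences (chars, start times, durations in ms); the key names below are assumptions standing in for the XiAlignment shape, and the word grouping is a naive whitespace split:

```clojure
;; Hypothetical sketch: normalize char-level alignment into per-word
;; timestamps that transport-out could match against playback for
;; "heard" frames. Input key names are illustrative.
(defn normalize-alignment
  [{:keys [chars char-start-times-ms char-durations-ms]}]
  (->> (map (fn [c start dur] {:char c :start-ms start :end-ms (+ start dur)})
            chars char-start-times-ms char-durations-ms)
       (partition-by #(= " " (:char %)))   ; split on space chars
       (remove #(= " " (:char (first %)))) ; drop the space groups
       (mapv (fn [cs]
               {:word (apply str (map :char cs))
                :start-ms (:start-ms (first cs))
                :end-ms (:end-ms (last cs))}))))
```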
Here’s another description of this process from the perspective of another popular realtime AI orchestration platform: pipecat.
In pipecat, the transport contains both in & out. When audio comes in, each
chunk is sent to a VAD analyzer like Silero and, optionally, a smart turn
detection model. Based on these analyzers, the BaseTransportInput keeps a
speaking flag. If there is a smart turn detector, its logic for whether the
user is speaking/stopped speaking takes precedence; otherwise the VAD is the
source of truth.
When the VAD state changes (ex: speaking -> not speaking), the pipeline emits
VADUserStartedSpeakingFrame / VADUserStoppedSpeakingFrame. This happens
regardless, in order to have good observability of the pipeline.
The transport input is responsible for starting interruptions when no interruption strategy is configured. If one or more interruption strategies exist, triggering the interruption logic is deferred to the UserContextAggregator, which has access to the current incoming transcriptions.
Interruption strategies differ from normal VAD or turn taking because they can
implement custom logic like: interrupt the bot only if the user said 3 words.
The context aggregator will send a BotInterruptionFrame to the transport input,
which will send a StartInterruptionFrame.
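A "minimum words" strategy like the one mentioned above could be sketched in Clojure as a simple predicate builder (illustrative, not pipecat's actual API):

```clojure
;; Hypothetical sketch of a min-words interruption strategy: only allow
;; the interruption once the user's transcription so far has n words.
(require '[clojure.string :as str])

(defn min-words-strategy
  [n]
  (fn [transcription]
    (>= (count (str/split (str/trim transcription) #"\s+")) n)))
```

The UserContextAggregator would run each configured predicate against the accumulated transcription before deciding to emit the interruption frame.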
- When either the BaseTransportInput or the UserContextAggregator deems that an
interruption should start, they emit a frame to do so. BaseTransportInput emits
a StartInterruptionFrame directly, while UserContextAggregator emits a
BotInterruptionFrame which is sent to the BaseTransportInput, which, upon
receiving this frame, emits a StartInterruptionFrame. This frame does the
following:
- Tells the LLM to cancel in-flight requests and current streaming tokens
- Tells TTS processors to cancel in-flight requests and clear their accumulators
- Tells TransportOut to stop sending AudioOut frames and clear the current playback queue
- (Possibly) tells BotContextAggregator to drop the currently assembled sentence or just cut it short, as the user only heard part of it.
- The pipeline is now in an interrupted state (relevant for TransportOut because
it drops any new AudioOut frames until it receives a StopInterruptionFrame)
- When the user is deemed to have stopped speaking (by either the VAD or the
turn-taking model), a UserStoppedSpeakingFrame is sent, which will trigger a
StopInterruptionFrame if the pipeline supports interruption. This frame is sent
by the BaseTransportInput. Important mention here: the LLM and TTS don’t keep a
speaking flag in this period, as they don’t care about this state. They only
drop their current activity when a StartInterruptionFrame is received, but
don’t handle a StopInterruptionFrame at all.
- (I think) simulflow TTS processors should keep a :pipeline/interrupted?
state, because when the processor receives a speak-frame, it sends it on the
websocket connection to the actual TTS provider, which may send one or more
events back that need to be accumulated to construct the full audio equivalent
of the text from the speak-frame. Therefore we keep the :pipeline/interrupted?
flag so that when new data is received on the websocket, the processor drops it.
- We need a way to clear the “playback queue”. Currently the playback queue is
represented by the [[file:src/simulflow/transport/out.clj::audio-write-ch (a/chan 1024)][audio-write-channel]] defined there. There is a
drain-channel! function which will work, but we need to introduce two channels
to communicate with the [[file:src/simulflow/transport/out.clj::(vthread-loop \[\]][process running in a vthread]] that sends audio to out:
one for commands to drain audio, and one on which to take audio from (the
current existing one)
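The two-channel scheme could look like this sketch (function names illustrative; `drain-channel!` is re-stated here only for self-containment). It uses core.async `alts!!` with `:priority true` so a drain command always wins over pending audio:

```clojure
;; Hypothetical sketch of the playback vthread with a command channel.
(require '[clojure.core.async :as a])

(defn drain-channel!
  "Discards everything currently buffered on ch without closing it."
  [ch]
  (loop []
    (when (some? (a/poll! ch))
      (recur))))

(defn playback-loop!
  "Runs the audio writer loop on a real thread. `write!` plays one chunk
  (illustrative). Returns the channel of the spawned thread."
  [audio-ch command-ch write!]
  (a/thread
    (loop []
      (let [[v port] (a/alts!! [command-ch audio-ch] :priority true)]
        (cond
          ;; drain command: empty the playback queue, keep looping
          (and (= port command-ch) (= v :drain))
          (do (drain-channel! audio-ch) (recur))

          ;; a real audio chunk: play it
          (and (= port audio-ch) (some? v))
          (do (write! v) (recur))

          ;; nil from a closed channel stops the loop
          :else nil)))))
```

Since the real loop runs on a virtual thread, the blocking `alts!!` form fits; a `:drain` put from the transform on ::control-interrupt-start clears playback without tearing the loop down.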
- Pipecat uses a bidirectional queue system between processors:
Transport in <-> Transcriptor <-> Context Aggregator <-> LLM <-> TTS <-> Transport out
Processors send frames either forward or backward (downstream or upstream). They also have two different queues in each direction, to account for normal frames and system frames, because you want system frames (like interrupt-start) to be handled immediately, even if the queue already has more frames waiting.
This system makes it easy to send system frames and be sure they are received by all interested processors. The downside is that all processors need to process all frames, if only to, at a minimum, pass them on to the next processor in the given direction.
Simulflow defines processes in a directed graph that can have cycles, so as a user you need to define exactly the connections needed for the processors to interact correctly.
This is more efficient, in the sense that processors need to handle only the frames they care about and don’t have to worry about sending frames forward. However, this complicates system frame propagation, and you end up with large edge definitions. Example:
(defn phone-flow
  "This example showcases a voice AI agent for the phone. Phone audio is usually
  encoded as MULAW at 8kHz frequency (sample rate) and it is mono (1 channel)."
  [{:keys [llm-context extra-procs in out extra-conns language vad-analyser]
    :or {llm-context {:messages [{:role "system"
                                  :content "You are a helpful assistant "}]}
         extra-procs {}
         language :en
         extra-conns []}}]
  (let [chunk-duration-ms 20]
    {;; procs are the processes involved in the pipeline. They are nodes in the graph
     :procs
     (u/deep-merge
      {:transport-in {:proc transport-in/twilio-transport-in
                      :args {:transport/in-ch in
                             :vad/analyser vad-analyser}}
       :transcriptor {:proc asr/deepgram-processor
                      :args {:transcription/api-key (secret [:deepgram :api-key])
                             :transcription/interim-results? true
                             :transcription/punctuate? false
                             :transcription/vad-events? false
                             :transcription/smart-format? true
                             :transcription/model :nova-2
                             :transcription/utterance-end-ms 1000
                             :transcription/language language}}
       :context-aggregator {:proc context/context-aggregator
                            :args {:llm/context llm-context
                                   :aggregator/debug? false}}
       :llm {:proc llm/openai-llm-process
             :args {:openai/api-key (secret [:openai :new-api-sk])
                    :llm/model :gpt-4.1-mini}}
       :assistant-context-assembler {:proc context/assistant-context-assembler
                                     :args {:debug? false}}
       :llm-sentence-assembler {:proc context/llm-sentence-assembler}
       :tts {:proc tts/elevenlabs-tts-process
             :args {:elevenlabs/api-key (secret [:elevenlabs :api-key])
                    :elevenlabs/model-id "eleven_flash_v2_5"
                    :elevenlabs/voice-id (secret [:elevenlabs :voice-id])
                    :voice/stability 0.5
                    :voice/similarity-boost 0.8
                    :voice/use-speaker-boost? true
                    :pipeline/language language
                    :audio.out/sample-rate 8000
                    :audio.out/encoding :ulaw}}
       :audio-splitter {:proc transport/audio-splitter
                        :args {:audio.out/duration-ms chunk-duration-ms
                               :audio.out/sample-size-bits 8}}
       :transport-out {:proc transport-out/realtime-out-processor
                       :args {:audio.out/chan out
                              :audio.out/sending-interval chunk-duration-ms}}
       :activity-monitor {:proc activity-monitor/process
                          :args {::activity-monitor/timeout-ms 5000}}}
      extra-procs)
     ;; :conns are the edges of the graph. The :out, :in, :sys-out, :sys-in are
     ;; the channels each processor defines. They are described in the 0 arity
     ;; version of the processor or the describe function
     :conns (concat
             [[[:transport-in :out] [:transcriptor :in]]
              [[:transcriptor :out] [:context-aggregator :in]]
              [[:transport-in :sys-out] [:context-aggregator :sys-in]]
              [[:transport-in :sys-out] [:transport-out :sys-in]]
              [[:context-aggregator :out] [:llm :in]]
              ;; Aggregate full context
              [[:llm :out] [:assistant-context-assembler :in]]
              [[:assistant-context-assembler :out] [:context-aggregator :in]]
              ;; Assemble sentence by sentence for fast speech
              [[:llm :out] [:llm-sentence-assembler :in]]
              [[:llm-sentence-assembler :out] [:tts :in]]
              [[:tts :out] [:audio-splitter :in]]
              [[:audio-splitter :out] [:transport-out :in]]
              ;; Activity detection
              [[:transport-out :sys-out] [:activity-monitor :sys-in]]
              [[:transport-in :sys-out] [:activity-monitor :sys-in]]
              [[:transcriptor :sys-out] [:activity-monitor :sys-in]]
              [[:activity-monitor :out] [:context-aggregator :in]]
              [[:activity-monitor :out] [:tts :in]]]
             extra-conns)}))
There is a system frame router process that can be used, but there is a caveat:
the system frame router fans out all of the system frames it receives to its
:sys-out channel, but all the processors that define an edge from its output
need to ensure they do not send system frames forward, because that would
cause an infinite loop. Worth mentioning: all processors that emit system
frames will receive back the same system frame from the system router, simply
by the nature of the setup.
We’ll need to keep a context-id or sentence-id for each audio-out-raw frame
resulting from the TTS service. The original sentence-id will be kept on each
audio-out-raw frame produced by the audio splitter for realtime playback.
The TTS processor will output a word-timestamp frame with the same sentence-id
so it can be matched when playback happens.
The transport-out processor will receive the realtime audio-out-raw frames and
keep the sentence-id in local state until it has been played back:
- Depending on which one comes in first, the word-timestamp frame or the
audio-out-raw frame, the state will keep a map of
{sentence-id {:word-timestamps ... :started-playback? ...}}
- When the first audio frame is played back, started-playback? is turned to true
- A new command will be added: :command/output-words. The handler from the init
processor will receive the word-timestamp data and wait, based on the computed
end time of each word, to send back to the transform a word-played msg, which
will output a WordPlayedFrame or a word-heard frame that has the sentence-id,
the word, and whether it marks the sentence end. - The LLMSentenc
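The per-sentence bookkeeping described above could be sketched as a pure state update, order-independent with respect to which frame arrives first (frame shapes and keys are illustrative):

```clojure
;; Hypothetical sketch of transport-out's {sentence-id entry} tracking.
(defn track-frame
  "Updates {sentence-id {:word-timestamps ... :started-playback? ...}}
  regardless of whether the word-timestamp or the first audio-out-raw
  frame for that sentence arrives first."
  [state frame]
  (let [sid (:sentence-id frame)]
    (case (:frame/type frame)
      :word-timestamp
      (assoc-in state [sid :word-timestamps] (:words frame))

      :audio-out-raw
      (assoc-in state [sid :started-playback?] true)

      state)))
```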
Add support for ultravox
Use trove as a logging facade so we don’t force users to use telemere for logging
research clojure-media for dedicated ffmpeg support for media conversion
(def phone-flow
  (simulflow/create-flow
   {:language :en
    :transport {:mode :telephony
                :in (input-channel)
                :out (output-channel)}
    :transcriptor {:proc asr/deepgram-processor
                   :args {:transcription/api-key (secret [:deepgram :api-key])
                          :transcription/model :nova-2}}
    :llm {:proc llm/openai-llm-process
          :args {:openai/api-key (secret [:openai :new-api-sk])
                 :llm/model "gpt-4o-mini"}}
    :tts {:proc tts/elevenlabs-tts-process
          :args {:elevenlabs/api-key (secret [:elevenlabs :api-key])
                 :elevenlabs/model-id "eleven_flash_v2_5"}}}))
Some code from another project
;;;;;;;;; Gladia ASR ;;;;;;;;;;;;;
;; :frames_format "base64"
;; :word_timestamps true})
(def ^:private gladia-url "wss://api.gladia.io/audio/text/audio-transcription")
;; this may be outdated
(def ^:private asr-configuration {:x_gladia_key api-key
                                  :sample_rate 8000
                                  :encoding "WAV/ULAW"
                                  :language_behaviour "manual"
                                  :language "romanian"})
(defn transcript?
  [m]
  (= (:event m) "transcript"))

(defn final-transcription?
  [m]
  (and (transcript? m)
       (= (:type m) "final")))

(defn partial-transcription?
  [m]
  (and (transcript? m)
       (= (:type m) "partial")))

(defrecord GladiaASR [ws asr-chan]
  ASR
  (send-audio-chunk [_ data]
    (send! ws {:frames (get-in data [:media :payload])} false))
  (close! [_]
    (ws/close! ws)))
(defn- make-gladia-asr!
  [{:keys [asr-text]}]
  ;; TODO: Handle reconnect & errors
  (let [ws @(websocket gladia-url
                       {:on-open (fn [ws]
                                   (prn "Open ASR Stream")
                                   (send! ws asr-configuration)
                                   (u/log ::gladia-asr-connected))
                        :on-message (fn [_ws ^HeapCharBuffer data _last?]
                                      (let [m (json/parse-if-json (str data))]
                                        (u/log ::gladia-msg :m m)
                                        (when (final-transcription? m)
                                          (u/log ::gladia-asr-transcription :sentence (:transcription m) :transcription m)
                                          (go (>! asr-text (:transcription m))))))
                        :on-error (fn [_ e]
                                    (u/log ::gladia-asr-error :exception e))
                        :on-close (fn [_ code reason]
                                    (u/log ::gladia-asr-closed :code code :reason reason))})]
    (->GladiaASR ws asr-text)))
(require '[wkok.openai-clojure.api :as openai])

(defn openai
  "Generate speech using openai"
  ([input]
   (openai input {}))
  ([input config]
   (openai/create-speech (merge {:input input
                                 :voice "alloy"
                                 :response_format "wav"
                                 :model "tts-1"}
                                config)
                         {:version :http-2 :as :stream})))
(defn tts-stage-openai
  [sid in]
  (a/go-loop []
    (let [sentence (a/<! in)]
      (when-not (nil? sentence)
        (append-message! sid "assistant" sentence)
        (try
          (let [sentence-stream (-> (tts/openai sentence) (io/input-stream))
                ais (AudioSystem/getAudioInputStream sentence-stream)
                twilio-ais (audio/->twilio-phone ais)
                buffer (byte-array 256)]
            (loop []
              (let [bytes-read (.read twilio-ais buffer)]
                (when (pos? bytes-read)
                  (twilio/send-msg! (sessions/ws sid)
                                    sid
                                    (e/encode-base64 buffer))
                  (recur)))))
          (catch Exception e
            (u/log ::tts-stage-error :exception e)))
        (recur)))))
(def ^:private rime-tts-url "https://users.rime.ai/v1/rime-tts")

(defn rime
  "Generate speech using rime-ai provider"
  [sentence]
  (-> {:method :post
       :url rime-tts-url
       :as :stream
       :body (json/->json-str {:text sentence
                               :reduceLatency false
                               :samplingRate 8000
                               :speedAlpha 1.0
                               :modelId "v1"
                               :speaker "Colby"})
       :headers {"Authorization" (str "Bearer " rime-api-key)
                 "Accept" "audio/x-mulaw"
                 "Content-Type" "application/json"}}
      (client/request)
      :body))
(defn rime-async
  "Generate speech using rime-ai provider, outputs results on an async
  channel"
  [sentence]
  (let [stream (-> (rime sentence)
                   (io/input-stream))
        c (a/chan 1024)]
    (au/input-stream->chan stream c 1024)))
(defn tts-stage
  [sid in]
  (a/go-loop []
    (let [sentence (a/<! in)]
      (when-not (nil? sentence)
        (append-message! sid "assistant" sentence)
        (try
          (let [sentence-stream (-> (tts/rime sentence) (io/input-stream))
                buffer (byte-array 256)]
            (loop []
              (let [bytes-read (.read sentence-stream buffer)]
                (when (pos? bytes-read)
                  (twilio/send-msg! (sessions/ws sid)
                                    sid
                                    (e/encode-base64 buffer))
                  (recur)))))
          (catch Exception e
            (u/log ::tts-stage-error :exception e)))
        (recur)))))
Add support for Talon STT
This means implementing flow diagrams
{:initial-node :start
 :nodes
 {:start {:role_messages [{:role :system
                           :content "You are an order-taking assistant. You must ALWAYS use the available functions to progress the conversation. This is a phone conversation and your responses will be converted to audio. Keep the conversation friendly, casual, and polite. Avoid outputting special characters and emojis."}]
          :task_messages [{:role :system
                           :content "For this step, ask the user if they want pizza or sushi, and wait for them to use a function to choose. Start off by greeting them. Be friendly and casual; you're taking an order for food over the phone."}]
          :functions [{:type :function
                       :function {:name :choose_sushi
                                  :description "User wants to order sushi. Let's get that order started"}}]}}}