Skip to content

Commit b3127de

Browse files
committed
Fix infinite loop in context-aggregator from tool-result-frames
Using system router with fan out mechanism needs to ensure that processors never send out system frames that they receive or a infinite loop can occur
1 parent ab1a10f commit b3127de

File tree

3 files changed

+159
-5
lines changed

3 files changed

+159
-5
lines changed
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
(ns simulflow-examples.scenario-example-system-router-mute-filter
2+
(:require
3+
[clojure.core.async :as a]
4+
[clojure.core.async.flow :as flow]
5+
[simulflow-examples.scenario-example :refer [config]]
6+
[simulflow.async :refer [vthread-loop]]
7+
[simulflow.filters.mute :as mute-filter]
8+
[simulflow.processors.deepgram :as deepgram]
9+
[simulflow.processors.elevenlabs :as xi]
10+
[simulflow.processors.llm-context-aggregator :as context]
11+
[simulflow.processors.openai :as openai]
12+
[simulflow.processors.system-frame-router :as system-router]
13+
[simulflow.scenario-manager :as sm]
14+
[simulflow.secrets :refer [secret]]
15+
[simulflow.transport :as transport]
16+
[simulflow.transport.in :as transport-in]
17+
[simulflow.transport.out :as transport-out]
18+
[taoensso.telemere :as t]))
19+
20+
(def llm-context
21+
{:messages
22+
[{:role "system"
23+
:content "You are a voice agent operating via phone. Be
24+
concise in your answers. The input you receive comes from a
25+
speech-to-text (transcription) system that isn't always
26+
efficient and may send unclear text. Ask for
27+
clarification when you're unsure what the person said."}]
28+
:tools
29+
[{:type :function
30+
:function
31+
{:name "get_weather"
32+
:handler (fn [{:keys [town]}] (str "The weather in " town " is 17 degrees celsius"))
33+
:description "Get the current weather of a location"
34+
:parameters {:type :object
35+
:required [:town]
36+
:properties {:town {:type :string
37+
:description "Town for which to retrieve the current weather"}}
38+
:additionalProperties false}
39+
:strict true}}]})
40+
41+
(def chunk-duration 20)
42+
43+
(def flow-processors
44+
{;; Capture audio from microphone and send raw-audio-input frames further in the pipeline
45+
:transport-in {:proc transport-in/microphone-transport-in
46+
:args {:vad/analyser :vad.analyser/silero
47+
:pipeline/supports-interrupt? false}}
48+
;; raw-audio-input -> transcription frames
49+
:transcriptor {:proc deepgram/deepgram-processor
50+
:args {:transcription/api-key (secret [:deepgram :api-key])
51+
:transcription/interim-results? true
52+
:transcription/punctuate? false
53+
;; We use silero for computing VAD
54+
:transcription/vad-events? false
55+
:transcription/smart-format? true
56+
:transcription/model :nova-2
57+
:transcription/utterance-end-ms 1000
58+
:transcription/language :en}}
59+
60+
;; user transcription & llm message frames -> llm-context frames
61+
;; responsible for keeping the full conversation history
62+
:context-aggregator {:proc context/context-aggregator
63+
:args {:llm/context llm-context}}
64+
65+
;; Takes llm-context frames and produces new llm-text-chunk & llm-tool-call-chunk frames
66+
:llm {:proc openai/openai-llm-process
67+
:args {:openai/api-key (secret [:openai :new-api-sk])
68+
:llm/model :gpt-4.1-mini}}
69+
70+
;; llm-text-chunk & llm-tool-call-chunk -> llm-context-messages-append frames
71+
:assistant-context-assembler {:proc context/assistant-context-assembler
72+
:args {}}
73+
74+
;; llm-text-chunk -> sentence speak frames (faster for text to speech)
75+
:llm-sentence-assembler {:proc context/llm-sentence-assembler}
76+
77+
;; speak-frames -> audio-output-raw frames
78+
:tts {:proc xi/elevenlabs-tts-process
79+
:args {:elevenlabs/api-key (secret [:elevenlabs :api-key])
80+
:elevenlabs/model-id "eleven_flash_v2_5"
81+
:elevenlabs/voice-id (secret [:elevenlabs :voice-id])
82+
:voice/stability 0.5
83+
:voice/similarity-boost 0.8
84+
:voice/use-speaker-boost? true
85+
:pipeline/language :en}}
86+
87+
;; audio-output-raw -> smaller audio-output-raw frames (used for sending audio in realtime)
88+
:audio-splitter {:proc transport/audio-splitter
89+
:args {:audio.out/duration-ms chunk-duration}}
90+
91+
;; speakers out
92+
:transport-out {:proc transport-out/realtime-speakers-out-processor
93+
:args {:audio.out/sending-interval chunk-duration
94+
:audio.out/duration-ms chunk-duration}}
95+
96+
:mute-filter {:proc mute-filter/process
97+
:args {:mute/strategies #{:mute.strategy/bot-speech}}}
98+
99+
:scenario-in {:proc sm/scenario-in-process}
100+
;; Fan out of system frames to processors that need it
101+
:system-router {:proc system-router/system-frame-router}})
102+
103+
(def flow-conns (into (system-router/generate-system-router-connections flow-processors)
104+
[[[:transport-in :out] [:transcriptor :in]]
105+
106+
[[:transcriptor :out] [:context-aggregator :in]]
107+
[[:context-aggregator :out] [:llm :in]]
108+
109+
;; Aggregate full context
110+
[[:llm :out] [:assistant-context-assembler :in]]
111+
[[:assistant-context-assembler :out] [:context-aggregator :in]]
112+
113+
;; Assemble sentence by sentence for fast speech
114+
[[:llm :out] [:llm-sentence-assembler :in]]
115+
116+
[[:tts :out] [:audio-splitter :in]]
117+
[[:audio-splitter :out] [:transport-out :in]]]))
118+
119+
(def flow-config
120+
{:procs flow-processors
121+
:conns flow-conns})
122+
123+
(def g (flow/create-flow flow-config))
124+
125+
(defonce flow-started? (atom false))
126+
127+
(def s (sm/scenario-manager {:flow g
128+
:scenario-config config
129+
:flow-in-coord [:scenario-in :scenario-in]}))
130+
131+
(comment
132+
133+
;; Start local ai flow - starts paused
134+
(let [{:keys [report-chan error-chan]} (flow/start g)]
135+
(reset! flow-started? true)
136+
(sm/start s)
137+
;; Resume local ai -> you can now speak with the AI
138+
(flow/resume g)
139+
(vthread-loop []
140+
(when @flow-started?
141+
(when-let [[msg c] (a/alts!! [report-chan error-chan])]
142+
(when (map? msg)
143+
(t/log! (cond-> {:level :debug :id (if (= c error-chan) :error :report)}
144+
(= c error-chan) (assoc :error msg)) msg))
145+
(recur)))))
146+
147+
;; Stop the conversation
148+
(do
149+
(flow/stop g)
150+
(reset! flow-started? false))
151+
152+
,)

src/simulflow/processors/llm_context_aggregator.clj

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@
148148
(some #(% frame) user-speech-predicates))
149149

150150
(defn context-aggregator-transform
151-
[state _ frame]
151+
[state in frame]
152152
(when (:aggregator/debug? state)
153153
(t/log! {:level :debug :id :llm-context} {:type (:frame/type frame) :data (:frame/data frame)}))
154154

@@ -165,7 +165,8 @@
165165
;; context aggregator is the source of truth for the llm context. The
166166
;; assistant aggregator will send tool result frames and the user context
167167
;; aggregator will send back the assembled new context
168-
(frame/llm-tool-call-result? frame)
168+
(and (frame/llm-tool-call-result? frame)
169+
(= in :tool-read))
169170
,(let [tool-result (:frame/data frame)
170171
{:keys [run-llm? on-update] :or {run-llm? true}} (:properties tool-result)
171172
_ (when debug? (t/log! {:level :debug :id id} ["TOOL CALL RESULT: " tool-result]))
@@ -206,7 +207,8 @@
206207
(frame/speak-frame? frame)
207208
[(update-in state [:llm/context :messages] conj {:role :assistant :content (:frame/data frame)})]
208209

209-
(user-speech-frame? frame) (user-speech-aggregator-transform state _ frame)
210+
(user-speech-frame? frame)
211+
(user-speech-aggregator-transform state in frame)
210212

211213
:else [state])))
212214

test/simulflow/processors/llm_context_aggregator_test.clj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,8 +155,8 @@
155155
:strict true}}]}
156156
s (assoc sstate :llm/context context)
157157
new-context (update-in context [:messages] conj tool-result)
158-
[new-context-state {:keys [out]}] (sut/context-aggregator-transform s nil (frame/llm-tool-call-result {:result tool-result
159-
:request tool-request}))
158+
[new-context-state {:keys [out]}] (sut/context-aggregator-transform s :tool-read (frame/llm-tool-call-result {:result tool-result
159+
:request tool-request}))
160160
context-frame (first out)]
161161
(is (= new-context (:llm/context new-context-state)))
162162
(is (= new-context (:frame/data context-frame)))

0 commit comments

Comments
 (0)