Skip to content

Commit 78abdfd

Browse files
committed
More progress
1 parent 2be5c66 commit 78abdfd

File tree

2 files changed

+102
-12
lines changed

2 files changed

+102
-12
lines changed

core/src/voice_fn/experiments/flow.clj

Lines changed: 101 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
(ns voice-fn.experiments.flow
22
(:require
3+
[clojure.core.async :as a]
34
[clojure.core.async.flow :as flow]
5+
[clojure.core.async.flow.impl :as impl]
6+
[clojure.core.async.flow.spi :as spi]
7+
[clojure.datafy :refer [datafy]]
8+
[clojure.pprint :as pprint]
49
[hato.websocket :as ws]
510
[taoensso.telemere :as t]
611
[voice-fn.frame :as frame]
@@ -9,7 +14,10 @@
914
[voice-fn.transport.serializers :refer [make-twilio-serializer]]
1015
[voice-fn.utils.core :as u])
1116
(:import
12-
(java.nio HeapCharBuffer)))
17+
(java.nio HeapCharBuffer)
18+
(java.util.concurrent TimeUnit)))
19+
20+
(t/set-min-level! :debug)
1321

1422
(def real-gdef
1523
{:procs
@@ -44,11 +52,11 @@
4452
:in "Channel for audio input frames (from transport-in) "}
4553
:outs {:sys-out "Channel for system messages that have priority"
4654
:out "Channel on which transcription frames are put"}
47-
:params {:deepgram/api-key "Api key for deepgram"
48-
:processor/supports-interrupt? "Wether this processor should send interrupt start/stop events on the pipeline"}
55+
:params {:deepgram/api-key "Api key for deepgram"}
4956
:workload :io})
5057
:init
5158
(fn [args]
59+
(pprint/pprint args)
5260
(let [websocket-url (deepgram/make-websocket-url {:transcription/api-key (:deepgram/api-key args)
5361
:transcription/interim-results? true
5462
:transcription/punctuate? false
@@ -92,20 +100,22 @@
92100
(t/log! {:level :error :id :deepgram-transcriptor} ["Error" e]))
93101
:on-close (fn [_ws code reason]
94102
(t/log! {:level :info :id :deepgram-transcriptor} ["Deepgram websocket connection closed" "Code:" code "Reason:" reason]))}
95-
_ (t/log! "Connecting to transcription websocket")
103+
_ (t/log! {:level :info :id :deepgram-transcriptor} "Connecting to transcription websocket")
96104
ws-conn @(ws/websocket
97105
websocket-url
98106
conn-config)]
99107
{:websocket/conn ws-conn}))
100108

101109
;; Close ws when pipeline stops
102-
:transition (fn [{conn :websocket/conn} transition]
103-
(prn "This got called")
104-
(when (and (= transition ::flow/stop)
105-
conn)
106-
(t/log! {:id :deepgram-transcriptor :level :debug} "Closing transcription websocket connection")
107-
(ws/send! conn deepgram/close-connection-payload)
108-
(ws/close! conn)))
110+
:transition (fn [{:websocket/keys [conn] :as state} transition]
111+
(if (and (= transition ::flow/stop)
112+
conn)
113+
(do
114+
(t/log! {:id :deepgram-transcriptor :level :info} "Closing transcription websocket connection")
115+
(ws/send! conn deepgram/close-connection-payload)
116+
(ws/close! conn)
117+
{})
118+
state))
109119

110120
:transform (fn [{:websocket/keys [conn]} in-name frame]
111121
(cond
@@ -122,10 +132,89 @@
122132
[[:deepgram-transcriptor :out] [:print-sink :in]]]
123133
:args {:deepgram/api-key (secret [:deepgram :api-key])}})
124134

135+
(defn raw-proc
136+
"see lib ns for docs"
137+
[{:keys [describe init transition transform introduce] :as impl} {:keys [workload compute-timeout-ms]}]
138+
(assert (= 1 (count (keep identity [transform introduce]))) "must provide exactly one of :transform or :introduce")
139+
(let [{:keys [params ins] :as desc} (describe)
140+
workload (or workload (:workload desc) :mixed)]
141+
(assert (not (and ins introduce)) "can't specify :ins when :introduce")
142+
(assert (or (not params) init) "must have :init if :params")
143+
(assert (not (and introduce (= workload :compute))) "can't specify :introduce and :compute")
144+
(reify
145+
clojure.core.protocols/Datafiable
146+
(datafy [_]
147+
(let [{:keys [params ins outs]} desc]
148+
{:impl impl :params (-> params keys vec) :ins (-> ins keys vec) :outs (-> outs keys vec)}))
149+
spi/ProcLauncher
150+
(describe [_] desc)
151+
(start [_ {:keys [pid args ins outs resolver]}]
152+
(let [comp? (= workload :compute)
153+
transform (cond-> transform (= workload :compute)
154+
#(.get (impl/futurize transform {:exec (spi/get-exec resolver :compute)})
155+
compute-timeout-ms TimeUnit/MILLISECONDS))
156+
exs (spi/get-exec resolver (if (= workload :mixed) :mixed :io))
157+
io-id (zipmap (concat (vals ins) (vals outs)) (concat (keys ins) (keys outs)))
158+
control (::flow/control ins)
159+
;; TODO rotate/randomize after control per normal alts?
160+
read-chans (into [control] (-> ins (dissoc ::flow/control) vals))
161+
run
162+
#(loop [status :paused, state (when init (init args)), count 0]
163+
(let [pong (fn []
164+
(let [pins (dissoc ins ::flow/control)
165+
pouts (dissoc outs ::flow/error ::flow/report)]
166+
(a/>!! (outs ::flow/report)
167+
#::flow{:report :ping, :pid pid, :status status
168+
:state state, :count count
169+
:ins (zipmap (keys pins) (map impl/chan->data (vals pins)))
170+
:outs (zipmap (keys pouts) (map impl/chan->data (vals pouts)))})))
171+
handle-command (partial impl/handle-command pid pong)
172+
[nstatus nstate count]
173+
(try
174+
(if (= status :paused)
175+
(let [nstatus (handle-command status (a/<!! control))
176+
nstate (impl/handle-transition transition status nstatus state)]
177+
[nstatus nstate count])
178+
;; :running
179+
(let [[msg c] (if transform
180+
(a/alts!! read-chans :priority true)
181+
;; introduce
182+
(when-let [msg (a/poll! control)]
183+
[msg control]))
184+
cid (io-id c)]
185+
(if (= c control)
186+
(let [nstatus (handle-command status msg)
187+
nstate (impl/handle-transition transition status nstatus state)]
188+
[nstatus nstate count])
189+
(try
190+
(let [[nstate outputs] (if transform
191+
(transform state cid msg)
192+
(introduce state))
193+
[nstatus nstate]
194+
(impl/send-outputs status nstate outputs outs
195+
resolver control handle-command transition)]
196+
[nstatus nstate (inc count)])
197+
(catch Throwable ex
198+
(a/>!! (outs ::flow/error)
199+
#::flow{:pid pid, :status status, :state state
200+
:count (inc count), :cid cid, :msg msg :op :step, :ex ex})
201+
[status state count])))))
202+
(catch Throwable ex
203+
(a/>!! (outs ::flow/error)
204+
#::flow{:pid pid, :status status, :state state, :count (inc count), :ex ex})
205+
[status state count]))]
206+
(when-not (= nstatus :exit) ;; fall out
207+
(recur nstatus nstate (long count)))))]
208+
((impl/futurize run {:exec exs})))))))
209+
125210
(comment
211+
(datafy (:proc (:deepgram-transcriptor (:procs real-gdef))))
212+
126213
(def g (flow/create-flow real-gdef))
127214

128215
(def res (flow/start g))
129216

130217
(flow/resume g)
131-
(flow/stop g))
218+
(flow/stop g)
219+
220+
,)

core/src/voice_fn/lab.clj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
(ns voice-fn.lab
2+
"This is a gist demonstrating basic usage of clojure.core.async.flow"
23
(:require
34
[clojure.core.async :as async]
45
[clojure.core.async.flow :as flow]

0 commit comments

Comments
 (0)