|
10 | 10 | PipedInputStream
|
11 | 11 | PipedOutputStream]))
|
12 | 12 |
|
13 |
| -(declare graph) |
| 13 | +(defn create-step |
| 14 | + "compile a function that inserts a user message to-array |
| 15 | + to the conversation and stream it through the graph |
| 16 | + compiled function returns a channel that will emit the final state" |
| 17 | + [graph] |
| 18 | + (fn [state] |
| 19 | + (graph/stream graph state))) |
14 | 20 |
|
15 |
| -(def do-stream (partial graph/stream graph)) |
| 21 | +(defn state-reducer [state s] |
| 22 | + (update state :messages (fnil conj []) {:role "user" :content s})) |
16 | 23 |
|
17 |
| -(defn start-jsonrpc-loop [f in m] |
| 24 | +(defn create-test-step [] |
| 25 | + (fn [state] |
| 26 | + (async/go |
| 27 | + state))) |
| 28 | + |
| 29 | +(defn start-jsonrpc-loop |
| 30 | + "start a jsonrpc loop that will inject jsonrpc requests into an |
| 31 | + ongoing set of state transitions |
| 32 | + params |
| 33 | + run-graph async state -> state |
| 34 | + reduce-state state, message -> state |
| 35 | + in input stream |
| 36 | + m initial state |
| 37 | + returns |
| 38 | + the final state" |
| 39 | + [run-graph state-reducer in m] |
18 | 40 | (let [c (jsonrpc/input-stream->input-chan in {})]
|
19 | 41 | (async/go-loop
|
20 |
| - [state m] |
21 |
| - (let [message (async/<! c) |
22 |
| - s (-> message :params :content)] |
23 |
| - (println "message content: " s) |
24 |
| - (if (some (partial = s) ["exit" "quit" "q"]) |
25 |
| - state |
26 |
| - (recur (async/<! (f state s)))))))) |
| 42 | + [next-state (async/<! (run-graph m)) n 0] |
| 43 | + (println "### loop " n) |
| 44 | + (let [message (async/<! c)] |
| 45 | + (cond |
| 46 | + (= "exit" (:method message)) |
| 47 | + (assoc next-state :jsonrpc-loop-finished :exit) |
| 48 | + :else |
| 49 | + (recur (async/<! ((comp run-graph state-reducer) next-state (-> message :params :content))) (inc n))))))) |
27 | 50 |
|
28 | 51 | (def counter (atom 0))
|
29 | 52 | (defn get-id [] (swap! counter inc))
|
30 | 53 |
|
31 | 54 | (def ^{:private true} start-test-loop
|
32 |
| - (partial start-jsonrpc-loop (fn [state s] |
33 |
| - (async/go |
34 |
| - (update state :messages (fnil conj []) s))))) |
| 55 | + (partial start-jsonrpc-loop (create-test-step) state-reducer)) |
35 | 56 |
|
36 |
| -(defn -create-pipe [] |
| 57 | +(defn create-pipe [] |
37 | 58 | ;; Create a PipedInputStream and PipedOutputStream
|
38 | 59 | (let [piped-out (PipedOutputStream.)
|
39 | 60 | piped-in (PipedInputStream. piped-out)
|
|
43 | 64 | piped-in]))
|
44 | 65 |
|
45 | 66 | (comment
|
46 |
| - (let [[[w c] in] (-create-pipe)] |
47 |
| - (async/go (println "ending: " (async/<! (start-test-loop in {})))) |
48 |
| - (w (jsonrpc/request "prompt" {:content "hello"} get-id)) |
49 |
| - (w (jsonrpc/request "prompt" {:content "hello1"} get-id)) |
50 |
| - (w (jsonrpc/request "prompt" {:content "exit"} get-id)) |
51 |
| - (c))) |
| 67 | + (println "should be true: " |
| 68 | + (async/<!! |
| 69 | + (let [[[w c] in] (create-pipe)] |
| 70 | + (w (jsonrpc/request "prompt" {:content "hello"} get-id)) |
| 71 | + (w (jsonrpc/request "prompt" {:content "hello1"} get-id)) |
| 72 | + (w (jsonrpc/request "prompt" {:content "hello2"} get-id)) |
| 73 | + (w (jsonrpc/request "exit" {} get-id)) |
| 74 | + (c) |
| 75 | + (async/go |
| 76 | + (println "ending: " (async/<! (start-test-loop in {}))) |
| 77 | + true))))) |
52 | 78 |
|
53 | 79 | (comment
|
54 | 80 | ;; an input stream is something from which we can read bytes
|
|
0 commit comments