|
21 | 21 | (defn create-input-channel
|
22 | 22 | "Read messages from the input stream, put them on a channel."
|
23 | 23 | [input-stream]
|
24 |
| - (let [chan (async/chan) |
| 24 | + (let [chan (async/chan 1024) |
25 | 25 | input-stream (DataInputStream. input-stream)]
|
26 |
| - (async/go |
27 |
| - (while true |
28 |
| - (log/info "waiting for input stream") |
29 |
| - (let [msg (msgpack/unpack-stream input-stream)] |
30 |
| - (log/info "stream -> msg -> in chan: " msg) |
31 |
| - (async/>! chan msg)))) |
| 26 | + (async/go-loop [] |
| 27 | + (let [msg (msgpack/unpack-stream input-stream)] |
| 28 | + (log/info "stream -> msg -> in chan: " msg) |
| 29 | + (async/>! chan msg)) |
| 30 | + (recur)) |
32 | 31 | chan))
|
33 | 32 |
|
34 | 33 | (defn write-msg!
|
|
40 | 39 | (defn create-output-channel
|
41 | 40 | "Make a channel to read messages from, write to output stream."
|
42 | 41 | [output-stream]
|
43 |
| - (let [chan (async/chan) |
| 42 | + (let [chan (async/chan 1024) |
44 | 43 | output-stream (DataOutputStream. output-stream)]
|
45 |
| - (async/go |
46 |
| - (while true |
47 |
| - (let [msg (async/<! chan)] |
48 |
| - (log/info "stream <- msg <- out chan: " msg) |
49 |
| - (write-msg! (msgpack/pack msg) output-stream)))) |
| 44 | + (async/go-loop [] |
| 45 | + (let [msg (async/<! chan)] |
| 46 | + (log/info "stream <- msg <- out chan: " msg) |
| 47 | + (write-msg! (msgpack/pack msg) output-stream)) |
| 48 | + (recur)) |
50 | 49 | chan))
|
51 | 50 |
|
52 | 51 | (declare send-message-async!)
|
|
57 | 56 | (reset! out-chan (create-output-channel output-stream))
|
58 | 57 |
|
59 | 58 | ;; Handle stuff on the input channel -- where should this live?
|
60 |
| - (async/go |
61 |
| - (while true |
62 |
| - ;; TODO - let the in-chan, if we leave this code here. |
63 |
| - (let [msg (async/<! @in-chan)] |
64 |
| - (condp = (msg-type msg) |
| 59 | + (async/go-loop [] |
| 60 | + ;; TODO - let the in-chan, if we leave this code here. |
| 61 | + (let [msg (async/<! @in-chan)] |
| 62 | + (condp = (msg-type msg) |
65 | 63 |
|
66 |
| - msg/+response+ |
67 |
| - (let [f (:fn (get @msg-table (id msg)))] |
68 |
| - (swap! msg-table dissoc (id msg)) |
69 |
| - (f (value msg))) |
| 64 | + msg/+response+ |
| 65 | + (let [f (:fn (get @msg-table (id msg)))] |
| 66 | + (swap! msg-table dissoc (id msg)) |
| 67 | + (f (value msg))) |
70 | 68 |
|
71 |
| - msg/+request+ |
72 |
| - (let [f (get @method-table (method msg) method-not-found) |
73 |
| - result (f msg)] |
74 |
| - (send-message-async! (->response-msg (id msg) result) nil))))))) |
| 69 | + msg/+request+ |
| 70 | + (let [f (get @method-table (method msg) method-not-found) |
| 71 | + result (f msg)] |
| 72 | + (send-message-async! (->response-msg (id msg) result) nil)))) |
| 73 | + (recur))) |
75 | 74 |
|
76 | 75 | ;; ***** Public *****
|
77 | 76 |
|
|
0 commit comments