|
1 | 1 | (ns neovim-client.rpc
|
2 | 2 | (:require [clojure.core.async :as async]
|
3 | 3 | [clojure.tools.logging :as log]
|
| 4 | + [msgpack.clojure-extensions] |
4 | 5 | [msgpack.core :as msgpack]
|
5 | 6 | [neovim-client.message :refer [id value msg-type method params
|
6 | 7 | ->response-msg]
|
|
16 | 17 | (defn- create-input-channel
|
17 | 18 | "Read messages from the input stream, put them on a channel."
|
18 | 19 | [input-stream]
|
19 |
| - (let [chan (async/chan 1024) |
20 |
| - input-stream (DataInputStream. input-stream)] |
| 20 | + (let [chan (async/chan 1024)] |
21 | 21 | (async/go-loop
|
22 | 22 | []
|
23 |
| - (when-let [msg (msgpack/unpack-stream input-stream)] |
| 23 | + (when-let [msg (msgpack/unpack (DataInputStream. input-stream))] |
24 | 24 | (log/info "stream -> msg -> in chan: " msg)
|
25 | 25 | (async/>! chan msg)
|
26 | 26 | (recur)))
|
27 |
| - [chan input-stream])) |
| 27 | + chan)) |
28 | 28 |
|
29 | 29 | (defn- write-msg!
|
30 | 30 | [packed-msg out-stream]
|
|
35 | 35 | (defn- create-output-channel
|
36 | 36 | "Make a channel to read messages from, write to output stream."
|
37 | 37 | [output-stream]
|
38 |
| - (let [chan (async/chan 1024) |
39 |
| - output-stream (DataOutputStream. output-stream)] |
| 38 | + (let [chan (async/chan 1024)] |
40 | 39 | (async/go-loop
|
41 | 40 | []
|
42 | 41 | (when-let [msg (async/<! chan)]
|
43 | 42 | (log/info "stream <- msg <- out chan: " msg)
|
44 |
| - (write-msg! (msgpack/pack msg) output-stream) |
| 43 | + (write-msg! (msgpack/pack msg) (DataOutputStream. output-stream)) |
45 | 44 | (recur)))
|
46 | 45 | chan))
|
47 | 46 |
|
|
66 | 65 | (defn stop
|
67 | 66 | "Stop the connection. Right now, this probably only works for debug, when
|
68 | 67 | connected to socket. Don't think we should be trying to .close STDIO streams."
|
69 |
| - [{:keys [input-stream output-stream out-chan in-chan data-stream]}] |
| 68 | + [{:keys [input-stream output-stream out-chan in-chan]}] |
70 | 69 | (async/close! out-chan)
|
71 | 70 | (async/close! in-chan)
|
72 |
| - (.close data-stream) |
73 | 71 | (.close input-stream)
|
74 | 72 | (.close output-stream))
|
75 | 73 |
|
76 | 74 | (defn new*
|
77 | 75 | [input-stream output-stream]
|
78 |
| - (let [[in-chan data-stream] (create-input-channel input-stream) |
| 76 | + (let [in-chan (create-input-channel input-stream) |
79 | 77 | message-table (atom {})
|
80 | 78 | method-table (atom {})
|
81 | 79 | component {:input-stream input-stream
|
82 |
| - :data-stream data-stream |
83 | 80 | :output-stream output-stream
|
84 | 81 | :out-chan (create-output-channel output-stream)
|
85 | 82 | :in-chan in-chan
|
|
0 commit comments