|
19 | 19 | (let [chan (async/chan 1024)]
|
20 | 20 | (async/thread
|
21 | 21 | (loop []
|
| 22 | + (log/info "stream[m] --- in-chan[ ] --- plugin") |
22 | 23 | (when-let [msg (msgpack/unpack input-stream)]
|
23 |
| - (log/info "stream -> msg -> in chan: " msg) |
| 24 | + (log/info "stream[ ] ->m in-chan[ ] --- plugin" msg) |
24 | 25 | (async/>!! chan msg)
|
| 26 | + (log/info "stream[ ] --- in-chan[m] --- plugin" (id msg)) |
25 | 27 | (recur))))
|
26 | 28 | chan))
|
27 | 29 |
|
28 |
| -(defn- write-msg! |
29 |
| - [packed-msg out-stream] |
30 |
| - (doseq [b packed-msg] |
31 |
| - (.writeByte out-stream b)) |
32 |
| - (.flush out-stream)) |
33 |
| - |
34 | 30 | (defn- create-output-channel
|
35 | 31 | "Make a channel to read messages from, write to output stream."
|
36 | 32 | [output-stream]
|
37 | 33 | (let [chan (async/chan 1024)]
|
38 | 34 | (async/thread
|
39 | 35 | (loop []
|
| 36 | + (log/info "stream[ ] --- out-chan[m] --- plugin") |
40 | 37 | (when-let [msg (async/<!! chan)]
|
41 |
| - (log/info "stream <- msg <- out chan: " msg) |
42 |
| - (write-msg! (msgpack/pack msg) output-stream) |
| 38 | + (log/info "stream[ ] m<- out-chan[ ] --- plugin" (id msg)) |
| 39 | + (let [packed (msgpack/pack msg)] |
| 40 | + (.write output-stream packed 0 (count packed))) |
| 41 | + (.flush output-stream) |
| 42 | + (log/info "stream[m] --- out-chan[ ] --- plugin" (id msg)) |
43 | 43 | (recur))))
|
44 | 44 | chan))
|
45 | 45 |
|
46 | 46 | ;; ***** Public *****
|
47 | 47 |
|
48 | 48 | (defn send-message-async!
|
49 | 49 | [{:keys [message-table out-chan]} msg callback-fn]
|
50 |
| - (if (= msg/+request+ (msg-type msg)) |
| 50 | + (when (= msg/+request+ (msg-type msg)) |
51 | 51 | (swap! message-table assoc (id msg) {:msg msg :fn callback-fn}))
|
52 |
| - (async/put! out-chan msg)) |
| 52 | + (log/info "stream[ ] --- out-chan[ ] m<- plugin" msg) |
| 53 | + (async/>!! out-chan msg) |
| 54 | + (log/info "stream[ ] --- out-chan[m] --- plugin" (id msg))) |
53 | 55 |
|
54 | 56 | (defn send-message!
|
55 | 57 | [component msg]
|
|
78 | 80 | [input-stream output-stream]
|
79 | 81 | (let [in-chan (create-input-channel input-stream)
|
80 | 82 | input-stream (DataInputStream. input-stream)
|
81 |
| - output-stream (DataOutputStream. output-stream) |
82 | 83 | message-table (atom {})
|
83 | 84 | method-table (atom {})
|
84 | 85 | component {:input-stream input-stream
|
|
87 | 88 | :in-chan in-chan
|
88 | 89 | :message-table message-table
|
89 | 90 | :method-table method-table}]
|
90 |
| - (future (loop |
91 |
| - [] |
92 |
| - (when-let [msg (async/<!! in-chan)] |
93 |
| - (condp = (msg-type msg) |
94 | 91 |
|
95 |
| - msg/+response+ |
96 |
| - (let [f (:fn (get @message-table (id msg)))] |
97 |
| - (swap! message-table dissoc (id msg)) |
98 |
| - (f (value msg))) |
| 92 | + (future |
| 93 | + (try |
| 94 | + (loop [] |
| 95 | + (when-let [msg (async/<!! in-chan)] |
| 96 | + (log/info "stream[ ] --- in-chan[ ] ->m plugin" (id msg)) |
| 97 | + (condp = (msg-type msg) |
| 98 | + |
| 99 | + msg/+response+ |
| 100 | + (let [f (:fn (get @message-table (id msg)))] |
| 101 | + (swap! message-table dissoc (id msg)) |
| 102 | + ;; Don't block the handler to execute this. |
| 103 | + (async/thread (when f (f (value msg))))) |
| 104 | + |
| 105 | + msg/+request+ |
| 106 | + (let [f (get @method-table (method msg) method-not-found) |
| 107 | + ;; TODO - add async/thread here, remove from methods. |
| 108 | + result (f msg)] |
| 109 | + (send-message-async! |
| 110 | + component (->response-msg (id msg) result) nil)) |
99 | 111 |
|
100 |
| - msg/+request+ |
101 |
| - (let [f (get @method-table (method msg) method-not-found) |
102 |
| - result (f msg)] |
103 |
| - (send-message-async! |
104 |
| - component (->response-msg (id msg) result) nil)) |
| 112 | + msg/+notify+ |
| 113 | + (let [f (get @method-table (method msg) method-not-found) |
| 114 | + ;; TODO - see above. |
| 115 | + result (f msg)])) |
105 | 116 |
|
106 |
| - msg/+notify+ |
107 |
| - (let [f (get @method-table (method msg) method-not-found) |
108 |
| - result (f msg)])) |
| 117 | + (recur))) |
| 118 | + (catch Throwable t (log/info |
| 119 | + "Exception in message handler, aborting!" |
| 120 | + t)))) |
109 | 121 |
|
110 |
| - (recur)))) |
111 | 122 | component))
|
0 commit comments