|
112 | 112 | log-title (format-error-code "Error receiving message" :internal-error)]
|
113 | 113 | (protocols.endpoint/log server :error e (str log-title "\n" message-details))))
|
114 | 114 |
|
115 |
| -(defn ^:private receive-message |
116 |
| - [server context message] |
117 |
| - (let [message-type (coercer/input-message-type message)] |
118 |
| - (try |
| 115 | +(defn ^:private spawn-receipt-thread [buf-or-n f] |
| 116 | + (let [receipt-ch (async/chan buf-or-n)] |
| 117 | + (async/thread |
119 | 118 | (discarding-stdout
|
120 |
| - (case message-type |
121 |
| - (:parse-error :invalid-request) |
122 |
| - (protocols.endpoint/log server :error (format-error-code "Error reading message" message-type)) |
123 |
| - :request |
124 |
| - (protocols.endpoint/receive-request server context message) |
125 |
| - (:response.result :response.error) |
126 |
| - (protocols.endpoint/receive-response server message) |
127 |
| - :notification |
128 |
| - (protocols.endpoint/receive-notification server context message))) |
129 |
| - (catch Throwable e ;; exceptions thrown by receive-response or receive-notification (receive-request catches its own exceptions) |
130 |
| - (log-error-receiving server e message))))) |
| 119 | + (loop [] |
| 120 | + (when-let [[message-type message] (async/<!! receipt-ch)] |
| 121 | + (f message-type message) |
| 122 | + (recur))))) |
| 123 | + receipt-ch)) |
| 124 | + |
| 125 | +(defn ^:private run-pipeline |
| 126 | + "Forwards messages received on the input-ch to the language server, for |
| 127 | + further processing." |
| 128 | + [server context input-ch] |
| 129 | + (let [;; In order to process some requests and (all) notifications in series, |
| 130 | + ;; the language server sometimes needs to block client-initiated input. |
| 131 | + ;; If the language server sends requests during that time, it needs to |
| 132 | + ;; receive responses, even though it's blocking other input. Otherwise, |
| 133 | + ;; it will end up in a deadlock, where it's waiting to receive a |
| 134 | + ;; response off the input-ch and the input-ch isn't being read from |
| 135 | + ;; because the server is blocking input. See |
| 136 | + ;; https://github.com/clojure-lsp/clojure-lsp/issues/1500. |
| 137 | + |
| 138 | + ;; The messages all arrive in order on the input-ch so to get to the |
| 139 | + ;; client's response, we have to queue whatever other messages it's |
| 140 | + ;; sent. We do that by storing them in a sliding buffer. Because of the |
| 141 | + ;; sliding buffer: |
| 142 | + ;; * if the client sends a message which causes the language server to |
| 143 | + ;; block, and |
| 144 | + ;; * if the language server sends a request during that time, and |
| 145 | + ;; * if the client sends more than 100 other messages between when the |
| 146 | + ;; language server started blocking and when the client responds to |
| 147 | + ;; the language server's request, |
| 148 | + ;; * then the client's earliest messages will be dropped. |
| 149 | + ;; The same is true in reverse. |
| 150 | + |
| 151 | + ;; We process the client- and language-server-initiated messages in |
| 152 | + ;; separate threads. |
| 153 | + ;; * Threads, so the language server can use >!! and so that we can use |
| 154 | + ;; (>!! output-ch) to respect back pressure from clients that are slow |
| 155 | + ;; to read. |
| 156 | + ;; * Separate, so one can continue while the other is blocked. |
| 157 | + server-initiated-in-ch (spawn-receipt-thread |
| 158 | + (async/sliding-buffer 100) |
| 159 | + (fn [_ message] |
| 160 | + (try |
| 161 | + (protocols.endpoint/receive-response server message) |
| 162 | + (catch Throwable e |
| 163 | + (log-error-receiving server e message))))) |
| 164 | + client-initiated-in-ch (spawn-receipt-thread |
| 165 | + (async/sliding-buffer 100) |
| 166 | + (fn [message-type message] |
| 167 | + (if (identical? :request message-type) |
| 168 | + ;; receive-request catches its own exceptions |
| 169 | + (protocols.endpoint/receive-request server context message) |
| 170 | + (try |
| 171 | + (protocols.endpoint/receive-notification server context message) |
| 172 | + (catch Throwable e |
| 173 | + (log-error-receiving server e message))))))] |
| 174 | + (async/go-loop [] |
| 175 | + (if-let [message (async/<! input-ch)] |
| 176 | + (let [message-type (coercer/input-message-type message)] |
| 177 | + (case message-type |
| 178 | + (:parse-error :invalid-request) |
| 179 | + (protocols.endpoint/log server :error (format-error-code "Error reading message" message-type)) |
| 180 | + (:response.result :response.error) |
| 181 | + (async/>! server-initiated-in-ch [:response message]) |
| 182 | + (:request :notification) |
| 183 | + (async/>! client-initiated-in-ch [message-type message])) |
| 184 | + (recur)) |
| 185 | + (do |
| 186 | + (async/close! server-initiated-in-ch) |
| 187 | + (async/close! client-initiated-in-ch)))))) |
131 | 188 |
|
132 | 189 | ;; Expose endpoint methods to language servers
|
133 | 190 |
|
|
199 | 256 | join]
|
200 | 257 | protocols.endpoint/IEndpoint
|
201 | 258 | (start [this context]
|
202 |
| - (let [;; a thread so language server can use >!! and so that receive-message |
203 |
| - ;; can use (>!! output-ch) to respect back pressure from clients that |
204 |
| - ;; are slow to read. |
205 |
| - pipeline (async/thread |
206 |
| - (loop [] |
207 |
| - (if-let [message (async/<!! input-ch)] |
208 |
| - (do |
209 |
| - (receive-message this context message) |
210 |
| - (recur)) |
211 |
| - (async/close! output-ch))))] |
| 259 | + ;; Start receiving messages. |
| 260 | + (let [pipeline (run-pipeline this context input-ch)] |
| 261 | + ;; Wait to stop receiving messages. |
212 | 262 | (async/go
|
213 |
| - ;; Wait for pipeline to close. This indicates input-ch was closed and |
214 |
| - ;; that now output-ch is closed. |
| 263 | + ;; When pipeline closes, it indicates input-ch has closed. We're done |
| 264 | + ;; receiving. |
215 | 265 | (async/<! pipeline)
|
216 |
| - ;; Do additional cleanup. |
| 266 | + ;; Do cleanup. |
| 267 | + ;; TODO: do we really know that we've finished putting to output-ch? |
| 268 | + (async/close! output-ch) |
217 | 269 | (async/close! log-ch)
|
218 | 270 | (some-> trace-ch async/close!)
|
219 | 271 | (on-close)
|
|
0 commit comments