|
199 | 199 | protocols.endpoint/IEndpoint
|
200 | 200 | (start [this context]
|
201 | 201 | ;; Start receiving messages.
|
202 |
| - (let [;; In order to process some requests and (all) notifications in |
203 |
| - ;; series, the language server sometimes needs to block |
204 |
| - ;; client-initiated input. If the language server sends requests |
205 |
| - ;; during that time, it needs to receive responses, even though it's |
206 |
| - ;; blocking other input. Otherwise, it will end up in a deadlock, |
207 |
| - ;; where it's waiting to receive a response off the input-ch and the |
208 |
| - ;; input-ch isn't being read from because the server is blocking |
209 |
| - ;; input. See https://github.com/clojure-lsp/clojure-lsp/issues/1500. |
210 |
| - |
211 |
| - ;; To avoid this problem we processes client-initiated input (client |
212 |
| - ;; requests and notifications) and server-initiated input (client |
213 |
| - ;; responses) separately. |
214 |
| - |
215 |
| - ;; If the server starts blocking waiting for a response, we buffer the |
216 |
| - ;; client's requests and notifications until the server is prepared to |
217 |
| - ;; process them. |
218 |
| - |
219 |
| - ;; However, if too many client requests and notifications arrive |
220 |
| - ;; before the response, the buffer fills up. In this situation, we |
221 |
| - ;; abort all the server's pending requests, hoping that this will |
222 |
| - ;; allow it to process the client's other messages. We do this to |
223 |
| - ;; prioritize the client's messages over the server's. |
224 |
| - |
225 |
| - ;; The situation is different in reverse. The client can also start |
226 |
| - ;; blocking waiting for a server response. In the meantime, the server |
227 |
| - ;; can send lots of messages. But this is bad behavior on the server's |
228 |
| - ;; part. So in this scenario we drop the server's earliest requests |
229 |
| - ;; and notifications. |
230 |
| - |
231 |
| - ;; We do that by storing them in a sliding buffer. Because of the |
232 |
| - ;; sliding buffer: |
233 |
| - ;; * if the language server sends a message, and |
234 |
| - ;; * if while processing that message the client sends a request which |
235 |
| - ;; causes it to block, and |
236 |
| - ;; * if the language server sends too many other messages between when |
237 |
| - ;; the client started blocking and when the language server responds |
238 |
| - ;; to the client's request, |
239 |
| - ;; * then the language server's messages will start being dropped, |
240 |
| - ;; starting from the earliest. |
241 |
| - |
242 |
| - ;; We process the client- and language-server-initiated messages in |
243 |
| - ;; separate threads. |
244 |
| - ;; * Threads, so the language server can use >!! and so that we can use |
245 |
| - ;; (>!! output-ch) to respect back pressure from clients that are slow |
246 |
| - ;; to read. |
247 |
| - ;; * Separate, so one can continue while the other is blocked. |
248 |
| - server-initiated-in-ch (thread-loop |
249 |
| - (async/sliding-buffer input-buffer-size) |
250 |
| - (fn [response] |
251 |
| - (protocols.endpoint/receive-response this response))) |
| 202 | + (let [;; The language server sometimes needs to stop processing inbound |
| 203 | + ;; requests and notifications. |
| 204 | + |
| 205 | + ;; We do this automatically whenever we receive a notification, so |
| 206 | + ;; that we are sure the language server processes didChange before |
| 207 | + ;; moving on to other requests. But the language server can also |
| 208 | + ;; decide to do it itself, even in a request, by processing everything |
| 209 | + ;; synchronously instead of returning a future. |
| 210 | + |
| 211 | + ;; In a request (or even in a notification) the language server can |
| 212 | + ;; send a request to the client and block waiting for a response. |
| 213 | + |
| 214 | + ;; It's possible -- even likely -- that the client will send other |
| 215 | + ;; messages between the time that the server sent its request and the |
| 216 | + ;; time that the client responds. |
| 217 | + |
| 218 | + ;; All inbound messages -- requests, notifications, and responses -- |
| 219 | + ;; come in on a single stream. |
| 220 | + |
| 221 | + ;; If the server blocks waiting for a response, we have to set aside |
| 222 | + ;; the other inbound requests and notifications, so that we can get to |
| 223 | + ;; the response. That is, while the server is blocking we cannot stop |
| 224 | + ;; reading input. Otherwise, the server will end up in a deadlock, |
| 225 | + ;; where it's waiting to receive a response off the input-ch but new |
| 226 | + ;; messages aren't being put on the input-ch because the server is |
| 227 | + ;; blocked. See |
| 228 | + ;; https://github.com/clojure-lsp/clojure-lsp/issues/1500. |
| 229 | + |
| 230 | + ;; To accomplish this we processes inbound requests and notifications |
| 231 | + ;; separately from inbound responses. If the server starts blocking |
| 232 | + ;; waiting for a response, we buffer the inbound requests and |
| 233 | + ;; notificatons until the server is prepared to process them. |
| 234 | + |
| 235 | + ;; If the buffer becomes full, we assume that the server isn't |
| 236 | + ;; handling inbound requests and notifcations because it's waiting for |
| 237 | + ;; a response. So, following our assumption, we reject the server |
| 238 | + ;; requests so that the server will stop waiting for the response. |
| 239 | + ;; It's possible that this won't work -- our assumption might have |
| 240 | + ;; been wrong and the server might have stalled for some other reason. |
| 241 | + ;; So after rejecting, we park trying to add the latest inbound |
| 242 | + ;; request or notification to the buffer. |
| 243 | + |
| 244 | + ;; This ensures we don't drop any client messages, though we could |
| 245 | + ;; stop reading them if the server keeps blocking. If we're lucky |
| 246 | + ;; either the language server will unblock, or the client will decided |
| 247 | + ;; to stop sending messages because it's failed to receive a server |
| 248 | + ;; response (i.e., we will have managed to apply backpressure to the |
| 249 | + ;; client). If we're unlucky, the server could keep blocking forever. |
| 250 | + ;; In any case, this scenario -- where we stop reading messages -- is |
| 251 | + ;; presumed to be both very rare and indicative of a problem that can |
| 252 | + ;; be solved only in the client or the language server. |
252 | 253 | client-initiated-in-ch (thread-loop
|
253 | 254 | input-buffer-size
|
254 | 255 | (fn [[message-type message]]
|
|
266 | 267 | (:parse-error :invalid-request)
|
267 | 268 | (protocols.endpoint/log this :error (format-error-code "Error reading message" message-type))
|
268 | 269 | (:response.result :response.error)
|
269 |
| - (async/>! server-initiated-in-ch message) |
| 270 | + (protocols.endpoint/receive-response this message) |
270 | 271 | (:request :notification)
|
271 | 272 | (when-not (async/offer! client-initiated-in-ch [message-type message])
|
272 | 273 | ;; Buffers full. Fail any waiting pending requests and...
|
273 | 274 | (reject-pending-sent-requests
|
274 | 275 | (ex-info "Buffer of client messages exhausted." {}))
|
275 |
| - ;; ... try again, but park this time, to respect |
276 |
| - ;; back pressure from the client. |
| 276 | + ;; ... try again, but park this time. |
277 | 277 | (async/>! client-initiated-in-ch [message-type message])))
|
278 | 278 | (recur))
|
279 |
| - (do |
280 |
| - (async/close! server-initiated-in-ch) |
281 |
| - (async/close! client-initiated-in-ch))))] |
282 |
| - ;; Wait to stop receiving messages. |
| 279 | + (async/close! client-initiated-in-ch)))] |
283 | 280 | (async/go
|
284 |
| - ;; When pipeline closes, it indicates input-ch has closed. We're done |
285 |
| - ;; receiving. |
| 281 | + ;; Wait to stop receiving messages. |
286 | 282 | (async/<! pipeline)
|
287 |
| - ;; Do cleanup. |
| 283 | + ;; The pipeline has closed, indicating input-ch has closed. We're done |
| 284 | + ;; receiving. Do cleanup. |
288 | 285 |
|
289 |
| - (reject-pending-sent-requests (ex-info "Server shutting down." {})) |
| 286 | + (reject-pending-sent-requests (ex-info "Server shutting down. Input is closed so no response is possible." {})) |
290 | 287 |
|
291 | 288 | ;; The [docs](https://clojuredocs.org/clojure.core.async/close!) for
|
292 | 289 | ;; `close!` say A) "The channel will no longer accept any puts", B)
|
|
331 | 328 | req (lsp.requests/request id method body)
|
332 | 329 | pending-request (pending-request id method now this)]
|
333 | 330 | (trace this trace/sending-request req now)
|
334 |
| - ;; Important: record request before sending it, so it is sure to be |
| 331 | + ;; Important: record request before sending it, so it's sure to be |
335 | 332 | ;; available during receive-response.
|
336 | 333 | (swap! pending-sent-requests* assoc id pending-request)
|
337 | 334 | ;; respect back pressure from clients that are slow to read; (go (>!)) will not suffice
|
|
351 | 348 | (if-let [{:keys [p started] :as req} (get pending-requests id)]
|
352 | 349 | (do
|
353 | 350 | (trace this trace/received-response req resp started now)
|
| 351 | + ;; We resolve the promise whether or not the client completed the |
| 352 | + ;; request successfully. The language-server is expected to |
| 353 | + ;; determine whether there was an error with something like: |
| 354 | + ;; `(let [{:keys [error] :as resp} (deref-or-cancel ,,,)])` |
| 355 | + ;; It would be somewhat more elegant to reject the promise when the |
| 356 | + ;; client had errors, so that the language-server could use tools |
| 357 | + ;; like `p/catch`. |
354 | 358 | (p/resolve! p (if error resp result)))
|
355 | 359 | (trace this trace/received-unmatched-response resp now)))
|
356 | 360 | (catch Throwable e
|
|
0 commit comments