Skip to content

Commit 2a98bf7

Browse files
committed
Drop pending requests to prioritize client's messages
1 parent 8a18e1e commit 2a98bf7

File tree

3 files changed

+144
-66
lines changed

3 files changed

+144
-66
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
## Unreleased
44

55
- Server requests can be treated as promesa promises.
6+
- In certain blocking situations server will drop its pending sent requests to
7+
prioritize the client's messages.
68

79
## v1.8.1
810

src/lsp4clj/server.clj

Lines changed: 91 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -116,71 +116,15 @@
116116
(recur)))))
117117
ch))
118118

119-
(defn ^:private dispatch-input
120-
"Dispatches messages received on the input-ch based on message type. Returns a
121-
channel which will close after the input-ch is closed."
122-
[server context input-ch]
123-
(let [;; In order to process some requests and (all) notifications in series,
124-
;; the language server sometimes needs to block client-initiated input.
125-
;; If the language server sends requests during that time, it needs to
126-
;; receive responses, even though it's blocking other input. Otherwise,
127-
;; it will end up in a deadlock, where it's waiting to receive a
128-
;; response off the input-ch and the input-ch isn't being read from
129-
;; because the server is blocking input. See
130-
;; https://github.com/clojure-lsp/clojure-lsp/issues/1500.
131-
132-
;; The messages all arrive in order on the input-ch so to get to the
133-
;; client's response, we have to queue whatever other messages it's
134-
;; sent. We do that by storing them in a sliding buffer. Because of the
135-
;; sliding buffer:
136-
;; * if the client sends a message which causes the language server to
137-
;; block, and
138-
;; * if the language server sends a request during that time, and
139-
;; * if the client sends more than 100 other messages between when the
140-
;; language server started blocking and when the client responds to
141-
;; the language server's request,
142-
;; * then the client's earliest messages will be dropped.
143-
;; The same is true in reverse.
144-
145-
;; We process the client- and language-server-initiated messages in
146-
;; separate threads.
147-
;; * Threads, so the language server can use >!! and so that we can use
148-
;; (>!! output-ch) to respect back pressure from clients that are slow
149-
;; to read.
150-
;; * Separate, so one can continue while the other is blocked.
151-
152-
;; (Jacob Maine): 100 is picked out of thin air. I have no idea how to
153-
;; estimate how big the buffer should be to avoid dropping messages. LSP
154-
;; communication tends to be very quiet, then very chatty, so it depends
155-
;; a lot on what the client and server are doing. I also don't know how
156-
;; many messages we could store without running into memory problems,
157-
;; since this is dependent on so many variables, not just the size of
158-
;; the JVM's memory, but also the size of the messages, which can be
159-
;; anywhere from a few bytes to megabytes.
160-
server-initiated-in-ch (thread-loop
161-
(async/sliding-buffer 100)
162-
(fn [response]
163-
(protocols.endpoint/receive-response server response)))
164-
client-initiated-in-ch (thread-loop
165-
(async/sliding-buffer 100)
166-
(fn [[message-type message]]
167-
(if (identical? :request message-type)
168-
(protocols.endpoint/receive-request server context message)
169-
(protocols.endpoint/receive-notification server context message))))]
170-
(async/go-loop []
171-
(if-let [message (async/<! input-ch)]
172-
(let [message-type (coercer/input-message-type message)]
173-
(case message-type
174-
(:parse-error :invalid-request)
175-
(protocols.endpoint/log server :error (format-error-code "Error reading message" message-type))
176-
(:response.result :response.error)
177-
(async/>! server-initiated-in-ch message)
178-
(:request :notification)
179-
(async/>! client-initiated-in-ch [message-type message]))
180-
(recur))
181-
(do
182-
(async/close! server-initiated-in-ch)
183-
(async/close! client-initiated-in-ch))))))
119+
(def input-buffer-size
120+
;; (Jacob Maine): This number is picked out of thin air. I have no idea how to
121+
;; estimate how big the buffer could or should be. LSP communication tends to
122+
;; be very quiet, then very chatty, so it depends a lot on what the client and
123+
;; server are doing. I also don't know how many messages we could store
124+
;; without running into memory problems, since this is dependent on so many
125+
;; variables, not just the size of the JVM's memory, but also the size of the
126+
;; messages, which can be anywhere from a few bytes to several megabytes.
127+
1024)
184128

185129
;; Expose endpoint methods to language servers
186130

@@ -253,14 +197,95 @@
253197
protocols.endpoint/IEndpoint
254198
(start [this context]
255199
;; Start receiving messages.
256-
(let [pipeline (dispatch-input this context input-ch)]
200+
(let [;; In order to process some requests and (all) notifications in
201+
;; series, the language server sometimes needs to block
202+
;; client-initiated input. If the language server sends requests
203+
;; during that time, it needs to receive responses, even though it's
204+
;; blocking other input. Otherwise, it will end up in a deadlock,
205+
;; where it's waiting to receive a response off the input-ch and the
206+
;; input-ch isn't being read from because the server is blocking
207+
;; input. See https://github.com/clojure-lsp/clojure-lsp/issues/1500.
208+
209+
;; To avoid this problem we processes client-initiated input (client
210+
;; requests and notifications) and server-initiated input (client
211+
;; responses) separately.
212+
213+
;; If the server starts blocking waiting for a response, we buffer the
214+
;; client's requests and notifications until the server is prepared to
215+
;; process them.
216+
217+
;; However, if too many client requests and notifications arrive
218+
;; before the response, the buffer fills up. In this situation, we
219+
;; abort all the server's pending requests, hoping that this will
220+
;; allow it to process the client's other messages. We do this to
221+
;; prioritize the client's messages over the server's.
222+
223+
;; The situation is different in reverse. The client can also start
224+
;; blocking waiting for a server response. In the meantime, the server
225+
;; can send lots of messages. But this is bad behavior on the server's
226+
;; part. So in this scenario we drop the server's earliest requests
227+
;; and notifications.
228+
229+
;; We do that by storing them in a sliding buffer. Because of the
230+
;; sliding buffer:
231+
;; * if the language server sends a message, and
232+
;; * if while processing that message the client sends a request which
233+
;; causes it to block, and
234+
;; * if the language server sends too many other messages between when
235+
;; the client started blocking and when the language server responds
236+
;; to the client's request,
237+
;; * then the language server's messages will start being dropped,
238+
;; starting from the earliest.
239+
240+
;; We process the client- and language-server-initiated messages in
241+
;; separate threads.
242+
;; * Threads, so the language server can use >!! and so that we can use
243+
;; (>!! output-ch) to respect back pressure from clients that are slow
244+
;; to read.
245+
;; * Separate, so one can continue while the other is blocked.
246+
server-initiated-in-ch (thread-loop
247+
(async/sliding-buffer input-buffer-size)
248+
(fn [response]
249+
(protocols.endpoint/receive-response this response)))
250+
client-initiated-in-ch (thread-loop
251+
input-buffer-size
252+
(fn [[message-type message]]
253+
(if (identical? :request message-type)
254+
(protocols.endpoint/receive-request this context message)
255+
(protocols.endpoint/receive-notification this context message))))
256+
reject-pending-sent-requests (fn [exception]
257+
(doseq [pending-request (vals @pending-sent-requests*)]
258+
(p/reject! (:p pending-request)
259+
exception)))
260+
pipeline (async/go-loop []
261+
(if-let [message (async/<! input-ch)]
262+
(let [message-type (coercer/input-message-type message)]
263+
(case message-type
264+
(:parse-error :invalid-request)
265+
(protocols.endpoint/log this :error (format-error-code "Error reading message" message-type))
266+
(:response.result :response.error)
267+
(async/>! server-initiated-in-ch message)
268+
(:request :notification)
269+
(when-not (async/offer! client-initiated-in-ch [message-type message])
270+
;; Buffers full. Fail any waiting pending requests and...
271+
(reject-pending-sent-requests
272+
(ex-info "Buffer of client messages exhausted." {}))
273+
;; ... try again, but park this time, to respect
274+
;; back pressure from the client.
275+
(async/>! client-initiated-in-ch [message-type message])))
276+
(recur))
277+
(do
278+
(async/close! server-initiated-in-ch)
279+
(async/close! client-initiated-in-ch))))]
257280
;; Wait to stop receiving messages.
258281
(async/go
259282
;; When pipeline closes, it indicates input-ch has closed. We're done
260283
;; receiving.
261284
(async/<! pipeline)
262285
;; Do cleanup.
263286

287+
(reject-pending-sent-requests (ex-info "Server shutting down." {}))
288+
264289
;; The [docs](https://clojuredocs.org/clojure.core.async/close!) for
265290
;; `close!` say A) "The channel will no longer accept any puts", B)
266291
;; "Data in the channel remains available for taking", and C) "Logically

test/lsp4clj/server_test.clj

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,57 @@
164164
(is (= {:processed true} (deref client-resp 10000 :timed-out))))
165165
(server/shutdown server)))
166166

167+
(deftest should-fail-pending-requests-if-too-many-inbound-messages-are-buffered
168+
;; * If the server sends a request, and blocks waiting for the response,
169+
;; * and if the client sends too many other messages before responding,
170+
;; * then the server's buffer of unprocessed inbound messages will fill up.
171+
;; To avoid dropping messages or buffering endlessly, the server eventually
172+
;; aborts its request.
173+
(with-redefs [server/input-buffer-size 5]
174+
(let [input-ch (async/chan 3)
175+
output-ch (async/chan 3)
176+
server (server/chan-server {:output-ch output-ch
177+
:input-ch input-ch})
178+
client-req-id* (atom 0)
179+
client-req (fn [body]
180+
(async/put! input-ch (lsp.requests/request (swap! client-req-id* inc)
181+
"client-sent-request"
182+
body)))]
183+
(server/start server nil)
184+
(with-redefs [server/receive-request (fn [_ _ {:keys [server-action] :as client-req}]
185+
(if (= :block server-action)
186+
(let [req (server/send-request server "server-sent-request" {:body "foo"})]
187+
(try
188+
(deref req)
189+
{:processed client-req}
190+
(catch Throwable _
191+
{:error {:result :deref-aborted}})))
192+
{:processed client-req}))]
193+
;; The client sends a request which causes the server to send its own
194+
;; request. The server starts blocking, waiting for the client to respond.
195+
(client-req {:server-action :block})
196+
;; The client receives the server's request but doesn't respond yet.
197+
(is (= "server-sent-request" (:method (h/assert-take output-ch))))
198+
;; Before responding to the server's request, the client sends many other
199+
;; messages. The server will buffer these messages.
200+
(dotimes [n server/input-buffer-size]
201+
(client-req {:server-action :buffer, :input n}))
202+
;; The server is still blocking.
203+
(is (h/assert-no-take output-ch))
204+
;; The client sends one more mesage, which is too many for the server to buffer.
205+
(client-req {:server-action :overflow})
206+
;; To prioritize the client's inbound messages, the server's outbound
207+
;; request is aborted, causing it to stop waiting for a client response.
208+
(is (= {:jsonrpc "2.0", :id 1, :error {:result :deref-aborted}}
209+
(h/assert-take output-ch)))
210+
;; Now the server can process every other message from the client.
211+
(dotimes [n server/input-buffer-size]
212+
(is (= {:processed {:server-action :buffer, :input n}}
213+
(:result (h/assert-take output-ch)))))
214+
(is (= {:processed {:server-action :overflow}}
215+
(:result (h/assert-take output-ch)))))
216+
(server/shutdown server))))
217+
167218
(deftest should-cancel-request-when-cancellation-notification-receieved
168219
(let [input-ch (async/chan 3)
169220
output-ch (async/chan 3)

0 commit comments

Comments
 (0)