Skip to content

Commit e64a7c1

Browse files
authored
Merge pull request #37 from clojure-lsp/receive-responses-while-blocking
Continue receiving responses while blocking requests or notifications
2 parents f662727 + 4feb7d6 commit e64a7c1

File tree

3 files changed

+163
-44
lines changed

3 files changed

+163
-44
lines changed

.clj-kondo/funcool/promesa/config.edn

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
{:lint-as {promesa.core/-> clojure.core/->
2+
promesa.core/->> clojure.core/->>
3+
promesa.core/as-> clojure.core/as->
4+
promesa.core/let clojure.core/let
5+
promesa.core/plet clojure.core/let
6+
promesa.core/loop clojure.core/loop
7+
promesa.core/recur clojure.core/recur
8+
promesa.core/with-redefs clojure.core/with-redefs}}

src/lsp4clj/server.clj

Lines changed: 124 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -112,22 +112,81 @@
112112
log-title (format-error-code "Error receiving message" :internal-error)]
113113
(protocols.endpoint/log server :error e (str log-title "\n" message-details))))
114114

115-
(defn ^:private receive-message
116-
[server context message]
117-
(let [message-type (coercer/input-message-type message)]
118-
(try
115+
(defn thread-loop [buf-or-n f]
116+
(let [ch (async/chan buf-or-n)]
117+
(async/thread
119118
(discarding-stdout
120-
(case message-type
121-
(:parse-error :invalid-request)
122-
(protocols.endpoint/log server :error (format-error-code "Error reading message" message-type))
123-
:request
124-
(protocols.endpoint/receive-request server context message)
125-
(:response.result :response.error)
126-
(protocols.endpoint/receive-response server message)
127-
:notification
128-
(protocols.endpoint/receive-notification server context message)))
129-
(catch Throwable e ;; exceptions thrown by receive-response or receive-notification (receive-request catches its own exceptions)
130-
(log-error-receiving server e message)))))
119+
(loop []
120+
(when-let [arg (async/<!! ch)]
121+
(f arg)
122+
(recur)))))
123+
ch))
124+
125+
(defn ^:private dispatch-input
126+
"Dispatches messages received on the input-ch based on message type. Returns a
127+
channel which will close after the input-ch is closed."
128+
[server context input-ch]
129+
(let [;; In order to process some requests and (all) notifications in series,
130+
;; the language server sometimes needs to block client-initiated input.
131+
;; If the language server sends requests during that time, it needs to
132+
;; receive responses, even though it's blocking other input. Otherwise,
133+
;; it will end up in a deadlock, where it's waiting to receive a
134+
;; response off the input-ch and the input-ch isn't being read from
135+
;; because the server is blocking input. See
136+
;; https://github.com/clojure-lsp/clojure-lsp/issues/1500.
137+
138+
;; The messages all arrive in order on the input-ch so to get to the
139+
;; client's response, we have to queue whatever other messages it's
140+
;; sent. We do that by storing them in a sliding buffer. Because of the
141+
;; sliding buffer:
142+
;; * if the client sends a message which causes the language server to
143+
;; block, and
144+
;; * if the language server sends a request during that time, and
145+
;; * if the client sends more than 100 other messages between when the
146+
;; language server started blocking and when the client responds to
147+
;; the language server's request,
148+
;; * then the client's earliest messages will be dropped.
149+
;; The same is true in reverse.
150+
151+
;; We process the client- and language-server-initiated messages in
152+
;; separate threads.
153+
;; * Threads, so the language server can use >!! and so that we can use
154+
;; (>!! output-ch) to respect back pressure from clients that are slow
155+
;; to read.
156+
;; * Separate, so one can continue while the other is blocked.
157+
158+
;; (Jacob Maine): 100 is picked out of thin air. I have no idea how to
159+
;; estimate how big the buffer should be to avoid dropping messages. LSP
160+
;; communication tends to be very quiet, then very chatty, so it depends
161+
;; a lot on what the client and server are doing. I also don't know how
162+
;; many messages we could store without running into memory problems,
163+
;; since this is dependent on so many variables, not just the size of
164+
;; the JVM's memory, but also the size of the messages, which can be
165+
;; anywhere from a few bytes to megabytes.
166+
server-initiated-in-ch (thread-loop
167+
(async/sliding-buffer 100)
168+
(fn [response]
169+
(protocols.endpoint/receive-response server response)))
170+
client-initiated-in-ch (thread-loop
171+
(async/sliding-buffer 100)
172+
(fn [[message-type message]]
173+
(if (identical? :request message-type)
174+
(protocols.endpoint/receive-request server context message)
175+
(protocols.endpoint/receive-notification server context message))))]
176+
(async/go-loop []
177+
(if-let [message (async/<! input-ch)]
178+
(let [message-type (coercer/input-message-type message)]
179+
(case message-type
180+
(:parse-error :invalid-request)
181+
(protocols.endpoint/log server :error (format-error-code "Error reading message" message-type))
182+
(:response.result :response.error)
183+
(async/>! server-initiated-in-ch message)
184+
(:request :notification)
185+
(async/>! client-initiated-in-ch [message-type message]))
186+
(recur))
187+
(do
188+
(async/close! server-initiated-in-ch)
189+
(async/close! client-initiated-in-ch))))))
131190

132191
;; Expose endpoint methods to language servers
133192

@@ -199,21 +258,36 @@
199258
join]
200259
protocols.endpoint/IEndpoint
201260
(start [this context]
202-
(let [;; a thread so language server can use >!! and so that receive-message
203-
;; can use (>!! output-ch) to respect back pressure from clients that
204-
;; are slow to read.
205-
pipeline (async/thread
206-
(loop []
207-
(if-let [message (async/<!! input-ch)]
208-
(do
209-
(receive-message this context message)
210-
(recur))
211-
(async/close! output-ch))))]
261+
;; Start receiving messages.
262+
(let [pipeline (dispatch-input this context input-ch)]
263+
;; Wait to stop receiving messages.
212264
(async/go
213-
;; Wait for pipeline to close. This indicates input-ch was closed and
214-
;; that now output-ch is closed.
265+
;; When pipeline closes, it indicates input-ch has closed. We're done
266+
;; receiving.
215267
(async/<! pipeline)
216-
;; Do additional cleanup.
268+
;; Do cleanup.
269+
270+
;; The [docs](https://clojuredocs.org/clojure.core.async/close!) for
271+
;; `close!` say A) "The channel will no longer accept any puts", B)
272+
;; "Data in the channel remains available for taking", and C) "Logically
273+
;; closing happens after all puts have been delivered."
274+
275+
;; At this point the input-ch has been closed, which means any messages
276+
;; that were read before the channel was closed have been put on the
277+
;; channel (C). However, the takes off of it, the takes which then
278+
;; forward the messages to the language server, may or may not have
279+
;; happened (B). And even if the language server has received some
280+
;; messages, if it responds after this line closes the output-ch, the
281+
;; responses will be dropped (A).
282+
283+
;; All that to say, it's possible for the lsp4clj server to drop the
284+
;; language server's final few responses.
285+
286+
;; It doesn't really matter though, because the users of lsp4clj
287+
;; typically don't call `shutdown` on the lsp4clj server until they've
288+
;; received the `exit` notification, which is the client indicating it
289+
;; no longer expects any responses anyway.
290+
(async/close! output-ch)
217291
(async/close! log-ch)
218292
(some-> trace-ch async/close!)
219293
(on-close)
@@ -250,13 +324,16 @@
250324
(async/>!! output-ch notif)
251325
nil))
252326
(receive-response [this {:keys [id error result] :as resp}]
253-
(let [now (.instant clock)
254-
[pending-requests _] (swap-vals! pending-sent-requests* dissoc id)]
255-
(if-let [{:keys [p started] :as req} (get pending-requests id)]
256-
(do
257-
(trace this trace/received-response req resp started now)
258-
(deliver p (if error resp result)))
259-
(trace this trace/received-unmatched-response resp now))))
327+
(try
328+
(let [now (.instant clock)
329+
[pending-requests _] (swap-vals! pending-sent-requests* dissoc id)]
330+
(if-let [{:keys [p started] :as req} (get pending-requests id)]
331+
(do
332+
(trace this trace/received-response req resp started now)
333+
(deliver p (if error resp result)))
334+
(trace this trace/received-unmatched-response resp now)))
335+
(catch Throwable e
336+
(log-error-receiving this e resp))))
260337
(receive-request [this context {:keys [id method params] :as req}]
261338
(let [started (.instant clock)
262339
resp (lsp.responses/response id)]
@@ -293,15 +370,18 @@
293370
(log-error-receiving this e req)
294371
(async/>!! output-ch (internal-error-response resp req))))))
295372
(receive-notification [this context {:keys [method params] :as notif}]
296-
(let [now (.instant clock)]
297-
(trace this trace/received-notification notif now)
298-
(if (= method "$/cancelRequest")
299-
(if-let [pending-req (get @pending-received-requests* (:id params))]
300-
(p/cancel! pending-req)
301-
(trace this trace/received-unmatched-cancellation-notification notif now))
302-
(let [result (receive-notification method context params)]
303-
(when (identical? ::method-not-found result)
304-
(protocols.endpoint/log this :warn "received unexpected notification" method)))))))
373+
(try
374+
(let [now (.instant clock)]
375+
(trace this trace/received-notification notif now)
376+
(if (= method "$/cancelRequest")
377+
(if-let [pending-req (get @pending-received-requests* (:id params))]
378+
(p/cancel! pending-req)
379+
(trace this trace/received-unmatched-cancellation-notification notif now))
380+
(let [result (receive-notification method context params)]
381+
(when (identical? ::method-not-found result)
382+
(protocols.endpoint/log this :warn "received unexpected notification" method)))))
383+
(catch Throwable e
384+
(log-error-receiving this e notif)))))
305385

306386
(defn set-trace-level [server trace-level]
307387
(update server :tracer* reset! (trace/tracer-for-level trace-level)))

test/lsp4clj/server_test.clj

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,37 @@
133133
(is (nil? (server/send-notification server "req" {:body "foo"})))
134134
(server/shutdown server)))
135135

136+
(deftest should-receive-receive-response-to-request-sent-while-processing-notification
137+
;; https://github.com/clojure-lsp/clojure-lsp/issues/1500
138+
(let [input-ch (async/chan 3)
139+
output-ch (async/chan 3)
140+
server (server/chan-server {:output-ch output-ch
141+
:input-ch input-ch})
142+
client-resp (promise)]
143+
(server/start server nil)
144+
(with-redefs [server/receive-notification (fn [& _args]
145+
(let [req (server/send-request server "server-sent-request" {:body "foo"})
146+
_ (Thread/sleep 100)
147+
resp (server/deref-or-cancel req 1000 :broke-deadlock)]
148+
(deliver client-resp resp)))]
149+
;; The first pass of this fix used a `(chan 1)` instead of
150+
;; `(sliding-buffer 100)`. It worked when the client sent only two
151+
;; messages before the server could finish the first, but not when it sent
152+
;; 3 or more. The first notif made the server start sleeping, the second
153+
;; filled up the channel's buffer, and the third blocked on putting onto
154+
;; the channel, meaning the client response on the next few lines never
155+
;; got through. So these lines check that "several" client messages can be
156+
;; queued. See notes in lsp4clj.server for caveats about what "several"
157+
;; means.
158+
(async/put! input-ch (lsp.requests/notification "client-sent-notif" {:input 1}))
159+
(async/put! input-ch (lsp.requests/notification "client-sent-notif" {:input 2}))
160+
(async/put! input-ch (lsp.requests/notification "client-sent-notif" {:input 3}))
161+
(let [client-rcvd-request (h/assert-take output-ch)]
162+
(is (= "server-sent-request" (:method client-rcvd-request)))
163+
(async/put! input-ch (lsp.responses/response (:id client-rcvd-request) {:processed true})))
164+
(is (= {:processed true} (deref client-resp 10000 :timed-out))))
165+
(server/shutdown server)))
166+
136167
(deftest should-cancel-request-when-cancellation-notification-receieved
137168
(let [input-ch (async/chan 3)
138169
output-ch (async/chan 3)

0 commit comments

Comments
 (0)