Skip to content

Commit fea31a1

Browse files
authored
Merge pull request #38 from clojure-lsp/avoid-dropping-client-messages
Avoid dropping client messages
2 parents 34fe65d + 38dc302 commit fea31a1

File tree

6 files changed

+328
-115
lines changed

6 files changed

+328
-115
lines changed

.clj-kondo/funcool/promesa/config.edn

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@
55
promesa.core/plet clojure.core/let
66
promesa.core/loop clojure.core/loop
77
promesa.core/recur clojure.core/recur
8-
promesa.core/with-redefs clojure.core/with-redefs}}
8+
promesa.core/with-redefs clojure.core/with-redefs
9+
promesa.core/doseq clojure.core/doseq}}

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@
22

33
## Unreleased
44

5+
- In certain situations the server will abort its pending sent requests to avoid
6+
blocking the client's messages.
7+
- Server requests can be treated as promesa promises.
8+
- Bump promesa to `10.0.594`
9+
510
## v1.8.1
611

712
- Bump promesa to `10.0.571`
@@ -16,6 +21,8 @@
1621

1722
## v1.7.3
1823

24+
- Server continues receiving responses while it is blocking requests or notifications.
25+
1926
## v1.7.2
2027

2128
## v1.7.1

README.md

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,28 @@ Sending a request is similar, with `lsp4clj.server/send-request`. This method re
9797
response))
9898
```
9999

100-
The request object presents the same interface as `future`. It responds to `future-cancel` (which also sends `$/cancelRequest`), `realized?`, `future?`, `future-done?` and `future-cancelled?`.
100+
The request object presents the same interface as `future`. It responds to `realized?`, `future?`, `future-done?` and `future-cancelled?`.
101101

102-
If the request is cancelled, later invocations of `deref` will return `:lsp4clj.server/cancelled`.
102+
If you call `future-cancel` on the request object, the server will send the client a `$/cancelRequest`. `$/cancelRequest` is sent only once, although `lsp4clj.server/deref-or-cancel` or `future-cancel` can be called multiple times. After a request is cancelled, later invocations of `deref` will return `:lsp4clj.server/cancelled`.
103103

104-
`$/cancelRequest` is sent only once, although `lsp4clj.server/deref-or-cancel` or `future-cancel` can be called multiple times.
104+
Alternatively, you can convert the request to a promesa promise, and handle it using that library's tools:
105+
106+
```clojure
107+
(let [request (lsp4clj.server/send-request server "..." params)]
108+
(-> request
109+
(promesa/promise)
110+
(promesa/then (fn [response] {:result :client-success
111+
:value 1
112+
:resp response}))
113+
(promesa/catch (fn [ex-response] {:result :client-error
114+
:value 10
115+
:resp (ex-data ex-response)}))
116+
(promesa/timeout 10000 {:result :timeout
117+
:value 100})
118+
(promesa/then #(update % :value inc))))
119+
```
120+
121+
In this case `(promesa/cancel! request)` will send `$/cancelRequest`.
105122

106123
### Start and stop a server
107124

deps.edn

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
org.clojure/core.async {:mvn/version "1.5.648"}
33
camel-snake-kebab/camel-snake-kebab {:mvn/version "0.4.3"}
44
cheshire/cheshire {:mvn/version "5.11.0"}
5-
funcool/promesa {:mvn/version "10.0.571"}}
5+
funcool/promesa {:mvn/version "10.0.594"}}
66
:paths ["src" "resources"]
77
:aliases {:test {:extra-deps {lambdaisland/kaocha {:mvn/version "1.64.1010"}}
88
:extra-paths ["test"]

src/lsp4clj/server.clj

Lines changed: 131 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
[lsp4clj.protocols.endpoint :as protocols.endpoint]
1010
[lsp4clj.trace :as trace]
1111
[promesa.core :as p]
12-
[promesa.protocols])
12+
[promesa.protocols :as p.protocols])
1313
(:import
1414
(java.util.concurrent CancellationException)))
1515

@@ -28,47 +28,41 @@
2828
`(binding [*out* null-output-stream-writer]
2929
~@body))
3030

31+
(defn- resolve-ex-data [p]
32+
(p/catch p (fn [ex] (or (ex-data ex) (p/rejected ex)))))
33+
3134
(defprotocol IBlockingDerefOrCancel
3235
(deref-or-cancel [this timeout-ms timeout-val]))
3336

34-
(defrecord PendingRequest [p cancelled? id method started server]
37+
(defrecord PendingRequest [p id method started]
3538
clojure.lang.IDeref
3639
(deref [_] (deref p))
3740
clojure.lang.IBlockingDeref
3841
(deref [_ timeout-ms timeout-val]
39-
(deref p timeout-ms timeout-val))
42+
(deref (resolve-ex-data p) timeout-ms timeout-val))
4043
IBlockingDerefOrCancel
41-
(deref-or-cancel [this timeout-ms timeout-val]
42-
(let [result (deref this timeout-ms ::timeout)]
43-
(if (= ::timeout result)
44-
(do (future-cancel this)
44+
(deref-or-cancel [_ timeout-ms timeout-val]
45+
(let [result (deref (resolve-ex-data p) timeout-ms ::timeout)]
46+
(if (identical? ::timeout result)
47+
(do (p/cancel! p)
4548
timeout-val)
4649
result)))
4750
clojure.lang.IPending
48-
(isRealized [_] (realized? p))
51+
(isRealized [_] (p/done? p))
4952
java.util.concurrent.Future
50-
(get [this]
51-
(let [result (deref this)]
52-
(if (= ::cancelled result)
53-
(throw (java.util.concurrent.CancellationException.))
54-
result)))
55-
(get [this timeout unit]
56-
(let [result (deref this (.toMillis unit timeout) ::timeout)]
57-
(case result
58-
::cancelled (throw (java.util.concurrent.CancellationException.))
59-
::timeout (throw (java.util.concurrent.TimeoutException.))
53+
(get [_] (deref p))
54+
(get [_ timeout unit]
55+
(let [result (deref p (.toMillis unit timeout) ::timeout)]
56+
(if (identical? ::timeout result)
57+
(throw (java.util.concurrent.TimeoutException.))
6058
result)))
61-
(isCancelled [_] @cancelled?)
62-
(isDone [this] (or (.isRealized this) (.isCancelled this)))
63-
(cancel [this _interrupt?]
64-
(if (.isDone this)
65-
false
66-
(if (compare-and-set! cancelled? false true)
67-
(do
68-
(protocols.endpoint/send-notification server "$/cancelRequest" {:id id})
69-
(deliver p ::cancelled)
70-
true)
71-
false))))
59+
(isCancelled [_] (p/cancelled? p))
60+
(isDone [_] (p/done? p))
61+
(cancel [_ _interrupt?]
62+
(p/cancel! p)
63+
(p/cancelled? p))
64+
p.protocols/IPromiseFactory
65+
(-promise [_] p))
7266

7367
;; Avoid error: java.lang.IllegalArgumentException: Multiple methods in multimethod 'simple-dispatch' match dispatch value: class lsp4clj.server.PendingRequest -> interface clojure.lang.IPersistentMap and interface clojure.lang.IDeref, and neither is preferred
7468
;; Only when CIDER is running? See https://github.com/thi-ng/color/issues/10
@@ -96,12 +90,19 @@
9690
Sends `$/cancelRequest` only once, though `lsp4clj.server/deref-or-cancel` or
9791
`future-cancel` can be called multiple times."
9892
[id method started server]
99-
(map->PendingRequest {:p (promise)
100-
:cancelled? (atom false)
101-
:id id
102-
:method method
103-
:started started
104-
:server server}))
93+
(let [p (p/deferred)]
94+
;; Set up a side-effect so that when the Request is cancelled, we inform the
95+
;; client. This cannot be `(-> (p/deferred) (p/catch))` because that returns
96+
;; a promise which, when cancelled, does nothing because there's no
97+
;; exception handler chained onto it. Instead, we must cancel the
98+
;; `(p/deffered)` promise itself.
99+
(p/catch p CancellationException
100+
(fn [_]
101+
(protocols.endpoint/send-notification server "$/cancelRequest" {:id id})))
102+
(map->PendingRequest {:p p
103+
:id id
104+
:method method
105+
:started started})))
105106

106107
(defn ^:private format-error-code [description error-code]
107108
(let [{:keys [code message]} (lsp.errors/by-key error-code)]
@@ -122,71 +123,15 @@
122123
(recur)))))
123124
ch))
124125

125-
(defn ^:private dispatch-input
126-
"Dispatches messages received on the input-ch based on message type. Returns a
127-
channel which will close after the input-ch is closed."
128-
[server context input-ch]
129-
(let [;; In order to process some requests and (all) notifications in series,
130-
;; the language server sometimes needs to block client-initiated input.
131-
;; If the language server sends requests during that time, it needs to
132-
;; receive responses, even though it's blocking other input. Otherwise,
133-
;; it will end up in a deadlock, where it's waiting to receive a
134-
;; response off the input-ch and the input-ch isn't being read from
135-
;; because the server is blocking input. See
136-
;; https://github.com/clojure-lsp/clojure-lsp/issues/1500.
137-
138-
;; The messages all arrive in order on the input-ch so to get to the
139-
;; client's response, we have to queue whatever other messages it's
140-
;; sent. We do that by storing them in a sliding buffer. Because of the
141-
;; sliding buffer:
142-
;; * if the client sends a message which causes the language server to
143-
;; block, and
144-
;; * if the language server sends a request during that time, and
145-
;; * if the client sends more than 100 other messages between when the
146-
;; language server started blocking and when the client responds to
147-
;; the language server's request,
148-
;; * then the client's earliest messages will be dropped.
149-
;; The same is true in reverse.
150-
151-
;; We process the client- and language-server-initiated messages in
152-
;; separate threads.
153-
;; * Threads, so the language server can use >!! and so that we can use
154-
;; (>!! output-ch) to respect back pressure from clients that are slow
155-
;; to read.
156-
;; * Separate, so one can continue while the other is blocked.
157-
158-
;; (Jacob Maine): 100 is picked out of thin air. I have no idea how to
159-
;; estimate how big the buffer should be to avoid dropping messages. LSP
160-
;; communication tends to be very quiet, then very chatty, so it depends
161-
;; a lot on what the client and server are doing. I also don't know how
162-
;; many messages we could store without running into memory problems,
163-
;; since this is dependent on so many variables, not just the size of
164-
;; the JVM's memory, but also the size of the messages, which can be
165-
;; anywhere from a few bytes to megabytes.
166-
server-initiated-in-ch (thread-loop
167-
(async/sliding-buffer 100)
168-
(fn [response]
169-
(protocols.endpoint/receive-response server response)))
170-
client-initiated-in-ch (thread-loop
171-
(async/sliding-buffer 100)
172-
(fn [[message-type message]]
173-
(if (identical? :request message-type)
174-
(protocols.endpoint/receive-request server context message)
175-
(protocols.endpoint/receive-notification server context message))))]
176-
(async/go-loop []
177-
(if-let [message (async/<! input-ch)]
178-
(let [message-type (coercer/input-message-type message)]
179-
(case message-type
180-
(:parse-error :invalid-request)
181-
(protocols.endpoint/log server :error (format-error-code "Error reading message" message-type))
182-
(:response.result :response.error)
183-
(async/>! server-initiated-in-ch message)
184-
(:request :notification)
185-
(async/>! client-initiated-in-ch [message-type message]))
186-
(recur))
187-
(do
188-
(async/close! server-initiated-in-ch)
189-
(async/close! client-initiated-in-ch))))))
126+
(def input-buffer-size
127+
;; (Jacob Maine): This number is picked out of thin air. I have no idea how to
128+
;; estimate how big the buffer could or should be. LSP communication tends to
129+
;; be very quiet, then very chatty, so it depends a lot on what the client and
130+
;; server are doing. I also don't know how many messages we could store
131+
;; without running into memory problems, since this is dependent on so many
132+
;; variables, not just the size of the JVM's memory, but also the size of the
133+
;; messages, which can be anywhere from a few bytes to several megabytes.
134+
1024)
190135

191136
;; Expose endpoint methods to language servers
192137

@@ -222,7 +167,7 @@
222167
(async/put! trace-ch [:debug trace-body])))
223168

224169
(defrecord PendingReceivedRequest [result-promise cancelled?]
225-
promesa.protocols/ICancellable
170+
p.protocols/ICancellable
226171
(-cancel! [_]
227172
(p/cancel! result-promise)
228173
(reset! cancelled? true))
@@ -259,13 +204,89 @@
259204
protocols.endpoint/IEndpoint
260205
(start [this context]
261206
;; Start receiving messages.
262-
(let [pipeline (dispatch-input this context input-ch)]
263-
;; Wait to stop receiving messages.
207+
(let [;; The language server sometimes needs to stop processing inbound
208+
;; requests and notifications.
209+
210+
;; We do this automatically whenever we receive a notification, so
211+
;; that we are sure the language server processes didChange before
212+
;; moving on to other requests. But the language server can also
213+
;; decide to do it itself, even in a request, by processing everything
214+
;; synchronously instead of returning a future.
215+
216+
;; In a request (or even in a notification) the language server can
217+
;; send a request to the client and block waiting for a response.
218+
219+
;; It's possible -- even likely -- that the client will send other
220+
;; messages between the time that the server sent its request and the
221+
;; time that the client responds.
222+
223+
;; All inbound messages -- requests, notifications, and responses --
224+
;; come in on a single stream.
225+
226+
;; If the server blocks waiting for a response, we have to set aside
227+
;; the other inbound requests and notifications, so that we can get to
228+
;; the response. That is, while the server is blocking we cannot stop
229+
;; accepting input. Otherwise, the server will end up in a deadlock,
230+
;; where it's waiting to receive a response, but the response is
231+
;; waiting to be accepted.
232+
233+
;; To accomplish this we processes inbound requests and notifications
234+
;; separately from inbound responses. If the server starts blocking
235+
;; waiting for a response, we buffer the inbound requests and
236+
;; notifications until the server is prepared to process them.
237+
238+
;; If the buffer becomes full, we assume that the server isn't
239+
;; handling inbound requests and notifcations because it's waiting for
240+
;; a response. So, following our assumption, we reject the server
241+
;; requests so that the server will stop waiting for the response.
242+
;; It's possible that this won't work -- our assumption might have
243+
;; been wrong and the server might have stalled for some other reason.
244+
;; So after rejecting, we park trying to add the latest inbound
245+
;; request or notification to the buffer.
246+
247+
;; This ensures we don't drop any client messages, though we could
248+
;; stop reading them if the server keeps blocking. If we're lucky
249+
;; either the language server will unblock, or the client will decide
250+
;; to stop sending messages because it's failed to receive a server
251+
;; response (i.e., we will have managed to apply backpressure to the
252+
;; client). If we're unlucky, the server could keep blocking forever.
253+
;; In any case, this scenario -- where we stop reading messages -- is
254+
;; presumed to be both very rare and indicative of a problem that can
255+
;; be solved only in the client or the language server.
256+
client-initiated-in-ch (thread-loop
257+
input-buffer-size
258+
(fn [[message-type message]]
259+
(if (identical? :request message-type)
260+
(protocols.endpoint/receive-request this context message)
261+
(protocols.endpoint/receive-notification this context message))))
262+
reject-pending-sent-requests (fn [exception]
263+
(doseq [pending-request (vals @pending-sent-requests*)]
264+
(p/reject! (:p pending-request)
265+
exception)))
266+
pipeline (async/go-loop []
267+
(if-let [message (async/<! input-ch)]
268+
(let [message-type (coercer/input-message-type message)]
269+
(case message-type
270+
(:parse-error :invalid-request)
271+
(protocols.endpoint/log this :error (format-error-code "Error reading message" message-type))
272+
(:response.result :response.error)
273+
(protocols.endpoint/receive-response this message)
274+
(:request :notification)
275+
(when-not (async/offer! client-initiated-in-ch [message-type message])
276+
;; Buffers full. Fail any waiting pending requests and...
277+
(reject-pending-sent-requests
278+
(ex-info "Buffer of client messages exhausted." {}))
279+
;; ... try again, but park this time.
280+
(async/>! client-initiated-in-ch [message-type message])))
281+
(recur))
282+
(async/close! client-initiated-in-ch)))]
264283
(async/go
265-
;; When pipeline closes, it indicates input-ch has closed. We're done
266-
;; receiving.
284+
;; Wait to stop receiving messages.
267285
(async/<! pipeline)
268-
;; Do cleanup.
286+
;; The pipeline has closed, indicating input-ch has closed. We're done
287+
;; receiving. Do cleanup.
288+
289+
(reject-pending-sent-requests (ex-info "Server shutting down. Input is closed so no response is possible." {}))
269290

270291
;; The [docs](https://clojuredocs.org/clojure.core.async/close!) for
271292
;; `close!` say A) "The channel will no longer accept any puts", B)
@@ -310,7 +331,7 @@
310331
req (lsp.requests/request id method body)
311332
pending-request (pending-request id method now this)]
312333
(trace this trace/sending-request req now)
313-
;; Important: record request before sending it, so it is sure to be
334+
;; Important: record request before sending it, so it's sure to be
314335
;; available during receive-response.
315336
(swap! pending-sent-requests* assoc id pending-request)
316337
;; respect back pressure from clients that are slow to read; (go (>!)) will not suffice
@@ -330,7 +351,9 @@
330351
(if-let [{:keys [p started] :as req} (get pending-requests id)]
331352
(do
332353
(trace this trace/received-response req resp started now)
333-
(deliver p (if error resp result)))
354+
(if error
355+
(p/reject! p (ex-info "Received error response" resp))
356+
(p/resolve! p result)))
334357
(trace this trace/received-unmatched-response resp now)))
335358
(catch Throwable e
336359
(log-error-receiving this e resp))))

0 commit comments

Comments
 (0)