|
112 | 112 | log-title (format-error-code "Error receiving message" :internal-error)]
|
113 | 113 | (protocols.endpoint/log server :error e (str log-title "\n" message-details))))
|
114 | 114 |
|
115 |
| -(defn ^:private spawn-receipt-thread [buf-or-n f] |
116 |
| - (let [receipt-ch (async/chan buf-or-n)] |
| 115 | +(defn thread-loop [buf-or-n f] |
| 116 | + (let [ch (async/chan buf-or-n)] |
117 | 117 | (async/thread
|
118 | 118 | (discarding-stdout
|
119 | 119 | (loop []
|
120 |
| - (when-let [[message-type message] (async/<!! receipt-ch)] |
121 |
| - (f message-type message) |
| 120 | + (when-let [arg (async/<!! ch)] |
| 121 | + (f arg) |
122 | 122 | (recur)))))
|
123 |
| - receipt-ch)) |
| 123 | + ch)) |
124 | 124 |
|
125 |
| -(defn ^:private run-pipeline |
126 |
| - "Forwards messages received on the input-ch to the language server, for |
127 |
| - further processing." |
| 125 | +(defn ^:private dispatch-input |
| 126 | + "Dispatches messages received on the input-ch based on message type. Returns a |
| 127 | + channel which will close after the input-ch is closed." |
128 | 128 | [server context input-ch]
|
129 | 129 | (let [;; In order to process some requests and (all) notifications in series,
|
130 | 130 | ;; the language server sometimes needs to block client-initiated input.
|
|
154 | 154 | ;; (>!! output-ch) to respect back pressure from clients that are slow
|
155 | 155 | ;; to read.
|
156 | 156 | ;; * Separate, so one can continue while the other is blocked.
|
157 |
| - server-initiated-in-ch (spawn-receipt-thread |
| 157 | + |
| 158 | + ;; (Jacob Maine): 100 is picked out of thin air. I have no idea how to |
| 159 | + ;; estimate how big the buffer should be to avoid dropping messages. LSP |
| 160 | + ;; communication tends to be very quiet, then very chatty, so it depends |
| 161 | + ;; a lot on what the client and server are doing. I also don't know how |
| 162 | + ;; many messages we could store without running into memory problems, |
| 163 | + ;; since this is dependent on so many variables, not just the size of |
| 164 | + ;; the JVM's memory, but also the size of the messages, which can be |
| 165 | + ;; anywhere from a few bytes to megabytes. |
| 166 | + server-initiated-in-ch (thread-loop |
158 | 167 | (async/sliding-buffer 100)
|
159 |
| - (fn [_ message] |
160 |
| - (try |
161 |
| - (protocols.endpoint/receive-response server message) |
162 |
| - (catch Throwable e |
163 |
| - (log-error-receiving server e message))))) |
164 |
| - client-initiated-in-ch (spawn-receipt-thread |
| 168 | + (fn [response] |
| 169 | + (protocols.endpoint/receive-response server response))) |
| 170 | + client-initiated-in-ch (thread-loop |
165 | 171 | (async/sliding-buffer 100)
|
166 |
| - (fn [message-type message] |
| 172 | + (fn [[message-type message]] |
167 | 173 | (if (identical? :request message-type)
|
168 |
| - ;; receive-request catches its own exceptions |
169 | 174 | (protocols.endpoint/receive-request server context message)
|
170 |
| - (try |
171 |
| - (protocols.endpoint/receive-notification server context message) |
172 |
| - (catch Throwable e |
173 |
| - (log-error-receiving server e message))))))] |
| 175 | + (protocols.endpoint/receive-notification server context message))))] |
174 | 176 | (async/go-loop []
|
175 | 177 | (if-let [message (async/<! input-ch)]
|
176 | 178 | (let [message-type (coercer/input-message-type message)]
|
177 | 179 | (case message-type
|
178 | 180 | (:parse-error :invalid-request)
|
179 | 181 | (protocols.endpoint/log server :error (format-error-code "Error reading message" message-type))
|
180 | 182 | (:response.result :response.error)
|
181 |
| - (async/>! server-initiated-in-ch [:response message]) |
| 183 | + (async/>! server-initiated-in-ch message) |
182 | 184 | (:request :notification)
|
183 | 185 | (async/>! client-initiated-in-ch [message-type message]))
|
184 | 186 | (recur))
|
|
257 | 259 | protocols.endpoint/IEndpoint
|
258 | 260 | (start [this context]
|
259 | 261 | ;; Start receiving messages.
|
260 |
| - (let [pipeline (run-pipeline this context input-ch)] |
| 262 | + (let [pipeline (dispatch-input this context input-ch)] |
261 | 263 | ;; Wait to stop receiving messages.
|
262 | 264 | (async/go
|
263 | 265 | ;; When pipeline closes, it indicates input-ch has closed. We're done
|
264 | 266 | ;; receiving.
|
265 | 267 | (async/<! pipeline)
|
266 | 268 | ;; Do cleanup.
|
267 |
| - ;; TODO: do we really know that we've finished putting to output-ch? |
| 269 | + |
| 270 | + ;; The [docs](https://clojuredocs.org/clojure.core.async/close!) for |
| 271 | + ;; `close!` say A) "The channel will no longer accept any puts", B) |
| 272 | + ;; "Data in the channel remains available for taking", and C) "Logically |
| 273 | + ;; closing happens after all puts have been delivered." |
| 274 | + |
| 275 | + ;; At this point the input-ch has been closed, which means any messages |
| 276 | + ;; that were read before the channel was closed have been put on the |
| 277 | + ;; channel (C). However, the takes off of it, the takes which then |
| 278 | + ;; forward the messages to the language server, may or may not have |
| 279 | + ;; happened (B). And even if the language server has received some |
| 280 | + ;; messages, if it responds after this line closes the output-ch, the |
| 281 | + ;; responses will be dropped (A). |
| 282 | + |
| 283 | + ;; All that to say, it's possible for the lsp4clj server to drop the |
| 284 | + ;; language server's final few responses. |
| 285 | + |
| 286 | + ;; It doesn't really matter though, because the users of lsp4clj |
| 287 | + ;; typically don't call `shutdown` on the lsp4clj server until they've |
| 288 | + ;; received the `exit` notification, which is the client indicating it |
| 289 | + ;; no longer expects any responses anyway. |
268 | 290 | (async/close! output-ch)
|
269 | 291 | (async/close! log-ch)
|
270 | 292 | (some-> trace-ch async/close!)
|
|
302 | 324 | (async/>!! output-ch notif)
|
303 | 325 | nil))
|
304 | 326 | (receive-response [this {:keys [id error result] :as resp}]
|
305 |
| - (let [now (.instant clock) |
306 |
| - [pending-requests _] (swap-vals! pending-sent-requests* dissoc id)] |
307 |
| - (if-let [{:keys [p started] :as req} (get pending-requests id)] |
308 |
| - (do |
309 |
| - (trace this trace/received-response req resp started now) |
310 |
| - (deliver p (if error resp result))) |
311 |
| - (trace this trace/received-unmatched-response resp now)))) |
| 327 | + (try |
| 328 | + (let [now (.instant clock) |
| 329 | + [pending-requests _] (swap-vals! pending-sent-requests* dissoc id)] |
| 330 | + (if-let [{:keys [p started] :as req} (get pending-requests id)] |
| 331 | + (do |
| 332 | + (trace this trace/received-response req resp started now) |
| 333 | + (deliver p (if error resp result))) |
| 334 | + (trace this trace/received-unmatched-response resp now))) |
| 335 | + (catch Throwable e |
| 336 | + (log-error-receiving this e resp)))) |
312 | 337 | (receive-request [this context {:keys [id method params] :as req}]
|
313 | 338 | (let [started (.instant clock)
|
314 | 339 | resp (lsp.responses/response id)]
|
|
345 | 370 | (log-error-receiving this e req)
|
346 | 371 | (async/>!! output-ch (internal-error-response resp req))))))
|
347 | 372 | (receive-notification [this context {:keys [method params] :as notif}]
|
348 |
| - (let [now (.instant clock)] |
349 |
| - (trace this trace/received-notification notif now) |
350 |
| - (if (= method "$/cancelRequest") |
351 |
| - (if-let [pending-req (get @pending-received-requests* (:id params))] |
352 |
| - (p/cancel! pending-req) |
353 |
| - (trace this trace/received-unmatched-cancellation-notification notif now)) |
354 |
| - (let [result (receive-notification method context params)] |
355 |
| - (when (identical? ::method-not-found result) |
356 |
| - (protocols.endpoint/log this :warn "received unexpected notification" method))))))) |
| 373 | + (try |
| 374 | + (let [now (.instant clock)] |
| 375 | + (trace this trace/received-notification notif now) |
| 376 | + (if (= method "$/cancelRequest") |
| 377 | + (if-let [pending-req (get @pending-received-requests* (:id params))] |
| 378 | + (p/cancel! pending-req) |
| 379 | + (trace this trace/received-unmatched-cancellation-notification notif now)) |
| 380 | + (let [result (receive-notification method context params)] |
| 381 | + (when (identical? ::method-not-found result) |
| 382 | + (protocols.endpoint/log this :warn "received unexpected notification" method))))) |
| 383 | + (catch Throwable e |
| 384 | + (log-error-receiving this e notif))))) |
357 | 385 |
|
358 | 386 | (defn set-trace-level [server trace-level]
|
359 | 387 | (update server :tracer* reset! (trace/tracer-for-level trace-level)))
|
|
0 commit comments