|
1 |
| -(ns jsonrpc |
| 1 | +(ns jsonrpc |
2 | 2 | (:require
|
3 | 3 | [cheshire.core :as json]
|
4 |
| - [clojure.java.io :as io]) |
| 4 | + [clojure.core.async :as async] |
| 5 | + [clojure.java.io :as io] |
| 6 | + [clojure.string :as string]) |
5 | 7 | (:import
|
6 |
| - [java.io OutputStream])) |
| 8 | + [java.io |
| 9 | + EOFException |
| 10 | + IOException |
| 11 | + InputStream |
| 12 | + OutputStream])) |
7 | 13 |
|
8 | 14 | (def ^:private write-lock (Object.))
|
9 | 15 |
|
| 16 | +(defn ^:private read-n-bytes [^InputStream input content-length charset-s] |
| 17 | + (let [buffer (byte-array content-length)] |
| 18 | + (loop [total-read 0] |
| 19 | + (when (< total-read content-length) |
| 20 | + (let [new-read (.read input buffer total-read (- content-length total-read))] |
| 21 | + (when (< new-read 0) |
| 22 | + ;; TODO: return nil instead? |
| 23 | + (throw (EOFException.))) |
| 24 | + (recur (+ total-read new-read))))) |
| 25 | + (String. ^bytes buffer ^String charset-s))) |
| 26 | + |
| 27 | +(defn ^:private parse-header [line headers] |
| 28 | + (let [[h v] (string/split line #":\s*" 2)] |
| 29 | + (assoc headers h v))) |
| 30 | + |
| 31 | +(defn ^:private parse-charset [content-type] |
| 32 | + (or (when content-type |
| 33 | + (when-let [[_ charset] (re-find #"(?i)charset=(.*)$" content-type)] |
| 34 | + (when (not= "utf8" charset) |
| 35 | + charset))) |
| 36 | + "utf-8")) |
| 37 | + |
| 38 | +(defn ^:private read-message [input headers keyword-function] |
| 39 | + (try |
| 40 | + (let [content-length (Long/valueOf ^String (get headers "Content-Length")) |
| 41 | + charset-s (parse-charset (get headers "Content-Type")) |
| 42 | + content (read-n-bytes input content-length charset-s) |
| 43 | + m (json/parse-string content keyword-function)] |
| 44 | + ;; even if the params should not be transformed to keywords, |
| 45 | + ;; the top-level keywords still must be transformed |
| 46 | + (cond-> m |
| 47 | + (get m "id") (assoc :id (get m "id")) |
| 48 | + (get m "jsonrpc") (assoc :jsonrpc (get m "jsonrpc")) |
| 49 | + (get m "method") (assoc :method (get m "method")) |
| 50 | + (get m "params") (assoc :params (get m "params")) |
| 51 | + (get m "error") (assoc :error (get m "error")) |
| 52 | + (get m "result") (assoc :result (get m "result")))) |
| 53 | + (catch Exception _ |
| 54 | + :parse-error))) |
| 55 | + |
| 56 | +(defn ^:private read-header-line |
| 57 | + "Reads a line of input. Blocks if there are no messages on the input." |
| 58 | + [^InputStream input] |
| 59 | + (try |
| 60 | + (let [s (java.lang.StringBuilder.)] |
| 61 | + (loop [] |
| 62 | + (let [b (.read input)] ;; blocks, presumably waiting for next message |
| 63 | + (case b |
| 64 | + -1 ::eof ;; end of stream |
| 65 | + #_lf 10 (str s) ;; finished reading line |
| 66 | + #_cr 13 (recur) ;; ignore carriage returns |
| 67 | + (do (.append s (char b)) ;; byte == char because header is in US-ASCII |
| 68 | + (recur)))))) |
| 69 | + (catch IOException _e |
| 70 | + ::eof))) |
| 71 | + |
| 72 | +(defn input-stream->input-chan [input {:keys [close? keyword-function] |
| 73 | + :or {close? true, keyword-function keyword}}] |
| 74 | + (let [input (io/input-stream input) |
| 75 | + messages (async/chan 1)] |
| 76 | + (async/thread |
| 77 | + (loop [headers {}] |
| 78 | + (let [line (read-header-line input)] |
| 79 | + (cond |
| 80 | + ;; input closed; also close channel |
| 81 | + (= line ::eof) (async/close! messages) |
| 82 | + ;; a blank line after the headers indicates start of message |
| 83 | + (string/blank? line) (if (async/>!! messages (read-message input headers keyword-function)) |
| 84 | + ;; wait for next message |
| 85 | + (recur {}) |
| 86 | + ;; messages closed |
| 87 | + (when close? (.close input))) |
| 88 | + :else (recur (parse-header line headers)))))) |
| 89 | + messages)) |
| 90 | + |
10 | 91 | (defn ^:private write-message [^OutputStream output msg]
|
11 | 92 | (let [content (json/generate-string msg)
|
12 | 93 | content-bytes (.getBytes content "utf-8")]
|
|
28 | 109 | ;; prompts({:messages [{:role "", :content ""}]})
|
29 | 110 | ;; functions("") - meant to be updated in place
|
30 | 111 | ;; functions-done("")
|
31 |
| -(defn -notify [{:keys [debug]} method params] |
32 |
| - (case method |
33 |
| - :message (write-message (io/output-stream System/out) (notification method params)) |
34 |
| - :prompts (write-message (io/output-stream System/out) (notification method params)) |
35 |
| - :functions (write-message (io/output-stream System/out) (notification method params)) |
36 |
| - :functions-done (write-message (io/output-stream System/out) (notification method params)))) |
| 112 | +;; error({:content ""}) |
| 113 | +(defn -notify [{:keys [_debug]} method params] |
| 114 | + (write-message (io/output-stream System/out) (notification method params))) |
37 | 115 |
|
38 | 116 | #_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
|
39 | 117 | (defn -println [{:keys [debug]} method params]
|
40 | 118 | (case method
|
41 |
| - :message (cond |
| 119 | + :message (cond |
42 | 120 | (:content params) (do (print (:content params)) (flush))
|
43 | 121 | (and debug (:debug params)) (do (println "### DEBUG\n") (println (:debug params))))
|
44 | 122 | :functions (do (print ".") (flush))
|
45 | 123 | :functions-done (println params)
|
46 |
| - :prompts nil)) |
| 124 | + :error (binding [*out* *err*] |
| 125 | + (println (:content params))) |
| 126 | + :prompts nil |
| 127 | + (binding [*out* *err*] (println (format "%s\n%s\n" method params))))) |
47 | 128 |
|
48 | 129 | (def ^:dynamic notify -notify)
|
49 | 130 |
|
|
0 commit comments