|
| 1 | +(ns jsonrpc.prompt-change-events |
| 2 | + (:require |
| 3 | + [babashka.fs :as fs] |
| 4 | + [clojure.core.async :as async] |
| 5 | + [clojure.string :as string] |
| 6 | + docker |
| 7 | + [jsonrpc.db :as db] |
| 8 | + [jsonrpc.logger :as logger] |
| 9 | + [jsonrpc.producer :as producer] |
| 10 | + [jsonrpc.state :as state] |
| 11 | + [prompts.core :refer [registry]] |
| 12 | + repl |
| 13 | + shutdown)) |
| 14 | + |
| 15 | +(defn debounce-by |
| 16 | + "Debounce in channel with ms miliseconds distincting by by-fn." |
| 17 | + [in by-fn] |
| 18 | + (let [out (async/chan)] |
| 19 | + (async/go-loop [state {}] |
| 20 | + (let [{:keys [f] :as new-val} (async/<! in) |
| 21 | + v (by-fn new-val)] |
| 22 | + (when (not (= (get state f) v)) |
| 23 | + (async/>! out new-val)) |
| 24 | + (recur (assoc state f v)))) |
| 25 | + out)) |
| 26 | + |
| 27 | +(defn publish-change-event [] |
| 28 | + (doseq [producer (vals @state/producers)] |
| 29 | + (try |
| 30 | + (producer/publish-tool-list-changed producer {}) |
| 31 | + (producer/publish-prompt-list-changed producer {}) |
| 32 | + (catch Throwable _)))) |
| 33 | + |
| 34 | +(defn registry-updated [] |
| 35 | + (try |
| 36 | + (db/add-refs (db/registry-refs registry)) |
| 37 | + (publish-change-event) |
| 38 | + (catch Throwable t |
| 39 | + (logger/error t "unable to parse registry.yaml")))) |
| 40 | + |
| 41 | +(defn markdown-tool-updated [opts f] |
| 42 | + (try |
| 43 | + (db/update-prompt opts (string/replace f #"\.md" "") (slurp (fs/file (prompts.core/get-prompts-dir) f))) |
| 44 | + (publish-change-event) |
| 45 | + (catch Throwable t |
| 46 | + (logger/error t "unable to parse " f)))) |
| 47 | + |
| 48 | +(defn content [{:keys [f]}] |
| 49 | + (slurp (fs/file (prompts.core/get-prompts-dir) f))) |
| 50 | + |
| 51 | +(defn init-dynamic-prompt-watcher [opts registry-updated markdown-tool-updated] |
| 52 | + (let [change-events-channel (async/chan) |
| 53 | + debounced (debounce-by change-events-channel content)] |
| 54 | + ;; debounce the change event channel |
| 55 | + (async/go-loop |
| 56 | + [evt (async/<! debounced)] |
| 57 | + (case (:type evt) |
| 58 | + :registry (registry-updated) |
| 59 | + :markdown (markdown-tool-updated (:opts evt) (:f evt)) |
| 60 | + :unknown) |
| 61 | + (recur (async/<! debounced))) |
| 62 | + ;; watch filesystem |
| 63 | + (async/thread |
| 64 | + (let [{x :container} |
| 65 | + (docker/run-streaming-function-with-no-stdin |
| 66 | + {:image "vonwig/inotifywait:latest" |
| 67 | + :volumes ["docker-prompts:/prompts"] |
| 68 | + :command ["-e" "create" "-e" "modify" "-e" "delete" "-q" "-m" "/prompts"]} |
| 69 | + (fn [line] |
| 70 | + (let [[_dir _event f] (string/split line #"\s+")] |
| 71 | + (async/>!! |
| 72 | + change-events-channel |
| 73 | + (cond |
| 74 | + (= f "registry.yaml") |
| 75 | + {:opts opts :f f :type :registry} |
| 76 | + (string/ends-with? f ".md") |
| 77 | + {:opts opts :f f :type :markdown} |
| 78 | + :else |
| 79 | + {})))))] |
| 80 | + (shutdown/schedule-container-shutdown |
| 81 | + (fn [] |
| 82 | + (logger/info "inotifywait shutting down") |
| 83 | + (docker/kill-container x) |
| 84 | + (docker/delete x))))))) |
| 85 | + |
| 86 | +(comment |
| 87 | + (repl/setup-stdout-logger) |
| 88 | + (init-dynamic-prompt-watcher |
| 89 | + {} |
| 90 | + (fn [] (logger/info "registry updated")) |
| 91 | + (fn [& args] (logger/info "markdown updated " args)))) |
| 92 | + |
| 93 | + |
0 commit comments