|
1 | 1 | (ns lambdaisland.funnel |
2 | 2 | (:gen-class) |
3 | | - (:require [clojure.core.async :as async] |
4 | | - [clojure.java.io :as io] |
5 | | - [clojure.pprint :as pprint] |
6 | | - [clojure.tools.cli :as cli] |
7 | | - [cognitect.transit :as transit] |
8 | | - [io.pedestal.log :as log] |
9 | | - [lambdaisland.funnel.version :as version]) |
10 | | - (:import (com.cognitect.transit DefaultReadHandler |
11 | | - WriteHandler) |
12 | | - (java.io ByteArrayInputStream |
13 | | - ByteArrayOutputStream |
14 | | - FileInputStream |
15 | | - FileOutputStream |
16 | | - PrintStream |
17 | | - IOException) |
18 | | - (java.net InetSocketAddress Socket) |
19 | | - (java.nio ByteBuffer) |
20 | | - (java.nio.file Files Path Paths) |
21 | | - (java.security KeyStore) |
22 | | - (java.util Comparator) |
23 | | - (javax.net.ssl SSLContext |
24 | | - KeyManagerFactory) |
25 | | - (lambdaisland.funnel Daemon) |
26 | | - (org.java_websocket WebSocket |
27 | | - WebSocketAdapter |
28 | | - WebSocketImpl) |
29 | | - (org.java_websocket.drafts Draft_6455) |
30 | | - (org.java_websocket.handshake Handshakedata) |
31 | | - (org.java_websocket.handshake ClientHandshake) |
32 | | - (org.java_websocket.server DefaultWebSocketServerFactory |
33 | | - DefaultSSLWebSocketServerFactory |
34 | | - WebSocketServer) |
35 | | - (sun.misc Signal))) |
| 3 | + (:require |
| 4 | + [clojure.core.async :as async] |
| 5 | + [clojure.edn :as edn] |
| 6 | + [clojure.java.io :as io] |
| 7 | + [clojure.pprint :as pprint] |
| 8 | + [clojure.tools.cli :as cli] |
| 9 | + [cognitect.transit :as transit] |
| 10 | + [charred.api :as charred] |
| 11 | + [lambdaisland.funnel.log :as log] |
| 12 | + [lambdaisland.funnel.version :as version]) |
| 13 | + (:import |
| 14 | + (com.cognitect.transit DefaultReadHandler |
| 15 | + WriteHandler) |
| 16 | + (java.io ByteArrayInputStream |
| 17 | + ByteArrayOutputStream |
| 18 | + FileInputStream |
| 19 | + FileOutputStream |
| 20 | + PrintStream |
| 21 | + IOException) |
| 22 | + (java.net InetSocketAddress Socket) |
| 23 | + (java.nio ByteBuffer) |
| 24 | + (java.nio.file Files Path Paths) |
| 25 | + (java.security KeyStore) |
| 26 | + (java.util Comparator) |
| 27 | + (javax.net.ssl SSLContext |
| 28 | + KeyManagerFactory) |
| 29 | + (lambdaisland.funnel Daemon) |
| 30 | + (org.java_websocket WebSocket |
| 31 | + WebSocketAdapter |
| 32 | + WebSocketImpl) |
| 33 | + (org.java_websocket.drafts Draft_6455) |
| 34 | + (org.java_websocket.handshake Handshakedata) |
| 35 | + (org.java_websocket.handshake ClientHandshake) |
| 36 | + (org.java_websocket.server DefaultWebSocketServerFactory |
| 37 | + DefaultSSLWebSocketServerFactory |
| 38 | + WebSocketServer) |
| 39 | + (sun.misc Signal))) |
36 | 40 |
|
37 | 41 | (set! *warn-on-reflection* true) |
38 | 42 |
|
|
71 | 75 | (when (and (vector? e) (= ::error (first e))) |
72 | 76 | (second e))) |
73 | 77 |
|
74 | | -(defn to-transit [value] |
| 78 | +(defmulti encode (fn [format value] format)) |
| 79 | +(defmulti decode (fn [format value] format)) |
| 80 | + |
| 81 | +(defmethod encode :transit [_ value] |
75 | 82 | (try |
76 | 83 | (let [out (ByteArrayOutputStream. 4096) |
77 | 84 | writer (transit/writer out :json {:handlers {TaggedValue tagged-write-handler}})] |
|
80 | 87 | (catch Exception e |
81 | 88 | [::error e]))) |
82 | 89 |
|
83 | | -(defn from-transit [^String transit] |
| 90 | +(defmethod decode :transit [_ ^String transit] |
84 | 91 | (try |
85 | 92 | (let [in (ByteArrayInputStream. (.getBytes transit)) |
86 | 93 | reader (transit/reader in :json {:default-handler tagged-read-handler})] |
87 | 94 | (transit/read reader)) |
88 | 95 | (catch Exception e |
89 | 96 | [::error e]))) |
90 | 97 |
|
| 98 | +(defmethod encode :edn [_ value] |
| 99 | + (pr-str value)) |
| 100 | + |
| 101 | +(defmethod decode :edn [_ ^String edn] |
| 102 | + (edn/read-string edn)) |
| 103 | + |
| 104 | +(defmethod encode :json [_ value] |
| 105 | + (charred/write-json-str value)) |
| 106 | + |
| 107 | +(defmethod decode :json [_ ^String json] |
| 108 | + (charred/read-json json :key-fn keyword)) |
| 109 | + |
91 | 110 | (defn match-selector? [whoami selector] |
92 | 111 | (cond |
93 | 112 | (true? selector) true |
|
113 | 132 | (.getAttachment conn)) |
114 | 133 |
|
115 | 134 | (defn handle-query [conn selector conns] |
116 | | - (let [msg (to-transit |
117 | | - {:funnel/clients |
118 | | - (map (comp :whoami val) |
119 | | - (filter |
120 | | - (fn [[c m]] |
121 | | - (and (match-selector? (:whoami m) selector) |
122 | | - (not= c conn))) |
123 | | - conns))})] |
124 | | - (if-let [e (maybe-error msg)] |
125 | | - (log/error :query-encoding-failed {:selector selector :conns (vals conns)} :exception e) |
126 | | - (async/>!! (outbox conn) msg)))) |
| 135 | + (let [msg {:funnel/clients |
| 136 | + (map (comp :whoami val) |
| 137 | + (filter |
| 138 | + (fn [[c m]] |
| 139 | + (and (match-selector? (:whoami m) selector) |
| 140 | + (not= c conn))) |
| 141 | + conns))}] |
| 142 | + (async/>!! (outbox conn) msg))) |
127 | 143 |
|
128 | 144 | (defn handle-message [state ^WebSocket conn raw-msg] |
129 | | - (let [msg (from-transit raw-msg) |
130 | | - inbox (:inbox (.getAttachment conn))] |
| 145 | + (let [msg (decode (get-in @state [conn :format]) raw-msg)] |
131 | 146 | (when-let [e (maybe-error msg)] |
132 | 147 | (log/warn :message-decoding-failed {:raw-msg raw-msg :desc "Raw message will be forwarded"} :exception e)) |
133 | 148 | (let [[msg broadcast] |
|
146 | 161 | (when-let [selector (:funnel/query msg)] |
147 | 162 | (handle-query conn selector @state)) |
148 | 163 |
|
149 | | - [(to-transit |
150 | | - (if-let [whomai (:whoami (get @state conn))] |
151 | | - (assoc msg :funnel/whoami whomai) |
152 | | - msg)) |
| 164 | + [(if-let [whomai (:whoami (get @state conn))] |
| 165 | + (assoc msg :funnel/whoami whomai) |
| 166 | + msg) |
153 | 167 | (:funnel/broadcast msg)]))] |
154 | 168 | (if-let [e (maybe-error msg)] |
155 | 169 | (log/error :message-encoding-failed {:msg msg} :exception e) |
|
162 | 176 |
|
163 | 177 | (defn handle-open [state ^WebSocket conn handshake] |
164 | 178 | (log/info :connection-opened {:remote-socket-address (.getRemoteSocketAddress conn)}) |
165 | | - (let [outbox (async/chan 8 #_(map #(doto % prn)))] |
| 179 | + (let [path (.getResourceDescriptor conn) |
| 180 | + format (case path |
| 181 | + "/?content-type=json" :json |
| 182 | + "/?content-type=edn" :edn |
| 183 | + "/?content-type=transit" :transit |
| 184 | + :transit) |
| 185 | + outbox (async/chan 8 (map (partial encode format)))] |
166 | 186 | (.setAttachment conn outbox) |
| 187 | + (swap! state assoc-in [conn :format] format) |
167 | 188 | (async/go-loop [] |
168 | 189 | (when-let [^String msg (async/<! outbox)] |
169 | 190 | (when (.isOpen conn) |
|
430 | 451 | (intern 'user 'foo 123) |
431 | 452 |
|
432 | 453 | (log/error :foo :bar :exception (Exception. "123")) |
| 454 | + |
| 455 | + (start-servers {:ws-port 2234 :wss-port 2235}) |
433 | 456 | ) |
0 commit comments