Skip to content

Commit d0f2838

Browse files
committed
Add websockets to Jetty adapter
Change internal AbstractHandler proxy class to ServletHandler instead in order to support websockets.
1 parent f681675 commit d0f2838

File tree

3 files changed

+197
-32
lines changed

3 files changed

+197
-32
lines changed

ring-jetty-adapter/project.clj

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,14 @@
77
:dependencies [[org.clojure/clojure "1.7.0"]
88
[ring/ring-core "1.10.0"]
99
[ring/ring-jakarta-servlet "1.10.0"]
10-
[org.eclipse.jetty/jetty-server "11.0.15"]]
10+
[org.eclipse.jetty/jetty-server "11.0.15"]
11+
[org.eclipse.jetty.websocket/websocket-jetty-server "11.0.15"]]
1112
:aliases {"test-all" ["with-profile" "default:+1.8:+1.9:+1.10:+1.11" "test"]}
1213
:profiles
1314
{:dev {:dependencies [[clj-http "3.12.3"]
14-
[less-awful-ssl "1.0.6"]]
15+
[less-awful-ssl "1.0.6"]
16+
[hato "0.9.0"]
17+
[org.slf4j/slf4j-simple "2.0.7"]]
1518
:jvm-opts ["-Dorg.eclipse.jetty.server.HttpChannelState.DEFAULT_TIMEOUT=500"]}
1619
:1.8 {:dependencies [[org.clojure/clojure "1.8.0"]]}
1720
:1.9 {:dependencies [[org.clojure/clojure "1.9.0"]]}

ring-jetty-adapter/src/ring/adapter/jetty.clj

Lines changed: 101 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22
"A Ring adapter that uses the Jetty 9 embedded web server.
33
44
Adapters are used to convert Ring handlers into running web servers."
5-
(:require [ring.util.jakarta.servlet :as servlet])
6-
(:import [org.eclipse.jetty.server
5+
(:require [ring.util.jakarta.servlet :as servlet]
6+
[ring.websocket :as ws])
7+
(:import [java.nio ByteBuffer]
8+
[org.eclipse.jetty.server
79
Request
810
Server
911
ServerConnector
@@ -12,55 +14,126 @@
1214
HttpConnectionFactory
1315
SslConnectionFactory
1416
SecureRequestCustomizer]
17+
[org.eclipse.jetty.servlet ServletContextHandler ServletHandler]
1518
[org.eclipse.jetty.server.handler AbstractHandler]
1619
[org.eclipse.jetty.util BlockingArrayQueue]
1720
[org.eclipse.jetty.util.thread ThreadPool QueuedThreadPool]
1821
[org.eclipse.jetty.util.ssl SslContextFactory$Server KeyStoreScanner]
22+
[org.eclipse.jetty.websocket.server
23+
JettyWebSocketServerContainer
24+
JettyWebSocketCreator]
25+
[org.eclipse.jetty.websocket.api
26+
Session
27+
WebSocketConnectionListener
28+
WebSocketListener
29+
WebSocketPingPongListener]
30+
[org.eclipse.jetty.websocket.server.config
31+
JettyWebSocketServletContainerInitializer]
1932
[jakarta.servlet AsyncContext DispatcherType AsyncEvent AsyncListener]
2033
[jakarta.servlet.http HttpServletRequest HttpServletResponse]))
2134

22-
(defn- ^AbstractHandler proxy-handler [handler]
23-
(proxy [AbstractHandler] []
24-
(handle [_ ^Request base-request ^HttpServletRequest request response]
25-
(when-not (= (.getDispatcherType request) DispatcherType/ERROR)
26-
(let [request-map (servlet/build-request-map request)
27-
response-map (handler request-map)]
28-
(servlet/update-servlet-response response response-map)
29-
(.setHandled base-request true))))))
35+
(defn- websocket-socket [^Session session]
36+
(let [remote (.getRemote session)]
37+
(reify ws/Socket
38+
(-send [_ message]
39+
(if (string? message)
40+
(.sendString remote message)
41+
(.sendBytes remote message)))
42+
(-ping [_ data]
43+
(.sendPing remote data))
44+
(-pong [_ data]
45+
(.sendPong remote data))
46+
(-close [_ status reason]
47+
(.close session status reason)))))
48+
49+
(defn- websocket-listener [listener]
50+
(let [socket (volatile! nil)]
51+
(reify
52+
WebSocketConnectionListener
53+
(onWebSocketConnect [_ session]
54+
(vreset! socket (websocket-socket session))
55+
(ws/on-open listener @socket))
56+
(onWebSocketClose [_ status reason]
57+
(ws/on-close listener @socket status reason))
58+
(onWebSocketError [_ throwable]
59+
(ws/on-error listener @socket throwable))
60+
WebSocketListener
61+
(onWebSocketText [_ message]
62+
(ws/on-message listener @socket message))
63+
(onWebSocketBinary [_ payload offset length]
64+
(let [buffer (ByteBuffer/wrap payload offset length)]
65+
(ws/on-message listener @socket buffer)))
66+
WebSocketPingPongListener
67+
(onWebSocketPing [_ _])
68+
(onWebSocketPong [_ payload]
69+
(ws/on-pong listener @socket payload)))))
70+
71+
(defn- websocket-creator [{listener ::ws/listener}]
72+
(reify JettyWebSocketCreator
73+
(createWebSocket [_ _ _]
74+
(websocket-listener listener))))
75+
76+
(defn- upgrade-to-websocket [^HttpServletRequest request response response-map]
77+
(let [context (.getServletContext request)
78+
container (JettyWebSocketServerContainer/getContainer context)
79+
creator (websocket-creator response-map)]
80+
(.upgrade container creator request response)))
81+
82+
(defn- ^ServletHandler proxy-handler [handler]
83+
(proxy [ServletHandler] []
84+
(doHandle [_ ^Request base-request request response]
85+
(let [request-map (servlet/build-request-map request)
86+
response-map (handler request-map)]
87+
(try
88+
(if (ws/websocket-response? response-map)
89+
(upgrade-to-websocket request response response-map)
90+
(servlet/update-servlet-response response response-map))
91+
(finally
92+
(.setHandled base-request true)))))))
3093

3194
(defn- async-jetty-raise [^AsyncContext context ^HttpServletResponse response]
3295
(fn [^Throwable exception]
3396
(.sendError response 500 (.getMessage exception))
3497
(.complete context)))
3598

36-
(defn- async-jetty-respond [context response]
99+
(defn- async-jetty-respond [context request response]
37100
(fn [response-map]
38-
(servlet/update-servlet-response response context response-map)))
101+
(if (ws/websocket-response? response-map)
102+
(upgrade-to-websocket request response response-map)
103+
(servlet/update-servlet-response response context response-map))))
39104

40105
(defn- async-timeout-listener [request context response handler]
41106
(proxy [AsyncListener] []
42107
(onTimeout [^AsyncEvent _]
43108
(handler (servlet/build-request-map request)
44-
(async-jetty-respond context response)
109+
(async-jetty-respond context request response)
45110
(async-jetty-raise context response)))
46111
(onComplete [^AsyncEvent _])
47112
(onError [^AsyncEvent _])
48113
(onStartAsync [^AsyncEvent _])))
49114

50-
(defn- ^AbstractHandler async-proxy-handler [handler timeout timeout-handler]
51-
(proxy [AbstractHandler] []
52-
(handle [_ ^Request base-request ^HttpServletRequest request ^HttpServletResponse response]
115+
(defn- ^ServletHandler async-proxy-handler [handler timeout timeout-handler]
116+
(proxy [ServletHandler] []
117+
(doHandle [_ ^Request base-request request response]
53118
(let [^AsyncContext context (.startAsync request)]
54119
(.setTimeout context timeout)
55120
(when timeout-handler
56121
(.addListener
57122
context
58123
(async-timeout-listener request context response timeout-handler)))
59-
(handler
60-
(servlet/build-request-map request)
61-
(async-jetty-respond context response)
62-
(async-jetty-raise context response))
63-
(.setHandled base-request true)))))
124+
(try
125+
(handler
126+
(servlet/build-request-map request)
127+
(async-jetty-respond context request response)
128+
(async-jetty-raise context response))
129+
(finally
130+
(.setHandled base-request true)))))))
131+
132+
(defn- ^ServletContextHandler context-handler [proxy-handler]
133+
(doto (ServletContextHandler.)
134+
(.setServletHandler proxy-handler)
135+
(.setAllowNullPathInfo true)
136+
(JettyWebSocketServletContainerInitializer/configure nil)))
64137

65138
(defn- ^ServerConnector server-connector [^Server server & factories]
66139
(ServerConnector. server #^"[Lorg.eclipse.jetty.server.ConnectionFactory;" (into-array ConnectionFactory factories)))
@@ -213,13 +286,13 @@
213286
:response-header-size - the maximum size of a response header (default 8192)
214287
:send-server-version? - add Server header to HTTP response (default true)"
215288
[handler options]
216-
(let [server (create-server (dissoc options :configurator))]
217-
(if (:async? options)
218-
(.setHandler server
219-
(async-proxy-handler handler
220-
(:async-timeout options 0)
221-
(:async-timeout-handler options)))
222-
(.setHandler server (proxy-handler handler)))
289+
(let [server (create-server (dissoc options :configurator))
290+
proxy (if (:async? options)
291+
(async-proxy-handler handler
292+
(:async-timeout options 0)
293+
(:async-timeout-handler options))
294+
(proxy-handler handler))]
295+
(.setHandler server (context-handler proxy))
223296
(when-let [configurator (:configurator options)]
224297
(configurator server))
225298
(try

ring-jetty-adapter/test/ring/adapter/test/jetty.clj

Lines changed: 91 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@
33
[ring.adapter.jetty :refer :all]
44
[clj-http.client :as http]
55
[clojure.java.io :as io]
6+
[hato.websocket :as hato]
67
[less.awful.ssl :as less-ssl]
7-
[ring.core.protocols :as p])
8-
(:import [org.eclipse.jetty.util.thread QueuedThreadPool]
8+
[ring.core.protocols :as p]
9+
[ring.websocket :as ws])
10+
(:import [java.nio ByteBuffer]
11+
[org.eclipse.jetty.util.thread QueuedThreadPool]
912
[org.eclipse.jetty.util BlockingArrayQueue]
1013
[org.eclipse.jetty.server Server Request SslConnectionFactory]
1114
[org.eclipse.jetty.server.handler AbstractHandler]
@@ -628,3 +631,89 @@
628631
(try (http/get test-url)
629632
(catch Exception _ nil))
630633
(is (= 1 @call-count)))))
634+
635+
(def test-websocket-url (str "ws://localhost:" test-port))
636+
637+
(defn- buf->str [buffer]
638+
(let [bs (byte-array (.capacity buffer))]
639+
(.get buffer bs)
640+
(String. bs)))
641+
642+
(deftest run-jetty-websocket-test
643+
(testing "receiving websocket messages"
644+
(let [log (atom [])
645+
handler (constantly
646+
{::ws/listener
647+
(reify ws/Listener
648+
(on-open [_ _] (swap! log conj [:open]))
649+
(on-message [_ _ msg] (swap! log conj [:message msg]))
650+
(on-pong [_ _ data]
651+
(swap! log conj [:pong (buf->str data)]))
652+
(on-error [_ _ ex] (swap! log conj [:error ex]))
653+
(on-close [_ _ c r] (swap! log conj [:close c r])))})]
654+
(with-server handler {:port test-port}
655+
(let [ws @(hato/websocket test-websocket-url {})]
656+
@(hato/send! ws "foo")
657+
@(hato/pong! ws (ByteBuffer/wrap (.getBytes "bar")))
658+
@(hato/close! ws 1000 "Normal close")
659+
;; Short wait to prevent server from shutting down too abruptly
660+
(Thread/sleep 100)))
661+
(is (= [[:open]
662+
[:message "foo"]
663+
[:pong "bar"]
664+
[:close 1000 "Normal close"]]
665+
@log))))
666+
667+
(testing "sending websocket messages"
668+
(let [log (atom [])
669+
handler (constantly
670+
{::ws/listener
671+
(reify ws/Listener
672+
(on-open [_ sock]
673+
(ws/send sock "Hello")
674+
(ws/send sock (.getBytes "World")))
675+
(on-message [_ sock msg]
676+
(if (string? msg)
677+
(ws/send sock (str "t: " msg))
678+
(ws/send sock (str "b: " (buf->str msg)))))
679+
(on-pong [_ _ _])
680+
(on-error [_ _ _])
681+
(on-close [_ _ _ _]))})]
682+
(with-server handler {:port test-port}
683+
(let [ws @(hato/websocket test-websocket-url
684+
{:on-message
685+
(fn [_ msg _]
686+
(if (instance? ByteBuffer msg)
687+
(swap! log conj [:b (buf->str msg)])
688+
(swap! log conj [:t (str msg)])))})]
689+
@(hato/send! ws "one")
690+
@(hato/send! ws (ByteBuffer/wrap (.getBytes "two")))
691+
(Thread/sleep 100)
692+
@(hato/close! ws 1000 "Normal close")
693+
(Thread/sleep 100)))
694+
(is (= [[:t "Hello"]
695+
[:b "World"]
696+
[:t "t: one"]
697+
[:t "b: two"]]
698+
@log))))
699+
700+
(testing "ping pong"
701+
(let [log (atom [])
702+
handler (constantly
703+
{::ws/listener
704+
(reify ws/Listener
705+
(on-open [_ sock]
706+
(ws/ping sock (ByteBuffer/wrap (.getBytes "foo")))
707+
(swap! log conj [:ping "foo"]))
708+
(on-message [_ _ _])
709+
(on-pong [_ _ data]
710+
(swap! log conj [:pong (buf->str data)]))
711+
(on-error [_ _ _])
712+
(on-close [_ _ _ _]))})]
713+
(with-server handler {:port test-port}
714+
(let [ws @(hato/websocket test-websocket-url {})]
715+
(Thread/sleep 100)
716+
@(hato/close! ws)
717+
(Thread/sleep 100)))
718+
(is (= [[:ping "foo"] [:pong "foo"]]
719+
@log)))))

0 commit comments

Comments
 (0)