Skip to content

Commit edf3699

Browse files
kachayevztellman
authored andcommitted
Support manual ping/pong messages over websocket connections (#364)
* http/websocket-ping to support manual ping/pong messages over websocket connections * Extended websocket-ping interface to support custom deferred passed * ConcurrentLinkedQueue-based implementation for pending pings holder, thanks to @gfZeng * More test cases for websocket pings * Do not send PING when pending pings queue is not empty * A few fixes after latest merge * Rever styling changes * Remove :else branch from condp * Fix coercion function for websocket pings that was broken after the merge * Remove one another :else from condp * More reliable test in case we have delivery failure on websockets (better error message at least) * Acquire message when trying to convert to the content of binary websocket frame * Basic support for client-level heartbeats over websockets * Support heartbeats on the server * Documentation for client and server heartbeats
1 parent 0d09e31 commit edf3699

File tree

6 files changed

+354
-168
lines changed

6 files changed

+354
-168
lines changed

src/aleph/http.clj

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@
44
[clojure.string :as str]
55
[manifold.deferred :as d]
66
[manifold.executor :as executor]
7+
[manifold.stream :as s]
78
[aleph.flow :as flow]
89
[aleph.http
910
[server :as server]
1011
[client :as client]
11-
[client-middleware :as middleware]]
12+
[client-middleware :as middleware]
13+
[core :as http-core]]
1214
[aleph.netty :as netty])
1315
(:import
1416
[io.aleph.dirigiste Pools]
@@ -202,7 +204,8 @@
202204
| `max-frame-payload` | maximum allowable frame payload length, in bytes, defaults to `65536`.
203205
| `max-frame-size` | maximum aggregate message size, in bytes, defaults to `1048576`.
204206
| `bootstrap-transform` | an optional function that takes an `io.netty.bootstrap.Bootstrap` object and modifies it.
205-
| `epoll?` | if `true`, uses `epoll` when available, defaults to `false`"
207+
| `epoll?` | if `true`, uses `epoll` when available, defaults to `false`
208+
| `heartbeats` | optional configuration to send Ping frames to the server periodically (if the connection is idle), configuration keys are `:send-after-idle` (in milliseconds), `:payload` (optional, empty frame by default) and `:timeout` (optional, to close the connection if Pong is not received after specified timeout)."
206209
([url]
207210
(websocket-client url nil))
208211
([url options]
@@ -220,11 +223,23 @@
220223
| `pipeline-transform` | an optional function that takes an `io.netty.channel.ChannelPipeline` object, which represents a connection, and modifies it.
221224
| `max-frame-payload` | maximum allowable frame payload length, in bytes, defaults to `65536`.
222225
| `max-frame-size` | maximum aggregate message size, in bytes, defaults to `1048576`.
223-
| `allow-extensions?` | if true, allows extensions to the WebSocket protocol, defaults to `false`"
226+
| `allow-extensions?` | if true, allows extensions to the WebSocket protocol, defaults to `false`.
227+
| `heartbeats` | optional configuration to send Ping frames to the client periodically (if the connection is idle), configuration keys are `:send-after-idle` (in milliseconds), `:payload` (optional, empty uses empty frame by default) and `:timeout` (optional, to close the connection if Pong is not received after specified timeout)."
224228
([req]
225229
(websocket-connection req nil))
226230
([req options]
227-
(server/initialize-websocket-handler req options)))
231+
(server/initialize-websocket-handler req options)))
232+
233+
(defn websocket-ping
234+
"Takes a websocket endpoint (either client or server) and returns a deferred that will
235+
yield true whenever the PONG comes back, or false if the connection is closed. Subsequent
236+
PINGs are supressed to avoid ambiguity in a way that the next PONG trigger all pending PINGs."
237+
([conn]
238+
(http-core/websocket-ping conn (d/deferred) nil))
239+
([conn d']
240+
(http-core/websocket-ping conn d' nil))
241+
([conn d' data]
242+
(http-core/websocket-ping conn d' data)))
228243

229244
(let [maybe-timeout! (fn [d timeout] (when d (d/timeout! d timeout)))]
230245
(defn request

src/aleph/http/client.clj

Lines changed: 126 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,12 @@
3535
ChannelPipeline]
3636
[io.netty.handler.codec
3737
TooLongFrameException]
38-
[io.netty.handler.stream
38+
[io.netty.handler.timeout
39+
IdleState
40+
IdleStateEvent]
41+
[io.netty.handler.stream
3942
ChunkedWriteHandler]
40-
[io.netty.handler.codec.http
43+
[io.netty.handler.codec.http
4144
FullHttpRequest]
4245
[io.netty.handler.codec.http.websocketx
4346
CloseWebSocketFrame
@@ -62,6 +65,8 @@
6265
[io.netty.handler.logging
6366
LoggingHandler
6467
LogLevel]
68+
[java.util.concurrent
69+
ConcurrentLinkedQueue]
6570
[java.util.concurrent.atomic
6671
AtomicInteger]
6772
[aleph.utils
@@ -603,110 +608,139 @@
603608
(doto (DefaultHttpHeaders.) (http/map->headers! headers))
604609
max-frame-payload))
605610

606-
(defn websocket-client-handler [raw-stream? uri sub-protocols extensions? headers max-frame-payload]
607-
(let [d (d/deferred)
608-
in (atom nil)
609-
desc (atom {})
610-
handshaker (websocket-handshaker uri sub-protocols extensions? headers max-frame-payload)]
611+
(defn websocket-client-handler
612+
([raw-stream?
613+
uri
614+
sub-protocols
615+
extensions?
616+
headers
617+
max-frame-payload]
618+
(websocket-client-handler raw-stream?
619+
uri
620+
sub-protocols
621+
extensions?
622+
headers
623+
max-frame-payload
624+
nil))
625+
([raw-stream?
626+
uri
627+
sub-protocols
628+
extensions?
629+
headers
630+
max-frame-payload
631+
heartbeats]
632+
(let [d (d/deferred)
633+
in (atom nil)
634+
desc (atom {})
635+
^ConcurrentLinkedQueue pending-pings (ConcurrentLinkedQueue.)
636+
handshaker (websocket-handshaker uri sub-protocols extensions? headers max-frame-payload)]
611637

612-
[d
638+
[d
613639

614-
(netty/channel-inbound-handler
640+
(netty/channel-inbound-handler
615641

616642
:exception-caught
617643
([_ ctx ex]
618-
(when-not (d/error! d ex)
619-
(log/warn ex "error in websocket client"))
620-
(s/close! @in)
621-
(netty/close ctx))
644+
(when-not (d/error! d ex)
645+
(log/warn ex "error in websocket client"))
646+
(s/close! @in)
647+
(netty/close ctx))
622648

623649
:channel-inactive
624650
([_ ctx]
625-
(when (realized? d)
626-
;; close only on success
627-
(d/chain' d s/close!))
628-
(.fireChannelInactive ctx))
651+
(when (realized? d)
652+
;; close only on success
653+
(d/chain' d s/close!))
654+
(http/resolve-pings! pending-pings false)
655+
(.fireChannelInactive ctx))
629656

630657
:channel-active
631658
([_ ctx]
632-
(let [ch (.channel ctx)]
633-
(reset! in (netty/buffered-source ch (constantly 1) 16))
634-
(.handshake handshaker ch))
635-
(.fireChannelActive ctx))
659+
(let [ch (.channel ctx)]
660+
(reset! in (netty/buffered-source ch (constantly 1) 16))
661+
(.handshake handshaker ch))
662+
(.fireChannelActive ctx))
663+
664+
:user-event-triggered
665+
([_ ctx evt]
666+
(if (and (instance? IdleStateEvent evt)
667+
(= IdleState/ALL_IDLE (.state ^IdleStateEvent evt)))
668+
(when (d/realized? d)
669+
(http/handle-heartbeat ctx @d heartbeats))
670+
(.fireUserEventTriggered ctx evt)))
636671

637672
:channel-read
638673
([_ ctx msg]
639-
(try
640-
(let [ch (.channel ctx)]
641-
(cond
642-
643-
(not (.isHandshakeComplete handshaker))
644-
(-> (netty/wrap-future (.processHandshake handshaker ch msg))
645-
(d/chain'
646-
(fn [_]
647-
(let [out (netty/sink ch false
648-
(fn [c]
649-
(if (instance? CharSequence c)
650-
(TextWebSocketFrame. (bs/to-string c))
651-
(BinaryWebSocketFrame. (netty/to-byte-buf ctx c))))
652-
(fn [] @desc))]
653-
654-
(d/success! d
655-
(doto
674+
(try
675+
(let [ch (.channel ctx)]
676+
(cond
677+
678+
(not (.isHandshakeComplete handshaker))
679+
(-> (netty/wrap-future (.processHandshake handshaker ch msg))
680+
(d/chain'
681+
(fn [_]
682+
(let [out (netty/sink ch false
683+
(http/websocket-message-coerce-fn ch pending-pings)
684+
(fn [] @desc))]
685+
686+
(s/on-closed out (fn [] (http/resolve-pings! pending-pings false)))
687+
688+
(d/success! d
689+
(doto
656690
(s/splice out @in)
657-
(reset-meta! {:aleph/channel ch})))
658-
659-
(s/on-drained @in
660-
#(when (.isOpen ch)
661-
(d/chain'
662-
(netty/wrap-future (.close handshaker ch (CloseWebSocketFrame.)))
663-
(fn [_] (netty/close ctx))))))))
664-
(d/catch'
665-
(fn [ex]
666-
;; handle handshake exception
667-
(d/error! d ex)
668-
(s/close! @in)
669-
(netty/close ctx))))
670-
671-
(instance? FullHttpResponse msg)
672-
(let [rsp ^FullHttpResponse msg]
673-
(throw
674-
(IllegalStateException.
675-
(str "unexpected HTTP response, status: "
691+
(reset-meta! {:aleph/channel ch})))
692+
693+
(s/on-drained @in
694+
#(when (.isOpen ch)
695+
(d/chain'
696+
(netty/wrap-future (.close handshaker ch (CloseWebSocketFrame.)))
697+
(fn [_] (netty/close ctx))))))))
698+
(d/catch'
699+
(fn [ex]
700+
;; handle handshake exception
701+
(d/error! d ex)
702+
(s/close! @in)
703+
(netty/close ctx))))
704+
705+
(instance? FullHttpResponse msg)
706+
(let [rsp ^FullHttpResponse msg]
707+
(throw
708+
(IllegalStateException.
709+
(str "unexpected HTTP response, status: "
676710
(.status rsp)
677711
", body: '"
678712
(bs/to-string (.content rsp))
679713
"'"))))
680714

681-
(instance? TextWebSocketFrame msg)
682-
(netty/put! ch @in (.text ^TextWebSocketFrame msg))
715+
(instance? TextWebSocketFrame msg)
716+
(netty/put! ch @in (.text ^TextWebSocketFrame msg))
683717

684-
(instance? BinaryWebSocketFrame msg)
685-
(let [frame (.content ^BinaryWebSocketFrame msg)]
686-
(netty/put! ch @in
687-
(if raw-stream?
688-
(netty/acquire frame)
689-
(netty/buf->array frame))))
718+
(instance? BinaryWebSocketFrame msg)
719+
(let [frame (.content ^BinaryWebSocketFrame msg)]
720+
(netty/put! ch @in
721+
(if raw-stream?
722+
(netty/acquire frame)
723+
(netty/buf->array frame))))
690724

691-
(instance? PongWebSocketFrame msg)
692-
nil
725+
(instance? PongWebSocketFrame msg)
726+
(http/resolve-pings! pending-pings true)
693727

694-
(instance? PingWebSocketFrame msg)
695-
(let [frame (.content ^PingWebSocketFrame msg)]
696-
(netty/write-and-flush ch (PongWebSocketFrame. (netty/acquire frame))))
728+
(instance? PingWebSocketFrame msg)
729+
(let [frame (.content ^PingWebSocketFrame msg)]
730+
(netty/write-and-flush ch (PongWebSocketFrame. (netty/acquire frame))))
697731

698-
(instance? CloseWebSocketFrame msg)
699-
(let [frame ^CloseWebSocketFrame msg]
700-
(when (realized? d)
701-
(swap! desc assoc
702-
:websocket-close-code (.statusCode frame)
703-
:websocket-close-msg (.reasonText frame)))
704-
(netty/close ctx))
732+
(instance? CloseWebSocketFrame msg)
733+
(let [frame ^CloseWebSocketFrame msg]
734+
(when (realized? d)
735+
(swap! desc assoc
736+
:websocket-close-code (.statusCode frame)
737+
:websocket-close-msg (.reasonText frame)))
738+
(netty/close ctx))
705739

706-
:else
707-
(.fireChannelRead ctx msg)))
708-
(finally
709-
(netty/release msg)))))]))
740+
:else
741+
(.fireChannelRead ctx msg)))
742+
(finally
743+
(netty/release msg)))))])))
710744

711745
(defn websocket-connection
712746
[uri
@@ -722,7 +756,8 @@
722756
extensions?
723757
max-frame-payload
724758
max-frame-size
725-
compression?]
759+
compression?
760+
heartbeats]
726761
:or {bootstrap-transform identity
727762
pipeline-transform identity
728763
raw-stream? false
@@ -736,13 +771,20 @@
736771
scheme (.getScheme uri)
737772
_ (assert (#{"ws" "wss"} scheme) "scheme must be one of 'ws' or 'wss'")
738773
ssl? (= "wss" scheme)
774+
heartbeats (when (some? heartbeats)
775+
(merge
776+
{:send-after-idle 3e4
777+
:payload nil
778+
:timeout nil}
779+
heartbeats))
739780
[s handler] (websocket-client-handler
740781
raw-stream?
741782
uri
742783
sub-protocols
743784
extensions?
744785
headers
745-
max-frame-payload)]
786+
max-frame-payload
787+
heartbeats)]
746788
(d/chain'
747789
(netty/create-client
748790
(fn [^ChannelPipeline pipeline]
@@ -754,6 +796,7 @@
754796
(.addLast ^ChannelPipeline %
755797
"websocket-deflater"
756798
WebSocketClientCompressionHandler/INSTANCE)))
799+
(http/attach-heartbeats-handler heartbeats)
757800
(.addLast "handler" ^ChannelHandler handler)
758801
pipeline-transform))
759802
(when ssl?

0 commit comments

Comments
 (0)