File tree Expand file tree Collapse file tree 3 files changed +43
-5
lines changed Expand file tree Collapse file tree 3 files changed +43
-5
lines changed Original file line number Diff line number Diff line change 28
28
(-pong [socket data])
29
29
(-close [socket status reason]))
30
30
31
+ (defprotocol AsyncSocket
32
+ (-send-async [socket message succeed fail]))
33
+
31
34
(defprotocol TextData
32
35
(->string [data]))
33
36
51
54
:else (throw (ex-info " message is not a valid text or binary data type"
52
55
{:message message}))))
53
56
54
- (defn send [socket message]
55
- (-send socket (encode-message message)))
57
+ (defn send
58
+ ([socket message]
59
+ (-send socket (encode-message message)))
60
+ ([socket message succeed fail]
61
+ (-send-async socket (encode-message message) succeed fail)))
56
62
57
63
(defn ping
58
64
([socket]
Original file line number Diff line number Diff line change 26
26
Session
27
27
WebSocketConnectionListener
28
28
WebSocketListener
29
- WebSocketPingPongListener]
29
+ WebSocketPingPongListener
30
+ WriteCallback]
30
31
[org.eclipse.jetty.websocket.server.config
31
32
JettyWebSocketServletContainerInitializer]
32
33
[jakarta.servlet AsyncContext DispatcherType AsyncEvent AsyncListener]
33
34
[jakarta.servlet.http HttpServletRequest HttpServletResponse]))
34
35
35
36
(defn- websocket-socket [^Session session]
36
37
(let [remote (.getRemote session)]
37
- (reify ws/Socket
38
+ (reify
39
+ ws/Socket
38
40
(-send [_ message]
39
41
(if (string? message)
40
42
(.sendString remote message)
44
46
(-pong [_ data]
45
47
(.sendPong remote data))
46
48
(-close [_ status reason]
47
- (.close session status reason)))))
49
+ (.close session status reason))
50
+ ws/AsyncSocket
51
+ (-send-async [_ message succeed fail]
52
+ (let [callback (reify WriteCallback
53
+ (writeSuccess [_] (succeed ))
54
+ (writeFailed [_ ex] (fail ex)))]
55
+ (if (string? message)
56
+ (.sendString remote message callback)
57
+ (.sendBytes remote message callback)))))))
48
58
49
59
(defn- websocket-listener [listener]
50
60
(let [socket (volatile! nil )]
Original file line number Diff line number Diff line change 718
718
(is (= [[:ping " foo" ] [:pong " foo" ]]
719
719
@log))))
720
720
721
+ (testing " sending websocket messages asynchronously"
722
+ (let [log (atom [])
723
+ handler (constantly
724
+ {::ws/listener
725
+ (reify ws/Listener
726
+ (on-open [_ sock]
727
+ (ws/send sock " Hello"
728
+ (fn [] (ws/send sock " World" (fn []) (fn [_])))
729
+ (fn [_])))
730
+ (on-message [_ _ _])
731
+ (on-pong [_ _ _])
732
+ (on-error [_ _ _])
733
+ (on-close [_ _ _ _]))})]
734
+ (with-server handler {:port test-port}
735
+ (let [ws @(hato/websocket test-websocket-url
736
+ {:on-message
737
+ (fn [_ msg _] (swap! log conj (str msg)))})]
738
+ (Thread/sleep 100 )
739
+ @(hato/close! ws)
740
+ (Thread/sleep 100 )))
741
+ (is (= [" Hello" " World" ] @log))))
742
+
721
743
(testing " using a map as a listener"
722
744
(let [listener {:on-open (fn [s] [:on-open s])
723
745
:on-message (fn [s m] [:on-message s m])
You can’t perform that action at this time.
0 commit comments