|
52 | 52 | upgrade(Req, Env, Handler, HandlerState) -> |
53 | 53 | upgrade(Req, Env, Handler, HandlerState, #{}). |
54 | 54 |
|
| 55 | +%% We create a proxy socket for HTTP/2 even if no proxy was used, |
| 56 | +%% and add a special field 'http_version' to indicate this is HTTP/2. |
| 57 | +upgrade(Req=#{version := 'HTTP/2', pid := Parent, peer := Peer, sock := Sock}, |
| 58 | + Env, Handler, HandlerState, Opts) -> |
| 59 | + %% Cowboy doesn't expose the socket when HTTP/2 is used. |
| 60 | + %% We take it directly from the connection's state. |
| 61 | + %% |
| 62 | + %% @todo Ideally we would not need the real socket for |
| 63 | + %% normal operations. But we currently need it for |
| 64 | + %% the heartbeat processes to do their job. In the |
| 65 | + %% future we should not rely on those processes |
| 66 | + %% and instead do the heartbeating directly in the |
| 67 | + %% Websocket handler. |
| 68 | + RealSocket = element(4,element(1,sys:get_state(Parent))), |
| 69 | + ProxyInfo = case Req of |
| 70 | + #{proxy_header := ProxyHeader} -> |
| 71 | + ProxyHeader#{http_version => 'HTTP/2'}; |
| 72 | + _ -> |
| 73 | + {SrcAddr, SrcPort} = Peer, |
| 74 | + {DestAddr, DestPort} = Sock, |
| 75 | + #{ |
| 76 | + http_version => 'HTTP/2', |
| 77 | + src_address => SrcAddr, |
| 78 | + src_port => SrcPort, |
| 79 | + dest_address => DestAddr, |
| 80 | + dest_port => DestPort |
| 81 | + } |
| 82 | + end, |
| 83 | + ProxySocket = {rabbit_proxy_socket, RealSocket, ProxyInfo}, |
| 84 | + cowboy_websocket:upgrade(Req, Env, Handler, HandlerState#state{socket = ProxySocket}, Opts); |
55 | 85 | upgrade(Req, Env, Handler, HandlerState, Opts) -> |
56 | 86 | cowboy_websocket:upgrade(Req, Env, Handler, HandlerState, Opts). |
57 | 87 |
|
| 88 | +%% This is only called for HTTP/1.1. |
58 | 89 | takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Handler, HandlerState}) -> |
59 | 90 | Sock = case HandlerState#state.socket of |
60 | 91 | undefined -> |
@@ -95,7 +126,7 @@ init(Req0, Opts) -> |
95 | 126 | socket = SockInfo, |
96 | 127 | peername = PeerAddr, |
97 | 128 | auth_hd = cowboy_req:header(<<"authorization">>, Req) |
98 | | - }, WsOpts}. |
| 129 | + }, WsOpts#{data_delivery => relay}}. |
99 | 130 |
|
100 | 131 | websocket_init(State) -> |
101 | 132 | process_flag(trap_exit, true), |
@@ -218,8 +249,23 @@ websocket_info({start_heartbeats, {SendTimeout, ReceiveTimeout}}, |
218 | 249 | Self = self(), |
219 | 250 | SendFun = fun () -> Self ! {send, <<$\n>>}, ok end, |
220 | 251 | ReceiveFun = fun() -> Self ! client_timeout end, |
221 | | - Heartbeat = rabbit_heartbeat:start(SupPid, Sock, SendTimeout, |
222 | | - SendFun, ReceiveTimeout, ReceiveFun), |
| 252 | + Heartbeat = case Sock of |
| 253 | + {rabbit_proxy_socket, RealSocket, #{http_version := 'HTTP/2'}} -> |
| 254 | + %% HTTP/2 Websocket may have multiple Websocket sessions |
| 255 | + %% on a single connection (this can happen for example |
| 256 | + %% when refreshing a page). As a result we need to attach |
| 257 | + %% the heartbeat processes to the session. We do this via |
| 258 | + %% a link for now. @todo In the future we will have a |
| 259 | + %% mechanism in Cowboy to attach them to the stream. |
| 260 | + {ok, Sender} = rabbit_heartbeat:start_heartbeat_sender(RealSocket, |
| 261 | + SendTimeout, SendFun, {heartbeat_sender, unknown}), |
| 262 | + {ok, Receiver} = rabbit_heartbeat:start_heartbeat_receiver(RealSocket, |
| 263 | + ReceiveTimeout, ReceiveFun, {heartbeat_receiver, unknown}), |
| 264 | + {Sender, Receiver}; |
| 265 | + _ -> |
| 266 | + rabbit_heartbeat:start(SupPid, Sock, SendTimeout, |
| 267 | + SendFun, ReceiveTimeout, ReceiveFun) |
| 268 | + end, |
223 | 269 | {ok, State#state{heartbeat = Heartbeat}}; |
224 | 270 | websocket_info(client_timeout, State) -> |
225 | 271 | stop(State); |
@@ -248,10 +294,17 @@ websocket_info(Msg, State) -> |
248 | 294 | [Msg]), |
249 | 295 | {ok, State}. |
250 | 296 |
|
251 | | -terminate(_Reason, _Req, #state{proc_state = undefined}) -> |
252 | | - ok; |
253 | | -terminate(_Reason, _Req, #state{proc_state = ProcState}) -> |
| 297 | +terminate(_Reason, _Req, State = #state{proc_state = undefined}) -> |
| 298 | + terminate_heartbeaters(State); |
| 299 | +terminate(_Reason, _Req, State = #state{proc_state = ProcState}) -> |
254 | 300 | _ = rabbit_stomp_processor:flush_and_die(ProcState), |
| 301 | + terminate_heartbeaters(State). |
| 302 | + |
| 303 | +terminate_heartbeaters(#state{heartbeat = {none, none}}) -> |
| 304 | + ok; |
| 305 | +terminate_heartbeaters(#state{heartbeat = {SPid, RPid}}) -> |
| 306 | + sys:terminate(SPid, shutdown), |
| 307 | + sys:terminate(RPid, shutdown), |
255 | 308 | ok. |
256 | 309 |
|
257 | 310 | %%---------------------------------------------------------------------------- |
|
0 commit comments