Skip to content

Commit 27d3aa7

Browse files
committed
Enable HTTP/2 Websocket in Web plugins
This uses Cowboy's new direct data delivery mechanism, which provides more performance. Cowboy is now pinned to 2.14.0 and Cowlib to 2.16.0. HTTP/2 Websocket is enabled by default. It can be disabled as needed by setting `#{enable_connect_protocol => false}` in the plugin's `cowboy_opts`. Web-STOMP did not have HTTP/2 enabled before, now it does. Web-MQTT already had HTTP/2 enabled.
1 parent ecc77ec commit 27d3aa7

File tree

5 files changed

+97
-15
lines changed

5 files changed

+97
-15
lines changed

deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_app.erl

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ start_tcp_listener(TCPConf0, CowboyOpts) ->
124124

125125

126126
start_tls_listener([], _) -> ok;
127-
start_tls_listener(TLSConf0, CowboyOpts) ->
127+
start_tls_listener(TLSConf0, CowboyOpts0) ->
128128
_ = rabbit_networking:ensure_ssl(),
129129
{TLSConf, TLSIpStr, TLSPort} = get_tls_conf(TLSConf0),
130130
RanchRef = rabbit_networking:ranch_ref(TLSConf),
@@ -135,6 +135,10 @@ start_tls_listener(TLSConf0, CowboyOpts) ->
135135
num_acceptors => get_env(num_ssl_acceptors, 10),
136136
num_conns_sups => get_env(num_conns_sup, 1)
137137
},
138+
CowboyOpts = CowboyOpts0#{
139+
%% Enable HTTP/2 Websocket if not explicitly disabled.
140+
enable_connect_protocol => maps:get(enable_connect_protocol, CowboyOpts0, true)
141+
},
138142
case cowboy:start_tls(RanchRef, RanchTransportOpts, CowboyOpts) of
139143
{ok, _} ->
140144
ok;

deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,35 @@
5656
upgrade(Req, Env, Handler, HandlerState) ->
5757
upgrade(Req, Env, Handler, HandlerState, #{}).
5858

59+
%% We create a proxy socket for HTTP/2 even if no proxy was used,
60+
%% and add a special field 'http_version' to indicate this is HTTP/2.
61+
upgrade(Req=#{version := 'HTTP/2', pid := Parent, peer := Peer, sock := Sock},
62+
Env, Handler, HandlerState, Opts) ->
63+
%% Cowboy doesn't expose the socket when HTTP/2 is used.
64+
%% We take it directly from the connection's state.
65+
%%
66+
%% @todo Ideally we would not need the real socket for
67+
%% normal operations. But we currently need it for
68+
%% the heartbeat processes to do their job. In the
69+
%% future we should not rely on those processes
70+
%% and instead do the heartbeating directly in the
71+
%% Websocket handler.
72+
RealSocket = element(4,element(1,sys:get_state(Parent))),
73+
{SrcAddr, SrcPort} = Peer,
74+
{DestAddr, DestPort} = Sock,
75+
ProxyInfo = #{
76+
http_version => 'HTTP/2',
77+
src_address => SrcAddr,
78+
src_port => SrcPort,
79+
dest_address => DestAddr,
80+
dest_port => DestPort
81+
},
82+
ProxySocket = {rabbit_proxy_socket, RealSocket, ProxyInfo},
83+
cowboy_websocket:upgrade(Req, Env, Handler, HandlerState#state{socket = ProxySocket}, Opts);
5984
upgrade(Req, Env, Handler, HandlerState, Opts) ->
6085
cowboy_websocket:upgrade(Req, Env, Handler, HandlerState, Opts).
6186

87+
%% This is only called for HTTP/1.1.
6288
takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Handler, HandlerState}) ->
6389
Sock = case HandlerState#state.socket of
6490
undefined ->
@@ -84,7 +110,7 @@ init(Req, Opts) ->
84110
stats_timer = rabbit_event:init_stats_timer()},
85111
WsOpts0 = proplists:get_value(ws_opts, Opts, #{}),
86112
WsOpts = maps:merge(#{compress => true}, WsOpts0),
87-
{?MODULE, Req1, State, WsOpts}
113+
{?MODULE, Req1, State, WsOpts#{data_delivery => relay}}
88114
end
89115
end.
90116

deps/rabbitmq_web_stomp/src/rabbit_web_stomp_handler.erl

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,35 @@
5252
upgrade(Req, Env, Handler, HandlerState) ->
5353
upgrade(Req, Env, Handler, HandlerState, #{}).
5454

55+
%% We create a proxy socket for HTTP/2 even if no proxy was used,
56+
%% and add a special field 'http_version' to indicate this is HTTP/2.
57+
upgrade(Req=#{version := 'HTTP/2', pid := Parent, peer := Peer, sock := Sock},
58+
Env, Handler, HandlerState, Opts) ->
59+
%% Cowboy doesn't expose the socket when HTTP/2 is used.
60+
%% We take it directly from the connection's state.
61+
%%
62+
%% @todo Ideally we would not need the real socket for
63+
%% normal operations. But we currently need it for
64+
%% the heartbeat processes to do their job. In the
65+
%% future we should not rely on those processes
66+
%% and instead do the heartbeating directly in the
67+
%% Websocket handler.
68+
RealSocket = element(4,element(1,sys:get_state(Parent))),
69+
{SrcAddr, SrcPort} = Peer,
70+
{DestAddr, DestPort} = Sock,
71+
ProxyInfo = #{
72+
http_version => 'HTTP/2',
73+
src_address => SrcAddr,
74+
src_port => SrcPort,
75+
dest_address => DestAddr,
76+
dest_port => DestPort
77+
},
78+
ProxySocket = {rabbit_proxy_socket, RealSocket, ProxyInfo},
79+
cowboy_websocket:upgrade(Req, Env, Handler, HandlerState#state{socket = ProxySocket}, Opts);
5580
upgrade(Req, Env, Handler, HandlerState, Opts) ->
5681
cowboy_websocket:upgrade(Req, Env, Handler, HandlerState, Opts).
5782

83+
%% This is only called for HTTP/1.1.
5884
takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Handler, HandlerState}) ->
5985
Sock = case HandlerState#state.socket of
6086
undefined ->
@@ -95,7 +121,7 @@ init(Req0, Opts) ->
95121
socket = SockInfo,
96122
peername = PeerAddr,
97123
auth_hd = cowboy_req:header(<<"authorization">>, Req)
98-
}, WsOpts}.
124+
}, WsOpts#{data_delivery => relay}}.
99125

100126
websocket_init(State) ->
101127
process_flag(trap_exit, true),
@@ -218,8 +244,23 @@ websocket_info({start_heartbeats, {SendTimeout, ReceiveTimeout}},
218244
Self = self(),
219245
SendFun = fun () -> Self ! {send, <<$\n>>}, ok end,
220246
ReceiveFun = fun() -> Self ! client_timeout end,
221-
Heartbeat = rabbit_heartbeat:start(SupPid, Sock, SendTimeout,
222-
SendFun, ReceiveTimeout, ReceiveFun),
247+
Heartbeat = case Sock of
248+
{rabbit_proxy_socket, RealSocket, #{http_version := 'HTTP/2'}} ->
249+
%% HTTP/2 Websocket may have multiple Websocket sessions
250+
%% on a single connection (this can happen for example
251+
%% when refreshing a page). As a result we need to attach
252+
%% the heartbeat processes to the session. We do this via
253+
%% a link for now. @todo In the future we will have a
254+
%% mechanism in Cowboy to attach them to the stream.
255+
{ok, Sender} = rabbit_heartbeat:start_heartbeat_sender(RealSocket,
256+
SendTimeout, SendFun, {heartbeat_sender, unknown}),
257+
{ok, Receiver} = rabbit_heartbeat:start_heartbeat_receiver(RealSocket,
258+
ReceiveTimeout, ReceiveFun, {heartbeat_receiver, unknown}),
259+
{Sender, Receiver};
260+
_ ->
261+
rabbit_heartbeat:start(SupPid, Sock, SendTimeout,
262+
SendFun, ReceiveTimeout, ReceiveFun)
263+
end,
223264
{ok, State#state{heartbeat = Heartbeat}};
224265
websocket_info(client_timeout, State) ->
225266
stop(State);
@@ -248,10 +289,17 @@ websocket_info(Msg, State) ->
248289
[Msg]),
249290
{ok, State}.
250291

251-
terminate(_Reason, _Req, #state{proc_state = undefined}) ->
252-
ok;
253-
terminate(_Reason, _Req, #state{proc_state = ProcState}) ->
292+
terminate(_Reason, _Req, State = #state{proc_state = undefined}) ->
293+
terminate_heartbeaters(State);
294+
terminate(_Reason, _Req, State = #state{proc_state = ProcState}) ->
254295
_ = rabbit_stomp_processor:flush_and_die(ProcState),
296+
terminate_heartbeaters(State).
297+
298+
terminate_heartbeaters(#state{heartbeat = {none, none}}) ->
299+
ok;
300+
terminate_heartbeaters(#state{heartbeat = {SPid, RPid}}) ->
301+
sys:terminate(SPid, shutdown),
302+
sys:terminate(RPid, shutdown),
255303
ok.
256304

257305
%%----------------------------------------------------------------------------

deps/rabbitmq_web_stomp/src/rabbit_web_stomp_listener.erl

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -136,16 +136,20 @@ start_tls_listener(TLSConf0, CowboyOpts0, Routes) ->
136136
TLSPort = proplists:get_value(port, TLSConf0),
137137
TLSConf = maybe_parse_ip(TLSConf0),
138138
RanchTransportOpts = #{
139-
socket_opts => TLSConf,
139+
socket_opts => [{alpn_preferred_protocols, [<<"h2">>, <<"http/1.1">>]}|TLSConf],
140140
connection_type => supervisor,
141141
max_connections => get_max_connections(),
142142
num_acceptors => NumSslAcceptors,
143143
num_conns_sups => 1
144144
},
145-
CowboyOpts = CowboyOpts0#{env => #{dispatch => Routes},
146-
middlewares => [cowboy_router,
147-
rabbit_web_stomp_middleware,
148-
cowboy_handler]},
145+
CowboyOpts = CowboyOpts0#{
146+
env => #{dispatch => Routes},
147+
middlewares => [cowboy_router,
148+
rabbit_web_stomp_middleware,
149+
cowboy_handler],
150+
%% Enable HTTP/2 Websocket if not explicitly disabled.
151+
enable_connect_protocol => maps:get(enable_connect_protocol, CowboyOpts0, true)
152+
},
149153
case ranch:start_listener(rabbit_networking:ranch_ref(TLSConf),
150154
ranch_ssl,
151155
RanchTransportOpts,

rabbitmq-components.mk

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ endif
4040
# all projects use the same versions. It avoids conflicts.
4141

4242
dep_accept = hex 0.3.5
43-
dep_cowboy = hex 2.13.0
44-
dep_cowlib = hex 2.14.0
43+
dep_cowboy = hex 2.14.0
44+
dep_cowlib = hex 2.16.0
4545
dep_credentials_obfuscation = hex 3.5.0
4646
dep_cuttlefish = hex 3.5.0
4747
dep_gen_batch_server = hex 0.8.8

0 commit comments

Comments
 (0)