Skip to content

Commit ca25a58

Browse files
Merge pull request rabbitmq#14500 from rabbitmq/loic-cowboy-data_delivery-relay
Enable HTTP/2 Websocket in Web plugins by default
2 parents 39b1a51 + e50bf70 commit ca25a58

File tree

10 files changed

+196
-22
lines changed

10 files changed

+196
-22
lines changed

deps/rabbitmq_web_mqtt/Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,15 @@ export BUILD_WITHOUT_QUIC
1919

2020
LOCAL_DEPS = ssl
2121
DEPS = rabbit cowboy rabbitmq_mqtt
22-
TEST_DEPS = emqtt rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_management rabbitmq_stomp rabbitmq_consistent_hash_exchange
22+
TEST_DEPS = gun emqtt rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_management rabbitmq_stomp rabbitmq_consistent_hash_exchange
2323

2424
PLT_APPS += rabbitmq_cli elixir cowlib ssl
2525

2626
# FIXME: Add Ranch as a BUILD_DEPS to be sure the correct version is picked.
2727
# See rabbitmq-components.mk.
2828
BUILD_DEPS += ranch
2929

30+
dep_gun = hex 2.2.0
3031
dep_emqtt = git https://github.com/emqx/emqtt.git 1.14.6
3132

3233
DEP_EARLY_PLUGINS = rabbit_common/mk/rabbitmq-early-plugin.mk

deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_app.erl

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ start_tcp_listener(TCPConf0, CowboyOpts) ->
124124

125125

126126
start_tls_listener([], _) -> ok;
127-
start_tls_listener(TLSConf0, CowboyOpts) ->
127+
start_tls_listener(TLSConf0, CowboyOpts0) ->
128128
_ = rabbit_networking:ensure_ssl(),
129129
{TLSConf, TLSIpStr, TLSPort} = get_tls_conf(TLSConf0),
130130
RanchRef = rabbit_networking:ranch_ref(TLSConf),
@@ -135,6 +135,10 @@ start_tls_listener(TLSConf0, CowboyOpts) ->
135135
num_acceptors => get_env(num_ssl_acceptors, 10),
136136
num_conns_sups => get_env(num_conns_sup, 1)
137137
},
138+
CowboyOpts = CowboyOpts0#{
139+
%% Enable HTTP/2 Websocket if not explicitly disabled.
140+
enable_connect_protocol => maps:get(enable_connect_protocol, CowboyOpts0, true)
141+
},
138142
case cowboy:start_tls(RanchRef, RanchTransportOpts, CowboyOpts) of
139143
{ok, _} ->
140144
ok;

deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,40 @@
5656
upgrade(Req, Env, Handler, HandlerState) ->
5757
upgrade(Req, Env, Handler, HandlerState, #{}).
5858

59+
%% We create a proxy socket for HTTP/2 even if no proxy was used,
60+
%% and add a special field 'http_version' to indicate this is HTTP/2.
61+
upgrade(Req=#{version := 'HTTP/2', pid := Parent, peer := Peer, sock := Sock},
62+
Env, Handler, HandlerState, Opts) ->
63+
%% Cowboy doesn't expose the socket when HTTP/2 is used.
64+
%% We take it directly from the connection's state.
65+
%%
66+
%% @todo Ideally we would not need the real socket for
67+
%% normal operations. But we currently need it for
68+
%% the heartbeat processes to do their job. In the
69+
%% future we should not rely on those processes
70+
%% and instead do the heartbeating directly in the
71+
%% Websocket handler.
72+
RealSocket = element(4,element(1,sys:get_state(Parent))),
73+
ProxyInfo = case Req of
74+
#{proxy_header := ProxyHeader} ->
75+
ProxyHeader#{http_version => 'HTTP/2'};
76+
_ ->
77+
{SrcAddr, SrcPort} = Peer,
78+
{DestAddr, DestPort} = Sock,
79+
#{
80+
http_version => 'HTTP/2',
81+
src_address => SrcAddr,
82+
src_port => SrcPort,
83+
dest_address => DestAddr,
84+
dest_port => DestPort
85+
}
86+
end,
87+
ProxySocket = {rabbit_proxy_socket, RealSocket, ProxyInfo},
88+
cowboy_websocket:upgrade(Req, Env, Handler, HandlerState#state{socket = ProxySocket}, Opts);
5989
upgrade(Req, Env, Handler, HandlerState, Opts) ->
6090
cowboy_websocket:upgrade(Req, Env, Handler, HandlerState, Opts).
6191

92+
%% This is only called for HTTP/1.1.
6293
takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Handler, HandlerState}) ->
6394
Sock = case HandlerState#state.socket of
6495
undefined ->
@@ -84,7 +115,7 @@ init(Req, Opts) ->
84115
stats_timer = rabbit_event:init_stats_timer()},
85116
WsOpts0 = proplists:get_value(ws_opts, Opts, #{}),
86117
WsOpts = maps:merge(#{compress => true}, WsOpts0),
87-
{?MODULE, Req1, State, WsOpts}
118+
{?MODULE, Req1, State, WsOpts#{data_delivery => relay}}
88119
end
89120
end.
90121

deps/rabbitmq_web_mqtt/test/web_mqtt_system_SUITE.erl

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ groups() ->
2424
,unacceptable_data_type
2525
,handle_invalid_packets
2626
,duplicate_connect
27+
,wss_http2
2728
]}
2829
].
2930

@@ -36,7 +37,25 @@ init_per_suite(Config) ->
3637
{rmq_nodename_suffix, ?MODULE},
3738
{protocol, "ws"}
3839
]),
39-
rabbit_ct_helpers:run_setup_steps(Config1,
40+
Config2 = rabbit_ct_helpers:run_setup_steps(Config1),
41+
{rmq_certsdir, CertsDir} = proplists:lookup(rmq_certsdir, Config2),
42+
Config3 = rabbit_ct_helpers:merge_app_env(
43+
Config2,
44+
{rabbitmq_web_mqtt,
45+
[{ssl_config,
46+
[{cacertfile, filename:join([CertsDir, "testca", "cacert.pem"])},
47+
{certfile, filename:join([CertsDir, "server", "cert.pem"])},
48+
{keyfile, filename:join([CertsDir, "server", "key.pem"])},
49+
%% We only want to ensure HTTP/2 Websocket is working.
50+
{fail_if_no_peer_cert, false},
51+
{versions, ['tlsv1.3']},
52+
%% We hard code this port number here because it will be computed later by
53+
%% rabbit_ct_broker_helpers:init_tcp_port_numbers/3 when we start the broker.
54+
%% (The alternative is to first start the broker, stop the rabbitmq_web_amqp app,
55+
%% configure tls_config, and then start the app again.)
56+
{port, 21010}
57+
]}]}),
58+
rabbit_ct_helpers:run_setup_steps(Config3,
4059
rabbit_ct_broker_helpers:setup_steps() ++
4160
rabbit_ct_client_helpers:setup_steps()).
4261

@@ -114,6 +133,26 @@ duplicate_connect(Config) ->
114133
after 500 -> ct:fail("expected web socket to exit")
115134
end.
116135

136+
wss_http2(Config) ->
137+
{ok, _} = application:ensure_all_started(gun),
138+
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_web_mqtt_tls),
139+
{ok, ConnPid} = gun:open("localhost", Port, #{
140+
transport => tls,
141+
tls_opts => [{verify, verify_none}],
142+
protocols => [http2],
143+
http2_opts => #{notify_settings_changed => true},
144+
ws_opts => #{protocols => [{<<"mqtt">>, gun_ws_h}]}
145+
}),
146+
{ok, http2} = gun:await_up(ConnPid),
147+
{notify, settings_changed, #{enable_connect_protocol := true}}
148+
= gun:await(ConnPid, undefined),
149+
StreamRef = gun:ws_upgrade(ConnPid, "/ws", []),
150+
{upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
151+
gun:ws_send(ConnPid, StreamRef, {binary, rabbit_ws_test_util:mqtt_3_1_1_connect_packet()}),
152+
{ws, {binary, _P}} = gun:await(ConnPid, StreamRef),
153+
eventually(?_assertEqual(1, num_mqtt_connections(Config, 0))),
154+
ok.
155+
117156
%% -------------------------------------------------------------------
118157
%% Internal helpers
119158
%% -------------------------------------------------------------------

deps/rabbitmq_web_stomp/Makefile

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,20 @@ define PROJECT_APP_EXTRA_KEYS
2020
endef
2121

2222
DEPS = cowboy rabbit_common rabbit rabbitmq_stomp
23-
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers
23+
TEST_DEPS = gun rabbitmq_ct_helpers rabbitmq_ct_client_helpers
2424

2525
PLT_APPS += cowlib
2626

2727
# FIXME: Add Ranch as a BUILD_DEPS to be sure the correct version is picked.
2828
# See rabbitmq-components.mk.
2929
BUILD_DEPS += ranch
3030

31+
dep_gun = hex 2.2.0
32+
3133
DEP_EARLY_PLUGINS = rabbit_common/mk/rabbitmq-early-plugin.mk
3234
DEP_PLUGINS = rabbit_common/mk/rabbitmq-plugin.mk
3335

3436
include ../../rabbitmq-components.mk
3537
include ../../erlang.mk
3638

37-
CT_HOOKS = rabbit_ct_hook
39+
CT_HOOKS = rabbit_ct_hook

deps/rabbitmq_web_stomp/src/rabbit_web_stomp_handler.erl

Lines changed: 59 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,40 @@
5252
upgrade(Req, Env, Handler, HandlerState) ->
5353
upgrade(Req, Env, Handler, HandlerState, #{}).
5454

55+
%% We create a proxy socket for HTTP/2 even if no proxy was used,
56+
%% and add a special field 'http_version' to indicate this is HTTP/2.
57+
upgrade(Req=#{version := 'HTTP/2', pid := Parent, peer := Peer, sock := Sock},
58+
Env, Handler, HandlerState, Opts) ->
59+
%% Cowboy doesn't expose the socket when HTTP/2 is used.
60+
%% We take it directly from the connection's state.
61+
%%
62+
%% @todo Ideally we would not need the real socket for
63+
%% normal operations. But we currently need it for
64+
%% the heartbeat processes to do their job. In the
65+
%% future we should not rely on those processes
66+
%% and instead do the heartbeating directly in the
67+
%% Websocket handler.
68+
RealSocket = element(4,element(1,sys:get_state(Parent))),
69+
ProxyInfo = case Req of
70+
#{proxy_header := ProxyHeader} ->
71+
ProxyHeader#{http_version => 'HTTP/2'};
72+
_ ->
73+
{SrcAddr, SrcPort} = Peer,
74+
{DestAddr, DestPort} = Sock,
75+
#{
76+
http_version => 'HTTP/2',
77+
src_address => SrcAddr,
78+
src_port => SrcPort,
79+
dest_address => DestAddr,
80+
dest_port => DestPort
81+
}
82+
end,
83+
ProxySocket = {rabbit_proxy_socket, RealSocket, ProxyInfo},
84+
cowboy_websocket:upgrade(Req, Env, Handler, HandlerState#state{socket = ProxySocket}, Opts);
5585
upgrade(Req, Env, Handler, HandlerState, Opts) ->
5686
cowboy_websocket:upgrade(Req, Env, Handler, HandlerState, Opts).
5787

88+
%% This is only called for HTTP/1.1.
5889
takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Handler, HandlerState}) ->
5990
Sock = case HandlerState#state.socket of
6091
undefined ->
@@ -95,7 +126,7 @@ init(Req0, Opts) ->
95126
socket = SockInfo,
96127
peername = PeerAddr,
97128
auth_hd = cowboy_req:header(<<"authorization">>, Req)
98-
}, WsOpts}.
129+
}, WsOpts#{data_delivery => relay}}.
99130

100131
websocket_init(State) ->
101132
process_flag(trap_exit, true),
@@ -218,8 +249,23 @@ websocket_info({start_heartbeats, {SendTimeout, ReceiveTimeout}},
218249
Self = self(),
219250
SendFun = fun () -> Self ! {send, <<$\n>>}, ok end,
220251
ReceiveFun = fun() -> Self ! client_timeout end,
221-
Heartbeat = rabbit_heartbeat:start(SupPid, Sock, SendTimeout,
222-
SendFun, ReceiveTimeout, ReceiveFun),
252+
Heartbeat = case Sock of
253+
{rabbit_proxy_socket, RealSocket, #{http_version := 'HTTP/2'}} ->
254+
%% HTTP/2 Websocket may have multiple Websocket sessions
255+
%% on a single connection (this can happen for example
256+
%% when refreshing a page). As a result we need to attach
257+
%% the heartbeat processes to the session. We do this via
258+
%% a link for now. @todo In the future we will have a
259+
%% mechanism in Cowboy to attach them to the stream.
260+
{ok, Sender} = rabbit_heartbeat:start_heartbeat_sender(RealSocket,
261+
SendTimeout, SendFun, {heartbeat_sender, unknown}),
262+
{ok, Receiver} = rabbit_heartbeat:start_heartbeat_receiver(RealSocket,
263+
ReceiveTimeout, ReceiveFun, {heartbeat_receiver, unknown}),
264+
{Sender, Receiver};
265+
_ ->
266+
rabbit_heartbeat:start(SupPid, Sock, SendTimeout,
267+
SendFun, ReceiveTimeout, ReceiveFun)
268+
end,
223269
{ok, State#state{heartbeat = Heartbeat}};
224270
websocket_info(client_timeout, State) ->
225271
stop(State);
@@ -248,10 +294,17 @@ websocket_info(Msg, State) ->
248294
[Msg]),
249295
{ok, State}.
250296

251-
terminate(_Reason, _Req, #state{proc_state = undefined}) ->
252-
ok;
253-
terminate(_Reason, _Req, #state{proc_state = ProcState}) ->
297+
terminate(_Reason, _Req, State = #state{proc_state = undefined}) ->
298+
terminate_heartbeaters(State);
299+
terminate(_Reason, _Req, State = #state{proc_state = ProcState}) ->
254300
_ = rabbit_stomp_processor:flush_and_die(ProcState),
301+
terminate_heartbeaters(State).
302+
303+
terminate_heartbeaters(#state{heartbeat = {none, none}}) ->
304+
ok;
305+
terminate_heartbeaters(#state{heartbeat = {SPid, RPid}}) ->
306+
sys:terminate(SPid, shutdown),
307+
sys:terminate(RPid, shutdown),
255308
ok.
256309

257310
%%----------------------------------------------------------------------------

deps/rabbitmq_web_stomp/src/rabbit_web_stomp_listener.erl

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -136,16 +136,20 @@ start_tls_listener(TLSConf0, CowboyOpts0, Routes) ->
136136
TLSPort = proplists:get_value(port, TLSConf0),
137137
TLSConf = maybe_parse_ip(TLSConf0),
138138
RanchTransportOpts = #{
139-
socket_opts => TLSConf,
139+
socket_opts => [{alpn_preferred_protocols, [<<"h2">>, <<"http/1.1">>]}|TLSConf],
140140
connection_type => supervisor,
141141
max_connections => get_max_connections(),
142142
num_acceptors => NumSslAcceptors,
143143
num_conns_sups => 1
144144
},
145-
CowboyOpts = CowboyOpts0#{env => #{dispatch => Routes},
146-
middlewares => [cowboy_router,
147-
rabbit_web_stomp_middleware,
148-
cowboy_handler]},
145+
CowboyOpts = CowboyOpts0#{
146+
env => #{dispatch => Routes},
147+
middlewares => [cowboy_router,
148+
rabbit_web_stomp_middleware,
149+
cowboy_handler],
150+
%% Enable HTTP/2 Websocket if not explicitly disabled.
151+
enable_connect_protocol => maps:get(enable_connect_protocol, CowboyOpts0, true)
152+
},
149153
case ranch:start_listener(rabbit_networking:ranch_ref(TLSConf),
150154
ranch_ssl,
151155
RanchTransportOpts,

deps/rabbitmq_web_stomp/test/cowboy_websocket_SUITE.erl

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ groups() ->
2727
pubsub_binary,
2828
sub_non_existent,
2929
disconnect,
30-
http_auth
30+
http_auth,
31+
wss_http2
3132
]},
3233
%% rabbitmq/rabbitmq-web-stomp#110
3334
{default_login_enabled, [],
@@ -48,7 +49,25 @@ init_per_suite(Config) ->
4849
[{rmq_nodename_suffix, ?MODULE},
4950
{protocol, "ws"}]),
5051
rabbit_ct_helpers:log_environment(),
51-
rabbit_ct_helpers:run_setup_steps(Config1,
52+
Config2 = rabbit_ct_helpers:run_setup_steps(Config1),
53+
{rmq_certsdir, CertsDir} = proplists:lookup(rmq_certsdir, Config2),
54+
Config3 = rabbit_ct_helpers:merge_app_env(
55+
Config2,
56+
{rabbitmq_web_stomp,
57+
[{ssl_config,
58+
[{cacertfile, filename:join([CertsDir, "testca", "cacert.pem"])},
59+
{certfile, filename:join([CertsDir, "server", "cert.pem"])},
60+
{keyfile, filename:join([CertsDir, "server", "key.pem"])},
61+
%% We only want to ensure HTTP/2 Websocket is working.
62+
{fail_if_no_peer_cert, false},
63+
{versions, ['tlsv1.3']},
64+
%% We hard code this port number here because it will be computed later by
65+
%% rabbit_ct_broker_helpers:init_tcp_port_numbers/3 when we start the broker.
66+
%% (The alternative is to first start the broker, stop the rabbitmq_web_amqp app,
67+
%% configure tls_config, and then start the app again.)
68+
{port, 21014}
69+
]}]}),
70+
rabbit_ct_helpers:run_setup_steps(Config3,
5271
rabbit_ct_broker_helpers:setup_steps()).
5372

5473
end_per_suite(Config) ->
@@ -286,3 +305,23 @@ Protocol = ?config(protocol, Config),
286305
{close, _} = rfc6455_client:close(WS3),
287306

288307
ok.
308+
309+
wss_http2(Config) ->
310+
{ok, _} = application:ensure_all_started(gun),
311+
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_web_stomp_tls),
312+
{ok, ConnPid} = gun:open("localhost", Port, #{
313+
transport => tls,
314+
tls_opts => [{verify, verify_none}],
315+
protocols => [http2],
316+
http2_opts => #{notify_settings_changed => true},
317+
ws_opts => #{protocols => [{<<"v12.stomp">>, gun_ws_h}]}
318+
}),
319+
{ok, http2} = gun:await_up(ConnPid),
320+
{notify, settings_changed, #{enable_connect_protocol := true}}
321+
= gun:await(ConnPid, undefined),
322+
StreamRef = gun:ws_upgrade(ConnPid, "/ws", []),
323+
{upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
324+
gun:ws_send(ConnPid, StreamRef, {text, stomp:marshal("CONNECT", [{"login","guest"}, {"passcode", "guest"}])}),
325+
{ws, {text, P}} = gun:await(ConnPid, StreamRef),
326+
{<<"CONNECTED">>, _, <<>>} = stomp:unmarshal(P),
327+
ok.

deps/rabbitmq_web_stomp_examples/priv/echo.html

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ <h2>Logs</h2>
7979
};
8080

8181
// Stomp.js boilerplate
82-
var client = Stomp.client('ws://' + window.location.hostname + ':15674/ws');
82+
const wsProtocol = location.protocol === 'https:' ? 'wss:' : 'ws:';
83+
var client = Stomp.client(wsProtocol + '//' + window.location.hostname + ':15674/ws');
8384
client.debug = pipe('#second');
8485

8586
var print_first = pipe('#first', function(data) {

rabbitmq-components.mk

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ endif
4040
# all projects use the same versions. It avoids conflicts.
4141

4242
dep_accept = hex 0.3.5
43-
dep_cowboy = hex 2.13.0
44-
dep_cowlib = hex 2.14.0
43+
dep_cowboy = hex 2.14.0
44+
dep_cowlib = hex 2.16.0
4545
dep_credentials_obfuscation = hex 3.5.0
4646
dep_cuttlefish = hex 3.5.0
4747
dep_gen_batch_server = hex 0.8.8

0 commit comments

Comments
 (0)