Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion deps/rabbitmq_web_mqtt/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ export BUILD_WITHOUT_QUIC

LOCAL_DEPS = ssl
DEPS = rabbit cowboy rabbitmq_mqtt
TEST_DEPS = emqtt rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_management rabbitmq_stomp rabbitmq_consistent_hash_exchange
TEST_DEPS = gun emqtt rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_management rabbitmq_stomp rabbitmq_consistent_hash_exchange

PLT_APPS += rabbitmq_cli elixir cowlib ssl

# FIXME: Add Ranch as a BUILD_DEPS to be sure the correct version is picked.
# See rabbitmq-components.mk.
BUILD_DEPS += ranch

dep_gun = hex 2.2.0
dep_emqtt = git https://github.com/emqx/emqtt.git 1.14.6

DEP_EARLY_PLUGINS = rabbit_common/mk/rabbitmq-early-plugin.mk
Expand Down
6 changes: 5 additions & 1 deletion deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ start_tcp_listener(TCPConf0, CowboyOpts) ->


start_tls_listener([], _) -> ok;
start_tls_listener(TLSConf0, CowboyOpts) ->
start_tls_listener(TLSConf0, CowboyOpts0) ->
_ = rabbit_networking:ensure_ssl(),
{TLSConf, TLSIpStr, TLSPort} = get_tls_conf(TLSConf0),
RanchRef = rabbit_networking:ranch_ref(TLSConf),
Expand All @@ -135,6 +135,10 @@ start_tls_listener(TLSConf0, CowboyOpts) ->
num_acceptors => get_env(num_ssl_acceptors, 10),
num_conns_sups => get_env(num_conns_sup, 1)
},
CowboyOpts = CowboyOpts0#{
%% Enable HTTP/2 Websocket if not explicitly disabled.
enable_connect_protocol => maps:get(enable_connect_protocol, CowboyOpts0, true)
},
case cowboy:start_tls(RanchRef, RanchTransportOpts, CowboyOpts) of
{ok, _} ->
ok;
Expand Down
33 changes: 32 additions & 1 deletion deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,40 @@
upgrade(Req, Env, Handler, HandlerState) ->
upgrade(Req, Env, Handler, HandlerState, #{}).

%% We create a proxy socket for HTTP/2 even if no proxy was used,
%% and add a special field 'http_version' to indicate this is HTTP/2.
upgrade(Req=#{version := 'HTTP/2', pid := Parent, peer := Peer, sock := Sock},
Env, Handler, HandlerState, Opts) ->
%% Cowboy doesn't expose the socket when HTTP/2 is used.
%% We take it directly from the connection's state.
%%
%% @todo Ideally we would not need the real socket for
%% normal operations. But we currently need it for
%% the heartbeat processes to do their job. In the
%% future we should not rely on those processes
%% and instead do the heartbeating directly in the
%% Websocket handler.
RealSocket = element(4,element(1,sys:get_state(Parent))),
ProxyInfo = case Req of
#{proxy_header := ProxyHeader} ->
ProxyHeader#{http_version => 'HTTP/2'};
_ ->
{SrcAddr, SrcPort} = Peer,
{DestAddr, DestPort} = Sock,
#{
http_version => 'HTTP/2',
src_address => SrcAddr,
src_port => SrcPort,
dest_address => DestAddr,
dest_port => DestPort
}
end,
ProxySocket = {rabbit_proxy_socket, RealSocket, ProxyInfo},
cowboy_websocket:upgrade(Req, Env, Handler, HandlerState#state{socket = ProxySocket}, Opts);
upgrade(Req, Env, Handler, HandlerState, Opts) ->
cowboy_websocket:upgrade(Req, Env, Handler, HandlerState, Opts).

%% This is only called for HTTP/1.1.
takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Handler, HandlerState}) ->
Sock = case HandlerState#state.socket of
undefined ->
Expand All @@ -84,7 +115,7 @@ init(Req, Opts) ->
stats_timer = rabbit_event:init_stats_timer()},
WsOpts0 = proplists:get_value(ws_opts, Opts, #{}),
WsOpts = maps:merge(#{compress => true}, WsOpts0),
{?MODULE, Req1, State, WsOpts}
{?MODULE, Req1, State, WsOpts#{data_delivery => relay}}
end
end.

Expand Down
41 changes: 40 additions & 1 deletion deps/rabbitmq_web_mqtt/test/web_mqtt_system_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ groups() ->
,unacceptable_data_type
,handle_invalid_packets
,duplicate_connect
,wss_http2
]}
].

Expand All @@ -36,7 +37,25 @@ init_per_suite(Config) ->
{rmq_nodename_suffix, ?MODULE},
{protocol, "ws"}
]),
rabbit_ct_helpers:run_setup_steps(Config1,
Config2 = rabbit_ct_helpers:run_setup_steps(Config1),
{rmq_certsdir, CertsDir} = proplists:lookup(rmq_certsdir, Config2),
Config3 = rabbit_ct_helpers:merge_app_env(
Config2,
{rabbitmq_web_mqtt,
[{ssl_config,
[{cacertfile, filename:join([CertsDir, "testca", "cacert.pem"])},
{certfile, filename:join([CertsDir, "server", "cert.pem"])},
{keyfile, filename:join([CertsDir, "server", "key.pem"])},
%% We only want to ensure HTTP/2 Websocket is working.
{fail_if_no_peer_cert, false},
{versions, ['tlsv1.3']},
%% We hard code this port number here because it will be computed later by
%% rabbit_ct_broker_helpers:init_tcp_port_numbers/3 when we start the broker.
%% (The alternative is to first start the broker, stop the rabbitmq_web_amqp app,
%% configure tls_config, and then start the app again.)
{port, 21010}
]}]}),
rabbit_ct_helpers:run_setup_steps(Config3,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).

Expand Down Expand Up @@ -114,6 +133,26 @@ duplicate_connect(Config) ->
after 500 -> ct:fail("expected web socket to exit")
end.

wss_http2(Config) ->
{ok, _} = application:ensure_all_started(gun),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_web_mqtt_tls),
{ok, ConnPid} = gun:open("localhost", Port, #{
transport => tls,
tls_opts => [{verify, verify_none}],
protocols => [http2],
http2_opts => #{notify_settings_changed => true},
ws_opts => #{protocols => [{<<"mqtt">>, gun_ws_h}]}
}),
{ok, http2} = gun:await_up(ConnPid),
{notify, settings_changed, #{enable_connect_protocol := true}}
= gun:await(ConnPid, undefined),
StreamRef = gun:ws_upgrade(ConnPid, "/ws", []),
{upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
gun:ws_send(ConnPid, StreamRef, {binary, rabbit_ws_test_util:mqtt_3_1_1_connect_packet()}),
{ws, {binary, _P}} = gun:await(ConnPid, StreamRef),
eventually(?_assertEqual(1, num_mqtt_connections(Config, 0))),
ok.

%% -------------------------------------------------------------------
%% Internal helpers
%% -------------------------------------------------------------------
Expand Down
6 changes: 4 additions & 2 deletions deps/rabbitmq_web_stomp/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,20 @@ define PROJECT_APP_EXTRA_KEYS
endef

DEPS = cowboy rabbit_common rabbit rabbitmq_stomp
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers
TEST_DEPS = gun rabbitmq_ct_helpers rabbitmq_ct_client_helpers

PLT_APPS += cowlib

# FIXME: Add Ranch as a BUILD_DEPS to be sure the correct version is picked.
# See rabbitmq-components.mk.
BUILD_DEPS += ranch

dep_gun = hex 2.2.0

DEP_EARLY_PLUGINS = rabbit_common/mk/rabbitmq-early-plugin.mk
DEP_PLUGINS = rabbit_common/mk/rabbitmq-plugin.mk

include ../../rabbitmq-components.mk
include ../../erlang.mk

CT_HOOKS = rabbit_ct_hook
CT_HOOKS = rabbit_ct_hook
65 changes: 59 additions & 6 deletions deps/rabbitmq_web_stomp/src/rabbit_web_stomp_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,40 @@
upgrade(Req, Env, Handler, HandlerState) ->
upgrade(Req, Env, Handler, HandlerState, #{}).

%% We create a proxy socket for HTTP/2 even if no proxy was used,
%% and add a special field 'http_version' to indicate this is HTTP/2.
upgrade(Req=#{version := 'HTTP/2', pid := Parent, peer := Peer, sock := Sock},
Env, Handler, HandlerState, Opts) ->
%% Cowboy doesn't expose the socket when HTTP/2 is used.
%% We take it directly from the connection's state.
%%
%% @todo Ideally we would not need the real socket for
%% normal operations. But we currently need it for
%% the heartbeat processes to do their job. In the
%% future we should not rely on those processes
%% and instead do the heartbeating directly in the
%% Websocket handler.
RealSocket = element(4,element(1,sys:get_state(Parent))),
ProxyInfo = case Req of
#{proxy_header := ProxyHeader} ->
ProxyHeader#{http_version => 'HTTP/2'};
_ ->
{SrcAddr, SrcPort} = Peer,
{DestAddr, DestPort} = Sock,
#{
http_version => 'HTTP/2',
src_address => SrcAddr,
src_port => SrcPort,
dest_address => DestAddr,
dest_port => DestPort
}
end,
ProxySocket = {rabbit_proxy_socket, RealSocket, ProxyInfo},
cowboy_websocket:upgrade(Req, Env, Handler, HandlerState#state{socket = ProxySocket}, Opts);
upgrade(Req, Env, Handler, HandlerState, Opts) ->
cowboy_websocket:upgrade(Req, Env, Handler, HandlerState, Opts).

%% This is only called for HTTP/1.1.
takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Handler, HandlerState}) ->
Sock = case HandlerState#state.socket of
undefined ->
Expand Down Expand Up @@ -95,7 +126,7 @@ init(Req0, Opts) ->
socket = SockInfo,
peername = PeerAddr,
auth_hd = cowboy_req:header(<<"authorization">>, Req)
}, WsOpts}.
}, WsOpts#{data_delivery => relay}}.

websocket_init(State) ->
process_flag(trap_exit, true),
Expand Down Expand Up @@ -218,8 +249,23 @@ websocket_info({start_heartbeats, {SendTimeout, ReceiveTimeout}},
Self = self(),
SendFun = fun () -> Self ! {send, <<$\n>>}, ok end,
ReceiveFun = fun() -> Self ! client_timeout end,
Heartbeat = rabbit_heartbeat:start(SupPid, Sock, SendTimeout,
SendFun, ReceiveTimeout, ReceiveFun),
Heartbeat = case Sock of
{rabbit_proxy_socket, RealSocket, #{http_version := 'HTTP/2'}} ->
%% HTTP/2 Websocket may have multiple Websocket sessions
%% on a single connection (this can happen for example
%% when refreshing a page). As a result we need to attach
%% the heartbeat processes to the session. We do this via
%% a link for now. @todo In the future we will have a
%% mechanism in Cowboy to attach them to the stream.
{ok, Sender} = rabbit_heartbeat:start_heartbeat_sender(RealSocket,
SendTimeout, SendFun, {heartbeat_sender, unknown}),
{ok, Receiver} = rabbit_heartbeat:start_heartbeat_receiver(RealSocket,
ReceiveTimeout, ReceiveFun, {heartbeat_receiver, unknown}),
{Sender, Receiver};
_ ->
rabbit_heartbeat:start(SupPid, Sock, SendTimeout,
SendFun, ReceiveTimeout, ReceiveFun)
end,
{ok, State#state{heartbeat = Heartbeat}};
websocket_info(client_timeout, State) ->
stop(State);
Expand Down Expand Up @@ -248,10 +294,17 @@ websocket_info(Msg, State) ->
[Msg]),
{ok, State}.

terminate(_Reason, _Req, #state{proc_state = undefined}) ->
ok;
terminate(_Reason, _Req, #state{proc_state = ProcState}) ->
terminate(_Reason, _Req, State = #state{proc_state = undefined}) ->
terminate_heartbeaters(State);
terminate(_Reason, _Req, State = #state{proc_state = ProcState}) ->
_ = rabbit_stomp_processor:flush_and_die(ProcState),
terminate_heartbeaters(State).

terminate_heartbeaters(#state{heartbeat = {none, none}}) ->
ok;
terminate_heartbeaters(#state{heartbeat = {SPid, RPid}}) ->
sys:terminate(SPid, shutdown),
sys:terminate(RPid, shutdown),
ok.

%%----------------------------------------------------------------------------
Expand Down
14 changes: 9 additions & 5 deletions deps/rabbitmq_web_stomp/src/rabbit_web_stomp_listener.erl
Original file line number Diff line number Diff line change
Expand Up @@ -136,16 +136,20 @@ start_tls_listener(TLSConf0, CowboyOpts0, Routes) ->
TLSPort = proplists:get_value(port, TLSConf0),
TLSConf = maybe_parse_ip(TLSConf0),
RanchTransportOpts = #{
socket_opts => TLSConf,
socket_opts => [{alpn_preferred_protocols, [<<"h2">>, <<"http/1.1">>]}|TLSConf],
connection_type => supervisor,
max_connections => get_max_connections(),
num_acceptors => NumSslAcceptors,
num_conns_sups => 1
},
CowboyOpts = CowboyOpts0#{env => #{dispatch => Routes},
middlewares => [cowboy_router,
rabbit_web_stomp_middleware,
cowboy_handler]},
CowboyOpts = CowboyOpts0#{
env => #{dispatch => Routes},
middlewares => [cowboy_router,
rabbit_web_stomp_middleware,
cowboy_handler],
%% Enable HTTP/2 Websocket if not explicitly disabled.
enable_connect_protocol => maps:get(enable_connect_protocol, CowboyOpts0, true)
},
case ranch:start_listener(rabbit_networking:ranch_ref(TLSConf),
ranch_ssl,
RanchTransportOpts,
Expand Down
43 changes: 41 additions & 2 deletions deps/rabbitmq_web_stomp/test/cowboy_websocket_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ groups() ->
pubsub_binary,
sub_non_existent,
disconnect,
http_auth
http_auth,
wss_http2
]},
%% rabbitmq/rabbitmq-web-stomp#110
{default_login_enabled, [],
Expand All @@ -48,7 +49,25 @@ init_per_suite(Config) ->
[{rmq_nodename_suffix, ?MODULE},
{protocol, "ws"}]),
rabbit_ct_helpers:log_environment(),
rabbit_ct_helpers:run_setup_steps(Config1,
Config2 = rabbit_ct_helpers:run_setup_steps(Config1),
{rmq_certsdir, CertsDir} = proplists:lookup(rmq_certsdir, Config2),
Config3 = rabbit_ct_helpers:merge_app_env(
Config2,
{rabbitmq_web_stomp,
[{ssl_config,
[{cacertfile, filename:join([CertsDir, "testca", "cacert.pem"])},
{certfile, filename:join([CertsDir, "server", "cert.pem"])},
{keyfile, filename:join([CertsDir, "server", "key.pem"])},
%% We only want to ensure HTTP/2 Websocket is working.
{fail_if_no_peer_cert, false},
{versions, ['tlsv1.3']},
%% We hard code this port number here because it will be computed later by
%% rabbit_ct_broker_helpers:init_tcp_port_numbers/3 when we start the broker.
%% (The alternative is to first start the broker, stop the rabbitmq_web_amqp app,
%% configure tls_config, and then start the app again.)
{port, 21014}
]}]}),
rabbit_ct_helpers:run_setup_steps(Config3,
rabbit_ct_broker_helpers:setup_steps()).

end_per_suite(Config) ->
Expand Down Expand Up @@ -286,3 +305,23 @@ Protocol = ?config(protocol, Config),
{close, _} = rfc6455_client:close(WS3),

ok.

wss_http2(Config) ->
{ok, _} = application:ensure_all_started(gun),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_web_stomp_tls),
{ok, ConnPid} = gun:open("localhost", Port, #{
transport => tls,
tls_opts => [{verify, verify_none}],
protocols => [http2],
http2_opts => #{notify_settings_changed => true},
ws_opts => #{protocols => [{<<"v12.stomp">>, gun_ws_h}]}
}),
{ok, http2} = gun:await_up(ConnPid),
{notify, settings_changed, #{enable_connect_protocol := true}}
= gun:await(ConnPid, undefined),
StreamRef = gun:ws_upgrade(ConnPid, "/ws", []),
{upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
gun:ws_send(ConnPid, StreamRef, {text, stomp:marshal("CONNECT", [{"login","guest"}, {"passcode", "guest"}])}),
{ws, {text, P}} = gun:await(ConnPid, StreamRef),
{<<"CONNECTED">>, _, <<>>} = stomp:unmarshal(P),
ok.
3 changes: 2 additions & 1 deletion deps/rabbitmq_web_stomp_examples/priv/echo.html
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ <h2>Logs</h2>
};

// Stomp.js boilerplate
var client = Stomp.client('ws://' + window.location.hostname + ':15674/ws');
const wsProtocol = location.protocol === 'https:' ? 'wss:' : 'ws:';
var client = Stomp.client(wsProtocol + '//' + window.location.hostname + ':15674/ws');
client.debug = pipe('#second');

var print_first = pipe('#first', function(data) {
Expand Down
4 changes: 2 additions & 2 deletions rabbitmq-components.mk
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ endif
# all projects use the same versions. It avoids conflicts.

dep_accept = hex 0.3.5
dep_cowboy = hex 2.13.0
dep_cowlib = hex 2.14.0
dep_cowboy = hex 2.14.0
dep_cowlib = hex 2.16.0
dep_credentials_obfuscation = hex 3.5.0
dep_cuttlefish = hex 3.5.0
dep_gen_batch_server = hex 0.8.8
Expand Down
Loading