Skip to content

Commit 20ee9f4

Browse files
author
Matthew Sackman
committed
merge in from bug 19356
2 parents b34e177 + 37115f1 commit 20ee9f4

File tree

11 files changed

+237
-52
lines changed

11 files changed

+237
-52
lines changed

.hgignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
syntax: glob
22
*.beam
33
*~
4+
*.swp
5+
*.patch
46
erl_crash.dump
57

68
syntax: regexp

ebin/rabbit_app.in

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
%% actually want to start it
1616
{mod, {rabbit, []}},
1717
{env, [{tcp_listeners, [{"0.0.0.0", 5672}]},
18+
{ssl_listeners, []},
19+
{ssl_options, []},
1820
{extra_startup_steps, []},
1921
{default_user, <<"guest">>},
2022
{default_pass, <<"guest">>},

include/rabbit.hrl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464

6565
-record(basic_message, {exchange_name, routing_key, content, persistent_key}).
6666

67+
-record(ssl_socket, {tcp, ssl}).
6768
-record(delivery, {mandatory, immediate, txn, sender, message}).
6869

6970
%%----------------------------------------------------------------------------
@@ -74,7 +75,8 @@
7475

7576
-type(maybe(T) :: T | 'none').
7677
-type(erlang_node() :: atom()).
77-
-type(socket() :: port()).
78+
-type(ssl_socket() :: #ssl_socket{}).
79+
-type(socket() :: port() | ssl_socket()).
7880
-type(thunk(T) :: fun(() -> T)).
7981
-type(info_key() :: atom()).
8082
-type(info() :: {info_key(), any()}).

src/rabbit.erl

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,12 +170,27 @@ start(normal, []) ->
170170
{"TCP listeners",
171171
fun () ->
172172
ok = rabbit_networking:start(),
173-
{ok, TCPListeners} = application:get_env(tcp_listeners),
173+
{ok, TcpListeners} = application:get_env(tcp_listeners),
174174
lists:foreach(
175175
fun ({Host, Port}) ->
176176
ok = rabbit_networking:start_tcp_listener(Host, Port)
177177
end,
178-
TCPListeners)
178+
TcpListeners)
179+
end},
180+
{"SSL listeners",
181+
fun () ->
182+
case application:get_env(ssl_listeners) of
183+
{ok, []} ->
184+
ok;
185+
{ok, SslListeners} ->
186+
ok = rabbit_misc:start_applications([crypto, ssl]),
187+
188+
{ok, SslOpts} = application:get_env(ssl_options),
189+
190+
[rabbit_networking:start_ssl_listener
191+
(Host, Port, SslOpts) || {Host, Port} <- SslListeners],
192+
ok
193+
end
179194
end}]
180195
++ ExtraSteps),
181196

src/rabbit_heartbeat.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ start_heartbeat(Sock, TimeoutSec) ->
5353
spawn_link(fun () -> heartbeater(Sock, TimeoutSec * 1000 div 2,
5454
send_oct, 0,
5555
fun () ->
56-
catch gen_tcp:send(Sock, rabbit_binary_generator:build_heartbeat_frame()),
56+
catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()),
5757
continue
5858
end,
5959
erlang:monitor(process, Parent)) end),
@@ -73,7 +73,7 @@ heartbeater(Sock, TimeoutMillisec, StatName, Threshold, Handler, MonitorRef) ->
7373
{'DOWN', MonitorRef, process, _Object, _Info} -> ok;
7474
Other -> exit({unexpected_message, Other})
7575
after TimeoutMillisec ->
76-
case inet:getstat(Sock, [StatName]) of
76+
case rabbit_net:getstat(Sock, [StatName]) of
7777
{ok, [{StatName, NewStatVal}]} ->
7878
if NewStatVal =/= StatVal ->
7979
F({NewStatVal, 0});

src/rabbit_net.erl

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
%% The contents of this file are subject to the Mozilla Public License
2+
%% Version 1.1 (the "License"); you may not use this file except in
3+
%% compliance with the License. You may obtain a copy of the License at
4+
%% http://www.mozilla.org/MPL/
5+
%%
6+
%% Software distributed under the License is distributed on an "AS IS"
7+
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8+
%% License for the specific language governing rights and limitations
9+
%% under the License.
10+
%%
11+
%% The Original Code is RabbitMQ.
12+
%%
13+
%% The Initial Developers of the Original Code are LShift Ltd,
14+
%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
15+
%%
16+
%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17+
%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18+
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19+
%% Technologies LLC, and Rabbit Technologies Ltd.
20+
%%
21+
%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
22+
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
23+
%% Copyright (C) 2007-2009 Cohesive Financial Technologies
24+
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
25+
%% (C) 2007-2009 Rabbit Technologies Ltd.
26+
%%
27+
%% All Rights Reserved.
28+
%%
29+
%% Contributor(s): ______________________________________.
30+
%%
31+
32+
-module(rabbit_net).
33+
-include("rabbit.hrl").
34+
-include_lib("kernel/include/inet.hrl").
35+
36+
-export([async_recv/3, close/1, controlling_process/2,
37+
getstat/2, peername/1, port_command/2,
38+
send/2, sockname/1]).
39+
%%---------------------------------------------------------------------------
40+
41+
-ifdef(use_specs).
42+
43+
-type(stat_option() ::
44+
'recv_cnt' | 'recv_max' | 'recv_avg' | 'recv_oct' | 'recv_dvi' |
45+
'send_cnt' | 'send_max' | 'send_avg' | 'send_oct' | 'send_pend').
46+
-type(error() :: {'error', any()}).
47+
48+
-spec(async_recv/3 :: (socket(), integer(), timeout()) -> {'ok', any()}).
49+
-spec(close/1 :: (socket()) -> 'ok' | error()).
50+
-spec(controlling_process/2 :: (socket(), pid()) -> 'ok' | error()).
51+
-spec(port_command/2 :: (socket(), iolist()) -> 'true').
52+
-spec(send/2 :: (socket(), binary() | iolist()) -> 'ok' | error()).
53+
-spec(peername/1 :: (socket()) ->
54+
{'ok', {ip_address(), non_neg_integer()}} | error()).
55+
-spec(sockname/1 :: (socket()) ->
56+
{'ok', {ip_address(), non_neg_integer()}} | error()).
57+
-spec(getstat/2 :: (socket(), [stat_option()]) ->
58+
{'ok', [{stat_option(), integer()}]} | error()).
59+
60+
-endif.
61+
62+
%%---------------------------------------------------------------------------
63+
64+
65+
async_recv(Sock, Length, Timeout) when is_record(Sock, ssl_socket) ->
66+
Pid = self(),
67+
Ref = make_ref(),
68+
69+
spawn(fun() -> Pid ! {inet_async, Sock, Ref,
70+
ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)}
71+
end),
72+
73+
{ok, Ref};
74+
75+
async_recv(Sock, Length, infinity) when is_port(Sock) ->
76+
prim_inet:async_recv(Sock, Length, -1);
77+
78+
async_recv(Sock, Length, Timeout) when is_port(Sock) ->
79+
prim_inet:async_recv(Sock, Length, Timeout).
80+
81+
close(Sock) when is_record(Sock, ssl_socket) ->
82+
ssl:close(Sock#ssl_socket.ssl);
83+
84+
close(Sock) when is_port(Sock) ->
85+
gen_tcp:close(Sock).
86+
87+
88+
controlling_process(Sock, Pid) when is_record(Sock, ssl_socket) ->
89+
ssl:controlling_process(Sock#ssl_socket.ssl, Pid);
90+
91+
controlling_process(Sock, Pid) when is_port(Sock) ->
92+
gen_tcp:controlling_process(Sock, Pid).
93+
94+
95+
getstat(Sock, Stats) when is_record(Sock, ssl_socket) ->
96+
inet:getstat(Sock#ssl_socket.tcp, Stats);
97+
98+
getstat(Sock, Stats) when is_port(Sock) ->
99+
inet:getstat(Sock, Stats).
100+
101+
102+
peername(Sock) when is_record(Sock, ssl_socket) ->
103+
ssl:peername(Sock#ssl_socket.ssl);
104+
105+
peername(Sock) when is_port(Sock) ->
106+
inet:peername(Sock).
107+
108+
109+
port_command(Sock, Data) when is_record(Sock, ssl_socket) ->
110+
case ssl:send(Sock#ssl_socket.ssl, Data) of
111+
ok ->
112+
self() ! {inet_reply, Sock, ok},
113+
true;
114+
{error, Reason} ->
115+
erlang:error(Reason)
116+
end;
117+
118+
port_command(Sock, Data) when is_port(Sock) ->
119+
erlang:port_command(Sock, Data).
120+
121+
send(Sock, Data) when is_record(Sock, ssl_socket) ->
122+
ssl:send(Sock#ssl_socket.ssl, Data);
123+
124+
send(Sock, Data) when is_port(Sock) ->
125+
gen_tcp:send(Sock, Data).
126+
127+
128+
sockname(Sock) when is_record(Sock, ssl_socket) ->
129+
ssl:sockname(Sock#ssl_socket.ssl);
130+
131+
sockname(Sock) when is_port(Sock) ->
132+
inet:sockname(Sock).

src/rabbit_networking.erl

Lines changed: 46 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -31,18 +31,28 @@
3131

3232
-module(rabbit_networking).
3333

34-
-export([start/0, start_tcp_listener/2, stop_tcp_listener/2,
35-
on_node_down/1, active_listeners/0, node_listeners/1,
36-
connections/0, connection_info/1, connection_info/2,
37-
connection_info_all/0, connection_info_all/1]).
34+
-export([start/0, start_tcp_listener/2, start_ssl_listener/3,
35+
stop_tcp_listener/2, on_node_down/1, active_listeners/0,
36+
node_listeners/1, connections/0, connection_info/1,
37+
connection_info/2, connection_info_all/0,
38+
connection_info_all/1]).
3839
%%used by TCP-based transports, e.g. STOMP adapter
3940
-export([check_tcp_listener_address/3]).
4041

41-
-export([tcp_listener_started/2, tcp_listener_stopped/2, start_client/1]).
42+
-export([tcp_listener_started/2, ssl_connection_upgrade/2,
43+
tcp_listener_stopped/2, start_client/1]).
4244

4345
-include("rabbit.hrl").
4446
-include_lib("kernel/include/inet.hrl").
4547

48+
-define(RABBIT_TCP_OPTS, [
49+
binary,
50+
{packet, raw}, % no packaging
51+
{reuseaddr, true}, % allow rebind without waiting
52+
%% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg.
53+
%% {delay_send, true},
54+
{exit_on_close, false}
55+
]).
4656
%%----------------------------------------------------------------------------
4757

4858
-ifdef(use_specs).
@@ -52,6 +62,7 @@
5262

5363
-spec(start/0 :: () -> 'ok').
5464
-spec(start_tcp_listener/2 :: (host(), ip_port()) -> 'ok').
65+
-spec(start_ssl_listener/3 :: (host(), ip_port(), [info()]) -> 'ok').
5566
-spec(stop_tcp_listener/2 :: (host(), ip_port()) -> 'ok').
5667
-spec(active_listeners/0 :: () -> [listener()]).
5768
-spec(node_listeners/1 :: (erlang_node()) -> [listener()]).
@@ -96,21 +107,24 @@ check_tcp_listener_address(NamePrefix, Host, Port) ->
96107
{IPAddress, Name}.
97108

98109
start_tcp_listener(Host, Port) ->
99-
{IPAddress, Name} = check_tcp_listener_address(rabbit_tcp_listener_sup, Host, Port),
110+
start_listener(Host, Port, "TCP Listener",
111+
{?MODULE, start_client, []}).
112+
113+
start_ssl_listener(Host, Port, SslOpts) ->
114+
start_listener(Host, Port, "SSL Listener",
115+
{?MODULE, ssl_connection_upgrade, [SslOpts]}).
116+
117+
start_listener(Host, Port, Label, OnConnect) ->
118+
{IPAddress, Name} =
119+
check_tcp_listener_address(rabbit_tcp_listener_sup, Host, Port),
100120
{ok,_} = supervisor:start_child(
101121
rabbit_sup,
102122
{Name,
103123
{tcp_listener_sup, start_link,
104-
[IPAddress, Port,
105-
[binary,
106-
{packet, raw}, % no packaging
107-
{reuseaddr, true}, % allow rebind without waiting
108-
%% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg.
109-
%% {delay_send, true},
110-
{exit_on_close, false}],
124+
[IPAddress, Port, ?RABBIT_TCP_OPTS ,
111125
{?MODULE, tcp_listener_started, []},
112126
{?MODULE, tcp_listener_stopped, []},
113-
{?MODULE, start_client, []}]},
127+
OnConnect, Label]},
114128
transient, infinity, supervisor, [tcp_listener_sup]}),
115129
ok.
116130

@@ -148,10 +162,27 @@ on_node_down(Node) ->
148162

149163
start_client(Sock) ->
150164
{ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []),
151-
ok = gen_tcp:controlling_process(Sock, Child),
165+
ok = rabbit_net:controlling_process(Sock, Child),
152166
Child ! {go, Sock},
153167
Child.
154168

169+
ssl_connection_upgrade(SslOpts, Sock) ->
170+
{ok, {PeerAddress, PeerPort}} = rabbit_net:peername(Sock),
171+
PeerIp = inet_parse:ntoa(PeerAddress),
172+
173+
case ssl:ssl_accept(Sock, SslOpts) of
174+
{ok, SslSock} ->
175+
rabbit_log:info("upgraded TCP connection from ~s:~p to SSL~n",
176+
[PeerIp, PeerPort]),
177+
RabbitSslSock = #ssl_socket{tcp = Sock, ssl = SslSock},
178+
start_client(RabbitSslSock);
179+
{error, Reason} ->
180+
gen_tcp:close(Sock),
181+
rabbit_log:error("failed to upgrade TCP connection from ~s:~p "
182+
"to SSL: ~n~p~n", [PeerIp, PeerPort, Reason]),
183+
{error, Reason}
184+
end.
185+
155186
connections() ->
156187
[Pid || {_, Pid, _, _} <- supervisor:which_children(
157188
rabbit_tcp_client_sup)].

src/rabbit_reader.erl

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
200200

201201
peername(Sock) ->
202202
try
203-
{Address, Port} = inet_op(fun () -> inet:peername(Sock) end),
203+
{Address, Port} = inet_op(fun () -> rabbit_net:peername(Sock) end),
204204
AddressS = inet_parse:ntoa(Address),
205205
{AddressS, Port}
206206
catch
@@ -323,8 +323,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
323323
end.
324324

325325
switch_callback(OldState, NewCallback, Length) ->
326-
Ref = inet_op(fun () -> prim_inet:async_recv(
327-
OldState#v1.sock, Length, -1) end),
326+
Ref = inet_op(fun () -> rabbit_net:async_recv(
327+
OldState#v1.sock, Length, infinity) end),
328328
OldState#v1{callback = NewCallback,
329329
recv_ref = Ref}.
330330

@@ -539,7 +539,7 @@ handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>,
539539
end;
540540

541541
handle_input(handshake, Other, #v1{sock = Sock}) ->
542-
ok = inet_op(fun () -> gen_tcp:send(
542+
ok = inet_op(fun () -> rabbit_net:send(
543543
Sock, <<"AMQP",1,1,
544544
?PROTOCOL_VERSION_MAJOR,
545545
?PROTOCOL_VERSION_MINOR>>) end),
@@ -675,23 +675,23 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
675675
i(pid, #v1{}) ->
676676
self();
677677
i(address, #v1{sock = Sock}) ->
678-
{ok, {A, _}} = inet:sockname(Sock),
678+
{ok, {A, _}} = rabbit_net:sockname(Sock),
679679
A;
680680
i(port, #v1{sock = Sock}) ->
681-
{ok, {_, P}} = inet:sockname(Sock),
681+
{ok, {_, P}} = rabbit_net:sockname(Sock),
682682
P;
683683
i(peer_address, #v1{sock = Sock}) ->
684-
{ok, {A, _}} = inet:peername(Sock),
684+
{ok, {A, _}} = rabbit_net:peername(Sock),
685685
A;
686686
i(peer_port, #v1{sock = Sock}) ->
687-
{ok, {_, P}} = inet:peername(Sock),
687+
{ok, {_, P}} = rabbit_net:peername(Sock),
688688
P;
689689
i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct;
690690
SockStat =:= recv_cnt;
691691
SockStat =:= send_oct;
692692
SockStat =:= send_cnt;
693693
SockStat =:= send_pend ->
694-
case inet:getstat(Sock, [SockStat]) of
694+
case rabbit_net:getstat(Sock, [SockStat]) of
695695
{ok, [{SockStat, StatVal}]} -> StatVal;
696696
{error, einval} -> undefined;
697697
{error, Error} -> throw({cannot_get_socket_stats, Error})

src/rabbit_writer.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax) ->
169169

170170
tcp_send(Sock, Data) ->
171171
rabbit_misc:throw_on_error(inet_error,
172-
fun () -> gen_tcp:send(Sock, Data) end).
172+
fun () -> rabbit_net:send(Sock, Data) end).
173173

174174
internal_send_command(Sock, Channel, MethodRecord) ->
175175
ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord)).
@@ -206,6 +206,6 @@ internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax) ->
206206
ok.
207207

208208
port_cmd(Sock, Data) ->
209-
try erlang:port_command(Sock, Data)
209+
try rabbit_net:port_command(Sock, Data)
210210
catch error:Error -> exit({writer, send_failed, Error})
211211
end.

0 commit comments

Comments
 (0)