Skip to content

Commit 5344410

Browse files
committed
Add dynamic buffer functionality to rabbit_reader
The `buffer` socket option will be changed dynamically based on how much data is received. This is restricted to AMQP protocols (old and 1.0). The algorithm is a little different than Cowboy 2.13. The moving average is less reactive (div 8 instead of 2) and floats are used so that using smaller lower buffer values is possible (otherwise the rounding prevents increasing buffer sizes). The lower buffer size was set to 128 as a result. Compared to the previous which was to set `buffer` to `rcvbuf` effectively, often to 131072 on Linux for example, the performance sees a slight improvement in various scenarios for all message sizes using AMQP-0.9.1 and a lower memory usage as well. But the difference is small in the benchmarks we have run (5% to 10%), whereas Cowboy saw a huge improvement because its default was very small (1460). For AMQP-1.0 this seems to be no worse but we didn't detect a clear improvement. We saw scenarios where small message sizes showed improvement, and large message sizes showed a regression. But we are even less confident with these results. David (AMQP-1.0 native developer) ran a few tests and didn't see a regression. The dynamic buffer code is currently identical for old and 1.0 AMQP. But we might tweak them differently in the future so they're left as duplicate for now. This is because different protocols have different behaviors and so the algorithm may need to be tweaked differently for each protocol.
1 parent cdc042a commit 5344410

File tree

4 files changed

+94
-13
lines changed

4 files changed

+94
-13
lines changed

deps/rabbit/include/rabbit_amqp_reader.hrl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,10 @@
5959
buf :: list(),
6060
buf_len :: non_neg_integer(),
6161
tracked_channels = maps:new() :: #{channel_number() => Session :: pid()},
62-
stats_timer :: rabbit_event:state()
62+
stats_timer :: rabbit_event:state(),
63+
%% dynamic buffer
64+
dynamic_buffer_size = 128,
65+
dynamic_buffer_moving_average = 0.0
6366
}).
6467

6568
-type state() :: #v1{}.

deps/rabbit/src/rabbit_amqp_reader.erl

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,10 @@ recvloop(Deb, State0 = #v1{recv_len = RecvLen,
111111
mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
112112
case rabbit_net:recv(Sock) of
113113
{data, Data} ->
114-
recvloop(Deb, State#v1{buf = [Data | Buf],
115-
buf_len = BufLen + size(Data),
116-
pending_recv = false});
114+
State1 = maybe_resize_buffer(State, Data),
115+
recvloop(Deb, State1#v1{buf = [Data | Buf],
116+
buf_len = BufLen + size(Data),
117+
pending_recv = false});
117118
closed when State#v1.connection_state =:= closed ->
118119
ok;
119120
closed ->
@@ -130,6 +131,37 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
130131
end
131132
end.
132133

134+
maybe_resize_buffer(State=#v1{sock=Sock, dynamic_buffer_size=BufferSize0,
135+
dynamic_buffer_moving_average=MovingAvg0}, Data) ->
136+
LowDynamicBuffer = 128,
137+
HighDynamicBuffer = 131072,
138+
DataLen = byte_size(Data),
139+
MovingAvg = (MovingAvg0 * 7 + DataLen) / 8,
140+
if
141+
BufferSize0 < HighDynamicBuffer andalso MovingAvg > BufferSize0 * 0.9 ->
142+
BufferSize = min(BufferSize0 * 2, HighDynamicBuffer),
143+
case rabbit_net:setopts(Sock, [{buffer, BufferSize}]) of
144+
ok -> State#v1{
145+
dynamic_buffer_size=BufferSize,
146+
dynamic_buffer_moving_average=MovingAvg
147+
};
148+
{error, Reason} ->
149+
throw({inet_error, Reason})
150+
end;
151+
BufferSize0 > LowDynamicBuffer andalso MovingAvg < BufferSize0 * 0.4 ->
152+
BufferSize = max(BufferSize0 div 2, LowDynamicBuffer),
153+
case rabbit_net:setopts(Sock, [{buffer, BufferSize}]) of
154+
ok -> State#v1{
155+
dynamic_buffer_size=BufferSize,
156+
dynamic_buffer_moving_average=MovingAvg
157+
};
158+
{error, Reason} ->
159+
throw({inet_error, Reason})
160+
end;
161+
true ->
162+
State#v1{dynamic_buffer_moving_average=MovingAvg}
163+
end.
164+
133165
-spec handle_other(any(), state()) -> state() | stop.
134166
handle_other(emit_stats, State) ->
135167
emit_stats(State);

deps/rabbit/src/rabbit_networking.erl

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
close_connection/2, close_connections/2, close_all_connections/1,
3333
close_all_user_connections/2,
3434
force_connection_event_refresh/1, force_non_amqp_connection_event_refresh/1,
35-
handshake/2, tcp_host/1,
35+
handshake/2, handshake/3, tcp_host/1,
3636
ranch_ref/1, ranch_ref/2, ranch_ref_of_protocol/1,
3737
listener_of_protocol/1, stop_ranch_listener_of_protocol/1,
3838
list_local_connections_of_protocol/1]).
@@ -551,6 +551,9 @@ failed_to_recv_proxy_header(Ref, Error) ->
551551
exit({shutdown, failed_to_recv_proxy_header}).
552552

553553
handshake(Ref, ProxyProtocolEnabled) ->
554+
handshake(Ref, ProxyProtocolEnabled, static_buffer).
555+
556+
handshake(Ref, ProxyProtocolEnabled, BufferStrategy) ->
554557
case ProxyProtocolEnabled of
555558
true ->
556559
case ranch:recv_proxy_header(Ref, 3000) of
@@ -560,23 +563,29 @@ handshake(Ref, ProxyProtocolEnabled) ->
560563
failed_to_recv_proxy_header(Ref, Error);
561564
{ok, ProxyInfo} ->
562565
{ok, Sock} = ranch:handshake(Ref),
563-
ok = tune_buffer_size(Sock),
566+
ok = tune_buffer_size(Sock, BufferStrategy),
564567
{ok, {rabbit_proxy_socket, Sock, ProxyInfo}}
565568
end;
566569
false ->
567570
{ok, Sock} = ranch:handshake(Ref),
568-
ok = tune_buffer_size(Sock),
571+
ok = tune_buffer_size(Sock, BufferStrategy),
569572
{ok, Sock}
570573
end.
571574

572-
tune_buffer_size(Sock) ->
573-
case tune_buffer_size1(Sock) of
575+
tune_buffer_size(Sock, dynamic_buffer) ->
576+
case rabbit_net:setopts(Sock, [{buffer, 128}]) of
577+
ok -> ok;
578+
{error, _} -> rabbit_net:fast_close(Sock),
579+
exit(normal)
580+
end;
581+
tune_buffer_size(Sock, static_buffer) ->
582+
case tune_buffer_size_static(Sock) of
574583
ok -> ok;
575584
{error, _} -> rabbit_net:fast_close(Sock),
576585
exit(normal)
577586
end.
578587

579-
tune_buffer_size1(Sock) ->
588+
tune_buffer_size_static(Sock) ->
580589
case rabbit_net:getopts(Sock, [sndbuf, recbuf, buffer]) of
581590
{ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]),
582591
rabbit_net:setopts(Sock, [{buffer, BufSz}]);

deps/rabbit/src/rabbit_reader.erl

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,11 @@
9999
%% throttling state, for both
100100
%% credit- and resource-driven flow control
101101
throttle,
102-
proxy_socket}).
102+
proxy_socket,
103+
%% dynamic buffer
104+
dynamic_buffer_size = 128,
105+
dynamic_buffer_moving_average = 0.0
106+
}).
103107

104108
-record(throttle, {
105109
%% never | timestamp()
@@ -155,7 +159,8 @@ shutdown(Pid, Explanation) ->
155159
init(Parent, HelperSups, Ref) ->
156160
?LG_PROCESS_TYPE(reader),
157161
{ok, Sock} = rabbit_networking:handshake(Ref,
158-
application:get_env(rabbit, proxy_protocol, false)),
162+
application:get_env(rabbit, proxy_protocol, false),
163+
dynamic_buffer),
159164
Deb = sys:debug_options([]),
160165
start_connection(Parent, HelperSups, Ref, Deb, Sock).
161166

@@ -512,8 +517,9 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock,
512517
end,
513518
case Recv of
514519
{data, Data} ->
520+
State1 = maybe_resize_buffer(State, Data),
515521
recvloop(Deb, [Data | Buf], BufLen + size(Data),
516-
State#v1{pending_recv = false});
522+
State1#v1{pending_recv = false});
517523
closed when State#v1.connection_state =:= closed ->
518524
State;
519525
closed when CS =:= pre_init andalso Buf =:= [] ->
@@ -536,6 +542,37 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock,
536542
end
537543
end.
538544

545+
maybe_resize_buffer(State=#v1{sock=Sock, dynamic_buffer_size=BufferSize0,
546+
dynamic_buffer_moving_average=MovingAvg0}, Data) ->
547+
LowDynamicBuffer = 128,
548+
HighDynamicBuffer = 131072,
549+
DataLen = byte_size(Data),
550+
MovingAvg = (MovingAvg0 * 7 + DataLen) / 8,
551+
if
552+
BufferSize0 < HighDynamicBuffer andalso MovingAvg > BufferSize0 * 0.9 ->
553+
BufferSize = min(BufferSize0 * 2, HighDynamicBuffer),
554+
case rabbit_net:setopts(Sock, [{buffer, BufferSize}]) of
555+
ok -> State#v1{
556+
dynamic_buffer_size=BufferSize,
557+
dynamic_buffer_moving_average=MovingAvg
558+
};
559+
Error ->
560+
stop(Error, State)
561+
end;
562+
BufferSize0 > LowDynamicBuffer andalso MovingAvg < BufferSize0 * 0.4 ->
563+
BufferSize = max(BufferSize0 div 2, LowDynamicBuffer),
564+
case rabbit_net:setopts(Sock, [{buffer, BufferSize}]) of
565+
ok -> State#v1{
566+
dynamic_buffer_size=BufferSize,
567+
dynamic_buffer_moving_average=MovingAvg
568+
};
569+
Error ->
570+
stop(Error, State)
571+
end;
572+
true ->
573+
State#v1{dynamic_buffer_moving_average=MovingAvg}
574+
end.
575+
539576
-spec stop(_, #v1{}) -> no_return().
540577
stop(tcp_healthcheck, State) ->
541578
%% The connection was closed before any packet was received. It's

0 commit comments

Comments
 (0)