Skip to content

Commit c19fc0e

Browse files
authored
Merge pull request #13436 from rabbitmq/mergify/bp/v4.1.x/pr-13363
Add dynamic socket buffer functionality to rabbit_reader (backport #13363)
2 parents 60ab69c + 2e4c58e commit c19fc0e

File tree

4 files changed

+94
-13
lines changed

4 files changed

+94
-13
lines changed

deps/rabbit/include/rabbit_amqp_reader.hrl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,10 @@
5959
buf :: list(),
6060
buf_len :: non_neg_integer(),
6161
tracked_channels = maps:new() :: #{channel_number() => Session :: pid()},
62-
stats_timer :: rabbit_event:state()
62+
stats_timer :: rabbit_event:state(),
63+
%% dynamic buffer
64+
dynamic_buffer_size = 128,
65+
dynamic_buffer_moving_average = 0.0
6366
}).
6467

6568
-type state() :: #v1{}.

deps/rabbit/src/rabbit_amqp_reader.erl

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,10 @@ recvloop(Deb, State0 = #v1{recv_len = RecvLen,
111111
mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
112112
case rabbit_net:recv(Sock) of
113113
{data, Data} ->
114-
recvloop(Deb, State#v1{buf = [Data | Buf],
115-
buf_len = BufLen + size(Data),
116-
pending_recv = false});
114+
State1 = maybe_resize_buffer(State, Data),
115+
recvloop(Deb, State1#v1{buf = [Data | Buf],
116+
buf_len = BufLen + size(Data),
117+
pending_recv = false});
117118
closed when State#v1.connection_state =:= closed ->
118119
ok;
119120
closed ->
@@ -130,6 +131,37 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
130131
end
131132
end.
132133

134+
maybe_resize_buffer(State=#v1{sock=Sock, dynamic_buffer_size=BufferSize0,
135+
dynamic_buffer_moving_average=MovingAvg0}, Data) ->
136+
LowDynamicBuffer = 128,
137+
HighDynamicBuffer = 131072,
138+
DataLen = byte_size(Data),
139+
MovingAvg = (MovingAvg0 * 7 + DataLen) / 8,
140+
if
141+
BufferSize0 < HighDynamicBuffer andalso MovingAvg > BufferSize0 * 0.9 ->
142+
BufferSize = min(BufferSize0 * 2, HighDynamicBuffer),
143+
case rabbit_net:setopts(Sock, [{buffer, BufferSize}]) of
144+
ok -> State#v1{
145+
dynamic_buffer_size=BufferSize,
146+
dynamic_buffer_moving_average=MovingAvg
147+
};
148+
{error, Reason} ->
149+
throw({inet_error, Reason})
150+
end;
151+
BufferSize0 > LowDynamicBuffer andalso MovingAvg < BufferSize0 * 0.4 ->
152+
BufferSize = max(BufferSize0 div 2, LowDynamicBuffer),
153+
case rabbit_net:setopts(Sock, [{buffer, BufferSize}]) of
154+
ok -> State#v1{
155+
dynamic_buffer_size=BufferSize,
156+
dynamic_buffer_moving_average=MovingAvg
157+
};
158+
{error, Reason} ->
159+
throw({inet_error, Reason})
160+
end;
161+
true ->
162+
State#v1{dynamic_buffer_moving_average=MovingAvg}
163+
end.
164+
133165
-spec handle_other(any(), state()) -> state() | stop.
134166
handle_other(emit_stats, State) ->
135167
emit_stats(State);

deps/rabbit/src/rabbit_networking.erl

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
close_connection/2, close_connections/2, close_all_connections/1,
3333
close_all_user_connections/2,
3434
force_connection_event_refresh/1, force_non_amqp_connection_event_refresh/1,
35-
handshake/2, tcp_host/1,
35+
handshake/2, handshake/3, tcp_host/1,
3636
ranch_ref/1, ranch_ref/2, ranch_ref_of_protocol/1,
3737
listener_of_protocol/1, stop_ranch_listener_of_protocol/1,
3838
list_local_connections_of_protocol/1]).
@@ -551,6 +551,9 @@ failed_to_recv_proxy_header(Ref, Error) ->
551551
exit({shutdown, failed_to_recv_proxy_header}).
552552

553553
handshake(Ref, ProxyProtocolEnabled) ->
554+
handshake(Ref, ProxyProtocolEnabled, static_buffer).
555+
556+
handshake(Ref, ProxyProtocolEnabled, BufferStrategy) ->
554557
case ProxyProtocolEnabled of
555558
true ->
556559
case ranch:recv_proxy_header(Ref, 3000) of
@@ -560,23 +563,29 @@ handshake(Ref, ProxyProtocolEnabled) ->
560563
failed_to_recv_proxy_header(Ref, Error);
561564
{ok, ProxyInfo} ->
562565
{ok, Sock} = ranch:handshake(Ref),
563-
ok = tune_buffer_size(Sock),
566+
ok = tune_buffer_size(Sock, BufferStrategy),
564567
{ok, {rabbit_proxy_socket, Sock, ProxyInfo}}
565568
end;
566569
false ->
567570
{ok, Sock} = ranch:handshake(Ref),
568-
ok = tune_buffer_size(Sock),
571+
ok = tune_buffer_size(Sock, BufferStrategy),
569572
{ok, Sock}
570573
end.
571574

572-
tune_buffer_size(Sock) ->
573-
case tune_buffer_size1(Sock) of
575+
tune_buffer_size(Sock, dynamic_buffer) ->
576+
case rabbit_net:setopts(Sock, [{buffer, 128}]) of
577+
ok -> ok;
578+
{error, _} -> rabbit_net:fast_close(Sock),
579+
exit(normal)
580+
end;
581+
tune_buffer_size(Sock, static_buffer) ->
582+
case tune_buffer_size_static(Sock) of
574583
ok -> ok;
575584
{error, _} -> rabbit_net:fast_close(Sock),
576585
exit(normal)
577586
end.
578587

579-
tune_buffer_size1(Sock) ->
588+
tune_buffer_size_static(Sock) ->
580589
case rabbit_net:getopts(Sock, [sndbuf, recbuf, buffer]) of
581590
{ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]),
582591
rabbit_net:setopts(Sock, [{buffer, BufSz}]);

deps/rabbit/src/rabbit_reader.erl

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,11 @@
9999
%% throttling state, for both
100100
%% credit- and resource-driven flow control
101101
throttle,
102-
proxy_socket}).
102+
proxy_socket,
103+
%% dynamic buffer
104+
dynamic_buffer_size = 128,
105+
dynamic_buffer_moving_average = 0.0
106+
}).
103107

104108
-record(throttle, {
105109
%% never | timestamp()
@@ -155,7 +159,8 @@ shutdown(Pid, Explanation) ->
155159
init(Parent, HelperSups, Ref) ->
156160
?LG_PROCESS_TYPE(reader),
157161
{ok, Sock} = rabbit_networking:handshake(Ref,
158-
application:get_env(rabbit, proxy_protocol, false)),
162+
application:get_env(rabbit, proxy_protocol, false),
163+
dynamic_buffer),
159164
Deb = sys:debug_options([]),
160165
start_connection(Parent, HelperSups, Ref, Deb, Sock).
161166

@@ -512,8 +517,9 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock,
512517
end,
513518
case Recv of
514519
{data, Data} ->
520+
State1 = maybe_resize_buffer(State, Data),
515521
recvloop(Deb, [Data | Buf], BufLen + size(Data),
516-
State#v1{pending_recv = false});
522+
State1#v1{pending_recv = false});
517523
closed when State#v1.connection_state =:= closed ->
518524
State;
519525
closed when CS =:= pre_init andalso Buf =:= [] ->
@@ -536,6 +542,37 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock,
536542
end
537543
end.
538544

545+
maybe_resize_buffer(State=#v1{sock=Sock, dynamic_buffer_size=BufferSize0,
546+
dynamic_buffer_moving_average=MovingAvg0}, Data) ->
547+
LowDynamicBuffer = 128,
548+
HighDynamicBuffer = 131072,
549+
DataLen = byte_size(Data),
550+
MovingAvg = (MovingAvg0 * 7 + DataLen) / 8,
551+
if
552+
BufferSize0 < HighDynamicBuffer andalso MovingAvg > BufferSize0 * 0.9 ->
553+
BufferSize = min(BufferSize0 * 2, HighDynamicBuffer),
554+
case rabbit_net:setopts(Sock, [{buffer, BufferSize}]) of
555+
ok -> State#v1{
556+
dynamic_buffer_size=BufferSize,
557+
dynamic_buffer_moving_average=MovingAvg
558+
};
559+
Error ->
560+
stop(Error, State)
561+
end;
562+
BufferSize0 > LowDynamicBuffer andalso MovingAvg < BufferSize0 * 0.4 ->
563+
BufferSize = max(BufferSize0 div 2, LowDynamicBuffer),
564+
case rabbit_net:setopts(Sock, [{buffer, BufferSize}]) of
565+
ok -> State#v1{
566+
dynamic_buffer_size=BufferSize,
567+
dynamic_buffer_moving_average=MovingAvg
568+
};
569+
Error ->
570+
stop(Error, State)
571+
end;
572+
true ->
573+
State#v1{dynamic_buffer_moving_average=MovingAvg}
574+
end.
575+
539576
-spec stop(_, #v1{}) -> no_return().
540577
stop(tcp_healthcheck, State) ->
541578
%% The connection was closed before any packet was received. It's

0 commit comments

Comments
 (0)