Skip to content

Commit 10dd13f

Browse files
committed
Add dynamic buffer functionality to rabbit_reader
1 parent 99a09df commit 10dd13f

File tree

2 files changed

+46
-9
lines changed

2 files changed

+46
-9
lines changed

deps/rabbit/src/rabbit_networking.erl

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -570,18 +570,19 @@ handshake(Ref, ProxyProtocolEnabled) ->
570570
end.
571571

572572
tune_buffer_size(Sock) ->
573-
case tune_buffer_size1(Sock) of
573+
% case tune_buffer_size1(Sock) of
574+
case rabbit_net:setopts(Sock, [{buffer, 1024}]) of
574575
ok -> ok;
575576
{error, _} -> rabbit_net:fast_close(Sock),
576577
exit(normal)
577578
end.
578579

579-
tune_buffer_size1(Sock) ->
580-
case rabbit_net:getopts(Sock, [sndbuf, recbuf, buffer]) of
581-
{ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]),
582-
rabbit_net:setopts(Sock, [{buffer, BufSz}]);
583-
Error -> Error
584-
end.
580+
%tune_buffer_size1(Sock) ->
581+
% case rabbit_net:getopts(Sock, [sndbuf, recbuf, buffer]) of
582+
% {ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]),
583+
% rabbit_net:setopts(Sock, [{buffer, BufSz}]);
584+
% Error -> Error
585+
% end.
585586

586587
%%--------------------------------------------------------------------
587588

deps/rabbit/src/rabbit_reader.erl

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,11 @@
9999
%% throttling state, for both
100100
%% credit- and resource-driven flow control
101101
throttle,
102-
proxy_socket}).
102+
proxy_socket,
103+
%% dynamic buffer
104+
dynamic_buffer_size,
105+
dynamic_buffer_moving_average
106+
}).
103107

104108
-record(throttle, {
105109
%% never | timestamp()
@@ -512,8 +516,9 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock,
512516
end,
513517
case Recv of
514518
{data, Data} ->
519+
State1 = maybe_resize_buffer(State, Data),
515520
recvloop(Deb, [Data | Buf], BufLen + size(Data),
516-
State#v1{pending_recv = false});
521+
State1#v1{pending_recv = false});
517522
closed when State#v1.connection_state =:= closed ->
518523
State;
519524
closed when CS =:= pre_init andalso Buf =:= [] ->
@@ -536,6 +541,37 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock,
536541
end
537542
end.
538543

544+
maybe_resize_buffer(State=#v1{sock=Sock, dynamic_buffer_size=BufferSize0,
545+
dynamic_buffer_moving_average=MovingAvg0}, Data) ->
546+
LowDynamicBuffer = 1024,
547+
HighDynamicBuffer = 131072,
548+
DataLen = byte_size(Data),
549+
MovingAvg = (MovingAvg0 + DataLen) div 2,
550+
if
551+
BufferSize0 < HighDynamicBuffer andalso MovingAvg > BufferSize0 * 0.9 ->
552+
BufferSize = min(BufferSize0 * 2, HighDynamicBuffer),
553+
case rabbit_net:setopts(Sock, [{buffer, BufferSize}]) of
554+
ok -> State#v1{
555+
dynamic_buffer_size=BufferSize,
556+
dynamic_buffer_moving_average=MovingAvg
557+
};
558+
Error ->
559+
stop(Error, State)
560+
end;
561+
BufferSize0 > LowDynamicBuffer andalso MovingAvg < BufferSize0 * 0.4 ->
562+
BufferSize = max(BufferSize0 div 2, LowDynamicBuffer),
563+
case rabbit_net:setopts(Sock, [{buffer, BufferSize}]) of
564+
ok -> State#v1{
565+
dynamic_buffer_size=BufferSize,
566+
dynamic_buffer_moving_average=MovingAvg
567+
};
568+
Error ->
569+
stop(Error, State)
570+
end;
571+
true ->
572+
State#v1{dynamic_buffer_moving_average=MovingAvg}
573+
end.
574+
539575
-spec stop(_, #v1{}) -> no_return().
540576
stop(tcp_healthcheck, State) ->
541577
%% The connection was closed before any packet was received. It's

0 commit comments

Comments
 (0)