Skip to content

Commit f988562

Browse files
committed
Implement in 1.0
1 parent 6966ed5 commit f988562

File tree

3 files changed

+39
-6
lines changed

3 files changed

+39
-6
lines changed

deps/rabbit/include/rabbit_amqp_reader.hrl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,10 @@
5959
buf :: list(),
6060
buf_len :: non_neg_integer(),
6161
tracked_channels = maps:new() :: #{channel_number() => Session :: pid()},
62-
stats_timer :: rabbit_event:state()
62+
stats_timer :: rabbit_event:state(),
63+
%% dynamic buffer
64+
dynamic_buffer_size = 128,
65+
dynamic_buffer_moving_average = 0.0
6366
}).
6467

6568
-type state() :: #v1{}.

deps/rabbit/src/rabbit_amqp_reader.erl

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,10 @@ recvloop(Deb, State0 = #v1{recv_len = RecvLen,
111111
mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
112112
case rabbit_net:recv(Sock) of
113113
{data, Data} ->
114-
recvloop(Deb, State#v1{buf = [Data | Buf],
115-
buf_len = BufLen + size(Data),
116-
pending_recv = false});
114+
State1 = maybe_resize_buffer(State, Data),
115+
recvloop(Deb, State1#v1{buf = [Data | Buf],
116+
buf_len = BufLen + size(Data),
117+
pending_recv = false});
117118
closed when State#v1.connection_state =:= closed ->
118119
ok;
119120
closed ->
@@ -130,6 +131,37 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
130131
end
131132
end.
132133

134+
maybe_resize_buffer(State=#v1{sock=Sock, dynamic_buffer_size=BufferSize0,
135+
dynamic_buffer_moving_average=MovingAvg0}, Data) ->
136+
LowDynamicBuffer = 128,
137+
HighDynamicBuffer = 131072,
138+
DataLen = byte_size(Data),
139+
MovingAvg = (MovingAvg0 * 7 + DataLen) / 8,
140+
if
141+
BufferSize0 < HighDynamicBuffer andalso MovingAvg > BufferSize0 * 0.9 ->
142+
BufferSize = min(BufferSize0 * 2, HighDynamicBuffer),
143+
case rabbit_net:setopts(Sock, [{buffer, BufferSize}]) of
144+
ok -> State#v1{
145+
dynamic_buffer_size=BufferSize,
146+
dynamic_buffer_moving_average=MovingAvg
147+
};
148+
{error, Reason} ->
149+
throw({inet_error, Reason})
150+
end;
151+
BufferSize0 > LowDynamicBuffer andalso MovingAvg < BufferSize0 * 0.4 ->
152+
BufferSize = max(BufferSize0 div 2, LowDynamicBuffer),
153+
case rabbit_net:setopts(Sock, [{buffer, BufferSize}]) of
154+
ok -> State#v1{
155+
dynamic_buffer_size=BufferSize,
156+
dynamic_buffer_moving_average=MovingAvg
157+
};
158+
{error, Reason} ->
159+
throw({inet_error, Reason})
160+
end;
161+
true ->
162+
State#v1{dynamic_buffer_moving_average=MovingAvg}
163+
end.
164+
133165
-spec handle_other(any(), state()) -> state() | stop.
134166
handle_other(emit_stats, State) ->
135167
emit_stats(State);

deps/rabbit/src/rabbit_reader.erl

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -551,7 +551,6 @@ maybe_resize_buffer(State=#v1{sock=Sock, dynamic_buffer_size=BufferSize0,
551551
if
552552
BufferSize0 < HighDynamicBuffer andalso MovingAvg > BufferSize0 * 0.9 ->
553553
BufferSize = min(BufferSize0 * 2, HighDynamicBuffer),
554-
% erlang:display({up,BufferSize}),
555554
case rabbit_net:setopts(Sock, [{buffer, BufferSize}]) of
556555
ok -> State#v1{
557556
dynamic_buffer_size=BufferSize,
@@ -562,7 +561,6 @@ maybe_resize_buffer(State=#v1{sock=Sock, dynamic_buffer_size=BufferSize0,
562561
end;
563562
BufferSize0 > LowDynamicBuffer andalso MovingAvg < BufferSize0 * 0.4 ->
564563
BufferSize = max(BufferSize0 div 2, LowDynamicBuffer),
565-
% erlang:display({down,BufferSize}),
566564
case rabbit_net:setopts(Sock, [{buffer, BufferSize}]) of
567565
ok -> State#v1{
568566
dynamic_buffer_size=BufferSize,

0 commit comments

Comments
 (0)