From 2e4c58e1f71b2541fc77ff82c9abaff62e241e38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Tue, 18 Feb 2025 14:48:00 +0100 Subject: [PATCH] Add dynamic buffer functionality to rabbit_reader The `buffer` socket option will be changed dynamically based on how much data is received. This is restricted to AMQP protocols (old and 1.0). The algorithm is a little different than Cowboy 2.13. The moving average is less reactive (div 8 instead of 2) and floats are used so that using smaller lower buffer values is possible (otherwise the rounding prevents increasing buffer sizes). The lower buffer size was set to 128 as a result. Compared to the previous which was to set `buffer` to `rcvbuf` effectively, often to 131072 on Linux for example, the performance sees a slight improvement in various scenarios for all message sizes using AMQP-0.9.1 and a lower memory usage as well. But the difference is small in the benchmarks we have run (5% to 10%), whereas Cowboy saw a huge improvement because its default was very small (1460). For AMQP-1.0 this seems to be no worse but we didn't detect a clear improvement. We saw scenarios where small message sizes showed improvement, and large message sizes showed a regression. But we are even less confident with these results. David (AMQP-1.0 native developer) ran a few tests and didn't see a regression. The dynamic buffer code is currently identical for old and 1.0 AMQP. But we might tweak them differently in the future so they're left as duplicate for now. This is because different protocols have different behaviors and so the algorithm may need to be tweaked differently for each protocol. (cherry picked from commit 53444107b576a18bb80c65d92fc99f33893606db) --- deps/rabbit/include/rabbit_amqp_reader.hrl | 5 ++- deps/rabbit/src/rabbit_amqp_reader.erl | 38 +++++++++++++++++-- deps/rabbit/src/rabbit_networking.erl | 21 ++++++++--- deps/rabbit/src/rabbit_reader.erl | 43 ++++++++++++++++++++-- 4 files changed, 94 insertions(+), 13 deletions(-) diff --git a/deps/rabbit/include/rabbit_amqp_reader.hrl b/deps/rabbit/include/rabbit_amqp_reader.hrl index 732bc9f04398..4b1500d00e8a 100644 --- a/deps/rabbit/include/rabbit_amqp_reader.hrl +++ b/deps/rabbit/include/rabbit_amqp_reader.hrl @@ -59,7 +59,10 @@ buf :: list(), buf_len :: non_neg_integer(), tracked_channels = maps:new() :: #{channel_number() => Session :: pid()}, - stats_timer :: rabbit_event:state() + stats_timer :: rabbit_event:state(), + %% dynamic buffer + dynamic_buffer_size = 128, + dynamic_buffer_moving_average = 0.0 }). -type state() :: #v1{}. diff --git a/deps/rabbit/src/rabbit_amqp_reader.erl b/deps/rabbit/src/rabbit_amqp_reader.erl index 3e5d5cc08dd7..b92ba8d3ce6a 100644 --- a/deps/rabbit/src/rabbit_amqp_reader.erl +++ b/deps/rabbit/src/rabbit_amqp_reader.erl @@ -111,9 +111,10 @@ recvloop(Deb, State0 = #v1{recv_len = RecvLen, mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> case rabbit_net:recv(Sock) of {data, Data} -> - recvloop(Deb, State#v1{buf = [Data | Buf], - buf_len = BufLen + size(Data), - pending_recv = false}); + State1 = maybe_resize_buffer(State, Data), + recvloop(Deb, State1#v1{buf = [Data | Buf], + buf_len = BufLen + size(Data), + pending_recv = false}); closed when State#v1.connection_state =:= closed -> ok; closed -> @@ -130,6 +131,37 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> end end. +maybe_resize_buffer(State=#v1{sock=Sock, dynamic_buffer_size=BufferSize0, + dynamic_buffer_moving_average=MovingAvg0}, Data) -> + LowDynamicBuffer = 128, + HighDynamicBuffer = 131072, + DataLen = byte_size(Data), + MovingAvg = (MovingAvg0 * 7 + DataLen) / 8, + if + BufferSize0 < HighDynamicBuffer andalso MovingAvg > BufferSize0 * 0.9 -> + BufferSize = min(BufferSize0 * 2, HighDynamicBuffer), + case rabbit_net:setopts(Sock, [{buffer, BufferSize}]) of + ok -> State#v1{ + dynamic_buffer_size=BufferSize, + dynamic_buffer_moving_average=MovingAvg + }; + {error, Reason} -> + throw({inet_error, Reason}) + end; + BufferSize0 > LowDynamicBuffer andalso MovingAvg < BufferSize0 * 0.4 -> + BufferSize = max(BufferSize0 div 2, LowDynamicBuffer), + case rabbit_net:setopts(Sock, [{buffer, BufferSize}]) of + ok -> State#v1{ + dynamic_buffer_size=BufferSize, + dynamic_buffer_moving_average=MovingAvg + }; + {error, Reason} -> + throw({inet_error, Reason}) + end; + true -> + State#v1{dynamic_buffer_moving_average=MovingAvg} + end. + -spec handle_other(any(), state()) -> state() | stop. handle_other(emit_stats, State) -> emit_stats(State); diff --git a/deps/rabbit/src/rabbit_networking.erl b/deps/rabbit/src/rabbit_networking.erl index 16576f9b6b57..a2a01ab822e2 100644 --- a/deps/rabbit/src/rabbit_networking.erl +++ b/deps/rabbit/src/rabbit_networking.erl @@ -32,7 +32,7 @@ close_connection/2, close_connections/2, close_all_connections/1, close_all_user_connections/2, force_connection_event_refresh/1, force_non_amqp_connection_event_refresh/1, - handshake/2, tcp_host/1, + handshake/2, handshake/3, tcp_host/1, ranch_ref/1, ranch_ref/2, ranch_ref_of_protocol/1, listener_of_protocol/1, stop_ranch_listener_of_protocol/1, list_local_connections_of_protocol/1]). @@ -551,6 +551,9 @@ failed_to_recv_proxy_header(Ref, Error) -> exit({shutdown, failed_to_recv_proxy_header}). handshake(Ref, ProxyProtocolEnabled) -> + handshake(Ref, ProxyProtocolEnabled, static_buffer). + +handshake(Ref, ProxyProtocolEnabled, BufferStrategy) -> case ProxyProtocolEnabled of true -> case ranch:recv_proxy_header(Ref, 3000) of @@ -560,23 +563,29 @@ handshake(Ref, ProxyProtocolEnabled) -> failed_to_recv_proxy_header(Ref, Error); {ok, ProxyInfo} -> {ok, Sock} = ranch:handshake(Ref), - ok = tune_buffer_size(Sock), + ok = tune_buffer_size(Sock, BufferStrategy), {ok, {rabbit_proxy_socket, Sock, ProxyInfo}} end; false -> {ok, Sock} = ranch:handshake(Ref), - ok = tune_buffer_size(Sock), + ok = tune_buffer_size(Sock, BufferStrategy), {ok, Sock} end. -tune_buffer_size(Sock) -> - case tune_buffer_size1(Sock) of +tune_buffer_size(Sock, dynamic_buffer) -> + case rabbit_net:setopts(Sock, [{buffer, 128}]) of + ok -> ok; + {error, _} -> rabbit_net:fast_close(Sock), + exit(normal) + end; +tune_buffer_size(Sock, static_buffer) -> + case tune_buffer_size_static(Sock) of ok -> ok; {error, _} -> rabbit_net:fast_close(Sock), exit(normal) end. -tune_buffer_size1(Sock) -> +tune_buffer_size_static(Sock) -> case rabbit_net:getopts(Sock, [sndbuf, recbuf, buffer]) of {ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]), rabbit_net:setopts(Sock, [{buffer, BufSz}]); diff --git a/deps/rabbit/src/rabbit_reader.erl b/deps/rabbit/src/rabbit_reader.erl index 723ca4b5df58..276b6fa03ffc 100644 --- a/deps/rabbit/src/rabbit_reader.erl +++ b/deps/rabbit/src/rabbit_reader.erl @@ -99,7 +99,11 @@ %% throttling state, for both %% credit- and resource-driven flow control throttle, - proxy_socket}). + proxy_socket, + %% dynamic buffer + dynamic_buffer_size = 128, + dynamic_buffer_moving_average = 0.0 +}). -record(throttle, { %% never | timestamp() @@ -155,7 +159,8 @@ shutdown(Pid, Explanation) -> init(Parent, HelperSups, Ref) -> ?LG_PROCESS_TYPE(reader), {ok, Sock} = rabbit_networking:handshake(Ref, - application:get_env(rabbit, proxy_protocol, false)), + application:get_env(rabbit, proxy_protocol, false), + dynamic_buffer), Deb = sys:debug_options([]), start_connection(Parent, HelperSups, Ref, Deb, Sock). @@ -512,8 +517,9 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock, end, case Recv of {data, Data} -> + State1 = maybe_resize_buffer(State, Data), recvloop(Deb, [Data | Buf], BufLen + size(Data), - State#v1{pending_recv = false}); + State1#v1{pending_recv = false}); closed when State#v1.connection_state =:= closed -> State; closed when CS =:= pre_init andalso Buf =:= [] -> @@ -536,6 +542,37 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock, end end. +maybe_resize_buffer(State=#v1{sock=Sock, dynamic_buffer_size=BufferSize0, + dynamic_buffer_moving_average=MovingAvg0}, Data) -> + LowDynamicBuffer = 128, + HighDynamicBuffer = 131072, + DataLen = byte_size(Data), + MovingAvg = (MovingAvg0 * 7 + DataLen) / 8, + if + BufferSize0 < HighDynamicBuffer andalso MovingAvg > BufferSize0 * 0.9 -> + BufferSize = min(BufferSize0 * 2, HighDynamicBuffer), + case rabbit_net:setopts(Sock, [{buffer, BufferSize}]) of + ok -> State#v1{ + dynamic_buffer_size=BufferSize, + dynamic_buffer_moving_average=MovingAvg + }; + Error -> + stop(Error, State) + end; + BufferSize0 > LowDynamicBuffer andalso MovingAvg < BufferSize0 * 0.4 -> + BufferSize = max(BufferSize0 div 2, LowDynamicBuffer), + case rabbit_net:setopts(Sock, [{buffer, BufferSize}]) of + ok -> State#v1{ + dynamic_buffer_size=BufferSize, + dynamic_buffer_moving_average=MovingAvg + }; + Error -> + stop(Error, State) + end; + true -> + State#v1{dynamic_buffer_moving_average=MovingAvg} + end. + -spec stop(_, #v1{}) -> no_return(). stop(tcp_healthcheck, State) -> %% The connection was closed before any packet was received. It's