Skip to content

Commit fcaf9a8

Browse files
Merge branch 'nyczol-configurable_gc_threshold'
(cherry picked from commit e86b5a6)
1 parent dd6e8ba commit fcaf9a8

File tree

3 files changed

+24
-10
lines changed

3 files changed

+24
-10
lines changed

Makefile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,9 @@ define PROJECT_ENV
129129
%% interval at which the channel can perform periodic actions
130130
{channel_tick_interval, 60000},
131131
%% Default max message size is 128 MB
132-
{max_message_size, 134217728}
132+
{max_message_size, 134217728},
133+
%% Socket writer will run GC every 1 GB of outgoing data
134+
{gc_threshold, 1000000000}
133135
]
134136
endef
135137

src/rabbit_basic.erl

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222
message/3, message/4, properties/1, prepend_table_header/3,
2323
extract_headers/1, extract_timestamp/1, map_headers/2, delivery/4,
2424
header_routes/1, parse_expiration/1, header/2, header/3]).
25-
-export([build_content/2, from_content/1, msg_size/1, maybe_gc_large_msg/1]).
25+
-export([build_content/2, from_content/1, msg_size/1,
26+
maybe_gc_large_msg/1, maybe_gc_large_msg/2]).
2627
-export([add_header/4]).
2728

2829
%%----------------------------------------------------------------------------
@@ -311,6 +312,11 @@ parse_expiration(#'P_basic'{expiration = Expiration}) ->
311312
maybe_gc_large_msg(Content) ->
312313
rabbit_writer:maybe_gc_large_msg(Content).
313314

315+
maybe_gc_large_msg(Content, undefined) ->
316+
rabbit_writer:msg_size(Content);
317+
maybe_gc_large_msg(Content, GCThreshold) ->
318+
rabbit_writer:maybe_gc_large_msg(Content, GCThreshold).
319+
314320
msg_size(Content) ->
315321
rabbit_writer:msg_size(Content).
316322

src/rabbit_channel.erl

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,9 @@
169169
delivery_flow,
170170
interceptor_state,
171171
queue_states,
172-
tick_timer
172+
tick_timer,
173+
%% defines how ofter gc will be executed
174+
gc_threshold
173175
}).
174176

175177
-define(QUEUE, lqueue).
@@ -508,6 +510,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
508510
MaxMessageSize = get_max_message_size(),
509511
ConsumerTimeout = get_consumer_timeout(),
510512
OptionalVariables = extract_variable_map_from_amqp_params(AmqpParams),
513+
{ok, GCThreshold} = application:get_env(rabbit, gc_threshold),
511514
State = #ch{cfg = #conf{state = starting,
512515
protocol = Protocol,
513516
channel = Channel,
@@ -543,7 +546,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
543546
reply_consumer = none,
544547
delivery_flow = Flow,
545548
interceptor_state = undefined,
546-
queue_states = #{}
549+
queue_states = #{},
550+
gc_threshold = GCThreshold
547551
},
548552
State1 = State#ch{
549553
interceptor_state = rabbit_channel_interceptor:init(State)},
@@ -1110,8 +1114,8 @@ extract_variable_map_from_amqp_params([Value]) ->
11101114
extract_variable_map_from_amqp_params(_) ->
11111115
#{}.
11121116

1113-
check_msg_size(Content, MaxMessageSize) ->
1114-
Size = rabbit_basic:maybe_gc_large_msg(Content),
1117+
check_msg_size(Content, MaxMessageSize, GCThreshold) ->
1118+
Size = rabbit_basic:maybe_gc_large_msg(Content, GCThreshold),
11151119
case Size of
11161120
S when S > MaxMessageSize ->
11171121
ErrorMessage = case MaxMessageSize of
@@ -1309,9 +1313,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
13091313
},
13101314
tx = Tx,
13111315
confirm_enabled = ConfirmEnabled,
1312-
delivery_flow = Flow
1316+
delivery_flow = Flow,
1317+
gc_threshold = GCThreshold
13131318
}) ->
1314-
check_msg_size(Content, MaxMessageSize),
1319+
check_msg_size(Content, MaxMessageSize, GCThreshold),
13151320
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
13161321
check_write_permitted(ExchangeName, User, AuthzContext),
13171322
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
@@ -2689,7 +2694,8 @@ handle_deliver(ConsumerTag, AckRequired,
26892694
routing_keys = [RoutingKey | _CcRoutes],
26902695
content = Content}},
26912696
State = #ch{cfg = #conf{writer_pid = WriterPid},
2692-
next_tag = DeliveryTag}) ->
2697+
next_tag = DeliveryTag,
2698+
gc_threshold = GCThreshold}) ->
26932699
Deliver = #'basic.deliver'{consumer_tag = ConsumerTag,
26942700
delivery_tag = DeliveryTag,
26952701
redelivered = Redelivered,
@@ -2702,7 +2708,7 @@ handle_deliver(ConsumerTag, AckRequired,
27022708
false ->
27032709
ok = rabbit_writer:send_command(WriterPid, Deliver, Content)
27042710
end,
2705-
rabbit_basic:maybe_gc_large_msg(Content),
2711+
rabbit_basic:maybe_gc_large_msg(Content, GCThreshold),
27062712
record_sent(deliver, ConsumerTag, AckRequired, Msg, State).
27072713

27082714
handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount,

0 commit comments

Comments
 (0)