Skip to content

Commit 3c410ba

Browse files
Merge branch 'nyczol-configurable_gc_threshold'
(cherry picked from commit e86b5a6) Conflicts: Makefile src/rabbit_basic.erl src/rabbit_channel.erl
1 parent b9ebc67 commit 3c410ba

File tree

3 files changed

+21
-8
lines changed

3 files changed

+21
-8
lines changed

Makefile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,9 @@ define PROJECT_ENV
123123
%% vhost had to shut down, see server#1158 and server#1280
124124
{vhost_restart_strategy, continue},
125125
%% {global, prefetch count}
126-
{default_consumer_prefetch, {false, 0}}
126+
{default_consumer_prefetch, {false, 0}},
127+
%% Socket writer will run GC every 1 GB of outgoing data
128+
{gc_threshold, 1000000000}
127129
]
128130
endef
129131

src/rabbit_basic.erl

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222
message/3, message/4, properties/1, prepend_table_header/3,
2323
extract_headers/1, extract_timestamp/1, map_headers/2, delivery/4,
2424
header_routes/1, parse_expiration/1, header/2, header/3]).
25-
-export([build_content/2, from_content/1, msg_size/1, maybe_gc_large_msg/1]).
25+
-export([build_content/2, from_content/1, msg_size/1,
26+
maybe_gc_large_msg/1, maybe_gc_large_msg/2]).
2627

2728
%%----------------------------------------------------------------------------
2829

@@ -298,5 +299,10 @@ parse_expiration(#'P_basic'{expiration = Expiration}) ->
298299
maybe_gc_large_msg(Content) ->
299300
rabbit_writer:maybe_gc_large_msg(Content).
300301

302+
maybe_gc_large_msg(Content, undefined) ->
303+
rabbit_writer:msg_size(Content);
304+
maybe_gc_large_msg(Content, GCThreshold) ->
305+
rabbit_writer:maybe_gc_large_msg(Content, GCThreshold).
306+
301307
msg_size(Content) ->
302308
rabbit_writer:msg_size(Content).

src/rabbit_channel.erl

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,9 @@
161161
%% flow | noflow, see rabbitmq-server#114
162162
delivery_flow,
163163
interceptor_state,
164-
authz_context
164+
authz_context,
165+
%% defines how ofter gc will be executed
166+
gc_threshold
165167
}).
166168

167169

@@ -456,6 +458,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
456458
Limiter0
457459
end,
458460
OptionalVariables = extract_topic_variable_map_from_amqp_params(AmqpParams),
461+
{ok, GCThreshold} = application:get_env(rabbit, gc_threshold),
459462
State = #ch{state = starting,
460463
protocol = Protocol,
461464
channel = Channel,
@@ -488,7 +491,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
488491
reply_consumer = none,
489492
delivery_flow = Flow,
490493
interceptor_state = undefined,
491-
authz_context = OptionalVariables},
494+
authz_context = OptionalVariables,
495+
gc_threshold = GCThreshold},
492496
State1 = State#ch{
493497
interceptor_state = rabbit_channel_interceptor:init(State)},
494498
State2 = rabbit_event:init_stats_timer(State1, #ch.stats_timer),
@@ -931,8 +935,8 @@ extract_topic_variable_map_from_amqp_params([Value]) ->
931935
extract_topic_variable_map_from_amqp_params(_) ->
932936
#{}.
933937

934-
check_msg_size(Content) ->
935-
Size = rabbit_basic:maybe_gc_large_msg(Content),
938+
check_msg_size(Content, GCThreshold) ->
939+
Size = rabbit_basic:maybe_gc_large_msg(Content, GCThreshold),
936940
case Size > ?MAX_MSG_SIZE of
937941
true -> precondition_failed("message size ~B larger than max size ~B",
938942
[Size, ?MAX_MSG_SIZE]);
@@ -1118,8 +1122,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
11181122
user = #user{username = Username} = User,
11191123
conn_name = ConnName,
11201124
delivery_flow = Flow,
1121-
authz_context = AuthzContext}) ->
1122-
check_msg_size(Content),
1125+
authz_context = AuthzContext,
1126+
gc_threshold = GCThreshold}) ->
1127+
check_msg_size(Content, GCThreshold),
11231128
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
11241129
check_write_permitted(ExchangeName, User),
11251130
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),

0 commit comments

Comments
 (0)