Skip to content

Commit 56bb786

Browse files
committed
QQ: track discarded bytes and take snapshots based on that.
1 parent 38e50fc commit 56bb786

10 files changed

+803
-208
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 534 additions & 159 deletions
Large diffs are not rendered by default.

deps/rabbit/src/rabbit_fifo.hrl

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,31 @@
180180
unused_3 = ?NIL
181181
}).
182182

183+
-record(messages,
184+
{
185+
messages = rabbit_fifo_q:new() :: rabbit_fifo_q:state(),
186+
messages_total = 0 :: non_neg_integer(),
187+
% queue of returned msg_in_ids - when checking out it picks from
188+
returns = lqueue:new() :: lqueue:lqueue(term())
189+
}).
190+
191+
-record(dlx_consumer,
192+
{pid :: pid(),
193+
prefetch :: non_neg_integer(),
194+
checked_out = #{} :: #{msg_id() =>
195+
optimised_tuple(rabbit_dead_letter:reason(), msg())},
196+
next_msg_id = 0 :: msg_id()}).
197+
198+
-record(rabbit_fifo_dlx,
199+
{consumer :: option(#dlx_consumer{}),
200+
%% Queue of dead-lettered messages.
201+
discards = lqueue:new() :: lqueue:lqueue(optimised_tuple(rabbit_dead_letter:reason(), msg())),
202+
%% Raft indexes of messages in both discards queue and dlx_consumer's checked_out map
203+
%% so that we get the smallest ra index in O(1).
204+
ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(),
205+
msg_bytes = 0 :: non_neg_integer(),
206+
msg_bytes_checkout = 0 :: non_neg_integer()}).
207+
183208
-record(rabbit_fifo,
184209
{cfg :: #cfg{},
185210
% unassigned messages
@@ -207,7 +232,7 @@
207232
% consumers that require further service are queued here
208233
service_queue = priority_queue:new() :: priority_queue:q(),
209234
%% state for at-least-once dead-lettering
210-
dlx = rabbit_fifo_dlx:init() :: rabbit_fifo_dlx:state(),
235+
dlx = #rabbit_fifo_dlx{} :: #rabbit_fifo_dlx{},
211236
msg_bytes_enqueue = 0 :: non_neg_integer(),
212237
msg_bytes_checkout = 0 :: non_neg_integer(),
213238
%% one is picked if active consumer is cancelled or dies

deps/rabbit/src/rabbit_fifo_client.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,13 +143,13 @@ enqueue(QName, Correlation, Msg,
143143
{reject_publish, State0};
144144
{error, {shutdown, delete}} ->
145145
?LOG_DEBUG("~ts: QQ ~ts tried to register enqueuer during delete shutdown",
146-
[?MODULE, rabbit_misc:rs(QName)]),
146+
[?MODULE, rabbit_misc:rs(QName)]),
147147
{reject_publish, State0};
148148
{timeout, _} ->
149149
{reject_publish, State0};
150150
Err ->
151151
?LOG_DEBUG("~ts: QQ ~ts error when registering enqueuer ~p",
152-
[?MODULE, rabbit_misc:rs(QName), Err]),
152+
[?MODULE, rabbit_misc:rs(QName), Err]),
153153
exit(Err)
154154
end;
155155
enqueue(_QName, _Correlation, _Msg,

deps/rabbit/src/rabbit_fifo_dlx.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
-module(rabbit_fifo_dlx).
88

99
-include("rabbit_fifo_dlx.hrl").
10-
-include("rabbit_fifo.hrl").
10+
-include("rabbit_fifo_v7.hrl").
1111
-include_lib("kernel/include/logger.hrl").
1212
-compile({no_auto_import, [apply/3]}).
1313

deps/rabbit/src/rabbit_fifo_dlx.hrl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
-record(dlx_consumer,
88
{pid :: pid(),
99
prefetch :: non_neg_integer(),
10-
checked_out = #{} :: #{msg_id() => optimised_tuple(rabbit_dead_letter:reason(), msg())},
10+
checked_out = #{} :: #{msg_id() =>
11+
optimised_tuple(rabbit_dead_letter:reason(), msg())},
1112
next_msg_id = 0 :: msg_id()}).
1213

1314
-record(rabbit_fifo_dlx,

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -373,13 +373,11 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
373373
PolicyConfig = gather_policy_config(Q, true),
374374
QName = amqqueue:get_name(Q),
375375
{Name, _} = amqqueue:get_pid(Q),
376-
PolicyConfig#{
377-
name => Name,
378-
queue_resource => QName,
379-
become_leader_handler => {?MODULE, become_leader, [QName]},
380-
single_active_consumer_on => single_active_consumer_on(Q),
381-
created => erlang:system_time(millisecond)
382-
}.
376+
PolicyConfig#{name => Name,
377+
queue_resource => QName,
378+
single_active_consumer_on => single_active_consumer_on(Q),
379+
created => erlang:system_time(millisecond)
380+
}.
383381

384382
resolve_delivery_limit(PolVal, ArgVal)
385383
when PolVal < 0 orelse ArgVal < 0 ->
@@ -678,13 +676,13 @@ handle_tick(QName,
678676
catch
679677
_:Err ->
680678
?LOG_DEBUG("~ts: handle tick failed with ~p",
681-
[rabbit_misc:rs(QName), Err]),
679+
[rabbit_misc:rs(QName), Err]),
682680
ok
683681
end
684682
end);
685683
handle_tick(QName, Config, _Nodes) ->
686684
?LOG_DEBUG("~ts: handle tick received unexpected config format ~tp",
687-
[rabbit_misc:rs(QName), Config]).
685+
[rabbit_misc:rs(QName), Config]).
688686

689687
repair_leader_record(Q, Name) ->
690688
Node = node(),
@@ -695,7 +693,7 @@ repair_leader_record(Q, Name) ->
695693
_ ->
696694
QName = amqqueue:get_name(Q),
697695
?LOG_DEBUG("~ts: updating leader record to current node ~ts",
698-
[rabbit_misc:rs(QName), Node]),
696+
[rabbit_misc:rs(QName), Node]),
699697
ok = become_leader0(QName, Name),
700698
ok
701699
end,

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3607,16 +3607,18 @@ message_bytes_metrics(Config) ->
36073607
wait_for_messages_pending_ack(Servers, RaName, 0),
36083608
rabbit_ct_helpers:await_condition(
36093609
fun() ->
3610-
{3, 3, 0} == get_message_bytes(Leader, QRes)
3610+
{M, M, 0} = get_message_bytes(Leader, QRes),
3611+
M > 0
36113612
end, 30000),
36123613

3614+
{MsgSize, _, _} = get_message_bytes(Leader, QRes),
36133615
subscribe(Ch, QQ, false),
36143616

36153617
wait_for_messages_ready(Servers, RaName, 0),
36163618
wait_for_messages_pending_ack(Servers, RaName, 1),
36173619
rabbit_ct_helpers:await_condition(
36183620
fun() ->
3619-
{3, 0, 3} == get_message_bytes(Leader, QRes)
3621+
{MsgSize, 0, MsgSize} == get_message_bytes(Leader, QRes)
36203622
end, 30000),
36213623

36223624
receive
@@ -3641,7 +3643,7 @@ message_bytes_metrics(Config) ->
36413643
wait_for_messages_pending_ack(Servers, RaName, 1),
36423644
rabbit_ct_helpers:await_condition(
36433645
fun() ->
3644-
{3, 0, 3} == get_message_bytes(Leader, QRes)
3646+
{MsgSize, 0, MsgSize} == get_message_bytes(Leader, QRes)
36453647
end, 30000),
36463648

36473649
rabbit_ct_client_helpers:close_channel(Ch),
@@ -3650,7 +3652,7 @@ message_bytes_metrics(Config) ->
36503652
wait_for_messages_pending_ack(Servers, RaName, 0),
36513653
rabbit_ct_helpers:await_condition(
36523654
fun() ->
3653-
{3, 3, 0} == get_message_bytes(Leader, QRes)
3655+
{MsgSize, MsgSize, 0} == get_message_bytes(Leader, QRes)
36543656
end, 30000),
36553657
ok.
36563658

0 commit comments

Comments
 (0)