Skip to content

Commit 6701f51

Browse files
committed
QQ: use new log_ext effect
1 parent cbed63f commit 6701f51

File tree

4 files changed

+89
-30
lines changed

4 files changed

+89
-30
lines changed

deps/rabbit/src/rabbit_channel.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -564,8 +564,10 @@ prioritise_cast(Msg, _Len, _State) ->
564564
case Msg of
565565
{confirm, _MsgSeqNos, _QPid} -> 5;
566566
{reject_publish, _MsgSeqNos, _QPid} -> 5;
567-
{queue_event, _, {confirm, _MsgSeqNos, _QPid}} -> 5;
567+
% {queue_event, _, {confirm, _MsgSeqNos, _QPid}} -> 5;
568568
{queue_event, _, {reject_publish, _MsgSeqNos, _QPid}} -> 5;
569+
{method, #'basic.ack'{}, _Content, _Flow} -> 5;
570+
% {queue_event, _, {delivery, _, _}} -> 0;
569571
_ -> 0
570572
end.
571573

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,10 @@
8989
make_purge/0,
9090
make_purge_nodes/1,
9191
make_update_config/1,
92-
make_garbage_collection/0
92+
make_garbage_collection/0,
93+
94+
exec_read/3
95+
9396
]).
9497

9598
-ifdef(TEST).
@@ -2077,29 +2080,26 @@ delivery_effect(ConsumerKey, [{MsgId, ?MSG(Idx, Header)}],
20772080
{send_msg, CPid, {delivery, CTag, [{MsgId, {Header, RawMsg}}]},
20782081
?DELIVERY_SEND_MSG_OPTS};
20792082
delivery_effect(ConsumerKey, Msgs,
2080-
#?STATE{cfg = #cfg{resource = QR}} = State) ->
2083+
#?STATE{cfg = #cfg{resource = _QR}} = State) ->
20812084
{CTag, CPid} = consumer_id(ConsumerKey, State),
2082-
{RaftIdxs, Num} = lists:foldr(fun ({_, ?MSG(I, _)}, {Acc, N}) ->
2083-
{[I | Acc], N+1}
2084-
end, {[], 0}, Msgs),
2085-
{log, RaftIdxs,
2086-
fun (Commands)
2087-
when length(Commands) < Num ->
2088-
%% the mandatory length/1 guard is a bit :(
2089-
rabbit_log:info("~ts: requested read consumer tag '~ts' of ~b "
2090-
"indexes ~w but only ~b were returned. "
2091-
"This is most likely a stale read request "
2092-
"and can be ignored",
2093-
[rabbit_misc:rs(QR), CTag, Num, RaftIdxs,
2094-
length(Commands)]),
2095-
[];
2096-
(Commands) ->
2097-
DelMsgs = lists:zipwith(
2098-
fun (Cmd, {MsgId, ?MSG(_Idx, Header)}) ->
2099-
{MsgId, {Header, get_msg(Cmd)}}
2100-
end, Commands, Msgs),
2101-
[{send_msg, CPid, {delivery, CTag, DelMsgs},
2102-
?DELIVERY_SEND_MSG_OPTS}]
2085+
{RaftIdxs, _Num} = lists:foldr(fun ({_, ?MSG(I, _)}, {Acc, N}) ->
2086+
{[I | Acc], N+1}
2087+
end, {[], 0}, Msgs),
2088+
{log_ext, RaftIdxs,
2089+
fun (ReadPlan) ->
2090+
case node(CPid) == node() of
2091+
true ->
2092+
[{send_msg, CPid, {delivery, CTag, ReadPlan, Msgs},
2093+
?DELIVERY_SEND_MSG_OPTS}];
2094+
false ->
2095+
%% if we got there we need to read the data on this node
2096+
%% and send it to the consumer pid as it isn't availble
2097+
%% locally
2098+
{DelMsgs, Flru} = exec_read(undefined, ReadPlan, Msgs),
2099+
ra_flru:evict_all(Flru),
2100+
[{send_msg, CPid, {delivery, CTag, DelMsgs},
2101+
?DELIVERY_SEND_MSG_OPTS}]
2102+
end
21032103
end,
21042104
{local, node(CPid)}}.
21052105

@@ -3014,3 +3014,11 @@ incr_msg(Msg0, DelFailed, Anns) ->
30143014
false ->
30153015
Msg2
30163016
end.
3017+
3018+
exec_read(Flru0, ReadPlan, Msgs) ->
3019+
{Entries, Flru} = ra_log_read_plan:execute(ReadPlan, Flru0),
3020+
%% return a list in original order
3021+
{lists:map(fun ({MsgId, ?MSG(Idx, Header)}) ->
3022+
Cmd = maps:get(Idx, Entries),
3023+
{MsgId, {Header, get_msg(Cmd)}}
3024+
end, Msgs), Flru}.

deps/rabbit/src/rabbit_fifo_client.erl

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@
6969
pending = #{} :: #{seq() =>
7070
{term(), rabbit_fifo:command()}},
7171
consumers = #{} :: #{rabbit_types:ctag() => #consumer{}},
72-
timer_state :: term()
72+
timer_state :: term(),
73+
cached_segments :: undefined | ra_flru:state()
7374
}).
7475

7576
-opaque state() :: #state{}.
@@ -167,7 +168,7 @@ enqueue(QName, Correlation, Msg,
167168
%% @param QueueName Name of the queue.
168169
%% @param Msg an arbitrary erlang term representing the message.
169170
%% @param State the current {@module} state.
170-
%% @returns
171+
%% @return's
171172
%% `{ok, State, Actions}' if the command was successfully sent.
172173
%% {@module} assigns a sequence number to every raft command it issues. The
173174
%% SequenceNumber can be correlated to the applied sequence numbers returned
@@ -633,7 +634,8 @@ handle_ra_event(QName, From, {applied, Seqs},
633634
_ ->
634635
{ok, State1, Actions}
635636
end;
636-
handle_ra_event(QName, From, {machine, {delivery, _ConsumerTag, _} = Del}, State0) ->
637+
handle_ra_event(QName, From, {machine, Del}, State0)
638+
when element(1, Del) == delivery ->
637639
handle_delivery(QName, From, Del, State0);
638640
handle_ra_event(_QName, _From, {machine, Action}, State)
639641
when element(1, Action) =:= credit_reply orelse
@@ -835,7 +837,12 @@ handle_delivery(_QName, _Leader, {delivery, Tag, [_ | _] = IdMsgs},
835837
%% we should return all messages.
836838
MsgIntIds = [Id || {Id, _} <- IdMsgs],
837839
{State1, Deliveries} = return(Tag, MsgIntIds, State0),
838-
{ok, State1, Deliveries}.
840+
{ok, State1, Deliveries};
841+
handle_delivery(QName, Leader, {delivery, Tag, ReadState, Msgs},
842+
#state{cached_segments = Cached0} = State) ->
843+
{MsgIds, Cached} = rabbit_fifo:exec_read(Cached0, ReadState, Msgs),
844+
handle_delivery(QName, Leader, {delivery, Tag, MsgIds},
845+
State#state{cached_segments = Cached}).
839846

840847
transform_msgs(QName, QRef, Msgs) ->
841848
lists:map(

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,10 @@ groups() ->
107107
quorum_cluster_size_7,
108108
node_removal_is_not_quorum_critical,
109109
select_nodes_with_least_replicas,
110-
select_nodes_with_least_replicas_node_down
110+
select_nodes_with_least_replicas_node_down,
111+
subscribe_from_each
112+
113+
111114
]},
112115
{clustered_with_partitions, [],
113116
[
@@ -187,7 +190,8 @@ all_tests() ->
187190
priority_queue_fifo,
188191
priority_queue_2_1_ratio,
189192
requeue_multiple_true,
190-
requeue_multiple_false
193+
requeue_multiple_false,
194+
subscribe_from_each
191195
].
192196

193197
memory_tests() ->
@@ -1462,6 +1466,43 @@ policy_repair(Config) ->
14621466
consume_all(Ch, QQ)
14631467
end.
14641468

1469+
subscribe_from_each(Config) ->
1470+
1471+
[Server0 | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
1472+
1473+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
1474+
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
1475+
QQ = ?config(queue_name, Config),
1476+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
1477+
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
1478+
[begin
1479+
publish_confirm(Ch, QQ)
1480+
end || _ <- Servers],
1481+
timer:sleep(1000),
1482+
[begin
1483+
ok = rpc:call(S, ra_log_wal, force_roll_over, [ra_log_wal])
1484+
end || S <- Servers],
1485+
1486+
[begin
1487+
ct:pal("NODE ~p", [S]),
1488+
C = rabbit_ct_client_helpers:open_channel(Config, S),
1489+
qos(C, 1, false),
1490+
subscribe(C, QQ, false),
1491+
receive
1492+
{#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
1493+
amqp_channel:call(C, #'basic.ack'{delivery_tag = DeliveryTag})
1494+
after 5000 ->
1495+
ct:pal("TIMEOUT"),
1496+
flush(1),
1497+
ct:fail("basic.deliver timeout")
1498+
end,
1499+
timer:sleep(256),
1500+
rabbit_ct_client_helpers:close_channel(C),
1501+
flush(1)
1502+
1503+
end || S <- Servers],
1504+
1505+
ok.
14651506

14661507
gh_12635(Config) ->
14671508
% https://github.com/rabbitmq/rabbitmq-server/issues/12635
@@ -3633,6 +3674,7 @@ receive_and_ack(Ch) ->
36333674
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
36343675
multiple = false})
36353676
after 5000 ->
3677+
flush(1),
36363678
ct:fail("receive_and_ack timed out", [])
36373679
end.
36383680

0 commit comments

Comments
 (0)