Skip to content

Commit 8a3db77

Browse files
committed
QQ: use new log_ext effect
1 parent cbed63f commit 8a3db77

File tree

4 files changed

+69
-27
lines changed

4 files changed

+69
-27
lines changed

deps/rabbit/src/rabbit_channel.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -564,8 +564,10 @@ prioritise_cast(Msg, _Len, _State) ->
564564
case Msg of
565565
{confirm, _MsgSeqNos, _QPid} -> 5;
566566
{reject_publish, _MsgSeqNos, _QPid} -> 5;
567-
{queue_event, _, {confirm, _MsgSeqNos, _QPid}} -> 5;
567+
% {queue_event, _, {confirm, _MsgSeqNos, _QPid}} -> 5;
568568
{queue_event, _, {reject_publish, _MsgSeqNos, _QPid}} -> 5;
569+
{method, #'basic.ack'{}, _Content, _Flow} -> 5;
570+
% {queue_event, _, {delivery, _, _}} -> 0;
569571
_ -> 0
570572
end.
571573

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 54 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,10 @@
8989
make_purge/0,
9090
make_purge_nodes/1,
9191
make_update_config/1,
92-
make_garbage_collection/0
92+
make_garbage_collection/0,
93+
94+
exec_read/3
95+
9396
]).
9497

9598
-ifdef(TEST).
@@ -2077,31 +2080,52 @@ delivery_effect(ConsumerKey, [{MsgId, ?MSG(Idx, Header)}],
20772080
{send_msg, CPid, {delivery, CTag, [{MsgId, {Header, RawMsg}}]},
20782081
?DELIVERY_SEND_MSG_OPTS};
20792082
delivery_effect(ConsumerKey, Msgs,
2080-
#?STATE{cfg = #cfg{resource = QR}} = State) ->
2083+
#?STATE{cfg = #cfg{resource = _QR}} = State) ->
20812084
{CTag, CPid} = consumer_id(ConsumerKey, State),
2082-
{RaftIdxs, Num} = lists:foldr(fun ({_, ?MSG(I, _)}, {Acc, N}) ->
2083-
{[I | Acc], N+1}
2084-
end, {[], 0}, Msgs),
2085-
{log, RaftIdxs,
2086-
fun (Commands)
2087-
when length(Commands) < Num ->
2088-
%% the mandatory length/1 guard is a bit :(
2089-
rabbit_log:info("~ts: requested read consumer tag '~ts' of ~b "
2090-
"indexes ~w but only ~b were returned. "
2091-
"This is most likely a stale read request "
2092-
"and can be ignored",
2093-
[rabbit_misc:rs(QR), CTag, Num, RaftIdxs,
2094-
length(Commands)]),
2095-
[];
2096-
(Commands) ->
2097-
DelMsgs = lists:zipwith(
2098-
fun (Cmd, {MsgId, ?MSG(_Idx, Header)}) ->
2099-
{MsgId, {Header, get_msg(Cmd)}}
2100-
end, Commands, Msgs),
2101-
[{send_msg, CPid, {delivery, CTag, DelMsgs},
2085+
{RaftIdxs, _Num} = lists:foldr(fun ({_, ?MSG(I, _)}, {Acc, N}) ->
2086+
{[I | Acc], N+1}
2087+
end, {[], 0}, Msgs),
2088+
rabbit_log:debug("DELIVERY EFFECT created for ~s ~p!", [CTag, RaftIdxs]),
2089+
{log_ext, RaftIdxs,
2090+
fun
2091+
% (Commands)
2092+
% when length(Commands) < Num ->
2093+
% %% the mandatory length/1 guard is a bit :(
2094+
% rabbit_log:info("~ts: requested read consumer tag '~ts' of ~b "
2095+
% "indexes ~w but only ~b were returned. "
2096+
% "This is most likely a stale read request "
2097+
% "and can be ignored",
2098+
% [rabbit_misc:rs(QR), CTag, Num, RaftIdxs,
2099+
% length(Commands)]),
2100+
% [];
2101+
(ReadPlan) ->
2102+
rabbit_log:debug("READPLAN created for ~s ~p!", [CTag, RaftIdxs]),
2103+
%% TODO: check if CPid is local or not
2104+
%% TODO: could consider introducing a leader local proxy process
2105+
[{send_msg, CPid, {delivery, CTag, ReadPlan, Msgs},
21022106
?DELIVERY_SEND_MSG_OPTS}]
21032107
end,
21042108
{local, node(CPid)}}.
2109+
% {log, RaftIdxs,
2110+
% fun (Commands)
2111+
% when length(Commands) < Num ->
2112+
% %% the mandatory length/1 guard is a bit :(
2113+
% rabbit_log:info("~ts: requested read consumer tag '~ts' of ~b "
2114+
% "indexes ~w but only ~b were returned. "
2115+
% "This is most likely a stale read request "
2116+
% "and can be ignored",
2117+
% [rabbit_misc:rs(QR), CTag, Num, RaftIdxs,
2118+
% length(Commands)]),
2119+
% [];
2120+
% (Commands) ->
2121+
% DelMsgs = lists:zipwith(
2122+
% fun (Cmd, {MsgId, ?MSG(_Idx, Header)}) ->
2123+
% {MsgId, {Header, get_msg(Cmd)}}
2124+
% end, Commands, Msgs),
2125+
% [{send_msg, CPid, {delivery, CTag, DelMsgs},
2126+
% ?DELIVERY_SEND_MSG_OPTS}]
2127+
% end,
2128+
% {local, node(CPid)}}.
21052129

21062130
reply_log_effect(RaftIdx, MsgId, Header, Ready, From) ->
21072131
{log, [RaftIdx],
@@ -3014,3 +3038,11 @@ incr_msg(Msg0, DelFailed, Anns) ->
30143038
false ->
30153039
Msg2
30163040
end.
3041+
3042+
exec_read(Flru0, ReadPlan, Msgs) ->
3043+
{Entries, Flru} = ra_log_read_plan:execute(ReadPlan, Flru0),
3044+
%% return a list in original order
3045+
{lists:map(fun ({MsgId, ?MSG(Idx, Header)}) ->
3046+
Cmd = maps:get(Idx, Entries),
3047+
{MsgId, {Header, get_msg(Cmd)}}
3048+
end, Msgs), Flru}.

deps/rabbit/src/rabbit_fifo_client.erl

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@
6969
pending = #{} :: #{seq() =>
7070
{term(), rabbit_fifo:command()}},
7171
consumers = #{} :: #{rabbit_types:ctag() => #consumer{}},
72-
timer_state :: term()
72+
timer_state :: term(),
73+
cached_segments :: undefined | ra_flru:state()
7374
}).
7475

7576
-opaque state() :: #state{}.
@@ -167,7 +168,7 @@ enqueue(QName, Correlation, Msg,
167168
%% @param QueueName Name of the queue.
168169
%% @param Msg an arbitrary erlang term representing the message.
169170
%% @param State the current {@module} state.
170-
%% @returns
171+
%% @return's
171172
%% `{ok, State, Actions}' if the command was successfully sent.
172173
%% {@module} assigns a sequence number to every raft command it issues. The
173174
%% SequenceNumber can be correlated to the applied sequence numbers returned
@@ -633,7 +634,8 @@ handle_ra_event(QName, From, {applied, Seqs},
633634
_ ->
634635
{ok, State1, Actions}
635636
end;
636-
handle_ra_event(QName, From, {machine, {delivery, _ConsumerTag, _} = Del}, State0) ->
637+
handle_ra_event(QName, From, {machine, Del}, State0)
638+
when element(1, Del) == delivery ->
637639
handle_delivery(QName, From, Del, State0);
638640
handle_ra_event(_QName, _From, {machine, Action}, State)
639641
when element(1, Action) =:= credit_reply orelse
@@ -835,7 +837,12 @@ handle_delivery(_QName, _Leader, {delivery, Tag, [_ | _] = IdMsgs},
835837
%% we should return all messages.
836838
MsgIntIds = [Id || {Id, _} <- IdMsgs],
837839
{State1, Deliveries} = return(Tag, MsgIntIds, State0),
838-
{ok, State1, Deliveries}.
840+
{ok, State1, Deliveries};
841+
handle_delivery(QName, Leader, {delivery, Tag, ReadState, Msgs},
842+
#state{cached_segments = Cached0} = State) ->
843+
{MsgIds, Cached} = rabbit_fifo:exec_read(Cached0, ReadState, Msgs),
844+
handle_delivery(QName, Leader, {delivery, Tag, MsgIds},
845+
State#state{cached_segments = Cached}).
839846

840847
transform_msgs(QName, QRef, Msgs) ->
841848
lists:map(

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3633,6 +3633,7 @@ receive_and_ack(Ch) ->
36333633
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
36343634
multiple = false})
36353635
after 5000 ->
3636+
flush(1),
36363637
ct:fail("receive_and_ack timed out", [])
36373638
end.
36383639

0 commit comments

Comments
 (0)