Skip to content

Commit 1b973d2

Browse files
committed
wip
1 parent ef9260e commit 1b973d2

File tree

3 files changed

+9
-13
lines changed

3 files changed

+9
-13
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2066,6 +2066,7 @@ delivery_effect(ConsumerKey, Msgs,
20662066
{RaftIdxs, _Num} = lists:foldr(fun ({_, ?MSG(I, _)}, {Acc, N}) ->
20672067
{[I | Acc], N+1}
20682068
end, {[], 0}, Msgs),
2069+
rabbit_log:debug("DELIVERY EFFECT created for ~s ~p!", [CTag, RaftIdxs]),
20692070
{log_ext, RaftIdxs,
20702071
fun
20712072
% (Commands)
@@ -2079,15 +2080,9 @@ delivery_effect(ConsumerKey, Msgs,
20792080
% length(Commands)]),
20802081
% [];
20812082
(ReadPlan) ->
2082-
% Fun = fun (Flru0) ->
2083-
% {Entries, Flru} = ra_log:execute_read(ReadPlan, Flru0),
2084-
% %% pretend entries is a map
2085-
% {lists:map(fun ({MsgId, ?MSG(Idx, Header)}) ->
2086-
% {_, _, Cmd} = maps:get(Idx, Entries),
2087-
% %% hacky
2088-
% {MsgId, {Header, get_msg(element(3, Cmd))}}
2089-
% end, Msgs), Flru}
2090-
% end,
2083+
rabbit_log:debug("READPLAN created for ~s ~p!", [CTag, RaftIdxs]),
2084+
%% TODO: check if CPid is local or not
2085+
%% TODO: could consider introducing a leader local proxy process
20912086
[{send_msg, CPid, {delivery, CTag, ReadPlan, Msgs},
20922087
?DELIVERY_SEND_MSG_OPTS}]
20932088
end,
@@ -3026,10 +3021,10 @@ incr_msg(Msg0, DelFailed, Anns) ->
30263021
end.
30273022

30283023
exec_read(Flru0, ReadPlan, Msgs) ->
3029-
{Entries, Flru} = ra_log:execute_read(ReadPlan, Flru0),
3024+
{Entries, Flru} = ra:execute_read_plan(ReadPlan, Flru0),
30303025
%% pretend entries is a map
30313026
{lists:map(fun ({MsgId, ?MSG(Idx, Header)}) ->
3032-
{_, _, Cmd} = maps:get(Idx, Entries),
3027+
{_, _, Cmd, _} = maps:get(Idx, Entries),
30333028
%% hacky
3034-
{MsgId, {Header, get_msg(element(3, Cmd))}}
3029+
{MsgId, {Header, get_msg(Cmd)}}
30353030
end, Msgs), Flru}.

deps/rabbit/src/rabbit_fifo_client.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ enqueue(QName, Correlation, Msg,
168168
%% @param QueueName Name of the queue.
169169
%% @param Msg an arbitrary erlang term representing the message.
170170
%% @param State the current {@module} state.
171-
%% @returns
171+
%% @return's
172172
%% `{ok, State, Actions}' if the command was successfully sent.
173173
%% {@module} assigns a sequence number to every raft command it issues. The
174174
%% SequenceNumber can be correlated to the applied sequence numbers returned

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3632,6 +3632,7 @@ receive_and_ack(Ch) ->
36323632
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
36333633
multiple = false})
36343634
after 5000 ->
3635+
flush(1),
36353636
ct:fail("receive_and_ack timed out", [])
36363637
end.
36373638

0 commit comments

Comments
 (0)