Skip to content

Commit b49dcf2

Browse files
kjnilssonmergify[bot]
authored andcommitted
Fix at-least-once dead lettering when the target include the source.
If the target for at least once dead lettering included the source queue the dead letter outbound queue in the quorum queue would never be cleared. This changes the queue -> dead letter worker message format to better distinguish between those and queue events for "normal" queue type interactions. (cherry picked from commit 5d563d0) (cherry picked from commit 3f6de3e)
1 parent 86fe884 commit b49dcf2

File tree

4 files changed

+13
-10
lines changed

4 files changed

+13
-10
lines changed

deps/rabbit/src/rabbit_fifo_dlx.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ delivery_effects(CPid, Msgs0) ->
236236
Msgs = lists:zipwith(fun (Cmd, {Reason, MsgId}) ->
237237
{MsgId, {Reason, rabbit_fifo:get_msg(Cmd)}}
238238
end, Log, RsnIds),
239-
[{send_msg, CPid, {dlx_delivery, Msgs}, [ra_event]}]
239+
[{send_msg, CPid, {dlx_event, self(), {dlx_delivery, Msgs}}, [cast]}]
240240
end}].
241241

242242
-spec state_enter(ra_server:ra_state() | eol, rabbit_types:r('queue'), dead_letter_handler(), state()) ->
@@ -308,7 +308,7 @@ update_config(at_least_once, at_least_once, _, State) ->
308308
{State, []};
309309
Pid ->
310310
%% Notify rabbit_fifo_dlx_worker about potentially updated policies.
311-
{State, [{send_msg, Pid, lookup_topology, ra_event}]}
311+
{State, [{send_msg, Pid, {dlx_event, self(), lookup_topology}, cast}]}
312312
end;
313313
update_config(SameDLH, SameDLH, _, State) ->
314314
{State, []};

deps/rabbit/src/rabbit_fifo_dlx_client.erl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,10 @@ process_command(Cmd, #state{leader = Leader} = State, Tries) ->
5757
process_command(Cmd, State, Tries - 1)
5858
end.
5959

60-
-spec handle_ra_event(ra:server_id(), term(), state()) ->
60+
-spec handle_ra_event(pid(), term(), state()) ->
6161
{ok, state(), actions()}.
62-
handle_ra_event(Leader, {machine, {dlx_delivery, _} = Del}, #state{leader = Leader} = State) ->
62+
handle_ra_event(Leader, {dlx_delivery, _} = Del,
63+
#state{leader = _Leader} = State) when node(Leader) == node() ->
6364
handle_delivery(Del, State);
6465
handle_ra_event(From, Evt, State) ->
6566
rabbit_log:debug("Ignoring ra event ~p from ~p", [Evt, From]),

deps/rabbit/src/rabbit_fifo_dlx_worker.erl

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -135,20 +135,21 @@ handle_call(Request, From, State) ->
135135
rabbit_log:info("~s received unhandled call from ~p: ~p", [?MODULE, From, Request]),
136136
{noreply, State}.
137137

138-
handle_cast({queue_event, QRef, {_From, {machine, lookup_topology}}},
139-
#state{queue_ref = QRef} = State0) ->
138+
handle_cast({dlx_event, _LeaderPid, {machine, lookup_topology}},
139+
#state{queue_ref = _} = State0) ->
140140
State = lookup_topology(State0),
141141
redeliver_and_ack(State);
142-
handle_cast({queue_event, QRef, {From, Evt}},
143-
#state{queue_ref = QRef,
142+
handle_cast({dlx_event, LeaderPid, Evt},
143+
#state{queue_ref = _QRef,
144144
dlx_client_state = DlxState0} = State0) ->
145145
%% received dead-letter message from source queue
146-
{ok, DlxState, Actions} = rabbit_fifo_dlx_client:handle_ra_event(From, Evt, DlxState0),
146+
{ok, DlxState, Actions} = rabbit_fifo_dlx_client:handle_ra_event(LeaderPid, Evt, DlxState0),
147147
State1 = State0#state{dlx_client_state = DlxState},
148148
State = handle_queue_actions(Actions, State1),
149149
{noreply, State};
150150
handle_cast({queue_event, QRef, Evt},
151151
#state{queue_type_state = QTypeState0} = State0) ->
152+
152153
case rabbit_queue_type:handle_event(QRef, Evt, QTypeState0) of
153154
{ok, QTypeState1, Actions} ->
154155
%% received e.g. confirm from target queue

deps/rabbit/test/dead_lettering_SUITE.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
-module(dead_lettering_SUITE).
1010

1111
-include_lib("common_test/include/ct.hrl").
12-
-include_lib("kernel/include/file.hrl").
1312
-include_lib("amqp_client/include/amqp_client.hrl").
1413
-include_lib("eunit/include/eunit.hrl").
1514
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
@@ -1040,6 +1039,7 @@ dead_letter_headers_cycle(Config) ->
10401039
publish(Ch, QName, [P]),
10411040
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
10421041
[DTag] = consume(Ch, QName, [P]),
1042+
wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
10431043
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag,
10441044
multiple = false,
10451045
requeue = false}),
@@ -1050,6 +1050,7 @@ dead_letter_headers_cycle(Config) ->
10501050
{array, [{table, Death1}]} = rabbit_misc:table_lookup(Headers1, <<"x-death">>),
10511051
?assertEqual({long, 1}, rabbit_misc:table_lookup(Death1, <<"count">>)),
10521052

1053+
wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
10531054
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1,
10541055
multiple = false,
10551056
requeue = false}),

0 commit comments

Comments
 (0)