Skip to content

Commit def400e

Browse files
committed
Handle rejected queues on confirm.
When one queue confirms a message and another queue failed without confirming it, the message should be rejected. Fixes some races between confirms and rejects.
1 parent 35377fc commit def400e

File tree

3 files changed

+34
-29
lines changed

3 files changed

+34
-29
lines changed

src/rabbit_amqqueue_process.erl

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -603,13 +603,22 @@ confirm_messages(MsgIds, MTC) ->
603603
none ->
604604
{CMs, MTC0};
605605
{SenderPid, MsgSeqNo} ->
606-
{rabbit_misc:gb_trees_cons(SenderPid,
607-
MsgSeqNo, CMs),
606+
{maps:update_with(SenderPid,
607+
fun(MsgSeqNos) ->
608+
[MsgSeqNo | MsgSeqNos]
609+
end,
610+
[MsgSeqNo],
611+
CMs),
608612
maps:remove(MsgId, MTC0)}
609613

610614
end
611-
end, {gb_trees:empty(), MTC}, MsgIds),
612-
rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs),
615+
end, {#{}, MTC}, MsgIds),
616+
maps:fold(
617+
fun(Pid, MsgSeqNos, _) ->
618+
rabbit_misc:confirm_to_sender(Pid, MsgSeqNos)
619+
end,
620+
ok,
621+
CMs),
613622
MTC1.
614623

615624
send_or_record_confirm(#delivery{confirm = false}, State) ->

src/rabbit_channel.erl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2233,10 +2233,11 @@ confirm(MsgSeqNos, QRef, State = #ch{queue_names = QNames, unconfirmed = UC}) ->
22332233
%% does not exist in unconfirmed messages.
22342234
%% Neither does the 'ignore' atom, so it's a reasonable fallback.
22352235
QName = maps:get(QRef, QNames, ignore),
2236-
{MXs, UC1} =
2236+
{ConfirmMXs, RejectMXs, UC1} =
22372237
unconfirmed_messages:confirm_multiple_msg_ref(MsgSeqNos, QName, QRef, UC),
22382238
%% NB: don't call noreply/1 since we don't want to send confirms.
2239-
record_confirms(MXs, State#ch{unconfirmed = UC1}).
2239+
State1 = record_confirms(ConfirmMXs, State#ch{unconfirmed = UC1}),
2240+
record_rejects(RejectMXs, State1).
22402241

22412242
send_confirms_and_nacks(State = #ch{tx = none, confirmed = [], rejected = []}) ->
22422243
State;

src/unconfirmed_messages.erl

Lines changed: 18 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333

3434
-export([new/0,
3535
insert/5,
36-
confirm_msg_ref/4,
3736
confirm_multiple_msg_ref/4,
3837
forget_ref/2,
3938

@@ -112,27 +111,22 @@ insert(MsgId, QueueNames, QueueRefs, XName,
112111
error({message_already_exists, MsgId, QueueNames, QueueRefs, XName, UC})
113112
end.
114113

115-
%% Confirms a message on behalf of the given queue. If it was the last queue (ref)
116-
%% on the waiting list, returns 'confirmed' and performs the necessary cleanup.
117-
-spec confirm_msg_ref(msg_id(), queue_name(), queue_ref(), ?MODULE()) ->
118-
{{confirmed | rejected, {msg_id(), exchange_name()}} | not_confirmed, ?MODULE()}.
119-
confirm_msg_ref(MsgId, QueueName, QueueRef,
120-
#unconfirmed{reverse = Reverse} = UC) ->
121-
remove_msg_ref(confirm, MsgId, QueueName, QueueRef,
122-
UC#unconfirmed{reverse = remove_from_reverse(QueueRef, [MsgId], Reverse)}).
123-
114+
%% Confirms messages on behalf of the given queue. If it was the last queue (ref)
115+
%% on the waiting list, returns message id and excahnge name
116+
%% and performs the necessary cleanup.
124117
-spec confirm_multiple_msg_ref(msg_id(), queue_name(), queue_ref(), ?MODULE()) ->
125-
{{confirmed | rejected, {msg_id(), exchange_name()}} | not_confirmed, ?MODULE()}.
118+
{[{msg_id(), exchange_name()}], [{msg_id(), exchange_name()}], ?MODULE()}.
126119
confirm_multiple_msg_ref(MsgIds, QueueName, QueueRef,
127120
#unconfirmed{reverse = Reverse} = UC0) ->
128121
lists:foldl(
129-
fun(MsgId, {C, UC}) ->
122+
fun(MsgId, {C, R, UC}) ->
130123
case remove_msg_ref(confirm, MsgId, QueueName, QueueRef, UC) of
131-
{{confirmed, V}, UC1} -> {[V | C], UC1};
132-
{not_confirmed, UC1} -> {C, UC1}
124+
{{confirmed, V}, UC1} -> {[V | C], R, UC1};
125+
{{rejected, V}, UC1} -> {C, [V | R], UC1};
126+
{not_confirmed, UC1} -> {C, R, UC1}
133127
end
134128
end,
135-
{[], UC0#unconfirmed{reverse = remove_from_reverse(QueueRef, MsgIds, Reverse)}},
129+
{[], [], UC0#unconfirmed{reverse = remove_from_reverse(QueueRef, MsgIds, Reverse)}},
136130
MsgIds).
137131

138132
%% Removes all messages for a queue.
@@ -179,14 +173,15 @@ reject_msg(MsgId, #unconfirmed{ordered = Ordered, index = Index, reverse = Rever
179173
{Rejected :: [{msg_id(), exchange_name()}], ?MODULE()}.
180174
reject_all_for_queue(QueueRef, #unconfirmed{reverse = Reverse0} = UC0) ->
181175
MsgIds = maps:keys(maps:get(QueueRef, Reverse0, #{})),
182-
lists:foldl(fun(MsgId, {R, UC}) ->
183-
case reject_msg(MsgId, UC) of
184-
{not_confirmed, UC1} -> {R, UC1};
185-
{{rejected, V}, UC1} -> {[V | R], UC1}
186-
end
187-
end,
188-
{[], UC0#unconfirmed{reverse = maps:remove(QueueRef, Reverse0)}},
189-
MsgIds).
176+
lists:foldl(
177+
fun(MsgId, {R, UC}) ->
178+
case reject_msg(MsgId, UC) of
179+
{not_confirmed, UC1} -> {R, UC1};
180+
{{rejected, V}, UC1} -> {[V | R], UC1}
181+
end
182+
end,
183+
{[], UC0#unconfirmed{reverse = maps:remove(QueueRef, Reverse0)}},
184+
MsgIds).
190185

191186
%% Returns a smallest message id.
192187
-spec smallest(?MODULE()) -> msg_id().

0 commit comments

Comments
 (0)