Skip to content

Commit bcf09a1

Browse files
committed
Skip and ignore pending Acks which no longer exist in RAM and/or DISK when
dead-lettering rejected messages - to avoid crashing the queue from rejected message Acks (messages have been rejected already, no further Ack processing required if not found in ram_pending_ack and disk_pending_ack).
1 parent 8199c0f commit bcf09a1

File tree

1 file changed

+8
-4
lines changed

1 file changed

+8
-4
lines changed

deps/rabbit/src/rabbit_variable_queue.erl

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -650,9 +650,13 @@ requeue(AckTags, #vqstate { delta = Delta,
650650
ackfold(MsgFun, Acc, State, AckTags) ->
651651
{AccN, StateN} =
652652
lists:foldl(fun(SeqId, {Acc0, State0}) ->
653-
MsgStatus = lookup_pending_ack(SeqId, State0),
654-
{Msg, State1} = read_msg(MsgStatus, State0),
655-
{MsgFun(Msg, SeqId, Acc0), State1}
653+
case lookup_pending_ack(SeqId, State0) of
654+
none ->
655+
{Acc0, State0};
656+
MsgStatus = #msg_status{} ->
657+
{Msg, State1} = read_msg(MsgStatus, State0),
658+
{MsgFun(Msg, SeqId, Acc0), State1}
659+
end
656660
end, {Acc, State}, AckTags),
657661
{AccN, a(StateN)}.
658662

@@ -1923,7 +1927,7 @@ record_pending_ack(#msg_status { seq_id = SeqId } = MsgStatus,
19231927
lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA,
19241928
disk_pending_ack = DPA}) ->
19251929
case maps:get(SeqId, RPA, none) of
1926-
none -> maps:get(SeqId, DPA);
1930+
none -> maps:get(SeqId, DPA, none);
19271931
V -> V
19281932
end.
19291933

0 commit comments

Comments
 (0)