Skip to content

Commit ed96326

Browse files
author
Francesco Mazzoli
committed
Update the msg_id_ack in set_length.
1 parent b4667e3 commit ed96326

File tree

1 file changed

+16
-12
lines changed

1 file changed

+16
-12
lines changed

src/rabbit_mirror_queue_slave.erl

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -798,18 +798,22 @@ process_instruction({set_length, Length, AckRequired},
798798
backing_queue_state = BQS }) ->
799799
QLen = BQ:len(BQS),
800800
ToDrop = QLen - Length,
801-
{ok, case ToDrop >= 0 of
802-
true -> BQS1 =
803-
lists:foldl(
804-
fun (const, BQSN) ->
805-
{{_Msg, _IsDelivered, _AckTag, _Remaining},
806-
BQSN1} = BQ:fetch(AckRequired, BQSN),
807-
BQSN1
808-
end, BQS, lists:duplicate(ToDrop, const)),
809-
set_synchronised(
810-
true, State #state { backing_queue_state = BQS1 });
811-
false -> State
812-
end};
801+
{ok,
802+
case ToDrop >= 0 of
803+
true ->
804+
State1 =
805+
lists:foldl(
806+
fun (const, StateN = #state {backing_queue_state = BQSN}) ->
807+
{{#basic_message{id = MsgId}, _IsDelivered, AckTag,
808+
_Remaining}, BQSN1} = BQ:fetch(AckRequired, BQSN),
809+
maybe_store_ack(
810+
AckRequired, MsgId, AckTag,
811+
StateN #state { backing_queue_state = BQSN1 })
812+
end, State, lists:duplicate(ToDrop, const)),
813+
set_synchronised(true, State1);
814+
false ->
815+
State
816+
end};
813817
process_instruction({fetch, AckRequired, MsgId, Remaining},
814818
State = #state { backing_queue = BQ,
815819
backing_queue_state = BQS }) ->

0 commit comments

Comments
 (0)