Skip to content

Commit ccbf925

Browse files
dcorbachomergify[bot]
authored andcommitted
Local shovels: fix handling of acks/nacks
(cherry picked from commit 6e2e195)
1 parent a5a9363 commit ccbf925

File tree

1 file changed

+12
-14
lines changed

1 file changed

+12
-14
lines changed

deps/rabbitmq_shovel/src/rabbit_local_shovel.erl

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -591,22 +591,20 @@ get_user_vhost_from_amqp_param(Uri) ->
591591
settle(Op, DeliveryTag, Multiple,
592592
#{source := #{queue_r := QRef,
593593
current := Current = #{consumer_tag := CTag,
594-
unacked_message_q := UAMQ0}
594+
unacked_message_q := UAMQ0,
595+
queue_states := QState0}
595596
} = Src} = State0) ->
596597
{MsgIds, UAMQ} = collect_acks(UAMQ0, DeliveryTag, Multiple),
597-
State = State0#{source => Src#{current => Current#{unacked_message_q => UAMQ}}},
598-
lists:foldl(
599-
fun(MsgId, #{source := Src0 = #{current := Current0 = #{queue_states := QState0}}} = St0) ->
600-
case rabbit_queue_type:settle(QRef, Op, CTag, [MsgId], QState0) of
601-
{ok, QState1, Actions} ->
602-
St = St0#{source => Src0#{current => Current0#{queue_states => QState1}}},
603-
handle_queue_actions(Actions, St);
604-
{'protocol_error', Type, Reason, Args} ->
605-
?LOG_ERROR("Shovel failed to settle ~p acknowledgments with ~tp: ~tp",
606-
[Op, Type, io_lib:format(Reason, Args)]),
607-
exit({shutdown, {ack_failed, Reason}})
608-
end
609-
end, State, MsgIds).
598+
case rabbit_queue_type:settle(QRef, Op, CTag, lists:reverse(MsgIds), QState0) of
599+
{ok, QState1, Actions} ->
600+
State = State0#{source => Src#{current => Current#{queue_states => QState1,
601+
unacked_message_q => UAMQ}}},
602+
handle_queue_actions(Actions, State);
603+
{'protocol_error', Type, Reason, Args} ->
604+
?LOG_ERROR("Shovel failed to settle ~p acknowledgments with ~tp: ~tp",
605+
[Op, Type, io_lib:format(Reason, Args)]),
606+
exit({shutdown, {ack_failed, Reason}})
607+
end.
610608

611609
%% From rabbit_channel
612610
%% Records a client-sent acknowledgement. Handles both single delivery acks

0 commit comments

Comments
 (0)