diff --git a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl index 66c6dbdd38bc..f513b27a975c 100644 --- a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl @@ -372,11 +372,11 @@ forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} = on_confirm when length(Queues) > 0 -> State2; on_publish -> - rabbit_shovel_behaviour:decr_remaining( + decr_remaining( 1, record_confirms([{Tag, Tag}], State2)); _ -> - rabbit_shovel_behaviour:decr_remaining(1, State2) + decr_remaining(1, State2) end), MsgSeqNo = maps:get(correlation, Options, undefined), QNames = lists:map(fun({QName, _}) -> QName; @@ -753,7 +753,7 @@ record_confirms([], State) -> record_confirms(MXs, State = #{dest := Dst = #{confirmed := C, confirmed_count := CC}}) -> Num = length(MXs), - rabbit_shovel_behaviour:decr_remaining( + decr_remaining( Num, State#{dest => Dst#{confirmed => [MXs | C], confirmed_count => CC + Num}}). @@ -762,7 +762,7 @@ record_rejects([], State) -> record_rejects(MXs, State = #{dest := Dst = #{rejected := R, rejected_count := RC}}) -> Num = length(MXs), - rabbit_shovel_behaviour:decr_remaining( + decr_remaining( Num, State#{dest => Dst#{rejected => [MXs | R], rejected_count => RC + Num}}). @@ -824,3 +824,12 @@ send_confirms_and_nacks(State = #{dest := Dst = #{confirmed := C, ConfirmTags, State1#{dest => Dst2#{rejected => [], rejected_count => 0}}). + +decr_remaining(Num, State) -> + try + rabbit_shovel_behaviour:decr_remaining(Num, State) + catch + exit:{shutdown, autodelete} = R -> + _ = send_confirms_and_nacks(State), + exit(R) + end.