@@ -372,11 +372,11 @@ forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} =
372372 on_confirm when length (Queues ) > 0 ->
373373 State2 ;
374374 on_publish ->
375- rabbit_shovel_behaviour : decr_remaining (
375+ decr_remaining (
376376 1 ,
377377 record_confirms ([{Tag , Tag }], State2 ));
378378 _ ->
379- rabbit_shovel_behaviour : decr_remaining (1 , State2 )
379+ decr_remaining (1 , State2 )
380380 end ),
381381 MsgSeqNo = maps :get (correlation , Options , undefined ),
382382 QNames = lists :map (fun ({QName , _ }) -> QName ;
@@ -753,7 +753,7 @@ record_confirms([], State) ->
753753record_confirms (MXs , State = #{dest := Dst = #{confirmed := C ,
754754 confirmed_count := CC }}) ->
755755 Num = length (MXs ),
756- rabbit_shovel_behaviour : decr_remaining (
756+ decr_remaining (
757757 Num , State #{dest => Dst #{confirmed => [MXs | C ],
758758 confirmed_count => CC + Num }}).
759759
@@ -762,7 +762,7 @@ record_rejects([], State) ->
762762record_rejects (MXs , State = #{dest := Dst = #{rejected := R ,
763763 rejected_count := RC }}) ->
764764 Num = length (MXs ),
765- rabbit_shovel_behaviour : decr_remaining (
765+ decr_remaining (
766766 Num , State #{dest => Dst #{rejected => [MXs | R ],
767767 rejected_count => RC + Num }}).
768768
@@ -824,3 +824,12 @@ send_confirms_and_nacks(State = #{dest := Dst = #{confirmed := C,
824824 ConfirmTags ,
825825 State1 #{dest => Dst2 #{rejected => [],
826826 rejected_count => 0 }}).
827+
828+ decr_remaining (Num , State ) ->
829+ try
830+ rabbit_shovel_behaviour :decr_remaining (Num , State )
831+ catch
832+ exit :{shutdown , autodelete } = R ->
833+ _ = send_confirms_and_nacks (State ),
834+ exit (R )
835+ end .
0 commit comments