@@ -372,11 +372,11 @@ forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} =
372
372
on_confirm when length (Queues ) > 0 ->
373
373
State2 ;
374
374
on_publish ->
375
- rabbit_shovel_behaviour : decr_remaining (
375
+ decr_remaining (
376
376
1 ,
377
377
record_confirms ([{Tag , Tag }], State2 ));
378
378
_ ->
379
- rabbit_shovel_behaviour : decr_remaining (1 , State2 )
379
+ decr_remaining (1 , State2 )
380
380
end ),
381
381
MsgSeqNo = maps :get (correlation , Options , undefined ),
382
382
QNames = lists :map (fun ({QName , _ }) -> QName ;
@@ -753,7 +753,7 @@ record_confirms([], State) ->
753
753
record_confirms (MXs , State = #{dest := Dst = #{confirmed := C ,
754
754
confirmed_count := CC }}) ->
755
755
Num = length (MXs ),
756
- rabbit_shovel_behaviour : decr_remaining (
756
+ decr_remaining (
757
757
Num , State #{dest => Dst #{confirmed => [MXs | C ],
758
758
confirmed_count => CC + Num }}).
759
759
@@ -762,7 +762,7 @@ record_rejects([], State) ->
762
762
record_rejects (MXs , State = #{dest := Dst = #{rejected := R ,
763
763
rejected_count := RC }}) ->
764
764
Num = length (MXs ),
765
- rabbit_shovel_behaviour : decr_remaining (
765
+ decr_remaining (
766
766
Num , State #{dest => Dst #{rejected => [MXs | R ],
767
767
rejected_count => RC + Num }}).
768
768
@@ -824,3 +824,12 @@ send_confirms_and_nacks(State = #{dest := Dst = #{confirmed := C,
824
824
ConfirmTags ,
825
825
State1 #{dest => Dst2 #{rejected => [],
826
826
rejected_count => 0 }}).
827
+
828
+ decr_remaining (Num , State ) ->
829
+ try
830
+ rabbit_shovel_behaviour :decr_remaining (Num , State )
831
+ catch
832
+ exit :{shutdown , autodelete } = R ->
833
+ _ = send_confirms_and_nacks (State ),
834
+ exit (R )
835
+ end .
0 commit comments