@@ -128,11 +128,19 @@ connect_dest(State = #{dest := Dest = #{resource_decl := {M, F, MFArgs},
128128 State #{dest => Dest #{current => #{queue_states => QState ,
129129 delivery_id => 1 ,
130130 vhost => VHost },
131- unacked => #{}}};
131+ unconfirmed => rabbit_confirms :init (),
132+ rejected => [],
133+ rejected_count => 0 ,
134+ confirmed => [],
135+ confirmed_count => 0 }};
132136 _ ->
133137 State #{dest => Dest #{current => #{queue_states => QState ,
134138 vhost => VHost },
135- unacked => #{}}}
139+ unconfirmed => rabbit_confirms :init (),
140+ confirmed => [],
141+ confirmed_count => 0 ,
142+ rejected => [],
143+ rejected_count => 0 }}
136144 end ).
137145
138146maybe_add_dest_queue (State = #{dest := Dest = #{queue := QName ,
@@ -206,6 +214,7 @@ init_source(State = #{source := #{queue_r := QName,
206214init_dest (#{name := Name ,
207215 shovel_type := Type ,
208216 dest := #{add_forward_headers := AFH } = Dst } = State ) ->
217+ _TRef = erlang :send_after (1000 , self (), send_confirms_and_nacks ),
209218 case AFH of
210219 true ->
211220 Props = #{<<" x-opt-shovelled-by" >> => rabbit_nodes :cluster_name (),
@@ -291,6 +300,9 @@ handle_source({queue_event, #resource{name = Queue,
291300 {protocol_error , _Type , Reason , ReasonArgs } ->
292301 {stop , list_to_binary (io_lib :format (Reason , ReasonArgs ))}
293302 end ;
303+ handle_source (send_confirms_and_nacks , State ) ->
304+ _TRef = erlang :send_after (1000 , self (), send_confirms_and_nacks ),
305+ send_confirms_and_nacks (State );
294306handle_source ({{'DOWN' , # resource {name = Queue ,
295307 kind = queue ,
296308 virtual_host = VHost }}, _ , _ , _ , _ } ,
@@ -305,9 +317,9 @@ handle_dest({queue_event, QRef, Evt},
305317 case rabbit_queue_type :handle_event (QRef , Evt , QueueStates0 ) of
306318 {ok , QState1 , Actions } ->
307319 State = State0 #{dest => Dest #{current => Current #{queue_states => QState1 }}},
308- handle_dest_queue_actions (Actions , State );
320+ send_confirms_and_nacks ( handle_dest_queue_actions (Actions , State ) );
309321 {eol , Actions } ->
310- _ = handle_dest_queue_actions (Actions , State0 ),
322+ _ = send_confirms_and_nacks ( handle_dest_queue_actions (Actions , State0 ) ),
311323 {stop , {outbound_link_or_channel_closure , queue_deleted }};
312324 {protocol_error , _Type , Reason , ReasonArgs } ->
313325 {stop , list_to_binary (io_lib :format (Reason , ReasonArgs ))}
@@ -337,8 +349,7 @@ ack(DeliveryTag, Multiple, State) ->
337349nack (DeliveryTag , Multiple , State ) ->
338350 maybe_grant_credit (settle (requeue , DeliveryTag , Multiple , State )).
339351
340- forward (Tag , Msg0 , #{dest := #{current := #{queue_states := QState } = Current ,
341- unacked := Unacked } = Dest ,
352+ forward (Tag , Msg0 , #{dest := #{current := #{queue_states := QState } = Current } = Dest ,
342353 ack_mode := AckMode } = State0 ) ->
343354 {Options , #{dest := #{current := Current1 } = Dest1 } = State } =
344355 case AckMode of
@@ -355,23 +366,21 @@ forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current,
355366 case rabbit_queue_type :deliver (Queues , Msg , Options , QState ) of
356367 {ok , QState1 , Actions } ->
357368 State1 = State #{dest => Dest1 #{current => Current1 #{queue_states => QState1 }}},
358- #{ dest : = Dst1 } = State2 = rabbit_shovel_behaviour :incr_forwarded (State1 ),
359- State4 = rabbit_shovel_behaviour :decr_remaining_unacked (
369+ State2 = rabbit_shovel_behaviour :incr_forwarded (State1 ),
370+ State3 = rabbit_shovel_behaviour :decr_remaining_unacked (
360371 case AckMode of
361- no_ack ->
362- rabbit_shovel_behaviour :decr_remaining (1 , State2 );
363372 on_confirm when length (Queues ) > 0 ->
364- Correlation = maps :get (correlation , Options ),
365- State2 #{dest => Dst1 #{unacked => Unacked #{Correlation => Tag }}};
366- on_confirm ->
367- % % Drop the messages as 0.9.1, no destination available
368- State3 = rabbit_shovel_behaviour :ack (Tag , false , State2 ),
369- rabbit_shovel_behaviour :decr_remaining (1 , State3 );
373+ State2 ;
370374 on_publish ->
371- State3 = rabbit_shovel_behaviour :ack (Tag , false , State2 ),
372- rabbit_shovel_behaviour :decr_remaining (1 , State3 )
375+ rabbit_shovel_behaviour :decr_remaining (
376+ 1 ,
377+ record_confirms ([{Tag , none }], State2 ));
378+ _ ->
379+ rabbit_shovel_behaviour :decr_remaining (1 , State2 )
373380 end ),
374- handle_dest_queue_actions (Actions , State4 );
381+ MsgSeqNo = maps :get (correlation , Options , undefined ),
382+ State4 = process_routing_confirm (MsgSeqNo , QNames , State3 ),
383+ send_confirms_and_nacks (handle_dest_queue_actions (Actions , State4 ));
375384 {error , Reason } ->
376385 exit ({shutdown , Reason })
377386 end .
@@ -455,14 +464,21 @@ increase_next_tag(#{source := Source = #{current := Current = #{next_tag := Deli
455464
456465handle_dest_queue_actions (Actions , State ) ->
457466 lists :foldl (
458- fun ({settled , _QName , MsgSeqNos }, S0 ) ->
459- confirm_to_inbound (fun (Tag , StateX ) ->
460- rabbit_shovel_behaviour :ack (Tag , false , StateX )
461- end , MsgSeqNos , S0 );
462- ({rejected , _QName , MsgSeqNos }, S0 ) ->
463- confirm_to_inbound (fun (Tag , StateX ) ->
464- rabbit_shovel_behaviour :nack (Tag , false , StateX )
465- end , MsgSeqNos , S0 );
467+ fun ({settled , QName , MsgSeqNos }, S0 ) ->
468+ confirm (MsgSeqNos , QName , S0 );
469+ ({rejected , _QName , MsgSeqNos }, #{dest := Dst = #{unconfirmed := U0 }} = S0 ) ->
470+ {U , Rej } =
471+ lists :foldr (
472+ fun (SeqNo , {U1 , Acc }) ->
473+ case rabbit_confirms :reject (SeqNo , U1 ) of
474+ {ok , MX , U2 } ->
475+ {U2 , [MX | Acc ]};
476+ {error , not_found } ->
477+ {U1 , Acc }
478+ end
479+ end , {U0 , []}, MsgSeqNos ),
480+ S = S0 #{dest => Dst #{unconfirmed => U }},
481+ record_rejects (Rej , S );
466482 % % TODO handle {block, QName}
467483 (_Action , S0 ) ->
468484 S0
@@ -647,23 +663,6 @@ route(Msg, #{current := #{vhost := VHost}}) ->
647663 Exchange = rabbit_exchange :lookup_or_die (ExchangeName ),
648664 rabbit_exchange :route (Exchange , Msg , #{return_binding_keys => true }).
649665
650- confirm_to_inbound (ConfirmFun , SeqNos , State )
651- when is_list (SeqNos ) ->
652- lists :foldl (fun (Seq , State0 ) ->
653- confirm_to_inbound (ConfirmFun , Seq , State0 )
654- end , State , SeqNos );
655- confirm_to_inbound (ConfirmFun , Seq ,
656- State0 = #{dest := #{unacked := Unacked } = Dst }) ->
657- case Unacked of
658- #{Seq := InTag } ->
659- Unacked1 = maps :remove (Seq , Unacked ),
660- State = rabbit_shovel_behaviour :decr_remaining (
661- 1 , State0 #{dest => Dst #{unacked => Unacked1 }}),
662- ConfirmFun (InTag , State );
663- _ ->
664- State0
665- end .
666-
667666sent_delivery (#{source := #{delivery_count := DeliveryCount0 ,
668667 credit := Credit0 } = Src
669668 } = State0 , NumMsgs ) ->
@@ -736,3 +735,90 @@ handle_credit_reply({credit_reply, CTag, DeliveryCount, Credit, _Available, _Dra
736735 State0 #{source => Src #{credit => Credit ,
737736 at_least_one_credit_req_in_flight => false }}
738737 end .
738+
739+ process_routing_confirm (undefined , _ , State ) ->
740+ State ;
741+ process_routing_confirm (MsgSeqNo , [], State )
742+ when is_integer (MsgSeqNo ) ->
743+ record_confirms ([{MsgSeqNo , none }], State );
744+ process_routing_confirm (MsgSeqNo , QRefs , #{dest := Dst = #{unconfirmed := Unconfirmed }} = State ) when is_integer (MsgSeqNo ) ->
745+ XName = rabbit_misc :r (<<" /" >>, exchange , <<>>),
746+ State #{dest => Dst #{unconfirmed =>
747+ rabbit_confirms :insert (MsgSeqNo , QRefs , XName , Unconfirmed )}}.
748+
749+ record_confirms ([], State ) ->
750+ State ;
751+ record_confirms (MXs , State = #{dest := Dst = #{confirmed := C ,
752+ confirmed_count := CC }}) ->
753+ Num = length (MXs ),
754+ rabbit_shovel_behaviour :decr_remaining (
755+ Num , State #{dest => Dst #{confirmed => [MXs | C ],
756+ confirmed_count => CC + Num }}).
757+
758+ record_rejects ([], State ) ->
759+ State ;
760+ record_rejects (MXs , State = #{dest := Dst = #{rejected := R ,
761+ rejected_count := RC }}) ->
762+ Num = length (MXs ),
763+ rabbit_shovel_behaviour :decr_remaining (
764+ Num , State #{dest => Dst #{rejected => [MXs | R ],
765+ rejected_count => RC + Num }}).
766+
767+ confirm (MsgSeqNos , QRef , State = #{dest := Dst = #{unconfirmed := UC }}) ->
768+ {ConfirmMXs , UC1 } = rabbit_confirms :confirm (MsgSeqNos , QRef , UC ),
769+ record_confirms (ConfirmMXs , State #{dest => Dst #{unconfirmed => UC1 }}).
770+
771+ send_nacks ([], _ , State ) ->
772+ State ;
773+ send_nacks (Rs , Cs , State ) ->
774+ coalesce_and_send (Rs , Cs ,
775+ fun (MsgSeqNo , Multiple , StateX ) ->
776+ rabbit_shovel_behaviour :nack (MsgSeqNo , Multiple , StateX )
777+ end , State ).
778+
779+ send_confirms ([], _ , State ) ->
780+ State ;
781+ send_confirms ([MsgSeqNo ], _ , State ) ->
782+ rabbit_shovel_behaviour :ack (MsgSeqNo , false , State );
783+ send_confirms (Cs , Rs , State ) ->
784+ coalesce_and_send (Cs , Rs ,
785+ fun (MsgSeqNo , Multiple , StateX ) ->
786+ rabbit_shovel_behaviour :ack (MsgSeqNo , Multiple , StateX )
787+ end , State ).
788+
789+ coalesce_and_send (MsgSeqNos , NegativeMsgSeqNos , MkMsgFun ,
790+ State = #{dest := #{unconfirmed := UC }}) ->
791+ SMsgSeqNos = lists :usort (MsgSeqNos ),
792+ UnconfirmedCutoff = case rabbit_confirms :is_empty (UC ) of
793+ true -> lists :last (SMsgSeqNos ) + 1 ;
794+ false -> rabbit_confirms :smallest (UC )
795+ end ,
796+ Cutoff = lists :min ([UnconfirmedCutoff | NegativeMsgSeqNos ]),
797+ {Ms , Ss } = lists :splitwith (fun (X ) -> X < Cutoff end , SMsgSeqNos ),
798+ State1 = case Ms of
799+ [] -> State ;
800+ _ -> MkMsgFun (lists :last (Ms ), true , State )
801+ end ,
802+ lists :foldl (fun (SeqNo , S ) ->
803+ MkMsgFun (SeqNo , false , S )
804+ end , State1 , Ss ).
805+
806+ % % Todo remove XName from confirm/unconfirm as we don't need it for local
807+ send_confirms_and_nacks (State = #{dest := #{confirmed := [],
808+ rejected := []}}) ->
809+ State ;
810+ send_confirms_and_nacks (State = #{dest := Dst = #{confirmed := C ,
811+ rejected := R }}) ->
812+ Confirms = lists :append (C ),
813+ ConfirmMsgSeqNos = [MsgSeqNo || {MsgSeqNo , _ } <- Confirms ],
814+ Rejects = lists :append (R ),
815+ RejectMsgSeqNos = [MsgSeqNo || {MsgSeqNo , _ } <- Rejects ],
816+ State1 = #{dest := Dst2 }
817+ = send_confirms (ConfirmMsgSeqNos ,
818+ RejectMsgSeqNos ,
819+ State #{dest => Dst #{confirmed => [],
820+ confirmed_count => 0 }}),
821+ send_nacks (RejectMsgSeqNos ,
822+ ConfirmMsgSeqNos ,
823+ State1 #{dest => Dst2 #{rejected => [],
824+ rejected_count => 0 }}).
0 commit comments