@@ -128,11 +128,19 @@ connect_dest(State = #{dest := Dest = #{resource_decl := {M, F, MFArgs},
128128 State #{dest => Dest #{current => #{queue_states => QState ,
129129 delivery_id => 1 ,
130130 vhost => VHost },
131- unacked => #{}}};
131+ unconfirmed => rabbit_shovel_confirms :init (),
132+ rejected => [],
133+ rejected_count => 0 ,
134+ confirmed => [],
135+ confirmed_count => 0 }};
132136 _ ->
133137 State #{dest => Dest #{current => #{queue_states => QState ,
134138 vhost => VHost },
135- unacked => #{}}}
139+ unconfirmed => rabbit_shovel_confirms :init (),
140+ confirmed => [],
141+ confirmed_count => 0 ,
142+ rejected => [],
143+ rejected_count => 0 }}
136144 end ).
137145
138146maybe_add_dest_queue (State = #{dest := Dest = #{queue := QName ,
@@ -206,6 +214,7 @@ init_source(State = #{source := #{queue_r := QName,
206214init_dest (#{name := Name ,
207215 shovel_type := Type ,
208216 dest := #{add_forward_headers := AFH } = Dst } = State ) ->
217+ _TRef = erlang :send_after (1000 , self (), send_confirms_and_nacks ),
209218 case AFH of
210219 true ->
211220 Props = #{<<" x-opt-shovelled-by" >> => rabbit_nodes :cluster_name (),
@@ -291,6 +300,9 @@ handle_source({queue_event, #resource{name = Queue,
291300 {protocol_error , _Type , Reason , ReasonArgs } ->
292301 {stop , list_to_binary (io_lib :format (Reason , ReasonArgs ))}
293302 end ;
303+ handle_source (send_confirms_and_nacks , State ) ->
304+ _TRef = erlang :send_after (1000 , self (), send_confirms_and_nacks ),
305+ send_confirms_and_nacks (State );
294306handle_source ({{'DOWN' , # resource {name = Queue ,
295307 kind = queue ,
296308 virtual_host = VHost }}, _ , _ , _ , _ } ,
@@ -305,9 +317,9 @@ handle_dest({queue_event, QRef, Evt},
305317 case rabbit_queue_type :handle_event (QRef , Evt , QueueStates0 ) of
306318 {ok , QState1 , Actions } ->
307319 State = State0 #{dest => Dest #{current => Current #{queue_states => QState1 }}},
308- handle_dest_queue_actions (Actions , State );
320+ send_confirms_and_nacks ( handle_dest_queue_actions (Actions , State ) );
309321 {eol , Actions } ->
310- _ = handle_dest_queue_actions (Actions , State0 ),
322+ _ = send_confirms_and_nacks ( handle_dest_queue_actions (Actions , State0 ) ),
311323 {stop , {outbound_link_or_channel_closure , queue_deleted }};
312324 {protocol_error , _Type , Reason , ReasonArgs } ->
313325 {stop , list_to_binary (io_lib :format (Reason , ReasonArgs ))}
@@ -337,8 +349,7 @@ ack(DeliveryTag, Multiple, State) ->
337349nack (DeliveryTag , Multiple , State ) ->
338350 maybe_grant_credit (settle (requeue , DeliveryTag , Multiple , State )).
339351
340- forward (Tag , Msg0 , #{dest := #{current := #{queue_states := QState } = Current ,
341- unacked := Unacked } = Dest ,
352+ forward (Tag , Msg0 , #{dest := #{current := #{queue_states := QState } = Current } = Dest ,
342353 ack_mode := AckMode } = State0 ) ->
343354 {Options , #{dest := #{current := Current1 } = Dest1 } = State } =
344355 case AckMode of
@@ -350,28 +361,29 @@ forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current,
350361 {#{}, State0 }
351362 end ,
352363 Msg = set_annotations (Msg0 , Dest ),
353- QNames = route (Msg , Dest ),
354- Queues = rabbit_amqqueue :lookup_many (QNames ),
364+ RoutedQNames = route (Msg , Dest ),
365+ Queues = rabbit_amqqueue :lookup_many (RoutedQNames ),
355366 case rabbit_queue_type :deliver (Queues , Msg , Options , QState ) of
356367 {ok , QState1 , Actions } ->
357368 State1 = State #{dest => Dest1 #{current => Current1 #{queue_states => QState1 }}},
358- #{ dest : = Dst1 } = State2 = rabbit_shovel_behaviour :incr_forwarded (State1 ),
359- State4 = rabbit_shovel_behaviour :decr_remaining_unacked (
369+ State2 = rabbit_shovel_behaviour :incr_forwarded (State1 ),
370+ State3 = rabbit_shovel_behaviour :decr_remaining_unacked (
360371 case AckMode of
361- no_ack ->
362- rabbit_shovel_behaviour :decr_remaining (1 , State2 );
363372 on_confirm when length (Queues ) > 0 ->
364- Correlation = maps :get (correlation , Options ),
365- State2 #{dest => Dst1 #{unacked => Unacked #{Correlation => Tag }}};
366- on_confirm ->
367- % % Drop the messages as 0.9.1, no destination available
368- State3 = rabbit_shovel_behaviour :ack (Tag , false , State2 ),
369- rabbit_shovel_behaviour :decr_remaining (1 , State3 );
373+ State2 ;
370374 on_publish ->
371- State3 = rabbit_shovel_behaviour :ack (Tag , false , State2 ),
372- rabbit_shovel_behaviour :decr_remaining (1 , State3 )
375+ rabbit_shovel_behaviour :decr_remaining (
376+ 1 ,
377+ record_confirms ([{Tag , Tag }], State2 ));
378+ _ ->
379+ rabbit_shovel_behaviour :decr_remaining (1 , State2 )
373380 end ),
374- handle_dest_queue_actions (Actions , State4 );
381+ MsgSeqNo = maps :get (correlation , Options , undefined ),
382+ QNames = lists :map (fun ({QName , _ }) -> QName ;
383+ (QName ) -> QName
384+ end , RoutedQNames ),
385+ State4 = process_routing_confirm (MsgSeqNo , Tag , QNames , State3 ),
386+ send_confirms_and_nacks (handle_dest_queue_actions (Actions , State4 ));
375387 {error , Reason } ->
376388 exit ({shutdown , Reason })
377389 end .
@@ -455,14 +467,21 @@ increase_next_tag(#{source := Source = #{current := Current = #{next_tag := Deli
455467
456468handle_dest_queue_actions (Actions , State ) ->
457469 lists :foldl (
458- fun ({settled , _QName , MsgSeqNos }, S0 ) ->
459- confirm_to_inbound (fun (Tag , StateX ) ->
460- rabbit_shovel_behaviour :ack (Tag , false , StateX )
461- end , MsgSeqNos , S0 );
462- ({rejected , _QName , MsgSeqNos }, S0 ) ->
463- confirm_to_inbound (fun (Tag , StateX ) ->
464- rabbit_shovel_behaviour :nack (Tag , false , StateX )
465- end , MsgSeqNos , S0 );
470+ fun ({settled , QName , MsgSeqNos }, S0 ) ->
471+ confirm (MsgSeqNos , QName , S0 );
472+ ({rejected , _QName , MsgSeqNos }, #{dest := Dst = #{unconfirmed := U0 }} = S0 ) ->
473+ {U , Rej } =
474+ lists :foldr (
475+ fun (SeqNo , {U1 , Acc }) ->
476+ case rabbit_shovel_confirms :reject (SeqNo , U1 ) of
477+ {ok , MX , U2 } ->
478+ {U2 , [MX | Acc ]};
479+ {error , not_found } ->
480+ {U1 , Acc }
481+ end
482+ end , {U0 , []}, MsgSeqNos ),
483+ S = S0 #{dest => Dst #{unconfirmed => U }},
484+ record_rejects (Rej , S );
466485 % % TODO handle {block, QName}
467486 (_Action , S0 ) ->
468487 S0
@@ -591,22 +610,20 @@ get_user_vhost_from_amqp_param(Uri) ->
591610settle (Op , DeliveryTag , Multiple ,
592611 #{source := #{queue_r := QRef ,
593612 current := Current = #{consumer_tag := CTag ,
594- unacked_message_q := UAMQ0 }
613+ unacked_message_q := UAMQ0 ,
614+ queue_states := QState0 }
595615 } = Src } = State0 ) ->
596616 {MsgIds , UAMQ } = collect_acks (UAMQ0 , DeliveryTag , Multiple ),
597- State = State0 #{source => Src #{current => Current #{unacked_message_q => UAMQ }}},
598- lists :foldl (
599- fun (MsgId , #{source := Src0 = #{current := Current0 = #{queue_states := QState0 }}} = St0 ) ->
600- case rabbit_queue_type :settle (QRef , Op , CTag , [MsgId ], QState0 ) of
601- {ok , QState1 , Actions } ->
602- St = St0 #{source => Src0 #{current => Current0 #{queue_states => QState1 }}},
603- handle_queue_actions (Actions , St );
604- {'protocol_error' , Type , Reason , Args } ->
605- ? LOG_ERROR (" Shovel failed to settle ~p acknowledgments with ~tp : ~tp " ,
606- [Op , Type , io_lib :format (Reason , Args )]),
607- exit ({shutdown , {ack_failed , Reason }})
608- end
609- end , State , MsgIds ).
617+ case rabbit_queue_type :settle (QRef , Op , CTag , lists :reverse (MsgIds ), QState0 ) of
618+ {ok , QState1 , Actions } ->
619+ State = State0 #{source => Src #{current => Current #{queue_states => QState1 ,
620+ unacked_message_q => UAMQ }}},
621+ handle_queue_actions (Actions , State );
622+ {'protocol_error' , Type , Reason , Args } ->
623+ ? LOG_ERROR (" Shovel failed to settle ~p acknowledgments with ~tp : ~tp " ,
624+ [Op , Type , io_lib :format (Reason , Args )]),
625+ exit ({shutdown , {ack_failed , Reason }})
626+ end .
610627
611628% % From rabbit_channel
612629% % Records a client-sent acknowledgement. Handles both single delivery acks
@@ -649,23 +666,6 @@ route(Msg, #{current := #{vhost := VHost}}) ->
649666 Exchange = rabbit_exchange :lookup_or_die (ExchangeName ),
650667 rabbit_exchange :route (Exchange , Msg , #{return_binding_keys => true }).
651668
652- confirm_to_inbound (ConfirmFun , SeqNos , State )
653- when is_list (SeqNos ) ->
654- lists :foldl (fun (Seq , State0 ) ->
655- confirm_to_inbound (ConfirmFun , Seq , State0 )
656- end , State , SeqNos );
657- confirm_to_inbound (ConfirmFun , Seq ,
658- State0 = #{dest := #{unacked := Unacked } = Dst }) ->
659- case Unacked of
660- #{Seq := InTag } ->
661- Unacked1 = maps :remove (Seq , Unacked ),
662- State = rabbit_shovel_behaviour :decr_remaining (
663- 1 , State0 #{dest => Dst #{unacked => Unacked1 }}),
664- ConfirmFun (InTag , State );
665- _ ->
666- State0
667- end .
668-
669669sent_delivery (#{source := #{delivery_count := DeliveryCount0 ,
670670 credit := Credit0 } = Src
671671 } = State0 , NumMsgs ) ->
@@ -738,3 +738,89 @@ handle_credit_reply({credit_reply, CTag, DeliveryCount, Credit, _Available, _Dra
738738 State0 #{source => Src #{credit => Credit ,
739739 at_least_one_credit_req_in_flight => false }}
740740 end .
741+
742+ process_routing_confirm (undefined , _ , _ , State ) ->
743+ State ;
744+ process_routing_confirm (MsgSeqNo , Tag , [], State )
745+ when is_integer (MsgSeqNo ) ->
746+ record_confirms ([{MsgSeqNo , Tag }], State );
747+ process_routing_confirm (MsgSeqNo , Tag , QRefs , #{dest := Dst = #{unconfirmed := Unconfirmed }} = State ) when is_integer (MsgSeqNo ) ->
748+ State #{dest => Dst #{unconfirmed =>
749+ rabbit_shovel_confirms :insert (MsgSeqNo , QRefs , Tag , Unconfirmed )}}.
750+
751+ record_confirms ([], State ) ->
752+ State ;
753+ record_confirms (MXs , State = #{dest := Dst = #{confirmed := C ,
754+ confirmed_count := CC }}) ->
755+ Num = length (MXs ),
756+ rabbit_shovel_behaviour :decr_remaining (
757+ Num , State #{dest => Dst #{confirmed => [MXs | C ],
758+ confirmed_count => CC + Num }}).
759+
760+ record_rejects ([], State ) ->
761+ State ;
762+ record_rejects (MXs , State = #{dest := Dst = #{rejected := R ,
763+ rejected_count := RC }}) ->
764+ Num = length (MXs ),
765+ rabbit_shovel_behaviour :decr_remaining (
766+ Num , State #{dest => Dst #{rejected => [MXs | R ],
767+ rejected_count => RC + Num }}).
768+
769+ confirm (MsgSeqNos , QRef , State = #{dest := Dst = #{unconfirmed := UC }}) ->
770+ {ConfirmMXs , UC1 } = rabbit_shovel_confirms :confirm (MsgSeqNos , QRef , UC ),
771+ record_confirms (ConfirmMXs , State #{dest => Dst #{unconfirmed => UC1 }}).
772+
773+ send_nacks ([], _ , State ) ->
774+ State ;
775+ send_nacks (Rs , Cs , State ) ->
776+ coalesce_and_send (Rs , Cs ,
777+ fun (MsgSeqNo , Multiple , StateX ) ->
778+ rabbit_shovel_behaviour :nack (MsgSeqNo , Multiple , StateX )
779+ end , State ).
780+
781+ send_confirms ([], _ , State ) ->
782+ State ;
783+ send_confirms ([MsgSeqNo ], _ , State ) ->
784+ rabbit_shovel_behaviour :ack (MsgSeqNo , false , State );
785+ send_confirms (Cs , Rs , State ) ->
786+ coalesce_and_send (Cs , Rs ,
787+ fun (MsgSeqNo , Multiple , StateX ) ->
788+ rabbit_shovel_behaviour :ack (MsgSeqNo , Multiple , StateX )
789+ end , State ).
790+
791+ coalesce_and_send (MsgSeqNos , NegativeMsgSeqNos , MkMsgFun ,
792+ State = #{dest := #{unconfirmed := UC }}) ->
793+ SMsgSeqNos = lists :usort (MsgSeqNos ),
794+ UnconfirmedCutoff = case rabbit_shovel_confirms :is_empty (UC ) of
795+ true -> lists :last (SMsgSeqNos ) + 1 ;
796+ false -> rabbit_shovel_confirms :smallest (UC )
797+ end ,
798+ Cutoff = lists :min ([UnconfirmedCutoff | NegativeMsgSeqNos ]),
799+ {Ms , Ss } = lists :splitwith (fun (X ) -> X < Cutoff end , SMsgSeqNos ),
800+ State1 = case Ms of
801+ [] -> State ;
802+ _ -> MkMsgFun (lists :last (Ms ), true , State )
803+ end ,
804+ lists :foldl (fun (SeqNo , S ) ->
805+ MkMsgFun (SeqNo , false , S )
806+ end , State1 , Ss ).
807+
808+ % % Todo remove XName from confirm/unconfirm as we don't need it for local
809+ send_confirms_and_nacks (State = #{dest := #{confirmed := [],
810+ rejected := []}}) ->
811+ State ;
812+ send_confirms_and_nacks (State = #{dest := Dst = #{confirmed := C ,
813+ rejected := R }}) ->
814+ Confirms = lists :append (C ),
815+ ConfirmTags = [Tag || {_ , Tag } <- Confirms ],
816+ Rejects = lists :append (R ),
817+ RejectTags = [Tag || {_ , Tag } <- Rejects ],
818+ State1 = #{dest := Dst2 }
819+ = send_confirms (ConfirmTags ,
820+ RejectTags ,
821+ State #{dest => Dst #{confirmed => [],
822+ confirmed_count => 0 }}),
823+ send_nacks (RejectTags ,
824+ ConfirmTags ,
825+ State1 #{dest => Dst2 #{rejected => [],
826+ rejected_count => 0 }}).
0 commit comments