|
15 | 15 | -include_lib("rabbit/include/mc.hrl"). |
16 | 16 | -include("rabbit_shovel.hrl"). |
17 | 17 |
|
| 18 | +-rabbit_boot_step({rabbit_global_local_shovel_counters, |
| 19 | + [{description, "global local shovel counters"}, |
| 20 | + {mfa, {?MODULE, boot_step, |
| 21 | + []}}, |
| 22 | + {requires, rabbit_global_counters}, |
| 23 | + {enables, external_infrastructure}]}). |
| 24 | + |
18 | 25 | -export([ |
| 26 | + boot_step/0, |
19 | 27 | parse/2, |
20 | 28 | connect_source/1, |
21 | 29 | connect_dest/1, |
|
53 | 61 | %% See rabbit_amqp_session.erl |
54 | 62 | -define(INITIAL_DELIVERY_COUNT, 16#ff_ff_ff_ff - 4). |
55 | 63 | -define(DEFAULT_MAX_LINK_CREDIT, 1000). |
| 64 | +-define(PROTOCOL, 'local-shovel'). |
56 | 65 |
|
57 | 66 | -record(pending_ack, { |
58 | 67 | delivery_tag, |
59 | 68 | msg_id |
60 | 69 | }). |
61 | 70 |
|
| 71 | +boot_step() -> |
| 72 | + Labels = #{protocol => ?PROTOCOL}, |
| 73 | + rabbit_global_counters:init(Labels), |
| 74 | + rabbit_global_counters:init(Labels#{queue_type => rabbit_classic_queue}), |
| 75 | + rabbit_global_counters:init(Labels#{queue_type => rabbit_quorum_queue}), |
| 76 | + rabbit_global_counters:init(Labels#{queue_type => rabbit_stream_queue}). |
| 77 | + |
62 | 78 | parse(_Name, {source, Source}) -> |
63 | 79 | Queue = parse_parameter(queue, fun parse_binary/1, |
64 | 80 | proplists:get_value(queue, Source)), |
@@ -185,6 +201,7 @@ init_source(State = #{source := #{queue_r := QName, |
185 | 201 | end |
186 | 202 | end) of |
187 | 203 | {Remaining, {ok, QState1}} -> |
| 204 | + rabbit_global_counters:consumer_created(?PROTOCOL), |
188 | 205 | {ok, QState, Actions} = rabbit_queue_type:credit(QName, CTag, ?INITIAL_DELIVERY_COUNT, MaxLinkCredit, false, QState1), |
189 | 206 | State2 = State#{source => Src#{current => Current#{queue_states => QState, |
190 | 207 | consumer_tag => CTag}, |
@@ -214,6 +231,7 @@ init_source(State = #{source := #{queue_r := QName, |
214 | 231 | init_dest(#{name := Name, |
215 | 232 | shovel_type := Type, |
216 | 233 | dest := #{add_forward_headers := AFH} = Dst} = State) -> |
| 234 | + rabbit_global_counters:publisher_created(?PROTOCOL), |
217 | 235 | _TRef = erlang:send_after(1000, self(), send_confirms_and_nacks), |
218 | 236 | case AFH of |
219 | 237 | true -> |
@@ -258,12 +276,14 @@ dest_endpoint(_Config) -> |
258 | 276 | []. |
259 | 277 |
|
260 | 278 | close_dest(_State) -> |
| 279 | + rabbit_global_counters:publisher_deleted(?PROTOCOL), |
261 | 280 | ok. |
262 | 281 |
|
263 | 282 | close_source(#{source := #{current := #{queue_states := QStates0, |
264 | 283 | consumer_tag := CTag, |
265 | 284 | user := User}, |
266 | 285 | queue_r := QName}}) -> |
| 286 | + rabbit_global_counters:consumer_deleted(?PROTOCOL), |
267 | 287 | case rabbit_amqqueue:with( |
268 | 288 | QName, |
269 | 289 | fun(Q) -> |
@@ -363,6 +383,7 @@ forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} = |
363 | 383 | Msg = set_annotations(Msg0, Dest), |
364 | 384 | RoutedQNames = route(Msg, Dest), |
365 | 385 | Queues = rabbit_amqqueue:lookup_many(RoutedQNames), |
| 386 | + messages_received(AckMode), |
366 | 387 | case rabbit_queue_type:deliver(Queues, Msg, Options, QState) of |
367 | 388 | {ok, QState1, Actions} -> |
368 | 389 | State1 = State#{dest => Dest1#{current => Current1#{queue_states => QState1}}}, |
@@ -451,13 +472,15 @@ handle_queue_actions(Actions, State) -> |
451 | 472 | end, State, Actions). |
452 | 473 |
|
453 | 474 | handle_deliver(AckRequired, Msgs, State) when is_list(Msgs) -> |
| 475 | + NumMsgs = length(Msgs), |
454 | 476 | maybe_grant_credit( |
455 | 477 | lists:foldl( |
456 | | - fun({_QName, _QPid, MsgId, _Redelivered, Mc}, S0) -> |
| 478 | + fun({QName, _QPid, MsgId, _Redelivered, Mc}, #{source := #{current := #{queue_states := QStates }}} = S0) -> |
| 479 | + messages_delivered(QName, QStates), |
457 | 480 | DeliveryTag = next_tag(S0), |
458 | 481 | S = record_pending(AckRequired, DeliveryTag, MsgId, increase_next_tag(S0)), |
459 | 482 | rabbit_shovel_behaviour:forward(DeliveryTag, Mc, S) |
460 | | - end, sent_delivery(State, length(Msgs)), Msgs)). |
| 483 | + end, sent_delivery(State, NumMsgs), Msgs)). |
461 | 484 |
|
462 | 485 | next_tag(#{source := #{current := #{next_tag := DeliveryTag}}}) -> |
463 | 486 | DeliveryTag. |
@@ -616,6 +639,7 @@ settle(Op, DeliveryTag, Multiple, |
616 | 639 | {MsgIds, UAMQ} = collect_acks(UAMQ0, DeliveryTag, Multiple), |
617 | 640 | case rabbit_queue_type:settle(QRef, Op, CTag, lists:reverse(MsgIds), QState0) of |
618 | 641 | {ok, QState1, Actions} -> |
| 642 | + messages_acknowledged(Op, QRef, QState1, MsgIds), |
619 | 643 | State = State0#{source => Src#{current => Current#{queue_states => QState1, |
620 | 644 | unacked_message_q => UAMQ}}}, |
621 | 645 | handle_queue_actions(Actions, State); |
@@ -739,12 +763,18 @@ handle_credit_reply({credit_reply, CTag, DeliveryCount, Credit, _Available, _Dra |
739 | 763 | at_least_one_credit_req_in_flight => false}} |
740 | 764 | end. |
741 | 765 |
|
742 | | -process_routing_confirm(undefined, _, _, State) -> |
| 766 | +process_routing_confirm(undefined, _, [], State) -> |
| 767 | + rabbit_global_counters:messages_unroutable_returned(?PROTOCOL, 1), |
| 768 | + State; |
| 769 | +process_routing_confirm(undefined, _, QRefs, State) -> |
| 770 | + rabbit_global_counters:messages_routed(?PROTOCOL, length(QRefs)), |
743 | 771 | State; |
744 | 772 | process_routing_confirm(MsgSeqNo, Tag, [], State) |
745 | 773 | when is_integer(MsgSeqNo) -> |
| 774 | + rabbit_global_counters:messages_unroutable_dropped(?PROTOCOL, 1), |
746 | 775 | record_confirms([{MsgSeqNo, Tag}], State); |
747 | 776 | process_routing_confirm(MsgSeqNo, Tag, QRefs, #{dest := Dst = #{unconfirmed := Unconfirmed}} = State) when is_integer(MsgSeqNo) -> |
| 777 | + rabbit_global_counters:messages_routed(?PROTOCOL, length(QRefs)), |
748 | 778 | State#{dest => Dst#{unconfirmed => |
749 | 779 | rabbit_shovel_confirms:insert(MsgSeqNo, QRefs, Tag, Unconfirmed)}}. |
750 | 780 |
|
@@ -781,8 +811,10 @@ send_nacks(Rs, Cs, State) -> |
781 | 811 | send_confirms([], _, State) -> |
782 | 812 | State; |
783 | 813 | send_confirms([MsgSeqNo], _, State) -> |
| 814 | + rabbit_global_counters:messages_confirmed(?PROTOCOL, 1), |
784 | 815 | rabbit_shovel_behaviour:ack(MsgSeqNo, false, State); |
785 | 816 | send_confirms(Cs, Rs, State) -> |
| 817 | + rabbit_global_counters:messages_confirmed(?PROTOCOL, length(Cs)), |
786 | 818 | coalesce_and_send(Cs, Rs, |
787 | 819 | fun(MsgSeqNo, Multiple, StateX) -> |
788 | 820 | rabbit_shovel_behaviour:ack(MsgSeqNo, Multiple, StateX) |
@@ -833,3 +865,30 @@ decr_remaining(Num, State) -> |
833 | 865 | _ = send_confirms_and_nacks(State), |
834 | 866 | exit(R) |
835 | 867 | end. |
| 868 | + |
| 869 | +messages_acknowledged(complete, QName, QS, MsgIds) -> |
| 870 | + case rabbit_queue_type:module(QName, QS) of |
| 871 | + {ok, QType} -> |
| 872 | + rabbit_global_counters:messages_acknowledged(?PROTOCOL, QType, length(MsgIds)); |
| 873 | + _ -> |
| 874 | + ok |
| 875 | + end; |
| 876 | +messages_acknowledged(_, _, _, _) -> |
| 877 | + ok. |
| 878 | + |
| 879 | +messages_received(AckMode) -> |
| 880 | + rabbit_global_counters:messages_received(?PROTOCOL, 1), |
| 881 | + case AckMode of |
| 882 | + on_confirm -> |
| 883 | + rabbit_global_counters:messages_received_confirm(?PROTOCOL, 1); |
| 884 | + _ -> |
| 885 | + ok |
| 886 | + end. |
| 887 | + |
| 888 | +messages_delivered(QName, S0) -> |
| 889 | + case rabbit_queue_type:module(QName, S0) of |
| 890 | + {ok, QType} -> |
| 891 | + rabbit_global_counters:messages_delivered(?PROTOCOL, QType, 1); |
| 892 | + _ -> |
| 893 | + ok |
| 894 | + end. |
0 commit comments