Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 146 additions & 60 deletions deps/rabbitmq_shovel/src/rabbit_local_shovel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,19 @@ connect_dest(State = #{dest := Dest = #{resource_decl := {M, F, MFArgs},
State#{dest => Dest#{current => #{queue_states => QState,
delivery_id => 1,
vhost => VHost},
unacked => #{}}};
unconfirmed => rabbit_shovel_confirms:init(),
rejected => [],
rejected_count => 0,
confirmed => [],
confirmed_count => 0}};
_ ->
State#{dest => Dest#{current => #{queue_states => QState,
vhost => VHost},
unacked => #{}}}
unconfirmed => rabbit_shovel_confirms:init(),
confirmed => [],
confirmed_count => 0,
rejected => [],
rejected_count => 0}}
end).

maybe_add_dest_queue(State = #{dest := Dest = #{queue := QName,
Expand Down Expand Up @@ -206,6 +214,7 @@ init_source(State = #{source := #{queue_r := QName,
init_dest(#{name := Name,
shovel_type := Type,
dest := #{add_forward_headers := AFH} = Dst} = State) ->
_TRef = erlang:send_after(1000, self(), send_confirms_and_nacks),
case AFH of
true ->
Props = #{<<"x-opt-shovelled-by">> => rabbit_nodes:cluster_name(),
Expand Down Expand Up @@ -291,6 +300,9 @@ handle_source({queue_event, #resource{name = Queue,
{protocol_error, _Type, Reason, ReasonArgs} ->
{stop, list_to_binary(io_lib:format(Reason, ReasonArgs))}
end;
handle_source(send_confirms_and_nacks, State) ->
_TRef = erlang:send_after(1000, self(), send_confirms_and_nacks),
send_confirms_and_nacks(State);
handle_source({{'DOWN', #resource{name = Queue,
kind = queue,
virtual_host = VHost}}, _, _, _, _} ,
Expand All @@ -305,9 +317,9 @@ handle_dest({queue_event, QRef, Evt},
case rabbit_queue_type:handle_event(QRef, Evt, QueueStates0) of
{ok, QState1, Actions} ->
State = State0#{dest => Dest#{current => Current#{queue_states => QState1}}},
handle_dest_queue_actions(Actions, State);
send_confirms_and_nacks(handle_dest_queue_actions(Actions, State));
{eol, Actions} ->
_ = handle_dest_queue_actions(Actions, State0),
_ = send_confirms_and_nacks(handle_dest_queue_actions(Actions, State0)),
{stop, {outbound_link_or_channel_closure, queue_deleted}};
{protocol_error, _Type, Reason, ReasonArgs} ->
{stop, list_to_binary(io_lib:format(Reason, ReasonArgs))}
Expand Down Expand Up @@ -337,8 +349,7 @@ ack(DeliveryTag, Multiple, State) ->
nack(DeliveryTag, Multiple, State) ->
maybe_grant_credit(settle(requeue, DeliveryTag, Multiple, State)).

forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current,
unacked := Unacked} = Dest,
forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} = Dest,
ack_mode := AckMode} = State0) ->
{Options, #{dest := #{current := Current1} = Dest1} = State} =
case AckMode of
Expand All @@ -350,28 +361,29 @@ forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current,
{#{}, State0}
end,
Msg = set_annotations(Msg0, Dest),
QNames = route(Msg, Dest),
Queues = rabbit_amqqueue:lookup_many(QNames),
RoutedQNames = route(Msg, Dest),
Queues = rabbit_amqqueue:lookup_many(RoutedQNames),
case rabbit_queue_type:deliver(Queues, Msg, Options, QState) of
{ok, QState1, Actions} ->
State1 = State#{dest => Dest1#{current => Current1#{queue_states => QState1}}},
#{dest := Dst1} = State2 = rabbit_shovel_behaviour:incr_forwarded(State1),
State4 = rabbit_shovel_behaviour:decr_remaining_unacked(
State2 = rabbit_shovel_behaviour:incr_forwarded(State1),
State3 = rabbit_shovel_behaviour:decr_remaining_unacked(
case AckMode of
no_ack ->
rabbit_shovel_behaviour:decr_remaining(1, State2);
on_confirm when length(Queues) > 0 ->
Correlation = maps:get(correlation, Options),
State2#{dest => Dst1#{unacked => Unacked#{Correlation => Tag}}};
on_confirm ->
%% Drop the messages as 0.9.1, no destination available
State3 = rabbit_shovel_behaviour:ack(Tag, false, State2),
rabbit_shovel_behaviour:decr_remaining(1, State3);
State2;
on_publish ->
State3 = rabbit_shovel_behaviour:ack(Tag, false, State2),
rabbit_shovel_behaviour:decr_remaining(1, State3)
rabbit_shovel_behaviour:decr_remaining(
1,
record_confirms([{Tag, Tag}], State2));
_ ->
rabbit_shovel_behaviour:decr_remaining(1, State2)
end),
handle_dest_queue_actions(Actions, State4);
MsgSeqNo = maps:get(correlation, Options, undefined),
QNames = lists:map(fun({QName, _}) -> QName;
(QName) -> QName
end, RoutedQNames),
State4 = process_routing_confirm(MsgSeqNo, Tag, QNames, State3),
send_confirms_and_nacks(handle_dest_queue_actions(Actions, State4));
{error, Reason} ->
exit({shutdown, Reason})
end.
Expand Down Expand Up @@ -455,14 +467,21 @@ increase_next_tag(#{source := Source = #{current := Current = #{next_tag := Deli

handle_dest_queue_actions(Actions, State) ->
lists:foldl(
fun({settled, _QName, MsgSeqNos}, S0) ->
confirm_to_inbound(fun(Tag, StateX) ->
rabbit_shovel_behaviour:ack(Tag, false, StateX)
end, MsgSeqNos, S0);
({rejected, _QName, MsgSeqNos}, S0) ->
confirm_to_inbound(fun(Tag, StateX) ->
rabbit_shovel_behaviour:nack(Tag, false, StateX)
end, MsgSeqNos, S0);
fun({settled, QName, MsgSeqNos}, S0) ->
confirm(MsgSeqNos, QName, S0);
({rejected, _QName, MsgSeqNos}, #{dest := Dst = #{unconfirmed := U0}} = S0) ->
{U, Rej} =
lists:foldr(
fun(SeqNo, {U1, Acc}) ->
case rabbit_shovel_confirms:reject(SeqNo, U1) of
{ok, MX, U2} ->
{U2, [MX | Acc]};
{error, not_found} ->
{U1, Acc}
end
end, {U0, []}, MsgSeqNos),
S = S0#{dest => Dst#{unconfirmed => U}},
record_rejects(Rej, S);
%% TODO handle {block, QName}
(_Action, S0) ->
S0
Expand Down Expand Up @@ -591,22 +610,20 @@ get_user_vhost_from_amqp_param(Uri) ->
settle(Op, DeliveryTag, Multiple,
#{source := #{queue_r := QRef,
current := Current = #{consumer_tag := CTag,
unacked_message_q := UAMQ0}
unacked_message_q := UAMQ0,
queue_states := QState0}
} = Src} = State0) ->
{MsgIds, UAMQ} = collect_acks(UAMQ0, DeliveryTag, Multiple),
State = State0#{source => Src#{current => Current#{unacked_message_q => UAMQ}}},
lists:foldl(
fun(MsgId, #{source := Src0 = #{current := Current0 = #{queue_states := QState0}}} = St0) ->
case rabbit_queue_type:settle(QRef, Op, CTag, [MsgId], QState0) of
{ok, QState1, Actions} ->
St = St0#{source => Src0#{current => Current0#{queue_states => QState1}}},
handle_queue_actions(Actions, St);
{'protocol_error', Type, Reason, Args} ->
?LOG_ERROR("Shovel failed to settle ~p acknowledgments with ~tp: ~tp",
[Op, Type, io_lib:format(Reason, Args)]),
exit({shutdown, {ack_failed, Reason}})
end
end, State, MsgIds).
case rabbit_queue_type:settle(QRef, Op, CTag, lists:reverse(MsgIds), QState0) of
{ok, QState1, Actions} ->
State = State0#{source => Src#{current => Current#{queue_states => QState1,
unacked_message_q => UAMQ}}},
handle_queue_actions(Actions, State);
{'protocol_error', Type, Reason, Args} ->
?LOG_ERROR("Shovel failed to settle ~p acknowledgments with ~tp: ~tp",
[Op, Type, io_lib:format(Reason, Args)]),
exit({shutdown, {ack_failed, Reason}})
end.

%% From rabbit_channel
%% Records a client-sent acknowledgement. Handles both single delivery acks
Expand Down Expand Up @@ -649,23 +666,6 @@ route(Msg, #{current := #{vhost := VHost}}) ->
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
rabbit_exchange:route(Exchange, Msg, #{return_binding_keys => true}).

confirm_to_inbound(ConfirmFun, SeqNos, State)
when is_list(SeqNos) ->
lists:foldl(fun(Seq, State0) ->
confirm_to_inbound(ConfirmFun, Seq, State0)
end, State, SeqNos);
confirm_to_inbound(ConfirmFun, Seq,
State0 = #{dest := #{unacked := Unacked} = Dst}) ->
case Unacked of
#{Seq := InTag} ->
Unacked1 = maps:remove(Seq, Unacked),
State = rabbit_shovel_behaviour:decr_remaining(
1, State0#{dest => Dst#{unacked => Unacked1}}),
ConfirmFun(InTag, State);
_ ->
State0
end.

sent_delivery(#{source := #{delivery_count := DeliveryCount0,
credit := Credit0} = Src
} = State0, NumMsgs) ->
Expand Down Expand Up @@ -738,3 +738,89 @@ handle_credit_reply({credit_reply, CTag, DeliveryCount, Credit, _Available, _Dra
State0#{source => Src#{credit => Credit,
at_least_one_credit_req_in_flight => false}}
end.

process_routing_confirm(undefined, _, _, State) ->
State;
process_routing_confirm(MsgSeqNo, Tag, [], State)
when is_integer(MsgSeqNo) ->
record_confirms([{MsgSeqNo, Tag}], State);
process_routing_confirm(MsgSeqNo, Tag, QRefs, #{dest := Dst = #{unconfirmed := Unconfirmed}} = State) when is_integer(MsgSeqNo) ->
State#{dest => Dst#{unconfirmed =>
rabbit_shovel_confirms:insert(MsgSeqNo, QRefs, Tag, Unconfirmed)}}.

record_confirms([], State) ->
State;
record_confirms(MXs, State = #{dest := Dst = #{confirmed := C,
confirmed_count := CC}}) ->
Num = length(MXs),
rabbit_shovel_behaviour:decr_remaining(
Num, State#{dest => Dst#{confirmed => [MXs | C],
confirmed_count => CC + Num}}).

record_rejects([], State) ->
State;
record_rejects(MXs, State = #{dest := Dst = #{rejected := R,
rejected_count := RC}}) ->
Num = length(MXs),
rabbit_shovel_behaviour:decr_remaining(
Num, State#{dest => Dst#{rejected => [MXs | R],
rejected_count => RC + Num}}).

confirm(MsgSeqNos, QRef, State = #{dest := Dst = #{unconfirmed := UC}}) ->
{ConfirmMXs, UC1} = rabbit_shovel_confirms:confirm(MsgSeqNos, QRef, UC),
record_confirms(ConfirmMXs, State#{dest => Dst#{unconfirmed => UC1}}).

send_nacks([], _, State) ->
State;
send_nacks(Rs, Cs, State) ->
coalesce_and_send(Rs, Cs,
fun(MsgSeqNo, Multiple, StateX) ->
rabbit_shovel_behaviour:nack(MsgSeqNo, Multiple, StateX)
end, State).

send_confirms([], _, State) ->
State;
send_confirms([MsgSeqNo], _, State) ->
rabbit_shovel_behaviour:ack(MsgSeqNo, false, State);
send_confirms(Cs, Rs, State) ->
coalesce_and_send(Cs, Rs,
fun(MsgSeqNo, Multiple, StateX) ->
rabbit_shovel_behaviour:ack(MsgSeqNo, Multiple, StateX)
end, State).

coalesce_and_send(MsgSeqNos, NegativeMsgSeqNos, MkMsgFun,
State = #{dest := #{unconfirmed := UC}}) ->
SMsgSeqNos = lists:usort(MsgSeqNos),
UnconfirmedCutoff = case rabbit_shovel_confirms:is_empty(UC) of
true -> lists:last(SMsgSeqNos) + 1;
false -> rabbit_shovel_confirms:smallest(UC)
end,
Cutoff = lists:min([UnconfirmedCutoff | NegativeMsgSeqNos]),
{Ms, Ss} = lists:splitwith(fun(X) -> X < Cutoff end, SMsgSeqNos),
State1 = case Ms of
[] -> State;
_ -> MkMsgFun(lists:last(Ms), true, State)
end,
lists:foldl(fun(SeqNo, S) ->
MkMsgFun(SeqNo, false, S)
end, State1, Ss).

%% Todo remove XName from confirm/unconfirm as we don't need it for local
send_confirms_and_nacks(State = #{dest := #{confirmed := [],
rejected := []}}) ->
State;
send_confirms_and_nacks(State = #{dest := Dst = #{confirmed := C,
rejected := R}}) ->
Confirms = lists:append(C),
ConfirmTags = [Tag || {_, Tag} <- Confirms],
Rejects = lists:append(R),
RejectTags = [Tag || {_, Tag} <- Rejects],
State1 = #{dest := Dst2}
= send_confirms(ConfirmTags,
RejectTags,
State#{dest => Dst#{confirmed => [],
confirmed_count => 0}}),
send_nacks(RejectTags,
ConfirmTags,
State1#{dest => Dst2#{rejected => [],
rejected_count => 0}}).
Loading
Loading