Skip to content

Commit bcbfc6f

Browse files
committed
Local shovel: handle destination queue events
1 parent 422a2ae commit bcbfc6f

File tree

2 files changed

+37
-22
lines changed

2 files changed

+37
-22
lines changed

deps/rabbitmq_shovel/src/rabbit_local_shovel.erl

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -300,16 +300,19 @@ handle_source({{'DOWN', #resource{name = Queue,
300300
handle_source(_Msg, _State) ->
301301
not_handled.
302302

303-
handle_dest({queue_event, _QRef, {confirm, MsgSeqNos, _QPid}},
304-
#{ack_mode := on_confirm} = State) ->
305-
confirm_to_inbound(fun(Tag, StateX) ->
306-
rabbit_shovel_behaviour:ack(Tag, false, StateX)
307-
end, MsgSeqNos, State);
308-
handle_dest({queue_event, _QRef, {reject_publish, Seq, _QPid}},
309-
#{ack_mode := on_confirm} = State) ->
310-
confirm_to_inbound(fun(Tag, StateX) ->
311-
rabbit_shovel_behaviour:nack(Tag, false, StateX)
312-
end, Seq, State);
303+
handle_dest({queue_event, QRef, Evt},
304+
#{ack_mode := on_confirm,
305+
dest := Dest = #{current := Current = #{queue_states := QueueStates0}}} = State0) ->
306+
case rabbit_queue_type:handle_event(QRef, Evt, QueueStates0) of
307+
{ok, QState1, Actions} ->
308+
State = State0#{dest => Dest#{current => Current#{queue_states => QState1}}},
309+
handle_dest_queue_actions(Actions, State);
310+
{eol, Actions} ->
311+
_ = handle_dest_queue_actions(Actions, State0),
312+
{stop, {outbound_link_or_channel_closure, queue_deleted}};
313+
{protocol_error, _Type, Reason, ReasonArgs} ->
314+
{stop, list_to_binary(io_lib:format(Reason, ReasonArgs))}
315+
end;
313316
handle_dest({{'DOWN', #resource{name = Queue,
314317
kind = queue,
315318
virtual_host = VHost}}, _, _, _, _} ,
@@ -424,12 +427,6 @@ handle_queue_actions(Actions, State) ->
424427
handle_credit_reply(Action, S0);
425428
(_Action, S0) ->
426429
S0
427-
%% ({queue_down, QRef}, S0) ->
428-
%% State;
429-
%% ({block, QName}, S0) ->
430-
%% State;
431-
%% ({unblock, QName}, S0) ->
432-
%% State
433430
end, State, Actions).
434431

435432
handle_deliver(AckRequired, Msgs, State) when is_list(Msgs) ->
@@ -445,6 +442,22 @@ next_tag(#{source := #{current := #{next_tag := DeliveryTag}}}) ->
445442
increase_next_tag(#{source := Source = #{current := Current = #{next_tag := DeliveryTag}}} = State) ->
446443
State#{source => Source#{current => Current#{next_tag => DeliveryTag + 1}}}.
447444

445+
handle_dest_queue_actions(Actions, State) ->
446+
lists:foldl(
447+
fun({settled, _QName, MsgSeqNos}, S0) ->
448+
maybe_grant_or_stash_credit(
449+
confirm_to_inbound(fun(Tag, StateX) ->
450+
rabbit_shovel_behaviour:ack(Tag, false, StateX)
451+
end, MsgSeqNos, S0));
452+
({rejected, _QName, MsgSeqNos}, S0) ->
453+
maybe_grant_or_stash_credit(
454+
confirm_to_inbound(fun(Tag, StateX) ->
455+
rabbit_shovel_behaviour:nack(Tag, false, StateX)
456+
end, MsgSeqNos, S0));
457+
(_Action, S0) ->
458+
S0
459+
end, State, Actions).
460+
448461
record_pending(false, _DeliveryTag, _MsgId, State) ->
449462
State;
450463
record_pending(true, DeliveryTag, MsgId, #{unacked_message_q := UAMQ0} = State) ->
@@ -628,10 +641,9 @@ route(Msg, #{current := #{vhost := VHost}}) ->
628641

629642
confirm_to_inbound(ConfirmFun, SeqNos, State)
630643
when is_list(SeqNos) ->
631-
State1 = lists:foldl(fun(Seq, State0) ->
632-
confirm_to_inbound(ConfirmFun, Seq, State0)
633-
end, State, SeqNos),
634-
maybe_grant_or_stash_credit(State1);
644+
lists:foldl(fun(Seq, State0) ->
645+
confirm_to_inbound(ConfirmFun, Seq, State0)
646+
end, State, SeqNos);
635647
confirm_to_inbound(ConfirmFun, Seq,
636648
State0 = #{dest := #{unacked := Unacked} = Dst}) ->
637649
#{Seq := InTag} = Unacked,

deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -546,12 +546,15 @@ local_to_local_delete_after_queue_length(Config) ->
546546
{<<"dest-protocol">>, <<"local">>},
547547
{<<"dest-queue">>, Dest}
548548
]),
549-
shovel_test_utils:await_no_shovel(Config, ?PARAM),
550549
%% The shovel parameter is only deleted when 'delete-after'
551550
%% is used. In any other failure, the shovel should
552551
%% remain and try to restart
553552
expect_many(Sess, Dest, 18),
554-
?assertMatch(not_found, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_runtime_parameters, lookup, [<<"/">>, <<"shovel">>, ?PARAM])),
553+
?awaitMatch(not_found, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_runtime_parameters, lookup, [<<"/">>, <<"shovel">>, ?PARAM]), 30_000),
554+
?awaitMatch([],
555+
rabbit_ct_broker_helpers:rpc(Config, 0,
556+
rabbit_shovel_status, status, []),
557+
30_000),
555558
publish_many(Sess, Src, Dest, <<"tag1">>, 5),
556559
expect_none(Sess, Dest)
557560
end).

0 commit comments

Comments
 (0)