Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 32 additions & 40 deletions deps/rabbit/src/rabbit_classic_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@

-define(STATE, ?MODULE).
-record(?STATE, {
%% the current master pid
pid :: undefined | pid(),
pid :: pid(),
unconfirmed = #{} :: #{non_neg_integer() => #msg_status{}},
monitored = #{} :: #{pid() => ok}
}).
Expand Down Expand Up @@ -50,7 +49,6 @@
format/2,
init/1,
close/1,
update/2,
consume/3,
cancel/3,
handle_event/3,
Expand Down Expand Up @@ -295,16 +293,6 @@ init(Q) when ?amqqueue_is_classic(Q) ->
close(_State) ->
ok.

-spec update(amqqueue:amqqueue() | amqqueue:target(), state()) -> state().
update(Q, #?STATE{pid = Pid} = State) ->
case amqqueue:get_pid(Q) of
Pid ->
State;
NewPid ->
%% master pid is different, update
State#?STATE{pid = NewPid}
end.

consume(Q, Spec, State0) when ?amqqueue_is_classic(Q) ->
QPid = amqqueue:get_pid(Q),
QRef = amqqueue:get_name(Q),
Expand All @@ -326,9 +314,8 @@ consume(Q, Spec, State0) when ?amqqueue_is_classic(Q) ->
ExclusiveConsume, Args, OkMsg, ActingUser},
infinity]}) of
ok ->
%% TODO: track pids as they change
State = ensure_monitor(QPid, QRef, State0),
{ok, State#?STATE{pid = QPid}};
State = State0#?STATE{pid = QPid},
{ok, ensure_monitor(QRef, State)};
{error, exclusive_consume_unavailable} ->
{error, access_refused, "~ts in exclusive use",
[rabbit_misc:rs(QRef)]};
Expand Down Expand Up @@ -364,10 +351,12 @@ cancel(Q, Spec, State) ->
OkMsg = maps:get(ok_msg, Spec, undefined),
{basic_cancel, self(), ConsumerTag, OkMsg, ActingUser}
end,
case delegate:invoke(amqqueue:get_pid(Q),
{gen_server2, call, [Request, infinity]}) of
ok -> {ok, State};
Err -> Err
Pid = amqqueue:get_pid(Q),
case delegate:invoke(Pid, {gen_server2, call, [Request, infinity]}) of
ok ->
{ok, State#?STATE{pid = Pid}};
Err ->
Err
end.

-spec settle(rabbit_amqqueue:name(), rabbit_queue_type:settle_op(),
Expand Down Expand Up @@ -502,19 +491,21 @@ deliver(Qs0, Msg0, Options) ->
delegate:invoke_no_result(MPids, {gen_server2, cast, [MMsg]}),
{Qs, []}.

-spec dequeue(rabbit_amqqueue:name(), NoAck :: boolean(),
-spec dequeue(amqqueue:amqqueue(), NoAck :: boolean(),
LimiterPid :: pid(), rabbit_types:ctag(), state()) ->
{ok, Count :: non_neg_integer(), rabbit_amqqueue:qmsg(), state()} |
{empty, state()}.
dequeue(QName, NoAck, LimiterPid, _CTag, State0) ->
QPid = State0#?STATE.pid,
State1 = ensure_monitor(QPid, QName, State0),
dequeue(Q, NoAck, LimiterPid, _CTag, State0) ->
QName = amqqueue:get_name(Q),
QPid = amqqueue:get_pid(Q),
State1 = State0#?STATE{pid = QPid},
State = ensure_monitor(QName, State1),
case delegate:invoke(QPid, {gen_server2, call,
[{basic_get, self(), NoAck, LimiterPid}, infinity]}) of
empty ->
{empty, State1};
{empty, State};
{ok, Count, Msg} ->
{ok, Count, Msg, State1}
{ok, Count, Msg, State}
end.

-spec state_info(state()) -> #{atom() := term()}.
Expand Down Expand Up @@ -603,9 +594,10 @@ qpids(Qs, Confirm, MsgNo) ->
fun ({Q, S0}, {MPidAcc, Qs0}) ->
QPid = amqqueue:get_pid(Q),
QRef = amqqueue:get_name(Q),
S1 = ensure_monitor(QPid, QRef, S0),
%% confirm record only if necessary
S = case S1 of
S = case S0 of
stateless ->
S0;
#?STATE{unconfirmed = U0} ->
Rec = [QPid],
U = case Confirm of
Expand All @@ -614,10 +606,9 @@ qpids(Qs, Confirm, MsgNo) ->
true ->
U0#{MsgNo => #msg_status{pending = Rec}}
end,
S1#?STATE{pid = QPid,
unconfirmed = U};
stateless ->
S1
S1 = S0#?STATE{pid = QPid,
unconfirmed = U},
ensure_monitor(QRef, S1)
end,
{[QPid | MPidAcc], [{Q, S} | Qs0]}
end, {[], []}, Qs).
Expand Down Expand Up @@ -728,14 +719,15 @@ update_msg_status(confirm, Pid, #msg_status{pending = P,
update_msg_status(down, Pid, #msg_status{pending = P} = S) ->
S#msg_status{pending = lists:delete(Pid, P)}.

ensure_monitor(_, _, State = stateless) ->
State;
ensure_monitor(Pid, _, State = #?STATE{monitored = Monitored})
when is_map_key(Pid, Monitored) ->
State;
ensure_monitor(Pid, QName, State = #?STATE{monitored = Monitored}) ->
_ = erlang:monitor(process, Pid, [{tag, {'DOWN', QName}}]),
State#?STATE{monitored = Monitored#{Pid => ok}}.
ensure_monitor(QName, #?STATE{pid = Pid,
monitored = Monitored} = State) ->
case is_map_key(Pid, Monitored) of
true ->
State;
false ->
_Ref = erlang:monitor(process, Pid, [{tag, {'DOWN', QName}}]),
State#?STATE{monitored = Monitored#{Pid => ok}}
end.

%% part of channel <-> queue api
confirm_to_sender(Pid, QName, MsgSeqNos) ->
Expand Down
15 changes: 5 additions & 10 deletions deps/rabbit/src/rabbit_queue_type.erl
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,6 @@

-callback close(queue_state()) -> ok.

-callback update(amqqueue:amqqueue() | amqqueue:target(), queue_state()) ->
queue_state().

-callback consume(amqqueue:amqqueue(),
consume_spec(),
queue_state()) ->
Expand Down Expand Up @@ -251,7 +248,7 @@
Drain :: boolean(), queue_state()) ->
{queue_state(), actions()}.

-callback dequeue(queue_name(), NoAck :: boolean(), LimiterPid :: pid(),
-callback dequeue(amqqueue:amqqueue(), NoAck :: boolean(), LimiterPid :: pid(),
rabbit_types:ctag(), queue_state()) ->
{ok, Count :: non_neg_integer(), rabbit_amqqueue:qmsg(), queue_state()} |
{empty, queue_state()} |
Expand Down Expand Up @@ -751,8 +748,7 @@ credit(QName, CTag, DeliveryCount, Credit, Drain, Ctxs) ->
dequeue(Q, NoAck, LimiterPid, CTag, Ctxs) ->
#ctx{state = State0} = Ctx = get_ctx(Q, Ctxs),
Mod = amqqueue:get_type(Q),
QName = amqqueue:get_name(Q),
case Mod:dequeue(QName, NoAck, LimiterPid, CTag, State0) of
case Mod:dequeue(Q, NoAck, LimiterPid, CTag, State0) of
{ok, Num, Msg, State} ->
{ok, Num, Msg, set_ctx(Q, Ctx#ctx{state = State}, Ctxs)};
{empty, State} ->
Expand Down Expand Up @@ -785,10 +781,9 @@ get_ctx_with(#resource{kind = queue} = QRef, Contexts, undefined) ->
get_ctx_with(Q, #?STATE{ctxs = Contexts}, InitState) ->
Ref = amqqueue:get_name(Q),
case Contexts of
#{Ref := #ctx{module = Mod,
state = State} = Ctx} ->
Ctx#ctx{state = Mod:update(Q, State)};
_ when InitState == undefined ->
#{Ref := Ctx} ->
Ctx;
_ when InitState =:= undefined ->
%% not found and no initial state passed - initialize new state
Mod = amqqueue:get_type(Q),
maybe
Expand Down
12 changes: 3 additions & 9 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

-export([init/1,
close/1,
update/2,
handle_event/3]).
-export([is_recoverable/1,
recover/2,
Expand Down Expand Up @@ -234,12 +233,6 @@ init(Q) when ?is_amqqueue(Q) ->
close(State) ->
rabbit_fifo_client:close(State).

-spec update(amqqueue:amqqueue() | amqqueue:target(), rabbit_fifo_client:state()) ->
rabbit_fifo_client:state().
update(_Q, State) ->
%% QQ state maintains its own updates
State.

-spec handle_event(rabbit_amqqueue:name(),
{amqqueue:ra_server_id(), any()},
rabbit_fifo_client:state()) ->
Expand Down Expand Up @@ -975,12 +968,13 @@ credit_v1(_QName, CTag, Credit, Drain, QState) ->
credit(_QName, CTag, DeliveryCount, Credit, Drain, QState) ->
rabbit_fifo_client:credit(quorum_ctag(CTag), DeliveryCount, Credit, Drain, QState).

-spec dequeue(rabbit_amqqueue:name(), NoAck :: boolean(), pid(),
-spec dequeue(amqqueue:amqqueue(), NoAck :: boolean(), pid(),
rabbit_types:ctag(), rabbit_fifo_client:state()) ->
{empty, rabbit_fifo_client:state()} |
{ok, QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()} |
{error, term()}.
dequeue(QName, NoAck, _LimiterPid, CTag0, QState0) ->
dequeue(Q, NoAck, _LimiterPid, CTag0, QState0) ->
QName = amqqueue:get_name(Q),
CTag = quorum_ctag(CTag0),
Settlement = case NoAck of
true ->
Expand Down
4 changes: 0 additions & 4 deletions deps/rabbit/src/rabbit_stream_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
transfer_leadership/2,
init/1,
close/1,
update/2,
state_info/1,
stat/1,
format/2,
Expand Down Expand Up @@ -1024,9 +1023,6 @@ close(#stream_client{readers = Readers,
rabbit_core_metrics:consumer_deleted(self(), CTag, QName)
end, Readers).

update(_Q, State) ->
State.

update_leader_pid(Pid, #stream_client{leader = Pid} = State) ->
State;
update_leader_pid(Pid, #stream_client{} = State) ->
Expand Down
4 changes: 0 additions & 4 deletions deps/rabbit/src/rabbit_volatile_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
credit/6,
init/1,
close/1,
update/2,
consume/3,
cancel/3,
handle_event/3,
Expand Down Expand Up @@ -277,9 +276,6 @@ credit(_QName, CTag, DeliveryCountRcv, LinkCreditRcv, Drain,
close(#?STATE{}) ->
ok.

update(_, #?STATE{} = State) ->
State.

cancel(_, _, #?STATE{} = State) ->
{ok, State}.

Expand Down
4 changes: 0 additions & 4 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
[
init/1,
close/1,
update/2,
consume/3,
cancel/3,
handle_event/3,
Expand Down Expand Up @@ -291,9 +290,6 @@ init(A1) ->
close(A1) ->
?UNSUPPORTED([A1]).

update(A1,A2) ->
?UNSUPPORTED([A1,A2]).

consume(A1,A2,A3) ->
?UNSUPPORTED([A1,A2,A3]).

Expand Down
Loading