diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl index 3ecec12fe89c..95a56bc11ac4 100644 --- a/deps/rabbit/src/rabbit_classic_queue.erl +++ b/deps/rabbit/src/rabbit_classic_queue.erl @@ -13,8 +13,7 @@ -define(STATE, ?MODULE). -record(?STATE, { - %% the current master pid - pid :: undefined | pid(), + pid :: pid(), unconfirmed = #{} :: #{non_neg_integer() => #msg_status{}}, monitored = #{} :: #{pid() => ok} }). @@ -50,7 +49,6 @@ format/2, init/1, close/1, - update/2, consume/3, cancel/3, handle_event/3, @@ -295,16 +293,6 @@ init(Q) when ?amqqueue_is_classic(Q) -> close(_State) -> ok. --spec update(amqqueue:amqqueue() | amqqueue:target(), state()) -> state(). -update(Q, #?STATE{pid = Pid} = State) -> - case amqqueue:get_pid(Q) of - Pid -> - State; - NewPid -> - %% master pid is different, update - State#?STATE{pid = NewPid} - end. - consume(Q, Spec, State0) when ?amqqueue_is_classic(Q) -> QPid = amqqueue:get_pid(Q), QRef = amqqueue:get_name(Q), @@ -326,9 +314,8 @@ consume(Q, Spec, State0) when ?amqqueue_is_classic(Q) -> ExclusiveConsume, Args, OkMsg, ActingUser}, infinity]}) of ok -> - %% TODO: track pids as they change - State = ensure_monitor(QPid, QRef, State0), - {ok, State#?STATE{pid = QPid}}; + State = State0#?STATE{pid = QPid}, + {ok, ensure_monitor(QRef, State)}; {error, exclusive_consume_unavailable} -> {error, access_refused, "~ts in exclusive use", [rabbit_misc:rs(QRef)]}; @@ -364,10 +351,12 @@ cancel(Q, Spec, State) -> OkMsg = maps:get(ok_msg, Spec, undefined), {basic_cancel, self(), ConsumerTag, OkMsg, ActingUser} end, - case delegate:invoke(amqqueue:get_pid(Q), - {gen_server2, call, [Request, infinity]}) of - ok -> {ok, State}; - Err -> Err + Pid = amqqueue:get_pid(Q), + case delegate:invoke(Pid, {gen_server2, call, [Request, infinity]}) of + ok -> + {ok, State#?STATE{pid = Pid}}; + Err -> + Err end. -spec settle(rabbit_amqqueue:name(), rabbit_queue_type:settle_op(), @@ -502,19 +491,21 @@ deliver(Qs0, Msg0, Options) -> delegate:invoke_no_result(MPids, {gen_server2, cast, [MMsg]}), {Qs, []}. --spec dequeue(rabbit_amqqueue:name(), NoAck :: boolean(), +-spec dequeue(amqqueue:amqqueue(), NoAck :: boolean(), LimiterPid :: pid(), rabbit_types:ctag(), state()) -> {ok, Count :: non_neg_integer(), rabbit_amqqueue:qmsg(), state()} | {empty, state()}. -dequeue(QName, NoAck, LimiterPid, _CTag, State0) -> - QPid = State0#?STATE.pid, - State1 = ensure_monitor(QPid, QName, State0), +dequeue(Q, NoAck, LimiterPid, _CTag, State0) -> + QName = amqqueue:get_name(Q), + QPid = amqqueue:get_pid(Q), + State1 = State0#?STATE{pid = QPid}, + State = ensure_monitor(QName, State1), case delegate:invoke(QPid, {gen_server2, call, [{basic_get, self(), NoAck, LimiterPid}, infinity]}) of empty -> - {empty, State1}; + {empty, State}; {ok, Count, Msg} -> - {ok, Count, Msg, State1} + {ok, Count, Msg, State} end. -spec state_info(state()) -> #{atom() := term()}. @@ -603,9 +594,10 @@ qpids(Qs, Confirm, MsgNo) -> fun ({Q, S0}, {MPidAcc, Qs0}) -> QPid = amqqueue:get_pid(Q), QRef = amqqueue:get_name(Q), - S1 = ensure_monitor(QPid, QRef, S0), %% confirm record only if necessary - S = case S1 of + S = case S0 of + stateless -> + S0; #?STATE{unconfirmed = U0} -> Rec = [QPid], U = case Confirm of @@ -614,10 +606,9 @@ qpids(Qs, Confirm, MsgNo) -> true -> U0#{MsgNo => #msg_status{pending = Rec}} end, - S1#?STATE{pid = QPid, - unconfirmed = U}; - stateless -> - S1 + S1 = S0#?STATE{pid = QPid, + unconfirmed = U}, + ensure_monitor(QRef, S1) end, {[QPid | MPidAcc], [{Q, S} | Qs0]} end, {[], []}, Qs). @@ -728,14 +719,15 @@ update_msg_status(confirm, Pid, #msg_status{pending = P, update_msg_status(down, Pid, #msg_status{pending = P} = S) -> S#msg_status{pending = lists:delete(Pid, P)}. -ensure_monitor(_, _, State = stateless) -> - State; -ensure_monitor(Pid, _, State = #?STATE{monitored = Monitored}) - when is_map_key(Pid, Monitored) -> - State; -ensure_monitor(Pid, QName, State = #?STATE{monitored = Monitored}) -> - _ = erlang:monitor(process, Pid, [{tag, {'DOWN', QName}}]), - State#?STATE{monitored = Monitored#{Pid => ok}}. +ensure_monitor(QName, #?STATE{pid = Pid, + monitored = Monitored} = State) -> + case is_map_key(Pid, Monitored) of + true -> + State; + false -> + _Ref = erlang:monitor(process, Pid, [{tag, {'DOWN', QName}}]), + State#?STATE{monitored = Monitored#{Pid => ok}} + end. %% part of channel <-> queue api confirm_to_sender(Pid, QName, MsgSeqNos) -> diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index 953c34731d79..2a115c54552d 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -209,9 +209,6 @@ -callback close(queue_state()) -> ok. --callback update(amqqueue:amqqueue() | amqqueue:target(), queue_state()) -> - queue_state(). - -callback consume(amqqueue:amqqueue(), consume_spec(), queue_state()) -> @@ -251,7 +248,7 @@ Drain :: boolean(), queue_state()) -> {queue_state(), actions()}. --callback dequeue(queue_name(), NoAck :: boolean(), LimiterPid :: pid(), +-callback dequeue(amqqueue:amqqueue(), NoAck :: boolean(), LimiterPid :: pid(), rabbit_types:ctag(), queue_state()) -> {ok, Count :: non_neg_integer(), rabbit_amqqueue:qmsg(), queue_state()} | {empty, queue_state()} | @@ -751,8 +748,7 @@ credit(QName, CTag, DeliveryCount, Credit, Drain, Ctxs) -> dequeue(Q, NoAck, LimiterPid, CTag, Ctxs) -> #ctx{state = State0} = Ctx = get_ctx(Q, Ctxs), Mod = amqqueue:get_type(Q), - QName = amqqueue:get_name(Q), - case Mod:dequeue(QName, NoAck, LimiterPid, CTag, State0) of + case Mod:dequeue(Q, NoAck, LimiterPid, CTag, State0) of {ok, Num, Msg, State} -> {ok, Num, Msg, set_ctx(Q, Ctx#ctx{state = State}, Ctxs)}; {empty, State} -> @@ -785,10 +781,9 @@ get_ctx_with(#resource{kind = queue} = QRef, Contexts, undefined) -> get_ctx_with(Q, #?STATE{ctxs = Contexts}, InitState) -> Ref = amqqueue:get_name(Q), case Contexts of - #{Ref := #ctx{module = Mod, - state = State} = Ctx} -> - Ctx#ctx{state = Mod:update(Q, State)}; - _ when InitState == undefined -> + #{Ref := Ctx} -> + Ctx; + _ when InitState =:= undefined -> %% not found and no initial state passed - initialize new state Mod = amqqueue:get_type(Q), maybe diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 2e69f227841b..0bf9361e933a 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -14,7 +14,6 @@ -export([init/1, close/1, - update/2, handle_event/3]). -export([is_recoverable/1, recover/2, @@ -234,12 +233,6 @@ init(Q) when ?is_amqqueue(Q) -> close(State) -> rabbit_fifo_client:close(State). --spec update(amqqueue:amqqueue() | amqqueue:target(), rabbit_fifo_client:state()) -> - rabbit_fifo_client:state(). -update(_Q, State) -> - %% QQ state maintains its own updates - State. - -spec handle_event(rabbit_amqqueue:name(), {amqqueue:ra_server_id(), any()}, rabbit_fifo_client:state()) -> @@ -975,12 +968,13 @@ credit_v1(_QName, CTag, Credit, Drain, QState) -> credit(_QName, CTag, DeliveryCount, Credit, Drain, QState) -> rabbit_fifo_client:credit(quorum_ctag(CTag), DeliveryCount, Credit, Drain, QState). --spec dequeue(rabbit_amqqueue:name(), NoAck :: boolean(), pid(), +-spec dequeue(amqqueue:amqqueue(), NoAck :: boolean(), pid(), rabbit_types:ctag(), rabbit_fifo_client:state()) -> {empty, rabbit_fifo_client:state()} | {ok, QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()} | {error, term()}. -dequeue(QName, NoAck, _LimiterPid, CTag0, QState0) -> +dequeue(Q, NoAck, _LimiterPid, CTag0, QState0) -> + QName = amqqueue:get_name(Q), CTag = quorum_ctag(CTag0), Settlement = case NoAck of true -> diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 63e19d88c111..94bbc755a9b0 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -35,7 +35,6 @@ transfer_leadership/2, init/1, close/1, - update/2, state_info/1, stat/1, format/2, @@ -1024,9 +1023,6 @@ close(#stream_client{readers = Readers, rabbit_core_metrics:consumer_deleted(self(), CTag, QName) end, Readers). -update(_Q, State) -> - State. - update_leader_pid(Pid, #stream_client{leader = Pid} = State) -> State; update_leader_pid(Pid, #stream_client{} = State) -> diff --git a/deps/rabbit/src/rabbit_volatile_queue.erl b/deps/rabbit/src/rabbit_volatile_queue.erl index d8886de6c36a..29b085b8e305 100644 --- a/deps/rabbit/src/rabbit_volatile_queue.erl +++ b/deps/rabbit/src/rabbit_volatile_queue.erl @@ -33,7 +33,6 @@ credit/6, init/1, close/1, - update/2, consume/3, cancel/3, handle_event/3, @@ -277,9 +276,6 @@ credit(_QName, CTag, DeliveryCountRcv, LinkCreditRcv, Drain, close(#?STATE{}) -> ok. -update(_, #?STATE{} = State) -> - State. - cancel(_, _, #?STATE{} = State) -> {ok, State}. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl index c596e6dd660f..78336f20e2ce 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl @@ -58,7 +58,6 @@ [ init/1, close/1, - update/2, consume/3, cancel/3, handle_event/3, @@ -291,9 +290,6 @@ init(A1) -> close(A1) -> ?UNSUPPORTED([A1]). -update(A1,A2) -> - ?UNSUPPORTED([A1,A2]). - consume(A1,A2,A3) -> ?UNSUPPORTED([A1,A2,A3]).