Skip to content

Commit b330b15

Browse files
ansdmergify[bot]
authored andcommitted
Remove update/2 callback from rabbit_queue_type (#14610)
(cherry picked from commit ed05807)
1 parent bc8d9aa commit b330b15

File tree

6 files changed

+40
-71
lines changed

6 files changed

+40
-71
lines changed

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 32 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,7 @@
1313

1414
-define(STATE, ?MODULE).
1515
-record(?STATE, {
16-
%% the current master pid
17-
pid :: undefined | pid(),
16+
pid :: pid(),
1817
unconfirmed = #{} :: #{non_neg_integer() => #msg_status{}},
1918
monitored = #{} :: #{pid() => ok}
2019
}).
@@ -50,7 +49,6 @@
5049
format/2,
5150
init/1,
5251
close/1,
53-
update/2,
5452
consume/3,
5553
cancel/3,
5654
handle_event/3,
@@ -295,16 +293,6 @@ init(Q) when ?amqqueue_is_classic(Q) ->
295293
close(_State) ->
296294
ok.
297295

298-
-spec update(amqqueue:amqqueue() | amqqueue:target(), state()) -> state().
299-
update(Q, #?STATE{pid = Pid} = State) ->
300-
case amqqueue:get_pid(Q) of
301-
Pid ->
302-
State;
303-
NewPid ->
304-
%% master pid is different, update
305-
State#?STATE{pid = NewPid}
306-
end.
307-
308296
consume(Q, Spec, State0) when ?amqqueue_is_classic(Q) ->
309297
QPid = amqqueue:get_pid(Q),
310298
QRef = amqqueue:get_name(Q),
@@ -326,9 +314,8 @@ consume(Q, Spec, State0) when ?amqqueue_is_classic(Q) ->
326314
ExclusiveConsume, Args, OkMsg, ActingUser},
327315
infinity]}) of
328316
ok ->
329-
%% TODO: track pids as they change
330-
State = ensure_monitor(QPid, QRef, State0),
331-
{ok, State#?STATE{pid = QPid}};
317+
State = State0#?STATE{pid = QPid},
318+
{ok, ensure_monitor(QRef, State)};
332319
{error, exclusive_consume_unavailable} ->
333320
{error, access_refused, "~ts in exclusive use",
334321
[rabbit_misc:rs(QRef)]};
@@ -364,10 +351,12 @@ cancel(Q, Spec, State) ->
364351
OkMsg = maps:get(ok_msg, Spec, undefined),
365352
{basic_cancel, self(), ConsumerTag, OkMsg, ActingUser}
366353
end,
367-
case delegate:invoke(amqqueue:get_pid(Q),
368-
{gen_server2, call, [Request, infinity]}) of
369-
ok -> {ok, State};
370-
Err -> Err
354+
Pid = amqqueue:get_pid(Q),
355+
case delegate:invoke(Pid, {gen_server2, call, [Request, infinity]}) of
356+
ok ->
357+
{ok, State#?STATE{pid = Pid}};
358+
Err ->
359+
Err
371360
end.
372361

373362
-spec settle(rabbit_amqqueue:name(), rabbit_queue_type:settle_op(),
@@ -502,19 +491,21 @@ deliver(Qs0, Msg0, Options) ->
502491
delegate:invoke_no_result(MPids, {gen_server2, cast, [MMsg]}),
503492
{Qs, []}.
504493

505-
-spec dequeue(rabbit_amqqueue:name(), NoAck :: boolean(),
494+
-spec dequeue(amqqueue:amqqueue(), NoAck :: boolean(),
506495
LimiterPid :: pid(), rabbit_types:ctag(), state()) ->
507496
{ok, Count :: non_neg_integer(), rabbit_amqqueue:qmsg(), state()} |
508497
{empty, state()}.
509-
dequeue(QName, NoAck, LimiterPid, _CTag, State0) ->
510-
QPid = State0#?STATE.pid,
511-
State1 = ensure_monitor(QPid, QName, State0),
498+
dequeue(Q, NoAck, LimiterPid, _CTag, State0) ->
499+
QName = amqqueue:get_name(Q),
500+
QPid = amqqueue:get_pid(Q),
501+
State1 = State0#?STATE{pid = QPid},
502+
State = ensure_monitor(QName, State1),
512503
case delegate:invoke(QPid, {gen_server2, call,
513504
[{basic_get, self(), NoAck, LimiterPid}, infinity]}) of
514505
empty ->
515-
{empty, State1};
506+
{empty, State};
516507
{ok, Count, Msg} ->
517-
{ok, Count, Msg, State1}
508+
{ok, Count, Msg, State}
518509
end.
519510

520511
-spec state_info(state()) -> #{atom() := term()}.
@@ -603,9 +594,10 @@ qpids(Qs, Confirm, MsgNo) ->
603594
fun ({Q, S0}, {MPidAcc, Qs0}) ->
604595
QPid = amqqueue:get_pid(Q),
605596
QRef = amqqueue:get_name(Q),
606-
S1 = ensure_monitor(QPid, QRef, S0),
607597
%% confirm record only if necessary
608-
S = case S1 of
598+
S = case S0 of
599+
stateless ->
600+
S0;
609601
#?STATE{unconfirmed = U0} ->
610602
Rec = [QPid],
611603
U = case Confirm of
@@ -614,10 +606,9 @@ qpids(Qs, Confirm, MsgNo) ->
614606
true ->
615607
U0#{MsgNo => #msg_status{pending = Rec}}
616608
end,
617-
S1#?STATE{pid = QPid,
618-
unconfirmed = U};
619-
stateless ->
620-
S1
609+
S1 = S0#?STATE{pid = QPid,
610+
unconfirmed = U},
611+
ensure_monitor(QRef, S1)
621612
end,
622613
{[QPid | MPidAcc], [{Q, S} | Qs0]}
623614
end, {[], []}, Qs).
@@ -728,14 +719,15 @@ update_msg_status(confirm, Pid, #msg_status{pending = P,
728719
update_msg_status(down, Pid, #msg_status{pending = P} = S) ->
729720
S#msg_status{pending = lists:delete(Pid, P)}.
730721

731-
ensure_monitor(_, _, State = stateless) ->
732-
State;
733-
ensure_monitor(Pid, _, State = #?STATE{monitored = Monitored})
734-
when is_map_key(Pid, Monitored) ->
735-
State;
736-
ensure_monitor(Pid, QName, State = #?STATE{monitored = Monitored}) ->
737-
_ = erlang:monitor(process, Pid, [{tag, {'DOWN', QName}}]),
738-
State#?STATE{monitored = Monitored#{Pid => ok}}.
722+
ensure_monitor(QName, #?STATE{pid = Pid,
723+
monitored = Monitored} = State) ->
724+
case is_map_key(Pid, Monitored) of
725+
true ->
726+
State;
727+
false ->
728+
_Ref = erlang:monitor(process, Pid, [{tag, {'DOWN', QName}}]),
729+
State#?STATE{monitored = Monitored#{Pid => ok}}
730+
end.
739731

740732
%% part of channel <-> queue api
741733
confirm_to_sender(Pid, QName, MsgSeqNos) ->

deps/rabbit/src/rabbit_queue_type.erl

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -209,9 +209,6 @@
209209

210210
-callback close(queue_state()) -> ok.
211211

212-
-callback update(amqqueue:amqqueue() | amqqueue:target(), queue_state()) ->
213-
queue_state().
214-
215212
-callback consume(amqqueue:amqqueue(),
216213
consume_spec(),
217214
queue_state()) ->
@@ -251,7 +248,7 @@
251248
Drain :: boolean(), queue_state()) ->
252249
{queue_state(), actions()}.
253250

254-
-callback dequeue(queue_name(), NoAck :: boolean(), LimiterPid :: pid(),
251+
-callback dequeue(amqqueue:amqqueue(), NoAck :: boolean(), LimiterPid :: pid(),
255252
rabbit_types:ctag(), queue_state()) ->
256253
{ok, Count :: non_neg_integer(), rabbit_amqqueue:qmsg(), queue_state()} |
257254
{empty, queue_state()} |
@@ -751,8 +748,7 @@ credit(QName, CTag, DeliveryCount, Credit, Drain, Ctxs) ->
751748
dequeue(Q, NoAck, LimiterPid, CTag, Ctxs) ->
752749
#ctx{state = State0} = Ctx = get_ctx(Q, Ctxs),
753750
Mod = amqqueue:get_type(Q),
754-
QName = amqqueue:get_name(Q),
755-
case Mod:dequeue(QName, NoAck, LimiterPid, CTag, State0) of
751+
case Mod:dequeue(Q, NoAck, LimiterPid, CTag, State0) of
756752
{ok, Num, Msg, State} ->
757753
{ok, Num, Msg, set_ctx(Q, Ctx#ctx{state = State}, Ctxs)};
758754
{empty, State} ->
@@ -785,10 +781,9 @@ get_ctx_with(#resource{kind = queue} = QRef, Contexts, undefined) ->
785781
get_ctx_with(Q, #?STATE{ctxs = Contexts}, InitState) ->
786782
Ref = amqqueue:get_name(Q),
787783
case Contexts of
788-
#{Ref := #ctx{module = Mod,
789-
state = State} = Ctx} ->
790-
Ctx#ctx{state = Mod:update(Q, State)};
791-
_ when InitState == undefined ->
784+
#{Ref := Ctx} ->
785+
Ctx;
786+
_ when InitState =:= undefined ->
792787
%% not found and no initial state passed - initialize new state
793788
Mod = amqqueue:get_type(Q),
794789
maybe

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
-export([init/1,
1616
close/1,
17-
update/2,
1817
handle_event/3]).
1918
-export([is_recoverable/1,
2019
recover/2,
@@ -234,12 +233,6 @@ init(Q) when ?is_amqqueue(Q) ->
234233
close(State) ->
235234
rabbit_fifo_client:close(State).
236235

237-
-spec update(amqqueue:amqqueue() | amqqueue:target(), rabbit_fifo_client:state()) ->
238-
rabbit_fifo_client:state().
239-
update(_Q, State) ->
240-
%% QQ state maintains its own updates
241-
State.
242-
243236
-spec handle_event(rabbit_amqqueue:name(),
244237
{amqqueue:ra_server_id(), any()},
245238
rabbit_fifo_client:state()) ->
@@ -975,12 +968,13 @@ credit_v1(_QName, CTag, Credit, Drain, QState) ->
975968
credit(_QName, CTag, DeliveryCount, Credit, Drain, QState) ->
976969
rabbit_fifo_client:credit(quorum_ctag(CTag), DeliveryCount, Credit, Drain, QState).
977970

978-
-spec dequeue(rabbit_amqqueue:name(), NoAck :: boolean(), pid(),
971+
-spec dequeue(amqqueue:amqqueue(), NoAck :: boolean(), pid(),
979972
rabbit_types:ctag(), rabbit_fifo_client:state()) ->
980973
{empty, rabbit_fifo_client:state()} |
981974
{ok, QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()} |
982975
{error, term()}.
983-
dequeue(QName, NoAck, _LimiterPid, CTag0, QState0) ->
976+
dequeue(Q, NoAck, _LimiterPid, CTag0, QState0) ->
977+
QName = amqqueue:get_name(Q),
984978
CTag = quorum_ctag(CTag0),
985979
Settlement = case NoAck of
986980
true ->

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
transfer_leadership/2,
3636
init/1,
3737
close/1,
38-
update/2,
3938
state_info/1,
4039
stat/1,
4140
format/2,
@@ -1024,9 +1023,6 @@ close(#stream_client{readers = Readers,
10241023
rabbit_core_metrics:consumer_deleted(self(), CTag, QName)
10251024
end, Readers).
10261025

1027-
update(_Q, State) ->
1028-
State.
1029-
10301026
update_leader_pid(Pid, #stream_client{leader = Pid} = State) ->
10311027
State;
10321028
update_leader_pid(Pid, #stream_client{} = State) ->

deps/rabbit/src/rabbit_volatile_queue.erl

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
credit/6,
3434
init/1,
3535
close/1,
36-
update/2,
3736
consume/3,
3837
cancel/3,
3938
handle_event/3,
@@ -277,9 +276,6 @@ credit(_QName, CTag, DeliveryCountRcv, LinkCreditRcv, Drain,
277276
close(#?STATE{}) ->
278277
ok.
279278

280-
update(_, #?STATE{} = State) ->
281-
State.
282-
283279
cancel(_, _, #?STATE{} = State) ->
284280
{ok, State}.
285281

deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@
5858
[
5959
init/1,
6060
close/1,
61-
update/2,
6261
consume/3,
6362
cancel/3,
6463
handle_event/3,
@@ -291,9 +290,6 @@ init(A1) ->
291290
close(A1) ->
292291
?UNSUPPORTED([A1]).
293292

294-
update(A1,A2) ->
295-
?UNSUPPORTED([A1,A2]).
296-
297293
consume(A1,A2,A3) ->
298294
?UNSUPPORTED([A1,A2,A3]).
299295

0 commit comments

Comments
 (0)