Skip to content

Commit 8d8f5ee

Browse files
committed
Use delegate for queue existence check
1 parent f203803 commit 8d8f5ee

File tree

1 file changed

+35
-29
lines changed

1 file changed

+35
-29
lines changed

deps/rabbit/src/rabbit_volatile_queue.erl

Lines changed: 35 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020
generate_name/1,
2121
key_from_name/1,
2222
pid_from_name/2,
23-
deliver_local/3,
23+
local_cast/2,
24+
local_call/2,
2425
exists/1,
2526
ff_enabled/0]).
2627

@@ -83,9 +84,9 @@ new(#resource{virtual_host = Vhost,
8384
new(#resource{virtual_host = Vhost,
8485
name = QNameBin} = QName) ->
8586
case pid_from_name(QNameBin, nodes_with_hashes()) of
86-
{ok, Pid} ->
87+
{ok, Pid} when is_pid(Pid) ->
8788
new0(QName, Pid, Vhost);
88-
error ->
89+
_ ->
8990
error
9091
end.
9192

@@ -137,31 +138,29 @@ declare(Q, _Node) ->
137138
exists(#resource{kind = queue,
138139
name = QNameBin} = QName) ->
139140
case pid_from_name(QNameBin, nodes_with_hashes()) of
140-
{ok, Pid} ->
141+
{ok, Pid} when is_pid(Pid) ->
141142
case ff_enabled() of
142143
true ->
143-
Msg = {has_state, QName, ?MODULE},
144-
try gen_server:call(Pid, Msg)
145-
catch exit:_ ->
146-
false
144+
Request = {has_state, QName, ?MODULE},
145+
%% We use delegate instead of erpc to guarantee that order is preserved.
146+
MFA = {?MODULE, local_call, [Request]},
147+
try delegate:invoke(Pid, MFA)
148+
catch _:_ -> false
147149
end;
148150
false ->
149151
case key_from_name(QNameBin) of
150152
{ok, Key} ->
151153
Msg = {declare_fast_reply_to, Key},
152154
try gen_server:call(Pid, Msg, infinity) of
153-
exists ->
154-
true;
155-
not_found ->
156-
false
157-
catch exit:_ ->
158-
false
155+
exists -> true;
156+
_ -> false
157+
catch exit:_ -> false
159158
end;
160159
error ->
161160
false
162161
end
163162
end;
164-
error ->
163+
_ ->
165164
false
166165
end.
167166

@@ -187,8 +186,9 @@ deliver0(Q, Msg) ->
187186
QPid = amqqueue:get_pid(Q),
188187
case ff_enabled() of
189188
true ->
189+
Request = {queue_event, QName, {deliver, Msg}},
190190
%% We use delegate instead of erpc to guarantee that order is preserved.
191-
MFA = {?MODULE, deliver_local, [QName, Msg]},
191+
MFA = {?MODULE, local_cast, [Request]},
192192
delegate:invoke_no_result(QPid, MFA);
193193
false ->
194194
case key_from_name(QName#resource.name) of
@@ -200,18 +200,26 @@ deliver0(Q, Msg) ->
200200
end
201201
end.
202202

203-
-spec deliver_local(pid(), rabbit_amqqueue:name(), mc:state()) -> ok.
204-
deliver_local(Pid, QName, Msg) ->
205-
%% We ensure clients can't use this mechanism to send a message
206-
%% to an arbitrary process and kill it.
207-
case rabbit_amqp_session:is_local(Pid) orelse
208-
pg_local:in_group(rabbit_channels, Pid) of
209-
true ->
210-
gen_server:cast(Pid, {queue_event, QName, {deliver, Msg}});
211-
false ->
212-
ok
203+
-spec local_cast(pid(), term()) -> ok.
204+
local_cast(Pid, Request) ->
205+
%% Ensure clients can't send a message to an arbitrary process and kill it.
206+
case is_local(Pid) of
207+
true -> gen_server:cast(Pid, Request);
208+
false -> ok
213209
end.
214210

211+
-spec local_call(pid(), term()) -> term().
212+
local_call(Pid, Request) ->
213+
%% Ensure clients can't send a message to an arbitrary process and kill it.
214+
case is_local(Pid) of
215+
true -> gen_server:call(Pid, Request);
216+
false -> exit({unknown_pid, Pid})
217+
end.
218+
219+
is_local(Pid) ->
220+
rabbit_amqp_session:is_local(Pid) orelse
221+
pg_local:in_group(rabbit_channels, Pid).
222+
215223
handle_event(QName, {deliver, Msg}, #?STATE{name = QName,
216224
ctag = Ctag,
217225
credit = undefined} = State) ->
@@ -383,9 +391,7 @@ pid_from_name(<<?PREFIX, Bin/binary>>, CandidateNodes) ->
383391
PidParts = maps:update(node, Candidate, PidParts0),
384392
{ok, rabbit_pid_codec:recompose(PidParts)}
385393
end
386-
catch
387-
error:_ ->
388-
error
394+
catch error:_ -> error
389395
end;
390396
pid_from_name(_, _) ->
391397
error.

0 commit comments

Comments
 (0)