Skip to content

Commit f5e51d7

Browse files
author
Simon MacMullen
committed
Merged default into bug22908
2 parents 5c54bf9 + cf86b97 commit f5e51d7

11 files changed

+139
-128
lines changed

Makefile

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl $(IN
1515
SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing.erl $(USAGES_ERL)
1616
BEAM_TARGETS=$(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES))
1717
TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit_framing_spec.hrl $(BEAM_TARGETS)
18-
WEB_URL=http://stage.rabbitmq.com/
18+
WEB_URL=http://www.rabbitmq.com/
1919
MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml))
2020
WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml)
2121
USAGES_XML=$(DOCS_DIR)/rabbitmqctl.1.xml $(DOCS_DIR)/rabbitmq-multi.1.xml
@@ -205,7 +205,7 @@ srcdist: distclean
205205
>> $(TARGET_SRC_DIR)/INSTALL
206206
cp README.in $(TARGET_SRC_DIR)/README
207207
elinks -dump -no-references -no-numbering $(WEB_URL)build-server.html \
208-
>> $(TARGET_SRC_DIR)/BUILD
208+
>> $(TARGET_SRC_DIR)/README
209209
sed -i.save 's/%%VSN%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in && rm -f $(TARGET_SRC_DIR)/ebin/rabbit_app.in.save
210210

211211
cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/
@@ -226,9 +226,10 @@ distclean: clean
226226

227227
# xmlto can not read from standard input, so we mess with a tmp file.
228228
%.gz: %.xml $(DOCS_DIR)/examples-to-end.xsl
229-
xsltproc $(DOCS_DIR)/examples-to-end.xsl $< > $<.tmp && \
230-
xmlto man -o $(DOCS_DIR) --stringparam man.indent.verbatims=0 $<.tmp && \
231-
gzip -f $(DOCS_DIR)/`basename $< .xml`
229+
xmlto --version | grep -E '^xmlto version 0\.0\.([0-9]|1[1-8])$$' >/dev/null || opt='--stringparam man.indent.verbatims=0' ; \
230+
xsltproc $(DOCS_DIR)/examples-to-end.xsl $< > $<.tmp && \
231+
xmlto man -o $(DOCS_DIR) $$opt $<.tmp && \
232+
gzip -f $(DOCS_DIR)/`basename $< .xml`
232233
rm -f $<.tmp
233234

234235
# Use tmp files rather than a pipeline so that we get meaningful errors

ebin/rabbit_app.in

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
rabbit_sup,
1212
rabbit_tcp_client_sup]},
1313
{applications, [kernel, stdlib, sasl, mnesia, os_mon]},
14-
%% we also depend on ssl but it shouldn't be in here as we don't
15-
%% actually want to start it
14+
%% we also depend on crypto, public_key and ssl but they shouldn't be
15+
%% in here as we don't actually want to start it
1616
{mod, {rabbit, []}},
1717
{env, [{tcp_listeners, [{"0.0.0.0", 5672}]},
1818
{ssl_listeners, []},

src/rabbit_amqqueue.erl

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,9 @@
3737
update_ram_duration/1, set_ram_duration_target/2,
3838
set_maximum_since_use/2]).
3939
-export([pseudo_queue/2]).
40-
-export([lookup/1, with/2, with_or_die/2,
41-
stat/1, stat_all/0, deliver/2, requeue/3, ack/4]).
40+
-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
41+
check_exclusive_access/2, with_exclusive_access_or_die/3,
42+
stat/1, deliver/2, requeue/3, ack/4]).
4243
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
4344
-export([consumers/1, consumers_all/1]).
4445
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
@@ -58,18 +59,21 @@
5859

5960
-ifdef(use_specs).
6061

61-
-type(qstats() :: {'ok', queue_name(), non_neg_integer(), non_neg_integer()}).
6262
-type(qlen() :: {'ok', non_neg_integer()}).
6363
-type(qfun(A) :: fun ((amqqueue()) -> A)).
6464
-type(ok_or_errors() ::
6565
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
6666

6767
-spec(start/0 :: () -> 'ok').
6868
-spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(),
69-
maybe(pid())) -> amqqueue()).
69+
maybe(pid())) -> {'new' | 'existing', amqqueue()}).
7070
-spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()).
7171
-spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()).
7272
-spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A).
73+
-spec(assert_equivalence/5 :: (amqqueue(), boolean(), boolean(), amqp_table(),
74+
maybe(pid)) -> ok).
75+
-spec(check_exclusive_access/2 :: (amqqueue(), pid()) -> 'ok').
76+
-spec(with_exclusive_access_or_die/3 :: (queue_name(), pid(), qfun(A)) -> A).
7377
-spec(list/1 :: (vhost()) -> [amqqueue()]).
7478
-spec(info_keys/0 :: () -> [info_key()]).
7579
-spec(info/1 :: (amqqueue()) -> [info()]).
@@ -79,8 +83,8 @@
7983
-spec(consumers/1 :: (amqqueue()) -> [{pid(), ctag(), boolean()}]).
8084
-spec(consumers_all/1 ::
8185
(vhost()) -> [{queue_name(), pid(), ctag(), boolean()}]).
82-
-spec(stat/1 :: (amqqueue()) -> qstats()).
83-
-spec(stat_all/0 :: () -> [qstats()]).
86+
-spec(stat/1 ::
87+
(amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}).
8488
-spec(delete/3 ::
8589
(amqqueue(), 'false', 'false') -> qlen();
8690
(amqqueue(), 'true' , 'false') -> qlen() | {'error', 'in_use'};
@@ -213,6 +217,31 @@ with(Name, F) ->
213217
with_or_die(Name, F) ->
214218
with(Name, F, fun () -> rabbit_misc:not_found(Name) end).
215219

220+
assert_equivalence(#amqqueue{durable = Durable, auto_delete = AutoDelete} = Q,
221+
Durable, AutoDelete, _Args, Owner) ->
222+
check_exclusive_access(Q, Owner, strict);
223+
assert_equivalence(#amqqueue{name = QueueName},
224+
_Durable, _AutoDelete, _Args, _Owner) ->
225+
rabbit_misc:protocol_error(
226+
not_allowed, "parameters for ~s not equivalent",
227+
[rabbit_misc:rs(QueueName)]).
228+
229+
check_exclusive_access(Q, Owner) -> check_exclusive_access(Q, Owner, lax).
230+
231+
check_exclusive_access(#amqqueue{exclusive_owner = Owner}, Owner, _MatchType) ->
232+
ok;
233+
check_exclusive_access(#amqqueue{exclusive_owner = none}, _ReaderPid, lax) ->
234+
ok;
235+
check_exclusive_access(#amqqueue{name = QueueName}, _ReaderPid, _MatchType) ->
236+
rabbit_misc:protocol_error(
237+
resource_locked,
238+
"cannot obtain exclusive access to locked ~s",
239+
[rabbit_misc:rs(QueueName)]).
240+
241+
with_exclusive_access_or_die(Name, ReaderPid, F) ->
242+
with_or_die(Name,
243+
fun (Q) -> check_exclusive_access(Q, ReaderPid), F(Q) end).
244+
216245
list(VHostPath) ->
217246
mnesia:dirty_match_object(
218247
rabbit_queue,
@@ -247,9 +276,6 @@ consumers_all(VHostPath) ->
247276

248277
stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity).
249278

250-
stat_all() ->
251-
lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)).
252-
253279
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
254280
delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity).
255281

@@ -395,7 +421,7 @@ delegate_call(Pid, Msg, Timeout) ->
395421
delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end).
396422

397423
delegate_pcall(Pid, Pri, Msg, Timeout) ->
398-
delegate:invoke(Pid,
424+
delegate:invoke(Pid,
399425
fun (P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end).
400426

401427
delegate_pcast(Pid, Pri, Msg) ->

src/rabbit_amqqueue_process.erl

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ declare(Recover, From,
137137
backing_queue = BQ, backing_queue_state = undefined}) ->
138138
case rabbit_amqqueue:internal_declare(Q, Recover) of
139139
not_found -> {stop, normal, not_found, State};
140-
Q -> gen_server2:reply(From, Q),
140+
Q -> gen_server2:reply(From, {new, Q}),
141141
ok = file_handle_cache:register_callback(
142142
rabbit_amqqueue, set_maximum_since_use,
143143
[self()]),
@@ -146,7 +146,7 @@ declare(Recover, From,
146146
set_ram_duration_target, [self()]}),
147147
BQS = BQ:init(QName, IsDurable, Recover),
148148
noreply(State#q{backing_queue_state = BQS});
149-
Q1 -> {stop, normal, Q1, State}
149+
Q1 -> {stop, normal, {existing, Q1}, State}
150150
end.
151151

152152
terminate_shutdown(Fun, State) ->
@@ -692,11 +692,10 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
692692
end
693693
end;
694694

695-
handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
696-
backing_queue = BQ,
695+
handle_call(stat, _From, State = #q{backing_queue = BQ,
697696
backing_queue_state = BQS,
698697
active_consumers = ActiveConsumers}) ->
699-
reply({ok, Name, BQ:len(BQS), queue:len(ActiveConsumers)}, State);
698+
reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, State);
700699

701700
handle_call({delete, IfUnused, IfEmpty}, _From,
702701
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->

src/rabbit_channel.erl

Lines changed: 54 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -284,20 +284,15 @@ terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) ->
284284
Reader ! {channel_exit, Channel, Reason},
285285
State#ch{state = terminating}.
286286

287-
return_queue_declare_ok(State, NoWait, Q) ->
288-
NewState = State#ch{most_recently_declared_queue =
289-
(Q#amqqueue.name)#resource.name},
287+
return_queue_declare_ok(#resource{name = ActualName},
288+
NoWait, MessageCount, ConsumerCount, State) ->
289+
NewState = State#ch{most_recently_declared_queue = ActualName},
290290
case NoWait of
291291
true -> {noreply, NewState};
292-
false ->
293-
{ok, ActualName, MessageCount, ConsumerCount} =
294-
rabbit_misc:with_exit_handler(
295-
fun () -> {ok, Q#amqqueue.name, 0, 0} end,
296-
fun () -> rabbit_amqqueue:stat(Q) end),
297-
Reply = #'queue.declare_ok'{queue = ActualName#resource.name,
298-
message_count = MessageCount,
299-
consumer_count = ConsumerCount},
300-
{reply, Reply, NewState}
292+
false -> Reply = #'queue.declare_ok'{queue = ActualName,
293+
message_count = MessageCount,
294+
consumer_count = ConsumerCount},
295+
{reply, Reply, NewState}
301296
end.
302297

303298
check_resource_access(Username, Resource, Perm) ->
@@ -329,19 +324,6 @@ check_write_permitted(Resource, #ch{ username = Username}) ->
329324
check_read_permitted(Resource, #ch{ username = Username}) ->
330325
check_resource_access(Username, Resource, read).
331326

332-
check_exclusive_access(#amqqueue{exclusive_owner = Owner}, Owner, _MatchType) ->
333-
ok;
334-
check_exclusive_access(#amqqueue{exclusive_owner = none}, _ReaderPid, lax) ->
335-
ok;
336-
check_exclusive_access(#amqqueue{name = QName}, _ReaderPid, _MatchType) ->
337-
rabbit_misc:protocol_error(
338-
resource_locked,
339-
"cannot obtain exclusive access to locked ~s", [rabbit_misc:rs(QName)]).
340-
341-
with_exclusive_access_or_die(QName, ReaderPid, F) ->
342-
rabbit_amqqueue:with_or_die(
343-
QName, fun (Q) -> check_exclusive_access(Q, ReaderPid, lax), F(Q) end).
344-
345327
expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
346328
rabbit_misc:protocol_error(
347329
not_found, "no previously declared queue", []);
@@ -444,12 +426,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
444426
Exchange,
445427
rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)),
446428
case RoutingRes of
447-
routed ->
448-
ok;
449-
unroutable ->
450-
ok = basic_return(Message, WriterPid, no_route);
451-
not_delivered ->
452-
ok = basic_return(Message, WriterPid, no_consumers)
429+
routed -> ok;
430+
unroutable -> ok = basic_return(Message, WriterPid, no_route);
431+
not_delivered -> ok = basic_return(Message, WriterPid, no_consumers)
453432
end,
454433
{noreply, case TxnKey of
455434
none -> State;
@@ -480,7 +459,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
480459
next_tag = DeliveryTag }) ->
481460
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
482461
check_read_permitted(QueueName, State),
483-
case with_exclusive_access_or_die(
462+
case rabbit_amqqueue:with_exclusive_access_or_die(
484463
QueueName, ReaderPid,
485464
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
486465
{ok, MessageCount,
@@ -499,7 +478,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
499478
Content),
500479
{noreply, State1#ch{next_tag = DeliveryTag + 1}};
501480
empty ->
502-
{reply, #'basic.get_empty'{cluster_id = <<>>}, State}
481+
{reply, #'basic.get_empty'{}, State}
503482
end;
504483

505484
handle_method(#'basic.consume'{queue = QueueNameBin,
@@ -524,7 +503,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
524503
%% We get the queue process to send the consume_ok on our
525504
%% behalf. This is for symmetry with basic.cancel - see
526505
%% the comment in that method for why.
527-
case with_exclusive_access_or_die(
506+
case rabbit_amqqueue:with_exclusive_access_or_die(
528507
QueueName, ReaderPid,
529508
fun (Q) ->
530509
rabbit_amqqueue:basic_consume(
@@ -716,45 +695,48 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
716695
exclusive = ExclusiveDeclare,
717696
auto_delete = AutoDelete,
718697
nowait = NoWait,
719-
arguments = Args},
698+
arguments = Args} = Declare,
720699
_, State = #ch{virtual_host = VHostPath,
721700
reader_pid = ReaderPid,
722701
queue_collector_pid = CollectorPid}) ->
723702
Owner = case ExclusiveDeclare of
724703
true -> ReaderPid;
725704
false -> none
726705
end,
727-
%% We use this in both branches, because queue_declare may yet return an
728-
%% existing queue.
729706
ActualNameBin = case QueueNameBin of
730707
<<>> -> rabbit_guid:binstring_guid("amq.gen");
731708
Other -> check_name('queue', Other)
732709
end,
733710
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
734-
Q = case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
735-
Args, Owner) of
736-
#amqqueue{name = QueueName,
737-
durable = Durable1,
738-
auto_delete = AutoDelete1} = Q1
739-
when Durable =:= Durable1, AutoDelete =:= AutoDelete1 ->
740-
check_exclusive_access(Q1, Owner, strict),
741-
check_configure_permitted(QueueName, State),
742-
%% We need to notify the reader within the channel
743-
%% process so that we can be sure there are no
744-
%% outstanding exclusive queues being declared as the
745-
%% connection shuts down.
746-
case Owner of
747-
none -> ok;
748-
_ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q1)
749-
end,
750-
Q1;
751-
%% non-equivalence trumps exclusivity arbitrarily
752-
#amqqueue{name = QueueName} ->
753-
rabbit_misc:protocol_error(
754-
not_allowed, "parameters for ~s not equivalent",
755-
[rabbit_misc:rs(QueueName)])
756-
end,
757-
return_queue_declare_ok(State, NoWait, Q);
711+
check_configure_permitted(QueueName, State),
712+
case rabbit_amqqueue:with(
713+
QueueName,
714+
fun (Q) -> ok = rabbit_amqqueue:assert_equivalence(
715+
Q, Durable, AutoDelete, Args, Owner),
716+
rabbit_amqqueue:stat(Q)
717+
end) of
718+
{ok, MessageCount, ConsumerCount} ->
719+
return_queue_declare_ok(QueueName, NoWait, MessageCount,
720+
ConsumerCount, State);
721+
{error, not_found} ->
722+
case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
723+
Args, Owner) of
724+
{new, Q = #amqqueue{}} ->
725+
%% We need to notify the reader within the channel
726+
%% process so that we can be sure there are no
727+
%% outstanding exclusive queues being declared as
728+
%% the connection shuts down.
729+
ok = case Owner of
730+
none -> ok;
731+
_ -> rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q)
732+
end,
733+
return_queue_declare_ok(QueueName, NoWait, 0, 0, State);
734+
{existing, _Q} ->
735+
%% must have been created between the stat and the
736+
%% declare. Loop around again.
737+
handle_method(Declare, none, State)
738+
end
739+
end;
758740

759741
handle_method(#'queue.declare'{queue = QueueNameBin,
760742
passive = true,
@@ -763,8 +745,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
763745
reader_pid = ReaderPid}) ->
764746
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
765747
check_configure_permitted(QueueName, State),
766-
Q = with_exclusive_access_or_die(QueueName, ReaderPid, fun (Q) -> Q end),
767-
return_queue_declare_ok(State, NoWait, Q);
748+
{{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} =
749+
rabbit_amqqueue:with_or_die(
750+
QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end),
751+
ok = rabbit_amqqueue:check_exclusive_access(Q, ReaderPid),
752+
return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount,
753+
State);
768754

769755
handle_method(#'queue.delete'{queue = QueueNameBin,
770756
if_unused = IfUnused,
@@ -773,7 +759,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
773759
_, State = #ch{reader_pid = ReaderPid}) ->
774760
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
775761
check_configure_permitted(QueueName, State),
776-
case with_exclusive_access_or_die(
762+
case rabbit_amqqueue:with_exclusive_access_or_die(
777763
QueueName, ReaderPid,
778764
fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
779765
{error, in_use} ->
@@ -809,7 +795,7 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
809795
_, State = #ch{reader_pid = ReaderPid}) ->
810796
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
811797
check_read_permitted(QueueName, State),
812-
{ok, PurgedMessageCount} = with_exclusive_access_or_die(
798+
{ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die(
813799
QueueName, ReaderPid,
814800
fun (Q) -> rabbit_amqqueue:purge(Q) end),
815801
return_ok(State, NoWait,
@@ -917,7 +903,9 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
917903
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
918904
check_read_permitted(ExchangeName, State),
919905
case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments,
920-
fun (_X, Q) -> check_exclusive_access(Q, ReaderPid, lax) end) of
906+
fun (_X, Q) ->
907+
rabbit_amqqueue:check_exclusive_access(Q, ReaderPid)
908+
end) of
921909
{error, exchange_not_found} ->
922910
rabbit_misc:not_found(ExchangeName);
923911
{error, queue_not_found} ->

0 commit comments

Comments
 (0)