Skip to content

Commit 2f4f19b

Browse files
author
Matthew Sackman
committed
merging in bug20782
2 parents e30c012 + 1b43c21 commit 2f4f19b

File tree

8 files changed

+103
-81
lines changed

8 files changed

+103
-81
lines changed

include/rabbit.hrl

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@
6464

6565
-record(basic_message, {exchange_name, routing_key, content, persistent_key}).
6666

67+
-record(delivery, {mandatory, immediate, txn, sender, message}).
68+
6769
%%----------------------------------------------------------------------------
6870

6971
-ifdef(use_specs).
@@ -134,6 +136,12 @@
134136
content :: content(),
135137
persistent_key :: maybe(pkey())}).
136138
-type(message() :: basic_message()).
139+
-type(delivery() ::
140+
#delivery{mandatory :: bool(),
141+
immediate :: bool(),
142+
txn :: maybe(txn()),
143+
sender :: pid(),
144+
message :: message()}).
137145
%% this really should be an abstract type
138146
-type(msg_id() :: non_neg_integer()).
139147
-type(msg() :: {queue_name(), pid(), msg_id(), bool(), message()}).

src/rabbit_amqqueue.erl

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
-export([internal_declare/2, internal_delete/1]).
3636
-export([pseudo_queue/2]).
3737
-export([lookup/1, with/2, with_or_die/2,
38-
stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]).
38+
stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]).
3939
-export([list/1, info/1, info/2, info_all/1, info_all/2]).
4040
-export([claim_queue/2]).
4141
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
@@ -85,7 +85,7 @@
8585
{'error', 'in_use'} |
8686
{'error', 'not_empty'}).
8787
-spec(purge/1 :: (amqqueue()) -> qlen()).
88-
-spec(deliver/5 :: (bool(), bool(), maybe(txn()), message(), pid()) -> bool()).
88+
-spec(deliver/2 :: (pid(), delivery()) -> bool()).
8989
-spec(redeliver/2 :: (pid(), [{message(), bool()}]) -> 'ok').
9090
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
9191
-spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok').
@@ -241,13 +241,16 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
241241

242242
purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity).
243243

244-
deliver(_IsMandatory, true, Txn, Message, QPid) ->
245-
gen_server2:call(QPid, {deliver_immediately, Txn, Message}, infinity);
246-
deliver(true, _IsImmediate, Txn, Message, QPid) ->
247-
gen_server2:call(QPid, {deliver, Txn, Message}, infinity),
244+
deliver(QPid, #delivery{immediate = true,
245+
txn = Txn, sender = ChPid, message = Message}) ->
246+
gen_server2:call(QPid, {deliver_immediately, Txn, Message, ChPid},
247+
infinity);
248+
deliver(QPid, #delivery{mandatory = true,
249+
txn = Txn, sender = ChPid, message = Message}) ->
250+
gen_server2:call(QPid, {deliver, Txn, Message, ChPid}, infinity),
248251
true;
249-
deliver(false, _IsImmediate, Txn, Message, QPid) ->
250-
gen_server2:cast(QPid, {deliver, Txn, Message}),
252+
deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) ->
253+
gen_server2:cast(QPid, {deliver, Txn, Message, ChPid}),
251254
true.
252255

253256
redeliver(QPid, Messages) ->

src/rabbit_amqqueue_process.erl

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
monitor_ref,
6767
unacked_messages,
6868
is_limit_active,
69+
txn,
6970
unsent_message_count}).
7071

7172
-define(INFO_KEYS,
@@ -133,6 +134,7 @@ ch_record(ChPid) ->
133134
monitor_ref = MonitorRef,
134135
unacked_messages = dict:new(),
135136
is_limit_active = false,
137+
txn = none,
136138
unsent_message_count = 0},
137139
put(Key, C),
138140
C;
@@ -156,6 +158,11 @@ ch_record_state_transition(OldCR, NewCR) ->
156158
true -> ok
157159
end.
158160

161+
record_current_channel_tx(ChPid, Txn) ->
162+
%% as a side effect this also starts monitoring the channel (if
163+
%% that wasn't happening already)
164+
store_ch_record((ch_record(ChPid))#cr{txn = Txn}).
165+
159166
deliver_immediately(Message, Delivered,
160167
State = #q{q = #amqqueue{name = QName},
161168
round_robin = RoundRobin,
@@ -198,7 +205,7 @@ deliver_immediately(Message, Delivered,
198205
{not_offered, State}
199206
end.
200207

201-
attempt_delivery(none, Message, State) ->
208+
attempt_delivery(none, _ChPid, Message, State) ->
202209
case deliver_immediately(Message, false, State) of
203210
{offered, false, State1} ->
204211
{true, State1};
@@ -209,13 +216,13 @@ attempt_delivery(none, Message, State) ->
209216
{not_offered, State1} ->
210217
{false, State1}
211218
end;
212-
attempt_delivery(Txn, Message, State) ->
219+
attempt_delivery(Txn, ChPid, Message, State) ->
213220
persist_message(Txn, qname(State), Message),
214-
record_pending_message(Txn, Message),
221+
record_pending_message(Txn, ChPid, Message),
215222
{true, State}.
216223

217-
deliver_or_enqueue(Txn, Message, State) ->
218-
case attempt_delivery(Txn, Message, State) of
224+
deliver_or_enqueue(Txn, ChPid, Message, State) ->
225+
case attempt_delivery(Txn, ChPid, Message, State) of
219226
{true, NewState} ->
220227
{true, NewState};
221228
{false, NewState} ->
@@ -295,10 +302,16 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
295302
round_robin = ActiveConsumers}) ->
296303
case lookup_ch(DownPid) of
297304
not_found -> noreply(State);
298-
#cr{monitor_ref = MonitorRef, ch_pid = ChPid, unacked_messages = UAM} ->
305+
#cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn,
306+
unacked_messages = UAM} ->
299307
NewActive = block_consumers(ChPid, ActiveConsumers),
300308
erlang:demonitor(MonitorRef),
301309
erase({ch, ChPid}),
310+
case Txn of
311+
none -> ok;
312+
_ -> ok = rollback_work(Txn, qname(State)),
313+
erase_tx(Txn)
314+
end,
302315
case check_auto_delete(
303316
deliver_or_enqueue_n(
304317
[{Message, true} ||
@@ -456,13 +469,17 @@ is_tx_persistent(Txn) ->
456469
#tx{is_persistent = Res} = lookup_tx(Txn),
457470
Res.
458471

459-
record_pending_message(Txn, Message) ->
472+
record_pending_message(Txn, ChPid, Message) ->
460473
Tx = #tx{pending_messages = Pending} = lookup_tx(Txn),
461-
store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending]}).
474+
record_current_channel_tx(ChPid, Txn),
475+
store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending],
476+
ch_pid = ChPid}).
462477

463478
record_pending_acks(Txn, ChPid, MsgIds) ->
464479
Tx = #tx{pending_acks = Pending} = lookup_tx(Txn),
465-
store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending], ch_pid = ChPid}).
480+
record_current_channel_tx(ChPid, Txn),
481+
store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending],
482+
ch_pid = ChPid}).
466483

467484
process_pending(Txn, State) ->
468485
#tx{ch_pid = ChPid,
@@ -541,7 +558,7 @@ handle_call({info, Items}, _From, State) ->
541558
catch Error -> reply({error, Error}, State)
542559
end;
543560

544-
handle_call({deliver_immediately, Txn, Message}, _From, State) ->
561+
handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) ->
545562
%% Synchronous, "immediate" delivery mode
546563
%%
547564
%% FIXME: Is this correct semantics?
@@ -555,12 +572,12 @@ handle_call({deliver_immediately, Txn, Message}, _From, State) ->
555572
%% just all ready-to-consume queues get the message, with unready
556573
%% queues discarding the message?
557574
%%
558-
{Delivered, NewState} = attempt_delivery(Txn, Message, State),
575+
{Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State),
559576
reply(Delivered, NewState);
560577

561-
handle_call({deliver, Txn, Message}, _From, State) ->
578+
handle_call({deliver, Txn, Message, ChPid}, _From, State) ->
562579
%% Synchronous, "mandatory" delivery mode
563-
{Delivered, NewState} = deliver_or_enqueue(Txn, Message, State),
580+
{Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
564581
reply(Delivered, NewState);
565582

566583
handle_call({commit, Txn}, From, State) ->
@@ -711,9 +728,9 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
711728
reply(locked, State)
712729
end.
713730

714-
handle_cast({deliver, Txn, Message}, State) ->
731+
handle_cast({deliver, Txn, Message, ChPid}, State) ->
715732
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
716-
{_Delivered, NewState} = deliver_or_enqueue(Txn, Message, State),
733+
{_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
717734
noreply(NewState);
718735

719736
handle_cast({ack, Txn, MsgIds, ChPid}, State) ->

src/rabbit_basic.erl

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,32 +33,36 @@
3333
-include("rabbit.hrl").
3434
-include("rabbit_framing.hrl").
3535

36-
-export([publish/4, message/4]).
36+
-export([publish/1, message/4, delivery/4]).
3737

3838
%%----------------------------------------------------------------------------
3939

4040
-ifdef(use_specs).
4141

42-
-spec(publish/4 :: (bool(), bool(), maybe(txn()), message()) ->
42+
-spec(publish/1 :: (delivery()) ->
4343
{ok, routing_result(), [pid()]} | not_found()).
44+
-spec(delivery/4 :: (bool(), bool(), maybe(txn()), message()) -> delivery()).
4445
-spec(message/4 :: (exchange_name(), routing_key(), binary(), binary()) ->
4546
message()).
4647

4748
-endif.
4849

4950
%%----------------------------------------------------------------------------
5051

51-
publish(Mandatory, Immediate, Txn,
52-
Message = #basic_message{exchange_name = ExchangeName}) ->
52+
publish(Delivery = #delivery{
53+
message = #basic_message{exchange_name = ExchangeName}}) ->
5354
case rabbit_exchange:lookup(ExchangeName) of
5455
{ok, X} ->
55-
{RoutingRes, DeliveredQPids} =
56-
rabbit_exchange:publish(X, Mandatory, Immediate, Txn, Message),
56+
{RoutingRes, DeliveredQPids} = rabbit_exchange:publish(X, Delivery),
5757
{ok, RoutingRes, DeliveredQPids};
5858
Other ->
5959
Other
6060
end.
6161

62+
delivery(Mandatory, Immediate, Txn, Message) ->
63+
#delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn,
64+
sender = self(), message = Message}.
65+
6266
message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) ->
6367
{ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
6468
Content = #content{class_id = ClassId,

src/rabbit_channel.erl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -324,8 +324,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
324324
content = DecodedContent,
325325
persistent_key = PersistentKey},
326326
{RoutingRes, DeliveredQPids} =
327-
rabbit_exchange:publish(Exchange, Mandatory, Immediate, TxnKey,
328-
Message),
327+
rabbit_exchange:publish(
328+
Exchange,
329+
rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)),
329330
case RoutingRes of
330331
routed ->
331332
ok;

src/rabbit_error_logger.erl

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,10 @@ publish(_Other, _Format, _Data, _State) ->
7575

7676
publish1(RoutingKey, Format, Data, LogExch) ->
7777
{ok, _RoutingRes, _DeliveredQPids} =
78-
rabbit_basic:publish(false, false, none,
79-
rabbit_basic:message(
80-
LogExch, RoutingKey, <<"text/plain">>,
81-
list_to_binary(io_lib:format(Format, Data)))),
78+
rabbit_basic:publish(
79+
rabbit_basic:delivery(
80+
false, false, none,
81+
rabbit_basic:message(
82+
LogExch, RoutingKey, <<"text/plain">>,
83+
list_to_binary(io_lib:format(Format, Data))))),
8284
ok.

src/rabbit_exchange.erl

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636

3737
-export([recover/0, declare/5, lookup/1, lookup_or_die/1,
3838
list/1, info/1, info/2, info_all/1, info_all/2,
39-
publish/5]).
39+
publish/2]).
4040
-export([add_binding/4, delete_binding/4, list_bindings/1]).
4141
-export([delete/2]).
4242
-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
@@ -72,8 +72,7 @@
7272
-spec(info/2 :: (exchange(), [info_key()]) -> [info()]).
7373
-spec(info_all/1 :: (vhost()) -> [[info()]]).
7474
-spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]).
75-
-spec(publish/5 :: (exchange(), bool(), bool(), maybe(txn()), message()) ->
76-
{routing_result(), [pid()]}).
75+
-spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}).
7776
-spec(add_binding/4 ::
7877
(exchange_name(), queue_name(), routing_key(), amqp_table()) ->
7978
bind_res() | {'error', 'durability_settings_incompatible'}).
@@ -188,13 +187,12 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end).
188187

189188
info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
190189

191-
publish(X, Mandatory, Immediate, Txn, Message) ->
192-
publish(X, [], Mandatory, Immediate, Txn, Message).
190+
publish(X, Delivery) ->
191+
publish(X, [], Delivery).
193192

194-
publish(X, Seen, Mandatory, Immediate, Txn,
195-
Message = #basic_message{routing_key = RK, content = C}) ->
196-
case rabbit_router:deliver(route(X, RK, C),
197-
Mandatory, Immediate, Txn, Message) of
193+
publish(X, Seen, Delivery = #delivery{
194+
message = #basic_message{routing_key = RK, content = C}}) ->
195+
case rabbit_router:deliver(route(X, RK, C), Delivery) of
198196
{_, []} = R ->
199197
#exchange{name = XName, arguments = Args} = X,
200198
case rabbit_misc:r_arg(XName, exchange, Args,
@@ -209,9 +207,7 @@ publish(X, Seen, Mandatory, Immediate, Txn,
209207
false ->
210208
case lookup(AName) of
211209
{ok, AX} ->
212-
publish(AX, NewSeen,
213-
Mandatory, Immediate, Txn,
214-
Message);
210+
publish(AX, NewSeen, Delivery);
215211
{error, not_found} ->
216212
rabbit_log:warning(
217213
"alternate exchange for ~s "

0 commit comments

Comments
 (0)