Skip to content

Commit eb638c1

Browse files
author
Matthias Radestock
committed
refactoring: bundle up all the data for a publish
Passing this around as separate args was becoming a pain. Also, now it's easier to add more data items.
1 parent e30c012 commit eb638c1

File tree

7 files changed

+66
-64
lines changed

7 files changed

+66
-64
lines changed

include/rabbit.hrl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@
6464

6565
-record(basic_message, {exchange_name, routing_key, content, persistent_key}).
6666

67+
-record(delivery, {mandatory, immediate, txn, message}).
68+
6769
%%----------------------------------------------------------------------------
6870

6971
-ifdef(use_specs).
@@ -134,6 +136,11 @@
134136
content :: content(),
135137
persistent_key :: maybe(pkey())}).
136138
-type(message() :: basic_message()).
139+
-type(delivery() ::
140+
#delivery{mandatory :: bool(),
141+
immediate :: bool(),
142+
txn :: maybe(txn()),
143+
message :: message()}).
137144
%% this really should be an abstract type
138145
-type(msg_id() :: non_neg_integer()).
139146
-type(msg() :: {queue_name(), pid(), msg_id(), bool(), message()}).

src/rabbit_amqqueue.erl

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
-export([internal_declare/2, internal_delete/1]).
3636
-export([pseudo_queue/2]).
3737
-export([lookup/1, with/2, with_or_die/2,
38-
stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]).
38+
stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]).
3939
-export([list/1, info/1, info/2, info_all/1, info_all/2]).
4040
-export([claim_queue/2]).
4141
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
@@ -85,7 +85,7 @@
8585
{'error', 'in_use'} |
8686
{'error', 'not_empty'}).
8787
-spec(purge/1 :: (amqqueue()) -> qlen()).
88-
-spec(deliver/5 :: (bool(), bool(), maybe(txn()), message(), pid()) -> bool()).
88+
-spec(deliver/2 :: (pid(), delivery()) -> bool()).
8989
-spec(redeliver/2 :: (pid(), [{message(), bool()}]) -> 'ok').
9090
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
9191
-spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok').
@@ -241,12 +241,13 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
241241

242242
purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity).
243243

244-
deliver(_IsMandatory, true, Txn, Message, QPid) ->
245-
gen_server2:call(QPid, {deliver_immediately, Txn, Message}, infinity);
246-
deliver(true, _IsImmediate, Txn, Message, QPid) ->
244+
deliver(QPid, #delivery{immediate = true, txn = Txn, message = Message}) ->
245+
gen_server2:call(QPid, {deliver_immediately, Txn, Message},
246+
infinity);
247+
deliver(QPid, #delivery{mandatory = true, txn = Txn, message = Message}) ->
247248
gen_server2:call(QPid, {deliver, Txn, Message}, infinity),
248249
true;
249-
deliver(false, _IsImmediate, Txn, Message, QPid) ->
250+
deliver(QPid, #delivery{txn = Txn, message = Message}) ->
250251
gen_server2:cast(QPid, {deliver, Txn, Message}),
251252
true.
252253

src/rabbit_basic.erl

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,32 +33,36 @@
3333
-include("rabbit.hrl").
3434
-include("rabbit_framing.hrl").
3535

36-
-export([publish/4, message/4]).
36+
-export([publish/1, message/4, delivery/4]).
3737

3838
%%----------------------------------------------------------------------------
3939

4040
-ifdef(use_specs).
4141

42-
-spec(publish/4 :: (bool(), bool(), maybe(txn()), message()) ->
42+
-spec(publish/1 :: (delivery()) ->
4343
{ok, routing_result(), [pid()]} | not_found()).
44+
-spec(delivery/4 :: (bool(), bool(), maybe(txn()), message()) -> delivery()).
4445
-spec(message/4 :: (exchange_name(), routing_key(), binary(), binary()) ->
4546
message()).
4647

4748
-endif.
4849

4950
%%----------------------------------------------------------------------------
5051

51-
publish(Mandatory, Immediate, Txn,
52-
Message = #basic_message{exchange_name = ExchangeName}) ->
52+
publish(Delivery = #delivery{
53+
message = #basic_message{exchange_name = ExchangeName}}) ->
5354
case rabbit_exchange:lookup(ExchangeName) of
5455
{ok, X} ->
55-
{RoutingRes, DeliveredQPids} =
56-
rabbit_exchange:publish(X, Mandatory, Immediate, Txn, Message),
56+
{RoutingRes, DeliveredQPids} = rabbit_exchange:publish(X, Delivery),
5757
{ok, RoutingRes, DeliveredQPids};
5858
Other ->
5959
Other
6060
end.
6161

62+
delivery(Mandatory, Immediate, Txn, Message) ->
63+
#delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn,
64+
message = Message}.
65+
6266
message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) ->
6367
{ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
6468
Content = #content{class_id = ClassId,

src/rabbit_channel.erl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -324,8 +324,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
324324
content = DecodedContent,
325325
persistent_key = PersistentKey},
326326
{RoutingRes, DeliveredQPids} =
327-
rabbit_exchange:publish(Exchange, Mandatory, Immediate, TxnKey,
328-
Message),
327+
rabbit_exchange:publish(
328+
Exchange,
329+
rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)),
329330
case RoutingRes of
330331
routed ->
331332
ok;

src/rabbit_error_logger.erl

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,10 @@ publish(_Other, _Format, _Data, _State) ->
7575

7676
publish1(RoutingKey, Format, Data, LogExch) ->
7777
{ok, _RoutingRes, _DeliveredQPids} =
78-
rabbit_basic:publish(false, false, none,
79-
rabbit_basic:message(
80-
LogExch, RoutingKey, <<"text/plain">>,
81-
list_to_binary(io_lib:format(Format, Data)))),
78+
rabbit_basic:publish(
79+
rabbit_basic:delivery(
80+
false, false, none,
81+
rabbit_basic:message(
82+
LogExch, RoutingKey, <<"text/plain">>,
83+
list_to_binary(io_lib:format(Format, Data))))),
8284
ok.

src/rabbit_exchange.erl

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636

3737
-export([recover/0, declare/5, lookup/1, lookup_or_die/1,
3838
list/1, info/1, info/2, info_all/1, info_all/2,
39-
publish/5]).
39+
publish/2]).
4040
-export([add_binding/4, delete_binding/4, list_bindings/1]).
4141
-export([delete/2]).
4242
-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
@@ -72,8 +72,7 @@
7272
-spec(info/2 :: (exchange(), [info_key()]) -> [info()]).
7373
-spec(info_all/1 :: (vhost()) -> [[info()]]).
7474
-spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]).
75-
-spec(publish/5 :: (exchange(), bool(), bool(), maybe(txn()), message()) ->
76-
{routing_result(), [pid()]}).
75+
-spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}).
7776
-spec(add_binding/4 ::
7877
(exchange_name(), queue_name(), routing_key(), amqp_table()) ->
7978
bind_res() | {'error', 'durability_settings_incompatible'}).
@@ -188,13 +187,12 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end).
188187

189188
info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
190189

191-
publish(X, Mandatory, Immediate, Txn, Message) ->
192-
publish(X, [], Mandatory, Immediate, Txn, Message).
190+
publish(X, Delivery) ->
191+
publish(X, [], Delivery).
193192

194-
publish(X, Seen, Mandatory, Immediate, Txn,
195-
Message = #basic_message{routing_key = RK, content = C}) ->
196-
case rabbit_router:deliver(route(X, RK, C),
197-
Mandatory, Immediate, Txn, Message) of
193+
publish(X, Seen, Delivery = #delivery{
194+
message = #basic_message{routing_key = RK, content = C}}) ->
195+
case rabbit_router:deliver(route(X, RK, C), Delivery) of
198196
{_, []} = R ->
199197
#exchange{name = XName, arguments = Args} = X,
200198
case rabbit_misc:r_arg(XName, exchange, Args,
@@ -209,9 +207,7 @@ publish(X, Seen, Mandatory, Immediate, Txn,
209207
false ->
210208
case lookup(AName) of
211209
{ok, AX} ->
212-
publish(AX, NewSeen,
213-
Mandatory, Immediate, Txn,
214-
Message);
210+
publish(AX, NewSeen, Delivery);
215211
{error, not_found} ->
216212
rabbit_log:warning(
217213
"alternate exchange for ~s "

src/rabbit_router.erl

Lines changed: 25 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
-behaviour(gen_server2).
3636

3737
-export([start_link/0,
38-
deliver/5]).
38+
deliver/2]).
3939

4040
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
4141
terminate/2, code_change/3]).
@@ -50,8 +50,7 @@
5050
-ifdef(use_specs).
5151

5252
-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
53-
-spec(deliver/5 :: ([pid()], bool(), bool(), maybe(txn()), message()) ->
54-
{routing_result(), [pid()]}).
53+
-spec(deliver/2 :: ([pid()], delivery()) -> {routing_result(), [pid()]}).
5554

5655
-endif.
5756

@@ -62,13 +61,13 @@ start_link() ->
6261

6362
-ifdef(BUG19758).
6463

65-
deliver(QPids, Mandatory, Immediate, Txn, Message) ->
66-
check_delivery(Mandatory, Immediate,
67-
run_bindings(QPids, Mandatory, Immediate, Txn, Message)).
64+
deliver(QPids, Delivery) ->
65+
check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
66+
run_bindings(QPids, Delivery)).
6867

6968
-else.
7069

71-
deliver(QPids, Mandatory, Immediate, Txn, Message) ->
70+
deliver(QPids, Delivery) ->
7271
%% we reduce inter-node traffic by grouping the qpids by node and
7372
%% only delivering one copy of the message to each node involved,
7473
%% which then in turn delivers it to its queues.
@@ -81,16 +80,14 @@ deliver(QPids, Mandatory, Immediate, Txn, Message) ->
8180
[QPid], D)
8281
end,
8382
dict:new(), QPids)),
84-
Mandatory, Immediate, Txn, Message).
83+
Delivery).
8584

86-
deliver_per_node([{Node, QPids}], Mandatory, Immediate,
87-
Txn, Message)
88-
when Node == node() ->
85+
deliver_per_node([{Node, QPids}], Delivery) when Node == node() ->
8986
%% optimisation
90-
check_delivery(Mandatory, Immediate,
91-
run_bindings(QPids, Mandatory, Immediate, Txn, Message));
92-
deliver_per_node(NodeQPids, Mandatory = false, Immediate = false,
93-
Txn, Message) ->
87+
check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
88+
run_bindings(QPids, Delivery));
89+
deliver_per_node(NodeQPids, Delivery = #delivery{mandatory = false,
90+
immediate = false}) ->
9491
%% optimisation: when Mandatory = false and Immediate = false,
9592
%% rabbit_amqqueue:deliver in run_bindings below will deliver the
9693
%% message to the queue process asynchronously, and return true,
@@ -101,20 +98,16 @@ deliver_per_node(NodeQPids, Mandatory = false, Immediate = false,
10198
{routed,
10299
lists:flatmap(
103100
fun ({Node, QPids}) ->
104-
gen_server2:cast(
105-
{?SERVER, Node},
106-
{deliver, QPids, Mandatory, Immediate, Txn, Message}),
101+
gen_server2:cast({?SERVER, Node}, {deliver, QPids, Delivery}),
107102
QPids
108103
end,
109104
NodeQPids)};
110-
deliver_per_node(NodeQPids, Mandatory, Immediate,
111-
Txn, Message) ->
105+
deliver_per_node(NodeQPids, Delivery) ->
112106
R = rabbit_misc:upmap(
113107
fun ({Node, QPids}) ->
114-
try gen_server2:call(
115-
{?SERVER, Node},
116-
{deliver, QPids, Mandatory, Immediate, Txn, Message},
117-
infinity)
108+
try gen_server2:call({?SERVER, Node},
109+
{deliver, QPids, Delivery},
110+
infinity)
118111
catch
119112
_Class:_Reason ->
120113
%% TODO: figure out what to log (and do!) here
@@ -131,7 +124,8 @@ deliver_per_node(NodeQPids, Mandatory, Immediate,
131124
end,
132125
{false, []},
133126
R),
134-
check_delivery(Mandatory, Immediate, {Routed, lists:append(Handled)}).
127+
check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
128+
{Routed, lists:append(Handled)}).
135129

136130
-endif.
137131

@@ -140,19 +134,17 @@ deliver_per_node(NodeQPids, Mandatory, Immediate,
140134
init([]) ->
141135
{ok, no_state}.
142136

143-
handle_call({deliver, QPids, Mandatory, Immediate, Txn, Message},
144-
From, State) ->
137+
handle_call({deliver, QPids, Delivery}, From, State) ->
145138
spawn(
146139
fun () ->
147-
R = run_bindings(QPids, Mandatory, Immediate, Txn, Message),
140+
R = run_bindings(QPids, Delivery),
148141
gen_server2:reply(From, R)
149142
end),
150143
{noreply, State}.
151144

152-
handle_cast({deliver, QPids, Mandatory, Immediate, Txn, Message},
153-
State) ->
145+
handle_cast({deliver, QPids, Delivery}, State) ->
154146
%% in order to preserve message ordering we must not spawn here
155-
run_bindings(QPids, Mandatory, Immediate, Txn, Message),
147+
run_bindings(QPids, Delivery),
156148
{noreply, State}.
157149

158150
handle_info(_Info, State) ->
@@ -166,11 +158,10 @@ code_change(_OldVsn, State, _Extra) ->
166158

167159
%%--------------------------------------------------------------------
168160

169-
run_bindings(QPids, IsMandatory, IsImmediate, Txn, Message) ->
161+
run_bindings(QPids, Delivery) ->
170162
lists:foldl(
171163
fun (QPid, {Routed, Handled}) ->
172-
case catch rabbit_amqqueue:deliver(IsMandatory, IsImmediate,
173-
Txn, Message, QPid) of
164+
case catch rabbit_amqqueue:deliver(QPid, Delivery) of
174165
true -> {true, [QPid | Handled]};
175166
false -> {true, Handled};
176167
{'EXIT', _Reason} -> {Routed, Handled}

0 commit comments

Comments
 (0)