Skip to content

Commit 2d0d3af

Browse files
committed
[skip ci] Remove rabbit_log_federation and use LOG_ macros
1 parent 9c2ae88 commit 2d0d3af

File tree

9 files changed

+65
-181
lines changed

9 files changed

+65
-181
lines changed

deps/rabbitmq_exchange_federation/src/rabbit_exchange_federation_app.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
-include_lib("rabbitmq_federation_common/include/rabbit_federation.hrl").
1111
-include("rabbit_exchange_federation.hrl").
12+
-include_lib("kernel/include/logger.hrl").
1213

1314
-behaviour(application).
1415
-export([start/2, stop/1]).
@@ -43,6 +44,7 @@ stop(_State) ->
4344
%%----------------------------------------------------------------------------
4445

4546
init([]) ->
47+
logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_FEDERATION}),
4648
Flags = #{
4749
strategy => one_for_one,
4850
intensity => 3,

deps/rabbitmq_exchange_federation/src/rabbit_federation_exchange_link.erl

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ start_link(Args) ->
6565
gen_server2:start_link(?MODULE, Args, [{timeout, infinity}]).
6666

6767
init({Upstream, XName}) ->
68+
logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_FEDERATION,
69+
exchange => XName}),
6870
%% If we are starting up due to a policy change then it's possible
6971
%% for the exchange to have been deleted before we got here, in which
7072
%% case it's possible that delete callback would also have been called
@@ -80,7 +82,7 @@ init({Upstream, XName}) ->
8082
gen_server2:cast(self(), maybe_go),
8183
{ok, {not_started, {Upstream, UParams, XName}}};
8284
{error, not_found} ->
83-
rabbit_federation_link_util:log_warning(XName, "not found, stopping link", []),
85+
?LOG_WARNING("not found, stopping link", []),
8486
{stop, gone}
8587
end.
8688

@@ -105,14 +107,12 @@ handle_cast({enqueue, _, _}, State = {not_started, _}) ->
105107
{noreply, State};
106108

107109
handle_cast({enqueue, Serial, Cmd},
108-
State = #state{waiting_cmds = Waiting,
109-
downstream_exchange = XName}) ->
110+
State = #state{waiting_cmds = Waiting}) ->
110111
Waiting1 = gb_trees:insert(Serial, Cmd, Waiting),
111112
try
112113
{noreply, play_back_commands(State#state{waiting_cmds = Waiting1})}
113114
catch exit:{{shutdown, {server_initiated_close, 404, Text}}, _} ->
114-
rabbit_federation_link_util:log_warning(
115-
XName, "detected upstream changes, restarting link: ~tp", [Text]),
115+
?LOG_WARNING("detected upstream changes, restarting link: ~tp", [Text]),
116116
{stop, {shutdown, restart}, State}
117117
end;
118118

@@ -177,7 +177,7 @@ handle_info(check_internal_exchange, State = #state{internal_exchange = IntXName
177177
internal_exchange_interval = Interval}) ->
178178
case check_internal_exchange(IntXNameBin, State) of
179179
upstream_not_found ->
180-
rabbit_log_federation:warning("Federation link could not find upstream exchange '~ts' and will restart",
180+
?LOG_WARNING("Federation link could not find upstream exchange '~ts' and will restart",
181181
[IntXNameBin]),
182182
{stop, {shutdown, restart}, State};
183183
_ ->
@@ -470,25 +470,25 @@ go(S0 = {not_started, {Upstream, UParams, DownXName}}) ->
470470
unacked = Unacked,
471471
internal_exchange_interval = Interval}),
472472
Bindings),
473-
rabbit_log_federation:info("Federation link for ~ts (upstream: ~ts) will perform internal exchange checks "
473+
?LOG_INFO("Federation link for ~ts (upstream: ~ts) will perform internal exchange checks "
474474
"every ~b seconds", [rabbit_misc:rs(DownXName), UName, round(Interval / 1000)]),
475475
TRef = erlang:send_after(Interval, self(), check_internal_exchange),
476476
{noreply, State#state{internal_exchange_timer = TRef}}
477477
end, Upstream, UParams, DownXName, S0).
478478

479479
log_link_startup_attempt(#upstream{name = Name, channel_use_mode = ChMode}, DownXName) ->
480-
rabbit_log_federation:debug("Will try to start a federation link for ~ts, upstream: '~ts', channel use mode: ~ts",
480+
?LOG_DEBUG("Will try to start a federation link for ~ts, upstream: '~ts', channel use mode: ~ts",
481481
[rabbit_misc:rs(DownXName), Name, ChMode]).
482482

483483
%% If channel use mode is 'single', reuse the message transfer channel.
484484
%% Otherwise open a separate one.
485485
reuse_command_channel(MainCh, #upstream{name = UName}, DownXName) ->
486-
rabbit_log_federation:debug("Will use a single channel for both schema operations and message transfer on links to upstream '~ts' for downstream federated ~ts",
486+
?LOG_DEBUG("Will use a single channel for both schema operations and message transfer on links to upstream '~ts' for downstream federated ~ts",
487487
[UName, rabbit_misc:rs(DownXName)]),
488488
{ok, MainCh}.
489489

490490
open_command_channel(Conn, Upstream = #upstream{name = UName}, UParams, DownXName, S0) ->
491-
rabbit_log_federation:debug("Will open a command channel to upstream '~ts' for downstream federated ~ts",
491+
?LOG_DEBUG("Will open a command channel to upstream '~ts' for downstream federated ~ts",
492492
[UName, rabbit_misc:rs(DownXName)]),
493493
case amqp_connection:open_channel(Conn) of
494494
{ok, CCh} ->
@@ -583,12 +583,12 @@ ensure_internal_exchange(IntXNameBin,
583583
connection = Conn,
584584
channel = Ch,
585585
downstream_exchange = #resource{virtual_host = DVhost}}) ->
586-
rabbit_log_federation:debug("Exchange federation will set up exchange '~ts' in upstream '~ts'",
586+
?LOG_DEBUG("Exchange federation will set up exchange '~ts' in upstream '~ts'",
587587
[IntXNameBin, UName]),
588588
#upstream_params{params = Params} = rabbit_federation_util:deobfuscate_upstream_params(UParams),
589-
rabbit_log_federation:debug("Will delete upstream exchange '~ts'", [IntXNameBin]),
589+
?LOG_DEBUG("Will delete upstream exchange '~ts'", [IntXNameBin]),
590590
delete_upstream_exchange(Conn, IntXNameBin),
591-
rabbit_log_federation:debug("Will declare an internal upstream exchange '~ts'", [IntXNameBin]),
591+
?LOG_DEBUG("Will declare an internal upstream exchange '~ts'", [IntXNameBin]),
592592
Base = #'exchange.declare'{exchange = IntXNameBin,
593593
durable = true,
594594
internal = true,
@@ -613,7 +613,7 @@ check_internal_exchange(IntXNameBin,
613613
downstream_exchange = XName = #resource{virtual_host = DVhost}}) ->
614614
#upstream_params{params = Params} =
615615
rabbit_federation_util:deobfuscate_upstream_params(UParams),
616-
rabbit_log_federation:debug("Exchange federation will check on exchange '~ts' in upstream '~ts'",
616+
?LOG_DEBUG("Exchange federation will check on exchange '~ts' in upstream '~ts'",
617617
[IntXNameBin, UName]),
618618
Base = #'exchange.declare'{exchange = IntXNameBin,
619619
passive = true,
@@ -629,13 +629,11 @@ check_internal_exchange(IntXNameBin,
629629
arguments = XFUArgs},
630630
rabbit_federation_link_util:disposable_connection_call(
631631
Params, XFU, fun(404, Text) ->
632-
rabbit_federation_link_util:log_warning(
633-
XName, "detected internal upstream exchange changes,"
634-
" restarting link: ~tp", [Text]),
632+
?LOG_WARNING("detected internal upstream exchange changes,"
633+
" restarting link: ~tp", [Text]),
635634
upstream_not_found;
636635
(Code, Text) ->
637-
rabbit_federation_link_util:log_warning(
638-
XName, "internal upstream exchange check failed: ~tp ~tp",
636+
?LOG_WARNING("internal upstream exchange check failed: ~tp ~tp",
639637
[Code, Text]),
640638
error
641639
end).

deps/rabbitmq_exchange_federation/src/rabbit_federation_exchange_link_sup_sup.erl

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111

1212
-include_lib("rabbit_common/include/rabbit.hrl").
1313
-include("rabbit_exchange_federation.hrl").
14+
-include_lib("kernel/include/logger.hrl").
15+
-include_lib("rabbit_federation.hrl").
1416
-define(SUPERVISOR, ?MODULE).
1517

1618
%% Supervises the upstream links for all exchanges (but not queues). We need
@@ -43,8 +45,8 @@ start_child(X) ->
4345
{ok, _Pid} -> ok;
4446
{error, {already_started, _Pid}} ->
4547
#exchange{name = ExchangeName} = X,
46-
rabbit_log_federation:debug("Federation link for exchange ~tp was already started",
47-
[rabbit_misc:rs(ExchangeName)]),
48+
?LOG_DEBUG("Federation link for exchange ~tp was already started",
49+
[rabbit_misc:rs(ExchangeName)]),
4850
ok;
4951
%% A link returned {stop, gone}, the link_sup shut down, that's OK.
5052
{error, {shutdown, _}} -> ok
@@ -67,16 +69,16 @@ stop_child(X) ->
6769
ok -> ok;
6870
{error, Err} ->
6971
#exchange{name = ExchangeName} = X,
70-
rabbit_log_federation:warning(
71-
"Attempt to stop a federation link for exchange ~tp failed: ~tp",
72-
[rabbit_misc:rs(ExchangeName), Err]),
72+
?LOG_WARNING("Attempt to stop a federation link for exchange ~tp failed: ~tp",
73+
[rabbit_misc:rs(ExchangeName), Err]),
7374
ok
7475
end,
7576
ok = mirrored_supervisor:delete_child(?SUPERVISOR, id(X)).
7677

7778
%%----------------------------------------------------------------------------
7879

7980
init([]) ->
81+
logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_FEDERATION}),
8082
{ok, {{one_for_one, 1200, 60}, []}}.
8183

8284
%% See comment in rabbit_federation_queue_link_sup_sup:id/1

deps/rabbitmq_federation_common/src/rabbit_federation_link_util.erl

Lines changed: 25 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@
1010
-include_lib("rabbit/include/amqqueue.hrl").
1111
-include_lib("amqp_client/include/amqp_client.hrl").
1212
-include("rabbit_federation.hrl").
13+
-include_lib("kernel/include/logger.hrl").
1314

1415
%% real
1516
-export([start_conn_ch/5, disposable_channel_call/2, disposable_channel_call/3,
1617
disposable_connection_call/3, ensure_connection_closed/1,
1718
log_terminate/4, unacked_new/0, ack/3, nack/3, forward/9,
1819
handle_downstream_down/3, handle_upstream_down/3,
19-
get_connection_name/2, log_debug/3, log_info/3, log_warning/3,
20-
log_error/3]).
20+
get_connection_name/2]).
2121

2222
%% temp
2323
-export([connection_error/6]).
@@ -55,10 +55,9 @@ start_conn_ch(Fun, OUpstream, OUParams,
5555
process_flag(trap_exit, true),
5656
try
5757
R = Fun(Conn, Ch, DConn, DCh),
58-
log_info(
59-
XorQName, "connected to ~ts",
60-
[rabbit_federation_upstream:params_to_string(
61-
UParams)]),
58+
?LOG_INFO("Federation ~ts connected to ~ts",
59+
[rabbit_misc:rs(XorQName),
60+
rabbit_federation_upstream:params_to_string(UParams)]),
6261
Name = pget(name, amqp_connection:info(DConn, [name])),
6362
rabbit_federation_status:report(
6463
OUpstream, OUParams, XorQName, {running, Name}),
@@ -130,45 +129,44 @@ connection_error(remote_start, {{shutdown, {server_initiated_close, Code, Messag
130129
Upstream, UParams, XorQName, State) ->
131130
rabbit_federation_status:report(
132131
Upstream, UParams, XorQName, clean_reason(E)),
133-
log_warning(XorQName,
134-
"did not connect to ~ts. Server has closed the connection due to an error, code: ~tp, "
132+
?LOG_WARNING("Federation ~ts did not connect to ~ts. Server has closed the connection due to an error, code: ~tp, "
135133
"message: ~ts",
136-
[rabbit_federation_upstream:params_to_string(UParams),
134+
[rabbit_misc:rs(XorQName), rabbit_federation_upstream:params_to_string(UParams),
137135
Code, Message]),
138136
{stop, {shutdown, restart}, State};
139137

140138
connection_error(remote_start, E, Upstream, UParams, XorQName, State) ->
141139
rabbit_federation_status:report(
142140
Upstream, UParams, XorQName, clean_reason(E)),
143-
log_warning(XorQName, "did not connect to ~ts. Reason: ~tp",
144-
[rabbit_federation_upstream:params_to_string(UParams),
141+
?LOG_WARNING("Federation ~ts did not connect to ~ts. Reason: ~tp",
142+
[rabbit_misc:rs(XorQName), rabbit_federation_upstream:params_to_string(UParams),
145143
E]),
146144
{stop, {shutdown, restart}, State};
147145

148146
connection_error(remote, E, Upstream, UParams, XorQName, State) ->
149147
rabbit_federation_status:report(
150148
Upstream, UParams, XorQName, clean_reason(E)),
151-
log_info(XorQName, "disconnected from ~ts~n~tp",
152-
[rabbit_federation_upstream:params_to_string(UParams), E]),
149+
?LOG_INFO("Federation ~ts disconnected from ~ts~n~tp",
150+
[rabbit_misc:rs(XorQName), rabbit_federation_upstream:params_to_string(UParams), E]),
153151
{stop, {shutdown, restart}, State};
154152

155153
connection_error(command_channel, E, Upstream, UParams, XorQName, State) ->
156154
rabbit_federation_status:report(
157155
Upstream, UParams, XorQName, clean_reason(E)),
158-
log_info(XorQName, "failed to open a command channel for upstream ~ts~n~tp",
159-
[rabbit_federation_upstream:params_to_string(UParams), E]),
156+
?LOG_INFO("Federation ~ts failed to open a command channel for upstream ~ts~n~tp",
157+
[rabbit_misc:rs(XorQName), rabbit_federation_upstream:params_to_string(UParams), E]),
160158
{stop, {shutdown, restart}, State};
161159

162160
connection_error(local, basic_cancel, Upstream, UParams, XorQName, State) ->
163161
rabbit_federation_status:report(
164162
Upstream, UParams, XorQName, {error, basic_cancel}),
165-
log_info(XorQName, "received a 'basic.cancel'", []),
163+
?LOG_INFO("Federation ~ts received a 'basic.cancel'", [rabbit_misc:rs(XorQName)]),
166164
{stop, {shutdown, restart}, State};
167165

168166
connection_error(local_start, E, Upstream, UParams, XorQName, State) ->
169167
rabbit_federation_status:report(
170168
Upstream, UParams, XorQName, clean_reason(E)),
171-
log_warning(XorQName, "did not connect locally~n~tp", [E]),
169+
?LOG_WARNING("Federation ~ts did not connect locally~n~tp", [rabbit_misc:rs(XorQName), E]),
172170
{stop, {shutdown, restart}, State}.
173171

174172
%% If we terminate due to a gen_server call exploding (almost
@@ -285,7 +283,7 @@ log_terminate(shutdown, Upstream, UParams, XorQName) ->
285283
%% the link because configuration has changed. So try to shut down
286284
%% nicely so that we do not cause unacked messages to be
287285
%% redelivered.
288-
log_info(XorQName, "disconnecting from ~ts",
286+
?LOG_INFO("disconnecting from ~ts",
289287
[rabbit_federation_upstream:params_to_string(UParams)]),
290288
rabbit_federation_status:remove(Upstream, XorQName);
291289

@@ -295,21 +293,6 @@ log_terminate(Reason, Upstream, UParams, XorQName) ->
295293
rabbit_federation_status:report(
296294
Upstream, UParams, XorQName, clean_reason(Reason)).
297295

298-
log_debug(XorQName, Fmt, Args) -> log(debug, XorQName, Fmt, Args).
299-
log_info(XorQName, Fmt, Args) -> log(info, XorQName, Fmt, Args).
300-
log_warning(XorQName, Fmt, Args) -> log(warning, XorQName, Fmt, Args).
301-
log_error(XorQName, Fmt, Args) -> log(error, XorQName, Fmt, Args).
302-
303-
log(Level, XorQName, Fmt0, Args0) ->
304-
Fmt = "Federation ~ts " ++ Fmt0,
305-
Args = [rabbit_misc:rs(XorQName) | Args0],
306-
case Level of
307-
debug -> rabbit_log_federation:debug(Fmt, Args);
308-
info -> rabbit_log_federation:info(Fmt, Args);
309-
warning -> rabbit_log_federation:warning(Fmt, Args);
310-
error -> rabbit_log_federation:error(Fmt, Args)
311-
end.
312-
313296
%%----------------------------------------------------------------------------
314297

315298
disposable_channel_call(Conn, Method) ->
@@ -327,12 +310,13 @@ disposable_channel_call(Conn, Method, ErrFun) ->
327310
end
328311
catch
329312
Exception:Reason ->
330-
rabbit_log_federation:error("Federation link could not create a disposable (one-off) channel due to an error ~tp: ~tp", [Exception, Reason])
313+
?LOG_ERROR("Federation link could not create a disposable (one-off) channel due to an error ~tp: ~tp",
314+
[Exception, Reason])
331315
end.
332316

333317
disposable_connection_call(Params, Method, ErrFun) ->
334318
try
335-
rabbit_log_federation:debug("Disposable connection parameters: ~tp", [Params]),
319+
?LOG_DEBUG("Disposable connection parameters: ~tp", [Params]),
336320
case open(Params, <<"Disposable exchange federation link connection">>) of
337321
{ok, Conn, Ch} ->
338322
try
@@ -345,15 +329,15 @@ disposable_connection_call(Params, Method, ErrFun) ->
345329
ensure_connection_closed(Conn)
346330
end;
347331
{error, {auth_failure, Message}} ->
348-
rabbit_log_federation:error("Federation link could not open a disposable (one-off) connection "
349-
"due to an authentication failure: ~ts", [Message]);
332+
?LOG_ERROR("Federation link could not open a disposable (one-off) connection "
333+
"due to an authentication failure: ~ts", [Message]);
350334
Error ->
351-
rabbit_log_federation:error("Federation link could not open a disposable (one-off) connection, "
352-
"reason: ~tp", [Error]),
335+
?LOG_ERROR("Federation link could not open a disposable (one-off) connection, "
336+
"reason: ~tp", [Error]),
353337
Error
354338
end
355339
catch
356340
Exception:Reason ->
357-
rabbit_log_federation:error("Federation link could not create a disposable (one-off) connection "
358-
"due to an error ~tp: ~tp", [Exception, Reason])
341+
?LOG_ERROR("Federation link could not create a disposable (one-off) connection "
342+
"due to an error ~tp: ~tp", [Exception, Reason])
359343
end.

deps/rabbitmq_federation_common/src/rabbit_federation_pg.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@
1313
-export([start_scope/1, stop_scope/1]).
1414

1515
start_scope(Scope) ->
16-
rabbit_log_federation:debug("Starting pg scope ~ts", [Scope]),
16+
?LOG_DEBUG("Starting pg scope ~ts", [Scope]),
1717
_ = pg:start_link(Scope).
1818

1919
stop_scope(Scope) ->
2020
case whereis(Scope) of
2121
Pid when is_pid(Pid) ->
22-
rabbit_log_federation:debug("Stopping pg scope ~ts", [Scope]),
22+
?LOG_DEBUG("Stopping pg scope ~ts", [Scope]),
2323
Groups = pg:which_groups(Scope),
2424
lists:foreach(
2525
fun(Group) ->

0 commit comments

Comments
 (0)