Skip to content

Commit 9b82020

Browse files
Merge pull request #1459 from rabbitmq/rabbitmq-server-1456
Add lager backend that logs to amq.rabbitmq.log
2 parents e56392f + 6bd51a0 commit 9b82020

File tree

8 files changed

+309
-158
lines changed

8 files changed

+309
-158
lines changed

docs/rabbitmq.conf.example

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -674,6 +674,14 @@
674674
##
675675
# log.dir = /var/log/rabbitmq
676676

677+
## Logging to the amq.rabbitmq.log exchange (can be true or false)
678+
##
679+
# log.exchange = false
680+
681+
## Log level to use when logging to the amq.rabbitmq.log exchange
682+
##
683+
# log.exchange.level = info
684+
677685
## Logging to console (can be true or false)
678686
##
679687
# log.console = false

priv/schema/rabbit.schema

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1047,6 +1047,13 @@ end}.
10471047
{datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, none]}}
10481048
]}.
10491049

1050+
{mapping, "log.exchange", "rabbit.log.exchange.enabled", [
1051+
{datatype, {enum, [true, false]}}
1052+
]}.
1053+
{mapping, "log.exchange.level", "rabbit.log.exchange.level", [
1054+
{datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, none]}}
1055+
]}.
1056+
10501057
{mapping, "log.syslog", "rabbit.log.syslog.enabled", [
10511058
{datatype, {enum, [true, false]}}
10521059
]}.

src/lager_exchange_backend.erl

Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
%% The contents of this file are subject to the Mozilla Public License
2+
%% Version 1.1 (the "License"); you may not use this file except in
3+
%% compliance with the License. You may obtain a copy of the License
4+
%% at http://www.mozilla.org/MPL/
5+
%%
6+
%% Software distributed under the License is distributed on an "AS IS"
7+
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
%% the License for the specific language governing rights and
9+
%% limitations under the License.
10+
%%
11+
%% The Original Code is RabbitMQ.
12+
%%
13+
%% The Initial Developer of the Original Code is GoPivotal, Inc.
14+
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
15+
%%
16+
17+
%% @doc RabbitMQ backend for lager.
18+
%% Configuration is a proplist with the following keys:
19+
%% <ul>
20+
%% <li>`level' - log level to use</li>
21+
%% <li>`formatter' - the module to use when formatting log messages. Defaults to
22+
%% `lager_default_formatter'</li>
23+
%% <li>`formatter_config' - the format configuration string. Defaults to
24+
%% `time [ severity ] message'</li>
25+
%% </ul>
26+
27+
-module(lager_exchange_backend).
28+
29+
-behaviour(gen_event).
30+
31+
-export([init/1, terminate/2, code_change/3, handle_call/2, handle_event/2,
32+
handle_info/2]).
33+
34+
-include("rabbit.hrl").
35+
-include("rabbit_framing.hrl").
36+
37+
-include_lib("lager/include/lager.hrl").
38+
39+
-record(state, {level :: {'mask', integer()},
40+
formatter :: atom(),
41+
format_config :: any(),
42+
init_exchange_ts = undefined :: rabbit_types:timestamp(),
43+
exchange = undefined :: #resource{}}).
44+
45+
-ifdef(TEST).
46+
-include_lib("eunit/include/eunit.hrl").
47+
-compile([{parse_transform, lager_transform}]).
48+
-endif.
49+
50+
-define(INIT_EXCHANGE_INTERVAL_SECS, 5).
51+
-define(TERSE_FORMAT, [time, " [", severity, "] ", message]).
52+
-define(DEFAULT_FORMAT_CONFIG, ?TERSE_FORMAT).
53+
-define(FORMAT_CONFIG_OFF, []).
54+
55+
-ifdef(TEST).
56+
-define(DEPRECATED(_Msg), ok).
57+
-else.
58+
-define(DEPRECATED(Msg),
59+
io:format(user, "WARNING: This is a deprecated lager_exchange_backend configuration. Please use \"~w\" instead.~n", [Msg])).
60+
-endif.
61+
62+
-define(LOG_EXCH_NAME, <<"amq.rabbitmq.log">>).
63+
64+
init([Level]) when is_atom(Level) ->
65+
?DEPRECATED([{level, Level}]),
66+
init([{level, Level}]);
67+
init([Level, true]) when is_atom(Level) -> % for backwards compatibility
68+
?DEPRECATED([{level, Level}, {formatter_config, [{eol, "\\r\\n\\"}]}]),
69+
init([{level, Level}, {formatter_config, ?FORMAT_CONFIG_OFF}]);
70+
init([Level, false]) when is_atom(Level) -> % for backwards compatibility
71+
?DEPRECATED([{level, Level}]),
72+
init([{level, Level}]);
73+
74+
init(Options) when is_list(Options) ->
75+
true = validate_options(Options),
76+
Level = get_option(level, Options, undefined),
77+
try lager_util:config_to_mask(Level) of
78+
L ->
79+
DefaultOptions = [{formatter, lager_default_formatter},
80+
{formatter_config, ?DEFAULT_FORMAT_CONFIG}],
81+
[Formatter, Config] = [get_option(K, Options, Default) || {K, Default} <- DefaultOptions],
82+
State0 = #state{level=L,
83+
formatter=Formatter,
84+
format_config=Config},
85+
State1 = maybe_init_exchange(State0),
86+
{ok, State1}
87+
catch
88+
_:_ ->
89+
{error, {fatal, bad_log_level}}
90+
end;
91+
init(Level) when is_atom(Level) ->
92+
?DEPRECATED([{level, Level}]),
93+
init([{level, Level}]);
94+
init(Other) ->
95+
{error, {fatal, {bad_lager_exchange_backend_config, Other}}}.
96+
97+
validate_options([]) -> true;
98+
validate_options([{level, L}|T]) when is_atom(L) ->
99+
case lists:member(L, ?LEVELS) of
100+
false ->
101+
throw({error, {fatal, {bad_level, L}}});
102+
true ->
103+
validate_options(T)
104+
end;
105+
validate_options([{formatter, M}|T]) when is_atom(M) ->
106+
validate_options(T);
107+
validate_options([{formatter_config, C}|T]) when is_list(C) ->
108+
validate_options(T);
109+
validate_options([H|_]) ->
110+
throw({error, {fatal, {bad_lager_exchange_backend_config, H}}}).
111+
112+
get_option(K, Options, Default) ->
113+
case lists:keyfind(K, 1, Options) of
114+
{K, V} -> V;
115+
false -> Default
116+
end.
117+
118+
handle_call(get_loglevel, #state{level=Level} = State) ->
119+
{ok, Level, State};
120+
handle_call({set_loglevel, Level}, State) ->
121+
try lager_util:config_to_mask(Level) of
122+
Levels ->
123+
{ok, ok, State#state{level=Levels}}
124+
catch
125+
_:_ ->
126+
{ok, {error, bad_log_level}, State}
127+
end;
128+
handle_call(_Request, State) ->
129+
{ok, ok, State}.
130+
131+
handle_event({log, _Message} = Event, State0) ->
132+
State1 = maybe_init_exchange(State0),
133+
handle_log_event(Event, State1);
134+
handle_event(_Event, State) ->
135+
{ok, State}.
136+
137+
handle_info(_Info, State) ->
138+
{ok, State}.
139+
140+
terminate(_Reason, _State) ->
141+
ok.
142+
143+
code_change(_OldVsn, State, _Extra) ->
144+
{ok, State}.
145+
146+
%% @private
147+
handle_log_event({log, _Message}, #state{exchange=undefined} = State) ->
148+
% NB: tried to define the exchange but still undefined,
149+
% so not logging this message. Note: we can't log this dropped
150+
% message because it will start an infinite loop
151+
{ok, State};
152+
handle_log_event({log, Message},
153+
#state{level=L, exchange=LogExch,
154+
formatter=Formatter, format_config=FormatConfig} = State) ->
155+
case lager_util:is_loggable(Message, L, ?MODULE) of
156+
true ->
157+
%% 0-9-1 says the timestamp is a "64 bit POSIX timestamp". That's
158+
%% second resolution, not millisecond.
159+
RoutingKey = rabbit_data_coercion:to_binary(lager_msg:severity(Message)),
160+
Timestamp = os:system_time(seconds),
161+
Node = rabbit_data_coercion:to_binary(node()),
162+
Headers = [{<<"node">>, longstr, Node}],
163+
AmqpMsg = #'P_basic'{content_type = <<"text/plain">>,
164+
timestamp = Timestamp,
165+
headers = Headers},
166+
Body = rabbit_data_coercion:to_binary(Formatter:format(Message, FormatConfig)),
167+
case rabbit_basic:publish(LogExch, RoutingKey, AmqpMsg, Body) of
168+
{ok, _DeliveredQPids} -> ok;
169+
{error, not_found} -> ok
170+
end,
171+
{ok, State};
172+
false ->
173+
{ok, State}
174+
end.
175+
176+
%% @private
177+
maybe_init_exchange(#state{exchange=undefined, init_exchange_ts=undefined} = State) ->
178+
Now = erlang:monotonic_time(second),
179+
handle_init_exchange(init_exchange(true), Now, State);
180+
maybe_init_exchange(#state{exchange=undefined, init_exchange_ts=Timestamp} = State) ->
181+
Now = erlang:monotonic_time(second),
182+
Result = init_exchange(Now - Timestamp > ?INIT_EXCHANGE_INTERVAL_SECS),
183+
handle_init_exchange(Result, Now, State);
184+
maybe_init_exchange(State) ->
185+
State.
186+
187+
%% @private
188+
init_exchange(true) ->
189+
{ok, DefaultVHost} = application:get_env(rabbit, default_vhost),
190+
VHost = rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME),
191+
try
192+
#exchange{} = rabbit_exchange:declare(VHost, topic, true, false, true, [], ?INTERNAL_USER),
193+
{ok, #resource{virtual_host=DefaultVHost, kind=exchange, name=?LOG_EXCH_NAME}}
194+
catch
195+
ErrType:Err ->
196+
rabbit_log:debug("Could not initialize exchange '~s' in vhost '~s', reason: ~p:~p",
197+
[?LOG_EXCH_NAME, DefaultVHost, ErrType, Err]),
198+
{ok, undefined}
199+
end;
200+
init_exchange(_) ->
201+
{ok, undefined}.
202+
203+
%% @private
204+
handle_init_exchange({ok, undefined}, Now, State) ->
205+
State#state{init_exchange_ts=Now};
206+
handle_init_exchange({ok, Exchange}, Now, State) ->
207+
State#state{exchange=Exchange, init_exchange_ts=Now}.
208+
209+
-ifdef(TEST).
210+
console_config_validation_test_() ->
211+
Good = [{level, info}],
212+
Bad1 = [{level, foo}],
213+
Bad2 = [{larval, info}],
214+
AllGood = [{level, info}, {formatter, my_formatter},
215+
{formatter_config, ["blort", "garbage"]}],
216+
[
217+
?_assertEqual(true, validate_options(Good)),
218+
?_assertThrow({error, {fatal, {bad_level, foo}}}, validate_options(Bad1)),
219+
?_assertThrow({error, {fatal, {bad_lager_exchange_backend_config, {larval, info}}}}, validate_options(Bad2)),
220+
?_assertEqual(true, validate_options(AllGood))
221+
].
222+
-endif.

src/rabbit.erl

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -173,30 +173,19 @@
173173
[{description, "message delivery logic ready"},
174174
{requires, core_initialized}]}).
175175

176-
-rabbit_boot_step({log_relay,
177-
[{description, "error log relay"},
178-
{mfa, {rabbit_sup, start_child,
179-
[rabbit_error_logger_lifecycle,
180-
supervised_lifecycle,
181-
[rabbit_error_logger_lifecycle,
182-
{rabbit_error_logger, start, []},
183-
{rabbit_error_logger, stop, []}]]}},
184-
{requires, routing_ready},
185-
{enables, networking}]}).
186-
187176
-rabbit_boot_step({direct_client,
188177
[{description, "direct client"},
189178
{mfa, {rabbit_direct, boot, []}},
190-
{requires, log_relay}]}).
179+
{requires, routing_ready}]}).
191180

192181
-rabbit_boot_step({connection_tracking,
193182
[{description, "sets up internal storage for node-local connections"},
194183
{mfa, {rabbit_connection_tracking, boot, []}},
195-
{requires, log_relay}]}).
184+
{requires, routing_ready}]}).
196185

197186
-rabbit_boot_step({networking,
198187
[{mfa, {rabbit_networking, boot, []}},
199-
{requires, log_relay}]}).
188+
{requires, routing_ready}]}).
200189

201190
-rabbit_boot_step({notify_cluster,
202191
[{description, "notify cluster nodes"},

0 commit comments

Comments
 (0)