-
Notifications
You must be signed in to change notification settings - Fork 2.7k
Expand file tree
/
Copy pathlogging.py
More file actions
403 lines (338 loc) · 13.6 KB
/
logging.py
File metadata and controls
403 lines (338 loc) · 13.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0
import builtins
import functools
import logging
import os
import sys
import typing
from typing import Any
if typing.TYPE_CHECKING:
from structlog.typing import EventDict, Processor, WrappedLogger
# Name of the environment variable that switches JSON rendering of log entries on or off.
# `configure_logging` only reads it when no explicit `use_json` argument was passed; the value `true`
# (case-insensitive) enables JSON, any other value disables it.
HAYSTACK_LOGGING_USE_JSON_ENV_VAR = "HAYSTACK_LOGGING_USE_JSON"
# Name of the environment variable that, when set to `true` (case-insensitive), makes `configure_logging`
# return without configuring structlog.
HAYSTACK_LOGGING_IGNORE_STRUCTLOG_ENV_VAR = "HAYSTACK_LOGGING_IGNORE_STRUCTLOG"
class PatchedLogger(typing.Protocol):
    """
    Class which enables using type checkers to find wrong logger usage.

    This is the structural type that `getLogger` casts its return value to. Every log method accepts the
    message as its only positional argument (`log` additionally takes the level first); `exc_info`,
    `stack_info`, `stacklevel` and any further values must be passed as keyword arguments.
    """

    def debug(
        self,
        msg: str,
        *,
        _: Any = None,
        exc_info: Any = None,
        stack_info: Any = False,
        stacklevel: int = 1,
        **kwargs: Any,
    ) -> None:
        """Log a debug message; additional data is passed as keyword arguments."""

    def info(
        self,
        msg: str,
        *,
        _: Any = None,
        exc_info: Any = None,
        stack_info: Any = False,
        stacklevel: int = 1,
        **kwargs: Any,
    ) -> None:
        """Log an info message; additional data is passed as keyword arguments."""

    def warn(
        self,
        msg: str,
        *,
        _: Any = None,
        exc_info: Any = None,
        stack_info: Any = False,
        stacklevel: int = 1,
        **kwargs: Any,
    ) -> None:
        """Log a warning message; additional data is passed as keyword arguments."""

    def warning(
        self,
        msg: str,
        *,
        _: Any = None,
        exc_info: Any = None,
        stack_info: Any = False,
        stacklevel: int = 1,
        **kwargs: Any,
    ) -> None:
        """Log a warning message; additional data is passed as keyword arguments."""

    def error(
        self,
        msg: str,
        *,
        _: Any = None,
        exc_info: Any = None,
        stack_info: Any = False,
        stacklevel: int = 1,
        **kwargs: Any,
    ) -> None:
        """Log an error message; additional data is passed as keyword arguments."""

    def critical(
        self,
        msg: str,
        *,
        _: Any = None,
        exc_info: Any = None,
        stack_info: Any = False,
        stacklevel: int = 1,
        **kwargs: Any,
    ) -> None:
        """Log a critical message; additional data is passed as keyword arguments."""

    def exception(
        self,
        msg: str,
        *,
        _: Any = None,
        exc_info: Any = None,
        stack_info: Any = False,
        stacklevel: int = 1,
        **kwargs: Any,
    ) -> None:
        """Log an exception message; additional data is passed as keyword arguments."""

    def fatal(
        self,
        msg: str,
        *,
        _: Any = None,
        exc_info: Any = None,
        stack_info: Any = False,
        stacklevel: int = 1,
        **kwargs: Any,
    ) -> None:
        """Log a fatal message; additional data is passed as keyword arguments."""

    def log(
        self,
        level: int,
        msg: str,
        *,
        _: Any = None,
        exc_info: Any = None,
        stack_info: Any = False,
        stacklevel: int = 1,
        **kwargs: Any,
    ) -> None:
        """Log a message at the given `level`; additional data is passed as keyword arguments."""

    def setLevel(self, level: int) -> None:
        """Set the logging level."""
def patch_log_method_to_kwargs_only(func: typing.Callable) -> typing.Callable:
    """
    A decorator to make sure that a log method is only called with keyword arguments.

    The returned function accepts the message as its only positional argument. `exc_info`, `stack_info`
    and `stacklevel` are forwarded to `func`; every other keyword argument is merged into the `extra`
    mapping passed to `func`. If a key is present both in an explicitly passed `extra` mapping and in the
    keyword arguments, the keyword argument wins.

    :param func: The log method to wrap. It is called as
        `func(msg, exc_info=..., stack_info=..., stacklevel=..., extra=...)`.
    :returns: The keyword-only wrapper around `func`.
    """

    @functools.wraps(func)
    def _log_only_with_kwargs(
        msg: str, *, _: Any = None, exc_info: Any = None, stack_info: Any = False, stacklevel: int = 1, **kwargs: Any
    ) -> Any:  # we need the `_` to avoid a syntax error
        # `extra=None` is a valid value for logging calls; treat it (and a missing `extra`) as "no extra"
        # instead of failing when unpacking it below.
        existing_extra = kwargs.pop("extra", None) or {}
        return func(
            msg,
            exc_info=exc_info,
            stack_info=stack_info,
            # we need to increase the stacklevel by 1 to point to the correct caller
            # (otherwise it points to this function)
            stacklevel=stacklevel + 1,
            extra={**existing_extra, **kwargs},
        )

    return _log_only_with_kwargs
def patch_log_with_level_method_to_kwargs_only(func: typing.Callable) -> typing.Callable:
"""A decorator to make sure that a function is only called with keyword arguments."""
@functools.wraps(func)
def _log_only_with_kwargs(
level: int | str,
msg: str,
*,
_: Any = None,
exc_info: Any = None,
stack_info: Any = False,
stacklevel: int = 1,
**kwargs: Any, # we need the `_` to avoid a syntax error
) -> typing.Callable:
existing_extra = kwargs.pop("extra", {})
return func(
level,
msg,
exc_info=exc_info,
stack_info=stack_info,
# we need to increase the stacklevel by 1 to point to the correct caller
# (otherwise it points to this function)
stacklevel=stacklevel + 1,
extra={**existing_extra, **kwargs},
)
return _log_only_with_kwargs
def patch_make_records_to_use_kwarg_string_interpolation(original_make_records: typing.Callable) -> typing.Callable:
"""A decorator to ensure string interpolation is used."""
@functools.wraps(original_make_records)
def _wrapper(
name: str,
level: int | str,
fn: str,
lno: int,
msg: str,
args: Any, # noqa: ARG001
exc_info: Any,
func: Any = None,
extra: Any = None,
sinfo: Any = None,
) -> typing.Callable:
safe_extra = extra or {}
try:
interpolated_msg = msg.format(**safe_extra)
except (KeyError, ValueError, IndexError):
interpolated_msg = msg
return original_make_records(name, level, fn, lno, interpolated_msg, (), exc_info, func, extra, sinfo)
return _wrapper
def _patch_structlog_call_information(logger: logging.Logger) -> None:
# structlog patches the findCaller to hide itself from the traceback.
# We need to patch their patch to hide `haystack.logging` from the traceback.
try:
from structlog._frames import _find_first_app_frame_and_name, _format_stack
from structlog.stdlib import _FixedFindCallerLogger
if not isinstance(logger, _FixedFindCallerLogger):
return
# completely copied from structlog. We only add `haystack.logging` to the list of ignored frames
def findCaller(stack_info: bool = False, stacklevel: int = 1) -> tuple[str, int, str, str | None]: # noqa: ARG001
try:
sinfo: str | None
# we need to exclude `haystack.logging` from the stack
f, name = _find_first_app_frame_and_name(["logging", "haystack.logging"])
sinfo = _format_stack(f) if stack_info else None
except Exception as error:
print(f"Error in findCaller: {error}")
return f.f_code.co_filename, f.f_lineno, f.f_code.co_name, sinfo
logger.findCaller = findCaller # type: ignore
except ImportError:
pass
def getLogger(name: str) -> PatchedLogger:
    """
    Get the Haystack logger, a patched version of the one from the standard library.

    We patch the default logger methods to make sure that they are only called with keyword arguments.
    We enforce keyword-arguments because
        - it brings in consistency
        - it makes structure logging effective, not just an available feature

    Patching is applied once per logger object: calling this function again with the same name returns
    the already patched logger without wrapping its methods a second time.

    :param name: The name of the logger, passed to `logging.getLogger`.
    :returns: The patched logger.
    """
    logger = logging.getLogger(name)

    # `logging.getLogger` returns the same object for the same name. Wrapping the already wrapped methods
    # again would increase `stacklevel` once per wrapper (wrong caller information) and run the message
    # interpolation of `makeRecord` several times.
    if getattr(logger, "_haystack_patched", False):
        return typing.cast(PatchedLogger, logger)

    kwargs_only_methods = ("debug", "info", "warn", "warning", "error", "critical", "exception", "fatal")
    for method_name in kwargs_only_methods:
        setattr(logger, method_name, patch_log_method_to_kwargs_only(getattr(logger, method_name)))
    logger.log = patch_log_with_level_method_to_kwargs_only(logger.log)  # type: ignore

    _patch_structlog_call_information(logger)

    # We also patch the `makeRecord` method to use keyword string interpolation
    logger.makeRecord = patch_make_records_to_use_kwarg_string_interpolation(logger.makeRecord)  # type: ignore

    # marker which lets later calls detect that this logger object is already patched
    logger._haystack_patched = True  # type: ignore

    return typing.cast(PatchedLogger, logger)
def add_line_and_file(_: "WrappedLogger", __: str, event_dict: "EventDict") -> "EventDict":
    """
    Add line and file to log entries.

    If the event dictionary carries a record under `_record`, its `lineno` is stored as `lineno` and its
    `name` as `module`. Otherwise the event dictionary is returned unchanged.
    """
    record = event_dict.get("_record")
    if record:
        event_dict["lineno"] = record.lineno
        event_dict["module"] = record.name
    return event_dict
def correlate_logs_with_traces(_: "WrappedLogger", __: str, event_dict: "EventDict") -> "EventDict":
    """
    Add correlation data for logs.

    This is useful if you want to correlate logs with traces. When tracing is enabled and there is a
    current span, the span's correlation data is merged into the event dictionary; in every other case
    the event dictionary is returned as it was passed in.
    """
    import haystack.tracing.tracer  # to avoid circular imports

    if haystack.tracing.is_tracing_enabled():
        span = haystack.tracing.tracer.current_span()
        if span:
            event_dict.update(span.get_correlation_data_for_logs())

    return event_dict
def configure_logging(use_json: bool | None = None) -> None:
    """
    Configure logging for Haystack.

    - Without `structlog` installed nothing is changed; configuring logging stays the user's responsibility.
    - With `structlog` installed, log entries are formatted by structlog including their key-value data.
      Set the environment variable `HAYSTACK_LOGGING_IGNORE_STRUCTLOG` to `true` to disable this behavior.
    - With `structlog` installed, all logs can be rendered as JSON. Enable this by
        - passing `use_json=True` to this function, or
        - setting the environment variable `HAYSTACK_LOGGING_USE_JSON` to `true`.
      If neither is given, JSON is used unless an interactive terminal or notebook is detected.
    """
    import haystack.utils.jupyter  # to avoid circular imports

    try:
        import structlog
        from structlog.processors import ExceptionRenderer
        from structlog.tracebacks import ExceptionDictTransformer
    except ImportError:
        # no structlog available: standard logging stays untouched
        return

    ignore_structlog = os.getenv(HAYSTACK_LOGGING_IGNORE_STRUCTLOG_ENV_VAR, "false")
    if ignore_structlog.lower() == "true":
        # the user opted out of structlog: standard logging stays untouched
        return

    # The setup roughly follows the structlog documentation:
    # https://www.structlog.org/en/stable/standard-library.html#rendering-using-structlog-based-formatters-within-logging
    # structlog formats the entries emitted via `logging` as well as those emitted via `structlog`.

    if use_json is None:  # an explicit argument always wins over environment and auto-detection
        env_choice = os.getenv(HAYSTACK_LOGGING_USE_JSON_ENV_VAR)
        if env_choice is not None:
            # explicit choice via environment variable
            use_json = env_choice.lower() == "true"
        elif sys.stderr.isatty() or hasattr(builtins, "__IPYTHON__") or haystack.utils.jupyter.is_in_jupyter():
            # looks like an interactive session: keep the output human-readable
            use_json = False
        else:
            use_json = True

    shared_processors: list[Processor] = [
        # puts the log level into the event dictionary
        structlog.stdlib.add_log_level,
        # puts the current timestamp (ISO format) into the event dictionary
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.contextvars.merge_contextvars,
        add_line_and_file,
    ]
    if use_json:
        # only needed in sophisticated production setups where logs are correlated with traces
        shared_processors.append(correlate_logs_with_traces)

    structlog.configure(
        processors=[*shared_processors, structlog.stdlib.ProcessorFormatter.wrap_for_formatter],
        logger_factory=structlog.stdlib.LoggerFactory(ignore_frame_names=["haystack.logging"]),
        cache_logger_on_first_use=True,
        # filters out entries below the log level of the root logger
        wrapper_class=structlog.make_filtering_bound_logger(min_level=logging.root.getEffectiveLevel()),
    )

    renderers: list[Processor]
    if use_json:
        # locals are not shown in production logs - they can contain quite sensitive information
        exception_renderer = ExceptionRenderer(ExceptionDictTransformer(show_locals=False))
        renderers = [exception_renderer, structlog.processors.JSONRenderer()]
    else:
        renderers = [structlog.dev.ConsoleRenderer()]

    formatter = structlog.stdlib.ProcessorFormatter(
        # The pre-chain runs ONLY on `logging` entries that do NOT originate within structlog.
        # `ExtraAdder` copies the information from the `logging` `extras` into the event dictionary.
        foreign_pre_chain=[*shared_processors, structlog.stdlib.ExtraAdder()],
        # These run on ALL entries after the pre-chain is done. `remove_processors_meta` drops
        # `_record` and `_from_structlog` so that this metadata does not end up in the final log record.
        processors=[structlog.stdlib.ProcessorFormatter.remove_processors_meta] + renderers,
    )

    handler = logging.StreamHandler()
    handler.name = "HaystackLoggingHandler"
    # all `logging` entries are formatted by OUR `ProcessorFormatter`
    handler.setFormatter(formatter)

    haystack_logger = logging.getLogger("haystack")
    # drop a previously installed handler of ours so that it is never present twice
    foreign_handlers = [
        existing
        for existing in haystack_logger.handlers
        if not isinstance(existing, logging.StreamHandler) or existing.name != "HaystackLoggingHandler"
    ]
    haystack_logger.handlers = [handler, *foreign_handlers]