Commit 056e965

Merge pull request #220 from aiokitchen/simplify-logging
Simplify logging. Allow passing handlers to entrypoint
2 parents: 25ac87f + a8ac79d

8 files changed: +310 additions, −279 deletions


aiomisc/aggregate.py

Lines changed: 5 additions & 12 deletions
```diff
@@ -6,14 +6,7 @@
 from dataclasses import dataclass
 from inspect import Parameter
 from typing import (
-    Any,
-    Callable,
-    Coroutine,
-    Generic,
-    Iterable,
-    List,
-    Optional,
-    Protocol,
+    Any, Callable, Coroutine, Generic, Iterable, List, Optional, Protocol,
     TypeVar,
 )
 
@@ -245,7 +238,7 @@ def __init__(
 
 
 def aggregate(
-    leeway_ms: float, max_count: Optional[int] = None
+    leeway_ms: float, max_count: Optional[int] = None,
 ) -> Callable[[AggregateFunc[V, R]], Callable[[V], Coroutine[Any, Any, R]]]:
     """
     Parametric decorator that aggregates multiple
@@ -276,7 +269,7 @@ def aggregate(
     :return:
    """
     def decorator(
-        func: AggregateFunc[V, R]
+        func: AggregateFunc[V, R],
     ) -> Callable[[V], Coroutine[Any, Any, R]]:
         aggregator = Aggregator(
             func, max_count=max_count, leeway_ms=leeway_ms,
@@ -289,7 +282,7 @@ def aggregate_async(
     leeway_ms: float, max_count: Optional[int] = None,
 ) -> Callable[
     [AggregateAsyncFunc[V, R]],
-    Callable[[V], Coroutine[Any, Any, R]]
+    Callable[[V], Coroutine[Any, Any, R]],
 ]:
     """
     Same as ``aggregate``, but with ``func`` arguments of type ``Arg``
@@ -302,7 +295,7 @@ def aggregate_async(
     :return:
     """
     def decorator(
-        func: AggregateAsyncFunc[V, R]
+        func: AggregateAsyncFunc[V, R],
     ) -> Callable[[V], Coroutine[Any, Any, R]]:
         aggregator = AggregatorAsync(
             func, max_count=max_count, leeway_ms=leeway_ms,
```
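
For context, ``aggregate`` (whose signature appears above) is a parametric decorator that batches concurrent calls made within the leeway window into a single call of the decorated function. A rough usage sketch modelled on the aiomisc documentation; the ``pow2`` function and the chosen parameter values are illustrative, not part of this commit:

```python
import asyncio

from aiomisc import aggregate


@aggregate(leeway_ms=10, max_count=5)
async def pow2(*numbers: float) -> list:
    # Called once per batch; the returned list maps results
    # back to the individual awaiting callers, in order.
    return [n ** 2 for n in numbers]


async def main() -> None:
    # The three concurrent calls below land in one batch.
    print(await asyncio.gather(pow2(1), pow2(2), pow2(3)))  # [1, 4, 9]


asyncio.run(main())
```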

aiomisc/backoff.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -89,7 +89,7 @@ def asyncbackoff(
     exceptions += asyncio.TimeoutError,
 
     def decorator(
-        func: Callable[P, Coroutine[Any, Any, T]]
+        func: Callable[P, Coroutine[Any, Any, T]],
     ) -> Callable[P, Coroutine[Any, Any, T]]:
         if attempt_timeout is not None:
             func = timeout(attempt_timeout)(func)
```
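
``asyncbackoff`` retries the decorated coroutine and, when ``attempt_timeout`` is set, wraps each attempt with the ``timeout`` decorator touched in this same changeset. A rough usage sketch, assuming the ``attempt_timeout``/``deadline``/``pause`` keyword arguments from the aiomisc documentation; the ``connect`` coroutine and the chosen values are illustrative:

```python
import asyncio

from aiomisc import asyncbackoff


# Retry for up to 3 seconds overall, giving each attempt 0.5 seconds
# and pausing 0.1 seconds between attempts.
@asyncbackoff(attempt_timeout=0.5, deadline=3, pause=0.1)
async def connect() -> None:
    reader, writer = await asyncio.open_connection("localhost", 8080)
    writer.close()
    await writer.wait_closed()
```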

aiomisc/entrypoint.py

Lines changed: 5 additions & 2 deletions
```diff
@@ -6,8 +6,8 @@
 import threading
 from concurrent.futures import Executor
 from typing import (
-    Any, Callable, Coroutine, FrozenSet, MutableSet, Optional, Set, Tuple,
-    TypeVar, Union,
+    Any, Callable, Coroutine, FrozenSet, Iterable, MutableSet, Optional, Set,
+    Tuple, TypeVar, Union,
 )
 from weakref import WeakSet
 
@@ -115,6 +115,7 @@ async def _start(self) -> None:
                 date_format=self.log_date_format,
                 buffered=self.log_buffering,
                 loop=self.loop,
+                handlers=self.log_handlers,
                 buffer_size=self.log_buffer_size,
                 flush_interval=self.log_flush_interval,
             )
@@ -146,6 +147,7 @@ def __init__(
         log_date_format: Optional[str] = DEFAULT_LOG_DATE_FORMAT,
         log_flush_interval: float = DEFAULT_AIOMISC_LOG_FLUSH,
         log_config: bool = DEFAULT_AIOMISC_LOG_CONFIG,
+        log_handlers: Iterable[logging.Handler] = (),
         policy: asyncio.AbstractEventLoopPolicy = event_loop_policy,
         debug: bool = DEFAULT_AIOMISC_DEBUG,
         catch_signals: Optional[Tuple[int, ...]] = None,
@@ -196,6 +198,7 @@ def __init__(
         self.log_date_format = log_date_format
         self.log_flush_interval = log_flush_interval
         self.log_format = log_format
+        self.log_handlers = tuple(log_handlers)
         self.log_level = log_level
         self.policy = policy
 
```
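
The new ``log_handlers`` argument lets callers attach their own ``logging.Handler`` instances when starting an entrypoint; they are forwarded to ``basic_config(handlers=...)``. A minimal usage sketch; the file name and handler choice are illustrative, not from this commit:

```python
import logging

import aiomisc

# Any standard logging.Handler works here; a FileHandler is just an example.
file_handler = logging.FileHandler("service.log")


async def main() -> None:
    logging.getLogger(__name__).info("running inside the entrypoint")


with aiomisc.entrypoint(
    log_level="info",
    log_handlers=[file_handler],  # new in this commit
) as loop:
    loop.run_until_complete(main())
```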

aiomisc/log.py

Lines changed: 100 additions & 71 deletions
```diff
@@ -1,45 +1,82 @@
 import asyncio
-import atexit
-import logging
 import logging.handlers
-import time
+import threading
 import traceback
+import warnings
 from contextlib import suppress
-from functools import partial
+from queue import Empty, Queue
 from socket import socket
 from typing import (
     Any, Callable, Dict, Iterable, List, Optional, Tuple, Type, Union,
 )
-from weakref import finalize
 
 import aiomisc_log
 from aiomisc_log.enum import LogFormat, LogLevel
 
-from .thread_pool import run_in_new_thread
-
-
-def _thread_flusher(
-    handler: logging.handlers.MemoryHandler,
-    flush_interval: Union[float, int],
-    loop: asyncio.AbstractEventLoop,
-) -> None:
-    def has_no_target() -> bool:
-        return True
-
-    def has_target() -> bool:
-        return bool(handler.target)
-
-    is_target = has_no_target
-
-    if isinstance(handler, logging.handlers.MemoryHandler):
-        is_target = has_target
-
-    while not loop.is_closed() and is_target():
-        with suppress(Exception):
-            if handler.buffer:
-                handler.flush()
-
-        time.sleep(flush_interval)
+from .counters import Statistic
+
+
+class ThreadedHandlerStatistic(Statistic):
+    threads: int
+    records: int
+    errors: int
+    flushes: int
+
+
+class ThreadedHandler(logging.Handler):
+    def __init__(
+        self, target: logging.Handler, flush_interval: float = 0.1,
+        buffered: bool = True, queue_size: int = 0,
+    ):
+        super().__init__()
+        self._buffered = buffered
+        self._target = target
+        self._flush_interval = flush_interval
+        self._flush_event = threading.Event()
+        self._queue: Queue[Optional[logging.LogRecord]] = Queue(queue_size)
+        self._close_event = threading.Event()
+        self._thread = threading.Thread(target=self._in_thread, daemon=True)
+        self._statistic = ThreadedHandlerStatistic()
+
+    def start(self) -> None:
+        self._statistic.threads += 1
+        self._thread.start()
+
+    def close(self) -> None:
+        self._queue.put(None)
+        del self._queue
+        self.flush()
+        self._close_event.set()
+        super().close()
+
+    def flush(self) -> None:
+        self._statistic.flushes += 1
+        self._flush_event.set()
+
+    def emit(self, record: logging.LogRecord) -> None:
+        if self._buffered:
+            self._queue.put_nowait(record)
+        else:
+            self._queue.put(record)
+        self._statistic.records += 1
+
+    def _in_thread(self) -> None:
+        queue = self._queue
+        while not self._close_event.is_set():
+            self._flush_event.wait(self._flush_interval)
+            try:
+                self.acquire()
+                while True:
+                    record = queue.get(timeout=self._flush_interval)
+                    if record is None:
+                        return
+                    with suppress(Exception):
+                        self._target.handle(record)
+            except Empty:
+                pass
+            finally:
+                self.release()
+        self._statistic.threads -= 1
 
 
 def suppressor(
@@ -54,27 +91,18 @@ def wrapper() -> None:
 
 def wrap_logging_handler(
     handler: logging.Handler,
-    loop: Optional[asyncio.AbstractEventLoop] = None,
     buffer_size: int = 1024,
     flush_interval: Union[float, int] = 0.1,
+    loop: Optional[asyncio.AbstractEventLoop] = None,
 ) -> logging.Handler:
-    buffered_handler = logging.handlers.MemoryHandler(
-        buffer_size,
+    warnings.warn("wrap_logging_handler is deprecated", DeprecationWarning)
+    handler = ThreadedHandler(
         target=handler,
-        flushLevel=logging.CRITICAL,
-    )
-
-    run_in_new_thread(
-        _thread_flusher, args=(
-            buffered_handler, flush_interval, loop,
-        ), no_return=True, statistic_name="logger",
+        queue_size=buffer_size,
+        flush_interval=flush_interval,
     )
-
-    at_exit_flusher = suppressor(handler.flush)
-    atexit.register(at_exit_flusher)
-    finalize(buffered_handler, partial(atexit.unregister, at_exit_flusher))
-
-    return buffered_handler
+    handler.start()
+    return handler
 
 
 class UnhandledLoopHook(aiomisc_log.UnhandledHook):
@@ -109,7 +137,7 @@ def __call__(
         protocol: Optional[asyncio.Protocol] = context.pop("protocol", None)
         transport: Optional[asyncio.Transport] = context.pop("transport", None)
         sock: Optional[socket] = context.pop("socket", None)
-        source_traceback: List[traceback.FrameSummary] = (
+        source_tb: List[traceback.FrameSummary] = (
            context.pop("source_traceback", None) or []
         )
 
@@ -129,51 +157,52 @@ def __call__(
 
         self._fill_transport_extra(transport, extra)
         self.logger.exception(message, exc_info=exception, extra=extra)
-        if source_traceback:
-            self.logger.error(
-                "".join(traceback.format_list(source_traceback)),
-            )
+        if source_tb:
+            self.logger.error("".join(traceback.format_list(source_tb)))
 
 
 def basic_config(
     level: Union[int, str] = LogLevel.default(),
     log_format: Union[str, LogFormat] = LogFormat.default(),
-    buffered: bool = True, buffer_size: int = 1024,
+    buffered: bool = True,
+    buffer_size: int = 0,
     flush_interval: Union[int, float] = 0.2,
     loop: Optional[asyncio.AbstractEventLoop] = None,
     handlers: Iterable[logging.Handler] = (),
     **kwargs: Any,
 ) -> None:
-    loop = loop or asyncio.get_event_loop()
     unhandled_hook = UnhandledLoopHook(logger_name="asyncio.unhandled")
 
-    def wrap_handler(handler: logging.Handler) -> logging.Handler:
-        nonlocal buffer_size, buffered, loop, unhandled_hook
+    if loop is None:
+        loop = asyncio.get_event_loop()
 
-        unhandled_hook.add_handler(handler)
+    forever_task = asyncio.gather(
+        loop.create_future(), return_exceptions=True,
+    )
+    loop.set_exception_handler(unhandled_hook)
 
-        if buffered:
-            return wrap_logging_handler(
-                handler=handler,
-                buffer_size=buffer_size,
-                flush_interval=flush_interval,
-                loop=loop,
-            )
-        return handler
+    log_handlers = []
+
+    for user_handler in handlers:
+        handler = ThreadedHandler(
+            buffered=buffered,
+            flush_interval=flush_interval,
+            queue_size=buffer_size,
+            target=user_handler,
+        )
+        unhandled_hook.add_handler(handler)
+        forever_task.add_done_callback(lambda _: handler.close())
+        log_handlers.append(handler)
+        handler.start()
 
     aiomisc_log.basic_config(
-        level=level,
-        log_format=log_format,
-        handler_wrapper=wrap_handler,
-        handlers=handlers,
-        **kwargs,
+        level=level, log_format=log_format, handlers=log_handlers, **kwargs,
    )
 
-    loop.set_exception_handler(unhandled_hook)
-
 
 __all__ = (
     "LogFormat",
     "LogLevel",
     "basic_config",
+    "ThreadedHandler",
 )
```
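
The new ``ThreadedHandler`` replaces the previous ``MemoryHandler``-plus-flusher-thread scheme: ``emit()`` only enqueues the record, and a daemon thread drains the queue into the wrapped target handler. ``basic_config`` now wraps every handler passed via ``handlers=`` this way automatically; the sketch below uses the class directly, with an illustrative ``StreamHandler`` target and logger name:

```python
import logging

from aiomisc.log import ThreadedHandler

# Wrap an ordinary blocking handler; records are queued by emit()
# and forwarded to the target from a background daemon thread.
handler = ThreadedHandler(target=logging.StreamHandler(), flush_interval=0.1)
handler.start()

log = logging.getLogger("example")
log.addHandler(handler)
log.setLevel(logging.INFO)

log.info("handled without blocking the caller")

# close() enqueues a stop sentinel and wakes the worker so that
# remaining records are delivered before the thread exits.
handler.close()
```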

aiomisc/service/uvicorn.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -117,7 +117,7 @@ async def start(self) -> Any:
         self.sock = config.bind_socket()
         self.server = Server(config)
         self.serve_task = asyncio.create_task(
-            self.server.serve(sockets=[self.sock])
+            self.server.serve(sockets=[self.sock]),
         )
 
     async def stop(self, exception: Optional[Exception] = None) -> None:
```

aiomisc/timeout.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -16,7 +16,7 @@
 
 
 def timeout(
-    value: Number
+    value: Number,
 ) -> Callable[
     [Callable[P, Coroutine[Any, Any, T]]],
     Callable[P, Coroutine[Any, Any, T]],
```
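
The ``timeout`` decorator shown above wraps a coroutine function so that each call is cancelled after the given number of seconds. A minimal sketch; the ``fetch`` coroutine and the one-second limit are illustrative:

```python
import asyncio

from aiomisc import timeout


@timeout(1)  # cancel the call if it runs longer than one second
async def fetch() -> str:
    await asyncio.sleep(5)
    return "never reached"


async def main() -> None:
    try:
        await fetch()
    except asyncio.TimeoutError:
        print("timed out as expected")


asyncio.run(main())
```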

aiomisc_log/__init__.py

Lines changed: 0 additions & 1 deletion
```diff
@@ -125,7 +125,6 @@ def basic_config(
     handlers: Iterable[logging.Handler] = (),
     **kwargs: Any,
 ) -> None:
-
     if isinstance(level, str):
         level = LogLevel[level]
 
```