Skip to content

Commit 4f256bc

Browse files
committed
minor
1 parent f2536fb commit 4f256bc

File tree

1 file changed

+49
-125
lines changed

1 file changed

+49
-125
lines changed

packages/service-library/src/servicelib/logging_utils.py

Lines changed: 49 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -264,17 +264,61 @@ async def setup_async_loggers(
264264
log_format_local_dev_enabled=log_format_local_dev_enabled,
265265
)
266266

267-
# Start the async logging context
268-
async with AsyncLoggingContext(
269-
log_format_local_dev_enabled=log_format_local_dev_enabled,
270-
fmt=fmt,
271-
):
267+
# Set up async logging infrastructure
268+
log_queue: queue.Queue = queue.Queue()
269+
# Create handler with proper formatting
270+
handler = logging.StreamHandler()
271+
handler.setFormatter(
272+
CustomFormatter(
273+
fmt,
274+
log_format_local_dev_enabled=log_format_local_dev_enabled,
275+
)
276+
)
277+
278+
# Create and start the queue listener
279+
listener = logging.handlers.QueueListener(
280+
log_queue, handler, respect_handler_level=True
281+
)
282+
listener.start()
283+
284+
# Create queue handler for loggers
285+
queue_handler = logging.handlers.QueueHandler(log_queue)
286+
287+
# Configure all existing loggers - add queue handler alongside existing handlers
288+
manager: logging.Manager = logging.Logger.manager
289+
root_logger = logging.getLogger()
290+
all_loggers = [root_logger] + [
291+
logging.getLogger(name) for name in manager.loggerDict
292+
]
293+
294+
# Add queue handler to all loggers (preserving existing handlers)
295+
for logger in all_loggers:
296+
logger.addHandler(queue_handler)
297+
298+
try:
272299
# Apply filters if provided
273300
if logger_filter_mapping:
274301
_apply_logger_filters(logger_filter_mapping)
275302

303+
_logger.info("Async logging context initialized with unlimited queue")
276304
yield
277305

306+
finally:
307+
# Cleanup: Remove queue handlers from all loggers
308+
try:
309+
for logger in all_loggers:
310+
if queue_handler in logger.handlers:
311+
logger.removeHandler(queue_handler)
312+
313+
# Stop the queue listener
314+
_logger.debug("Shutting down async logging listener...")
315+
listener.stop()
316+
_logger.debug("Async logging context cleanup complete")
317+
318+
except Exception as exc:
319+
sys.stderr.write(f"Error during async logging cleanup: {exc}\n")
320+
sys.stderr.flush()
321+
278322

279323
class LogExceptionsKwargsDict(TypedDict, total=True):
280324
logger: logging.Logger
@@ -521,123 +565,3 @@ def guess_message_log_level(message: str) -> LogLevelInt:
521565
def set_parent_module_log_level(current_module: str, desired_log_level: int) -> None:
522566
parent_module = ".".join(current_module.split(".")[:-1])
523567
logging.getLogger(parent_module).setLevel(desired_log_level)
524-
525-
526-
class AsyncLoggingContext:
527-
"""
528-
Async context manager for non-blocking logging infrastructure.
529-
Based on the pattern from SuperFastPython article.
530-
"""
531-
532-
def __init__(
533-
self,
534-
*,
535-
log_format_local_dev_enabled: bool = False,
536-
fmt: str | None = None,
537-
) -> None:
538-
self.log_format_local_dev_enabled = log_format_local_dev_enabled
539-
self.fmt = fmt or _DEFAULT_FORMATTING
540-
self.queue: queue.Queue | None = None
541-
self.listener: logging.handlers.QueueListener | None = None
542-
self.queue_handler: logging.handlers.QueueHandler | None = None
543-
self.original_handlers: dict[str, list[logging.Handler]] = {}
544-
545-
async def __aenter__(self) -> "AsyncLoggingContext":
546-
"""Set up async logging infrastructure."""
547-
await self._setup_async_logging()
548-
return self
549-
550-
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
551-
"""Clean up async logging infrastructure."""
552-
await self._cleanup_async_logging()
553-
554-
async def _setup_async_logging(self) -> None:
555-
"""Configure non-blocking logging using queue-based approach."""
556-
# Create unlimited queue for log messages
557-
self.queue = queue.Queue()
558-
559-
# Use default StreamHandler with proper formatting
560-
handler = logging.StreamHandler()
561-
handler.setFormatter(
562-
CustomFormatter(
563-
self.fmt,
564-
log_format_local_dev_enabled=self.log_format_local_dev_enabled,
565-
)
566-
)
567-
568-
# Create and start the queue listener
569-
self.listener = logging.handlers.QueueListener(
570-
self.queue, handler, respect_handler_level=True
571-
)
572-
self.listener.start()
573-
574-
# Create queue handler for loggers
575-
self.queue_handler = logging.handlers.QueueHandler(self.queue)
576-
577-
# Configure all existing loggers
578-
await self._configure_loggers()
579-
580-
_logger.info("Async logging context initialized with unlimited queue")
581-
582-
async def _configure_loggers(self) -> None:
583-
"""Add queue handler to all loggers while preserving existing handlers."""
584-
# Get all loggers
585-
manager: logging.Manager = logging.Logger.manager
586-
root_logger = logging.getLogger()
587-
all_loggers = [root_logger] + [
588-
logging.getLogger(name) for name in manager.loggerDict
589-
]
590-
591-
# Store original handlers and add queue handler
592-
for logger in all_loggers:
593-
logger_name = logger.name or "root"
594-
595-
# Store original handlers
596-
self.original_handlers[logger_name] = logger.handlers[:]
597-
598-
# Add queue handler alongside existing handlers
599-
if self.queue_handler:
600-
logger.addHandler(self.queue_handler)
601-
602-
# Allow other coroutines to run
603-
await asyncio.sleep(0)
604-
605-
async def _cleanup_async_logging(self) -> None:
606-
"""Restore original logging configuration."""
607-
try:
608-
# Remove queue handlers from all loggers
609-
manager: logging.Manager = logging.Logger.manager
610-
root_logger = logging.getLogger()
611-
all_loggers = [root_logger] + [
612-
logging.getLogger(name) for name in manager.loggerDict
613-
]
614-
615-
for logger in all_loggers:
616-
# Remove only the queue handler we added
617-
if self.queue_handler and self.queue_handler in logger.handlers:
618-
logger.removeHandler(self.queue_handler)
619-
620-
# Stop the queue listener
621-
if self.listener:
622-
_logger.debug("Shutting down async logging listener...")
623-
self.listener.stop()
624-
625-
_logger.debug("Async logging context cleanup complete")
626-
627-
except Exception as exc:
628-
sys.stderr.write(f"Error during async logging cleanup: {exc}\n")
629-
sys.stderr.flush()
630-
finally:
631-
self.queue = None
632-
self.listener = None
633-
self.queue_handler = None
634-
self.original_handlers.clear()
635-
636-
def get_metrics(self) -> dict[str, Any] | None:
637-
"""Get logging performance metrics."""
638-
if self.queue:
639-
return {
640-
"queue_size": self.queue.qsize(),
641-
"listener_active": self.listener is not None,
642-
}
643-
return None

0 commit comments

Comments
 (0)