diff --git a/tests/unit/test_logging.py b/tests/unit/test_logging.py index 53ce3e1a6f3b..7b4e970bf4cb 100644 --- a/tests/unit/test_logging.py +++ b/tests/unit/test_logging.py @@ -1,9 +1,7 @@ # SPDX-License-Identifier: Apache-2.0 -import json import logging import logging.config -import threading import uuid from unittest import mock @@ -15,28 +13,69 @@ from warehouse import logging as wlogging -class TestStructlogFormatter: - def test_warehouse_logger_no_renderer(self): - formatter = wlogging.StructlogFormatter() - record = logging.LogRecord( - "warehouse.request", logging.INFO, None, None, "the message", None, None +class TestGunicornAccessLogParsing: + def test_parse_gunicorn_access_log_success(self): + access_log_line = ( + "192.168.1.1 - - " + '[11/Aug/2025:21:01:13 +0000] "GET /pypi/b5ee/json HTTP/1.1" 404 24 ' + '"-" "dependabot-core/0.325.1 excon/1.2.5 ruby/3.4.5 (x86_64-linux)"' ) - assert formatter.format(record) == "the message" - - def test_non_warehouse_logger_renders(self): - formatter = wlogging.StructlogFormatter() - record = logging.LogRecord( - "another.logger", logging.INFO, None, None, "the message", None, None + event_dict = {"logger": "gunicorn.access", "event": access_log_line} + + result = wlogging._parse_gunicorn_access_log(None, None, event_dict) + + assert result["event"] == "http_request" + assert result["remote_addr"] == "192.168.1.1" + assert result["user"] is None + assert result["timestamp"] == "11/Aug/2025:21:01:13 +0000" + assert result["request"] == "GET /pypi/b5ee/json HTTP/1.1" + assert result["method"] == "GET" + assert result["path"] == "/pypi/b5ee/json" + assert result["protocol"] == "HTTP/1.1" + assert result["status"] == 404 + assert result["size"] == 24 + assert result["referrer"] is None + assert "dependabot-core" in result["user_agent"] + + def test_parse_gunicorn_access_log_with_referrer(self): + access_log_line = ( + "192.168.1.1 - - " + '[12/Aug/2025:10:30:45 +0000] "POST /simple/upload HTTP/1.1" 200 500 ' + 
'"https://pypi.org/project/test/" "Mozilla/5.0 (compatible; test)"' ) - assert json.loads(formatter.format(record)) == { - "logger": "another.logger", - "level": "INFO", - "event": "the message", - "thread": threading.get_ident(), + event_dict = {"logger": "gunicorn.access", "event": access_log_line} + + result = wlogging._parse_gunicorn_access_log(None, None, event_dict) + + assert result["remote_addr"] == "192.168.1.1" + assert result["method"] == "POST" + assert result["path"] == "/simple/upload" + assert result["status"] == 200 + assert result["size"] == 500 + assert result["referrer"] == "https://pypi.org/project/test/" + assert result["user_agent"] == "Mozilla/5.0 (compatible; test)" + + def test_parse_gunicorn_access_log_unparsable(self): + event_dict = { + "logger": "gunicorn.access", + "event": "this is not a valid access log format", } + result = wlogging._parse_gunicorn_access_log(None, None, event_dict) + + # Should return unchanged if unparsable + assert result["event"] == "this is not a valid access log format" + + def test_parse_gunicorn_access_log_non_access_log(self): + event_dict = {"logger": "some.other.logger", "event": "Some message"} + + result = wlogging._parse_gunicorn_access_log(None, None, event_dict) + + # Should return unchanged for non-access logs + assert result == event_dict + def test_create_id(monkeypatch): uuid4 = pretend.call_recorder(lambda: "a fake uuid") @@ -82,13 +121,17 @@ def test_includeme(monkeypatch, settings, expected_level): "version": 1, "disable_existing_loggers": False, "formatters": { - "structlog": {"()": "warehouse.logging.StructlogFormatter"} + "structlog_formatter": { + "()": structlog.stdlib.ProcessorFormatter, + "processor": mock.ANY, + "foreign_pre_chain": mock.ANY, + } }, "handlers": { "primary": { "class": "logging.StreamHandler", "stream": "ext://sys.stdout", - "formatter": "structlog", + "formatter": "structlog_formatter", }, }, "loggers": { @@ -119,10 +162,12 @@ def test_includeme(monkeypatch, settings, 
expected_level): structlog.stdlib.filter_by_level, structlog.stdlib.add_logger_name, structlog.stdlib.add_log_level, - mock.ANY, - mock.ANY, + mock.ANY, # PositionalArgumentsFormatter + mock.ANY, # TimeStamper + mock.ANY, # StackInfoRenderer structlog.processors.format_exc_info, - wlogging.RENDERER, + mock.ANY, # _add_datadog_context + structlog.stdlib.ProcessorFormatter.wrap_for_formatter, ], logger_factory=mock.ANY, wrapper_class=structlog.stdlib.BoundLogger, @@ -135,6 +180,10 @@ def test_includeme(monkeypatch, settings, expected_level): ) assert isinstance( configure.calls[0].kwargs["processors"][4], + structlog.processors.TimeStamper, + ) + assert isinstance( + configure.calls[0].kwargs["processors"][5], structlog.processors.StackInfoRenderer, ) assert isinstance( diff --git a/warehouse/logging.py b/warehouse/logging.py index 9c383716cf62..1b54df360b30 100644 --- a/warehouse/logging.py +++ b/warehouse/logging.py @@ -1,41 +1,125 @@ # SPDX-License-Identifier: Apache-2.0 import logging.config -import threading +import os +import re import uuid import structlog request_logger = structlog.get_logger("warehouse.request") -RENDERER = structlog.processors.JSONRenderer() +# Determine if we're in development mode +DEV_MODE = os.environ.get("WAREHOUSE_ENV") == "development" - -class StructlogFormatter(logging.Formatter): - def format(self, record): - # TODO: Figure out a better way of handling this besides just looking - # at the logger name, ideally this would have some way to - # really differentiate between log items which were logged by - # structlog and which were not. - if not record.name.startswith("warehouse."): - # TODO: Is there a better way to handle this? Maybe we can figure - # out a way to pass this through the structlog processors - # instead of manually duplicating the side effects here? 
- event_dict = { - "logger": record.name, - "level": record.levelname, - "event": record.msg, - "thread": threading.get_ident(), - } - record.msg = RENDERER(None, record.levelname, event_dict) - - return super().format(record) +RENDERER: structlog.dev.ConsoleRenderer | structlog.processors.JSONRenderer +if DEV_MODE: + RENDERER = structlog.dev.ConsoleRenderer(colors=True) +else: + RENDERER = structlog.processors.JSONRenderer() def _create_id(request): return str(uuid.uuid4()) +def _add_datadog_context(logger, method_name, event_dict): + """Add Datadog trace context if available""" + try: + import ddtrace + + span = ddtrace.tracer.current_span() + if span: + event_dict["dd.trace_id"] = str(span.trace_id) + event_dict["dd.span_id"] = str(span.span_id) + event_dict["dd.service"] = span.service + # deployment metadata + event_dict["dd.env"] = os.environ.get("DD_ENV", "development") + event_dict["dd.version"] = os.environ.get("DD_VERSION", "unknown") + except (ImportError, AttributeError): + pass + return event_dict + + +def _parse_gunicorn_access_log(logger, method_name, event_dict): + """Parse Gunicorn logs into structlog (only access logs).""" + if event_dict.get("logger") != "gunicorn.access": + return event_dict + + message = event_dict.get("event", "") + + # based on + # https://albersdevelopment.net/2019/08/15/using-structlog-with-gunicorn/ + # and friends + # Combined log format: + # host - user [time] "request" status size "referer" "user-agent" + pattern = re.compile( + r"(?P<remote_addr>\S+) \S+ (?P<user>\S+) " + r'\[(?P<timestamp>.+?)\] "(?P<request>.+?)" ' + r"(?P<status>\d+) (?P<size>\S+) " + r'"(?P<referrer>.*?)" "(?P<user_agent>.*?)"' + ) + + match = pattern.match(message) + if not match: + return event_dict + + fields = match.groupdict() + + # sanitize + fields["user"] = None if fields["user"] == "-" else fields["user"] + fields["status"] = int(fields["status"]) + fields["size"] = 0 if fields["size"] == "-" else int(fields["size"]) + fields["referrer"] = None if fields["referrer"] == "-" 
else fields["referrer"] + + # Parse "GET /path HTTP/1.1" into separate fields + request_parts = fields["request"].split(" ", 2) + if len(request_parts) >= 2: + fields["method"] = request_parts[0] + fields["path"] = request_parts[1] + if len(request_parts) == 3: + fields["protocol"] = request_parts[2] + + event_dict.update(fields) + event_dict["event"] = "http_request" + return event_dict + + +def configure_celery_logging(logfile: str | None = None, loglevel: int = logging.INFO): + """Configure unified structlog logging for Celery that handles all log types.""" + processors: list = [ + structlog.contextvars.merge_contextvars, + structlog.processors.TimeStamper(fmt="iso"), + structlog.stdlib.add_log_level, + structlog.processors.StackInfoRenderer(), + structlog.processors.format_exc_info, + _add_datadog_context, + ] + formatter = structlog.stdlib.ProcessorFormatter( + processor=RENDERER, + foreign_pre_chain=processors, + ) + + handler = logging.FileHandler(logfile) if logfile else logging.StreamHandler() + handler.setFormatter(formatter) + + root = logging.getLogger() + root.handlers.clear() + root.addHandler(handler) + root.setLevel(loglevel) + + structlog.configure( + processors=processors + + [ + structlog.stdlib.ProcessorFormatter.wrap_for_formatter, + ], + logger_factory=structlog.stdlib.LoggerFactory(), + wrapper_class=structlog.make_filtering_bound_logger(logging.INFO), + cache_logger_on_first_use=True, + ) + + def _create_logger(request): # This has to use **{} instead of just a kwarg because request.id is not # an allowed kwarg name. 
@@ -43,17 +127,32 @@ def _create_logger(request): def includeme(config): + # non-structlog things + foreign_pre_chain: list = [ + structlog.stdlib.add_log_level, + structlog.stdlib.add_logger_name, + structlog.processors.TimeStamper(fmt="iso"), + _add_datadog_context, + _parse_gunicorn_access_log, + ] + # Configure the standard library logging logging.config.dictConfig( { "version": 1, "disable_existing_loggers": False, - "formatters": {"structlog": {"()": "warehouse.logging.StructlogFormatter"}}, + "formatters": { + "structlog_formatter": { + "()": structlog.stdlib.ProcessorFormatter, + "processor": RENDERER, + "foreign_pre_chain": foreign_pre_chain, + } + }, "handlers": { "primary": { "class": "logging.StreamHandler", "stream": "ext://sys.stdout", - "formatter": "structlog", + "formatter": "structlog_formatter", }, }, "loggers": { @@ -88,9 +187,11 @@ def includeme(config): structlog.stdlib.add_logger_name, structlog.stdlib.add_log_level, structlog.stdlib.PositionalArgumentsFormatter(), + structlog.processors.TimeStamper(fmt="iso"), structlog.processors.StackInfoRenderer(), structlog.processors.format_exc_info, - RENDERER, + _add_datadog_context, + structlog.stdlib.ProcessorFormatter.wrap_for_formatter, ], logger_factory=structlog.stdlib.LoggerFactory(), wrapper_class=structlog.stdlib.BoundLogger, diff --git a/warehouse/tasks.py b/warehouse/tasks.py index bad2e6e51bc9..b2929b8ada95 100644 --- a/warehouse/tasks.py +++ b/warehouse/tasks.py @@ -14,9 +14,11 @@ import celery.backends.redis import pyramid.scripting import pyramid_retry +import structlog import transaction import venusian +from celery import signals from kombu import Queue from pyramid.threadlocal import get_current_request @@ -36,6 +38,23 @@ logger = logging.getLogger(__name__) +# Celery signal handlers for unified structlog configuration +# https://github.com/hynek/structlog/issues/287 +# https://www.structlog.org/en/stable/frameworks.html#celery +@signals.after_setup_logger.connect +def 
on_after_setup_logger(logger, loglevel, logfile, *args, **kwargs): + """Override Celery's default logging behavior w/ unified structlog configuration.""" + from warehouse.logging import configure_celery_logging + + configure_celery_logging(logfile, loglevel) + + +@signals.task_prerun.connect +def on_task_prerun(sender, task_id, task, **_): + """Bind task metadata to contextvars for all logs within the task.""" + structlog.contextvars.bind_contextvars(task_id=task_id, task_name=task.name) + + class TLSRedisBackend(celery.backends.redis.RedisBackend): def _params_from_url(self, url, defaults): params = super()._params_from_url(url, defaults) @@ -301,6 +320,10 @@ def includeme(config): REDBEAT_REDIS_URL=s["celery.scheduler_url"], # Silence deprecation warning on startup broker_connection_retry_on_startup=False, + # Disable Celery's logger hijacking for unified structlog control + worker_hijack_root_logger=False, + worker_log_format="%(message)s", + worker_task_log_format="%(message)s", ) config.registry["celery.app"].Task = WarehouseTask config.registry["celery.app"].pyramid_config = config