95 changes: 72 additions & 23 deletions tests/unit/test_logging.py
@@ -1,9 +1,7 @@
# SPDX-License-Identifier: Apache-2.0

import json
import logging
import logging.config
import threading
import uuid

from unittest import mock
@@ -15,28 +13,69 @@
from warehouse import logging as wlogging


class TestStructlogFormatter:
def test_warehouse_logger_no_renderer(self):
formatter = wlogging.StructlogFormatter()
record = logging.LogRecord(
"warehouse.request", logging.INFO, None, None, "the message", None, None
class TestGunicornAccessLogParsing:
def test_parse_gunicorn_access_log_success(self):
access_log_line = (
"192.168.1.1 - - "
'[11/Aug/2025:21:01:13 +0000] "GET /pypi/b5ee/json HTTP/1.1" 404 24 '
'"-" "dependabot-core/0.325.1 excon/1.2.5 ruby/3.4.5 (x86_64-linux)"'
)

assert formatter.format(record) == "the message"

def test_non_warehouse_logger_renders(self):
formatter = wlogging.StructlogFormatter()
record = logging.LogRecord(
"another.logger", logging.INFO, None, None, "the message", None, None
event_dict = {"logger": "gunicorn.access", "event": access_log_line}

result = wlogging._parse_gunicorn_access_log(None, None, event_dict)

assert result["event"] == "http_request"
assert result["remote_addr"] == "192.168.1.1"
assert result["user"] is None
assert result["timestamp"] == "11/Aug/2025:21:01:13 +0000"
assert result["request"] == "GET /pypi/b5ee/json HTTP/1.1"
assert result["method"] == "GET"
assert result["path"] == "/pypi/b5ee/json"
assert result["protocol"] == "HTTP/1.1"
assert result["status"] == 404
assert result["size"] == 24
assert result["referrer"] is None
assert "dependabot-core" in result["user_agent"]

def test_parse_gunicorn_access_log_with_referrer(self):
access_log_line = (
"192.168.1.1 - - "
'[12/Aug/2025:10:30:45 +0000] "POST /simple/upload HTTP/1.1" 200 500 '
'"https://pypi.org/project/test/" "Mozilla/5.0 (compatible; test)"'
)

assert json.loads(formatter.format(record)) == {
"logger": "another.logger",
"level": "INFO",
"event": "the message",
"thread": threading.get_ident(),
event_dict = {"logger": "gunicorn.access", "event": access_log_line}

result = wlogging._parse_gunicorn_access_log(None, None, event_dict)

assert result["remote_addr"] == "192.168.1.1"
assert result["method"] == "POST"
assert result["path"] == "/simple/upload"
assert result["status"] == 200
assert result["size"] == 500
assert result["referrer"] == "https://pypi.org/project/test/"
assert result["user_agent"] == "Mozilla/5.0 (compatible; test)"

def test_parse_gunicorn_access_log_unparsable(self):
event_dict = {
"logger": "gunicorn.access",
"event": "this is not a valid access log format",
}

result = wlogging._parse_gunicorn_access_log(None, None, event_dict)

# Should return unchanged if unparsable
assert result["event"] == "this is not a valid access log format"

def test_parse_gunicorn_access_log_non_access_log(self):
event_dict = {"logger": "some.other.logger", "event": "Some message"}

result = wlogging._parse_gunicorn_access_log(None, None, event_dict)

# Should return unchanged for non-access logs
assert result == event_dict


def test_create_id(monkeypatch):
uuid4 = pretend.call_recorder(lambda: "a fake uuid")
@@ -82,13 +121,17 @@ def test_includeme(monkeypatch, settings, expected_level):
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"structlog": {"()": "warehouse.logging.StructlogFormatter"}
"structlog_formatter": {
"()": structlog.stdlib.ProcessorFormatter,
"processor": mock.ANY,
"foreign_pre_chain": mock.ANY,
}
},
"handlers": {
"primary": {
"class": "logging.StreamHandler",
"stream": "ext://sys.stdout",
"formatter": "structlog",
"formatter": "structlog_formatter",
},
},
"loggers": {
@@ -119,10 +162,12 @@ def test_includeme(monkeypatch, settings, expected_level):
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
mock.ANY,
mock.ANY,
mock.ANY, # PositionalArgumentsFormatter
mock.ANY, # TimeStamper
mock.ANY, # StackInfoRenderer
structlog.processors.format_exc_info,
wlogging.RENDERER,
mock.ANY, # _add_datadog_context
structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
],
logger_factory=mock.ANY,
wrapper_class=structlog.stdlib.BoundLogger,
@@ -135,6 +180,10 @@ def test_includeme(monkeypatch, settings, expected_level):
)
assert isinstance(
configure.calls[0].kwargs["processors"][4],
structlog.processors.TimeStamper,
)
assert isinstance(
configure.calls[0].kwargs["processors"][5],
structlog.processors.StackInfoRenderer,
)
assert isinstance(
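As a quick sanity check of the parsing behavior these tests pin down, the combined-log-format regex can be exercised on its own. A minimal sketch, using the same pattern as `_parse_gunicorn_access_log` below and the sample line from the first test:

import re

# Combined log format:
# host - user [time] "request" status size "referer" "user-agent"
PATTERN = re.compile(
    r"(?P<remote_addr>\S+) \S+ (?P<user>\S+) "
    r'\[(?P<timestamp>.+?)\] "(?P<request>.+?)" '
    r"(?P<status>\d+) (?P<size>\S+) "
    r'"(?P<referrer>.*?)" "(?P<user_agent>.*?)"'
)

line = (
    "192.168.1.1 - - "
    '[11/Aug/2025:21:01:13 +0000] "GET /pypi/b5ee/json HTTP/1.1" 404 24 '
    '"-" "dependabot-core/0.325.1 excon/1.2.5 ruby/3.4.5 (x86_64-linux)"'
)

fields = PATTERN.match(line).groupdict()
assert fields["request"] == "GET /pypi/b5ee/json HTTP/1.1"
assert fields["status"] == "404"  # int() conversion happens in the processor
assert fields["referrer"] == "-"  # "-" is mapped to None in the processor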
151 changes: 126 additions & 25 deletions warehouse/logging.py
@@ -1,59 +1,158 @@
# SPDX-License-Identifier: Apache-2.0

import logging.config
import threading
import os
import re
import uuid

import structlog

request_logger = structlog.get_logger("warehouse.request")

RENDERER = structlog.processors.JSONRenderer()
# Determine if we're in development mode
DEV_MODE = os.environ.get("WAREHOUSE_ENV") == "development"


class StructlogFormatter(logging.Formatter):
def format(self, record):
# TODO: Figure out a better way of handling this besides just looking
# at the logger name, ideally this would have some way to
# really differentiate between log items which were logged by
# structlog and which were not.
if not record.name.startswith("warehouse."):
# TODO: Is there a better way to handle this? Maybe we can figure
# out a way to pass this through the structlog processors
# instead of manually duplicating the side effects here?
event_dict = {
"logger": record.name,
"level": record.levelname,
"event": record.msg,
"thread": threading.get_ident(),
}
record.msg = RENDERER(None, record.levelname, event_dict)

return super().format(record)
# Choose renderer based on environment
RENDERER: structlog.dev.ConsoleRenderer | structlog.processors.JSONRenderer
if DEV_MODE:
RENDERER = structlog.dev.ConsoleRenderer(colors=True)
else:
RENDERER = structlog.processors.JSONRenderer()


def _create_id(request):
return str(uuid.uuid4())


def _add_datadog_context(logger, method_name, event_dict):
"""Add Datadog trace context if available"""
try:
import ddtrace

span = ddtrace.tracer.current_span()
if span:
event_dict["dd.trace_id"] = str(span.trace_id)
event_dict["dd.span_id"] = str(span.span_id)
event_dict["dd.service"] = span.service
# deployment metadata
event_dict["dd.env"] = os.environ.get("DD_ENV", "development")
event_dict["dd.version"] = os.environ.get("DD_VERSION", "unknown")
except (ImportError, AttributeError):
pass
return event_dict


def _parse_gunicorn_access_log(logger, method_name, event_dict):
"""Parse Gunicorn logs into structlog ((only access logs)."""
if event_dict.get("logger") != "gunicorn.access":
return event_dict

message = event_dict.get("event", "")

# Based on
# https://albersdevelopment.net/2019/08/15/using-structlog-with-gunicorn/
# and similar writeups.
# Combined log format:
# host - user [time] "request" status size "referer" "user-agent"
pattern = re.compile(
r"(?P<remote_addr>\S+) \S+ (?P<user>\S+) "
r'\[(?P<timestamp>.+?)\] "(?P<request>.+?)" '
r"(?P<status>\d+) (?P<size>\S+) "
r'"(?P<referrer>.*?)" "(?P<user_agent>.*?)"'
)

match = pattern.match(message)
if not match:
return event_dict

fields = match.groupdict()

# sanitize
fields["user"] = None if fields["user"] == "-" else fields["user"]
fields["status"] = int(fields["status"])
fields["size"] = 0 if fields["size"] == "-" else int(fields["size"])
fields["referrer"] = None if fields["referrer"] == "-" else fields["referrer"]

# Parse "GET /path HTTP/1.1" into separate fields
request_parts = fields["request"].split(" ", 2)
if len(request_parts) >= 2:
fields["method"] = request_parts[0]
fields["path"] = request_parts[1]
if len(request_parts) == 3:
fields["protocol"] = request_parts[2]

event_dict.update(fields)
event_dict["event"] = "http_request"
return event_dict


def configure_celery_logging(logfile: str | None = None, loglevel: int = logging.INFO):
"""Configure unified structlog logging for Celery that handles all log types."""
processors: list = [
structlog.contextvars.merge_contextvars,
structlog.processors.TimeStamper(fmt="iso"),
structlog.stdlib.add_log_level,
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
_add_datadog_context,
]
formatter = structlog.stdlib.ProcessorFormatter(
processor=RENDERER,
foreign_pre_chain=processors,
)

handler = logging.FileHandler(logfile) if logfile else logging.StreamHandler()
handler.setFormatter(formatter)

root = logging.getLogger()
root.handlers.clear()
root.addHandler(handler)
root.setLevel(loglevel)

structlog.configure(
processors=processors
+ [
structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
],
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
cache_logger_on_first_use=True,
)


def _create_logger(request):
# This has to use **{} instead of just a kwarg because request.id is not
# an allowed kwarg name.
return request_logger.bind(**{"request.id": request.id})


def includeme(config):
# Processors applied to records from non-structlog loggers
foreign_pre_chain: list = [
structlog.stdlib.add_log_level,
structlog.stdlib.add_logger_name,
structlog.processors.TimeStamper(fmt="iso"),
_add_datadog_context,
_parse_gunicorn_access_log,
]

# Configure the standard library logging
logging.config.dictConfig(
{
"version": 1,
"disable_existing_loggers": False,
"formatters": {"structlog": {"()": "warehouse.logging.StructlogFormatter"}},
"formatters": {
"structlog_formatter": {
"()": structlog.stdlib.ProcessorFormatter,
"processor": RENDERER,
"foreign_pre_chain": foreign_pre_chain,
}
},
"handlers": {
"primary": {
"class": "logging.StreamHandler",
"stream": "ext://sys.stdout",
"formatter": "structlog",
"formatter": "structlog_formatter",
},
},
"loggers": {
@@ -88,9 +187,11 @@ def includeme(config):
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
RENDERER,
_add_datadog_context,
structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
],
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
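A note on the formatter change above: `structlog.stdlib.ProcessorFormatter` lets a single handler render native structlog events and plain stdlib records (gunicorn, celery, and so on) in the same shape, with foreign records first run through `foreign_pre_chain`. A minimal self-contained sketch of that pattern (the logger name and message are illustrative):

import logging

import structlog

# Records from non-structlog loggers pass through these processors first,
# so they pick up the same keys as native structlog events.
pre_chain = [
    structlog.stdlib.add_log_level,
    structlog.stdlib.add_logger_name,
    structlog.processors.TimeStamper(fmt="iso"),
]

handler = logging.StreamHandler()
handler.setFormatter(
    structlog.stdlib.ProcessorFormatter(
        processor=structlog.processors.JSONRenderer(),
        foreign_pre_chain=pre_chain,
    )
)
root = logging.getLogger()
root.handlers[:] = [handler]
root.setLevel(logging.INFO)

logging.getLogger("gunicorn.error").info("worker booted")
# -> {"event": "worker booted", "level": "info",
#     "logger": "gunicorn.error", "timestamp": "2025-..."}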
23 changes: 23 additions & 0 deletions warehouse/tasks.py
@@ -14,9 +14,11 @@
import celery.backends.redis
import pyramid.scripting
import pyramid_retry
import structlog
import transaction
import venusian

from celery import signals
from kombu import Queue
from pyramid.threadlocal import get_current_request

@@ -36,6 +38,23 @@
logger = logging.getLogger(__name__)


# Celery signal handlers for unified structlog configuration
# https://github.com/hynek/structlog/issues/287
# https://www.structlog.org/en/stable/frameworks.html#celery
@signals.after_setup_logger.connect
def on_after_setup_logger(logger, loglevel, logfile, *args, **kwargs):
"""Override Celery's default logging behavior w/ unified structlog configuration."""
from warehouse.logging import configure_celery_logging

configure_celery_logging(logfile, loglevel)


@signals.task_prerun.connect
def on_task_prerun(sender, task_id, task, **_):
"""Bind task metadata to contextvars for all logs within the task."""
structlog.contextvars.bind_contextvars(task_id=task_id, task_name=task.name)


class TLSRedisBackend(celery.backends.redis.RedisBackend):
def _params_from_url(self, url, defaults):
params = super()._params_from_url(url, defaults)
@@ -301,6 +320,10 @@ def includeme(config):
REDBEAT_REDIS_URL=s["celery.scheduler_url"],
# Silence deprecation warning on startup
broker_connection_retry_on_startup=False,
# Disable Celery's logger hijacking for unified structlog control
worker_hijack_root_logger=False,
worker_log_format="%(message)s",
worker_task_log_format="%(message)s",
)
config.registry["celery.app"].Task = WarehouseTask
config.registry["celery.app"].pyramid_config = config
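One caveat on the `task_prerun` handler above: Celery worker processes are long-lived, so contextvars bound for one task should be cleared once it finishes, or `task_id`/`task_name` can bleed into unrelated log lines. A hypothetical companion handler, not part of this diff, would look like:

import structlog

from celery import signals


@signals.task_postrun.connect
def on_task_postrun(sender, task_id, task, **_):
    """Drop the task metadata bound in on_task_prerun."""
    structlog.contextvars.clear_contextvars()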