Skip to content

Commit 1326576

Browse files
authored
Support code attributes for celery, pika and aio-pika consumer span (#496)
*Description of changes:* Celery, Pika, and Aio-Pika all support consumer callbacks. The consumer spans represent not only the message retrieval action, but also the execution of the callback function that processes the message. This change adds code information about the callback function to the consumer span attributes. ``` { "kind": "SpanKind.CONSUMER", "attributes": { "code.function.name": "tasks.add_numbers", "code.file.path": "/Volumes/workplace/extension/aws-otel-python-instrumentation/samples/celery/tasks.py", "code.line.number": 14, "celery.action": "run", "celery.state": "SUCCESS", "messaging.destination": "celery", "celery.delivery_info": "{'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': False}", "messaging.message.id": "8aaec4dc-31b0-4705-9c90-6f8eb1a9a0a9", "celery.reply_to": "40d7205e-2128-3d9e-afcf-07607c40f3ef", "celery.hostname": "gen88041@7cf34de1a5ba", "celery.task_name": "tasks.add_numbers" }, ``` For Producer span, the code attributes can be achieved by [stacktrace-based solution](https://github.com/aws-observability/aws-otel-python-instrumentation/blob/main/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/code_correlation/code_attributes_span_processor.py#L85-L100) For Confluent-Kafka, Kafka-Python, and AioKafka, these three libraries do not support consumer callbacks. Their consumer spans therefore represent only the message retrieval action, and accordingly use the same stacktrace-based solution. By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
1 parent 8d67179 commit 1326576

File tree

12 files changed

+1195
-128
lines changed

12 files changed

+1195
-128
lines changed

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ RUN sed -i "/opentelemetry-exporter-otlp-proto-grpc/d" ./aws-opentelemetry-distr
2121
RUN mkdir workspace && pip install --target workspace ./aws-opentelemetry-distro
2222

2323
# Stage 2: Build the cp-utility binary
24-
FROM public.ecr.aws/docker/library/rust:1.87 AS builder
24+
FROM public.ecr.aws/docker/library/rust:1.89 AS builder
2525

2626
WORKDIR /usr/src/cp-utility
2727
COPY ./tools/cp-utility .

aws-opentelemetry-distro/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ dependencies = [
4242
"opentelemetry-instrumentation-aws-lambda == 0.54b1",
4343
"opentelemetry-instrumentation-aio-pika == 0.54b1",
4444
"opentelemetry-instrumentation-aiohttp-client == 0.54b1",
45+
"opentelemetry-instrumentation-aiokafka == 0.54b1",
4546
"opentelemetry-instrumentation-aiopg == 0.54b1",
4647
"opentelemetry-instrumentation-asgi == 0.54b1",
4748
"opentelemetry-instrumentation-asyncpg == 0.54b1",
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
"""
5+
Patches for OpenTelemetry Aio-Pika instrumentation to add code correlation support.
6+
"""
7+
# pylint: disable=duplicate-code
8+
9+
import functools
10+
import logging
11+
12+
from amazon.opentelemetry.distro.aws_opentelemetry_configurator import get_code_correlation_enabled_status
13+
from amazon.opentelemetry.distro.code_correlation.utils import record_code_attributes
14+
15+
logger = logging.getLogger(__name__)
16+
17+
18+
def patch_callback_decorator_decorate(original_decorate):
19+
"""Patch CallbackDecorator.decorate to add code attributes to span."""
20+
21+
@functools.wraps(original_decorate)
22+
def patched_decorate(self, callback):
23+
# Decorate the original callback to add code attributes
24+
callback = record_code_attributes(callback)
25+
26+
# Call original decorate method with our enhanced callback
27+
return original_decorate(self, callback)
28+
29+
return patched_decorate
30+
31+
32+
def _apply_aio_pika_instrumentation_patches():
33+
"""Apply aio-pika patches if code correlation is enabled."""
34+
try:
35+
if get_code_correlation_enabled_status() is not True:
36+
return
37+
38+
# Import CallbackDecorator inside function to allow proper testing
39+
try:
40+
# pylint: disable=import-outside-toplevel
41+
from opentelemetry.instrumentation.aio_pika.callback_decorator import CallbackDecorator
42+
except ImportError:
43+
logger.warning("Failed to apply Aio-Pika patches: CallbackDecorator not available")
44+
return
45+
46+
# Patch CallbackDecorator.decorate
47+
CallbackDecorator.decorate = patch_callback_decorator_decorate(CallbackDecorator.decorate)
48+
49+
except Exception as exc: # pylint: disable=broad-exception-caught
50+
logger.warning("Failed to apply Aio-Pika patches: %s", exc)
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
"""
5+
Patches for OpenTelemetry Celery instrumentation to add code correlation support.
6+
7+
This module provides patches to enhance the Celery instrumentation with code correlation
8+
capabilities, allowing tracking of user code that is executed within Celery tasks.
9+
"""
10+
11+
import functools
12+
import logging
13+
from typing import Any, Callable, Optional
14+
15+
from amazon.opentelemetry.distro.aws_opentelemetry_configurator import get_code_correlation_enabled_status
16+
from amazon.opentelemetry.distro.code_correlation.utils import add_code_attributes_to_span
17+
18+
logger = logging.getLogger(__name__)
19+
20+
# Import at module level to avoid pylint import-outside-toplevel
21+
try:
22+
from opentelemetry.instrumentation.celery import CeleryInstrumentor
23+
from opentelemetry.instrumentation.celery import utils as celery_utils
24+
except ImportError:
25+
celery_utils = None
26+
CeleryInstrumentor = None
27+
28+
29+
def _extract_task_function(task) -> Optional[Callable[..., Any]]: # pylint: disable=too-many-return-statements
30+
"""
31+
Extract the actual user function from a Celery task object.
32+
33+
Args:
34+
task: The Celery task object
35+
36+
Returns:
37+
The underlying user function if found, None otherwise
38+
"""
39+
if task is None:
40+
return None
41+
42+
try:
43+
# For regular function-based tasks, the actual function is stored in task.run
44+
if hasattr(task, "run") and callable(task.run):
45+
func = task.run
46+
if hasattr(func, "__func__"):
47+
return func.__func__
48+
if func.__name__ != "run": # Avoid returning generic run methods
49+
return func
50+
51+
# For function-based tasks, the original function might be stored differently
52+
if hasattr(task, "__call__") and callable(task.__call__):
53+
func = task.__call__
54+
if hasattr(func, "__func__") and func.__func__.__name__ != "__call__":
55+
return func.__func__
56+
if func.__name__ != "__call__":
57+
return func
58+
59+
# Try to get the original function from __wrapped__ attribute
60+
if hasattr(task, "__wrapped__") and callable(task.__wrapped__):
61+
return task.__wrapped__
62+
63+
except Exception: # pylint: disable=broad-exception-caught
64+
pass
65+
66+
return None
67+
68+
69+
def _add_code_correlation_to_span(span, task) -> None:
70+
"""
71+
Add code correlation attributes to a span for a Celery task.
72+
73+
Args:
74+
span: The OpenTelemetry span to add attributes to
75+
task: The Celery task object
76+
"""
77+
try:
78+
if span is None or not span.is_recording():
79+
return
80+
81+
user_function = _extract_task_function(task)
82+
if user_function is not None:
83+
add_code_attributes_to_span(span, user_function)
84+
85+
except Exception: # pylint: disable=broad-exception-caught
86+
pass
87+
88+
89+
def patch_celery_prerun(original_trace_prerun: Callable) -> Callable:
90+
"""
91+
Patch the Celery _trace_prerun method to add code correlation support.
92+
93+
Args:
94+
original_trace_prerun: The original _trace_prerun method to wrap
95+
96+
Returns:
97+
The patched _trace_prerun method
98+
"""
99+
100+
@functools.wraps(original_trace_prerun)
101+
def patched_trace_prerun(self, *args, **kwargs):
102+
result = original_trace_prerun(self, *args, **kwargs)
103+
104+
try:
105+
task = kwargs.get("task")
106+
task_id = kwargs.get("task_id")
107+
108+
if task is not None and task_id is not None and celery_utils is not None:
109+
ctx = celery_utils.retrieve_context(task, task_id)
110+
if ctx is not None:
111+
span, _, _ = ctx
112+
if span is not None:
113+
_add_code_correlation_to_span(span, task)
114+
115+
except Exception: # pylint: disable=broad-exception-caught
116+
pass
117+
118+
return result
119+
120+
return patched_trace_prerun
121+
122+
123+
def _apply_celery_instrumentation_patches():
124+
"""
125+
Apply code correlation patches to the Celery instrumentation.
126+
"""
127+
try:
128+
if get_code_correlation_enabled_status() is not True:
129+
return
130+
131+
if CeleryInstrumentor is None:
132+
logger.warning("Failed to apply Celery patches: CeleryInstrumentor not available")
133+
return
134+
135+
original_trace_prerun = CeleryInstrumentor._trace_prerun
136+
CeleryInstrumentor._trace_prerun = patch_celery_prerun(original_trace_prerun)
137+
138+
except Exception as exc: # pylint: disable=broad-exception-caught
139+
logger.warning("Failed to apply Celery instrumentation patches: %s", exc)

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_django_patches.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ def patched_process_view(
6262
): # pylint: disable=too-many-locals,too-many-nested-blocks,too-many-branches
6363
"""Patched process_view method to add code attributes to the span."""
6464
# First call the original process_view method
65+
# pylint: disable=assignment-from-none
6566
result = original_process_view(self, request, view_func, *args, **kwargs)
6667

6768
# Add code attributes if we have a span and view function
@@ -120,12 +121,12 @@ def patched_instrument(self, **kwargs):
120121
_patch_django_middleware()
121122

122123
# Call the original _instrument method
123-
original_instrument(self, **kwargs)
124+
original_instrument(self, **kwargs) # pylint: disable=assignment-from-none
124125

125126
def patched_uninstrument(self, **kwargs):
126127
"""Patched _uninstrument method with Django middleware patch restoration"""
127128
# Call the original _uninstrument method first
128-
original_uninstrument(self, **kwargs)
129+
original_uninstrument(self, **kwargs) # pylint: disable=assignment-from-none
129130

130131
# Restore original Django middleware
131132
_unpatch_django_middleware()
Lines changed: 22 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,15 @@
11
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
# SPDX-License-Identifier: Apache-2.0
33
# Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License.
4-
import os
54
from logging import Logger, getLogger
65

76
from amazon.opentelemetry.distro._utils import is_installed
87
from amazon.opentelemetry.distro.patches._resource_detector_patches import _apply_resource_detector_patches
98

10-
# Env variable for determining whether we want to monkey patch gevent modules. Possible values are 'all', 'none', and
11-
# comma separated list 'os, thread, time, sys, socket, select, ssl, subprocess, builtins, signal, queue, contextvars'
12-
AWS_GEVENT_PATCH_MODULES = "AWS_GEVENT_PATCH_MODULES"
13-
149
_logger: Logger = getLogger(__name__)
1510

1611

17-
def apply_instrumentation_patches() -> None:
12+
def apply_instrumentation_patches() -> None: # pylint: disable=too-many-branches
1813
"""Apply patches to upstream instrumentation libraries.
1914
2015
This method is invoked to apply changes to upstream instrumentation libraries, typically when changes to upstream
@@ -23,37 +18,6 @@ def apply_instrumentation_patches() -> None:
2318
2419
Where possible, automated testing should be run to catch upstream changes resulting in broken patches
2520
"""
26-
if is_installed("gevent"):
27-
try:
28-
gevent_patch_module = os.environ.get(AWS_GEVENT_PATCH_MODULES, "all")
29-
30-
if gevent_patch_module != "none":
31-
# pylint: disable=import-outside-toplevel
32-
# Delay import to only occur if monkey patch is needed (e.g. gevent is used to run application).
33-
from gevent import monkey
34-
35-
if gevent_patch_module == "all":
36-
monkey.patch_all()
37-
else:
38-
module_list = [module.strip() for module in gevent_patch_module.split(",")]
39-
40-
monkey.patch_all(
41-
socket="socket" in module_list,
42-
time="time" in module_list,
43-
select="select" in module_list,
44-
thread="thread" in module_list,
45-
os="os" in module_list,
46-
ssl="ssl" in module_list,
47-
subprocess="subprocess" in module_list,
48-
sys="sys" in module_list,
49-
builtins="builtins" in module_list,
50-
signal="signal" in module_list,
51-
queue="queue" in module_list,
52-
contextvars="contextvars" in module_list,
53-
)
54-
except Exception as exc: # pylint: disable=broad-except
55-
_logger.info("Failed to monkey patch gevent, exception: %s", exc)
56-
5721
if is_installed("botocore ~= 1.0"):
5822
# pylint: disable=import-outside-toplevel
5923
# Delay import to only occur if patches is safe to apply (e.g. the instrumented library is installed).
@@ -92,6 +56,27 @@ def apply_instrumentation_patches() -> None:
9256

9357
_apply_django_instrumentation_patches()
9458

59+
if is_installed("celery"):
60+
# pylint: disable=import-outside-toplevel
61+
# Delay import to only occur if patches is safe to apply (e.g. the instrumented library is installed).
62+
from amazon.opentelemetry.distro.patches._celery_patches import _apply_celery_instrumentation_patches
63+
64+
_apply_celery_instrumentation_patches()
65+
66+
if is_installed("pika"):
67+
# pylint: disable=import-outside-toplevel
68+
# Delay import to only occur if patches is safe to apply (e.g. the instrumented library is installed).
69+
from amazon.opentelemetry.distro.patches._pika_patches import _apply_pika_instrumentation_patches
70+
71+
_apply_pika_instrumentation_patches()
72+
73+
if is_installed("aio-pika"):
74+
# pylint: disable=import-outside-toplevel
75+
# Delay import to only occur if patches is safe to apply (e.g. the instrumented library is installed).
76+
from amazon.opentelemetry.distro.patches._aio_pika_patches import _apply_aio_pika_instrumentation_patches
77+
78+
_apply_aio_pika_instrumentation_patches()
79+
9580
# No need to check if library is installed as this patches opentelemetry.sdk,
9681
# which must be installed for the distro to work at all.
9782
_apply_resource_detector_patches()
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
"""
5+
Patches for OpenTelemetry Pika instrumentation to add code correlation support.
6+
"""
7+
# pylint: disable=duplicate-code
8+
9+
import functools
10+
import logging
11+
12+
from amazon.opentelemetry.distro.aws_opentelemetry_configurator import get_code_correlation_enabled_status
13+
from amazon.opentelemetry.distro.code_correlation.utils import add_code_attributes_to_span
14+
15+
logger = logging.getLogger(__name__)
16+
17+
18+
def patch_decorate_callback(original_decorate_callback):
19+
"""Patch _decorate_callback to add code attributes to span."""
20+
21+
@functools.wraps(original_decorate_callback)
22+
def patched_decorate_callback(callback, tracer, task_name, consume_hook):
23+
# Create an enhanced consume_hook that adds code attributes
24+
def enhanced_consume_hook(span, body, properties):
25+
# First add code attributes for the callback
26+
if span and span.is_recording():
27+
try:
28+
add_code_attributes_to_span(span, callback)
29+
except Exception: # pylint: disable=broad-exception-caught
30+
pass
31+
32+
try:
33+
consume_hook(span, body, properties)
34+
except Exception: # pylint: disable=broad-exception-caught
35+
pass
36+
37+
# Call original with our enhanced hook
38+
return original_decorate_callback(callback, tracer, task_name, enhanced_consume_hook)
39+
40+
return patched_decorate_callback
41+
42+
43+
def _apply_pika_instrumentation_patches():
44+
"""Apply pika patches if code correlation is enabled."""
45+
try:
46+
if get_code_correlation_enabled_status() is not True:
47+
return
48+
49+
# Import pika_utils inside function to allow proper testing
50+
try:
51+
# pylint: disable=import-outside-toplevel
52+
from opentelemetry.instrumentation.pika import utils as pika_utils
53+
except ImportError:
54+
logger.warning("Failed to apply Pika patches: pika utils not available")
55+
return
56+
57+
# Patch _decorate_callback
58+
# pylint: disable=protected-access
59+
pika_utils._decorate_callback = patch_decorate_callback(pika_utils._decorate_callback)
60+
61+
except Exception as exc: # pylint: disable=broad-exception-caught
62+
logger.warning("Failed to apply Pika patches: %s", exc)

0 commit comments

Comments
 (0)