Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions aws-opentelemetry-distro/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ dependencies = [
"opentelemetry-instrumentation-aws-lambda == 0.54b1",
"opentelemetry-instrumentation-aio-pika == 0.54b1",
"opentelemetry-instrumentation-aiohttp-client == 0.54b1",
"opentelemetry-instrumentation-aiokafka == 0.54b1",
"opentelemetry-instrumentation-aiopg == 0.54b1",
"opentelemetry-instrumentation-asgi == 0.54b1",
"opentelemetry-instrumentation-asyncpg == 0.54b1",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

"""
Patches for OpenTelemetry Aio-Pika instrumentation to add code correlation support.
"""

import functools
import logging

from amazon.opentelemetry.distro.aws_opentelemetry_configurator import get_code_correlation_enabled_status
from amazon.opentelemetry.distro.code_correlation.utils import add_code_attributes_to_span
from opentelemetry import trace

logger = logging.getLogger(__name__)


def patch_callback_decorator_decorate(original_decorate):
"""Patch CallbackDecorator.decorate to add code attributes to span."""

@functools.wraps(original_decorate)
def patched_decorate(self, callback):
# Decorate the original callback to add code attributes
async def enhanced_callback(message):
# Get current active span
current_span = trace.get_current_span()
if current_span and current_span.is_recording():
try:
add_code_attributes_to_span(current_span, callback)
except Exception: # pylint: disable=broad-exception-caught
pass

# Call original callback
return await callback(message)

# Call original decorate method with our enhanced callback
return original_decorate(self, enhanced_callback)

return patched_decorate


def _apply_aio_pika_instrumentation_patches():
"""Apply aio-pika patches if code correlation is enabled."""
try:
if get_code_correlation_enabled_status() is not True:
return

# Import CallbackDecorator inside function to allow proper testing
try:
# pylint: disable=import-outside-toplevel
from opentelemetry.instrumentation.aio_pika.callback_decorator import CallbackDecorator
except ImportError:
logger.warning("Failed to apply Aio-Pika patches: CallbackDecorator not available")
return

# Patch CallbackDecorator.decorate
CallbackDecorator.decorate = patch_callback_decorator_decorate(CallbackDecorator.decorate)

except Exception as exc: # pylint: disable=broad-exception-caught
logger.warning("Failed to apply Aio-Pika patches: %s", exc)
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

"""
Patches for OpenTelemetry Celery instrumentation to add code correlation support.

This module provides patches to enhance the Celery instrumentation with code correlation
capabilities, allowing tracking of user code that is executed within Celery tasks.
"""

import functools
import logging
from typing import Any, Callable, Optional

from amazon.opentelemetry.distro.aws_opentelemetry_configurator import get_code_correlation_enabled_status
from amazon.opentelemetry.distro.code_correlation.utils import add_code_attributes_to_span

logger = logging.getLogger(__name__)

# Import at module level to avoid pylint import-outside-toplevel
try:
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.instrumentation.celery import utils as celery_utils
except ImportError:
celery_utils = None
CeleryInstrumentor = None


def _extract_task_function(task) -> Optional[Callable[..., Any]]: # pylint: disable=too-many-return-statements
"""
Extract the actual user function from a Celery task object.

Args:
task: The Celery task object

Returns:
The underlying user function if found, None otherwise
"""
if task is None:
return None

try:
# For regular function-based tasks, the actual function is stored in task.run
if hasattr(task, "run") and callable(task.run):
func = task.run
if hasattr(func, "__func__"):
return func.__func__
if func.__name__ != "run": # Avoid returning generic run methods
return func

# For function-based tasks, the original function might be stored differently
if hasattr(task, "__call__") and callable(task.__call__):
func = task.__call__
if hasattr(func, "__func__") and func.__func__.__name__ != "__call__":
return func.__func__
if func.__name__ != "__call__":
return func

# Try to get the original function from __wrapped__ attribute
if hasattr(task, "__wrapped__") and callable(task.__wrapped__):
return task.__wrapped__

except Exception: # pylint: disable=broad-exception-caught
pass

return None


def _add_code_correlation_to_span(span, task) -> None:
"""
Add code correlation attributes to a span for a Celery task.

Args:
span: The OpenTelemetry span to add attributes to
task: The Celery task object
"""
try:
if span is None or not span.is_recording():
return

user_function = _extract_task_function(task)
if user_function is not None:
add_code_attributes_to_span(span, user_function)

except Exception: # pylint: disable=broad-exception-caught
pass


def patch_celery_prerun(original_trace_prerun: Callable) -> Callable:
"""
Patch the Celery _trace_prerun method to add code correlation support.

Args:
original_trace_prerun: The original _trace_prerun method to wrap

Returns:
The patched _trace_prerun method
"""

@functools.wraps(original_trace_prerun)
def patched_trace_prerun(self, *args, **kwargs):
result = original_trace_prerun(self, *args, **kwargs)

try:
task = kwargs.get("task")
task_id = kwargs.get("task_id")

if task is not None and task_id is not None and celery_utils is not None:
ctx = celery_utils.retrieve_context(task, task_id)
if ctx is not None:
span, _, _ = ctx
if span is not None:
_add_code_correlation_to_span(span, task)

except Exception: # pylint: disable=broad-exception-caught
pass

return result

return patched_trace_prerun


def _apply_celery_instrumentation_patches():
"""
Apply code correlation patches to the Celery instrumentation.
"""
try:
if get_code_correlation_enabled_status() is not True:
return

if CeleryInstrumentor is None:
logger.warning("Failed to apply Celery patches: CeleryInstrumentor not available")
return

original_trace_prerun = CeleryInstrumentor._trace_prerun
CeleryInstrumentor._trace_prerun = patch_celery_prerun(original_trace_prerun)

except Exception as exc: # pylint: disable=broad-exception-caught
logger.warning("Failed to apply Celery instrumentation patches: %s", exc)
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def patched_process_view(
): # pylint: disable=too-many-locals,too-many-nested-blocks,too-many-branches
"""Patched process_view method to add code attributes to the span."""
# First call the original process_view method
# pylint: disable=assignment-from-none
result = original_process_view(self, request, view_func, *args, **kwargs)

# Add code attributes if we have a span and view function
Expand Down Expand Up @@ -120,12 +121,12 @@ def patched_instrument(self, **kwargs):
_patch_django_middleware()

# Call the original _instrument method
original_instrument(self, **kwargs)
original_instrument(self, **kwargs) # pylint: disable=assignment-from-none

def patched_uninstrument(self, **kwargs):
"""Patched _uninstrument method with Django middleware patch restoration"""
# Call the original _uninstrument method first
original_uninstrument(self, **kwargs)
original_uninstrument(self, **kwargs) # pylint: disable=assignment-from-none

# Restore original Django middleware
_unpatch_django_middleware()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
_logger: Logger = getLogger(__name__)


def apply_instrumentation_patches() -> None:
def apply_instrumentation_patches() -> None: # pylint: disable=too-many-branches
"""Apply patches to upstream instrumentation libraries.

This method is invoked to apply changes to upstream instrumentation libraries, typically when changes to upstream
Expand Down Expand Up @@ -92,6 +92,27 @@ def apply_instrumentation_patches() -> None:

_apply_django_instrumentation_patches()

if is_installed("celery"):
# pylint: disable=import-outside-toplevel
# Delay import to only occur if patches is safe to apply (e.g. the instrumented library is installed).
from amazon.opentelemetry.distro.patches._celery_patches import _apply_celery_instrumentation_patches

_apply_celery_instrumentation_patches()

if is_installed("pika"):
# pylint: disable=import-outside-toplevel
# Delay import to only occur if patches is safe to apply (e.g. the instrumented library is installed).
from amazon.opentelemetry.distro.patches._pika_patches import _apply_pika_instrumentation_patches

_apply_pika_instrumentation_patches()

if is_installed("aio-pika"):
# pylint: disable=import-outside-toplevel
# Delay import to only occur if patches is safe to apply (e.g. the instrumented library is installed).
from amazon.opentelemetry.distro.patches._aio_pika_patches import _apply_aio_pika_instrumentation_patches

_apply_aio_pika_instrumentation_patches()

# No need to check if library is installed as this patches opentelemetry.sdk,
# which must be installed for the distro to work at all.
_apply_resource_detector_patches()
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

"""
Patches for OpenTelemetry Pika instrumentation to add code correlation support.
"""

import functools
import logging

from amazon.opentelemetry.distro.aws_opentelemetry_configurator import get_code_correlation_enabled_status
from amazon.opentelemetry.distro.code_correlation.utils import add_code_attributes_to_span

logger = logging.getLogger(__name__)


def patch_decorate_callback(original_decorate_callback):
"""Patch _decorate_callback to add code attributes to span."""

@functools.wraps(original_decorate_callback)
def patched_decorate_callback(callback, tracer, task_name, consume_hook):
# Create an enhanced consume_hook that adds code attributes
def enhanced_consume_hook(span, body, properties):
# First add code attributes for the callback
if span and span.is_recording():
try:
add_code_attributes_to_span(span, callback)
except Exception: # pylint: disable=broad-exception-caught
pass

# Then call the original consume_hook if it exists
if consume_hook:
try:
consume_hook(span, body, properties)
except Exception: # pylint: disable=broad-exception-caught
pass

# Call original with our enhanced hook
return original_decorate_callback(callback, tracer, task_name, enhanced_consume_hook)

return patched_decorate_callback


def _apply_pika_instrumentation_patches():
"""Apply pika patches if code correlation is enabled."""
try:
if get_code_correlation_enabled_status() is not True:
return

# Import pika_utils inside function to allow proper testing
try:
# pylint: disable=import-outside-toplevel
from opentelemetry.instrumentation.pika import utils as pika_utils
except ImportError:
logger.warning("Failed to apply Pika patches: pika utils not available")
return

# Patch _decorate_callback
pika_utils._decorate_callback = patch_decorate_callback(pika_utils._decorate_callback)

except Exception as exc: # pylint: disable=broad-exception-caught
logger.warning("Failed to apply Pika patches: %s", exc)
Loading
Loading