From 409cb6a71dc1c9b4abf5f4ab0e58bc36034a2a17 Mon Sep 17 00:00:00 2001
From: Michael He <53622546+yiyuan-he@users.noreply.github.com>
Date: Sat, 17 May 2025 12:00:32 -0700
Subject: [PATCH 1/5] Genesis LLO Support in ADOT SDK (#361)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## What does this pull request do?

Adds support for handling LLO (Large Language Objects) from third-party
instrumentation SDKs in the ADOT SDK. The following SDKs are supported:

- Traceloop/Openllmetry
- OpenInference
- OpenLit

Note: OTel dependencies in the ADOT SDK have been loosened as a short-term
workaround to support the various conflicting dependency requirements of
third-party instrumentation SDKs.

## Test plan

Built this custom ADOT SDK into various sample apps and exported the span and
log data to the OTLP X-Ray and Logs endpoints, respectively, to validate the
LLO extraction and transformation to Gen AI Events.

Configurations tested:

- LangChain + Traceloop/Openllmetry
- LangChain + OpenInference
- LangChain + OpenLit
- CrewAI + Traceloop/Openllmetry
- CrewAI + OpenInference
- CrewAI + OpenLit

Environment variable configuration:

```
λ env OTEL_METRICS_EXPORTER=none \
OTEL_TRACES_EXPORTER=otlp \
OTEL_LOGS_EXPORTER=otlp \
OTEL_PYTHON_DISTRO=aws_distro \
OTEL_PYTHON_CONFIGURATOR=aws_configurator \
OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf \
OTEL_EXPORTER_OTLP_LOGS_HEADERS="x-aws-log-group=test,x-aws-log-stream=default" \
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=https://xray.us-east-1.amazonaws.com/v1/traces \
OTEL_EXPORTER_OTLP_LOGS_ENDPOINT=https://logs.us-east-1.amazonaws.com/v1/logs \
OTEL_RESOURCE_ATTRIBUTES="service.name=langchain-app" \
OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED="true" \
OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT="true" \
OTEL_PYTHON_DISABLED_INSTRUMENTATIONS="http,sqlalchemy,psycopg2,pymysql,sqlite3,aiopg,asyncpg,mysql_connector,botocore,boto3,urllib3,requests,starlette" \
AGENT_OBSERVABILITY_ENABLED="true" \
python app.py
```

By submitting this pull request, I confirm that you can use, modify, copy, and
redistribute this contribution, under the terms of your choice.
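For reviewers: a minimal, self-contained sketch of the core transformation
that `llo_handler.py` (added below) performs on structured prompt attributes.
The helper names (`PROMPT_CONTENT`, `event_name_for_role`) are local to this
sketch and not part of the patch; it mirrors `_extract_gen_ai_prompt_events`
and `_get_event_name_for_role`.

```python
import re

# Matches the structured Gen AI prompt pattern handled by this PR.
PROMPT_CONTENT = re.compile(r"^gen_ai\.prompt\.(\d+)\.content$")

def event_name_for_role(role, gen_ai_system):
    # system/user/assistant map to standard Gen AI event names; any other
    # role falls back to a system-specific custom event name.
    return {
        "system": "gen_ai.system.message",
        "user": "gen_ai.user.message",
        "assistant": "gen_ai.assistant.message",
    }.get(role, f"gen_ai.{gen_ai_system}.message")

span_attributes = {
    "gen_ai.system": "openai",
    "gen_ai.prompt.0.content": "What is OpenTelemetry?",
    "gen_ai.prompt.0.role": "user",
}

for key, value in span_attributes.items():
    match = PROMPT_CONTENT.match(key)
    if match:
        role = span_attributes.get(f"gen_ai.prompt.{match.group(1)}.role", "unknown")
        name = event_name_for_role(role, span_attributes.get("gen_ai.system", "unknown"))
        # Prints: gen_ai.user.message {'content': 'What is OpenTelemetry?', 'role': 'user'}
        print(name, {"content": value, "role": role})
```

The matched content is then stripped from the span itself and emitted as a
log-based Gen AI event, which is what keeps prompt/completion text out of the
X-Ray trace data.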
---
 aws-opentelemetry-distro/pyproject.toml       | 110 +--
 .../src/amazon/opentelemetry/distro/_utils.py |   5 +
 .../distro/aws_opentelemetry_configurator.py  |  22 +-
 .../otlp/aws/traces/otlp_aws_span_exporter.py |  20 +-
 .../opentelemetry/distro/llo_handler.py       | 681 ++++++++++++++++++
 .../distro/patches/_bedrock_patches.py        |  10 +-
 .../distro/patches/_botocore_patches.py       |  10 +-
 .../distro/test_instrumentation_patch.py      |   5 +-
 .../opentelemetry/distro/test_llo_handler.py  | 651 +++++++++++++++++
 9 files changed, 1443 insertions(+), 71 deletions(-)
 create mode 100644 aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py
 create mode 100644 aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_llo_handler.py

diff --git a/aws-opentelemetry-distro/pyproject.toml b/aws-opentelemetry-distro/pyproject.toml
index d4f9ca204..cc738c435 100644
--- a/aws-opentelemetry-distro/pyproject.toml
+++ b/aws-opentelemetry-distro/pyproject.toml
@@ -25,61 +25,61 @@ classifiers = [
 ]
 
 dependencies = [
-    "opentelemetry-api == 1.27.0",
-    "opentelemetry-sdk == 1.27.0",
-    "opentelemetry-exporter-otlp-proto-grpc == 1.27.0",
-    "opentelemetry-exporter-otlp-proto-http == 1.27.0",
-    "opentelemetry-propagator-b3 == 1.27.0",
-    "opentelemetry-propagator-jaeger == 1.27.0",
-    "opentelemetry-exporter-otlp-proto-common == 1.27.0",
-    "opentelemetry-sdk-extension-aws == 2.0.2",
-    "opentelemetry-propagator-aws-xray == 1.0.1",
-    "opentelemetry-distro == 0.48b0",
-    "opentelemetry-propagator-ot-trace == 0.48b0",
-    "opentelemetry-instrumentation == 0.48b0",
-    "opentelemetry-instrumentation-aws-lambda == 0.48b0",
-    "opentelemetry-instrumentation-aio-pika == 0.48b0",
-    "opentelemetry-instrumentation-aiohttp-client == 0.48b0",
-    "opentelemetry-instrumentation-aiopg == 0.48b0",
-    "opentelemetry-instrumentation-asgi == 0.48b0",
-    "opentelemetry-instrumentation-asyncpg == 0.48b0",
-    "opentelemetry-instrumentation-boto == 0.48b0",
-    "opentelemetry-instrumentation-boto3sqs == 0.48b0",
-    "opentelemetry-instrumentation-botocore == 0.48b0",
-    "opentelemetry-instrumentation-celery == 0.48b0",
-    "opentelemetry-instrumentation-confluent-kafka == 0.48b0",
-    "opentelemetry-instrumentation-dbapi == 0.48b0",
-    "opentelemetry-instrumentation-django == 0.48b0",
-    "opentelemetry-instrumentation-elasticsearch == 0.48b0",
-    "opentelemetry-instrumentation-falcon == 0.48b0",
-    "opentelemetry-instrumentation-fastapi == 0.48b0",
-    "opentelemetry-instrumentation-flask == 0.48b0",
-    "opentelemetry-instrumentation-grpc == 0.48b0",
-    "opentelemetry-instrumentation-httpx == 0.48b0",
-    "opentelemetry-instrumentation-jinja2 == 0.48b0",
-    "opentelemetry-instrumentation-kafka-python == 0.48b0",
-    "opentelemetry-instrumentation-logging == 0.48b0",
-    "opentelemetry-instrumentation-mysql == 0.48b0",
-    "opentelemetry-instrumentation-mysqlclient == 0.48b0",
-    "opentelemetry-instrumentation-pika == 0.48b0",
-    "opentelemetry-instrumentation-psycopg2 == 0.48b0",
-    "opentelemetry-instrumentation-pymemcache == 0.48b0",
-    "opentelemetry-instrumentation-pymongo == 0.48b0",
-    "opentelemetry-instrumentation-pymysql == 0.48b0",
-    "opentelemetry-instrumentation-pyramid == 0.48b0",
-    "opentelemetry-instrumentation-redis == 0.48b0",
-    "opentelemetry-instrumentation-remoulade == 0.48b0",
-    "opentelemetry-instrumentation-requests == 0.48b0",
-    "opentelemetry-instrumentation-sqlalchemy == 0.48b0",
-    "opentelemetry-instrumentation-sqlite3 == 0.48b0",
-    "opentelemetry-instrumentation-starlette == 0.48b0",
-    "opentelemetry-instrumentation-system-metrics == 0.48b0",
-    "opentelemetry-instrumentation-tornado == 0.48b0",
-    "opentelemetry-instrumentation-tortoiseorm == 0.48b0",
-    "opentelemetry-instrumentation-urllib == 0.48b0",
-    "opentelemetry-instrumentation-urllib3 == 0.48b0",
-    "opentelemetry-instrumentation-wsgi == 0.48b0",
-    "opentelemetry-instrumentation-cassandra == 0.48b0",
+    "opentelemetry-api >= 1.29.0",
+    "opentelemetry-sdk >= 1.29.0",
+    "opentelemetry-exporter-otlp-proto-grpc >= 1.29.0",
+    "opentelemetry-exporter-otlp-proto-http >= 1.29.0",
+    "opentelemetry-propagator-b3 >= 1.29.0",
+    "opentelemetry-propagator-jaeger >= 1.29.0",
+    "opentelemetry-exporter-otlp-proto-common >= 1.29.0",
+    "opentelemetry-sdk-extension-aws >= 2.0.2",
+    "opentelemetry-propagator-aws-xray >= 1.0.1",
+    "opentelemetry-distro >= 0.50b0",
+    "opentelemetry-propagator-ot-trace >= 0.50b0",
+    "opentelemetry-instrumentation >= 0.50b0",
+    "opentelemetry-instrumentation-aws-lambda >= 0.50b0",
+    "opentelemetry-instrumentation-aio-pika >= 0.50b0",
+    "opentelemetry-instrumentation-aiohttp-client >= 0.50b0",
+    "opentelemetry-instrumentation-aiopg >= 0.50b0",
+    "opentelemetry-instrumentation-asgi >= 0.50b0",
+    "opentelemetry-instrumentation-asyncpg >= 0.50b0",
+    "opentelemetry-instrumentation-boto >= 0.50b0",
+    "opentelemetry-instrumentation-boto3sqs >= 0.50b0",
+    "opentelemetry-instrumentation-botocore >= 0.50b0",
+    "opentelemetry-instrumentation-celery >= 0.50b0",
+    "opentelemetry-instrumentation-confluent-kafka >= 0.50b0",
+    "opentelemetry-instrumentation-dbapi >= 0.50b0",
+    "opentelemetry-instrumentation-django >= 0.50b0",
+    "opentelemetry-instrumentation-elasticsearch >= 0.50b0",
+    "opentelemetry-instrumentation-falcon >= 0.50b0",
+    "opentelemetry-instrumentation-fastapi >= 0.50b0",
+    "opentelemetry-instrumentation-flask >= 0.50b0",
+    "opentelemetry-instrumentation-grpc >= 0.50b0",
+    "opentelemetry-instrumentation-httpx >= 0.50b0",
+    "opentelemetry-instrumentation-jinja2 >= 0.50b0",
+    "opentelemetry-instrumentation-kafka-python >= 0.50b0",
+    "opentelemetry-instrumentation-logging >= 0.50b0",
+    "opentelemetry-instrumentation-mysql >= 0.50b0",
+    "opentelemetry-instrumentation-mysqlclient >= 0.50b0",
+    "opentelemetry-instrumentation-pika >= 0.50b0",
+    "opentelemetry-instrumentation-psycopg2 >= 0.50b0",
+    "opentelemetry-instrumentation-pymemcache >= 0.50b0",
+    "opentelemetry-instrumentation-pymongo >= 0.50b0",
+    "opentelemetry-instrumentation-pymysql >= 0.50b0",
+    "opentelemetry-instrumentation-pyramid >= 0.50b0",
+    "opentelemetry-instrumentation-redis >= 0.50b0",
+    "opentelemetry-instrumentation-remoulade >= 0.50b0",
+    "opentelemetry-instrumentation-requests >= 0.50b0",
+    "opentelemetry-instrumentation-sqlalchemy >= 0.50b0",
+    "opentelemetry-instrumentation-sqlite3 >= 0.50b0",
+    "opentelemetry-instrumentation-starlette >= 0.50b0",
+    "opentelemetry-instrumentation-system-metrics >= 0.50b0",
+    "opentelemetry-instrumentation-tornado >= 0.50b0",
+    "opentelemetry-instrumentation-tortoiseorm >= 0.50b0",
+    "opentelemetry-instrumentation-urllib >= 0.50b0",
+    "opentelemetry-instrumentation-urllib3 >= 0.50b0",
+    "opentelemetry-instrumentation-wsgi >= 0.50b0",
+    "opentelemetry-instrumentation-cassandra >= 0.50b0",
 ]
 
 [project.optional-dependencies]

diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_utils.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_utils.py
index 847f50fb1..011a1d19d 100644
--- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_utils.py
+++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_utils.py
@@ -1,6 +1,7 @@
 # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 # SPDX-License-Identifier: Apache-2.0
+import os
 import sys
 from logging import Logger, getLogger
 
@@ -8,6 +9,7 @@
 _logger: Logger = getLogger(__name__)
 
+AGENT_OBSERVABILITY_ENABLED = "AGENT_OBSERVABILITY_ENABLED"
 
 def is_installed(req: str) -> bool:
     """Is the given required package installed?"""
@@ -21,3 +23,6 @@ def is_installed(req: str) -> bool:
         _logger.debug("Skipping instrumentation patch: package %s, exception: %s", req, exc)
         return False
     return True
+
+def is_agent_observability_enabled() -> bool:
+    return os.environ.get(AGENT_OBSERVABILITY_ENABLED, "false").lower() == "true"

diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py
index e4be93d99..b2a695536 100644
--- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py
+++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py
@@ -10,6 +10,7 @@
 from typing_extensions import override
 
 from amazon.opentelemetry.distro._aws_attribute_keys import AWS_LOCAL_SERVICE
+from amazon.opentelemetry.distro._utils import is_agent_observability_enabled
 from amazon.opentelemetry.distro._aws_resource_attribute_configurator import get_service_attribute
 from amazon.opentelemetry.distro.always_record_sampler import AlwaysRecordSampler
 from amazon.opentelemetry.distro.attribute_propagating_span_processor_builder import (
@@ -27,7 +28,7 @@
 from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler
 from amazon.opentelemetry.distro.scope_based_exporter import ScopeBasedPeriodicExportingMetricReader
 from amazon.opentelemetry.distro.scope_based_filtering_view import ScopeBasedRetainingView
-from opentelemetry._logs import set_logger_provider
+from opentelemetry._logs import set_logger_provider, get_logger_provider
 from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
 from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter
 from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
@@ -91,6 +92,8 @@
 OTEL_EXPORTER_OTLP_LOGS_ENDPOINT = "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT"
 OTEL_EXPORTER_OTLP_LOGS_HEADERS = "OTEL_EXPORTER_OTLP_LOGS_HEADERS"
 
+AGENT_OBSERVABILITY_ENABLED = "AGENT_OBSERVABILITY_ENABLED"
+
 AWS_TRACES_OTLP_ENDPOINT_PATTERN = r"https://xray\.([a-z0-9-]+)\.amazonaws\.com/v1/traces$"
 AWS_LOGS_OTLP_ENDPOINT_PATTERN = r"https://logs\.([a-z0-9-]+)\.amazonaws\.com/v1/logs$"
 
@@ -160,6 +163,10 @@ def _initialize_components():
     sampler_name = _get_sampler()
     sampler = _custom_import_sampler(sampler_name, resource)
 
+    logging_enabled = os.getenv(_OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED, "false")
+    if logging_enabled.strip().lower() == "true":
+        _init_logging(log_exporters, resource)
+
     _init_tracing(
         exporters=trace_exporters,
         id_generator=id_generator,
@@ -167,9 +174,6 @@ def _initialize_components():
         resource=resource,
     )
     _init_metrics(metric_exporters, resource)
-    logging_enabled = os.getenv(_OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED, "false")
-    if logging_enabled.strip().lower() == "true":
-        _init_logging(log_exporters, resource)
 
 
 def _init_logging(
@@ -359,7 +363,15 @@ def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource) ->
         _logger.info("Detected using AWS OTLP Traces Endpoint.")
         if isinstance(span_exporter, OTLPSpanExporter):
-            span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint)
+            if is_agent_observability_enabled():
+                logs_endpoint = os.getenv(OTEL_EXPORTER_OTLP_LOGS_ENDPOINT)
+                logs_exporter = OTLPAwsLogExporter(endpoint=logs_endpoint)
+                span_exporter = OTLPAwsSpanExporter(
+                    endpoint=traces_endpoint,
+                    logger_provider=get_logger_provider()
+                )
+            else:
+                span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint)
 
         else:
             _logger.warning(

diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py
index 5fd5d744d..7defb5d47 100644
--- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py
+++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py
@@ -1,11 +1,18 @@
 # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 # SPDX-License-Identifier: Apache-2.0
 
-from typing import Dict, Optional
+from typing import Dict, Optional, Sequence
 
 from amazon.opentelemetry.distro.exporter.otlp.aws.common.aws_auth_session import AwsAuthSession
+from amazon.opentelemetry.distro.llo_handler import LLOHandler
+from amazon.opentelemetry.distro._utils import is_agent_observability_enabled
+from opentelemetry.sdk._logs import LoggerProvider
 from opentelemetry.exporter.otlp.proto.http import Compression
 from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
+from opentelemetry.sdk.trace import ReadableSpan
+from opentelemetry.sdk.trace.export import SpanExportResult
+
+AGENT_OBSERVABILITY_ENABLED = "AGENT_OBSERVABILITY_ENABLED"
 
 
 class OTLPAwsSpanExporter(OTLPSpanExporter):
@@ -18,9 +25,13 @@ def __init__(
         headers: Optional[Dict[str, str]] = None,
         timeout: Optional[int] = None,
         compression: Optional[Compression] = None,
+        logger_provider: Optional[LoggerProvider] = None
     ):
         self._aws_region = None
 
+        if logger_provider:
+            self._llo_handler = LLOHandler(logger_provider)
+
         if endpoint:
             self._aws_region = endpoint.split(".")[1]
 
@@ -35,3 +46,10 @@ def __init__(
             compression,
             session=AwsAuthSession(aws_region=self._aws_region, service="xray"),
         )
+
+    def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
+        # Guard on the handler as well: it is only set when a logger_provider
+        # was supplied at construction time.
+        if is_agent_observability_enabled() and getattr(self, "_llo_handler", None):
+            llo_processed_spans = self._llo_handler.process_spans(spans)
+            return super().export(llo_processed_spans)
+
+        return super().export(spans)

diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py
new file mode 100644
index 000000000..4c3706d96
--- /dev/null
+++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py
@@ -0,0 +1,681 @@
+import logging
+import re
+
+from typing import Any, Dict, List, Optional, Sequence
+
+from opentelemetry.attributes import BoundedAttributes
+from opentelemetry._events import Event
+from opentelemetry.sdk._logs import LoggerProvider
+from opentelemetry.sdk._events import EventLoggerProvider
+from opentelemetry.sdk.trace import ReadableSpan, Event as SpanEvent
+
+# Message event types
+GEN_AI_SYSTEM_MESSAGE = "gen_ai.system.message"
+GEN_AI_USER_MESSAGE = "gen_ai.user.message"
+GEN_AI_ASSISTANT_MESSAGE = "gen_ai.assistant.message"
+
+# Framework-specific attribute keys
+TRACELOOP_ENTITY_INPUT = "traceloop.entity.input"
"traceloop.entity.output" +OPENINFERENCE_INPUT_VALUE = "input.value" +OPENINFERENCE_OUTPUT_VALUE = "output.value" +OPENLIT_PROMPT = "gen_ai.prompt" +OPENLIT_COMPLETION = "gen_ai.completion" +OPENLIT_REVISED_PROMPT = "gen_ai.content.revised_prompt" + +# Roles +ROLE_SYSTEM = "system" +ROLE_USER = "user" +ROLE_ASSISTANT = "assistant" + +_logger = logging.getLogger(__name__) + + +class LLOHandler: + """ + Utility class for handling Large Language Objects (LLO) in OpenTelemetry spans. + + LLOHandler performs three primary functions: + 1. Identifies Large Language Objects (LLO) content in spans + 2. Extracts and transforms these attributes into OpenTelemetry Gen AI Events + 3. Filters LLO from spans to maintain privacy and reduce span size + + Supported frameworks and their attribute patterns: + - Standard Gen AI: + - gen_ai.prompt.{n}.content: Structured prompt content + - gen_ai.prompt.{n}.role: Role for prompt content (system, user, assistant, etc.) + - gen_ai.completion.{n}.content: Structured completion content + - gen_ai.completion.{n}.role: Role for completion content (usually assistant) + + - Traceloop: + - traceloop.entity.input: Input text for LLM operations + - traceloop.entity.output: Output text from LLM operations + - traceloop.entity.name: Name of the entity processing the LLO + + - OpenLit: + - gen_ai.prompt: Direct prompt text (treated as user message) + - gen_ai.completion: Direct completion text (treated as assistant message) + - gen_ai.content.revised_prompt: Revised prompt text (treated as system message) + + - OpenInference: + - input.value: Direct input prompt + - output.value: Direct output response + - llm.input_messages.{n}.message.content: Individual structured input messages + - llm.input_messages.{n}.message.role: Role for input messages + - llm.output_messages.{n}.message.content: Individual structured output messages + - llm.output_messages.{n}.message.role: Role for output messages + - llm.model_name: Model name used for the LLM operation + """ + + def __init__(self, logger_provider: LoggerProvider): + """ + Initialize an LLOHandler with the specified logger provider. + + This constructor sets up the event logger provider, configures the event logger, + and initializes the patterns used to identify LLO attributes. + + Args: + logger_provider: The OpenTelemetry LoggerProvider used for emitting events. 
+                Global LoggerProvider instance injected from our AwsOpenTelemetryConfigurator
+        """
+        self._logger_provider = logger_provider
+
+        self._event_logger_provider = EventLoggerProvider(logger_provider=self._logger_provider)
+        self._event_logger = self._event_logger_provider.get_event_logger("gen_ai.events")
+
+        # Patterns for attribute filtering
+        self._exact_match_patterns = [
+            TRACELOOP_ENTITY_INPUT,
+            TRACELOOP_ENTITY_OUTPUT,
+            OPENLIT_PROMPT,
+            OPENLIT_COMPLETION,
+            OPENLIT_REVISED_PROMPT,
+            OPENINFERENCE_INPUT_VALUE,
+            OPENINFERENCE_OUTPUT_VALUE,
+        ]
+
+        # Pre-compile regex patterns for better performance
+        self._regex_patterns = [
+            re.compile(r"^gen_ai\.prompt\.\d+\.content$"),
+            re.compile(r"^gen_ai\.completion\.\d+\.content$"),
+            re.compile(r"^llm\.input_messages\.\d+\.message\.content$"),
+            re.compile(r"^llm\.output_messages\.\d+\.message\.content$"),
+        ]
+
+        # Additional pre-compiled patterns used in extraction methods
+        self._prompt_content_pattern = re.compile(r"^gen_ai\.prompt\.(\d+)\.content$")
+        self._completion_content_pattern = re.compile(r"^gen_ai\.completion\.(\d+)\.content$")
+        self._openinference_input_msg_pattern = re.compile(r"^llm\.input_messages\.(\d+)\.message\.content$")
+        self._openinference_output_msg_pattern = re.compile(r"^llm\.output_messages\.(\d+)\.message\.content$")
+
+    def process_spans(self, spans: Sequence[ReadableSpan]) -> List[ReadableSpan]:
+        """
+        Processes a sequence of spans to extract and filter LLO attributes.
+
+        For each span, this method:
+        1. Extracts LLO attributes and emits them as Gen AI Events
+        2. Filters out LLO attributes from the span to maintain privacy
+        3. Processes any LLO attributes in span events
+        4. Preserves non-LLO attributes in the span
+
+        Handles LLO attributes from multiple frameworks:
+        - Standard Gen AI (structured prompt/completion pattern)
+        - Traceloop (entity input/output pattern)
+        - OpenLit (direct prompt/completion pattern)
+        - OpenInference (input/output value and structured messages pattern)
+
+        Args:
+            spans: A sequence of OpenTelemetry ReadableSpan objects to process
+
+        Returns:
+            List[ReadableSpan]: Modified spans with LLO attributes removed
+        """
+        modified_spans = []
+
+        for span in spans:
+            self._emit_llo_attributes(span, span.attributes)
+            updated_attributes = self._filter_attributes(span.attributes)
+
+            if isinstance(span.attributes, BoundedAttributes):
+                span._attributes = BoundedAttributes(
+                    maxlen=span.attributes.maxlen,
+                    attributes=updated_attributes,
+                    immutable=span.attributes._immutable,
+                    max_value_len=span.attributes.max_value_len,
+                )
+            else:
+                span._attributes = updated_attributes
+
+            self.process_span_events(span)
+
+            modified_spans.append(span)
+
+        return modified_spans
+
+    def process_span_events(self, span: ReadableSpan) -> None:
+        """
+        Process events within a span to extract and filter LLO attributes.
+
+        For each event in the span, this method:
+        1. Emits LLO attributes found in event attributes as Gen AI Events
+        2. Filters out LLO attributes from event attributes
+        3. Creates updated events with filtered attributes
+        4. Replaces the original span events with updated events
+
+        This ensures that LLO attributes are properly handled even when they appear
+        in span events rather than directly in the span's attributes.
+
+        Args:
+            span: The ReadableSpan to process events for
+
+        Returns:
+            None: The span is modified in-place
+        """
+        if not span.events:
+            return
+
+        updated_events = []
+
+        for event in span.events:
+            if not event.attributes:
+                updated_events.append(event)
+                continue
+
+            self._emit_llo_attributes(span, event.attributes, event_timestamp=event.timestamp)
+
+            updated_event_attributes = self._filter_attributes(event.attributes)
+
+            if len(updated_event_attributes) != len(event.attributes):
+                limit = None
+                if isinstance(event.attributes, BoundedAttributes):
+                    limit = event.attributes.maxlen
+
+                updated_event = SpanEvent(
+                    name=event.name, attributes=updated_event_attributes, timestamp=event.timestamp, limit=limit
+                )
+
+                updated_events.append(updated_event)
+            else:
+                updated_events.append(event)
+
+        span._events = updated_events
+
+    def _emit_llo_attributes(
+        self, span: ReadableSpan, attributes: Dict[str, Any], event_timestamp: Optional[int] = None
+    ) -> None:
+        """
+        Extract Gen AI Events from LLO attributes and emit them via the event logger.
+
+        This method:
+        1. Collects LLO attributes from multiple frameworks using specialized extractors
+        2. Converts each LLO attribute into appropriate Gen AI Events
+        3. Emits all collected events through the event logger
+
+        Supported frameworks:
+        - Standard Gen AI: Structured prompt/completion with roles
+        - Traceloop: Entity input/output
+        - OpenLit: Direct prompt/completion/revised prompt
+        - OpenInference: Direct values and structured messages
+
+        Args:
+            span: The source ReadableSpan containing the attributes
+            attributes: Dictionary of attributes to process
+            event_timestamp: Optional timestamp to override span timestamps
+
+        Returns:
+            None: Events are emitted via the event logger
+        """
+        all_events = []
+        all_events.extend(self._extract_gen_ai_prompt_events(span, attributes, event_timestamp))
+        all_events.extend(self._extract_gen_ai_completion_events(span, attributes, event_timestamp))
+        all_events.extend(self._extract_traceloop_events(span, attributes, event_timestamp))
+        all_events.extend(self._extract_openlit_span_event_attributes(span, attributes, event_timestamp))
+        all_events.extend(self._extract_openinference_attributes(span, attributes, event_timestamp))
+
+        for event in all_events:
+            self._event_logger.emit(event)
+            _logger.debug(f"Emitted Gen AI Event: {event.name}")
+
+    def _filter_attributes(self, attributes: Dict[str, Any]) -> Dict[str, Any]:
+        """
+        Create a new attributes dictionary with LLO attributes removed.
+
+        This method creates a new dictionary containing only non-LLO attributes,
+        preserving the original values while filtering out sensitive LLO content.
+        This helps maintain privacy and reduces the size of spans.
+
+        Args:
+            attributes: Original dictionary of span or event attributes
+
+        Returns:
+            Dict[str, Any]: New dictionary with LLO attributes removed
+        """
+        filtered_attributes = {}
+
+        for key, value in attributes.items():
+            if not self._is_llo_attribute(key):
+                filtered_attributes[key] = value
+
+        return filtered_attributes
+
+    def _is_llo_attribute(self, key: str) -> bool:
+        """
+        Determine if an attribute key contains LLO content based on pattern matching.
+
+        Checks attribute keys against two types of patterns:
+        1. Exact match patterns (complete string equality):
+           - Traceloop: "traceloop.entity.input", "traceloop.entity.output"
+           - OpenLit: "gen_ai.prompt", "gen_ai.completion", "gen_ai.content.revised_prompt"
+           - OpenInference: "input.value", "output.value"
+
+        2. Regex match patterns (regular expression matching):
+           - Standard Gen AI: "gen_ai.prompt.{n}.content", "gen_ai.completion.{n}.content"
+           - OpenInference: "llm.input_messages.{n}.message.content",
+             "llm.output_messages.{n}.message.content"
+
+        Args:
+            key: The attribute key to check
+
+        Returns:
+            bool: True if the key matches any LLO pattern, False otherwise
+        """
+        # Check exact matches first (faster)
+        if key in self._exact_match_patterns:
+            return True
+
+        # Then check regex patterns
+        return any(pattern.match(key) for pattern in self._regex_patterns)
+
+    def _extract_gen_ai_prompt_events(
+        self, span: ReadableSpan, attributes: Dict[str, Any], event_timestamp: Optional[int] = None
+    ) -> List[Event]:
+        """
+        Extract Gen AI Events from structured prompt attributes.
+
+        Processes attributes matching the pattern `gen_ai.prompt.{n}.content` and their
+        associated `gen_ai.prompt.{n}.role` attributes to create appropriate events.
+
+        Event types are determined by the role:
+        1. `system` → `gen_ai.system.message` Event
+        2. `user` → `gen_ai.user.message` Event
+        3. `assistant` → `gen_ai.assistant.message` Event
+        4. `function` → `gen_ai.{gen_ai.system}.message` custom Event
+        5. `unknown` → `gen_ai.{gen_ai.system}.message` custom Event
+
+        Args:
+            span: The source ReadableSpan containing the attributes
+            attributes: Dictionary of attributes to process
+            event_timestamp: Optional timestamp to override span.start_time
+
+        Returns:
+            List[Event]: Events created from prompt attributes
+        """
+        events = []
+        span_ctx = span.context
+        gen_ai_system = span.attributes.get("gen_ai.system", "unknown")
+
+        # Use helper method to get appropriate timestamp (prompts are inputs)
+        prompt_timestamp = self._get_timestamp(span, event_timestamp, is_input=True)
+
+        for key, value in attributes.items():
+            match = self._prompt_content_pattern.match(key)
+            if not match:
+                continue
+
+            index = match.group(1)
+            role_key = f"gen_ai.prompt.{index}.role"
+            role = attributes.get(role_key, "unknown")
+
+            event_attributes = {"gen_ai.system": gen_ai_system, "original_attribute": key}
+            body = {"content": value, "role": role}
+
+            # Use helper method to determine event name based on role
+            event_name = self._get_event_name_for_role(role, gen_ai_system)
+
+            event = self._get_gen_ai_event(
+                name=event_name,
+                span_ctx=span_ctx,
+                timestamp=prompt_timestamp,
+                attributes=event_attributes,
+                body=body,
+            )
+
+            events.append(event)
+
+        return events
+
+    def _extract_gen_ai_completion_events(
+        self, span: ReadableSpan, attributes: Dict[str, Any], event_timestamp: Optional[int] = None
+    ) -> List[Event]:
+        """
+        Extract Gen AI Events from structured completion attributes.
+
+        Processes attributes matching the pattern `gen_ai.completion.{n}.content` and their
+        associated `gen_ai.completion.{n}.role` attributes to create appropriate events.
+
+        Event types are determined by the role:
+        1. `assistant` → `gen_ai.assistant.message` Event (most common)
+        2. Other roles → `gen_ai.{gen_ai.system}.message` custom Event
+
+        Args:
+            span: The source ReadableSpan containing the attributes
+            attributes: Dictionary of attributes to process
+            event_timestamp: Optional timestamp to override span.end_time
+
+        Returns:
+            List[Event]: Events created from completion attributes
+        """
+        events = []
+        span_ctx = span.context
+        gen_ai_system = span.attributes.get("gen_ai.system", "unknown")
+
+        # Use helper method to get appropriate timestamp (completions are outputs)
+        completion_timestamp = self._get_timestamp(span, event_timestamp, is_input=False)
+
+        for key, value in attributes.items():
+            match = self._completion_content_pattern.match(key)
+            if not match:
+                continue
+
+            index = match.group(1)
+            role_key = f"gen_ai.completion.{index}.role"
+            role = attributes.get(role_key, "unknown")
+
+            event_attributes = {"gen_ai.system": gen_ai_system, "original_attribute": key}
+            body = {"content": value, "role": role}
+
+            # Use helper method to determine event name based on role
+            event_name = self._get_event_name_for_role(role, gen_ai_system)
+
+            event = self._get_gen_ai_event(
+                name=event_name,
+                span_ctx=span_ctx,
+                timestamp=completion_timestamp,
+                attributes=event_attributes,
+                body=body,
+            )
+
+            events.append(event)
+
+        return events
+
+    def _extract_traceloop_events(
+        self, span: ReadableSpan, attributes: Dict[str, Any], event_timestamp: Optional[int] = None
+    ) -> List[Event]:
+        """
+        Extract Gen AI Events from Traceloop attributes.
+
+        Processes Traceloop-specific attributes:
+        - `traceloop.entity.input`: Input data (uses span.start_time)
+        - `traceloop.entity.output`: Output data (uses span.end_time)
+        - `traceloop.entity.name`: Used as the gen_ai.system value
+
+        Creates generic `gen_ai.{entity_name}.message` events for both input and output.
+
+        Args:
+            span: The source ReadableSpan containing the attributes
+            attributes: Dictionary of attributes to process
+            event_timestamp: Optional timestamp to override span timestamps
+
+        Returns:
+            List[Event]: Events created from Traceloop attributes
+        """
+        events = []
+        span_ctx = span.context
+        gen_ai_system = span.attributes.get("traceloop.entity.name", "unknown")
+
+        # Use helper methods to get appropriate timestamps
+        input_timestamp = self._get_timestamp(span, event_timestamp, is_input=True)
+        output_timestamp = self._get_timestamp(span, event_timestamp, is_input=False)
+
+        traceloop_attrs = [
+            (TRACELOOP_ENTITY_INPUT, input_timestamp, ROLE_USER),  # Treat input as user role
+            (TRACELOOP_ENTITY_OUTPUT, output_timestamp, ROLE_ASSISTANT),  # Treat output as assistant role
+        ]
+
+        for attr_key, timestamp, role in traceloop_attrs:
+            if attr_key in attributes:
+                event_attributes = {"gen_ai.system": gen_ai_system, "original_attribute": attr_key}
+                body = {"content": attributes[attr_key], "role": role}
+
+                # Custom event name for Traceloop (always use system-specific format)
+                event_name = f"gen_ai.{gen_ai_system}.message"
+
+                event = self._get_gen_ai_event(
+                    name=event_name,
+                    span_ctx=span_ctx,
+                    timestamp=timestamp,
+                    attributes=event_attributes,
+                    body=body,
+                )
+                events.append(event)
+
+        return events
+
+    def _extract_openlit_span_event_attributes(
+        self, span: ReadableSpan, attributes: Dict[str, Any], event_timestamp: Optional[int] = None
+    ) -> List[Event]:
+        """
+        Extract Gen AI Events from OpenLit direct attributes.
+
+        OpenLit uses direct key-value pairs for LLO attributes:
+        - `gen_ai.prompt`: Direct prompt text (treated as user message)
+        - `gen_ai.completion`: Direct completion text (treated as assistant message)
+        - `gen_ai.content.revised_prompt`: Revised prompt text (treated as system message)
+
+        The event timestamps are set based on attribute type:
+        - Prompt and revised prompt: span.start_time
+        - Completion: span.end_time
+
+        Args:
+            span: The source ReadableSpan containing the attributes
+            attributes: Dictionary of attributes to process
+            event_timestamp: Optional timestamp to override span timestamps
+
+        Returns:
+            List[Event]: Events created from OpenLit attributes
+        """
+        events = []
+        span_ctx = span.context
+        gen_ai_system = span.attributes.get("gen_ai.system", "unknown")
+
+        # Use helper methods to get appropriate timestamps
+        prompt_timestamp = self._get_timestamp(span, event_timestamp, is_input=True)
+        completion_timestamp = self._get_timestamp(span, event_timestamp, is_input=False)
+
+        openlit_event_attrs = [
+            (OPENLIT_PROMPT, prompt_timestamp, ROLE_USER),  # Assume user role for direct prompts
+            (OPENLIT_COMPLETION, completion_timestamp, ROLE_ASSISTANT),  # Assume assistant role for completions
+            (OPENLIT_REVISED_PROMPT, prompt_timestamp, ROLE_SYSTEM),  # Assume system role for revised prompts
+        ]
+
+        for attr_key, timestamp, role in openlit_event_attrs:
+            if attr_key in attributes:
+                event_attributes = {"gen_ai.system": gen_ai_system, "original_attribute": attr_key}
+                body = {"content": attributes[attr_key], "role": role}
+
+                # Use helper method to determine event name based on role
+                event_name = self._get_event_name_for_role(role, gen_ai_system)
+
+                event = self._get_gen_ai_event(
+                    name=event_name,
+                    span_ctx=span_ctx,
+                    timestamp=timestamp,
+                    attributes=event_attributes,
+                    body=body,
+                )
+
+                events.append(event)
+
+        return events
+
+    def _extract_openinference_attributes(
+        self, span: ReadableSpan, attributes: Dict[str, Any], event_timestamp: Optional[int] = None
+    ) -> List[Event]:
+        """
+        Extract Gen AI Events from OpenInference attributes.
+
+        OpenInference uses two patterns for LLO attributes:
+        1. Direct values:
+           - `input.value`: Direct input prompt (treated as user message)
+           - `output.value`: Direct output response (treated as assistant message)
+
+        2. Structured messages:
+           - `llm.input_messages.{n}.message.content`: Individual input messages
+           - `llm.input_messages.{n}.message.role`: Role for input message
+           - `llm.output_messages.{n}.message.content`: Individual output messages
+           - `llm.output_messages.{n}.message.role`: Role for output message
+
+        The LLM model name is extracted from the `llm.model_name` attribute
+        instead of `gen_ai.system` which other frameworks use.
+
+        Event timestamps are set based on message type:
+        - Input messages: span.start_time
+        - Output messages: span.end_time
+
+        Args:
+            span: The source ReadableSpan containing the attributes
+            attributes: Dictionary of attributes to process
+            event_timestamp: Optional timestamp to override span timestamps
+
+        Returns:
+            List[Event]: Events created from OpenInference attributes
+        """
+        events = []
+        span_ctx = span.context
+        gen_ai_system = span.attributes.get("llm.model_name", "unknown")
+
+        # Use helper methods to get appropriate timestamps
+        input_timestamp = self._get_timestamp(span, event_timestamp, is_input=True)
+        output_timestamp = self._get_timestamp(span, event_timestamp, is_input=False)
+
+        # Process direct value attributes
+        openinference_direct_attrs = [
+            (OPENINFERENCE_INPUT_VALUE, input_timestamp, ROLE_USER),
+            (OPENINFERENCE_OUTPUT_VALUE, output_timestamp, ROLE_ASSISTANT),
+        ]
+
+        for attr_key, timestamp, role in openinference_direct_attrs:
+            if attr_key in attributes:
+                event_attributes = {"gen_ai.system": gen_ai_system, "original_attribute": attr_key}
+                body = {"content": attributes[attr_key], "role": role}
+
+                # Use helper method to determine event name based on role
+                event_name = self._get_event_name_for_role(role, gen_ai_system)
+
+                event = self._get_gen_ai_event(
+                    name=event_name, span_ctx=span_ctx, timestamp=timestamp, attributes=event_attributes, body=body
+                )
+
+                events.append(event)
+
+        # Process input messages
+        for key, value in attributes.items():
+            match = self._openinference_input_msg_pattern.match(key)
+            if not match:
+                continue
+
+            index = match.group(1)
+            role_key = f"llm.input_messages.{index}.message.role"
+            role = attributes.get(role_key, ROLE_USER)  # Default to user if role not specified
+
+            event_attributes = {"gen_ai.system": gen_ai_system, "original_attribute": key}
+            body = {"content": value, "role": role}
+
+            # Use helper method to determine event name based on role
+            event_name = self._get_event_name_for_role(role, gen_ai_system)
+
+            event = self._get_gen_ai_event(
+                name=event_name, span_ctx=span_ctx, timestamp=input_timestamp, attributes=event_attributes, body=body
+            )
+
+            events.append(event)
+
+        # Process output messages
+        for key, value in attributes.items():
+            match = self._openinference_output_msg_pattern.match(key)
+            if not match:
+                continue
+
+            index = match.group(1)
+            role_key = f"llm.output_messages.{index}.message.role"
+            role = attributes.get(role_key, ROLE_ASSISTANT)  # Default to assistant if role not specified
+
+            event_attributes = {"gen_ai.system": gen_ai_system, "original_attribute": key}
+            body = {"content": value, "role": role}
+
+            # Use helper method to determine event name based on role
+            event_name = self._get_event_name_for_role(role, gen_ai_system)
+
+            event = self._get_gen_ai_event(
+                name=event_name, span_ctx=span_ctx, timestamp=output_timestamp, attributes=event_attributes, body=body
+            )
+
+            events.append(event)
+
+        return events
+
+    def _get_event_name_for_role(self, role: str, gen_ai_system: str) -> str:
+        """
+        Map a message role to the appropriate event name.
+
+        Args:
+            role: The role of the message (system, user, assistant, etc.)
+            gen_ai_system: The gen_ai system identifier
+
+        Returns:
+            str: The appropriate event name for the given role
+        """
+        if role == ROLE_SYSTEM:
+            return GEN_AI_SYSTEM_MESSAGE
+        elif role == ROLE_USER:
+            return GEN_AI_USER_MESSAGE
+        elif role == ROLE_ASSISTANT:
+            return GEN_AI_ASSISTANT_MESSAGE
+        else:
+            return f"gen_ai.{gen_ai_system}.message"
+
+    def _get_timestamp(self, span: ReadableSpan, event_timestamp: Optional[int], is_input: bool) -> int:
+        """
+        Determine the appropriate timestamp to use for an event.
+
+        Args:
+            span: The source span
+            event_timestamp: Optional override timestamp
+            is_input: Whether this is an input (True) or output (False) message
+
+        Returns:
+            int: The timestamp to use for the event
+        """
+        if event_timestamp is not None:
+            return event_timestamp
+
+        return span.start_time if is_input else span.end_time
+
+    def _get_gen_ai_event(self, name, span_ctx, timestamp, attributes, body):
+        """
+        Create and return a Gen AI Event with the specified parameters.
+
+        This helper method constructs a fully configured OpenTelemetry Event object
+        that includes all necessary fields for proper event propagation and context.
+
+        Args:
+            name: Event type name (e.g., gen_ai.system.message, gen_ai.user.message)
+            span_ctx: Span context to extract trace/span IDs from
+            timestamp: Timestamp for the event (nanoseconds)
+            attributes: Additional attributes to include with the event
+            body: Event body containing content and role information
+
+        Returns:
+            Event: A fully configured OpenTelemetry Gen AI Event object with
+                   proper trace context propagation
+        """
+        return Event(
+            name=name,
+            timestamp=timestamp,
+            attributes=attributes,
+            body=body,
+            trace_id=span_ctx.trace_id,
+            span_id=span_ctx.span_id,
+            trace_flags=span_ctx.trace_flags,
+        )

diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_bedrock_patches.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_bedrock_patches.py
index a25e55330..70dfe36c4 100644
--- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_bedrock_patches.py
+++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_bedrock_patches.py
@@ -192,7 +192,7 @@ def extract_attributes(self, attributes: _AttributeMapT):
             if request_param_value:
                 attributes[attribute_key] = request_param_value
 
-    def on_success(self, span: Span, result: _BotoResultT):
+    def on_success(self, span: Span, result: _BotoResultT, instrumentor_context=None):
         if self._operation_class is None:
             return
 
@@ -220,6 +220,10 @@ def extract_attributes(self, attributes: _AttributeMapT):
         knowledge_base_id = self._call_context.params.get(_KNOWLEDGE_BASE_ID)
         if knowledge_base_id:
             attributes[AWS_BEDROCK_KNOWLEDGE_BASE_ID] = knowledge_base_id
+
+    def on_success(self, span: Span, result: _BotoResultT, instrumentor_context=None):
+        # Currently no attributes to extract from the result
+        pass
 
 
 class _BedrockExtension(_AwsSdkExtension):
@@ -229,7 +233,7 @@ class _BedrockExtension(_AwsSdkExtension):
     """
 
     # pylint: disable=no-self-use
-    def on_success(self, span: Span, result: _BotoResultT):
+    def on_success(self, span: Span, result: _BotoResultT, instrumentor_context=None):
         # _GUARDRAIL_ID can only be retrieved from the response, not from the request
         guardrail_id = result.get(_GUARDRAIL_ID)
         if guardrail_id:
@@ -333,7 +337,7 @@ def _set_if_not_none(attributes, key, value):
             attributes[key] = value
 
     # pylint: disable=too-many-branches
-    def on_success(self, span: Span, result: Dict[str, Any]):
+    def on_success(self, span: Span, result: Dict[str, Any], instrumentor_context=None):
         model_id = self._call_context.params.get(_MODEL_ID)
 
         if not model_id:

diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py
index 0f4a77d1e..d45404a0c 100644
--- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py
+++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py
@@ -75,8 +75,8 @@ def patch_extract_attributes(self, attributes: _AttributeMapT):
 
     old_on_success = _LambdaExtension.on_success
 
-    def patch_on_success(self, span: Span, result: _BotoResultT):
-        old_on_success(self, span, result)
+    def patch_on_success(self, span: Span, result: _BotoResultT, instrumentor_context=None):
+        old_on_success(self, span, result, instrumentor_context)
         lambda_configuration = result.get("Configuration", {})
         function_arn = lambda_configuration.get("FunctionArn")
         if function_arn:
@@ -180,8 +180,8 @@ def patch_extract_attributes(self, attributes: _AttributeMapT):
 
     old_on_success = _SqsExtension.on_success
 
-    def patch_on_success(self, span: Span, result: _BotoResultT):
-        old_on_success(self, span, result)
+    def patch_on_success(self, span: Span, result: _BotoResultT, instrumentor_context=None):
+        old_on_success(self, span, result, instrumentor_context)
         queue_url = result.get("QueueUrl")
         if queue_url:
             span.set_attribute(AWS_SQS_QUEUE_URL, queue_url)
@@ -243,7 +243,7 @@ def extract_attributes(self, attributes: _AttributeMapT):
             attributes[AWS_SECRETSMANAGER_SECRET_ARN] = secret_id
 
     # pylint: disable=no-self-use
-    def on_success(self, span: Span, result: _BotoResultT):
+    def on_success(self, span: Span, result: _BotoResultT, instrumentor_context=None):
         secret_arn = result.get("ARN")
         if secret_arn:
             span.set_attribute(AWS_SECRETSMANAGER_SECRET_ARN, secret_arn)

diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py
index 87e6c4810..7b62cfb49 100644
--- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py
+++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py
@@ -147,7 +147,7 @@ def _test_unpatched_botocore_instrumentation(self):
         )
 
         # BedrockRuntime
-        self.assertFalse("bedrock-runtime" in _KNOWN_EXTENSIONS, "Upstream has added a bedrock-runtime extension")
+        self.assertTrue("bedrock-runtime" in _KNOWN_EXTENSIONS, "Upstream has removed the bedrock-runtime extension")
 
         # SecretsManager
         self.assertFalse("secretsmanager" in _KNOWN_EXTENSIONS, "Upstream has added a SecretsManager extension")
@@ -678,6 +678,7 @@ def _do_on_success(
     ) -> Dict[str, str]:
         span_mock: Span = MagicMock()
         mock_call_context = MagicMock()
+        mock_instrumentor_context = MagicMock()
         span_attributes: Dict[str, str] = {}
 
         def set_side_effect(set_key, set_value):
@@ -692,6 +693,6 @@ def set_side_effect(set_key, set_value):
         mock_call_context.params = params
 
         extension = _KNOWN_EXTENSIONS[service_name]()(mock_call_context)
-        extension.on_success(span_mock, result)
+        extension.on_success(span_mock, result, mock_instrumentor_context)
 
         return span_attributes

diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_llo_handler.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_llo_handler.py
new file mode 100644
index 000000000..bb4ac51be
--- /dev/null
+++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_llo_handler.py
@@ -0,0 +1,651 @@
+from unittest import TestCase
+from unittest.mock import MagicMock, patch, call
+
+from amazon.opentelemetry.distro.llo_handler import LLOHandler
+from opentelemetry._events import Event
+from opentelemetry.sdk._logs import LoggerProvider
+from opentelemetry.sdk.trace import ReadableSpan, SpanContext
+from opentelemetry.trace import SpanKind, TraceFlags, TraceState
+
+
+class TestLLOHandler(TestCase):
+    def setUp(self):
+        self.logger_provider_mock = MagicMock(spec=LoggerProvider)
+        self.event_logger_mock = MagicMock()
+        self.event_logger_provider_mock = MagicMock()
+        self.event_logger_provider_mock.get_event_logger.return_value = self.event_logger_mock
+
+        with patch(
+            "amazon.opentelemetry.distro.llo_handler.EventLoggerProvider", return_value=self.event_logger_provider_mock
+        ):
+            self.llo_handler = LLOHandler(self.logger_provider_mock)
+
+    def _create_mock_span(self, attributes=None, kind=SpanKind.INTERNAL):
+        """
+        Helper method to create a mock span with given attributes
+        """
+        if attributes is None:
+            attributes = {}
+
+        span_context = SpanContext(
+            trace_id=0x123456789ABCDEF0123456789ABCDEF0,
+            span_id=0x123456789ABCDEF0,
+            is_remote=False,
+            trace_flags=TraceFlags.SAMPLED,
+            trace_state=TraceState.get_default(),
+        )
+
+        mock_span = MagicMock(spec=ReadableSpan)
+        mock_span.context = span_context
+        mock_span.attributes = attributes
+        mock_span.kind = kind
+        mock_span.start_time = 1234567890
+
+        return mock_span
+
+    def test_init(self):
+        """
+        Test initialization of LLOHandler
+        """
+        self.assertEqual(self.llo_handler._logger_provider, self.logger_provider_mock)
+        self.assertEqual(self.llo_handler._event_logger_provider, self.event_logger_provider_mock)
+        self.event_logger_provider_mock.get_event_logger.assert_called_once_with("gen_ai.events")
+
+    def test_is_llo_attribute_match(self):
+        """
+        Test _is_llo_attribute method with matching patterns
+        """
+        self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.prompt.0.content"))
+        self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.prompt.123.content"))
+
+    def test_is_llo_attribute_no_match(self):
+        """
+        Test _is_llo_attribute method with non-matching patterns
+        """
+        self.assertFalse(self.llo_handler._is_llo_attribute("gen_ai.prompt.content"))
+        self.assertFalse(self.llo_handler._is_llo_attribute("gen_ai.prompt.abc.content"))
+        self.assertFalse(self.llo_handler._is_llo_attribute("some.other.attribute"))
+
+    def test_is_llo_attribute_traceloop_match(self):
+        """
+        Test _is_llo_attribute method with Traceloop patterns
+        """
+        # Test exact matches for Traceloop attributes
+        self.assertTrue(self.llo_handler._is_llo_attribute("traceloop.entity.input"))
+        self.assertTrue(self.llo_handler._is_llo_attribute("traceloop.entity.output"))
+
+    def test_is_llo_attribute_openlit_match(self):
+        """
+        Test _is_llo_attribute method with OpenLit patterns
+        """
+        # Test exact matches for direct OpenLit attributes
+        self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.prompt"))
+        self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.completion"))
+        self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.content.revised_prompt"))
+
+    def test_is_llo_attribute_openinference_match(self):
+        """
+        Test _is_llo_attribute method with OpenInference patterns
+        """
+        # Test exact matches
+        self.assertTrue(self.llo_handler._is_llo_attribute("input.value"))
+        self.assertTrue(self.llo_handler._is_llo_attribute("output.value"))
+
+        # Test regex matches
+        self.assertTrue(self.llo_handler._is_llo_attribute("llm.input_messages.0.message.content"))
+        self.assertTrue(self.llo_handler._is_llo_attribute("llm.output_messages.123.message.content"))
+
+    def test_filter_attributes(self):
+        """
+        Test _filter_attributes method
+        """
+        attributes = {
+            "gen_ai.prompt.0.content": "test content",
+            "gen_ai.prompt.0.role": "user",
+            "normal.attribute": "value",
+            "another.normal.attribute": 123,
+        }
+
+        filtered = self.llo_handler._filter_attributes(attributes)
+
+        self.assertNotIn("gen_ai.prompt.0.content", filtered)
+        self.assertIn("gen_ai.prompt.0.role", filtered)
+        self.assertIn("normal.attribute", filtered)
+        self.assertIn("another.normal.attribute", filtered)
+
+    def test_extract_gen_ai_prompt_events_system_role(self):
+        """
+        Test _extract_gen_ai_prompt_events with system role
+        """
+        attributes = {
+            "gen_ai.prompt.0.content": "system instruction",
+            "gen_ai.prompt.0.role": "system",
+            "gen_ai.system": "openai",
+        }
+
+        span = self._create_mock_span(attributes)
+
+        events = self.llo_handler._extract_gen_ai_prompt_events(span, attributes)
+
+        self.assertEqual(len(events), 1)
+        event = events[0]
+        self.assertEqual(event.name, "gen_ai.system.message")
+        self.assertEqual(event.body["content"], "system instruction")
+        self.assertEqual(event.body["role"], "system")
+        self.assertEqual(event.attributes["gen_ai.system"], "openai")
+        self.assertEqual(event.attributes["original_attribute"], "gen_ai.prompt.0.content")
+
+    def test_extract_gen_ai_prompt_events_user_role(self):
+        """
+        Test _extract_gen_ai_prompt_events with user role
+        """
+        attributes = {
+            "gen_ai.prompt.0.content": "user question",
+            "gen_ai.prompt.0.role": "user",
+            "gen_ai.system": "anthropic",
+        }
+
+        span = self._create_mock_span(attributes)
+
+        events = self.llo_handler._extract_gen_ai_prompt_events(span, attributes)
+
+        self.assertEqual(len(events), 1)
+        event = events[0]
+        self.assertEqual(event.name, "gen_ai.user.message")
+        self.assertEqual(event.body["content"], "user question")
+        self.assertEqual(event.body["role"], "user")
+        self.assertEqual(event.attributes["gen_ai.system"], "anthropic")
+        self.assertEqual(event.attributes["original_attribute"], "gen_ai.prompt.0.content")
+
+    def test_extract_gen_ai_prompt_events_assistant_role(self):
+        """
+        Test _extract_gen_ai_prompt_events with assistant role
+        """
+        attributes = {
+            "gen_ai.prompt.1.content": "assistant response",
+            "gen_ai.prompt.1.role": "assistant",
+            "gen_ai.system": "anthropic",
+        }
+
+        span = self._create_mock_span(attributes)
+
+        events = self.llo_handler._extract_gen_ai_prompt_events(span, attributes)
+
+        self.assertEqual(len(events), 1)
+        event = events[0]
+        self.assertEqual(event.name, "gen_ai.assistant.message")
+        self.assertEqual(event.body["content"], "assistant response")
+        self.assertEqual(event.body["role"], "assistant")
+        self.assertEqual(event.attributes["gen_ai.system"], "anthropic")
+        self.assertEqual(event.attributes["original_attribute"], "gen_ai.prompt.1.content")
+
+    def test_extract_gen_ai_prompt_events_function_role(self):
+        """
+        Test _extract_gen_ai_prompt_events with function role
+        """
+        attributes = {
+            "gen_ai.prompt.2.content": "function data",
+            "gen_ai.prompt.2.role": "function",
+            "gen_ai.system": "openai",
+        }
+
+        span = self._create_mock_span(attributes)
+        events = self.llo_handler._extract_gen_ai_prompt_events(span, attributes)
+
+        self.assertEqual(len(events), 1)
+        event = events[0]
+        self.assertEqual(event.name, "gen_ai.openai.message")
+        self.assertEqual(event.body["content"], "function data")
+        self.assertEqual(event.body["role"], "function")
+        self.assertEqual(event.attributes["gen_ai.system"], "openai")
+        self.assertEqual(event.attributes["original_attribute"], "gen_ai.prompt.2.content")
+
+    def test_extract_gen_ai_prompt_events_unknown_role(self):
+        """
+        Test _extract_gen_ai_prompt_events with unknown role
+        """
+        attributes = {
+            "gen_ai.prompt.3.content": "unknown type content",
+            "gen_ai.prompt.3.role": "unknown",
+            "gen_ai.system": "bedrock",
+        }
+
+        span = self._create_mock_span(attributes)
+        events = self.llo_handler._extract_gen_ai_prompt_events(span, attributes)
+
+        self.assertEqual(len(events), 1)
+        event = events[0]
+        self.assertEqual(event.name, "gen_ai.bedrock.message")
+        self.assertEqual(event.body["content"], "unknown type content")
+        self.assertEqual(event.body["role"], "unknown")
+        self.assertEqual(event.attributes["gen_ai.system"], "bedrock")
+
+    def test_extract_gen_ai_completion_events_assistant_role(self):
+        """
+        Test _extract_gen_ai_completion_events with assistant role
+        """
+        attributes = {
+            "gen_ai.completion.0.content": "assistant completion",
+            "gen_ai.completion.0.role": "assistant",
+            "gen_ai.system": "openai",
+        }
+
+        span = self._create_mock_span(attributes)
+        span.end_time = 1234567899  # end time for completion events
+
+        events = self.llo_handler._extract_gen_ai_completion_events(span, attributes)
+
+        self.assertEqual(len(events), 1)
+        event = events[0]
+        self.assertEqual(event.name, "gen_ai.assistant.message")
+        self.assertEqual(event.body["content"], "assistant completion")
+        self.assertEqual(event.body["role"], "assistant")
+        self.assertEqual(event.attributes["gen_ai.system"], "openai")
+        self.assertEqual(event.timestamp, 1234567899)
+
+    def test_extract_gen_ai_completion_events_other_role(self):
+        """
+        Test _extract_gen_ai_completion_events with non-assistant role
+        """
+        attributes = {
+            "gen_ai.completion.1.content": "other completion",
+            "gen_ai.completion.1.role": "other",
+            "gen_ai.system": "anthropic",
+        }
+
+        span = self._create_mock_span(attributes)
+        span.end_time = 1234567899
+
+        events = self.llo_handler._extract_gen_ai_completion_events(span, attributes)
+
+        self.assertEqual(len(events), 1)
+        event = events[0]
+        self.assertEqual(event.name, "gen_ai.anthropic.message")
+        self.assertEqual(event.body["content"], "other completion")
+        self.assertEqual(event.attributes["gen_ai.system"], "anthropic")
+
+    def test_extract_traceloop_events(self):
+        """
+        Test _extract_traceloop_events
+        """
+        attributes = {
+            "traceloop.entity.input": "input data",
+            "traceloop.entity.output": "output data",
+            "traceloop.entity.name": "my_entity",
+        }
+
+        span = self._create_mock_span(attributes)
+        span.end_time = 1234567899
+
+        events = self.llo_handler._extract_traceloop_events(span, attributes)
+
+        self.assertEqual(len(events), 2)
+
+        input_event = events[0]
+        self.assertEqual(input_event.name, "gen_ai.my_entity.message")
+        self.assertEqual(input_event.body["content"], "input data")
+        self.assertEqual(input_event.attributes["gen_ai.system"], "my_entity")
+        self.assertEqual(input_event.attributes["original_attribute"], "traceloop.entity.input")
+        self.assertEqual(input_event.timestamp, 1234567890)  # start_time
+
+        output_event = events[1]
+        self.assertEqual(output_event.name, "gen_ai.my_entity.message")
+        self.assertEqual(output_event.body["content"], "output data")
+        self.assertEqual(output_event.attributes["gen_ai.system"], "my_entity")
+        self.assertEqual(output_event.attributes["original_attribute"], "traceloop.entity.output")
+        self.assertEqual(output_event.timestamp, 1234567899)  # end_time
+
+    def test_extract_openlit_direct_prompt(self):
+        """
+        Test _extract_openlit_span_event_attributes with direct prompt attribute
+        """
+        attributes = {
+            "gen_ai.prompt": "user direct prompt",
+            "gen_ai.system": "openlit"
+        }
+
+        span = self._create_mock_span(attributes)
+
+        events = self.llo_handler._extract_openlit_span_event_attributes(span, attributes)
+
+        self.assertEqual(len(events), 1)
+        event = events[0]
+        self.assertEqual(event.name, "gen_ai.user.message")
+        self.assertEqual(event.body["content"], "user direct prompt")
+        self.assertEqual(event.body["role"], "user")
+        self.assertEqual(event.attributes["gen_ai.system"], "openlit")
+        self.assertEqual(event.attributes["original_attribute"], "gen_ai.prompt")
+        self.assertEqual(event.timestamp, 1234567890)  # start_time
+
+    def test_extract_openlit_direct_completion(self):
+        """
+        Test _extract_openlit_span_event_attributes with direct completion attribute
+        """
+        attributes = {
+            "gen_ai.completion": "assistant direct completion",
+            "gen_ai.system": "openlit"
+        }
+
+        span = self._create_mock_span(attributes)
+        span.end_time = 1234567899
+
+        events = self.llo_handler._extract_openlit_span_event_attributes(span, attributes)
+
+        self.assertEqual(len(events), 1)
+        event = events[0]
+        self.assertEqual(event.name, "gen_ai.assistant.message")
+        self.assertEqual(event.body["content"], "assistant direct completion")
+        self.assertEqual(event.body["role"], "assistant")
+        self.assertEqual(event.attributes["gen_ai.system"], "openlit")
+        self.assertEqual(event.attributes["original_attribute"], "gen_ai.completion")
+        self.assertEqual(event.timestamp, 1234567899)  # end_time
+
+    def test_extract_openlit_all_attributes(self):
+        """
+        Test _extract_openlit_span_event_attributes with all OpenLit attributes
+        """
+        attributes = {
+            "gen_ai.prompt": "user prompt",
+            "gen_ai.completion": "assistant response",
+            "gen_ai.content.revised_prompt": "revised prompt",
+            "gen_ai.system": "langchain"
+        }
+
+        span = self._create_mock_span(attributes)
+        span.end_time = 1234567899
+
+        events = self.llo_handler._extract_openlit_span_event_attributes(span, attributes)
+
+        self.assertEqual(len(events), 3)
+
+        # Check that all events have the correct system
+        for event in events:
+            self.assertEqual(event.attributes["gen_ai.system"], "langchain")
+
+        # Check we have the expected event types
+        event_types = {event.name for event in events}
+        self.assertIn("gen_ai.user.message", event_types)
+        self.assertIn("gen_ai.assistant.message", event_types)
+        self.assertIn("gen_ai.system.message", event_types)
+
+        # Check original attributes
+        original_attrs = {event.attributes["original_attribute"] for event in events}
+        self.assertIn("gen_ai.prompt", original_attrs)
+        self.assertIn("gen_ai.completion", original_attrs)
+        self.assertIn("gen_ai.content.revised_prompt", original_attrs)
+
+    def test_extract_openlit_revised_prompt(self):
+        """
+        Test _extract_openlit_span_event_attributes with revised prompt attribute
+        """
+        attributes = {
+            "gen_ai.content.revised_prompt": "revised system prompt",
+            "gen_ai.system": "openlit"
+        }
+
+        span = self._create_mock_span(attributes)
+
+        events = self.llo_handler._extract_openlit_span_event_attributes(span, attributes)
+
+        self.assertEqual(len(events), 1)
+        event = events[0]
+        self.assertEqual(event.name, "gen_ai.system.message")
prompt") + self.assertEqual(event.body["role"], "system") + self.assertEqual(event.attributes["gen_ai.system"], "openlit") + self.assertEqual(event.attributes["original_attribute"], "gen_ai.content.revised_prompt") + self.assertEqual(event.timestamp, 1234567890) # start_time + + def test_extract_openinference_direct_attributes(self): + """ + Test _extract_openinference_attributes with direct input/output values + """ + attributes = { + "input.value": "user prompt", + "output.value": "assistant response", + "llm.model_name": "gpt-4", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + + events = self.llo_handler._extract_openinference_attributes(span, attributes) + + self.assertEqual(len(events), 2) + + input_event = events[0] + self.assertEqual(input_event.name, "gen_ai.user.message") + self.assertEqual(input_event.body["content"], "user prompt") + self.assertEqual(input_event.body["role"], "user") + self.assertEqual(input_event.attributes["gen_ai.system"], "gpt-4") + self.assertEqual(input_event.attributes["original_attribute"], "input.value") + self.assertEqual(input_event.timestamp, 1234567890) # start_time + + output_event = events[1] + self.assertEqual(output_event.name, "gen_ai.assistant.message") + self.assertEqual(output_event.body["content"], "assistant response") + self.assertEqual(output_event.body["role"], "assistant") + self.assertEqual(output_event.attributes["gen_ai.system"], "gpt-4") + self.assertEqual(output_event.attributes["original_attribute"], "output.value") + self.assertEqual(output_event.timestamp, 1234567899) # end_time + + def test_extract_openinference_structured_input_messages(self): + """ + Test _extract_openinference_attributes with structured input messages + """ + attributes = { + "llm.input_messages.0.message.content": "system prompt", + "llm.input_messages.0.message.role": "system", + "llm.input_messages.1.message.content": "user message", + "llm.input_messages.1.message.role": "user", + "llm.model_name": "claude-3", + } + + span = self._create_mock_span(attributes) + + events = self.llo_handler._extract_openinference_attributes(span, attributes) + + self.assertEqual(len(events), 2) + + system_event = events[0] + self.assertEqual(system_event.name, "gen_ai.system.message") + self.assertEqual(system_event.body["content"], "system prompt") + self.assertEqual(system_event.body["role"], "system") + self.assertEqual(system_event.attributes["gen_ai.system"], "claude-3") + self.assertEqual(system_event.attributes["original_attribute"], "llm.input_messages.0.message.content") + + user_event = events[1] + self.assertEqual(user_event.name, "gen_ai.user.message") + self.assertEqual(user_event.body["content"], "user message") + self.assertEqual(user_event.body["role"], "user") + self.assertEqual(user_event.attributes["gen_ai.system"], "claude-3") + self.assertEqual(user_event.attributes["original_attribute"], "llm.input_messages.1.message.content") + + def test_extract_openinference_structured_output_messages(self): + """ + Test _extract_openinference_attributes with structured output messages + """ + attributes = { + "llm.output_messages.0.message.content": "assistant response", + "llm.output_messages.0.message.role": "assistant", + "llm.model_name": "llama-3", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + + events = self.llo_handler._extract_openinference_attributes(span, attributes) + + self.assertEqual(len(events), 1) + + output_event = events[0] + self.assertEqual(output_event.name, 
"gen_ai.assistant.message") + self.assertEqual(output_event.body["content"], "assistant response") + self.assertEqual(output_event.body["role"], "assistant") + self.assertEqual(output_event.attributes["gen_ai.system"], "llama-3") + self.assertEqual(output_event.attributes["original_attribute"], "llm.output_messages.0.message.content") + self.assertEqual(output_event.timestamp, 1234567899) # end_time + + def test_extract_openinference_mixed_attributes(self): + """ + Test _extract_openinference_attributes with a mix of all attribute types + """ + attributes = { + "input.value": "direct input", + "output.value": "direct output", + "llm.input_messages.0.message.content": "message input", + "llm.input_messages.0.message.role": "user", + "llm.output_messages.0.message.content": "message output", + "llm.output_messages.0.message.role": "assistant", + "llm.model_name": "bedrock.claude-3", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + + events = self.llo_handler._extract_openinference_attributes(span, attributes) + + self.assertEqual(len(events), 4) + + # Verify all events have the correct model name + for event in events: + self.assertEqual(event.attributes["gen_ai.system"], "bedrock.claude-3") + + # We don't need to check every detail since other tests do that, + # but we can verify we got all the expected event types + event_types = {event.name for event in events} + self.assertIn("gen_ai.user.message", event_types) + self.assertIn("gen_ai.assistant.message", event_types) + + # Verify original attributes were correctly captured + original_attrs = {event.attributes["original_attribute"] for event in events} + self.assertIn("input.value", original_attrs) + self.assertIn("output.value", original_attrs) + self.assertIn("llm.input_messages.0.message.content", original_attrs) + self.assertIn("llm.output_messages.0.message.content", original_attrs) + + def test_emit_llo_attributes(self): + """ + Test _emit_llo_attributes + """ + attributes = { + "gen_ai.prompt.0.content": "prompt content", + "gen_ai.prompt.0.role": "user", + "gen_ai.completion.0.content": "completion content", + "gen_ai.completion.0.role": "assistant", + "traceloop.entity.input": "traceloop input", + "traceloop.entity.name": "entity_name", + "gen_ai.system": "openai", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + + with patch.object(self.llo_handler, "_extract_gen_ai_prompt_events") as mock_extract_prompt, patch.object( + self.llo_handler, "_extract_gen_ai_completion_events" + ) as mock_extract_completion, patch.object( + self.llo_handler, "_extract_traceloop_events" + ) as mock_extract_traceloop, patch.object( + self.llo_handler, "_extract_openlit_span_event_attributes" + ) as mock_extract_openlit, patch.object( + self.llo_handler, "_extract_openinference_attributes" + ) as mock_extract_openinference: + + # Create mocks with name attribute properly set + prompt_event = MagicMock(spec=Event) + prompt_event.name = "gen_ai.user.message" + + completion_event = MagicMock(spec=Event) + completion_event.name = "gen_ai.assistant.message" + + traceloop_event = MagicMock(spec=Event) + traceloop_event.name = "gen_ai.entity.message" + + openlit_event = MagicMock(spec=Event) + openlit_event.name = "gen_ai.langchain.message" + + openinference_event = MagicMock(spec=Event) + openinference_event.name = "gen_ai.anthropic.message" + + mock_extract_prompt.return_value = [prompt_event] + mock_extract_completion.return_value = [completion_event] + mock_extract_traceloop.return_value = 
[traceloop_event] + mock_extract_openlit.return_value = [openlit_event] + mock_extract_openinference.return_value = [openinference_event] + + self.llo_handler._emit_llo_attributes(span, attributes) + + mock_extract_prompt.assert_called_once_with(span, attributes, None) + mock_extract_completion.assert_called_once_with(span, attributes, None) + mock_extract_traceloop.assert_called_once_with(span, attributes, None) + mock_extract_openlit.assert_called_once_with(span, attributes, None) + mock_extract_openinference.assert_called_once_with(span, attributes, None) + + self.event_logger_mock.emit.assert_has_calls( + [ + call(prompt_event), + call(completion_event), + call(traceloop_event), + call(openlit_event), + call(openinference_event), + ] + ) + + def test_process_spans(self): + """ + Test process_spans + """ + attributes = {"gen_ai.prompt.0.content": "prompt content", "normal.attribute": "normal value"} + + span = self._create_mock_span(attributes) + + with patch.object(self.llo_handler, "_emit_llo_attributes") as mock_emit, patch.object( + self.llo_handler, "_filter_attributes" + ) as mock_filter: + + filtered_attributes = {"normal.attribute": "normal value"} + mock_filter.return_value = filtered_attributes + + result = self.llo_handler.process_spans([span]) + + mock_emit.assert_called_once_with(span, attributes) + mock_filter.assert_called_once_with(attributes) + + self.assertEqual(len(result), 1) + self.assertEqual(result[0], span) + # Access the _attributes property that was set by the process_spans method + self.assertEqual(result[0]._attributes, filtered_attributes) + + def test_process_spans_with_bounded_attributes(self): + """ + Test process_spans with BoundedAttributes + """ + from opentelemetry.attributes import BoundedAttributes + + bounded_attrs = BoundedAttributes( + maxlen=10, + attributes={"gen_ai.prompt.0.content": "prompt content", "normal.attribute": "normal value"}, + immutable=False, + max_value_len=1000, + ) + + span = self._create_mock_span(bounded_attrs) + + with patch.object(self.llo_handler, "_emit_llo_attributes") as mock_emit, patch.object( + self.llo_handler, "_filter_attributes" + ) as mock_filter: + + filtered_attributes = {"normal.attribute": "normal value"} + mock_filter.return_value = filtered_attributes + + result = self.llo_handler.process_spans([span]) + + mock_emit.assert_called_once_with(span, bounded_attrs) + mock_filter.assert_called_once_with(bounded_attrs) + + self.assertEqual(len(result), 1) + self.assertEqual(result[0], span) + # Check that we got a BoundedAttributes instance + self.assertIsInstance(result[0]._attributes, BoundedAttributes) + # Check the underlying dictionary content + self.assertEqual(dict(result[0]._attributes), filtered_attributes) From 6572783eccf98b732fdb3d374f3bd3637dc5d5bd Mon Sep 17 00:00:00 2001 From: liustve Date: Fri, 16 May 2025 19:07:09 +0000 Subject: [PATCH 2/5] init genesis custom log behavior --- .../distro/aws_opentelemetry_configurator.py | 42 +++-- .../exporter/otlp/aws/common/constants.py | 2 + .../logs/aws_batch_log_record_processor.py | 134 +++++++++++++++ .../otlp/aws/logs/otlp_aws_logs_exporter.py | 155 +++++++++++++++++- .../otlp/aws/traces/otlp_aws_span_exporter.py | 2 +- .../opentelemetry/distro/llo_handler.py | 2 +- 6 files changed, 318 insertions(+), 19 deletions(-) create mode 100644 aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/common/constants.py create mode 100644 
aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/aws_batch_log_record_processor.py diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py index b2a695536..8809455ad 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py @@ -3,7 +3,7 @@ # Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License. import os import re -from logging import NOTSET, Logger, getLogger +from logging import NOTSET, CRITICAL, Logger, getLogger from typing import ClassVar, Dict, List, Type, Union from importlib_metadata import version @@ -83,6 +83,7 @@ DEPRECATED_APP_SIGNALS_EXPORTER_ENDPOINT_CONFIG = "OTEL_AWS_APP_SIGNALS_EXPORTER_ENDPOINT" APPLICATION_SIGNALS_EXPORTER_ENDPOINT_CONFIG = "OTEL_AWS_APPLICATION_SIGNALS_EXPORTER_ENDPOINT" METRIC_EXPORT_INTERVAL_CONFIG = "OTEL_METRIC_EXPORT_INTERVAL" +OTEL_LOGS_EXPORTER = "OTEL_LOGS_EXPORTER" DEFAULT_METRIC_EXPORT_INTERVAL = 60000.0 AWS_LAMBDA_FUNCTION_NAME_CONFIG = "AWS_LAMBDA_FUNCTION_NAME" AWS_XRAY_DAEMON_ADDRESS_CONFIG = "AWS_XRAY_DAEMON_ADDRESS" @@ -122,6 +123,24 @@ class AwsOpenTelemetryConfigurator(_OTelSDKConfigurator): # pylint: disable=no-self-use @override def _configure(self, **kwargs): + + print(f"OTEL_EXPORTER_OTLP_LOGS_HEADERS: {os.environ.get('OTEL_EXPORTER_OTLP_LOGS_HEADERS', 'Not set')}") + print(f"OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED: {os.environ.get('OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED', 'Not set')}") + print(f"OTEL_METRICS_EXPORTER: {os.environ.get('OTEL_METRICS_EXPORTER', 'Not set')}") + print(f"OTEL_TRACES_EXPORTER: {os.environ.get('OTEL_TRACES_EXPORTER', 'Not set')}") + print(f"OTEL_LOGS_EXPORTER: {os.environ.get('OTEL_LOGS_EXPORTER', 'Not set')}") + print(f"OTEL_PYTHON_DISTRO: {os.environ.get('OTEL_PYTHON_DISTRO', 'Not set')}") + print(f"OTEL_PYTHON_CONFIGURATOR: {os.environ.get('OTEL_PYTHON_CONFIGURATOR', 'Not set')}") + print(f"OTEL_EXPORTER_OTLP_PROTOCOL: {os.environ.get('OTEL_EXPORTER_OTLP_PROTOCOL', 'Not set')}") + print(f"OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: {os.environ.get('OTEL_EXPORTER_OTLP_TRACES_ENDPOINT', 'Not set')}") + print(f"OTEL_EXPORTER_OTLP_LOGS_ENDPOINT: {os.environ.get('OTEL_EXPORTER_OTLP_LOGS_ENDPOINT', 'Not set')}") + print(f"OTEL_RESOURCE_ATTRIBUTES: {os.environ.get('OTEL_RESOURCE_ATTRIBUTES', 'Not set')}") + print(f"AGENT_OBSERVABILITY_ENABLED: {os.environ.get('AGENT_OBSERVABILITY_ENABLED', 'Not set')}") + print(f"AWS_CLOUDWATCH_LOG_GROUP: {os.environ.get('AWS_CLOUDWATCH_LOG_GROUP', 'Not set')}") + print(f"AWS_CLOUDWATCH_LOG_STREAM: {os.environ.get('AWS_CLOUDWATCH_LOG_STREAM', 'Not set')}") + print(f"OTEL_PYTHON_DISABLED_INSTRUMENTATIONS: {os.environ.get('OTEL_PYTHON_DISABLED_INSTRUMENTATIONS', 'Not set')}") + print(f"PYTHONPATH: {os.environ.get('PYTHONPATH', 'Not set')}") + if _is_defer_to_workers_enabled() and _is_wsgi_master_process(): _logger.info( "Skipping ADOT initialization since deferral to worker is enabled, and this is a master process." 
@@ -174,6 +193,9 @@ def _initialize_components(): resource=resource, ) _init_metrics(metric_exporters, resource) + logging_enabled = os.getenv(_OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED, "false") + if logging_enabled.strip().lower() == "true": + _init_logging(log_exporters, resource) def _init_logging( @@ -181,9 +203,9 @@ def _init_logging( resource: Resource = None, ): - # Provides a default OTLP log exporter when none is specified. + # Provides a default OTLP log exporter when the environment is not set. # This is the behavior for the logs exporters for other languages. - if not exporters: + if not exporters and os.environ.get(OTEL_LOGS_EXPORTER) == None: exporters = {"otlp": OTLPLogExporter} provider = LoggerProvider(resource=resource) @@ -192,7 +214,7 @@ def _init_logging( for _, exporter_class in exporters.items(): exporter_args: Dict[str, any] = {} log_exporter = _customize_logs_exporter(exporter_class(**exporter_args), resource) - provider.add_log_record_processor(BatchLogRecordProcessor(exporter=log_exporter)) + provider.add_log_record_processor(AwsBatchLogRecordProcessor(exporter=log_exporter)) handler = LoggingHandler(level=NOTSET, logger_provider=provider) @@ -363,15 +385,7 @@ def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource) -> _logger.info("Detected using AWS OTLP Traces Endpoint.") if isinstance(span_exporter, OTLPSpanExporter): - if is_agent_observability_enabled(): - logs_endpoint = os.getenv(OTEL_EXPORTER_OTLP_LOGS_ENDPOINT) - logs_exporter = OTLPAwsLogExporter(endpoint=logs_endpoint) - span_exporter = OTLPAwsSpanExporter( - endpoint=traces_endpoint, - logger_provider=get_logger_provider() - ) - else: - span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint) + span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint) else: _logger.warning( @@ -636,4 +650,4 @@ def create_exporter(self): endpoint=application_signals_endpoint, preferred_temporality=temporality_dict ) - raise RuntimeError(f"Unsupported AWS Application Signals export protocol: {protocol} ") + raise RuntimeError(f"Unsupported AWS Application Signals export protocol: {protocol} ") \ No newline at end of file diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/common/constants.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/common/constants.py new file mode 100644 index 000000000..5fa75cc7d --- /dev/null +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/common/constants.py @@ -0,0 +1,2 @@ +BASE_LOG_BUFFER_BYTE_SIZE = 450000 +MAX_LOG_REQUEST_BYTE_SIZE = 1048576 # https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-OTLPEndpoint.html \ No newline at end of file diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/aws_batch_log_record_processor.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/aws_batch_log_record_processor.py new file mode 100644 index 000000000..a55d41253 --- /dev/null +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/aws_batch_log_record_processor.py @@ -0,0 +1,134 @@ +from time import sleep +import json +import logging +import os +import threading +from typing import Mapping, Sequence +from opentelemetry.sdk._logs.export import BatchLogRecordProcessor +from amazon.opentelemetry.distro.exporter.otlp.aws.common.constants import MAX_LOG_REQUEST_BYTE_SIZE, BASE_LOG_BUFFER_BYTE_SIZE +from 
amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter +from opentelemetry.sdk._logs._internal.export import BatchLogExportStrategy, detach, attach, set_value, _SUPPRESS_INSTRUMENTATION_KEY + +from opentelemetry.sdk._logs import LogData +from opentelemetry.util.types import AnyValue + +_logger = logging.getLogger(__name__) + +class AwsBatchLogRecordProcessor(BatchLogRecordProcessor): + + def __init__( + self, + exporter: OTLPAwsLogExporter, + schedule_delay_millis: float | None = None, + max_export_batch_size: int | None = None, + export_timeout_millis: float | None = None, + max_queue_size: int | None = None + ): + + super().__init__( + exporter=exporter, + schedule_delay_millis=schedule_delay_millis, + max_export_batch_size=max_export_batch_size, + export_timeout_millis=export_timeout_millis, + max_queue_size=max_queue_size + ) + + # Code based off of: + # https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py#L143 + def _export(self, batch_strategy: BatchLogExportStrategy) -> None: + """ + Overrides the batching behavior of upstream's export method. Preserves existing batching behavior but + will intermediarly export small log batches if the size of the data in the batch is at or above AWS CloudWatch's maximum request size limit + of 1 MB. + + - Data size of exported batches will ALWAYS be <= 1 MB except for the case below: + - If the data size of an exported batch is ever > 1 MB then the batch size is guaranteed to be 1 + """ + + with self._export_lock: + iteration = 0 + # We could see concurrent export calls from worker and force_flush. We call _should_export_batch + # once the lock is obtained to see if we still need to make the requested export. + while self._should_export_batch(batch_strategy, iteration): + + iteration += 1 + token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) + try: + batch_length = min(self._max_export_batch_size, len(self._queue)) + batch_data_size = 0 + batch = [] + + for _ in range(batch_length): + + log_data = self._queue.pop() + log_size = self._get_size_of_log(log_data) + + if batch and (batch_data_size + log_size > MAX_LOG_REQUEST_BYTE_SIZE): + # if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE then len(batch) == 1 + if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE: + self._exporter.set_gen_ai_flag() + + self._exporter.export(batch) + batch_data_size = 0 + batch = [] + + batch_data_size += log_size + batch.append(log_data) + + if batch: + # if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE then len(batch) == 1 + if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE: + self._exporter.set_gen_ai_flag() + + self._exporter.export(batch) + except Exception: # pylint: disable=broad-exception-caught + _logger.exception("Exception while exporting logs.") + detach(token) + + def _get_size_of_log(self, log_data: LogData) -> int: + """ + Estimates the size of a given LogData based on the size of the body + a buffer amount representing a rough guess of other data present + in the log. + """ + size = BASE_LOG_BUFFER_BYTE_SIZE + body = log_data.log_record.body + + if body: + size += self._get_size_of_any_value(body) + + return size + + def _get_size_of_any_value(self, val: AnyValue) -> int: + """ + Recursively calculates the size of an AnyValue type in bytes. 
+ """ + size = 0 + + if isinstance(val, str) or isinstance(val, bytes): + return len(val) + + if isinstance(val, bool): + if val: + return 4 #len(True) = 4 + return 5 #len(False) = 5 + + if isinstance(val, int) or isinstance(val, float): + return len(str(val)) + + if isinstance(val, Sequence): + for content in val: + size += self._get_size_of_any_value(content) + + if isinstance(val, Mapping): + for _, content in val.items(): + size += self._get_size_of_any_value(content) + + return size + + + + + + + + \ No newline at end of file diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/otlp_aws_logs_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/otlp_aws_logs_exporter.py index 048632c06..8937dafae 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/otlp_aws_logs_exporter.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/otlp_aws_logs_exporter.py @@ -1,14 +1,36 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 -from typing import Dict, Optional +import gzip +from io import BytesIO +import logging +from time import sleep +from typing import Dict, Mapping, Optional, Sequence + +from email.utils import parsedate_to_datetime +from datetime import datetime from amazon.opentelemetry.distro.exporter.otlp.aws.common.aws_auth_session import AwsAuthSession +from amazon.opentelemetry.distro.exporter.otlp.aws.common.constants import MAX_LOG_REQUEST_BYTE_SIZE, BASE_LOG_BUFFER_BYTE_SIZE from opentelemetry.exporter.otlp.proto.http import Compression -from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter +from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter, _create_exp_backoff_generator +from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs +from opentelemetry.sdk._logs.export import ( + LogExportResult, +) +from opentelemetry.sdk._logs import ( + LogData, +) + +import requests + +_logger = logging.getLogger(__name__) class OTLPAwsLogExporter(OTLPLogExporter): + _LARGE_LOG_HEADER = {'x-aws-log-semantics': 'otel'} + _RETRY_AFTER_HEADER = 'Retry-After' + def __init__( self, endpoint: Optional[str] = None, @@ -18,8 +40,9 @@ def __init__( headers: Optional[Dict[str, str]] = None, timeout: Optional[int] = None, ): + self._gen_ai_flag = False self._aws_region = None - + if endpoint: self._aws_region = endpoint.split(".")[1] @@ -34,3 +57,129 @@ def __init__( compression=Compression.Gzip, session=AwsAuthSession(aws_region=self._aws_region, service="logs"), ) + + def export(self, batch: Sequence[LogData]) -> LogExportResult: + print(f"Exporting batch of {len(batch)} logs") + print("TOTAL DATA SIZE " + str(sum(self._get_size_of_log(logz) for logz in batch))) + print("GEN_AI_FLAG " + str(self._gen_ai_flag)) + + return super().export(batch) + + def set_gen_ai_flag(self): + self._gen_ai_flag = True + + @staticmethod + def _retryable(resp: requests.Response) -> bool: + if resp.status_code == 429 or resp.status_code == 503: + return True + + return OTLPLogExporter._retryable(resp) + + def _export(self, serialized_data: bytes) -> requests.Response: + """ + Exports the given serialized OTLP log data. Behaviors of how this export will work. + + 1. Always compresses the serialized data into gzip before sending. + + 2. 
If self._gen_ai_flag is enabled, the log data is > 1 MB and we assume that the log contains normalized gen.ai attributes. + - in this case we inject the 'x-aws-log-semantics' flag into the header. + + 3. Retry behavior is now the following: + - if the response contains a status code that is retryable and the response contains Retry-After in its headers, + the serialized data will be exported after that set delay + + - if the reponse does not contain that Retry-After header, default back to the current iteration of the + exponential backoff delay + """ + gzip_data = BytesIO() + with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream: + gzip_stream.write(serialized_data) + + data = gzip_data.getvalue() + + def send(): + try: + return self._session.post( + url=self._endpoint, + headers=self._LARGE_LOG_HEADER if self._gen_ai_flag else None, + data=data, + verify=self._certificate_file, + timeout=self._timeout, + cert=self._client_cert, + ) + except ConnectionError: + return self._session.post( + url=self._endpoint, + headers=self._LARGE_LOG_HEADER if self._gen_ai_flag else None, + data=data, + verify=self._certificate_file, + timeout=self._timeout, + cert=self._client_cert, + ) + + backoff = list(_create_exp_backoff_generator(self._MAX_RETRY_TIMEOUT)) + + while True: + resp = send() + + if not self._retryable(resp) or not backoff: + return resp + + retry_after = resp.headers.get(self._RETRY_AFTER_HEADER, None) + delay = backoff.pop(0) if retry_after == None else self._parse_retryable_header(retry_after) + + _logger.warning( + "Transient error %s encountered while exporting logs batch, retrying in %ss.", + resp.reason, + delay, + ) + + sleep(delay) + continue + + + def _parse_retryable_header(self, retry_header: str) -> float: + "Converts the given retryable header into a delay in seconds, returns -1 if there's an error with the parsing" + try: + return float(retry_header) + except ValueError: + return -1.0 + + def _get_size_of_log(self, log_data: LogData): + # Rough estimate of the size of the LogData based on size of the content body + a buffer to account for other information in logs. 
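+        # For example, a record whose body is the 5-character string "hello" is estimated at BASE_LOG_BUFFER_BYTE_SIZE + 5 bytes.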
+ size = BASE_LOG_BUFFER_BYTE_SIZE + body = log_data.log_record.body + + if body: + size += self._get_size_of_any_value(log_data.log_record.body) + + return size + + def _get_size_of_any_value(self, val) -> int: + size = 0 + + if isinstance(val, str) or isinstance(val, bytes): + return len(val) + + if isinstance(val, bool): + if val: + return 4 #len(True) = 4 + return 5 #len(False) = 5 + + if isinstance(val, int) or isinstance(val, float): + return len(str(val)) + + if isinstance(val, Sequence): + for content in val: + size += self._get_size_of_any_value(content) + + if isinstance(val, Mapping): + for _, content in val.items(): + size += self._get_size_of_any_value(content) + + return size + + + + + diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py index 7defb5d47..7b46d47ba 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py @@ -52,4 +52,4 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: llo_processed_spans = self._llo_handler.process_spans(spans) return super().export(llo_processed_spans) - return super().export(spans) + return super().export(spans) \ No newline at end of file diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py index 4c3706d96..63a6641e9 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py @@ -678,4 +678,4 @@ def _get_gen_ai_event(self, name, span_ctx, timestamp, attributes, body): trace_id=span_ctx.trace_id, span_id=span_ctx.span_id, trace_flags=span_ctx.trace_flags, - ) + ) \ No newline at end of file From b4c340e2e33983166c6c20b6c263b13927b4a107 Mon Sep 17 00:00:00 2001 From: liustve Date: Sun, 18 May 2025 00:24:38 +0000 Subject: [PATCH 3/5] merge --- .../src/amazon/opentelemetry/distro/_utils.py | 2 + .../distro/aws_opentelemetry_configurator.py | 43 ++-- .../exporter/otlp/aws/common/constants.py | 6 +- .../logs/aws_batch_log_record_processor.py | 98 ++++---- .../otlp/aws/logs/otlp_aws_logs_exporter.py | 226 +++++++++++------- .../otlp/aws/traces/otlp_aws_span_exporter.py | 11 +- .../opentelemetry/distro/llo_handler.py | 37 ++- .../distro/patches/_bedrock_patches.py | 4 + .../opentelemetry/distro/test_llo_handler.py | 115 +++++++++ 9 files changed, 370 insertions(+), 172 deletions(-) diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_utils.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_utils.py index 011a1d19d..f550ffeb3 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_utils.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_utils.py @@ -11,6 +11,7 @@ AGENT_OBSERVABILITY_ENABLED = "AGENT_OBSERVABILITY_ENABLED" + def is_installed(req: str) -> bool: """Is the given required package installed?""" @@ -24,5 +25,6 @@ def is_installed(req: str) -> bool: return False return True + def is_agent_observability_enabled() -> bool: return os.environ.get(AGENT_OBSERVABILITY_ENABLED, "false").lower() == "true" diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py 
b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py index 8809455ad..7f46ab3f9 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py @@ -3,15 +3,15 @@ # Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License. import os import re -from logging import NOTSET, CRITICAL, Logger, getLogger +from logging import NOTSET, Logger, getLogger from typing import ClassVar, Dict, List, Type, Union from importlib_metadata import version from typing_extensions import override from amazon.opentelemetry.distro._aws_attribute_keys import AWS_LOCAL_SERVICE -from amazon.opentelemetry.distro._utils import is_agent_observability_enabled from amazon.opentelemetry.distro._aws_resource_attribute_configurator import get_service_attribute +from amazon.opentelemetry.distro._utils import is_agent_observability_enabled from amazon.opentelemetry.distro.always_record_sampler import AlwaysRecordSampler from amazon.opentelemetry.distro.attribute_propagating_span_processor_builder import ( AttributePropagatingSpanProcessorBuilder, @@ -22,13 +22,14 @@ AwsMetricAttributesSpanExporterBuilder, ) from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder +from amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor import AwsBatchLogRecordProcessor from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter from amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter import OTLPAwsSpanExporter from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler from amazon.opentelemetry.distro.scope_based_exporter import ScopeBasedPeriodicExportingMetricReader from amazon.opentelemetry.distro.scope_based_filtering_view import ScopeBasedRetainingView -from opentelemetry._logs import set_logger_provider, get_logger_provider +from opentelemetry._logs import get_logger_provider, set_logger_provider from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter @@ -123,24 +124,6 @@ class AwsOpenTelemetryConfigurator(_OTelSDKConfigurator): # pylint: disable=no-self-use @override def _configure(self, **kwargs): - - print(f"OTEL_EXPORTER_OTLP_LOGS_HEADERS: {os.environ.get('OTEL_EXPORTER_OTLP_LOGS_HEADERS', 'Not set')}") - print(f"OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED: {os.environ.get('OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED', 'Not set')}") - print(f"OTEL_METRICS_EXPORTER: {os.environ.get('OTEL_METRICS_EXPORTER', 'Not set')}") - print(f"OTEL_TRACES_EXPORTER: {os.environ.get('OTEL_TRACES_EXPORTER', 'Not set')}") - print(f"OTEL_LOGS_EXPORTER: {os.environ.get('OTEL_LOGS_EXPORTER', 'Not set')}") - print(f"OTEL_PYTHON_DISTRO: {os.environ.get('OTEL_PYTHON_DISTRO', 'Not set')}") - print(f"OTEL_PYTHON_CONFIGURATOR: {os.environ.get('OTEL_PYTHON_CONFIGURATOR', 'Not set')}") - print(f"OTEL_EXPORTER_OTLP_PROTOCOL: {os.environ.get('OTEL_EXPORTER_OTLP_PROTOCOL', 'Not set')}") - print(f"OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: 
{os.environ.get('OTEL_EXPORTER_OTLP_TRACES_ENDPOINT', 'Not set')}") - print(f"OTEL_EXPORTER_OTLP_LOGS_ENDPOINT: {os.environ.get('OTEL_EXPORTER_OTLP_LOGS_ENDPOINT', 'Not set')}") - print(f"OTEL_RESOURCE_ATTRIBUTES: {os.environ.get('OTEL_RESOURCE_ATTRIBUTES', 'Not set')}") - print(f"AGENT_OBSERVABILITY_ENABLED: {os.environ.get('AGENT_OBSERVABILITY_ENABLED', 'Not set')}") - print(f"AWS_CLOUDWATCH_LOG_GROUP: {os.environ.get('AWS_CLOUDWATCH_LOG_GROUP', 'Not set')}") - print(f"AWS_CLOUDWATCH_LOG_STREAM: {os.environ.get('AWS_CLOUDWATCH_LOG_STREAM', 'Not set')}") - print(f"OTEL_PYTHON_DISABLED_INSTRUMENTATIONS: {os.environ.get('OTEL_PYTHON_DISABLED_INSTRUMENTATIONS', 'Not set')}") - print(f"PYTHONPATH: {os.environ.get('PYTHONPATH', 'Not set')}") - if _is_defer_to_workers_enabled() and _is_wsgi_master_process(): _logger.info( "Skipping ADOT initialization since deferral to worker is enabled, and this is a master process." @@ -193,9 +176,6 @@ def _initialize_components(): resource=resource, ) _init_metrics(metric_exporters, resource) - logging_enabled = os.getenv(_OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED, "false") - if logging_enabled.strip().lower() == "true": - _init_logging(log_exporters, resource) def _init_logging( @@ -205,7 +185,7 @@ def _init_logging( # Provides a default OTLP log exporter when the environment is not set. # This is the behavior for the logs exporters for other languages. - if not exporters and os.environ.get(OTEL_LOGS_EXPORTER) == None: + if not exporters and os.environ.get(OTEL_LOGS_EXPORTER) is None: exporters = {"otlp": OTLPLogExporter} provider = LoggerProvider(resource=resource) @@ -214,7 +194,11 @@ def _init_logging( for _, exporter_class in exporters.items(): exporter_args: Dict[str, any] = {} log_exporter = _customize_logs_exporter(exporter_class(**exporter_args), resource) - provider.add_log_record_processor(AwsBatchLogRecordProcessor(exporter=log_exporter)) + + if isinstance(log_exporter, OTLPAwsLogExporter): + provider.add_log_record_processor(AwsBatchLogRecordProcessor(exporter=log_exporter)) + else: + provider.add_log_record_processor(BatchLogRecordProcessor(exporter=log_exporter)) handler = LoggingHandler(level=NOTSET, logger_provider=provider) @@ -385,7 +369,10 @@ def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource) -> _logger.info("Detected using AWS OTLP Traces Endpoint.") if isinstance(span_exporter, OTLPSpanExporter): - span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint) + if is_agent_observability_enabled(): + span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint, logger_provider=get_logger_provider()) + else: + span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint) else: _logger.warning( @@ -650,4 +637,4 @@ def create_exporter(self): endpoint=application_signals_endpoint, preferred_temporality=temporality_dict ) - raise RuntimeError(f"Unsupported AWS Application Signals export protocol: {protocol} ") \ No newline at end of file + raise RuntimeError(f"Unsupported AWS Application Signals export protocol: {protocol} ") diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/common/constants.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/common/constants.py index 5fa75cc7d..a78e92259 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/common/constants.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/common/constants.py @@ -1,2 +1,4 @@ -BASE_LOG_BUFFER_BYTE_SIZE = 450000 
-MAX_LOG_REQUEST_BYTE_SIZE = 1048576 # https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-OTLPEndpoint.html \ No newline at end of file +BASE_LOG_BUFFER_BYTE_SIZE = 2000 +MAX_LOG_REQUEST_BYTE_SIZE = ( + 1048576 # https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-OTLPEndpoint.html +) diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/aws_batch_log_record_processor.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/aws_batch_log_record_processor.py index a55d41253..90948a3f6 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/aws_batch_log_record_processor.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/aws_batch_log_record_processor.py @@ -1,47 +1,51 @@ -from time import sleep -import json import logging -import os -import threading from typing import Mapping, Sequence -from opentelemetry.sdk._logs.export import BatchLogRecordProcessor -from amazon.opentelemetry.distro.exporter.otlp.aws.common.constants import MAX_LOG_REQUEST_BYTE_SIZE, BASE_LOG_BUFFER_BYTE_SIZE -from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter -from opentelemetry.sdk._logs._internal.export import BatchLogExportStrategy, detach, attach, set_value, _SUPPRESS_INSTRUMENTATION_KEY +from amazon.opentelemetry.distro.exporter.otlp.aws.common.constants import ( + BASE_LOG_BUFFER_BYTE_SIZE, + MAX_LOG_REQUEST_BYTE_SIZE, +) +from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter from opentelemetry.sdk._logs import LogData +from opentelemetry.sdk._logs._internal.export import ( + _SUPPRESS_INSTRUMENTATION_KEY, + BatchLogExportStrategy, + attach, + detach, + set_value, +) +from opentelemetry.sdk._logs.export import BatchLogRecordProcessor from opentelemetry.util.types import AnyValue _logger = logging.getLogger(__name__) + class AwsBatchLogRecordProcessor(BatchLogRecordProcessor): - def __init__( + def __init__( self, exporter: OTLPAwsLogExporter, schedule_delay_millis: float | None = None, max_export_batch_size: int | None = None, export_timeout_millis: float | None = None, - max_queue_size: int | None = None - ): + max_queue_size: int | None = None, + ): super().__init__( exporter=exporter, schedule_delay_millis=schedule_delay_millis, max_export_batch_size=max_export_batch_size, export_timeout_millis=export_timeout_millis, - max_queue_size=max_queue_size + max_queue_size=max_queue_size, ) - # Code based off of: # https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py#L143 def _export(self, batch_strategy: BatchLogExportStrategy) -> None: """ - Overrides the batching behavior of upstream's export method. Preserves existing batching behavior but - will intermediarly export small log batches if the size of the data in the batch is at or above AWS CloudWatch's maximum request size limit - of 1 MB. + Preserves existing batching behavior but will export smaller intermediate batches if the size of the data in the batch is at or + above AWS CloudWatch's maximum request size limit of 1 MB.
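+        For example, five queued logs estimated at roughly 300 KB each are flushed as a 900 KB request + followed by a 600 KB request, never as a single request above the limit.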
- - Data size of exported batches will ALWAYS be <= 1 MB except for the case below: + - Data size of exported batches will ALWAYS be <= 1 MB except for the case below: - If the data size of an exported batch is ever > 1 MB then the batch size is guaranteed to be 1 """ @@ -57,78 +61,70 @@ def _export(self, batch_strategy: BatchLogExportStrategy) -> None: batch_length = min(self._max_export_batch_size, len(self._queue)) batch_data_size = 0 batch = [] - + for _ in range(batch_length): - + log_data = self._queue.pop() log_size = self._get_size_of_log(log_data) - + if batch and (batch_data_size + log_size > MAX_LOG_REQUEST_BYTE_SIZE): # if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE then len(batch) == 1 if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE: self._exporter.set_gen_ai_flag() - + self._exporter.export(batch) batch_data_size = 0 batch = [] - + batch_data_size += log_size batch.append(log_data) - + if batch: # if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE then len(batch) == 1 if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE: self._exporter.set_gen_ai_flag() - + self._exporter.export(batch) except Exception: # pylint: disable=broad-exception-caught _logger.exception("Exception while exporting logs.") - detach(token) - - def _get_size_of_log(self, log_data: LogData) -> int: + detach(token) + + @staticmethod + def _get_size_of_log(log_data: LogData) -> int: """ - Estimates the size of a given LogData based on the size of the body + a buffer amount representing a rough guess of other data present - in the log. + Estimates the size of a given LogData based on the size of the body + a buffer + amount representing a rough guess of other data present in the log. """ size = BASE_LOG_BUFFER_BYTE_SIZE body = log_data.log_record.body - + if body: - size += self._get_size_of_any_value(body) + size += AwsBatchLogRecordProcessor._get_size_of_any_value(body) return size - def _get_size_of_any_value(self, val: AnyValue) -> int: + @staticmethod + def _get_size_of_any_value(val: AnyValue) -> int: """ Recursively calculates the size of an AnyValue type in bytes. 
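+        For example, mapping keys are not counted, so {"role": "user", "content": "hi"} is sized as + len("user") + len("hi") = 6 bytes.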
""" size = 0 - + if isinstance(val, str) or isinstance(val, bytes): return len(val) - + if isinstance(val, bool): - if val: - return 4 #len(True) = 4 - return 5 #len(False) = 5 - + return 4 if val else 5 + if isinstance(val, int) or isinstance(val, float): return len(str(val)) - + if isinstance(val, Sequence): for content in val: - size += self._get_size_of_any_value(content) - + size += AwsBatchLogRecordProcessor._get_size_of_any_value(content) + if isinstance(val, Mapping): for _, content in val.items(): - size += self._get_size_of_any_value(content) - - return size - - + size += AwsBatchLogRecordProcessor._get_size_of_any_value(content) - - - - - \ No newline at end of file + return size diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/otlp_aws_logs_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/otlp_aws_logs_exporter.py index 8937dafae..187e6d9c3 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/otlp_aws_logs_exporter.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/otlp_aws_logs_exporter.py @@ -2,35 +2,33 @@ # SPDX-License-Identifier: Apache-2.0 import gzip -from io import BytesIO import logging +from io import BytesIO from time import sleep from typing import Dict, Mapping, Optional, Sequence -from email.utils import parsedate_to_datetime -from datetime import datetime +import requests from amazon.opentelemetry.distro.exporter.otlp.aws.common.aws_auth_session import AwsAuthSession -from amazon.opentelemetry.distro.exporter.otlp.aws.common.constants import MAX_LOG_REQUEST_BYTE_SIZE, BASE_LOG_BUFFER_BYTE_SIZE +from amazon.opentelemetry.distro.exporter.otlp.aws.common.constants import BASE_LOG_BUFFER_BYTE_SIZE +from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs from opentelemetry.exporter.otlp.proto.http import Compression from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter, _create_exp_backoff_generator -from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs -from opentelemetry.sdk._logs.export import ( - LogExportResult, -) from opentelemetry.sdk._logs import ( LogData, ) - -import requests - +from opentelemetry.sdk._logs.export import ( + LogExportResult, +) _logger = logging.getLogger(__name__) + class OTLPAwsLogExporter(OTLPLogExporter): - _LARGE_LOG_HEADER = {'x-aws-log-semantics': 'otel'} - _RETRY_AFTER_HEADER = 'Retry-After' - + COUNT = 0 + _LARGE_LOG_HEADER = {"x-aws-log-semantics": "otel"} + _RETRY_AFTER_HEADER = "Retry-After" # https://opentelemetry.io/docs/specs/otlp/#otlphttp-throttling + def __init__( self, endpoint: Optional[str] = None, @@ -42,7 +40,7 @@ def __init__( ): self._gen_ai_flag = False self._aws_region = None - + if endpoint: self._aws_region = endpoint.split(".")[1] @@ -57,76 +55,91 @@ def __init__( compression=Compression.Gzip, session=AwsAuthSession(aws_region=self._aws_region, service="logs"), ) - + + # Code based off of: + # https://github.com/open-telemetry/opentelemetry-python/blob/main/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py#L167 def export(self, batch: Sequence[LogData]) -> LogExportResult: + print(f"Exporting batch of {len(batch)} logs") print("TOTAL DATA SIZE " + str(sum(self._get_size_of_log(logz) for logz in batch))) - print("GEN_AI_FLAG " + str(self._gen_ai_flag)) + self.COUNT += len(batch) + print("COUNT " + 
str(self.COUNT)) - return super().export(batch) - - def set_gen_ai_flag(self): - self._gen_ai_flag = True + """ + Exports the given batch of OTLP log data. + This export behaves as follows: - - @staticmethod - def _retryable(resp: requests.Response) -> bool: - if resp.status_code == 429 or resp.status_code == 503: - return True + 1. Always compresses the serialized data into gzip before sending. - return OTLPLogExporter._retryable(resp) + 2. If self._gen_ai_flag is enabled, the log data is > 1 MB + and the assumption is that the log is a normalized gen_ai LogEvent. + - inject the 'x-aws-log-semantics' flag into the header. - def _export(self, serialized_data: bytes) -> requests.Response: - """ - Exports the given serialized OTLP log data. Behaviors of how this export will work. + 3. Retry behavior is now the following: + - if the response contains a status code that is retryable and the response contains Retry-After in its + headers, the serialized data will be exported after that set delay - 1. Always compresses the serialized data into gzip before sending. - - 2. If self._gen_ai_flag is enabled, the log data is > 1 MB and we assume that the log contains normalized gen.ai attributes. - - in this case we inject the 'x-aws-log-semantics' flag into the header. - - 3. Retry behavior is now the following: - - if the response contains a status code that is retryable and the response contains Retry-After in its headers, - the serialized data will be exported after that set delay - - - if the reponse does not contain that Retry-After header, default back to the current iteration of the + - if the response does not contain that Retry-After header, default back to the current iteration of the exponential backoff delay """ + + if self._shutdown: + _logger.warning("Exporter already shutdown, ignoring batch") + return LogExportResult.FAILURE + + serialized_data = encode_logs(batch).SerializeToString() + gzip_data = BytesIO() with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream: gzip_stream.write(serialized_data) - + data = gzip_data.getvalue() - def send(): + backoff = _create_exp_backoff_generator(max_value=self._MAX_RETRY_TIMEOUT) + + while True: + resp = self._send(data) + + print(f"Response status: {resp.status_code}") + print(f"Response headers: {resp.headers}") try: - return self._session.post( - url=self._endpoint, - headers=self._LARGE_LOG_HEADER if self._gen_ai_flag else None, - data=data, - verify=self._certificate_file, - timeout=self._timeout, - cert=self._client_cert, + print(f"Response body: {resp.text}") + except: + print("Could not print response body") + + if resp.ok: + return LogExportResult.SUCCESS + + if not self._retryable(resp): + _logger.error( + "Failed to export logs batch code: %s, reason: %s", + resp.status_code, + resp.text, ) - except ConnectionError: - return self._session.post( - url=self._endpoint, - headers=self._LARGE_LOG_HEADER if self._gen_ai_flag else None, - data=data, - verify=self._certificate_file, - timeout=self._timeout, - cert=self._client_cert, + self._gen_ai_flag = False + return LogExportResult.FAILURE + + # https://opentelemetry.io/docs/specs/otlp/#otlphttp-throttling + maybe_retry_after = resp.headers.get(self._RETRY_AFTER_HEADER, None) + + # Set the next retry delay to the value of the Retry-After response in the headers. + # If Retry-After is not present in the headers, default to the next iteration of the + # exponential backoff strategy.
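+ # For example, "Retry-After: 3" parses to a 3.0 second delay; a missing or unparsable header + # yields -1, so the next exponential backoff value is used instead.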
+ + delay = self._parse_retryable_header(maybe_retry_after) + + if delay == -1: + delay = next(backoff, self._MAX_RETRY_TIMEOUT) + + if delay == self._MAX_RETRY_TIMEOUT: + _logger.error( + "Transient error %s encountered while exporting logs batch. " + "No Retry-After header found and all backoff retries exhausted. " + "Logs will not be exported.", + resp.reason, ) - - backoff = list(_create_exp_backoff_generator(self._MAX_RETRY_TIMEOUT)) - - while True: - resp = send() - - if not self._retryable(resp) or not backoff: - return resp - - retry_after = resp.headers.get(self._RETRY_AFTER_HEADER, None) - delay = backoff.pop(0) if retry_after == None else self._parse_retryable_header(retry_after) + self._gen_ai_flag = False + return LogExportResult.FAILURE _logger.warning( "Transient error %s encountered while exporting logs batch, retrying in %ss.", @@ -134,22 +147,64 @@ def send(): delay, ) - sleep(delay) - continue - + sleep(delay) + + def set_gen_ai_flag(self): + """ + Sets the gen_ai flag to true so that the LLO header is injected into the headers of the export request. + """ + self._gen_ai_flag = True + + def _send(self, serialized_data: bytes): + try: + return self._session.post( + url=self._endpoint, + headers=self._LARGE_LOG_HEADER if self._gen_ai_flag else None, + data=serialized_data, + verify=self._certificate_file, + timeout=self._timeout, + cert=self._client_cert, + ) + except ConnectionError: + return self._session.post( + url=self._endpoint, + headers=self._LARGE_LOG_HEADER if self._gen_ai_flag else None, + data=serialized_data, + verify=self._certificate_file, + timeout=self._timeout, + cert=self._client_cert, + ) + + @staticmethod + def _retryable(resp: requests.Response) -> bool: + """ + Is it a retryable response? + """ + if resp.status_code == 429 or resp.status_code == 503: + return True + + return OTLPLogExporter._retryable(resp) + + def _parse_retryable_header(self, retry_header: Optional[str]) -> float: + """ + Converts the given Retry-After header into a delay in seconds; returns -1 if the header is missing + or cannot be parsed + """ + + if not retry_header: + return -1 - def _parse_retryable_header(self, retry_header: str) -> float: - "Converts the given retryable header into a delay in seconds, returns -1 if there's an error with the parsing" try: return float(retry_header) except ValueError: - return -1.0 + return -1 def _get_size_of_log(self, log_data: LogData): - # Rough estimate of the size of the LogData based on size of the content body + a buffer to account for other information in logs. + # Rough estimate of the size of the LogData based on size of + # the content body + a buffer to account for other information in logs.
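+ # Attribute and metadata overhead is approximated by the fixed BASE_LOG_BUFFER_BYTE_SIZE + # constant rather than measured per record.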
size = BASE_LOG_BUFFER_BYTE_SIZE body = log_data.log_record.body - + if body: size += self._get_size_of_any_value(log_data.log_record.body) @@ -157,29 +212,24 @@ def _get_size_of_log(self, log_data: LogData): def _get_size_of_any_value(self, val) -> int: size = 0 - + if isinstance(val, str) or isinstance(val, bytes): return len(val) - + if isinstance(val, bool): if val: - return 4 #len(True) = 4 - return 5 #len(False) = 5 - + return 4 # len(True) = 4 + return 5 # len(False) = 5 + if isinstance(val, int) or isinstance(val, float): return len(str(val)) - + if isinstance(val, Sequence): for content in val: size += self._get_size_of_any_value(content) - + if isinstance(val, Mapping): for _, content in val.items(): size += self._get_size_of_any_value(content) - - return size - - - - + return size diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py index 7b46d47ba..537fa6291 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py @@ -3,12 +3,17 @@ from typing import Dict, Optional, Sequence +from amazon.opentelemetry.distro._utils import is_agent_observability_enabled from amazon.opentelemetry.distro.exporter.otlp.aws.common.aws_auth_session import AwsAuthSession from amazon.opentelemetry.distro.llo_handler import LLOHandler +<<<<<<< HEAD from amazon.opentelemetry.distro._utils import is_agent_observability_enabled from opentelemetry.sdk._logs import LoggerProvider +======= +>>>>>>> 770f906 (add custom batch export) from opentelemetry.exporter.otlp.proto.http import Compression from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk._logs import LoggerProvider from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.sdk.trace.export import SpanExportResult @@ -25,7 +30,11 @@ def __init__( headers: Optional[Dict[str, str]] = None, timeout: Optional[int] = None, compression: Optional[Compression] = None, +<<<<<<< HEAD logger_provider: Optional[LoggerProvider] = None +======= + logger_provider: Optional[LoggerProvider] = None, +>>>>>>> 770f906 (add custom batch export) ): self._aws_region = None @@ -52,4 +61,4 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: llo_processed_spans = self._llo_handler.process_spans(spans) return super().export(llo_processed_spans) - return super().export(spans) \ No newline at end of file + return super().export(spans) diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py index 63a6641e9..3ef3ef935 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py @@ -1,12 +1,17 @@ import logging import re +from typing import Any, Dict, List, Optional, Sequence +<<<<<<< HEAD from typing import Any, Dict, List, Optional, Sequence from opentelemetry.attributes import BoundedAttributes +======= +>>>>>>> 770f906 (add custom batch export) from opentelemetry._events import Event -from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.attributes import BoundedAttributes from opentelemetry.sdk._events import EventLoggerProvider 
+<<<<<<< HEAD from opentelemetry.sdk.trace import ReadableSpan, Event as SpanEvent # Message event types @@ -23,6 +28,30 @@ OPENLIT_COMPLETION = "gen_ai.completion" OPENLIT_REVISED_PROMPT = "gen_ai.content.revised_prompt" +# Roles +ROLE_SYSTEM = "system" +ROLE_USER = "user" +ROLE_ASSISTANT = "assistant" +======= +from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.sdk.trace import Event as SpanEvent +from opentelemetry.sdk.trace import ReadableSpan +>>>>>>> 770f906 (add custom batch export) + +# Message event types +GEN_AI_SYSTEM_MESSAGE = "gen_ai.system.message" +GEN_AI_USER_MESSAGE = "gen_ai.user.message" +GEN_AI_ASSISTANT_MESSAGE = "gen_ai.assistant.message" + +# Framework-specific attribute keys +TRACELOOP_ENTITY_INPUT = "traceloop.entity.input" +TRACELOOP_ENTITY_OUTPUT = "traceloop.entity.output" +OPENINFERENCE_INPUT_VALUE = "input.value" +OPENINFERENCE_OUTPUT_VALUE = "output.value" +OPENLIT_PROMPT = "gen_ai.prompt" +OPENLIT_COMPLETION = "gen_ai.completion" +OPENLIT_REVISED_PROMPT = "gen_ai.content.revised_prompt" + # Roles ROLE_SYSTEM = "system" ROLE_USER = "user" @@ -678,4 +707,8 @@ def _get_gen_ai_event(self, name, span_ctx, timestamp, attributes, body): trace_id=span_ctx.trace_id, span_id=span_ctx.span_id, trace_flags=span_ctx.trace_flags, - ) \ No newline at end of file +<<<<<<< HEAD + ) +======= + ) +>>>>>>> 770f906 (add custom batch export) diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_bedrock_patches.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_bedrock_patches.py index 70dfe36c4..b363ecf2d 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_bedrock_patches.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_bedrock_patches.py @@ -225,6 +225,10 @@ def on_success(self, span: Span, result: _BotoResultT, instrumentor_context=None # Currently no attributes to extract from the result pass + def on_success(self, span: Span, result: _BotoResultT, instrumentor_context=None): + # Currently no attributes to extract from the result + pass + class _BedrockExtension(_AwsSdkExtension): """ diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_llo_handler.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_llo_handler.py index bb4ac51be..70136f441 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_llo_handler.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_llo_handler.py @@ -1,5 +1,9 @@ from unittest import TestCase +<<<<<<< HEAD from unittest.mock import MagicMock, patch, call +======= +from unittest.mock import MagicMock, call, patch +>>>>>>> 770f906 (add custom batch export) from amazon.opentelemetry.distro.llo_handler import LLOHandler from opentelemetry._events import Event @@ -67,6 +71,7 @@ def test_is_llo_attribute_no_match(self): self.assertFalse(self.llo_handler._is_llo_attribute("some.other.attribute")) def test_is_llo_attribute_traceloop_match(self): +<<<<<<< HEAD """ Test _is_llo_attribute method with Traceloop patterns """ @@ -82,6 +87,23 @@ def test_is_llo_attribute_openlit_match(self): self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.prompt")) self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.completion")) self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.content.revised_prompt")) +======= + """ + Test _is_llo_attribute method with Traceloop patterns + """ + # Test exact matches for Traceloop attributes + 
diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_llo_handler.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_llo_handler.py
index bb4ac51be..70136f441 100644
--- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_llo_handler.py
+++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_llo_handler.py
@@ -1,5 +1,9 @@
 from unittest import TestCase
+<<<<<<< HEAD
 from unittest.mock import MagicMock, patch, call
+=======
+from unittest.mock import MagicMock, call, patch
+>>>>>>> 770f906 (add custom batch export)

 from amazon.opentelemetry.distro.llo_handler import LLOHandler
 from opentelemetry._events import Event
@@ -67,6 +71,7 @@ def test_is_llo_attribute_no_match(self):
         self.assertFalse(self.llo_handler._is_llo_attribute("some.other.attribute"))

     def test_is_llo_attribute_traceloop_match(self):
+<<<<<<< HEAD
         """
         Test _is_llo_attribute method with Traceloop patterns
         """
@@ -82,6 +87,23 @@ def test_is_llo_attribute_openlit_match(self):
         self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.prompt"))
         self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.completion"))
         self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.content.revised_prompt"))
+=======
+        """
+        Test _is_llo_attribute method with Traceloop patterns
+        """
+        # Test exact matches for Traceloop attributes
+        self.assertTrue(self.llo_handler._is_llo_attribute("traceloop.entity.input"))
+        self.assertTrue(self.llo_handler._is_llo_attribute("traceloop.entity.output"))
+
+    def test_is_llo_attribute_openlit_match(self):
+        """
+        Test _is_llo_attribute method with OpenLit patterns
+        """
+        # Test exact matches for direct OpenLit attributes
+        self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.prompt"))
+        self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.completion"))
+        self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.content.revised_prompt"))
+>>>>>>> 770f906 (add custom batch export)

     def test_is_llo_attribute_openinference_match(self):
         """
@@ -296,6 +318,7 @@ def test_extract_traceloop_events(self):
         self.assertEqual(output_event.timestamp, 1234567899)  # end_time

     def test_extract_openlit_direct_prompt(self):
+<<<<<<< HEAD
         """
         Test _extract_openlit_span_event_attributes with direct prompt attribute
         """
@@ -395,6 +418,98 @@ def test_extract_openlit_revised_prompt(self):
         self.assertEqual(event.attributes["gen_ai.system"], "openlit")
         self.assertEqual(event.attributes["original_attribute"], "gen_ai.content.revised_prompt")
         self.assertEqual(event.timestamp, 1234567890)  # start_time
+=======
+        """
+        Test _extract_openlit_span_event_attributes with direct prompt attribute
+        """
+        attributes = {"gen_ai.prompt": "user direct prompt", "gen_ai.system": "openlit"}
+
+        span = self._create_mock_span(attributes)
+
+        events = self.llo_handler._extract_openlit_span_event_attributes(span, attributes)
+
+        self.assertEqual(len(events), 1)
+        event = events[0]
+        self.assertEqual(event.name, "gen_ai.user.message")
+        self.assertEqual(event.body["content"], "user direct prompt")
+        self.assertEqual(event.body["role"], "user")
+        self.assertEqual(event.attributes["gen_ai.system"], "openlit")
+        self.assertEqual(event.attributes["original_attribute"], "gen_ai.prompt")
+        self.assertEqual(event.timestamp, 1234567890)  # start_time
+
+    def test_extract_openlit_direct_completion(self):
+        """
+        Test _extract_openlit_span_event_attributes with direct completion attribute
+        """
+        attributes = {"gen_ai.completion": "assistant direct completion", "gen_ai.system": "openlit"}
+
+        span = self._create_mock_span(attributes)
+        span.end_time = 1234567899
+
+        events = self.llo_handler._extract_openlit_span_event_attributes(span, attributes)
+
+        self.assertEqual(len(events), 1)
+        event = events[0]
+        self.assertEqual(event.name, "gen_ai.assistant.message")
+        self.assertEqual(event.body["content"], "assistant direct completion")
+        self.assertEqual(event.body["role"], "assistant")
+        self.assertEqual(event.attributes["gen_ai.system"], "openlit")
+        self.assertEqual(event.attributes["original_attribute"], "gen_ai.completion")
+        self.assertEqual(event.timestamp, 1234567899)  # end_time
+
+    def test_extract_openlit_all_attributes(self):
+        """
+        Test _extract_openlit_span_event_attributes with all OpenLit attributes
+        """
+        attributes = {
+            "gen_ai.prompt": "user prompt",
+            "gen_ai.completion": "assistant response",
+            "gen_ai.content.revised_prompt": "revised prompt",
+            "gen_ai.system": "langchain",
+        }
+
+        span = self._create_mock_span(attributes)
+        span.end_time = 1234567899
+
+        events = self.llo_handler._extract_openlit_span_event_attributes(span, attributes)
+
+        self.assertEqual(len(events), 3)
+
+        # Check that all events have the correct system
+        for event in events:
+            self.assertEqual(event.attributes["gen_ai.system"], "langchain")
+
+        # Check we have the expected event types
+        event_types = {event.name for event in events}
self.assertIn("gen_ai.user.message", event_types) + self.assertIn("gen_ai.assistant.message", event_types) + self.assertIn("gen_ai.system.message", event_types) + + # Check original attributes + original_attrs = {event.attributes["original_attribute"] for event in events} + self.assertIn("gen_ai.prompt", original_attrs) + self.assertIn("gen_ai.completion", original_attrs) + self.assertIn("gen_ai.content.revised_prompt", original_attrs) + + def test_extract_openlit_revised_prompt(self): + """ + Test _extract_openlit_span_event_attributes with revised prompt attribute + """ + attributes = {"gen_ai.content.revised_prompt": "revised system prompt", "gen_ai.system": "openlit"} + + span = self._create_mock_span(attributes) + + events = self.llo_handler._extract_openlit_span_event_attributes(span, attributes) + + self.assertEqual(len(events), 1) + event = events[0] + self.assertEqual(event.name, "gen_ai.system.message") + self.assertEqual(event.body["content"], "revised system prompt") + self.assertEqual(event.body["role"], "system") + self.assertEqual(event.attributes["gen_ai.system"], "openlit") + self.assertEqual(event.attributes["original_attribute"], "gen_ai.content.revised_prompt") + self.assertEqual(event.timestamp, 1234567890) # start_time +>>>>>>> 770f906 (add custom batch export) def test_extract_openinference_direct_attributes(self): """ From b370cb0c51fdaedd7a6f4200d6a9cc9f7b0948e8 Mon Sep 17 00:00:00 2001 From: liustve Date: Sun, 18 May 2025 00:14:41 +0000 Subject: [PATCH 4/5] add custom batch export --- .../otlp/aws/traces/otlp_aws_span_exporter.py | 9 -- .../opentelemetry/distro/llo_handler.py | 33 ----- .../opentelemetry/distro/test_llo_handler.py | 124 ------------------ 3 files changed, 166 deletions(-) diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py index 537fa6291..98d649fd4 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py @@ -6,11 +6,6 @@ from amazon.opentelemetry.distro._utils import is_agent_observability_enabled from amazon.opentelemetry.distro.exporter.otlp.aws.common.aws_auth_session import AwsAuthSession from amazon.opentelemetry.distro.llo_handler import LLOHandler -<<<<<<< HEAD -from amazon.opentelemetry.distro._utils import is_agent_observability_enabled -from opentelemetry.sdk._logs import LoggerProvider -======= ->>>>>>> 770f906 (add custom batch export) from opentelemetry.exporter.otlp.proto.http import Compression from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.sdk._logs import LoggerProvider @@ -30,11 +25,7 @@ def __init__( headers: Optional[Dict[str, str]] = None, timeout: Optional[int] = None, compression: Optional[Compression] = None, -<<<<<<< HEAD - logger_provider: Optional[LoggerProvider] = None -======= logger_provider: Optional[LoggerProvider] = None, ->>>>>>> 770f906 (add custom batch export) ): self._aws_region = None diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py index 3ef3ef935..b63d76393 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py +++ 
From b370cb0c51fdaedd7a6f4200d6a9cc9f7b0948e8 Mon Sep 17 00:00:00 2001
From: liustve
Date: Sun, 18 May 2025 00:14:41 +0000
Subject: [PATCH 4/5] add custom batch export

---
 .../otlp/aws/traces/otlp_aws_span_exporter.py |   9 --
 .../opentelemetry/distro/llo_handler.py       |  33 -----
 .../opentelemetry/distro/test_llo_handler.py  | 124 ------------------
 3 files changed, 166 deletions(-)

diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py
index 537fa6291..98d649fd4 100644
--- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py
+++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py
@@ -6,11 +6,6 @@
 from amazon.opentelemetry.distro._utils import is_agent_observability_enabled
 from amazon.opentelemetry.distro.exporter.otlp.aws.common.aws_auth_session import AwsAuthSession
 from amazon.opentelemetry.distro.llo_handler import LLOHandler
-<<<<<<< HEAD
-from amazon.opentelemetry.distro._utils import is_agent_observability_enabled
-from opentelemetry.sdk._logs import LoggerProvider
-=======
->>>>>>> 770f906 (add custom batch export)
 from opentelemetry.exporter.otlp.proto.http import Compression
 from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
 from opentelemetry.sdk._logs import LoggerProvider
@@ -30,11 +25,7 @@ def __init__(
         headers: Optional[Dict[str, str]] = None,
         timeout: Optional[int] = None,
         compression: Optional[Compression] = None,
-<<<<<<< HEAD
-        logger_provider: Optional[LoggerProvider] = None
-=======
         logger_provider: Optional[LoggerProvider] = None,
->>>>>>> 770f906 (add custom batch export)
     ):
         self._aws_region = None
diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py
index 3ef3ef935..b63d76393 100644
--- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py
+++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py
@@ -2,41 +2,12 @@
 import re
 from typing import Any, Dict, List, Optional, Sequence
-<<<<<<< HEAD
-from typing import Any, Dict, List, Optional, Sequence
-
-from opentelemetry.attributes import BoundedAttributes
-=======
->>>>>>> 770f906 (add custom batch export)
 from opentelemetry._events import Event
 from opentelemetry.attributes import BoundedAttributes
 from opentelemetry.sdk._events import EventLoggerProvider
-<<<<<<< HEAD
-from opentelemetry.sdk.trace import ReadableSpan, Event as SpanEvent
-
-# Message event types
-GEN_AI_SYSTEM_MESSAGE = "gen_ai.system.message"
-GEN_AI_USER_MESSAGE = "gen_ai.user.message"
-GEN_AI_ASSISTANT_MESSAGE = "gen_ai.assistant.message"
-
-# Framework-specific attribute keys
-TRACELOOP_ENTITY_INPUT = "traceloop.entity.input"
-TRACELOOP_ENTITY_OUTPUT = "traceloop.entity.output"
-OPENINFERENCE_INPUT_VALUE = "input.value"
-OPENINFERENCE_OUTPUT_VALUE = "output.value"
-OPENLIT_PROMPT = "gen_ai.prompt"
-OPENLIT_COMPLETION = "gen_ai.completion"
-OPENLIT_REVISED_PROMPT = "gen_ai.content.revised_prompt"
-
-# Roles
-ROLE_SYSTEM = "system"
-ROLE_USER = "user"
-ROLE_ASSISTANT = "assistant"
-=======
 from opentelemetry.sdk._logs import LoggerProvider
 from opentelemetry.sdk.trace import Event as SpanEvent
 from opentelemetry.sdk.trace import ReadableSpan
->>>>>>> 770f906 (add custom batch export)

 # Message event types
 GEN_AI_SYSTEM_MESSAGE = "gen_ai.system.message"
@@ -707,8 +678,4 @@ def _get_gen_ai_event(self, name, span_ctx, timestamp, attributes, body):
             trace_id=span_ctx.trace_id,
             span_id=span_ctx.span_id,
             trace_flags=span_ctx.trace_flags,
-<<<<<<< HEAD
-        )
-=======
         )
->>>>>>> 770f906 (add custom batch export)
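With the conflict markers gone, the exporter keeps the wiring the earlier diff introduced: when agent observability is enabled, spans pass through the LLO handler before the base OTLP exporter ships them. A minimal sketch of that control flow, assuming `LLOHandler` takes the logger provider as the constructor changes above suggest (constructor details beyond that are elided):

```
from typing import Optional, Sequence

from amazon.opentelemetry.distro._utils import is_agent_observability_enabled
from amazon.opentelemetry.distro.llo_handler import LLOHandler
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExportResult


class OTLPAwsSpanExporter(OTLPSpanExporter):
    # Sketch: only the LLO-relevant pieces of the exporter are shown.
    def __init__(self, *args, logger_provider: Optional[LoggerProvider] = None, **kwargs):
        super().__init__(*args, **kwargs)
        if is_agent_observability_enabled():
            # Assumed constructor shape, mirroring the logger_provider parameter above.
            self._llo_handler = LLOHandler(logger_provider)

    def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
        if is_agent_observability_enabled():
            # Strip LLO attributes into Gen AI events, then export the scrubbed spans.
            llo_processed_spans = self._llo_handler.process_spans(spans)
            return super().export(llo_processed_spans)
        return super().export(spans)
```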
diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_llo_handler.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_llo_handler.py
index 70136f441..80895ff3e 100644
--- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_llo_handler.py
+++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_llo_handler.py
@@ -1,9 +1,5 @@
 from unittest import TestCase
-<<<<<<< HEAD
-from unittest.mock import MagicMock, patch, call
-=======
 from unittest.mock import MagicMock, call, patch
->>>>>>> 770f906 (add custom batch export)

 from amazon.opentelemetry.distro.llo_handler import LLOHandler
 from opentelemetry._events import Event
@@ -71,23 +67,6 @@ def test_is_llo_attribute_no_match(self):
         self.assertFalse(self.llo_handler._is_llo_attribute("some.other.attribute"))

     def test_is_llo_attribute_traceloop_match(self):
-<<<<<<< HEAD
-        """
-        Test _is_llo_attribute method with Traceloop patterns
-        """
-        # Test exact matches for Traceloop attributes
-        self.assertTrue(self.llo_handler._is_llo_attribute("traceloop.entity.input"))
-        self.assertTrue(self.llo_handler._is_llo_attribute("traceloop.entity.output"))
-
-    def test_is_llo_attribute_openlit_match(self):
-        """
-        Test _is_llo_attribute method with OpenLit patterns
-        """
-        # Test exact matches for direct OpenLit attributes
-        self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.prompt"))
-        self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.completion"))
-        self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.content.revised_prompt"))
-=======
         """
         Test _is_llo_attribute method with Traceloop patterns
         """
@@ -103,7 +82,6 @@ def test_is_llo_attribute_openlit_match(self):
         self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.prompt"))
         self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.completion"))
         self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.content.revised_prompt"))
->>>>>>> 770f906 (add custom batch export)

     def test_is_llo_attribute_openinference_match(self):
         """
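The `_is_llo_attribute` tests retained above treat the check as a membership test over known LLO keys. One plausible shape for that predicate, shown as an illustrative sketch rather than the handler's actual implementation (the exact-key set comes from the tests; the indexed-message pattern is an assumed generalization, suggested only by the `re` import in llo_handler.py):

```
import re

# Exact keys the tests assert on.
EXACT_LLO_KEYS = {
    "traceloop.entity.input",
    "traceloop.entity.output",
    "input.value",
    "output.value",
    "gen_ai.prompt",
    "gen_ai.completion",
    "gen_ai.content.revised_prompt",
}

# Hypothetical indexed-message patterns, e.g. "gen_ai.prompt.0.content".
LLO_PATTERNS = [re.compile(r"^gen_ai\.(prompt|completion)\.\d+\.content$")]


def is_llo_attribute(key: str) -> bool:
    """Return True if the attribute key is known to carry LLO content."""
    return key in EXACT_LLO_KEYS or any(p.match(key) for p in LLO_PATTERNS)
```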
@@ -318,107 +296,6 @@ def test_extract_traceloop_events(self):
         self.assertEqual(output_event.timestamp, 1234567899)  # end_time

     def test_extract_openlit_direct_prompt(self):
-<<<<<<< HEAD
-        """
-        Test _extract_openlit_span_event_attributes with direct prompt attribute
-        """
-        attributes = {
-            "gen_ai.prompt": "user direct prompt",
-            "gen_ai.system": "openlit"
-        }
-
-        span = self._create_mock_span(attributes)
-
-        events = self.llo_handler._extract_openlit_span_event_attributes(span, attributes)
-
-        self.assertEqual(len(events), 1)
-        event = events[0]
-        self.assertEqual(event.name, "gen_ai.user.message")
-        self.assertEqual(event.body["content"], "user direct prompt")
-        self.assertEqual(event.body["role"], "user")
-        self.assertEqual(event.attributes["gen_ai.system"], "openlit")
-        self.assertEqual(event.attributes["original_attribute"], "gen_ai.prompt")
-        self.assertEqual(event.timestamp, 1234567890) # start_time
-
-    def test_extract_openlit_direct_completion(self):
-        """
-        Test _extract_openlit_span_event_attributes with direct completion attribute
-        """
-        attributes = {
-            "gen_ai.completion": "assistant direct completion",
-            "gen_ai.system": "openlit"
-        }
-
-        span = self._create_mock_span(attributes)
-        span.end_time = 1234567899
-
-        events = self.llo_handler._extract_openlit_span_event_attributes(span, attributes)
-
-        self.assertEqual(len(events), 1)
-        event = events[0]
-        self.assertEqual(event.name, "gen_ai.assistant.message")
-        self.assertEqual(event.body["content"], "assistant direct completion")
-        self.assertEqual(event.body["role"], "assistant")
-        self.assertEqual(event.attributes["gen_ai.system"], "openlit")
-        self.assertEqual(event.attributes["original_attribute"], "gen_ai.completion")
-        self.assertEqual(event.timestamp, 1234567899) # end_time
-
-    def test_extract_openlit_all_attributes(self):
-        """
-        Test _extract_openlit_span_event_attributes with all OpenLit attributes
-        """
-        attributes = {
-            "gen_ai.prompt": "user prompt",
-            "gen_ai.completion": "assistant response",
-            "gen_ai.content.revised_prompt": "revised prompt",
-            "gen_ai.system": "langchain"
-        }
-
-        span = self._create_mock_span(attributes)
-        span.end_time = 1234567899
-
-        events = self.llo_handler._extract_openlit_span_event_attributes(span, attributes)
-
-        self.assertEqual(len(events), 3)
-
-        # Check that all events have the correct system
-        for event in events:
-            self.assertEqual(event.attributes["gen_ai.system"], "langchain")
-
-        # Check we have the expected event types
-        event_types = {event.name for event in events}
-        self.assertIn("gen_ai.user.message", event_types)
-        self.assertIn("gen_ai.assistant.message", event_types)
-        self.assertIn("gen_ai.system.message", event_types)
-
-        # Check original attributes
-        original_attrs = {event.attributes["original_attribute"] for event in events}
-        self.assertIn("gen_ai.prompt", original_attrs)
-        self.assertIn("gen_ai.completion", original_attrs)
-        self.assertIn("gen_ai.content.revised_prompt", original_attrs)
-
-    def test_extract_openlit_revised_prompt(self):
-        """
-        Test _extract_openlit_span_event_attributes with revised prompt attribute
-        """
-        attributes = {
-            "gen_ai.content.revised_prompt": "revised system prompt",
-            "gen_ai.system": "openlit"
-        }
-
-        span = self._create_mock_span(attributes)
-
-        events = self.llo_handler._extract_openlit_span_event_attributes(span, attributes)
-
-        self.assertEqual(len(events), 1)
-        event = events[0]
-        self.assertEqual(event.name, "gen_ai.system.message")
-        self.assertEqual(event.body["content"], "revised system prompt")
-        self.assertEqual(event.body["role"], "system")
-        self.assertEqual(event.attributes["gen_ai.system"], "openlit")
-        self.assertEqual(event.attributes["original_attribute"], "gen_ai.content.revised_prompt")
-        self.assertEqual(event.timestamp, 1234567890) # start_time
-=======
         """
         Test _extract_openlit_span_event_attributes with direct prompt attribute
         """
@@ -509,7 +386,6 @@ def test_extract_openlit_revised_prompt(self):
         self.assertEqual(event.attributes["gen_ai.system"], "openlit")
         self.assertEqual(event.attributes["original_attribute"], "gen_ai.content.revised_prompt")
         self.assertEqual(event.timestamp, 1234567890)  # start_time
->>>>>>> 770f906 (add custom batch export)

     def test_extract_openinference_direct_attributes(self):
         """

From c96e03b2daee2077d5e942f4759136e864cf7bc1 Mon Sep 17 00:00:00 2001
From: liustve
Date: Sun, 18 May 2025 00:27:14 +0000
Subject: [PATCH 5/5] formatting

---
 .../amazon/opentelemetry/distro/patches/_bedrock_patches.py | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_bedrock_patches.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_bedrock_patches.py
index b363ecf2d..26a6a3682 100644
--- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_bedrock_patches.py
+++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_bedrock_patches.py
@@ -220,10 +220,6 @@ def extract_attributes(self, attributes: _AttributeMapT):
         knowledge_base_id = self._call_context.params.get(_KNOWLEDGE_BASE_ID)
         if knowledge_base_id:
             attributes[AWS_BEDROCK_KNOWLEDGE_BASE_ID] = knowledge_base_id
-
-    def on_success(self, span: Span, result: _BotoResultT, instrumentor_context=None):
-        # Currently no attributes to extract from the result
-        pass

     def on_success(self, span: Span, result: _BotoResultT, instrumentor_context=None):
         # Currently no attributes to extract from the result