Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
from typing import Iterator, Optional

from opentelemetry import context as otel_context
from opentelemetry._logs import Logger, LoggerProvider, get_logger
from opentelemetry.semconv._incubating.attributes import (
gen_ai_attributes as GenAI,
)
Expand All @@ -77,6 +78,7 @@
from opentelemetry.util.genai.span_utils import (
_apply_error_attributes,
_apply_finish_attributes,
_emit_content_event,
)
from opentelemetry.util.genai.types import Error, LLMInvocation
from opentelemetry.util.genai.version import __version__
Expand All @@ -88,12 +90,22 @@ class TelemetryHandler:
them as spans, metrics, and events.
"""

def __init__(
    self,
    tracer_provider: TracerProvider | None = None,
    logger_provider: LoggerProvider | None = None,
):
    """Build the tracer and logger used to emit GenAI telemetry.

    Falls back to the globally configured providers when either
    argument is None. Both signals are pinned to the same semconv
    schema version so spans and events stay consistent.
    """
    self._tracer = get_tracer(
        __name__,
        __version__,
        tracer_provider,
        schema_url=Schemas.V1_37_0.value,
    )
    self._logger: Logger = get_logger(
        __name__,
        __version__,
        logger_provider=logger_provider,
        schema_url=Schemas.V1_37_0.value,
    )

def start_llm(
Expand All @@ -119,6 +131,7 @@ def stop_llm(self, invocation: LLMInvocation) -> LLMInvocation: # pylint: disab
return invocation

_apply_finish_attributes(invocation.span, invocation)
_emit_content_event(self._logger, invocation)
# Detach context and end span
otel_context.detach(invocation.context_token)
invocation.span.end()
Expand All @@ -133,6 +146,7 @@ def fail_llm( # pylint: disable=no-self-use
return invocation

_apply_error_attributes(invocation.span, error)
_emit_content_event(self._logger, invocation)
# Detach context and end span
otel_context.detach(invocation.context_token)
invocation.span.end()
Expand Down Expand Up @@ -165,6 +179,7 @@ def llm(

def get_telemetry_handler(
tracer_provider: TracerProvider | None = None,
logger_provider: LoggerProvider | None = None,
) -> TelemetryHandler:
"""
Returns a singleton TelemetryHandler instance.
Expand All @@ -173,6 +188,9 @@ def get_telemetry_handler(
get_telemetry_handler, "_default_handler", None
)
if handler is None:
handler = TelemetryHandler(tracer_provider=tracer_provider)
handler = TelemetryHandler(
tracer_provider=tracer_provider,
logger_provider=logger_provider,
)
setattr(get_telemetry_handler, "_default_handler", handler)
return handler
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
# limitations under the License.

from dataclasses import asdict
from typing import List
from typing import Any, Dict, List, Optional

from opentelemetry._logs import Logger, LogRecord
from opentelemetry.semconv._incubating.attributes import (
gen_ai_attributes as GenAI,
)
Expand All @@ -39,47 +40,47 @@
)


def _apply_common_span_attributes(
span: Span, invocation: LLMInvocation
) -> None:
"""Apply attributes shared by finish() and error() and compute metrics.

Returns (genai_attributes) for use with metrics.
"""
span.update_name(
f"{GenAI.GenAiOperationNameValues.CHAT.value} {invocation.request_model}".strip()
)
span.set_attribute(
GenAI.GEN_AI_OPERATION_NAME, GenAI.GenAiOperationNameValues.CHAT.value
)
def _collect_invocation_attributes(
    invocation: LLMInvocation,
) -> Dict[str, Any]:
    """Build baseline GenAI semantic convention attributes for the invocation.

    Only fields actually present on the invocation are included; the
    operation name is always "chat".
    """
    attributes: Dict[str, Any] = {
        GenAI.GEN_AI_OPERATION_NAME: GenAI.GenAiOperationNameValues.CHAT.value,
    }
    if invocation.request_model:
        attributes[GenAI.GEN_AI_REQUEST_MODEL] = invocation.request_model
    if invocation.provider is not None:
        # TODO: clean provider name to match GenAiProviderNameValues?
        attributes[GenAI.GEN_AI_PROVIDER_NAME] = invocation.provider
    if invocation.output_messages:
        attributes[GenAI.GEN_AI_RESPONSE_FINISH_REASONS] = [
            gen.finish_reason for gen in invocation.output_messages
        ]
    if invocation.response_model_name is not None:
        attributes[GenAI.GEN_AI_RESPONSE_MODEL] = (
            invocation.response_model_name
        )
    if invocation.response_id is not None:
        attributes[GenAI.GEN_AI_RESPONSE_ID] = invocation.response_id
    if invocation.input_tokens is not None:
        attributes[GenAI.GEN_AI_USAGE_INPUT_TOKENS] = invocation.input_tokens
    if invocation.output_tokens is not None:
        attributes[GenAI.GEN_AI_USAGE_OUTPUT_TOKENS] = (
            invocation.output_tokens
        )
    return attributes


def _messages_to_json(messages: List[Any]) -> str:
    """Serialize message dataclasses to JSON using the GenAI utility."""
    serializable = [asdict(msg) for msg in messages]
    return gen_ai_json_dumps(serializable)


def _apply_common_span_attributes(
    span: Span, invocation: LLMInvocation
) -> None:
    """Apply attributes shared by finish() and error() and compute metrics."""
    operation = GenAI.GenAiOperationNameValues.CHAT.value
    # Span name is "<operation> <model>"; strip() covers a missing model.
    span.update_name(f"{operation} {invocation.request_model}".strip())
    span.set_attributes(_collect_invocation_attributes(invocation))


def _maybe_set_span_messages(
Expand All @@ -95,14 +96,12 @@ def _maybe_set_span_messages(
if input_messages:
span.set_attribute(
GenAI.GEN_AI_INPUT_MESSAGES,
gen_ai_json_dumps([asdict(message) for message in input_messages]),
_messages_to_json(input_messages),
)
if output_messages:
span.set_attribute(
GenAI.GEN_AI_OUTPUT_MESSAGES,
gen_ai_json_dumps(
[asdict(message) for message in output_messages]
),
_messages_to_json(output_messages),
)


Expand All @@ -122,7 +121,47 @@ def _apply_error_attributes(span: Span, error: Error) -> None:
span.set_attribute(ErrorAttributes.ERROR_TYPE, error.type.__qualname__)


def _build_event_attributes(invocation: LLMInvocation) -> Dict[str, Any]:
    """Merge semconv attributes with any caller-supplied custom attributes."""
    merged = _collect_invocation_attributes(invocation)
    if invocation.attributes:
        # Caller-supplied attributes override the baseline semconv values.
        merged.update(invocation.attributes)
    return merged


def _build_event_body(invocation: LLMInvocation) -> Optional[Dict[str, str]]:
    """Return the event body with serialized messages, or None when empty."""
    body: Dict[str, str] = {}
    for key, messages in (
        ("input_messages", invocation.input_messages),
        ("output_messages", invocation.output_messages),
    ):
        if messages:
            body[key] = _messages_to_json(messages)
    # An empty dict means there is no content to attach.
    return body or None


def _emit_content_event(logger: Logger, invocation: LLMInvocation) -> None:
    """Emit a gen_ai content event for the invocation when enabled.

    No-op unless the experimental semconv mode is active and the
    configured content-capturing mode includes event emission.
    """
    if not is_experimental_mode():
        return
    try:
        mode = get_content_capturing_mode()
    except ValueError:
        # Unparseable capture-mode configuration: emit nothing.
        return
    if mode not in (
        ContentCapturingMode.EVENT_ONLY,
        ContentCapturingMode.SPAN_AND_EVENT,
    ):
        return

    logger.emit(
        LogRecord(
            event_name="gen_ai.client.inference.operation.details",
            attributes=_build_event_attributes(invocation),
            body=_build_event_body(invocation),
        )
    )


# Explicit public surface of this module (consumed by the telemetry handler).
__all__ = [
    "_apply_finish_attributes",
    "_apply_error_attributes",
    "_emit_content_event",
]
111 changes: 110 additions & 1 deletion util/opentelemetry-util-genai/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
OTEL_SEMCONV_STABILITY_OPT_IN,
_OpenTelemetrySemanticConventionStability,
)
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import (
InMemoryLogExporter,
SimpleLogRecordProcessor,
)
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
Expand All @@ -37,6 +42,7 @@
from opentelemetry.util.genai.handler import get_telemetry_handler
from opentelemetry.util.genai.types import (
ContentCapturingMode,
Error,
InputMessage,
LLMInvocation,
OutputMessage,
Expand Down Expand Up @@ -108,13 +114,22 @@ def setUp(self):
tracer_provider.add_span_processor(
SimpleSpanProcessor(self.span_exporter)
)

self.log_exporter = InMemoryLogExporter()
self.logger_provider = LoggerProvider()
self.logger_provider.add_log_record_processor(
SimpleLogRecordProcessor(self.log_exporter)
)
self.telemetry_handler = get_telemetry_handler(
tracer_provider=tracer_provider
tracer_provider=tracer_provider,
logger_provider=self.logger_provider,
)

def tearDown(self):
    """Reset exporters and drop the cached singleton handler between tests."""
    self.span_exporter.clear()
    self.log_exporter.clear()
    self.logger_provider.shutdown()
    if hasattr(get_telemetry_handler, "_default_handler"):
        delattr(get_telemetry_handler, "_default_handler")

Expand Down Expand Up @@ -257,6 +272,100 @@ def test_parent_child_span_relationship(self):
# Parent should not have a parent (root)
assert parent_span.parent is None

@patch_env_vars(
    stability_mode="gen_ai_latest_experimental",
    content_capturing="SPAN_AND_EVENT",
)
def test_llm_stop_emits_event_when_events_enabled(self):
    """A successful invocation emits one content event carrying messages."""
    prompt = InputMessage(role="Human", parts=[Text(content="hello world")])
    completion = OutputMessage(
        role="AI", parts=[Text(content="hello back")], finish_reason="stop"
    )

    with self.telemetry_handler.llm() as invocation:
        invocation.request_model = "event-model"
        invocation.input_messages = [prompt]
        invocation.provider = "test-provider"
        invocation.attributes = {"custom_attr": "value"}
        invocation.output_messages = [completion]

    finished = self.log_exporter.get_finished_logs()
    assert len(finished) == 1
    record = finished[0].log_record
    assert record.event_name == "gen_ai.client.inference.operation.details"

    attrs = record.attributes
    assert attrs.get("gen_ai.operation.name") == "chat"
    assert attrs.get("gen_ai.request.model") == "event-model"
    assert attrs.get("gen_ai.provider.name") == "test-provider"
    assert attrs.get("custom_attr") == "value"

    body = record.body
    assert body is not None
    assert "input_messages" in body
    assert "output_messages" in body

    decoded_inputs = json.loads(body["input_messages"])
    decoded_outputs = json.loads(body["output_messages"])
    assert len(decoded_inputs) == 1
    assert len(decoded_outputs) == 1
    assert decoded_inputs[0].get("role") == "Human"
    assert decoded_outputs[0].get("role") == "AI"

@patch_env_vars(
    stability_mode="gen_ai_latest_experimental",
    content_capturing="SPAN_ONLY",
)
def test_llm_stop_does_not_emit_event_when_events_disabled(self):
    """SPAN_ONLY capture mode must not produce any log events."""
    prompt = InputMessage(role="Human", parts=[Text(content="hi")])

    with self.telemetry_handler.llm() as invocation:
        invocation.request_model = "no-event-model"
        invocation.input_messages = [prompt]
        invocation.output_messages = []

    assert len(self.log_exporter.get_finished_logs()) == 0

@patch_env_vars(
    stability_mode="gen_ai_latest_experimental",
    content_capturing="EVENT_ONLY",
)
def test_llm_fail_emits_event_when_events_enabled(self):
    """A failed invocation still emits the content event in EVENT_ONLY mode."""

    class BoomError(RuntimeError):
        pass

    prompt = InputMessage(role="user", parts=[Text(content="hi")])
    invocation = LLMInvocation(
        request_model="fail-event-model",
        input_messages=[prompt],
        provider="provider-x",
        attributes={"custom_attr": "value"},
    )

    self.telemetry_handler.start_llm(invocation)
    self.telemetry_handler.fail_llm(
        invocation, Error(message="boom", type=BoomError)
    )

    finished = self.log_exporter.get_finished_logs()
    assert len(finished) == 1
    record = finished[0].log_record
    assert record.event_name == "gen_ai.client.inference.operation.details"
    attrs = record.attributes
    assert attrs.get("gen_ai.request.model") == "fail-event-model"
    assert attrs.get("gen_ai.provider.name") == "provider-x"

def test_llm_context_manager_error_path_records_error_status_and_attrs(
self,
):
Expand Down
Loading