diff --git a/util/opentelemetry-util-genai/README.rst b/util/opentelemetry-util-genai/README.rst
index 711ebf97bc..c2a3e780c5 100644
--- a/util/opentelemetry-util-genai/README.rst
+++ b/util/opentelemetry-util-genai/README.rst
@@ -6,6 +6,25 @@ The GenAI Utils package will include boilerplate and helpers to standardize inst
 
 This package will provide APIs and decorators to minimize the work needed to instrument genai libraries, while providing standardization for generating both types of otel, "spans and metrics" and "spans, metrics and events"
 
+This package provides the following span attributes (values shown are examples):
+
+- gen_ai.operation.name: Str(chat)
+- gen_ai.system: Str(ChatOpenAI)
+- gen_ai.request.model: Str(gpt-3.5-turbo)
+- gen_ai.request.top_p: Double(0.9)
+- gen_ai.request.frequency_penalty: Double(0.5)
+- gen_ai.request.presence_penalty: Double(0.5)
+- gen_ai.request.stop_sequences: Slice(["\n","Human:","AI:"])
+- gen_ai.request.seed: Int(100)
+- gen_ai.request.max_tokens: Int(100)
+- gen_ai.provider.name: Str(openai)
+- gen_ai.request.temperature: Double(0.1)
+- gen_ai.response.finish_reasons: Slice(["stop"])
+- gen_ai.response.model: Str(gpt-3.5-turbo-0125)
+- gen_ai.response.id: Str(chatcmpl-Bz8yrvPnydD9pObv625n2CGBPHS13)
+- gen_ai.usage.input_tokens: Int(24)
+- gen_ai.usage.output_tokens: Int(7)
+
 Installation
 ------------
 
diff --git a/util/opentelemetry-util-genai/pyproject.toml b/util/opentelemetry-util-genai/pyproject.toml
index 280da37d58..e68ff37e0e 100644
--- a/util/opentelemetry-util-genai/pyproject.toml
+++ b/util/opentelemetry-util-genai/pyproject.toml
@@ -25,9 +25,9 @@ classifiers = [
     "Programming Language :: Python :: 3.13",
 ]
 dependencies = [
-    "opentelemetry-instrumentation ~= 0.51b0",
-    "opentelemetry-semantic-conventions ~= 0.51b0",
-    "opentelemetry-api>=1.31.0",
+    "opentelemetry-instrumentation ~= 0.57b0",
+    "opentelemetry-semantic-conventions ~= 0.57b0",
+    "opentelemetry-api>=1.36.0",
 ]
 
 [project.optional-dependencies]
diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/data.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/data.py
new file mode 100644
index 0000000000..90b41ef49a
--- /dev/null
+++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/data.py
@@ -0,0 +1,53 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from dataclasses import dataclass
+from typing import Optional
+
+
+@dataclass
+class Message:
+    content: str
+    type: str
+    name: str
+
+    def _to_part_dict(self):
+        """Convert the message to a dictionary suitable for OpenTelemetry semconvs.
+
+        Ref: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/registry/attributes/gen-ai.md#gen-ai-input-messages
+        """
+
+        # TODO: support tool_call and tool_call response parts
+        return {
+            "role": self.type,
+            "parts": [
+                {
+                    "content": self.content,
+                    "type": "text",
+                }
+            ],
+        }
+
+
+@dataclass
+class ChatGeneration:
+    content: str
+    type: str
+    finish_reason: Optional[str] = None
+
+
+@dataclass
+class Error:
+    message: str
+    type: type[BaseException]
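For reference, the parts mapping above flattens a Message like this (a minimal illustrative check; the values are made up):

    from opentelemetry.util.genai.data import Message

    message = Message(content="What is OTel?", type="user", name="question")
    assert message._to_part_dict() == {
        "role": "user",
        "parts": [{"content": "What is OTel?", "type": "text"}],
    }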
diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters.py
new file mode 100644
index 0000000000..da96a3fca5
--- /dev/null
+++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/emitters.py
@@ -0,0 +1,661 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+from dataclasses import dataclass, field
+from typing import Dict, List, Optional
+from uuid import UUID
+
+from opentelemetry import trace
+from opentelemetry._events import Event
+from opentelemetry._logs import LogRecord
+from opentelemetry.context import Context, get_current
+from opentelemetry.metrics import Meter, get_meter
+from opentelemetry.semconv._incubating.attributes import (
+    gen_ai_attributes as GenAI,
+)
+from opentelemetry.semconv.attributes import (
+    error_attributes as ErrorAttributes,
+)
+from opentelemetry.trace import (
+    Span,
+    SpanKind,
+    Tracer,
+    set_span_in_context,
+    use_span,
+)
+from opentelemetry.trace.status import Status, StatusCode
+from opentelemetry.util.types import Attributes
+
+from .data import Error
+from .instruments import Instruments
+from .types import LLMInvocation
+
+
+@dataclass
+class _SpanState:
+    span: Span
+    span_context: Context
+    start_time: float
+    request_model: Optional[str] = None
+    system: Optional[str] = None
+    db_system: Optional[str] = None
+    children: List[UUID] = field(default_factory=list)
+
+
+def _get_property_value(obj, property_name) -> object:
+    if isinstance(obj, dict):
+        return obj.get(property_name, None)
+
+    return getattr(obj, property_name, None)
+
+
+def _message_to_event(message, provider_name, framework) -> Optional[Event]:
+    content = _get_property_value(message, "content")
+    # TODO: check if content is not None and should_collect_content()
+    if content:
+        # update this to event.gen_ai.client.inference.operation.details: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-events.md
+        message_type = _get_property_value(message, "type")
+        message_type = "user" if message_type == "human" else message_type
+        body = {"content": content}
+        attributes = {
+            # TODO: add below to opentelemetry.semconv._incubating.attributes.gen_ai_attributes
+            "gen_ai.provider.name": provider_name,  # Added in 1.37 - https://github.com/open-telemetry/semantic-conventions/blob/main/docs/registry/attributes/gen-ai.md#gen-ai-provider-name
+            "gen_ai.framework": framework,
+            GenAI.GEN_AI_SYSTEM: provider_name,  # Deprecated: removed in 1.37
+        }
+
+        return Event(
+            name=f"gen_ai.{message_type}.message",
+            attributes=attributes,
+            body=body or None,
+        )
+
+
+def _message_to_log_record(
+    message, provider_name, framework
+) -> Optional[LogRecord]:
+    content = _get_property_value(message, "content")
+    # TODO: check if content is not None and should_collect_content()
+    if content:
+        message_type = _get_property_value(message, "type")
+        message_type = "user" if message_type == "human" else message_type
+        body = {"content": content}
+
+        attributes = {
+            # TODO: add below to opentelemetry.semconv._incubating.attributes.gen_ai_attributes
+            "gen_ai.framework": framework,
+            "gen_ai.provider.name": provider_name,
+            GenAI.GEN_AI_SYSTEM: provider_name,  # Deprecated: use "gen_ai.provider.name"
+        }
+
+        return LogRecord(
+            event_name=f"gen_ai.{message_type}.message",
+            attributes=attributes,
+            body=body or None,
+        )
+
+
+def _chat_generation_to_event(
+    chat_generation, index, provider_name, framework
+) -> Optional[Event]:
+    if chat_generation.content:
+        attributes = {
+            # TODO: add below to opentelemetry.semconv._incubating.attributes.gen_ai_attributes
+            "gen_ai.provider.name": provider_name,  # Added in 1.37 - https://github.com/open-telemetry/semantic-conventions/blob/main/docs/registry/attributes/gen-ai.md#gen-ai-provider-name
+            "gen_ai.framework": framework,
+            GenAI.GEN_AI_SYSTEM: provider_name,  # Deprecated: removed in 1.37
+        }
+
+        message = {
+            "content": chat_generation.content,
+            "type": chat_generation.type,
+        }
+        body = {
+            "index": index,
+            "finish_reason": chat_generation.finish_reason or "error",
+            "message": message,
+        }
+
+        return Event(
+            name="gen_ai.choice",
+            attributes=attributes,
+            body=body or None,
+        )
+
+
+def _chat_generation_to_log_record(
+    chat_generation, index, provider_name, framework
+) -> Optional[LogRecord]:
+    if chat_generation:
+        attributes = {
+            # TODO: add below to opentelemetry.semconv._incubating.attributes.gen_ai_attributes
+            "gen_ai.framework": framework,
+            "gen_ai.provider.name": provider_name,
+            GenAI.GEN_AI_SYSTEM: provider_name,  # Deprecated: removed in 1.37
+        }
+
+        message = {
+            "content": chat_generation.content,
+            "type": chat_generation.type,
+        }
+        body = {
+            "index": index,
+            "finish_reason": chat_generation.finish_reason or "error",
+            "message": message,
+        }
+
+        return LogRecord(
+            event_name="gen_ai.choice",
+            attributes=attributes,
+            body=body or None,
+        )
+
+
+def _get_metric_attributes(
+    request_model: Optional[str],
+    response_model: Optional[str],
+    operation_name: Optional[str],
+    system: Optional[str],
+    framework: Optional[str],
+) -> Dict:
+    attributes = {
+        # TODO: add below to opentelemetry.semconv._incubating.attributes.gen_ai_attributes
+        "gen_ai.framework": framework,
+    }
+    if system:
+        attributes["gen_ai.provider.name"] = system
+    if operation_name:
+        attributes[GenAI.GEN_AI_OPERATION_NAME] = operation_name
+    if request_model:
+        attributes[GenAI.GEN_AI_REQUEST_MODEL] = request_model
+    if response_model:
+        attributes[GenAI.GEN_AI_RESPONSE_MODEL] = response_model
+
+    return attributes
+
+
+class BaseEmitter:
+    """
+    Abstract base for emitters mapping GenAI types -> OpenTelemetry.
+    """
+
+    def init(self, invocation: LLMInvocation):
+        raise NotImplementedError
+
+    def emit(self, invocation: LLMInvocation):
+        raise NotImplementedError
+
+    def error(self, error: Error, invocation: LLMInvocation):
+        raise NotImplementedError
+
+
+class SpanMetricEventEmitter(BaseEmitter):
+    """
+    Emits spans, metrics and events for a full telemetry picture.
+    """
+
+    def __init__(
+        self,
+        event_logger,
+        tracer: Optional[Tracer] = None,
+        meter: Optional[Meter] = None,
+    ):
+        self._tracer = tracer or trace.get_tracer(__name__)
+        instruments = Instruments(meter or get_meter(__name__))
+        self._duration_histogram = instruments.operation_duration_histogram
+        self._token_histogram = instruments.token_usage_histogram
+        self._event_logger = event_logger
+
+        # Map from run_id -> _SpanState, to keep track of spans and parent/child relationships
+        self.spans: Dict[UUID, _SpanState] = {}
+
+    def _start_span(
+        self,
+        name: str,
+        kind: SpanKind,
+        parent_run_id: Optional[UUID] = None,
+    ) -> Span:
+        if parent_run_id is not None and parent_run_id in self.spans:
+            parent_span = self.spans[parent_run_id].span
+            ctx = set_span_in_context(parent_span)
+            span = self._tracer.start_span(name=name, kind=kind, context=ctx)
+        else:
+            # top-level or missing parent
+            span = self._tracer.start_span(name=name, kind=kind)
+
+        return span
+
+    def _end_span(self, run_id: UUID):
+        state = self.spans[run_id]
+        for child_id in state.children:
+            child_state = self.spans.get(child_id)
+            if child_state:
+                child_state.span.end()
+        state.span.end()
+
+    def init(self, invocation: LLMInvocation):
+        if (
+            invocation.parent_run_id is not None
+            and invocation.parent_run_id in self.spans
+        ):
+            self.spans[invocation.parent_run_id].children.append(
+                invocation.run_id
+            )
+
+        for message in invocation.messages:
+            system = invocation.attributes.get("system")
+            event = _message_to_event(
+                message=message,
+                provider_name=system,
+                framework=invocation.attributes.get("framework"),
+            )
+            if event is not None:
+                self._event_logger.emit(event)
+
+    def emit(self, invocation: LLMInvocation):
+        system = invocation.attributes.get("system")
+        span = self._start_span(
+            name=f"{system}.chat",
+            kind=SpanKind.CLIENT,
+            parent_run_id=invocation.parent_run_id,
+        )
+
+        with use_span(
+            span,
+            end_on_exit=False,
+        ) as span:
+            request_model = invocation.attributes.get("request_model")
+            span_state = _SpanState(
+                span=span,
+                span_context=get_current(),
+                request_model=request_model,
+                system=system,
+                start_time=invocation.start_time,
+            )
+            self.spans[invocation.run_id] = span_state
+
+            span.set_attribute(
+                GenAI.GEN_AI_OPERATION_NAME,
+                GenAI.GenAiOperationNameValues.CHAT.value,
+            )
+
+            if request_model:
+                span.set_attribute(GenAI.GEN_AI_REQUEST_MODEL, request_model)
+
+            # TODO: add below to opentelemetry.semconv._incubating.attributes.gen_ai_attributes
+            framework = invocation.attributes.get("framework")
+            if framework is not None:
+                span.set_attribute("gen_ai.framework", framework)
+
+            if system is not None:
+                span.set_attribute(
+                    GenAI.GEN_AI_SYSTEM, system
+                )  # Deprecated: use "gen_ai.provider.name"
+                # TODO: add below to opentelemetry.semconv._incubating.attributes.gen_ai_attributes
+                span.set_attribute("gen_ai.provider.name", system)
+
+            finish_reasons = []
+            for index, chat_generation in enumerate(
+                invocation.chat_generations
+            ):
+                event = _chat_generation_to_event(
+                    chat_generation, index, system, framework
+                )
+                if event is not None:
+                    self._event_logger.emit(event)
+                finish_reasons.append(chat_generation.finish_reason)
+
+            if finish_reasons:
+                span.set_attribute(
+                    GenAI.GEN_AI_RESPONSE_FINISH_REASONS, finish_reasons
+                )
+
+            response_model = invocation.attributes.get("response_model_name")
+            if response_model is not None:
+                span.set_attribute(GenAI.GEN_AI_RESPONSE_MODEL, response_model)
+
+            response_id = invocation.attributes.get("response_id")
+            if response_id is not None:
+                span.set_attribute(GenAI.GEN_AI_RESPONSE_ID, response_id)
+
+            # usage
+            prompt_tokens = invocation.attributes.get("input_tokens")
+            if prompt_tokens is not None:
+                span.set_attribute(
+                    GenAI.GEN_AI_USAGE_INPUT_TOKENS, prompt_tokens
+                )
+
+            completion_tokens = invocation.attributes.get("output_tokens")
+            if completion_tokens is not None:
+                span.set_attribute(
+                    GenAI.GEN_AI_USAGE_OUTPUT_TOKENS, completion_tokens
+                )
+
+            metric_attributes = _get_metric_attributes(
+                request_model,
+                response_model,
+                GenAI.GenAiOperationNameValues.CHAT.value,
+                system,
+                framework,
+            )
+
+            # Record token usage metrics (skip when counts are unknown)
+            if prompt_tokens is not None:
+                prompt_tokens_attributes = {
+                    GenAI.GEN_AI_TOKEN_TYPE: GenAI.GenAiTokenTypeValues.INPUT.value,
+                }
+                prompt_tokens_attributes.update(metric_attributes)
+                self._token_histogram.record(
+                    prompt_tokens, attributes=prompt_tokens_attributes
+                )
+
+            if completion_tokens is not None:
+                completion_tokens_attributes = {
+                    GenAI.GEN_AI_TOKEN_TYPE: GenAI.GenAiTokenTypeValues.COMPLETION.value
+                }
+                completion_tokens_attributes.update(metric_attributes)
+                self._token_histogram.record(
+                    completion_tokens, attributes=completion_tokens_attributes
+                )
+
+            # End the LLM span
+            self._end_span(invocation.run_id)
+
+            # Record overall duration metric
+            elapsed = invocation.end_time - invocation.start_time
+            self._duration_histogram.record(
+                elapsed, attributes=metric_attributes
+            )
+
+    def error(self, error: Error, invocation: LLMInvocation):
+        system = invocation.attributes.get("system")
+        span = self._start_span(
+            name=f"{system}.chat",
+            kind=SpanKind.CLIENT,
+            parent_run_id=invocation.parent_run_id,
+        )
+
+        with use_span(
+            span,
+            end_on_exit=False,
+        ) as span:
+            request_model = invocation.attributes.get("request_model")
+            system = invocation.attributes.get("system")
+
+            span_state = _SpanState(
+                span=span,
+                span_context=get_current(),
+                request_model=request_model,
+                system=system,
+                start_time=invocation.start_time,
+            )
+            self.spans[invocation.run_id] = span_state
+
+            span.set_status(Status(StatusCode.ERROR, error.message))
+            if span.is_recording():
+                span.set_attribute(
+                    ErrorAttributes.ERROR_TYPE, error.type.__qualname__
+                )
+
+            self._end_span(invocation.run_id)
+
+            response_model = invocation.attributes.get("response_model_name")
+            framework = invocation.attributes.get("framework")
+
+            metric_attributes = _get_metric_attributes(
+                request_model,
+                response_model,
+                GenAI.GenAiOperationNameValues.CHAT.value,
+                system,
+                framework,
+            )
+
+            # Record overall duration metric
+            elapsed = invocation.end_time - invocation.start_time
+            self._duration_histogram.record(
+                elapsed, attributes=metric_attributes
+            )
+
+
+class SpanMetricEmitter(BaseEmitter):
+    """
+    Emits only spans and metrics (no events).
+    """
+
+    def __init__(
+        self, tracer: Optional[Tracer] = None, meter: Optional[Meter] = None
+    ):
+        self._tracer = tracer or trace.get_tracer(__name__)
+        instruments = Instruments(meter or get_meter(__name__))
+        self._duration_histogram = instruments.operation_duration_histogram
+        self._token_histogram = instruments.token_usage_histogram
+
+        # Map from run_id -> _SpanState, to keep track of spans and parent/child relationships
+        self.spans: Dict[UUID, _SpanState] = {}
+
+    def _start_span(
+        self,
+        name: str,
+        kind: SpanKind,
+        parent_run_id: Optional[UUID] = None,
+    ) -> Span:
+        if parent_run_id is not None and parent_run_id in self.spans:
+            parent_span = self.spans[parent_run_id].span
+            ctx = set_span_in_context(parent_span)
+            span = self._tracer.start_span(name=name, kind=kind, context=ctx)
+        else:
+            # top-level or missing parent
+            span = self._tracer.start_span(name=name, kind=kind)
+
+        return span
+
+    def _end_span(self, run_id: UUID):
+        state = self.spans[run_id]
+        for child_id in state.children:
+            child_state = self.spans.get(child_id)
+            # SDK spans expose _end_time once ended; the checks below avoid
+            # ending a span twice
+            if child_state and child_state.span._end_time is None:
+                child_state.span.end()
+        if state.span._end_time is None:
+            state.span.end()
+
+    def init(self, invocation: LLMInvocation):
+        if (
+            invocation.parent_run_id is not None
+            and invocation.parent_run_id in self.spans
+        ):
+            self.spans[invocation.parent_run_id].children.append(
+                invocation.run_id
+            )
+
+    def emit(self, invocation: LLMInvocation):
+        system = invocation.attributes.get("system")
+        span = self._start_span(
+            name=f"{system}.chat",
+            kind=SpanKind.CLIENT,
+            parent_run_id=invocation.parent_run_id,
+        )
+
+        with use_span(
+            span,
+            end_on_exit=False,
+        ) as span:
+            request_model = invocation.attributes.get("request_model")
+            span_state = _SpanState(
+                span=span,
+                span_context=get_current(),
+                request_model=request_model,
+                system=system,
+                start_time=invocation.start_time,
+            )
+            self.spans[invocation.run_id] = span_state
+
+            span.set_attribute(
+                GenAI.GEN_AI_OPERATION_NAME,
+                GenAI.GenAiOperationNameValues.CHAT.value,
+            )
+
+            if request_model is not None:
+                span.set_attribute(GenAI.GEN_AI_REQUEST_MODEL, request_model)
+
+            # TODO: add below to opentelemetry.semconv._incubating.attributes.gen_ai_attributes
+            framework = invocation.attributes.get("framework")
+            if framework is not None:
+                span.set_attribute("gen_ai.framework", framework)
+
+            if system is not None:
+                span.set_attribute(
+                    GenAI.GEN_AI_SYSTEM, system
+                )  # Deprecated: use "gen_ai.provider.name"
+                # TODO: add below to opentelemetry.semconv._incubating.attributes.gen_ai_attributes
+                span.set_attribute("gen_ai.provider.name", system)
+
+            finish_reasons = []
+            for chat_generation in invocation.chat_generations:
+                finish_reasons.append(chat_generation.finish_reason)
+            if finish_reasons:
+                span.set_attribute(
+                    GenAI.GEN_AI_RESPONSE_FINISH_REASONS, finish_reasons
+                )
+
+            response_model = invocation.attributes.get("response_model_name")
+            if response_model is not None:
+                span.set_attribute(GenAI.GEN_AI_RESPONSE_MODEL, response_model)
+
+            response_id = invocation.attributes.get("response_id")
+            if response_id is not None:
+                span.set_attribute(GenAI.GEN_AI_RESPONSE_ID, response_id)
+
+            # usage
+            prompt_tokens = invocation.attributes.get("input_tokens")
+            if prompt_tokens is not None:
+                span.set_attribute(
+                    GenAI.GEN_AI_USAGE_INPUT_TOKENS, prompt_tokens
+                )
+
+            completion_tokens = invocation.attributes.get("output_tokens")
+            if completion_tokens is not None:
+                span.set_attribute(
+                    GenAI.GEN_AI_USAGE_OUTPUT_TOKENS, completion_tokens
+                )
+
+            message_parts: List[Attributes] = []
+            for message in invocation.messages:
+                message_parts.append(message._to_part_dict())
+
+            if len(message_parts) > 0:
+                # Attribute values must be primitives or homogeneous sequences,
+                # so serialize the structured parts to JSON; structured message
+                # attributes are tracked in https://github.com/open-telemetry/semantic-conventions/pull/2179
+                span.set_attribute(
+                    "gen_ai.input.messages", json.dumps(message_parts)
+                )
+
+            for index, chat_generation in enumerate(
+                invocation.chat_generations
+            ):
+                # Set these attributes to upcoming semconv: https://github.com/open-telemetry/semantic-conventions/pull/2179
+                span.set_attribute(
+                    f"gen_ai.completion.{index}.content",
+                    chat_generation.content,
+                )
+                span.set_attribute(
+                    f"gen_ai.completion.{index}.role", chat_generation.type
+                )
+
+            metric_attributes = _get_metric_attributes(
+                request_model,
+                response_model,
+                GenAI.GenAiOperationNameValues.CHAT.value,
+                system,
+                framework,
+            )
+
+            # Record token usage metrics (skip when counts are unknown)
+            if prompt_tokens is not None:
+                prompt_tokens_attributes = {
+                    GenAI.GEN_AI_TOKEN_TYPE: GenAI.GenAiTokenTypeValues.INPUT.value
+                }
+                prompt_tokens_attributes.update(metric_attributes)
+                self._token_histogram.record(
+                    prompt_tokens, attributes=prompt_tokens_attributes
+                )
+
+            if completion_tokens is not None:
+                completion_tokens_attributes = {
+                    GenAI.GEN_AI_TOKEN_TYPE: GenAI.GenAiTokenTypeValues.COMPLETION.value
+                }
+                completion_tokens_attributes.update(metric_attributes)
+                self._token_histogram.record(
+                    completion_tokens, attributes=completion_tokens_attributes
+                )
+
+            # End the LLM span
+            self._end_span(invocation.run_id)
+
+            # Record overall duration metric
+            elapsed = invocation.end_time - invocation.start_time
+            self._duration_histogram.record(
+                elapsed, attributes=metric_attributes
+            )
+
+    def error(self, error: Error, invocation: LLMInvocation):
+        system = invocation.attributes.get("system")
+        span = self._start_span(
+            name=f"{system}.chat",
+            kind=SpanKind.CLIENT,
+            parent_run_id=invocation.parent_run_id,
+        )
+
+        with use_span(
+            span,
+            end_on_exit=False,
+        ) as span:
+            request_model = invocation.attributes.get("request_model")
+            system = invocation.attributes.get("system")
+
+            span_state = _SpanState(
+                span=span,
+                span_context=get_current(),
+                request_model=request_model,
+                system=system,
+                start_time=invocation.start_time,
+            )
+            self.spans[invocation.run_id] = span_state
+
+            span.set_status(Status(StatusCode.ERROR, error.message))
+            if span.is_recording():
+                span.set_attribute(
+                    ErrorAttributes.ERROR_TYPE, error.type.__qualname__
+                )
+
+            self._end_span(invocation.run_id)
+
+            response_model = invocation.attributes.get("response_model_name")
+            framework = invocation.attributes.get("framework")
+
+            metric_attributes = _get_metric_attributes(
+                request_model,
+                response_model,
+                GenAI.GenAiOperationNameValues.CHAT.value,
+                system,
+                framework,
+            )
+
+            # Record overall duration metric
+            elapsed = invocation.end_time - invocation.start_time
+            self._duration_histogram.record(
+                elapsed, attributes=metric_attributes
+            )
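An emitter can be exercised on its own, without the handler introduced below; a minimal sketch (attribute values are illustrative, and without a configured SDK the tracer/meter calls are no-ops):

    import time
    from uuid import uuid4

    from opentelemetry.util.genai.data import ChatGeneration, Message
    from opentelemetry.util.genai.emitters import SpanMetricEmitter
    from opentelemetry.util.genai.types import LLMInvocation

    # Build an invocation by hand; TelemetryHandler normally does this.
    invocation = LLMInvocation(
        run_id=uuid4(),
        messages=[Message(content="hi", type="human", name="prompt")],
        chat_generations=[
            ChatGeneration(content="hello", type="ai", finish_reason="stop")
        ],
        attributes={"system": "openai", "request_model": "gpt-3.5-turbo"},
    )
    emitter = SpanMetricEmitter()  # falls back to the global tracer/meter
    emitter.init(invocation)
    invocation.end_time = time.time()  # emit() computes duration from this
    emitter.emit(invocation)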
diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py
new file mode 100644
index 0000000000..1208c4bc02
--- /dev/null
+++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py
@@ -0,0 +1,160 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+from threading import Lock
+from typing import Any, List, Optional
+from uuid import UUID
+
+from opentelemetry._events import get_event_logger
+from opentelemetry.metrics import get_meter
+from opentelemetry.semconv.schemas import Schemas
+from opentelemetry.trace import get_tracer
+
+from .data import ChatGeneration, Error, Message
+from .emitters import SpanMetricEmitter, SpanMetricEventEmitter
+from .types import LLMInvocation
+
+# TODO: Get the tool version for emitting spans, use GenAI Utils for now
+from .version import __version__
+
+
+class TelemetryHandler:
+    """
+    High-level handler managing GenAI invocation lifecycles and emitting
+    them as spans, metrics, and events.
+    """
+
+    def __init__(self, emitter_type_full: bool = True, **kwargs: Any):
+        tracer_provider = kwargs.get("tracer_provider")
+        self._tracer = get_tracer(
+            __name__,
+            __version__,
+            tracer_provider,
+            schema_url=Schemas.V1_36_0.value,
+        )
+
+        meter_provider = kwargs.get("meter_provider")
+        self._meter = get_meter(
+            __name__,
+            __version__,
+            meter_provider,
+            schema_url=Schemas.V1_36_0.value,
+        )
+
+        event_logger_provider = kwargs.get("event_logger_provider")
+        self._event_logger = get_event_logger(
+            __name__,
+            __version__,
+            event_logger_provider=event_logger_provider,
+            schema_url=Schemas.V1_36_0.value,
+        )
+
+        self._emitter = (
+            SpanMetricEventEmitter(
+                tracer=self._tracer,
+                meter=self._meter,
+                event_logger=self._event_logger,
+            )
+            if emitter_type_full
+            else SpanMetricEmitter(tracer=self._tracer, meter=self._meter)
+        )
+
+        self._llm_registry: dict[UUID, LLMInvocation] = {}
+        self._lock = Lock()
+
+    def start_llm(
+        self,
+        prompts: List[Message],
+        run_id: UUID,
+        parent_run_id: Optional[UUID] = None,
+        **attributes: Any,
+    ) -> None:
+        invocation = LLMInvocation(
+            messages=prompts,
+            run_id=run_id,
+            parent_run_id=parent_run_id,
+            attributes=attributes,
+        )
+        with self._lock:
+            self._llm_registry[invocation.run_id] = invocation
+            self._emitter.init(invocation)
+
+    def stop_llm(
+        self,
+        run_id: UUID,
+        chat_generations: List[ChatGeneration],
+        **attributes: Any,
+    ) -> LLMInvocation:
+        with self._lock:
+            invocation = self._llm_registry.pop(run_id)
+            invocation.end_time = time.time()
+            invocation.chat_generations = chat_generations
+            invocation.attributes.update(attributes)
+            self._emitter.emit(invocation)
+            return invocation
+
+    def fail_llm(
+        self, run_id: UUID, error: Error, **attributes: Any
+    ) -> LLMInvocation:
+        with self._lock:
+            invocation = self._llm_registry.pop(run_id)
+            invocation.end_time = time.time()
+            invocation.attributes.update(attributes)
+            self._emitter.error(error, invocation)
+            return invocation
+
+
+# Singleton accessor
+_default_handler: Optional[TelemetryHandler] = None
+
+
+def get_telemetry_handler(
+    emitter_type_full: bool = True, **kwargs: Any
+) -> TelemetryHandler:
+    global _default_handler
+    if _default_handler is None:
+        _default_handler = TelemetryHandler(
+            emitter_type_full=emitter_type_full, **kwargs
+        )
+    return _default_handler
+
+
+# Module-level convenience functions
+def llm_start(
+    prompts: List[Message],
+    run_id: UUID,
+    parent_run_id: Optional[UUID] = None,
+    **attributes: Any,
+) -> None:
+    return get_telemetry_handler().start_llm(
+        prompts=prompts,
+        run_id=run_id,
+        parent_run_id=parent_run_id,
+        **attributes,
+    )
+
+
+def llm_stop(
+    run_id: UUID, chat_generations: List[ChatGeneration], **attributes: Any
+) -> LLMInvocation:
+    return get_telemetry_handler().stop_llm(
+        run_id=run_id, chat_generations=chat_generations, **attributes
+    )
+
+
+def llm_fail(run_id: UUID, error: Error, **attributes: Any) -> LLMInvocation:
+    return get_telemetry_handler().fail_llm(
+        run_id=run_id, error=error, **attributes
+    )
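The module-level functions are the intended entry point for instrumentations; a usage sketch under assumed attribute values (the attribute keys mirror what emitters.py reads):

    from uuid import uuid4

    from opentelemetry.util.genai.data import ChatGeneration, Error, Message
    from opentelemetry.util.genai.handler import llm_fail, llm_start, llm_stop

    run_id = uuid4()
    llm_start(
        [Message(content="What is OTel?", type="human", name="prompt")],
        run_id=run_id,
        system="openai",  # becomes gen_ai.provider.name
        request_model="gpt-3.5-turbo",  # becomes gen_ai.request.model
        framework="langchain",  # becomes gen_ai.framework
    )
    try:
        # ... call the model here ...
        llm_stop(
            run_id,
            chat_generations=[
                ChatGeneration(content="...", type="ai", finish_reason="stop")
            ],
            response_model_name="gpt-3.5-turbo-0125",
            input_tokens=24,
            output_tokens=7,
        )
    except Exception as exc:
        llm_fail(run_id, Error(message=str(exc), type=type(exc)))
        raise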
diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py
new file mode 100644
index 0000000000..d3df787501
--- /dev/null
+++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py
@@ -0,0 +1,68 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from opentelemetry.metrics import Histogram, Meter
+from opentelemetry.semconv._incubating.metrics import gen_ai_metrics
+
+# TODO: should this be in utils or passed to the telemetry client?
+_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS = [
+    0.01,
+    0.02,
+    0.04,
+    0.08,
+    0.16,
+    0.32,
+    0.64,
+    1.28,
+    2.56,
+    5.12,
+    10.24,
+    20.48,
+    40.96,
+    81.92,
+]
+
+# TODO: should this be in utils or passed to the telemetry client?
+_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS = [
+    1,
+    4,
+    16,
+    64,
+    256,
+    1024,
+    4096,
+    16384,
+    65536,
+    262144,
+    1048576,
+    4194304,
+    16777216,
+    67108864,
+]
+
+
+class Instruments:
+    def __init__(self, meter: Meter):
+        self.operation_duration_histogram: Histogram = meter.create_histogram(
+            name=gen_ai_metrics.GEN_AI_CLIENT_OPERATION_DURATION,
+            description="GenAI operation duration",
+            unit="s",
+            explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS,
+        )
+        self.token_usage_histogram: Histogram = meter.create_histogram(
+            name=gen_ai_metrics.GEN_AI_CLIENT_TOKEN_USAGE,
+            description="Measures number of input and output tokens used",
+            unit="{token}",
+            explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS,
+        )
diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py
new file mode 100644
index 0000000000..cdb4e2f38b
--- /dev/null
+++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py
@@ -0,0 +1,37 @@
+# Copyright The OpenTelemetry Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+from dataclasses import dataclass, field
+from typing import Any, Dict, List, Optional
+from uuid import UUID
+
+from .data import ChatGeneration, Message
+
+
+@dataclass
+class LLMInvocation:
+    """
+    Represents a single LLM call invocation.
+    """
+
+    run_id: UUID
+    parent_run_id: Optional[UUID] = None
+    start_time: float = field(default_factory=time.time)
+    end_time: Optional[float] = None
+    messages: List[Message] = field(default_factory=list)
+    chat_generations: List[ChatGeneration] = field(default_factory=list)
+    attributes: Dict[str, Any] = field(default_factory=dict)
+    span_id: int = 0
+    trace_id: int = 0
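A quick way to see the two instruments in action (a sketch; InMemoryMetricReader comes from opentelemetry-sdk, and the advisory boundaries above only take effect on SDK versions that honor explicit_bucket_boundaries_advisory):

    from opentelemetry.sdk.metrics import MeterProvider
    from opentelemetry.sdk.metrics.export import InMemoryMetricReader
    from opentelemetry.util.genai.instruments import Instruments

    reader = InMemoryMetricReader()
    provider = MeterProvider(metric_readers=[reader])
    instruments = Instruments(provider.get_meter("genai-demo"))

    # Record one duration sample and read it back.
    instruments.operation_duration_histogram.record(
        0.25, attributes={"gen_ai.operation.name": "chat"}
    )
    print(reader.get_metrics_data())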
diff --git a/util/opentelemetry-util-genai/tests/test_utils.py b/util/opentelemetry-util-genai/tests/test_utils.py
new file mode 100644
index 0000000000..c15dc7bb81
--- /dev/null
+++ b/util/opentelemetry-util-genai/tests/test_utils.py
@@ -0,0 +1,78 @@
+from uuid import uuid4
+
+import pytest
+
+from opentelemetry import trace
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import SimpleSpanProcessor
+from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
+    InMemorySpanExporter,
+)
+from opentelemetry.util.genai.handler import (
+    llm_start,
+    llm_stop,
+)
+from opentelemetry.util.genai.types import (
+    ChatGeneration,
+    Message,
+)
+
+
+@pytest.fixture
+def telemetry_setup():
+    """Set up telemetry providers for testing"""
+    # Set up in-memory span exporter to capture spans
+    memory_exporter = InMemorySpanExporter()
+    tracer_provider = TracerProvider()
+    tracer_provider.add_span_processor(SimpleSpanProcessor(memory_exporter))
+
+    # Set the tracer provider
+    trace.set_tracer_provider(tracer_provider)
+
+    yield memory_exporter
+
+    # Cleanup
+    memory_exporter.clear()
+    # Reset to default tracer provider
+    trace.set_tracer_provider(trace.NoOpTracerProvider())
+
+
+def test_llm_start_and_stop_creates_span(telemetry_setup):
+    memory_exporter = telemetry_setup
+
+    run_id = uuid4()
+    message = Message(content="hello world", type="human", name="message name")
+    chat_generation = ChatGeneration(content="hello back", type="ai")
+
+    # Start and stop LLM invocation
+    llm_start(
+        [message], run_id=run_id, custom_attr="value", system="test-system"
+    )
+    invocation = llm_stop(
+        run_id, chat_generations=[chat_generation], extra="info"
+    )
+
+    # Get the spans that were created
+    spans = memory_exporter.get_finished_spans()
+
+    # Verify span was created
+    assert len(spans) == 1
+    span = spans[0]
+
+    # Verify span properties
+    assert span.name == "test-system.chat"
+    assert span.kind == trace.SpanKind.CLIENT
+
+    # Verify span attributes
+    assert span.attributes.get("gen_ai.operation.name") == "chat"
+    assert span.attributes.get("gen_ai.system") == "test-system"
+    # Add more attribute checks as needed
+
+    # Verify span timing
+    assert span.start_time > 0
+    assert span.end_time > span.start_time
+
+    # Verify invocation data
+    assert invocation.run_id == run_id
+    assert invocation.attributes.get("custom_attr") == "value"
+    assert invocation.attributes.get("extra") == "info"
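Finally, the span-only mode can be driven through the handler directly; a sketch assuming a fresh process (get_telemetry_handler caches the first handler it builds, so emitter_type_full is only honored on the first call):

    from uuid import uuid4

    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import (
        ConsoleSpanExporter,
        SimpleSpanProcessor,
    )
    from opentelemetry.util.genai.data import ChatGeneration, Message
    from opentelemetry.util.genai.handler import get_telemetry_handler

    provider = TracerProvider()
    provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))

    # emitter_type_full=False selects SpanMetricEmitter, which captures
    # gen_ai.input.messages / gen_ai.completion.* on the span instead of events.
    handler = get_telemetry_handler(
        emitter_type_full=False, tracer_provider=provider
    )

    run_id = uuid4()
    handler.start_llm(
        [Message(content="ping", type="human", name="prompt")],
        run_id=run_id,
        system="openai",
        request_model="gpt-3.5-turbo",
    )
    handler.stop_llm(
        run_id,
        chat_generations=[
            ChatGeneration(content="pong", type="ai", finish_reason="stop")
        ],
        input_tokens=1,
        output_tokens=1,
    )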