diff --git a/instrumentation-genai/opentelemetry-genai-sdk/README.rst b/instrumentation-genai/opentelemetry-genai-sdk/README.rst new file mode 100644 index 0000000000..f9a65cc60d --- /dev/null +++ b/instrumentation-genai/opentelemetry-genai-sdk/README.rst @@ -0,0 +1,27 @@ +Installation +============ + +Option 1: pip + requirements.txt +--------------------------------- +:: + + python3 -m venv .venv + source .venv/bin/activate + pip install -r requirements.txt + +Option 2: Poetry +---------------- +:: + + poetry install + +Running Tests +============= + +After installing dependencies, simply run: + +:: + + pytest + +This will discover and run `tests/test_sdk.py`. diff --git a/instrumentation-genai/opentelemetry-genai-sdk/pyproject.toml b/instrumentation-genai/opentelemetry-genai-sdk/pyproject.toml new file mode 100644 index 0000000000..5f657157ca --- /dev/null +++ b/instrumentation-genai/opentelemetry-genai-sdk/pyproject.toml @@ -0,0 +1,53 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "opentelemetry-genai-sdk" +dynamic = ["version"] +description = "OpenTelemetry GenAI SDK" +readme = "README.rst" +license = "Apache-2.0" +requires-python = ">=3.8" +authors = [ + { name = "OpenTelemetry Authors", email = "cncf-opentelemetry-contributors@lists.cncf.io" }, +] +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", +] +dependencies = [ + "opentelemetry-api ~= 1.36.0", + "opentelemetry-instrumentation ~= 0.57b0", + "opentelemetry-semantic-conventions ~= 0.57b0", +] + +[project.optional-dependencies] +test = [ + "pytest>=7.0.0", +] +# evaluation = ["deepevals>=0.1.0", "openlit-sdk>=0.1.0"] + +[project.urls] +Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/instrumentation-genai/opentelemetry-genai-sdk" +Repository = "https://github.com/open-telemetry/opentelemetry-python-contrib" + +[tool.hatch.version] +path = "src/opentelemetry/genai/sdk/version.py" + +[tool.hatch.build.targets.sdist] +include = [ + "/src", + "/tests", +] + +[tool.hatch.build.targets.wheel] +packages = ["src/opentelemetry"] diff --git a/instrumentation-genai/opentelemetry-genai-sdk/requirements.txt b/instrumentation-genai/opentelemetry-genai-sdk/requirements.txt new file mode 100644 index 0000000000..abfd86b393 --- /dev/null +++ b/instrumentation-genai/opentelemetry-genai-sdk/requirements.txt @@ -0,0 +1,10 @@ +# OpenTelemetry SDK +opentelemetry-api>=1.34.0 +opentelemetry-sdk>=1.34.0 + +# Testing +pytest>=7.0.0 + +# (Optional) evaluation libraries +# deepevals>=0.1.0 +# openlit-sdk>=0.1.0 diff --git a/instrumentation-genai/opentelemetry-genai-sdk/src/opentelemetry/genai/sdk/api.py b/instrumentation-genai/opentelemetry-genai-sdk/src/opentelemetry/genai/sdk/api.py new file mode 100644 index 0000000000..08d6b8c881 --- /dev/null +++ b/instrumentation-genai/opentelemetry-genai-sdk/src/opentelemetry/genai/sdk/api.py @@ -0,0 +1,130 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +from threading import Lock +from typing import List, Optional +from uuid import UUID + +from .types import LLMInvocation, ToolInvocation +from .exporters import SpanMetricEventExporter, SpanMetricExporter +from .data import Message, ChatGeneration, Error, ToolOutput, ToolFunction + +from opentelemetry.instrumentation.langchain.version import __version__ +from opentelemetry.metrics import get_meter +from opentelemetry.trace import get_tracer +from opentelemetry._events import get_event_logger +from opentelemetry._logs import get_logger +from opentelemetry.semconv.schemas import Schemas + + +class TelemetryClient: + """ + High-level client managing GenAI invocation lifecycles and exporting + them as spans, metrics, and events. + """ + def __init__(self, exporter_type_full: bool = True, **kwargs): + tracer_provider = kwargs.get("tracer_provider") + self._tracer = get_tracer( + __name__, __version__, tracer_provider, schema_url=Schemas.V1_28_0.value + ) + + meter_provider = kwargs.get("meter_provider") + self._meter = get_meter( + __name__, __version__, meter_provider, schema_url=Schemas.V1_28_0.value + ) + + event_logger_provider = kwargs.get("event_logger_provider") + self._event_logger = get_event_logger( + __name__, __version__, event_logger_provider=event_logger_provider, schema_url=Schemas.V1_28_0.value + ) + + logger_provider = kwargs.get("logger_provider") + self._logger = get_logger( + __name__, __version__, logger_provider=logger_provider, schema_url=Schemas.V1_28_0.value + ) + + self._exporter = ( + SpanMetricEventExporter(tracer=self._tracer, meter=self._meter, event_logger=self._event_logger, logger=self._event_logger) + if exporter_type_full + else SpanMetricExporter(tracer=self._tracer, meter=self._meter) + ) + + self._llm_registry: dict[UUID, LLMInvocation] = {} + self._tool_registry: dict[UUID, ToolInvocation] = {} + self._lock = Lock() + + def start_llm(self, prompts: List[Message], tool_functions: List[ToolFunction], run_id: UUID, parent_run_id: Optional[UUID] = None, **attributes): + invocation = LLMInvocation(messages=prompts , tool_functions=tool_functions, run_id=run_id, parent_run_id=parent_run_id, attributes=attributes) + with self._lock: + self._llm_registry[invocation.run_id] = invocation + self._exporter.init_llm(invocation) + + def stop_llm(self, run_id: UUID, chat_generations: List[ChatGeneration], **attributes) -> LLMInvocation: + with self._lock: + invocation = self._llm_registry.pop(run_id) + invocation.end_time = time.time() + invocation.chat_generations = chat_generations + invocation.attributes.update(attributes) + self._exporter.export_llm(invocation) + return invocation + + def fail_llm(self, run_id: UUID, error: Error, **attributes) -> LLMInvocation: + with self._lock: + invocation = self._llm_registry.pop(run_id) + invocation.end_time = time.time() + invocation.attributes.update(**attributes) + self._exporter.error_llm(error, invocation) + return invocation + + def start_tool(self, input_str: str, run_id: UUID, parent_run_id: Optional[UUID] = None, **attributes): + invocation = ToolInvocation(input_str=input_str , run_id=run_id, 
parent_run_id=parent_run_id, attributes=attributes) + with self._lock: + self._tool_registry[invocation.run_id] = invocation + self._exporter.init_tool(invocation) + + def stop_tool(self, run_id: UUID, output: ToolOutput, **attributes) -> ToolInvocation: + with self._lock: + invocation = self._tool_registry.pop(run_id) + invocation.end_time = time.time() + invocation.output = output + self._exporter.export_tool(invocation) + return invocation + + def fail_tool(self, run_id: UUID, error: Error, **attributes) -> ToolInvocation: + with self._lock: + invocation = self._tool_registry.pop(run_id) + invocation.end_time = time.time() + invocation.attributes.update(**attributes) + self._exporter.error_tool(error, invocation) + return invocation + +# Singleton accessor +_default_client: TelemetryClient | None = None + +def get_telemetry_client(exporter_type_full: bool = True, **kwargs) -> TelemetryClient: + global _default_client + if _default_client is None: + _default_client = TelemetryClient(exporter_type_full=exporter_type_full, **kwargs) + return _default_client + +# Module‐level convenience functions +def llm_start(prompts: List[Message], run_id: UUID, parent_run_id: Optional[UUID] = None, **attributes): + return get_telemetry_client().start_llm(prompts=prompts, run_id=run_id, parent_run_id=parent_run_id, **attributes) + +def llm_stop(run_id: UUID, chat_generations: List[ChatGeneration], **attributes) -> LLMInvocation: + return get_telemetry_client().stop_llm(run_id=run_id, chat_generations=chat_generations, **attributes) + +def llm_fail(run_id: UUID, error: Error, **attributes) -> LLMInvocation: + return get_telemetry_client().fail_llm(run_id=run_id, error=error, **attributes) diff --git a/instrumentation-genai/opentelemetry-genai-sdk/src/opentelemetry/genai/sdk/data.py b/instrumentation-genai/opentelemetry-genai-sdk/src/opentelemetry/genai/sdk/data.py new file mode 100644 index 0000000000..00634bdab4 --- /dev/null +++ b/instrumentation-genai/opentelemetry-genai-sdk/src/opentelemetry/genai/sdk/data.py @@ -0,0 +1,41 @@ +from dataclasses import dataclass, field +from typing import List + + +@dataclass +class ToolOutput: + tool_call_id: str + content: str + +@dataclass +class ToolFunction: + name: str + description: str + parameters: str + +@dataclass +class ToolFunctionCall: + id: str + name: str + arguments: str + type: str + +@dataclass +class Message: + content: str + type: str + name: str + tool_call_id: str + tool_function_calls: List[ToolFunctionCall] = field(default_factory=list) + +@dataclass +class ChatGeneration: + content: str + type: str + finish_reason: str = None + tool_function_calls: List[ToolFunctionCall] = field(default_factory=list) + +@dataclass +class Error: + message: str + type: type[BaseException] \ No newline at end of file diff --git a/instrumentation-genai/opentelemetry-genai-sdk/src/opentelemetry/genai/sdk/deepeval.py b/instrumentation-genai/opentelemetry-genai-sdk/src/opentelemetry/genai/sdk/deepeval.py new file mode 100644 index 0000000000..bcb147c777 --- /dev/null +++ b/instrumentation-genai/opentelemetry-genai-sdk/src/opentelemetry/genai/sdk/deepeval.py @@ -0,0 +1,13 @@ +from deepeval.models import DeepEvalBaseLLM +from deepeval.test_case import LLMTestCase +from deepeval.metrics import AnswerRelevancyMetric + + +def evaluate_answer_relevancy_metric(prompt:str, output:str, retrieval_context:list) -> AnswerRelevancyMetric: + test_case = LLMTestCase(input=prompt, + actual_output=output, + retrieval_context=retrieval_context,) + relevancy_metric = 
AnswerRelevancyMetric(threshold=0.5) + relevancy_metric.measure(test_case) + print(relevancy_metric.score, relevancy_metric.reason) + return relevancy_metric \ No newline at end of file diff --git a/instrumentation-genai/opentelemetry-genai-sdk/src/opentelemetry/genai/sdk/evals.py b/instrumentation-genai/opentelemetry-genai-sdk/src/opentelemetry/genai/sdk/evals.py new file mode 100644 index 0000000000..c9e64bcdbd --- /dev/null +++ b/instrumentation-genai/opentelemetry-genai-sdk/src/opentelemetry/genai/sdk/evals.py @@ -0,0 +1,134 @@ +from abc import ABC, abstractmethod +from opentelemetry._events import Event + +from .types import LLMInvocation +from opentelemetry import trace +from opentelemetry.trace import ( + Tracer, +) +from opentelemetry import _events +from .deepeval import evaluate_answer_relevancy_metric +from opentelemetry.trace import SpanContext, Span +from opentelemetry.trace.span import NonRecordingSpan + + +class EvaluationResult: + """ + Standardized result for any GenAI evaluation. + """ + def __init__(self, score: float, details: dict = None): + self.score = score + self.details = details or {} + + +class Evaluator(ABC): + """ + Abstract base: any evaluation backend must implement. + """ + @abstractmethod + def evaluate(self, invocation: LLMInvocation) -> EvaluationResult: + """ + Evaluate a completed LLMInvocation and return a result. + """ + pass + +class DeepEvalEvaluator(Evaluator): + """ + Uses DeepEvals library for LLM-as-judge evaluations. + """ + def __init__(self, event_logger, tracer: Tracer = None, config: dict = None): + # e.g. load models, setup API keys + self.config = config or {} + self._tracer = tracer or trace.get_tracer(__name__) + self._event_logger = event_logger or _events.get_event_logger(__name__) + + def evaluate(self, invocation: LLMInvocation): + # stub: integrate with deepevals SDK + # result = deepevals.judge(invocation.prompt, invocation.response, **self.config) + human_message = next((msg for msg in invocation.messages if msg.type == "human"), None) + content = invocation.chat_generations[0].content + if content is not None and content != "": + eval_arm = evaluate_answer_relevancy_metric(human_message.content, invocation.chat_generations[0].content, []) + self._do_telemetry(invocation.messages[1].content, invocation.chat_generations[0].content, + invocation.span_id, invocation.trace_id, eval_arm) + + def _do_telemetry(self, query, output, parent_span_id, parent_trace_id, eval_arm): + + # emit event + body = { + "content": f"query: {query} output: {output}", + } + attributes = { + "gen_ai.evaluation.name": "relevance", + "gen_ai.evaluation.score": eval_arm.score, + "gen_ai.evaluation.reasoning": eval_arm.reason, + "gen_ai.evaluation.cost": eval_arm.evaluation_cost, + } + + event = Event( + name="gen_ai.evaluation.message", + attributes=attributes, + body=body if body else None, + span_id=parent_span_id, + trace_id=parent_trace_id, + ) + self._event_logger.emit(event) + + # create span + span_context = SpanContext( + trace_id=parent_trace_id, + span_id=parent_span_id, + is_remote=False, + ) + + span = NonRecordingSpan( + context=span_context, + ) + + tracer = trace.get_tracer(__name__) + + with tracer.start_as_current_span("evaluation relevance") as span: + # do evaluation + + span.add_link(span_context, attributes={ + "gen_ai.operation.name": "evaluation", + }) + span.set_attribute("gen_ai.operation.name", "evaluation") + span.set_attribute("gen_ai.evaluation.name", "relevance") + span.set_attribute("gen_ai.evaluation.score", eval_arm.score) + 
span.set_attribute("gen_ai.evaluation.label", "Pass") + span.set_attribute("gen_ai.evaluation.reasoning", eval_arm.reason) + span.set_attribute("gen_ai.evaluation.model", eval_arm.evaluation_model) + span.set_attribute("gen_ai.evaluation.cost", eval_arm.evaluation_cost) + #span.set_attribute("gen_ai.evaluation.verdict", eval_arm.verdicts) + + +class OpenLitEvaluator(Evaluator): + """ + Uses OpenLit or similar OSS evaluation library. + """ + def __init__(self, config: dict = None): + self.config = config or {} + + def evaluate(self, invocation: LLMInvocation) -> EvaluationResult: + # stub: integrate with openlit SDK + score = 0.0 # placeholder + details = {"method": "openlit"} + return EvaluationResult(score=score, details=details) + + +# Registry for easy lookup +EVALUATORS = { + "deepeval": DeepEvalEvaluator, + "openlit": OpenLitEvaluator, +} + + +def get_evaluator(name: str, event_logger = None, tracer: Tracer = None, config: dict = None) -> Evaluator: + """ + Factory: return an evaluator by name. + """ + cls = EVALUATORS.get(name.lower()) + if not cls: + raise ValueError(f"Unknown evaluator: {name}") + return cls(event_logger, tracer, config) \ No newline at end of file diff --git a/instrumentation-genai/opentelemetry-genai-sdk/src/opentelemetry/genai/sdk/exporters.py b/instrumentation-genai/opentelemetry-genai-sdk/src/opentelemetry/genai/sdk/exporters.py new file mode 100644 index 0000000000..eecca4b82f --- /dev/null +++ b/instrumentation-genai/opentelemetry-genai-sdk/src/opentelemetry/genai/sdk/exporters.py @@ -0,0 +1,1040 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
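+
+# Usage sketch (illustrative only): the exporters in this module are normally
+# constructed by opentelemetry.genai.sdk.api.TelemetryClient, which selects
+# SpanMetricEventExporter when exporter_type_full=True and SpanMetricExporter
+# otherwise. The attribute names below mirror what export_llm() reads; the
+# model, provider and token values are assumptions made for the example:
+#
+#     client = TelemetryClient(exporter_type_full=True)
+#     client.start_llm(prompts=messages, tool_functions=[], run_id=run_id)
+#     client.stop_llm(run_id, chat_generations=generations,
+#                     request_model="gpt-4", provider_name="openai",
+#                     input_tokens=12, output_tokens=34)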
+ +from typing import Dict, List, Optional +from dataclasses import dataclass, field +from uuid import UUID + +from opentelemetry.context import Context, get_current +from opentelemetry import trace +from opentelemetry.metrics import Meter +from opentelemetry.trace import ( + Span, + SpanKind, + Tracer, + set_span_in_context, + use_span, +) +from opentelemetry._events import Event +from opentelemetry._logs import LogRecord +from opentelemetry.semconv._incubating.attributes import gen_ai_attributes as GenAI +from opentelemetry.semconv.attributes import error_attributes as ErrorAttributes +from opentelemetry.trace.status import Status, StatusCode + +from .instruments import Instruments +from .types import LLMInvocation, ToolInvocation +from .data import Error, ToolFunctionCall + + +@dataclass +class _SpanState: + span: Span + context: Context + start_time: float + children: List[UUID] = field(default_factory=list) + +def _get_property_value(obj, property_name)-> object: + if isinstance(obj, dict): + return obj.get(property_name, None) + + return getattr(obj, property_name, None) + +def _message_to_event(message, tool_functions, provider_name, framework)-> Optional[Event]: + content = _get_property_value(message, "content") + # check if content is not None and should_collect_content() + type = _get_property_value(message, "type") + body = {} + if type == "tool": + name = message.name + tool_call_id = message.tool_call_id + body.update([ + ("content", content), + ("name", name), + ("tool_call_id", tool_call_id)] + ) + elif type == "ai": + tool_function_calls = [ + {"id": tfc.id, "name": tfc.name, "arguments": tfc.arguments, "type": getattr(tfc, "type", None)} for tfc in + message.tool_function_calls] if message.tool_function_calls else [] + tool_function_calls_str = str(tool_function_calls) if tool_function_calls else "" + body.update({ + "content": content if content else "", + "tool_calls": tool_function_calls_str + }) + # changes for bedrock start + elif type == "human" or type == "system": + body.update([ + ("content", content) + ]) + + attributes = { + # TODO: add below to opentelemetry.semconv._incubating.attributes.gen_ai_attributes + "gen_ai.framework": framework, + "gen_ai.provider.name": provider_name, + } + + # tools generation during first invocation of llm start -- + if tool_functions is not None: + for index, tool_function in enumerate(tool_functions): + attributes.update([ + (f"gen_ai.request.function.{index}.name", tool_function.name), + (f"gen_ai.request.function.{index}.description", tool_function.description), + (f"gen_ai.request.function.{index}.parameters", tool_function.parameters), + ]) + # tools generation during first invocation of llm end -- + + return Event( + name=f"gen_ai.{type}.message", + attributes=attributes, + body=body or None, + ) + +def _message_to_log_record(message, tool_functions, provider_name, framework)-> Optional[LogRecord]: + content = _get_property_value(message, "content") + # check if content is not None and should_collect_content() + type = _get_property_value(message, "type") + body = {} + if type == "tool": + name = message.name + tool_call_id = message.tool_call_id + body.update([ + ("content", content), + ("name", name), + ("tool_call_id", tool_call_id)] + ) + elif type == "ai": + tool_function_calls = [ + {"id": tfc.id, "name": tfc.name, "arguments": tfc.arguments, "type": getattr(tfc, "type", None)} for tfc in + message.tool_function_calls] if message.tool_function_calls else [] + tool_function_calls_str = str(tool_function_calls) if 
tool_function_calls else "" + body.update({ + "content": content if content else "", + "tool_calls": tool_function_calls_str + }) + # changes for bedrock start + elif type == "human" or type == "system": + body.update([ + ("content", content) + ]) + + attributes = { + # TODO: add below to opentelemetry.semconv._incubating.attributes.gen_ai_attributes + "gen_ai.framework": framework, + "gen_ai.provider.name": provider_name, + } + + # tools generation during first invocation of llm start -- + if tool_functions is not None: + for index, tool_function in enumerate(tool_functions): + attributes.update([ + (f"gen_ai.request.function.{index}.name", tool_function.name), + (f"gen_ai.request.function.{index}.description", tool_function.description), + (f"gen_ai.request.function.{index}.parameters", tool_function.parameters), + ]) + # tools generation during first invocation of llm end -- + + return LogRecord( + event_name=f"gen_ai.{type}.message", + attributes=attributes, + body=body or None, + ) + +def _chat_generation_to_event(chat_generation, index, prefix, provider_name, framework)-> Optional[Event]: + if chat_generation: + attributes = { + # TODO: add below to opentelemetry.semconv._incubating.attributes.gen_ai_attributes + "gen_ai.framework": framework, + "gen_ai.provider.name": provider_name, + } + + message = { + "content": chat_generation.content, + "type": chat_generation.type, + } + body = { + "index": index, + "finish_reason": chat_generation.finish_reason or "error", + "message": message, + } + + # tools generation during first invocation of llm start -- + tool_function_calls = chat_generation.tool_function_calls + if tool_function_calls is not None: + attributes.update( + chat_generation_tool_function_calls_attributes(tool_function_calls, prefix) + ) + # tools generation during first invocation of llm end -- + + return Event( + name="gen_ai.choice", + attributes=attributes, + body=body or None, + ) + +def _chat_generation_to_log_record(chat_generation, index, prefix, provider_name, framework)-> Optional[LogRecord]: + if chat_generation: + attributes = { + # TODO: add below to opentelemetry.semconv._incubating.attributes.gen_ai_attributes + "gen_ai.framework": framework, + "gen_ai.provider.name": provider_name, + } + + message = { + "content": chat_generation.content, + "type": chat_generation.type, + } + body = { + "index": index, + "finish_reason": chat_generation.finish_reason or "error", + "message": message, + } + + # tools generation during first invocation of llm start -- + tool_function_calls = chat_generation.tool_function_calls + if tool_function_calls is not None: + attributes.update( + chat_generation_tool_function_calls_attributes(tool_function_calls, prefix) + ) + # tools generation during first invocation of llm end -- + + return LogRecord( + event_name="gen_ai.choice", + attributes=attributes, + body=body or None, + ) + +def _input_to_event(input): + # TODO: add check should_collect_content() + if input is not None: + body = { + "content" : input, + "role": "tool", + } + attributes = { + "gen_ai.framework": "langchain", + } + + return Event( + name="gen_ai.tool.message", + attributes=attributes, + body=body if body else None, + ) + +def _input_to_log_record(input): + # TODO: add check should_collect_content() + if input is not None: + body = { + "content" : input, + "role": "tool", + } + attributes = { + "gen_ai.framework": "langchain", + } + + return LogRecord( + event_name="gen_ai.tool.message", + attributes=attributes, + body=body if body else None, + ) + +def 
_output_to_event(output): + if output is not None: + body = { + "content":output.content, + "id":output.tool_call_id, + "role":"tool", + } + attributes = { + "gen_ai.framework": "langchain", + } + + return Event( + name="gen_ai.tool.message", + attributes=attributes, + body=body if body else None, + ) + +def _output_to_log_record(output): + if output is not None: + body = { + "content":output.content, + "id":output.tool_call_id, + "role":"tool", + } + attributes = { + "gen_ai.framework": "langchain", + } + + return LogRecord( + event_name="gen_ai.tool.message", + attributes=attributes, + body=body if body else None, + ) + +def _get_metric_attributes_llm(request_model: Optional[str], response_model: Optional[str], + operation_name: Optional[str], provider_name: Optional[str], framework: Optional[str])-> Dict: + attributes = { + # TODO: add below to opentelemetry.semconv._incubating.attributes.gen_ai_attributes + "gen_ai.framework": framework, + } + if provider_name: + attributes["gen_ai.provider.name"] = provider_name + if operation_name: + attributes[GenAI.GEN_AI_OPERATION_NAME] = operation_name + if request_model: + attributes[GenAI.GEN_AI_REQUEST_MODEL] = request_model + if response_model: + attributes[GenAI.GEN_AI_RESPONSE_MODEL] = response_model + + return attributes + + +def chat_generation_tool_function_calls_attributes(tool_function_calls, prefix): + attributes = {} + for idx, tool_function_call in enumerate(tool_function_calls): + tool_call_prefix = f"{prefix}.tool_calls.{idx}" + attributes[f"{tool_call_prefix}.id"] = tool_function_call.id + attributes[f"{tool_call_prefix}.name"] = tool_function_call.name + attributes[f"{tool_call_prefix}.arguments"] = tool_function_call.arguments + return attributes + +class BaseExporter: + """ + Abstract base for exporters mapping GenAI types -> OpenTelemetry. + """ + + def init_llm(self, invocation: LLMInvocation): + raise NotImplementedError + + def init_tool(self, invocation: ToolInvocation): + raise NotImplementedError + + def export_llm(self, invocation: LLMInvocation): + raise NotImplementedError + + def export_tool(self, invocation: ToolInvocation): + raise NotImplementedError + + def error_llm(self, error: Error, invocation: LLMInvocation): + raise NotImplementedError + + def error_tool(self, error: Error, invocation: ToolInvocation): + raise NotImplementedError + +class SpanMetricEventExporter(BaseExporter): + """ + Emits spans, metrics and events for a full telemetry picture. 
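+
+    A minimal construction sketch (the provider and instrument wiring shown
+    here is an assumption for illustration; TelemetryClient normally builds
+    this exporter itself):
+
+        from opentelemetry import _events, _logs, metrics, trace
+
+        exporter = SpanMetricEventExporter(
+            event_logger=_events.get_event_logger(__name__),
+            logger=_logs.get_logger(__name__),
+            tracer=trace.get_tracer(__name__),
+            meter=metrics.get_meter(__name__),
+        )
+        # given a completed LLMInvocation named `invocation`:
+        exporter.init_llm(invocation)    # record parent/child relationship
+        exporter.export_llm(invocation)  # emit span, events and metrics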
+ """ + def __init__(self, event_logger, logger, tracer: Tracer = None, meter: Meter = None): + self._tracer = tracer or trace.get_tracer(__name__) + instruments = Instruments(meter) + self._duration_histogram = instruments.operation_duration_histogram + self._token_histogram = instruments.token_usage_histogram + self._event_logger = event_logger + self._logger = logger + + # Map from run_id -> _SpanState, to keep track of spans and parent/child relationships + self.spans: Dict[UUID, _SpanState] = {} + + def _start_span( + self, + name: str, + kind: SpanKind, + parent_run_id: Optional[UUID] = None, + ) -> Span: + if parent_run_id is not None and parent_run_id in self.spans: + parent_span = self.spans[parent_run_id].span + ctx = set_span_in_context(parent_span) + span = self._tracer.start_span(name=name, kind=kind, context=ctx) + else: + # top-level or missing parent + span = self._tracer.start_span(name=name, kind=kind) + + return span + + def _end_span(self, run_id: UUID): + state = self.spans[run_id] + for child_id in state.children: + child_state = self.spans.get(child_id) + if child_state and child_state.span._end_time is None: + child_state.span.end() + if state.span._end_time is None: + state.span.end() + + def init_llm(self, invocation: LLMInvocation): + if invocation.parent_run_id is not None and invocation.parent_run_id in self.spans: + self.spans[invocation.parent_run_id].children.append(invocation.run_id) + + def export_llm(self, invocation: LLMInvocation): + request_model = invocation.attributes.get("request_model") + span = self._start_span( + name=f"{GenAI.GenAiOperationNameValues.CHAT.value} {request_model}", + kind=SpanKind.CLIENT, + parent_run_id=invocation.parent_run_id, + ) + + with use_span( + span, + end_on_exit=False, + ) as span: + for message in invocation.messages: + provider_name = invocation.attributes.get("provider_name") + # TODO: remove deprecated event logging and its initialization and use below logger instead + self._event_logger.emit(_message_to_event(message=message, tool_functions=invocation.tool_functions, + provider_name=provider_name, + framework=invocation.attributes.get("framework"))) + # TODO: logger is not emitting event name, fix it + self._logger.emit(_message_to_log_record(message=message, tool_functions=invocation.tool_functions, + provider_name=provider_name, + framework=invocation.attributes.get("framework"))) + + span_state = _SpanState(span=span, context=get_current(), start_time=invocation.start_time, ) + self.spans[invocation.run_id] = span_state + + provider_name = "" + attributes = invocation.attributes + if attributes: + top_p = attributes.get("request_top_p") + if top_p: + span.set_attribute(GenAI.GEN_AI_REQUEST_TOP_P, top_p) + frequency_penalty = attributes.get("request_frequency_penalty") + if frequency_penalty: + span.set_attribute( + GenAI.GEN_AI_REQUEST_FREQUENCY_PENALTY, frequency_penalty + ) + presence_penalty = attributes.get("request_presence_penalty") + if presence_penalty: + span.set_attribute( + GenAI.GEN_AI_REQUEST_PRESENCE_PENALTY, presence_penalty + ) + stop_sequences = attributes.get("request_stop_sequences") + if stop_sequences: + span.set_attribute( + GenAI.GEN_AI_REQUEST_STOP_SEQUENCES, stop_sequences + ) + seed = attributes.get("request_seed") + if seed: + span.set_attribute(GenAI.GEN_AI_REQUEST_SEED, seed) + max_tokens = attributes.get("request_max_tokens") + if max_tokens: + span.set_attribute(GenAI.GEN_AI_REQUEST_MAX_TOKENS, max_tokens) + provider_name = attributes.get("provider_name") + if provider_name: + # 
TODO: add to semantic conventions + span.set_attribute("gen_ai.provider.name", provider_name) + temperature = attributes.get("request_temperature") + if temperature: + span.set_attribute( + GenAI.GEN_AI_REQUEST_TEMPERATURE, temperature + ) + span.set_attribute(GenAI.GEN_AI_OPERATION_NAME, GenAI.GenAiOperationNameValues.CHAT.value) + if request_model: + span.set_attribute(GenAI.GEN_AI_REQUEST_MODEL, request_model) + + # TODO: add below to opentelemetry.semconv._incubating.attributes.gen_ai_attributes + framework = invocation.attributes.get("framework") + if framework: + span.set_attribute("gen_ai.framework", framework) + + # tools function during 1st and 2nd llm invocation request attributes start -- + if invocation.tool_functions is not None: + for index, tool_function in enumerate(invocation.tool_functions): + span.set_attribute(f"gen_ai.request.function.{index}.name", tool_function.name) + span.set_attribute(f"gen_ai.request.function.{index}.description", tool_function.description) + span.set_attribute(f"gen_ai.request.function.{index}.parameters", tool_function.parameters) + # tools request attributes end -- + + # span.set_attribute(GenAI.GEN_AI_SYSTEM, system) + + # Add response details as span attributes + tool_calls_attributes = {} + for index, chat_generation in enumerate(invocation.chat_generations): + # tools generation during first invocation of llm start -- + prefix = f"{GenAI.GEN_AI_COMPLETION}.{index}" + tool_function_calls = chat_generation.tool_function_calls + if tool_function_calls is not None: + tool_calls_attributes.update( + chat_generation_tool_function_calls_attributes(tool_function_calls, prefix) + ) + # tools attributes end -- + + # TODO: remove deprecated event logging and its initialization and use below logger instead + self._event_logger.emit(_chat_generation_to_event(chat_generation, index, prefix, provider_name, framework)) + # TODO: logger is not emitting event name, fix it + self._logger.emit(_chat_generation_to_log_record(chat_generation, index, prefix, provider_name, framework)) + span.set_attribute(f"{GenAI.GEN_AI_RESPONSE_FINISH_REASONS}.{index}", chat_generation.finish_reason) + + # TODO: decide if we want to show this as span attributes + # span.set_attributes(tool_calls_attributes) + + response_model = attributes.get("response_model_name") + if response_model: + span.set_attribute(GenAI.GEN_AI_RESPONSE_MODEL, response_model) + + response_id = attributes.get("response_id") + if response_id: + span.set_attribute(GenAI.GEN_AI_RESPONSE_ID, response_id) + + # usage + prompt_tokens = attributes.get("input_tokens") + if prompt_tokens: + span.set_attribute(GenAI.GEN_AI_USAGE_INPUT_TOKENS, prompt_tokens) + + completion_tokens = attributes.get("output_tokens") + if completion_tokens: + span.set_attribute(GenAI.GEN_AI_USAGE_OUTPUT_TOKENS, completion_tokens) + + metric_attributes = _get_metric_attributes_llm(request_model, response_model, + GenAI.GenAiOperationNameValues.CHAT.value, provider_name, framework) + + # Record token usage metrics + prompt_tokens_attributes = {GenAI.GEN_AI_TOKEN_TYPE: GenAI.GenAiTokenTypeValues.INPUT.value} + prompt_tokens_attributes.update(metric_attributes) + self._token_histogram.record(prompt_tokens, attributes=prompt_tokens_attributes) + + completion_tokens_attributes = {GenAI.GEN_AI_TOKEN_TYPE: GenAI.GenAiTokenTypeValues.COMPLETION.value} + completion_tokens_attributes.update(metric_attributes) + self._token_histogram.record(completion_tokens, attributes=completion_tokens_attributes) + + # End the LLM span + 
self._end_span(invocation.run_id) + invocation.span_id = span_state.span.get_span_context().span_id + invocation.trace_id = span_state.span.get_span_context().trace_id + + # Record overall duration metric + elapsed = invocation.end_time - invocation.start_time + self._duration_histogram.record(elapsed, attributes=metric_attributes) + + def error_llm(self, error: Error, invocation: LLMInvocation): + request_model = invocation.attributes.get("request_model") + span = self._start_span( + name=f"{GenAI.GenAiOperationNameValues.CHAT.value} {request_model}", + kind=SpanKind.CLIENT, + parent_run_id=invocation.parent_run_id, + ) + + with use_span( + span, + end_on_exit=False, + ) as span: + span_state = _SpanState(span=span, context=get_current(), start_time=invocation.start_time, ) + self.spans[invocation.run_id] = span_state + + provider_name = "" + attributes = invocation.attributes + if attributes: + top_p = attributes.get("request_top_p") + if top_p: + span.set_attribute(GenAI.GEN_AI_REQUEST_TOP_P, top_p) + frequency_penalty = attributes.get("request_frequency_penalty") + if frequency_penalty: + span.set_attribute( + GenAI.GEN_AI_REQUEST_FREQUENCY_PENALTY, frequency_penalty + ) + presence_penalty = attributes.get("request_presence_penalty") + if presence_penalty: + span.set_attribute( + GenAI.GEN_AI_REQUEST_PRESENCE_PENALTY, presence_penalty + ) + stop_sequences = attributes.get("request_stop_sequences") + if stop_sequences: + span.set_attribute( + GenAI.GEN_AI_REQUEST_STOP_SEQUENCES, stop_sequences + ) + seed = attributes.get("request_seed") + if seed: + span.set_attribute(GenAI.GEN_AI_REQUEST_SEED, seed) + max_tokens = attributes.get("request_max_tokens") + if max_tokens: + span.set_attribute(GenAI.GEN_AI_REQUEST_MAX_TOKENS, max_tokens) + provider_name = attributes.get("provider_name") + if provider_name: + # TODO: add to semantic conventions + span.set_attribute("gen_ai.provider.name", provider_name) + temperature = attributes.get("request_temperature") + if temperature: + span.set_attribute( + GenAI.GEN_AI_REQUEST_TEMPERATURE, temperature + ) + span.set_attribute(GenAI.GEN_AI_OPERATION_NAME, GenAI.GenAiOperationNameValues.CHAT.value) + if request_model: + span.set_attribute(GenAI.GEN_AI_REQUEST_MODEL, request_model) + + # TODO: add below to opentelemetry.semconv._incubating.attributes.gen_ai_attributes + framework = attributes.get("framework") + if framework: + span.set_attribute("gen_ai.framework", framework) + + span.set_status(Status(StatusCode.ERROR, error.message)) + if span.is_recording(): + span.set_attribute( + ErrorAttributes.ERROR_TYPE, error.type.__qualname__ + ) + + self._end_span(invocation.run_id) + + framework = attributes.get("framework") + + metric_attributes = _get_metric_attributes_llm(request_model, "", + GenAI.GenAiOperationNameValues.CHAT.value, provider_name, framework) + + # Record overall duration metric + elapsed = invocation.end_time - invocation.start_time + self._duration_histogram.record(elapsed, attributes=metric_attributes) + + def init_tool(self, invocation: ToolInvocation): + if invocation.parent_run_id is not None and invocation.parent_run_id in self.spans: + self.spans[invocation.parent_run_id].children.append(invocation.run_id) + + def export_tool(self, invocation: ToolInvocation): + attributes = invocation.attributes + tool_name = attributes.get("tool_name") + span = self._start_span( + name=f"{GenAI.GenAiOperationNameValues.EXECUTE_TOOL.value} {tool_name}", + kind=SpanKind.INTERNAL, + parent_run_id=invocation.parent_run_id, + ) + with use_span( + 
span, + end_on_exit=False, + ) as span: + # TODO: remove deprecated event logging and its initialization and use below logger instead + self._event_logger.emit(_input_to_event(invocation.input_str)) + # TODO: logger is not emitting event name, fix it + self._logger.emit(_input_to_log_record(invocation.input_str)) + + span_state = _SpanState(span=span, context=get_current(), start_time=invocation.start_time) + self.spans[invocation.run_id] = span_state + + description = attributes.get("description") + span.set_attribute("gen_ai.tool.description", description) + span.set_attribute(GenAI.GEN_AI_TOOL_NAME, tool_name) + span.set_attribute(GenAI.GEN_AI_OPERATION_NAME, GenAI.GenAiOperationNameValues.EXECUTE_TOOL.value) + + # TODO: if should_collect_content(): + span.set_attribute(GenAI.GEN_AI_TOOL_CALL_ID, invocation.output.tool_call_id) + # TODO: remove deprecated event logging and its initialization and use below logger instead + self._event_logger.emit(_output_to_event(invocation.output)) + # TODO: logger is not emitting event name, fix it + self._logger.emit(_output_to_log_record(invocation.output)) + + self._end_span(invocation.run_id) + + # Record overall duration metric + elapsed = invocation.end_time - invocation.start_time + metric_attributes = { + GenAI.GEN_AI_OPERATION_NAME: GenAI.GenAiOperationNameValues.EXECUTE_TOOL.value + } + self._duration_histogram.record(elapsed, attributes=metric_attributes) + + def error_tool(self, error: Error, invocation: ToolInvocation): + tool_name = invocation.attributes.get("tool_name") + span = self._start_span( + name=f"{GenAI.GenAiOperationNameValues.EXECUTE_TOOL.value} {tool_name}", + kind=SpanKind.INTERNAL, + parent_run_id=invocation.parent_run_id, + ) + with use_span( + span, + end_on_exit=False, + ) as span: + description = invocation.attributes.get("description") + span.set_attribute("gen_ai.tool.description", description) + span.set_attribute(GenAI.GEN_AI_TOOL_NAME, tool_name) + span.set_attribute(GenAI.GEN_AI_OPERATION_NAME, GenAI.GenAiOperationNameValues.EXECUTE_TOOL.value) + + span_state = _SpanState(span=span, span_context=get_current(), start_time=invocation.start_time, system=tool_name) + self.spans[invocation.run_id] = span_state + + span.set_status(Status(StatusCode.ERROR, error.message)) + if span.is_recording(): + span.set_attribute( + ErrorAttributes.ERROR_TYPE, error.type.__qualname__ + ) + + self._end_span(invocation.run_id) + + # Record overall duration metric + elapsed = invocation.end_time - invocation.start_time + metric_attributes = { + GenAI.GEN_AI_SYSTEM: tool_name, + GenAI.GEN_AI_OPERATION_NAME: GenAI.GenAiOperationNameValues.EXECUTE_TOOL.value + } + self._duration_histogram.record(elapsed, attributes=metric_attributes) + +class SpanMetricExporter(BaseExporter): + """ + Emits only spans and metrics (no events). 
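+
+    Selected by TelemetryClient when exporter_type_full=False; a direct
+    construction sketch (the provider wiring is assumed to be configured by
+    the caller):
+
+        from opentelemetry import metrics, trace
+
+        exporter = SpanMetricExporter(
+            tracer=trace.get_tracer(__name__),
+            meter=metrics.get_meter(__name__),
+        )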
+ """ + def __init__(self, tracer: Tracer = None, meter: Meter = None): + self._tracer = tracer or trace.get_tracer(__name__) + instruments = Instruments(meter) + self._duration_histogram = instruments.operation_duration_histogram + self._token_histogram = instruments.token_usage_histogram + + # Map from run_id -> _SpanState, to keep track of spans and parent/child relationships + self.spans: Dict[UUID, _SpanState] = {} + + def _start_span( + self, + name: str, + kind: SpanKind, + parent_run_id: Optional[UUID] = None, + ) -> Span: + if parent_run_id is not None and parent_run_id in self.spans: + parent_span = self.spans[parent_run_id].span + ctx = set_span_in_context(parent_span) + span = self._tracer.start_span(name=name, kind=kind, context=ctx) + else: + # top-level or missing parent + span = self._tracer.start_span(name=name, kind=kind) + + return span + + def _end_span(self, run_id: UUID): + state = self.spans[run_id] + for child_id in state.children: + child_state = self.spans.get(child_id) + if child_state and child_state.span._end_time is None: + child_state.span.end() + if state.span._end_time is None: + state.span.end() + + def init_llm(self, invocation: LLMInvocation): + if invocation.parent_run_id is not None and invocation.parent_run_id in self.spans: + self.spans[invocation.parent_run_id].children.append(invocation.run_id) + + def export_llm(self, invocation: LLMInvocation): + request_model = invocation.attributes.get("request_model") + span = self._start_span( + name=f"{GenAI.GenAiOperationNameValues.CHAT.value} {request_model}", + kind=SpanKind.CLIENT, + parent_run_id=invocation.parent_run_id, + ) + + with use_span( + span, + end_on_exit=False, + ) as span: + span_state = _SpanState(span=span, context=get_current(), start_time=invocation.start_time) + self.spans[invocation.run_id] = span_state + + provider_name = "" + attributes = invocation.attributes + if attributes : + top_p = attributes.get("request_top_p") + if top_p: + span.set_attribute(GenAI.GEN_AI_REQUEST_TOP_P, top_p) + frequency_penalty = attributes.get("request_frequency_penalty") + if frequency_penalty: + span.set_attribute( + GenAI.GEN_AI_REQUEST_FREQUENCY_PENALTY, frequency_penalty + ) + presence_penalty = attributes.get("request_presence_penalty") + if presence_penalty: + span.set_attribute( + GenAI.GEN_AI_REQUEST_PRESENCE_PENALTY, presence_penalty + ) + stop_sequences = attributes.get("request_stop_sequences") + if stop_sequences: + span.set_attribute( + GenAI.GEN_AI_REQUEST_STOP_SEQUENCES, stop_sequences + ) + seed = attributes.get("request_seed") + if seed: + span.set_attribute(GenAI.GEN_AI_REQUEST_SEED, seed) + max_tokens = attributes.get("request_max_tokens") + if max_tokens: + span.set_attribute(GenAI.GEN_AI_REQUEST_MAX_TOKENS, max_tokens) + provider_name = attributes.get("provider_name") + if provider_name: + # TODO: add to semantic conventions + span.set_attribute("gen_ai.provider.name", provider_name) + temperature = attributes.get("request_temperature") + if temperature: + span.set_attribute( + GenAI.GEN_AI_REQUEST_TEMPERATURE, temperature + ) + span.set_attribute(GenAI.GEN_AI_OPERATION_NAME, GenAI.GenAiOperationNameValues.CHAT.value) + if request_model: + span.set_attribute(GenAI.GEN_AI_REQUEST_MODEL, request_model) + + # TODO: add below to opentelemetry.semconv._incubating.attributes.gen_ai_attributes + framework = invocation.attributes.get("framework") + if framework: + span.set_attribute("gen_ai.framework", framework) + # span.set_attribute(GenAI.GEN_AI_SYSTEM, system) + + # tools function 
during 1st and 2nd llm invocation request attributes start -- + if invocation.tool_functions is not None: + for index, tool_function in enumerate(invocation.tool_functions): + span.set_attribute(f"gen_ai.request.function.{index}.name", tool_function.name) + span.set_attribute(f"gen_ai.request.function.{index}.description", tool_function.description) + span.set_attribute(f"gen_ai.request.function.{index}.parameters", tool_function.parameters) + # tools request attributes end -- + + # tools support for 2nd llm invocation request attributes start -- + messages = invocation.messages if invocation.messages else None + for index, message in enumerate(messages): + content = message.content + type = message.type + tool_call_id = message.tool_call_id + # TODO: if should_collect_content(): + if type == "human" or type == "system": + span.set_attribute(f"gen_ai.prompt.{index}.content", content) + span.set_attribute(f"gen_ai.prompt.{index}.role", "human") + elif type == "tool": + span.set_attribute(f"gen_ai.prompt.{index}.content", content) + span.set_attribute(f"gen_ai.prompt.{index}.role", "tool") + span.set_attribute(f"gen_ai.prompt.{index}.tool_call_id", tool_call_id) + elif type == "ai": + tool_function_calls = message.tool_function_calls + if tool_function_calls is not None: + for index3, tool_function_call in enumerate(tool_function_calls): + span.set_attribute(f"gen_ai.prompt.{index}.tool_calls.{index3}.id", tool_function_call.id) + span.set_attribute(f"gen_ai.prompt.{index}.tool_calls.{index3}.arguments", tool_function_call.arguments) + span.set_attribute(f"gen_ai.prompt.{index}.tool_calls.{index3}.name", tool_function_call.name) + + # tools request attributes end -- + + # Add response details as span attributes + tool_calls_attributes = {} + for index, chat_generation in enumerate(invocation.chat_generations): + # tools attributes start -- + prefix = f"{GenAI.GEN_AI_COMPLETION}.{index}" + tool_function_calls = chat_generation.tool_function_calls + if tool_function_calls is not None: + tool_calls_attributes.update( + chat_generation_tool_function_calls_attributes(tool_function_calls, prefix) + ) + # tools attributes end -- + span.set_attribute(f"{GenAI.GEN_AI_RESPONSE_FINISH_REASONS} {index}", chat_generation.finish_reason) + + span.set_attributes(tool_calls_attributes) + + response_model = attributes.get("response_model_name") + if response_model: + span.set_attribute(GenAI.GEN_AI_RESPONSE_MODEL, response_model) + + response_id = attributes.get("response_id") + if response_id: + span.set_attribute(GenAI.GEN_AI_RESPONSE_ID, response_id) + + # usage + prompt_tokens = attributes.get("input_tokens") + if prompt_tokens: + span.set_attribute(GenAI.GEN_AI_USAGE_INPUT_TOKENS, prompt_tokens) + + completion_tokens = attributes.get("output_tokens") + if completion_tokens: + span.set_attribute(GenAI.GEN_AI_USAGE_OUTPUT_TOKENS, completion_tokens) + + # Add output content as span + for index, chat_generation in enumerate(invocation.chat_generations): + span.set_attribute(f"gen_ai.completion.{index}.content", chat_generation.content) + span.set_attribute(f"gen_ai.completion.{index}.role", chat_generation.type) + + metric_attributes = _get_metric_attributes_llm(request_model, response_model, + GenAI.GenAiOperationNameValues.CHAT.value, provider_name, framework,) + + # Record token usage metrics + prompt_tokens_attributes = {GenAI.GEN_AI_TOKEN_TYPE: GenAI.GenAiTokenTypeValues.INPUT.value} + prompt_tokens_attributes.update(metric_attributes) + self._token_histogram.record(prompt_tokens, 
attributes=prompt_tokens_attributes) + + completion_tokens_attributes = {GenAI.GEN_AI_TOKEN_TYPE: GenAI.GenAiTokenTypeValues.COMPLETION.value} + completion_tokens_attributes.update(metric_attributes) + self._token_histogram.record(completion_tokens, attributes=completion_tokens_attributes) + + # End the LLM span + self._end_span(invocation.run_id) + invocation.span_id = span_state.span.get_span_context().span_id + invocation.trace_id =span_state.span.get_span_context().trace_id + + # Record overall duration metric + elapsed = invocation.end_time - invocation.start_time + self._duration_histogram.record(elapsed, attributes=metric_attributes) + + + def error_llm(self, error: Error, invocation: LLMInvocation): + request_model = invocation.attributes.get("request_model") + span = self._start_span( + name=f"{GenAI.GenAiOperationNameValues.CHAT.value} {request_model}", + kind=SpanKind.CLIENT, + parent_run_id=invocation.parent_run_id, + ) + + with use_span( + span, + end_on_exit=False, + ) as span: + span_state = _SpanState(span=span, context=get_current(), start_time=invocation.start_time, ) + self.spans[invocation.run_id] = span_state + + provider_name = "" + attributes = invocation.attributes + if attributes: + top_p = attributes.get("request_top_p") + if top_p: + span.set_attribute(GenAI.GEN_AI_REQUEST_TOP_P, top_p) + frequency_penalty = attributes.get("request_frequency_penalty") + if frequency_penalty: + span.set_attribute( + GenAI.GEN_AI_REQUEST_FREQUENCY_PENALTY, frequency_penalty + ) + presence_penalty = attributes.get("request_presence_penalty") + if presence_penalty: + span.set_attribute( + GenAI.GEN_AI_REQUEST_PRESENCE_PENALTY, presence_penalty + ) + stop_sequences = attributes.get("request_stop_sequences") + if stop_sequences: + span.set_attribute( + GenAI.GEN_AI_REQUEST_STOP_SEQUENCES, stop_sequences + ) + seed = attributes.get("request_seed") + if seed: + span.set_attribute(GenAI.GEN_AI_REQUEST_SEED, seed) + max_tokens = attributes.get("request_max_tokens") + if max_tokens: + span.set_attribute(GenAI.GEN_AI_REQUEST_MAX_TOKENS, max_tokens) + provider_name = attributes.get("provider_name") + if provider_name: + # TODO: add to semantic conventions + span.set_attribute("gen_ai.provider.name", provider_name) + temperature = attributes.get("request_temperature") + if temperature: + span.set_attribute( + GenAI.GEN_AI_REQUEST_TEMPERATURE, temperature + ) + span.set_attribute(GenAI.GEN_AI_OPERATION_NAME, GenAI.GenAiOperationNameValues.CHAT.value) + if request_model: + span.set_attribute(GenAI.GEN_AI_REQUEST_MODEL, request_model) + + # TODO: add below to opentelemetry.semconv._incubating.attributes.gen_ai_attributes + framework = attributes.get("framework") + if framework: + span.set_attribute("gen_ai.framework", framework) + + # tools support for 2nd llm invocation request attributes start -- + messages = invocation.messages if invocation.messages else None + for index, message in enumerate(messages): + content = message.content + type = message.type + tool_call_id = message.tool_call_id + # TODO: if should_collect_content(): + if type == "human" or type == "system": + span.set_attribute(f"gen_ai.prompt.{index}.content", content) + span.set_attribute(f"gen_ai.prompt.{index}.role", "human") + elif type == "tool": + span.set_attribute(f"gen_ai.prompt.{index}.content", content) + span.set_attribute(f"gen_ai.prompt.{index}.role", "tool") + span.set_attribute(f"gen_ai.prompt.{index}.tool_call_id", tool_call_id) + elif type == "ai": + tool_function_calls = message.tool_function_calls + if 
tool_function_calls is not None: + for index3, tool_function_call in enumerate(tool_function_calls): + span.set_attribute(f"gen_ai.prompt.{index}.tool_calls.{index3}.id", tool_function_call.id) + span.set_attribute(f"gen_ai.prompt.{index}.tool_calls.{index3}.arguments", tool_function_call.arguments) + span.set_attribute(f"gen_ai.prompt.{index}.tool_calls.{index3}.name", tool_function_call.name) + + span.set_status(Status(StatusCode.ERROR, error.message)) + if span.is_recording(): + span.set_attribute( + ErrorAttributes.ERROR_TYPE, error.type.__qualname__ + ) + + self._end_span(invocation.run_id) + + framework = attributes.get("framework") + + metric_attributes = _get_metric_attributes_llm(request_model, "", + GenAI.GenAiOperationNameValues.CHAT.value, provider_name, framework) + + # Record overall duration metric + elapsed = invocation.end_time - invocation.start_time + self._duration_histogram.record(elapsed, attributes=metric_attributes) + + def init_tool(self, invocation: ToolInvocation): + if invocation.parent_run_id is not None and invocation.parent_run_id in self.spans: + self.spans[invocation.parent_run_id].children.append(invocation.run_id) + + def export_tool(self, invocation: ToolInvocation): + attributes = invocation.attributes + tool_name = attributes.get("tool_name") + span = self._start_span( + name=f"{GenAI.GenAiOperationNameValues.EXECUTE_TOOL.value} {tool_name}", + kind=SpanKind.INTERNAL, + parent_run_id=invocation.parent_run_id, + ) + with use_span( + span, + end_on_exit=False, + ) as span: + span_state = _SpanState(span=span, context=get_current(), start_time=invocation.start_time) + self.spans[invocation.run_id] = span_state + + description = attributes.get("description") + span.set_attribute("gen_ai.tool.description", description) + span.set_attribute(GenAI.GEN_AI_TOOL_NAME, tool_name) + span.set_attribute(GenAI.GEN_AI_OPERATION_NAME, GenAI.GenAiOperationNameValues.EXECUTE_TOOL.value) + + # TODO: if should_collect_content(): + span.set_attribute(GenAI.GEN_AI_TOOL_CALL_ID, invocation.output.tool_call_id) + # TODO: if should_collect_content(): + span.set_attribute("gen_ai.tool.output.content", invocation.output.content) + + self._end_span(invocation.run_id) + + # Record overall duration metric + elapsed = invocation.end_time - invocation.start_time + metric_attributes = { + GenAI.GEN_AI_OPERATION_NAME: GenAI.GenAiOperationNameValues.EXECUTE_TOOL.value + } + self._duration_histogram.record(elapsed, attributes=metric_attributes) + + def error_tool(self, error: Error, invocation: ToolInvocation): + attributes = invocation.attributes + tool_name = attributes.get("tool_name") + span = self._start_span( + name=f"{GenAI.GenAiOperationNameValues.EXECUTE_TOOL.value} {tool_name}", + kind=SpanKind.INTERNAL, + parent_run_id=invocation.parent_run_id, + ) + with use_span( + span, + end_on_exit=False, + ) as span: + span_state = _SpanState(span=span, context=get_current(), start_time=invocation.start_time) + self.spans[invocation.run_id] = span_state + + description = attributes.get("description") + span.set_attribute("gen_ai.tool.description", description) + span.set_attribute(GenAI.GEN_AI_TOOL_NAME, tool_name) + span.set_attribute(GenAI.GEN_AI_OPERATION_NAME, GenAI.GenAiOperationNameValues.EXECUTE_TOOL.value) + + span.set_status(Status(StatusCode.ERROR, error.message)) + if span.is_recording(): + span.set_attribute( + ErrorAttributes.ERROR_TYPE, error.type.__qualname__ + ) + + self._end_span(invocation.run_id) + + # Record overall duration metric + elapsed = invocation.end_time - 
invocation.start_time + metric_attributes = { + GenAI.GEN_AI_OPERATION_NAME: GenAI.GenAiOperationNameValues.EXECUTE_TOOL.value + } + self._duration_histogram.record(elapsed, attributes=metric_attributes) diff --git a/instrumentation-genai/opentelemetry-genai-sdk/src/opentelemetry/genai/sdk/instruments.py b/instrumentation-genai/opentelemetry-genai-sdk/src/opentelemetry/genai/sdk/instruments.py new file mode 100644 index 0000000000..cbe0a3fb21 --- /dev/null +++ b/instrumentation-genai/opentelemetry-genai-sdk/src/opentelemetry/genai/sdk/instruments.py @@ -0,0 +1,54 @@ +from opentelemetry.metrics import Histogram, Meter +from opentelemetry.semconv._incubating.metrics import gen_ai_metrics + +# TODO: should this be in sdk or passed to the telemetry client? +_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS = [ + 0.01, + 0.02, + 0.04, + 0.08, + 0.16, + 0.32, + 0.64, + 1.28, + 2.56, + 5.12, + 10.24, + 20.48, + 40.96, + 81.92, +] + +# TODO: should this be in sdk or passed to the telemetry client? +_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS = [ + 1, + 4, + 16, + 64, + 256, + 1024, + 4096, + 16384, + 65536, + 262144, + 1048576, + 4194304, + 16777216, + 67108864, +] + + +class Instruments: + def __init__(self, meter: Meter): + self.operation_duration_histogram: Histogram = meter.create_histogram( + name=gen_ai_metrics.GEN_AI_CLIENT_OPERATION_DURATION, + description="GenAI operation duration", + unit="s", + explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS, + ) + self.token_usage_histogram: Histogram = meter.create_histogram( + name=gen_ai_metrics.GEN_AI_CLIENT_TOKEN_USAGE, + description="Measures number of input and output tokens used", + unit="{token}", + explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS, + ) diff --git a/instrumentation-genai/opentelemetry-genai-sdk/src/opentelemetry/genai/sdk/types.py b/instrumentation-genai/opentelemetry-genai-sdk/src/opentelemetry/genai/sdk/types.py new file mode 100644 index 0000000000..bea95ed333 --- /dev/null +++ b/instrumentation-genai/opentelemetry-genai-sdk/src/opentelemetry/genai/sdk/types.py @@ -0,0 +1,49 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass, field +from typing import List, Optional +from uuid import UUID +import time + +from opentelemetry.genai.sdk.data import Message, ChatGeneration, ToolOutput, ToolFunction, ToolFunctionCall + +@dataclass +class LLMInvocation: + """ + Represents a single LLM call invocation. + """ + run_id: UUID + parent_run_id: Optional[UUID] = None + start_time: float = field(default_factory=time.time) + end_time: float = None + messages: List[Message] = field(default_factory=list) + chat_generations: List[ChatGeneration] = field(default_factory=list) + tool_functions: List[ToolFunction] = field(default_factory=list) + attributes: dict = field(default_factory=dict) + span_id: int = 0 + trace_id: int = 0 + +@dataclass +class ToolInvocation: + """ + Represents a single Tool call invocation. 
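+
+    Instances are normally created by TelemetryClient.start_tool() and
+    completed by stop_tool() or fail_tool(); a direct construction sketch
+    (the field values are illustrative):
+
+        from uuid import uuid4
+
+        invocation = ToolInvocation(
+            run_id=uuid4(),
+            input_str='{"city": "Paris"}',
+            attributes={"tool_name": "get_weather"},
+        )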
+ """ + run_id: UUID + output: ToolOutput = None + parent_run_id: Optional[UUID] = None + start_time: float = field(default_factory=time.time) + end_time: float = None + input_str: Optional[str] = None + attributes: dict = field(default_factory=dict) \ No newline at end of file diff --git a/instrumentation-genai/opentelemetry-genai-sdk/src/opentelemetry/genai/sdk/version.py b/instrumentation-genai/opentelemetry-genai-sdk/src/opentelemetry/genai/sdk/version.py new file mode 100644 index 0000000000..b3c06d4883 --- /dev/null +++ b/instrumentation-genai/opentelemetry-genai-sdk/src/opentelemetry/genai/sdk/version.py @@ -0,0 +1 @@ +__version__ = "0.0.1" \ No newline at end of file diff --git a/instrumentation-genai/opentelemetry-genai-sdk/tests/pytest.ini b/instrumentation-genai/opentelemetry-genai-sdk/tests/pytest.ini new file mode 100644 index 0000000000..2c909c8d89 --- /dev/null +++ b/instrumentation-genai/opentelemetry-genai-sdk/tests/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +python_paths = ../src diff --git a/instrumentation-genai/opentelemetry-genai-sdk/tests/test_sdk.py b/instrumentation-genai/opentelemetry-genai-sdk/tests/test_sdk.py new file mode 100644 index 0000000000..ad7e77aee3 --- /dev/null +++ b/instrumentation-genai/opentelemetry-genai-sdk/tests/test_sdk.py @@ -0,0 +1,65 @@ +import pytest +from opentelemetry.genai.sdk.api import ( + llm_start, llm_stop, llm_fail, + tool_start, tool_stop, tool_fail, +) +from opentelemetry.genai.sdk.evals import get_evaluator, EvaluationResult +from opentelemetry.genai.sdk.exporters import SpanMetricEventExporter, SpanMetricExporter + +@pytest.fixture +def sample_llm_invocation(): + run_id = llm_start("test-model", "hello world", custom_attr="value") + invocation = llm_stop(run_id, response="hello back", extra="info") + return invocation + +@pytest.fixture +def sample_tool_invocation(): + run_id = tool_start("test-tool", {"input": 123}, flag=True) + invocation = tool_stop(run_id, output={"output": "ok"}, status="done") + return invocation + +def test_llm_start_and_stop(sample_llm_invocation): + inv = sample_llm_invocation + assert inv.model_name == "test-model" + assert inv.prompt == "hello world" + assert inv.response == "hello back" + assert inv.attributes.get("custom_attr") == "value" + assert inv.attributes.get("extra") == "info" + assert inv.end_time >= inv.start_time + +def test_tool_start_and_stop(sample_tool_invocation): + inv = sample_tool_invocation + assert inv.tool_name == "test-tool" + assert inv.input == {"input": 123} + assert inv.output == {"output": "ok"} + assert inv.attributes.get("flag") is True + assert inv.attributes.get("status") == "done" + assert inv.end_time >= inv.start_time + +@pytest.mark.parametrize("name,method", [ + ("deepevals", "deepevals"), + ("openlit", "openlit"), +]) +def test_evaluator_factory(name, method, sample_llm_invocation): + evaluator = get_evaluator(name) + result = evaluator.evaluate(sample_llm_invocation) + assert isinstance(result, EvaluationResult) + assert result.details.get("method") == method + +def test_exporters_no_error(sample_llm_invocation): + event_exporter = SpanMetricEventExporter() + metric_exporter = SpanMetricExporter() + event_exporter.export(sample_llm_invocation) + metric_exporter.export(sample_llm_invocation) + +def test_llm_fail(): + run_id = llm_start("fail-model", "prompt") + inv = llm_fail(run_id, error="something went wrong") + assert inv.attributes.get("error") == "something went wrong" + assert inv.end_time is not None + +def test_tool_fail(): + run_id = 
tool_start("fail-tool", {"x": 1}) + inv = tool_fail(run_id, error="tool error") + assert inv.attributes.get("error") == "tool error" + assert inv.end_time is not None diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/manual/.env b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/manual/.env new file mode 100644 index 0000000000..e7046c72cf --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/manual/.env @@ -0,0 +1,11 @@ +# Update this with your real OpenAI API key +OPENAI_API_KEY=sk-YOUR_API_KEY + +# Uncomment and change to your OTLP endpoint +# OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 +# OTEL_EXPORTER_OTLP_PROTOCOL=grpc + +# Change to 'false' to hide prompt and completion content +OTEL_INSTRUMENTATION_LANGCHAIN_CAPTURE_MESSAGE_CONTENT=true + +OTEL_SERVICE_NAME=opentelemetry-python-langchain-manual \ No newline at end of file diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/manual/README.rst b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/manual/README.rst new file mode 100644 index 0000000000..b8a463cbe4 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/manual/README.rst @@ -0,0 +1,47 @@ +OpenTelemetry LangChain Instrumentation Example +============================================== + +This is an example of how to instrument LangChain calls when configuring +OpenTelemetry SDK and Instrumentations manually. + +When :code:`main.py ` is run, it exports traces, metrics (and optionally logs) +to an OTLP-compatible endpoint. Traces include details such as the span name and other attributes. +Exports metrics like input and output token usage and durations for each operation. + +Environment variables: + +- ``OTEL_INSTRUMENTATION_LANGCHAIN_CAPTURE_MESSAGE_CONTENT=true`` can be used + to capture full prompt/response content. + +Setup +----- + +1. **Update** the :code:`.env <.env>` file with any environment variables you + need (e.g., your OpenAI key, or :code:`OTEL_EXPORTER_OTLP_ENDPOINT` if not + using the default http://localhost:4317). +2. Set up a virtual environment: + + .. code-block:: console + + python3 -m venv .venv + source .venv/bin/activate + pip install "python-dotenv[cli]" + pip install -r requirements.txt + +3. **(Optional)** Install a development version of the new instrumentation: + + .. code-block:: console + + # E.g., from a local path or a git repo + pip install -e /path/to/opentelemetry-python-contrib/instrumentation-genai/opentelemetry-instrumentation-langchain +Run +--- + +Run the example like this: + +.. code-block:: console + + dotenv run -- python main.py + +You should see an example span output while traces are exported to your +configured observability tool. 
\ No newline at end of file diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/manual/main.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/manual/main.py new file mode 100644 index 0000000000..521cec7012 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/manual/main.py @@ -0,0 +1,69 @@ +from langchain_core.messages import HumanMessage, SystemMessage +from langchain_openai import ChatOpenAI + +from opentelemetry.instrumentation.langchain import LangChainInstrumentor + +from opentelemetry import _events, _logs, trace, metrics +from opentelemetry.exporter.otlp.proto.grpc._log_exporter import ( + OTLPLogExporter, +) +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( + OTLPSpanExporter, +) +from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter + +from opentelemetry.sdk._events import EventLoggerProvider +from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.sdk._logs.export import BatchLogRecordProcessor +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader + +# configure tracing +trace.set_tracer_provider(TracerProvider()) +trace.get_tracer_provider().add_span_processor( + BatchSpanProcessor(OTLPSpanExporter()) +) + +metric_reader = PeriodicExportingMetricReader(OTLPMetricExporter()) +metrics.set_meter_provider(MeterProvider(metric_readers=[metric_reader])) + +# configure logging and events +_logs.set_logger_provider(LoggerProvider()) +_logs.get_logger_provider().add_log_record_processor( + BatchLogRecordProcessor(OTLPLogExporter()) +) +_events.set_event_logger_provider(EventLoggerProvider()) + +def main(): + + # Set up instrumentation + LangChainInstrumentor().instrument() + + # ChatOpenAI + llm = ChatOpenAI( + model="gpt-3.5-turbo", + temperature=0.1, + max_tokens=100, + top_p=0.9, + frequency_penalty=0.5, + presence_penalty=0.5, + stop_sequences=["\n", "Human:", "AI:"], + seed=100, + ) + + messages = [ + SystemMessage(content="You are a helpful assistant!"), + HumanMessage(content="What is the capital of France?"), + ] + + result = llm.invoke(messages) + + print("LLM output:\n", result) + + # Un-instrument after use + LangChainInstrumentor().uninstrument() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/manual/requirements.txt b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/manual/requirements.txt new file mode 100644 index 0000000000..a7360d050c --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/manual/requirements.txt @@ -0,0 +1,15 @@ +langchain==0.3.21 #todo: find the lowest compatible version +langchain_openai + +# Pin exact versions to ensure compatibility +opentelemetry-api==1.36.0 +opentelemetry-sdk==1.36.0 +opentelemetry-exporter-otlp-proto-grpc==1.36.0 +opentelemetry-semantic-conventions==0.57b0 +# Add these dependencies explicitly +opentelemetry-proto==1.36.0 + +python-dotenv[cli] +deepeval + +# For local development: `pip install -e /path/to/opentelemetry-instrumentation-langchain` \ No newline at end of file diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/tools/.env 
b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/tools/.env new file mode 100644 index 0000000000..992f2de193 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/tools/.env @@ -0,0 +1,11 @@ +# Update this with your real OpenAI API key +OPENAI_API_KEY=sk-YOUR_API_KEY + +# Uncomment and change to your OTLP endpoint +# OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 +# OTEL_EXPORTER_OTLP_PROTOCOL=grpc + +# Change to 'false' to hide prompt and completion content +OTEL_INSTRUMENTATION_LANGCHAIN_CAPTURE_MESSAGE_CONTENT=true + +OTEL_SERVICE_NAME=opentelemetry-python-langchain-tools \ No newline at end of file diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/tools/README.rst b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/tools/README.rst new file mode 100644 index 0000000000..a5a7c7f8c8 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/tools/README.rst @@ -0,0 +1,47 @@ +OpenTelemetry LangChain Instrumentation Example +============================================== + +This is an example of how to instrument LangChain calls when configuring +OpenTelemetry SDK and Instrumentations manually. + +When :code:`main.py ` is run, it exports traces (and optionally logs) +to an OTLP-compatible endpoint. Traces include details such as the chain name, +LLM usage, token usage, and durations for each operation. + +Environment variables: + +- ``OTEL_INSTRUMENTATION_LANGCHAIN_CAPTURE_MESSAGE_CONTENT=true`` can be used + to capture full prompt/response content. + +Setup +----- + +1. **Update** the :code:`.env <.env>` file with any environment variables you + need (e.g., your OpenAI key, or :code:`OTEL_EXPORTER_OTLP_ENDPOINT` if not + using the default http://localhost:4317). +2. Set up a virtual environment: + + .. code-block:: console + + python3 -m venv .venv + source .venv/bin/activate + pip install "python-dotenv[cli]" + pip install -r requirements.txt + +3. **(Optional)** Install a development version of the new instrumentation: + + .. code-block:: console + + # E.g., from a local path or a git repo + pip install -e /path/to/opentelemetry-python-contrib/instrumentation-genai/opentelemetry-instrumentation-langchain +Run +--- + +Run the example like this: + +.. code-block:: console + + dotenv run -- python main.py + +You should see an example chain output while traces are exported to your +configured observability tool. 
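+
+Once the Flask app in :code:`main.py` is running, you can exercise the
+tool-calling flow with a small client. The snippet below is only a sketch and
+assumes the default host and port (5001) used by the example; it relies on the
+standard library only, so no extra dependency is required.
+
+.. code-block:: python
+
+    import json
+    import urllib.request
+
+    # Ask a question that should make the LLM call the add/multiply tools.
+    payload = json.dumps({"message": "What is 3 multiplied by 12, plus 4?"}).encode("utf-8")
+    req = urllib.request.Request(
+        "http://localhost:5001/tools_add_multiply",
+        data=payload,
+        headers={"Content-Type": "application/json"},
+    )
+    with urllib.request.urlopen(req) as response:
+        print(response.read().decode("utf-8"))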
\ No newline at end of file diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/tools/main.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/tools/main.py new file mode 100644 index 0000000000..48901ca550 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/tools/main.py @@ -0,0 +1,125 @@ +from langchain_core.messages import HumanMessage +from langchain_openai import ChatOpenAI + +from opentelemetry.instrumentation.langchain import LangChainInstrumentor +from langchain_core.tools import tool +from flask import Flask, request, jsonify +import logging +from opentelemetry.instrumentation.flask import FlaskInstrumentor + +# todo: start a server span here +from opentelemetry import _events, _logs, trace, metrics +from opentelemetry.exporter.otlp.proto.grpc._log_exporter import ( + OTLPLogExporter, +) +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( + OTLPSpanExporter, +) +from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter + +from opentelemetry.sdk._events import EventLoggerProvider +from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.sdk._logs.export import BatchLogRecordProcessor +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader + +# configure tracing +trace.set_tracer_provider(TracerProvider()) +trace.get_tracer_provider().add_span_processor( + BatchSpanProcessor(OTLPSpanExporter()) +) + +metric_reader = PeriodicExportingMetricReader(OTLPMetricExporter()) +metrics.set_meter_provider(MeterProvider(metric_readers=[metric_reader])) + +# configure logging and events +_logs.set_logger_provider(LoggerProvider()) +_logs.get_logger_provider().add_log_record_processor( + BatchLogRecordProcessor(OTLPLogExporter()) +) +_events.set_event_logger_provider(EventLoggerProvider()) + +# Set up logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Set up instrumentation +LangChainInstrumentor().instrument() + +@tool +def add(a: int, b: int) -> int: + """Add two integers. + + Args: + a: First integer + b: Second integer + """ + return a + b + +@tool +def multiply(a: int, b: int) -> int: + """Multiply two integers. 
+ + Args: + a: First integer + b: Second integer + """ + return a * b + + +# ----------------------------------------------------------------------------- +# Flask app +# ----------------------------------------------------------------------------- +app = Flask(__name__) +FlaskInstrumentor().instrument_app(app) + +@app.post("/tools_add_multiply") +def tools(): + + """POST form-url-encoded or JSON with message (and optional session_id).""" + payload = request.get_json(silent=True) or request.form # allow either + query = payload.get("message") + if not query: + logger.error("Missing 'message' field in request") + return jsonify({"error": "Missing 'message' field."}), 400 + + try: + llm = ChatOpenAI( + model="gpt-3.5-turbo", + temperature=0.1, + max_tokens=100, + top_p=0.9, + frequency_penalty=0.5, + presence_penalty=0.5, + stop_sequences=["\n", "Human:", "AI:"], + seed=100, + ) + tools = [add, multiply] + llm_with_tools = llm.bind_tools(tools) + + messages = [HumanMessage(query)] + ai_msg = llm_with_tools.invoke(messages) + print("LLM output:\n", ai_msg) + messages.append(ai_msg) + + for tool_call in ai_msg.tool_calls: + selected_tool = {"add": add, "multiply": multiply}[tool_call["name"].lower()] + if selected_tool is not None: + tool_msg = selected_tool.invoke(tool_call) + messages.append(tool_msg) + print("messages:\n", messages) + + result = llm_with_tools.invoke(messages) + print("LLM output:\n", result) + logger.info(f"LLM response: {result.content}") + + return result.content + except Exception as e: + logger.error(f"Error processing chat request: {e}") + return jsonify({"error": "Internal server error"}), 500 + +if __name__ == "__main__": + # When run directly: python app.py + app.run(host="0.0.0.0", port=5001) \ No newline at end of file diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/tools/requirements.txt b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/tools/requirements.txt new file mode 100644 index 0000000000..e7ab681e23 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/tools/requirements.txt @@ -0,0 +1,17 @@ +flask +waitress +langchain==0.3.21 #todo: find the lowest compatible version +langchain_openai + +opentelemetry-api==1.36.0 +opentelemetry-sdk~=1.36.0 +opentelemetry-exporter-otlp-proto-grpc~=1.36.0 +opentelemetry-semantic-conventions==0.57b0 +opentelemetry-proto==1.36.0 +opentelemetry-instrumentation-flask +# traceloop-sdk~=0.43.0 +python-dotenv[cli] +deepeval + +# For local developmen: `pip install -e /path/to/opentelemetry-instrumentation-langchain` + diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/zero-code/.env b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/zero-code/.env new file mode 100644 index 0000000000..10c4a26692 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/zero-code/.env @@ -0,0 +1,11 @@ +# Update this with your real OpenAI API key +OPENAI_API_KEY=sk-YOUR_API_KEY + +# Uncomment and change to your OTLP endpoint +# OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 +# OTEL_EXPORTER_OTLP_PROTOCOL=grpc + +# Change to 'false' to hide prompt and completion content +OTEL_INSTRUMENTATION_LANGCHAIN_CAPTURE_MESSAGE_CONTENT=true + +OTEL_SERVICE_NAME=opentelemetry-python-langchain-zero-code \ No newline at end of file diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/zero-code/README.rst 
b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/zero-code/README.rst new file mode 100644 index 0000000000..696a197158 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/zero-code/README.rst @@ -0,0 +1,47 @@ +OpenTelemetry LangChain Instrumentation Example +=============================================== + +This is an example of how to instrument LangChain calls with zero code changes, +using ``opentelemetry-instrument`` to configure the OpenTelemetry SDK automatically. + +When :code:`main.py` is run, it exports traces (and optionally logs) +to an OTLP-compatible endpoint. Traces include details such as the chain name, +LLM usage, token usage, and durations for each operation. + +Environment variables: + +- ``OTEL_INSTRUMENTATION_LANGCHAIN_CAPTURE_MESSAGE_CONTENT=true`` can be used + to capture full prompt/response content. + +Setup +----- + +1. **Update** the :code:`.env <.env>` file with any environment variables you + need (e.g., your OpenAI key, or :code:`OTEL_EXPORTER_OTLP_ENDPOINT` if not + using the default http://localhost:4317). +2. Set up a virtual environment: + + .. code-block:: console + + python3 -m venv .venv + source .venv/bin/activate + pip install "python-dotenv[cli]" + pip install -r requirements.txt + +3. **(Optional)** Install a development version of the new instrumentation: + + .. code-block:: console + + # E.g., from a local path or a git repo + pip install -e /path/to/opentelemetry-python-contrib/instrumentation-genai/opentelemetry-instrumentation-langchain + +Run +--- + +Run the example like this: + +.. code-block:: console + + dotenv run -- opentelemetry-instrument python main.py + +You should see an example chain output while traces are exported to your +configured observability tool. \ No newline at end of file diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/zero-code/main.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/zero-code/main.py new file mode 100644 index 0000000000..c46fc6c635 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/zero-code/main.py @@ -0,0 +1,17 @@ +from langchain_core.messages import HumanMessage, SystemMessage +from langchain_openai import ChatOpenAI + +def main(): + + llm = ChatOpenAI(model="gpt-3.5-turbo") + + messages = [ + SystemMessage(content="You are a helpful assistant!"), + HumanMessage(content="What is the capital of France?"), + ] + + result = llm.invoke(messages).content + print("LLM output:\n", result) + +if __name__ == "__main__": + main() diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/zero-code/requirements.txt b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/zero-code/requirements.txt new file mode 100644 index 0000000000..afdb3960fa --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/zero-code/requirements.txt @@ -0,0 +1,11 @@ +langchain==0.3.21 #todo: find the lowest compatible version +langchain_openai + +opentelemetry-sdk~=1.36.0 +opentelemetry-exporter-otlp-proto-grpc~=1.36.0 +opentelemetry-distro~=0.57b0 + +python-dotenv[cli] + +# For local development: `pip install -e /path/to/opentelemetry-instrumentation-langchain` + diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/pyproject.toml b/instrumentation-genai/opentelemetry-instrumentation-langchain/pyproject.toml index 55e24185f2..32a9462267 100644 ---
a/instrumentation-genai/opentelemetry-instrumentation-langchain/pyproject.toml +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/pyproject.toml @@ -25,9 +25,9 @@ classifiers = [ "Programming Language :: Python :: 3.13", ] dependencies = [ - "opentelemetry-api ~= 1.30", - "opentelemetry-instrumentation ~= 0.51b0", - "opentelemetry-semantic-conventions ~= 0.51b0" + "opentelemetry-api ~= 1.36.0", + "opentelemetry-instrumentation ~= 0.57b0", + "opentelemetry-semantic-conventions ~= 0.57b0" ] [project.optional-dependencies] diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/__init__.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/__init__.py index e69de29bb2..9ac9d43cab 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/__init__.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/__init__.py @@ -0,0 +1,140 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Langchain instrumentation supporting `ChatOpenAI`, it can be enabled by +using ``LangChainInstrumentor``. + +.. _langchain: https://pypi.org/project/langchain/ + +Usage +----- + +.. code:: python + + from opentelemetry.instrumentation.langchain import LangChainInstrumentor + from langchain_core.messages import HumanMessage, SystemMessage + from langchain_openai import ChatOpenAI + + LangChainInstrumentor().instrument() + + llm = ChatOpenAI(model="gpt-3.5-turbo") + messages = [ + SystemMessage(content="You are a helpful assistant!"), + HumanMessage(content="What is the capital of France?"), + ] + + result = llm.invoke(messages) + +API +--- +""" + +from typing import Collection + +from wrapt import wrap_function_wrapper + +from opentelemetry.instrumentation.langchain.config import Config +from opentelemetry.instrumentation.langchain.version import __version__ +from opentelemetry.instrumentation.langchain.package import _instruments +from opentelemetry.instrumentation.langchain.callback_handler import ( + OpenTelemetryLangChainCallbackHandler, +) +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.instrumentation.utils import unwrap + + +from opentelemetry.genai.sdk.api import get_telemetry_client +from opentelemetry.genai.sdk.api import TelemetryClient +from .utils import ( + should_emit_events, + get_evaluation_framework_name, +) +from opentelemetry.genai.sdk.evals import ( + get_evaluator, +) + +class LangChainInstrumentor(BaseInstrumentor): + """ + OpenTelemetry instrumentor for LangChain. + + This adds a custom callback handler to the LangChain callback manager + to capture chain, LLM, and tool events. It also wraps the internal + OpenAI invocation points (BaseChatOpenAI) to inject W3C trace headers + for downstream calls to OpenAI (or other providers). 
+ """ + + def __init__(self, exception_logger=None, disable_trace_injection: bool = False): + """ + :param disable_trace_injection: If True, do not wrap OpenAI invocation + for trace-context injection. + """ + super().__init__() + self._disable_trace_injection = disable_trace_injection + Config.exception_logger = exception_logger + + self._telemetry: TelemetryClient | None = None + + def instrumentation_dependencies(self) -> Collection[str]: + return _instruments + + def _instrument(self, **kwargs): + exporter_type_full = should_emit_events() + + # Instantiate a singleton TelemetryClient bound to our tracer & meter + self._telemetry = get_telemetry_client(exporter_type_full, **kwargs) + + # initialize evaluation framework if needed + evaluation_framework_name = get_evaluation_framework_name() + # TODO: add check for OTEL_INSTRUMENTATION_GENAI_EVALUATION_ENABLE + self._evaluation = get_evaluator(evaluation_framework_name) + + otel_callback_handler = OpenTelemetryLangChainCallbackHandler( + telemetry_client=self._telemetry, + evaluation_client=self._evaluation, + ) + + wrap_function_wrapper( + module="langchain_core.callbacks", + name="BaseCallbackManager.__init__", + wrapper=_BaseCallbackManagerInitWrapper(otel_callback_handler), + ) + + def _uninstrument(self, **kwargs): + """ + Cleanup instrumentation (unwrap). + """ + unwrap("langchain_core.callbacks.base", "BaseCallbackManager.__init__") + if not self._disable_trace_injection: + unwrap("langchain_openai.chat_models.base", "BaseChatOpenAI._generate") + unwrap("langchain_openai.chat_models.base", "BaseChatOpenAI._agenerate") + + +class _BaseCallbackManagerInitWrapper: + """ + Wrap the BaseCallbackManager __init__ to insert + custom callback handler in the manager's handlers list. + """ + + def __init__(self, callback_handler): + self._otel_handler = callback_handler + + def __call__(self, wrapped, instance, args, kwargs): + wrapped(*args, **kwargs) + # Ensure our OTel callback is present if not already. + for handler in instance.inheritable_handlers: + if isinstance(handler, type(self._otel_handler)): + break + else: + instance.add_handler(self._otel_handler, inherit=True) \ No newline at end of file diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py new file mode 100644 index 0000000000..d99feccd96 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py @@ -0,0 +1,294 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging +from typing import List, Optional, Union, Any, Dict +from uuid import UUID + +from langchain_core.callbacks import BaseCallbackHandler +from langchain_core.messages import BaseMessage +from langchain_core.outputs import LLMResult + +from opentelemetry.instrumentation.langchain.config import Config +from opentelemetry.instrumentation.langchain.utils import dont_throw +from .utils import get_property_value +from opentelemetry.genai.sdk.data import ( + Message, + ChatGeneration, + Error, + ToolOutput, ToolFunction, ToolFunctionCall +) +from .utils import should_enable_evaluation +from opentelemetry.genai.sdk.api import TelemetryClient +from opentelemetry.genai.sdk.evals import Evaluator +from opentelemetry.genai.sdk.types import LLMInvocation + +logger = logging.getLogger(__name__) + + +class OpenTelemetryLangChainCallbackHandler(BaseCallbackHandler): + """ + A callback handler for LangChain that uses OpenTelemetry to create spans + for chains, LLM calls, and tools. + """ + + def __init__( + self, + telemetry_client: TelemetryClient, + evaluation_client: Evaluator, + ) -> None: + super().__init__() + self._telemetry_client = telemetry_client + self._evaluation_client = evaluation_client + + @dont_throw + def on_chat_model_start( + self, + serialized: dict, + messages: List[List[BaseMessage]], + *, + run_id: UUID, + tags: Optional[List[str]] = None, + parent_run_id: Optional[UUID] = None, + metadata: Optional[Dict[str, Any]] = None, + **kwargs, + ): + if Config.is_instrumentation_suppressed(): + return + + system = serialized.get("name", kwargs.get("name", "ChatLLM")) + invocation_params = kwargs.get("invocation_params", {}) + + attributes = { + "system": system, + # TODO: add below to opentelemetry.semconv._incubating.attributes.gen_ai_attributes + "framework": "langchain", + } + + if invocation_params: + request_model = invocation_params.get("model_name") + if request_model: + attributes.update({"request_model": request_model}) + top_p = invocation_params.get("top_p") + if top_p: + attributes.update({"request_top_p": top_p}) + frequency_penalty = invocation_params.get("frequency_penalty") + if frequency_penalty: + attributes.update({"request_frequency_penalty": frequency_penalty}) + presence_penalty = invocation_params.get("presence_penalty") + if presence_penalty: + attributes.update({"request_presence_penalty": presence_penalty}) + stop_sequences = invocation_params.get("stop") + if stop_sequences: + attributes.update({"request_stop_sequences": stop_sequences}) + seed = invocation_params.get("seed") + if seed: + attributes.update({"request_seed": seed}) + + if metadata: + max_tokens = metadata.get("ls_max_tokens") + if max_tokens: + attributes.update({"request_max_tokens": max_tokens}) + provider_name = metadata.get("ls_provider") + if provider_name: + # TODO: add to semantic conventions + attributes.update({"provider_name": provider_name}) + temperature = metadata.get("ls_temperature") + if temperature: + attributes.update({"request_temperature": temperature}) + + # invoked during first invoke to llm with tool start -- + tool_functions: List[ToolFunction] = [] + tools = kwargs.get("invocation_params").get("tools") if kwargs.get("invocation_params") else None + if tools is not None: + for index, tool in enumerate(tools): + function = tool.get("function") + if function is not None: + tool_function = ToolFunction( + name=function.get("name"), + description=function.get("description"), + parameters=str(function.get("parameters")) + ) + tool_functions.append(tool_function) + 
# tool end -- + + + prompts: list[Message] = [] + for sub_messages in messages: + for message in sub_messages: + # llm invoked with all messages tool support start -- + additional_kwargs = get_property_value(message, "additional_kwargs") + tool_calls = get_property_value(additional_kwargs, "tool_calls") + tool_function_calls = [] + for tool_call in tool_calls or []: + tool_function_call = ToolFunctionCall( + id=tool_call.get("id"), + name=tool_call.get("function").get("name"), + arguments=str(tool_call.get("function").get("arguments")), + type=tool_call.get("type"), + ) + tool_function_calls.append(tool_function_call) + # tool support end -- + prompt = Message( + name=get_property_value(message, "name"), + content=get_property_value(message, "content"), + type=get_property_value(message, "type"), + tool_call_id=get_property_value(message, "tool_call_id"), + tool_function_calls=tool_function_calls, + ) + prompts.append(prompt) + + # Invoke genai-sdk api + self._telemetry_client.start_llm(prompts, tool_functions, run_id, parent_run_id, **attributes) + + @dont_throw + def on_llm_end( + self, + response: LLMResult, + *, + run_id: UUID, + parent_run_id: Union[UUID, None] = None, + **kwargs, + ): + if Config.is_instrumentation_suppressed(): + return + + chat_generations: list[ChatGeneration] = [] + tool_function_calls: list[ToolFunctionCall] = [] + for generation in getattr(response, "generations", []): + for chat_generation in generation: + # llm creates tool calls during first llm invoke tool support start -- + tool_calls = chat_generation.message.additional_kwargs.get("tool_calls") + for tool_call in tool_calls or []: + tool_function_call = ToolFunctionCall( + id=tool_call.get("id"), + name=tool_call.get("function").get("name"), + arguments=tool_call.get("function").get("arguments"), + type=tool_call.get("type"), + ) + tool_function_calls.append(tool_function_call) + # tool support end -- + if chat_generation.generation_info is not None: + finish_reason = chat_generation.generation_info.get("finish_reason") + content = get_property_value(chat_generation.message, "content") + chat = ChatGeneration( + content=content, + type=chat_generation.type, + finish_reason=finish_reason, + tool_function_calls=tool_function_calls, + ) + chat_generations.append(chat) + + response_model = response_id = None + llm_output = response.llm_output + if llm_output is not None: + response_model = llm_output.get("model_name") or llm_output.get("model") + response_id = llm_output.get("id") + + input_tokens = output_tokens = None + usage = response.llm_output.get("usage") or response.llm_output.get("token_usage") + if usage: + input_tokens = usage.get("prompt_tokens", 0) + output_tokens = usage.get("completion_tokens", 0) + + attributes = { + "response_model_name": response_model, + "response_id": response_id, + "input_tokens": input_tokens, + "output_tokens": output_tokens, + } + + # Invoke genai-sdk api + invocation: LLMInvocation = self._telemetry_client.stop_llm(run_id=run_id, chat_generations=chat_generations, **attributes) + + # generates evaluation child spans. 
+ # pass only required attributes to evaluation client + if should_enable_evaluation(): + import asyncio + asyncio.create_task(self._evaluation_client.evaluate(invocation)) + # self._evaluation_client.evaluate(invocation) + + + @dont_throw + def on_tool_start( + self, + serialized: dict, + input_str: str, + *, + run_id: UUID, + parent_run_id: Optional[UUID] = None, + tags: Optional[list[str]] = None, + metadata: Optional[dict[str, Any]] = None, + **kwargs, + ): + if Config.is_instrumentation_suppressed(): + return + + tool_name = serialized.get("name") or kwargs.get("name") or "execute_tool" + attributes = { + "tool_name": tool_name, + "description": serialized.get("description"), + } + + # Invoke genai-sdk api + self._telemetry_client.start_tool(run_id=run_id, input_str=input_str, **attributes) + + @dont_throw + def on_tool_end( + self, + output: Any, + *, + run_id: UUID, + parent_run_id: Optional[UUID] = None, + **kwargs, + ): + if Config.is_instrumentation_suppressed(): + return + + output = ToolOutput( + content=get_property_value(output, "content"), + tool_call_id=get_property_value(output, "tool_call_id"), + ) + # Invoke genai-sdk api + self._telemetry_client.stop_tool(run_id=run_id, output=output) + + @dont_throw + def on_llm_error( + self, + error: BaseException, + *, + run_id: UUID, + parent_run_id: Optional[UUID] = None, + **kwargs, + ): + if Config.is_instrumentation_suppressed(): + return + + llm_error = Error(message=str(error), type=type(error)) + self._telemetry_client.fail_llm(run_id=run_id, error=llm_error, **kwargs) + + @dont_throw + def on_tool_error( + self, + error: BaseException, + *, + run_id: UUID, + parent_run_id: Optional[UUID] = None, + **kwargs, + ): + if Config.is_instrumentation_suppressed(): + return + + tool_error = Error(message=str(error), type=type(error)) + self._telemetry_client.fail_tool(run_id=run_id, error=tool_error, **kwargs) \ No newline at end of file diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/config.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/config.py new file mode 100644 index 0000000000..2e21ba43db --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/config.py @@ -0,0 +1,32 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +class Config: + """ + Shared static config for LangChain OTel instrumentation. 
+ """ + + # Logger to handle exceptions during instrumentation + exception_logger = None + + # Globally suppress instrumentation + _suppress_instrumentation = False + + @classmethod + def suppress_instrumentation(cls, suppress: bool = True): + cls._suppress_instrumentation = suppress + + @classmethod + def is_instrumentation_suppressed(cls) -> bool: + return cls._suppress_instrumentation diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/utils.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/utils.py new file mode 100644 index 0000000000..d04fbb156e --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/utils.py @@ -0,0 +1,85 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import traceback + +logger = logging.getLogger(__name__) + +# By default, we do not record prompt or completion content. Set this +# environment variable to "true" to enable collection of message text. +OTEL_INSTRUMENTATION_LANGCHAIN_CAPTURE_MESSAGE_CONTENT = ( + "OTEL_INSTRUMENTATION_LANGCHAIN_CAPTURE_MESSAGE_CONTENT" +) + +OTEL_INSTRUMENTATION_GENAI_EXPORTER = ( + "OTEL_INSTRUMENTATION_GENAI_EXPORTER" +) + +OTEL_INSTRUMENTATION_GENAI_EVALUATION_FRAMEWORK = ( + "OTEL_INSTRUMENTATION_GENAI_EVALUATION_FRAMEWORK" +) + +OTEL_INSTRUMENTATION_GENAI_EVALUATION_ENABLE = ( + "OTEL_INSTRUMENTATION_GENAI_EVALUATION_ENABLE" +) + + +def should_collect_content() -> bool: + val = os.getenv(OTEL_INSTRUMENTATION_LANGCHAIN_CAPTURE_MESSAGE_CONTENT, "false") + return val.strip().lower() == "true" + +def should_emit_events() -> bool: + val = os.getenv(OTEL_INSTRUMENTATION_GENAI_EXPORTER, "SpanMetricEventExporter") + if val.strip().lower() == "spanmetriceventexporter": + return True + elif val.strip().lower() == "spanmetricexporter": + return False + else: + raise ValueError(f"Unknown exporter_type: {val}") + +def should_enable_evaluation() -> bool: + val = os.getenv(OTEL_INSTRUMENTATION_GENAI_EVALUATION_ENABLE, "True") + return val.strip().lower() == "true" + +def get_evaluation_framework_name() -> str: + val = os.getenv(OTEL_INSTRUMENTATION_GENAI_EVALUATION_FRAMEWORK, "Deepeval") + return val.strip().lower() + +def get_property_value(obj, property_name): + if isinstance(obj, dict): + return obj.get(property_name, None) + + return getattr(obj, property_name, None) + +def dont_throw(func): + """ + Decorator that catches and logs exceptions, rather than re-raising them, + to avoid interfering with user code if instrumentation fails. 
+ """ + def wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except Exception as e: + logger.debug( + "OpenTelemetry instrumentation for LangChain encountered an error in %s: %s", + func.__name__, + traceback.format_exc(), + ) + from opentelemetry.instrumentation.langchain.config import Config + if Config.exception_logger: + Config.exception_logger(e) + return None + return wrapper diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/.env.example b/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/.env.example new file mode 100644 index 0000000000..c60337cb73 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/.env.example @@ -0,0 +1,11 @@ +# Update this with your real OpenAI API key +OPENAI_API_KEY= +APPKEY= +# Uncomment and change to your OTLP endpoint +OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 +OTEL_EXPORTER_OTLP_PROTOCOL=grpc + +# Change to 'false' to hide prompt and completion content +OTEL_INSTRUMENTATION_LANGCHAIN_CAPTURE_MESSAGE_CONTENT=true + +OTEL_SERVICE_NAME=opentelemetry-python-langchain-manual \ No newline at end of file diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/README.rst b/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/README.rst new file mode 100644 index 0000000000..325c3d57b2 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/README.rst @@ -0,0 +1,3 @@ +Adding an .env file to set up the environment variables to run the tests. +The test is running by calling LLM APIs provided by Circuit. +There is an sample .env file in this directory. diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/conftest.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/conftest.py new file mode 100644 index 0000000000..d9569820aa --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/conftest.py @@ -0,0 +1,237 @@ +"""Unit tests configuration module.""" + +import json +import os + +import pytest +import yaml +# from openai import AsyncOpenAI, OpenAI +from langchain_openai import ChatOpenAI + +from opentelemetry.instrumentation.langchain import LangChainInstrumentor +from opentelemetry.instrumentation.langchain.utils import ( + OTEL_INSTRUMENTATION_LANGCHAIN_CAPTURE_MESSAGE_CONTENT, +) +from opentelemetry.sdk._events import EventLoggerProvider +from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.sdk._logs.export import ( + InMemoryLogExporter, + SimpleLogRecordProcessor, +) +from opentelemetry.sdk.metrics import ( + MeterProvider, +) +from opentelemetry.sdk.metrics.export import ( + InMemoryMetricReader, +) +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) +from opentelemetry.sdk.trace.sampling import ALWAYS_OFF + + +@pytest.fixture(scope="function", name="span_exporter") +def fixture_span_exporter(): + exporter = InMemorySpanExporter() + yield exporter + + +@pytest.fixture(scope="function", name="log_exporter") +def fixture_log_exporter(): + exporter = InMemoryLogExporter() + yield exporter + + +@pytest.fixture(scope="function", name="metric_reader") +def fixture_metric_reader(): + exporter = InMemoryMetricReader() + yield exporter + + +@pytest.fixture(scope="function", name="tracer_provider") +def 
fixture_tracer_provider(span_exporter): + provider = TracerProvider() + provider.add_span_processor(SimpleSpanProcessor(span_exporter)) + return provider + + +@pytest.fixture(scope="function", name="event_logger_provider") +def fixture_event_logger_provider(log_exporter): + provider = LoggerProvider() + provider.add_log_record_processor(SimpleLogRecordProcessor(log_exporter)) + event_logger_provider = EventLoggerProvider(provider) + + return event_logger_provider + + +@pytest.fixture(scope="function", name="meter_provider") +def fixture_meter_provider(metric_reader): + meter_provider = MeterProvider( + metric_readers=[metric_reader], + ) + + return meter_provider + + +@pytest.fixture(autouse=True) +def environment(): + if not os.getenv("OPENAI_API_KEY"): + os.environ["OPENAI_API_KEY"] = "test_openai_api_key" + + +@pytest.fixture +def chatOpenAI_client(): + return ChatOpenAI() + +@pytest.fixture(scope="module") +def vcr_config(): + return { + "filter_headers": [ + ("cookie", "test_cookie"), + ("authorization", "Bearer test_openai_api_key"), + ("openai-organization", "test_openai_org_id"), + ("openai-project", "test_openai_project_id"), + ], + "decode_compressed_response": True, + "before_record_response": scrub_response_headers, + } + + +@pytest.fixture(scope="function") +def instrument_no_content( + tracer_provider, event_logger_provider, meter_provider +): + os.environ.update( + {OTEL_INSTRUMENTATION_LANGCHAIN_CAPTURE_MESSAGE_CONTENT: "False"} + ) + + instrumentor = LangChainInstrumentor() + instrumentor.instrument( + tracer_provider=tracer_provider, + event_logger_provider=event_logger_provider, + meter_provider=meter_provider, + ) + + yield instrumentor + os.environ.pop(OTEL_INSTRUMENTATION_LANGCHAIN_CAPTURE_MESSAGE_CONTENT, None) + instrumentor.uninstrument() + + +@pytest.fixture(scope="function") +def instrument_with_content( + tracer_provider, event_logger_provider, meter_provider +): + os.environ.update( + {OTEL_INSTRUMENTATION_LANGCHAIN_CAPTURE_MESSAGE_CONTENT: "True"} + ) + instrumentor = LangChainInstrumentor() + instrumentor.instrument( + tracer_provider=tracer_provider, + event_logger_provider=event_logger_provider, + meter_provider=meter_provider, + ) + + yield instrumentor + os.environ.pop(OTEL_INSTRUMENTATION_LANGCHAIN_CAPTURE_MESSAGE_CONTENT, None) + instrumentor.uninstrument() + + +@pytest.fixture(scope="function") +def instrument_with_content_unsampled( + span_exporter, event_logger_provider, meter_provider +): + os.environ.update( + {OTEL_INSTRUMENTATION_LANGCHAIN_CAPTURE_MESSAGE_CONTENT: "True"} + ) + + tracer_provider = TracerProvider(sampler=ALWAYS_OFF) + tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter)) + + instrumentor = LangChainInstrumentor() + instrumentor.instrument( + tracer_provider=tracer_provider, + event_logger_provider=event_logger_provider, + meter_provider=meter_provider, + ) + + yield instrumentor + os.environ.pop(OTEL_INSTRUMENTATION_LANGCHAIN_CAPTURE_MESSAGE_CONTENT, None) + instrumentor.uninstrument() + + +class LiteralBlockScalar(str): + """Formats the string as a literal block scalar, preserving whitespace and + without interpreting escape characters""" + + +def literal_block_scalar_presenter(dumper, data): + """Represents a scalar string as a literal block, via '|' syntax""" + return dumper.represent_scalar("tag:yaml.org,2002:str", data, style="|") + + +yaml.add_representer(LiteralBlockScalar, literal_block_scalar_presenter) + + +def process_string_value(string_value): + """Pretty-prints JSON or returns long strings as a 
LiteralBlockScalar""" + try: + json_data = json.loads(string_value) + return LiteralBlockScalar(json.dumps(json_data, indent=2)) + except (ValueError, TypeError): + if len(string_value) > 80: + return LiteralBlockScalar(string_value) + return string_value + + +def convert_body_to_literal(data): + """Searches the data for body strings, attempting to pretty-print JSON""" + if isinstance(data, dict): + for key, value in data.items(): + # Handle response body case (e.g., response.body.string) + if key == "body" and isinstance(value, dict) and "string" in value: + value["string"] = process_string_value(value["string"]) + + # Handle request body case (e.g., request.body) + elif key == "body" and isinstance(value, str): + data[key] = process_string_value(value) + + else: + convert_body_to_literal(value) + + elif isinstance(data, list): + for idx, choice in enumerate(data): + data[idx] = convert_body_to_literal(choice) + + return data + + +class PrettyPrintJSONBody: + """This makes request and response body recordings more readable.""" + + @staticmethod + def serialize(cassette_dict): + cassette_dict = convert_body_to_literal(cassette_dict) + return yaml.dump( + cassette_dict, default_flow_style=False, allow_unicode=True + ) + + @staticmethod + def deserialize(cassette_string): + return yaml.load(cassette_string, Loader=yaml.Loader) + + +@pytest.fixture(scope="module", autouse=True) +def fixture_vcr(vcr): + vcr.register_serializer("yaml", PrettyPrintJSONBody) + return vcr + + +def scrub_response_headers(response): + """ + This scrubs sensitive response headers. Note they are case-sensitive! + """ + response["headers"]["openai-organization"] = "test_openai_org_id" + response["headers"]["Set-Cookie"] = "test_set_cookie" + return response diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_langchain_llm.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_langchain_llm.py new file mode 100644 index 0000000000..6c3699c272 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_langchain_llm.py @@ -0,0 +1,558 @@ +"""Test suite for LangChain LLM instrumentation with OpenTelemetry. + +This module contains tests that verify the integration between LangChain LLM calls +and OpenTelemetry for observability, including spans, logs, and metrics. +""" +# Standard library imports +import json,os +from typing import Any, Dict, List, Optional + +# Third-party imports +import pytest +from langchain_core.messages import ( + HumanMessage, + SystemMessage, + ToolMessage, +) +from langchain_core.tools import tool +from langchain_openai import ChatOpenAI +from opentelemetry.sdk.metrics.export import Metric +from opentelemetry.sdk.trace import ReadableSpan, Span +from opentelemetry.semconv._incubating.attributes import event_attributes as EventAttributes +from opentelemetry.semconv._incubating.metrics import gen_ai_metrics +from opentelemetry.semconv._incubating.attributes import gen_ai_attributes + +# Constants +CHAT = gen_ai_attributes.GenAiOperationNameValues.CHAT.value +TOOL_OPERATION = "execute_tool" + +########################################### +# Assertion Helpers +########################################### + +# OpenAI Attributes Helpers + +def assert_openai_completion_attributes( + span: ReadableSpan, + request_model: str, + response: Any, + operation_name: str = "chat", +) -> None: + """Verify OpenAI completion attributes in a span. 
+ + Args: + span: The span to check + request_model: Expected request model name + response: The LLM response object + operation_name: Expected operation name (default: "chat") + """ + return assert_all_openai_attributes( + span, + request_model, + response.response_metadata.get("model_name"), + response.response_metadata.get("token_usage").get("prompt_tokens"), + response.response_metadata.get("token_usage").get("completion_tokens"), + operation_name, + ) + +def assert_all_openai_attributes( + span: ReadableSpan, + request_model: str, + response_model: str = "gpt-4o-mini-2024-07-18", + input_tokens: Optional[int] = None, + output_tokens: Optional[int] = None, + operation_name: str = "chat", + span_name: str = "chat gpt-4o-mini", + system: str = "LangChain:ChatOpenAI", +): + assert span.name == span_name + + assert operation_name == span.attributes[gen_ai_attributes.GEN_AI_OPERATION_NAME] + + assert request_model == "gpt-4o-mini" + + assert response_model == "gpt-4o-mini-2024-07-18" + + assert gen_ai_attributes.GEN_AI_RESPONSE_ID in span.attributes + + if input_tokens: + assert ( + input_tokens + == span.attributes[gen_ai_attributes.GEN_AI_USAGE_INPUT_TOKENS] + ) + else: + assert gen_ai_attributes.GEN_AI_USAGE_INPUT_TOKENS not in span.attributes + + if output_tokens: + assert ( + output_tokens + == span.attributes[gen_ai_attributes.GEN_AI_USAGE_OUTPUT_TOKENS] + ) + else: + assert ( + gen_ai_attributes.GEN_AI_USAGE_OUTPUT_TOKENS not in span.attributes + ) + +def _assert_tool_request_functions_on_span( + span: Span, expected_tool_names: List[str] +) -> None: + """Verify tool request functions in span attributes. + + Args: + span: The span to check + expected_tool_names: List of expected tool names + """ + for i, name in enumerate(expected_tool_names): + assert span.attributes.get(f"gen_ai.request.function.{i}.name") == name + assert f"gen_ai.request.function.{i}.description" in span.attributes + assert f"gen_ai.request.function.{i}.parameters" in span.attributes + + + +# Log Assertion Helpers + +def assert_message_in_logs( + log: Any, + event_name: str, + expected_content: Dict[str, Any], + parent_span: Span, +) -> None: + """Verify a log message has the expected content and parent span. 
+ + Args: + log: The log record to check + event_name: Expected event name + expected_content: Expected content in the log body + parent_span: Parent span for context verification + """ + assert log.log_record.attributes[EventAttributes.EVENT_NAME] == event_name + # assert ( + # TODO: use constant from GenAIAttributes.GenAiSystemValues after it is added there + # log.log_record.attributes[gen_ai_attributes.GEN_AI_SYSTEM] + # == "langchain" + # ) + + if not expected_content: + assert not log.log_record.body + else: + assert log.log_record.body + assert dict(log.log_record.body) == remove_none_values( + expected_content + ) + assert_log_parent(log, parent_span) + +def assert_log_parent(log, span): + if span: + assert log.log_record.trace_id == span.get_span_context().trace_id + assert log.log_record.span_id == span.get_span_context().span_id + assert ( + log.log_record.trace_flags == span.get_span_context().trace_flags + ) + +# Metric Assertion Helpers + +def remove_none_values(body): + result = {} + for key, value in body.items(): + if value is None: + continue + if isinstance(value, dict): + result[key] = remove_none_values(value) + elif isinstance(value, list): + result[key] = [remove_none_values(i) for i in value] + else: + result[key] = value + return result + +def assert_duration_metric(metric: Metric, parent_span: Span) -> None: + """Verify duration metric has expected structure and values. + + Args: + metric: The metric to verify + parent_span: Parent span for context verification + """ + assert metric is not None + assert len(metric.data.data_points) >= 1 + assert metric.data.data_points[0].sum > 0 + + assert_duration_metric_attributes(metric.data.data_points[0].attributes, parent_span) + assert_exemplars(metric.data.data_points[0].exemplars, metric.data.data_points[0].sum, parent_span) + +def assert_exemplars(exemplars, sum, parent_span): + assert len(exemplars) >= 1 + assert exemplars[0].value >= sum + assert exemplars[0].span_id == parent_span.get_span_context().span_id + assert exemplars[0].trace_id == parent_span.get_span_context().trace_id + +def assert_token_usage_metric(metric: Metric, parent_span: Span) -> None: + """Verify token usage metric has expected structure and values. + + Args: + metric: The metric to verify + parent_span: Parent span for context verification + """ + assert metric is not None + assert len(metric.data.data_points) == 2 + + assert metric.data.data_points[0].sum > 0 + assert_token_usage_metric_attributes(metric.data.data_points[0].attributes, parent_span) + assert_exemplars(metric.data.data_points[0].exemplars, metric.data.data_points[0].sum, parent_span) + + assert metric.data.data_points[1].sum > 0 + assert_token_usage_metric_attributes(metric.data.data_points[1].attributes, parent_span) + assert_exemplars(metric.data.data_points[1].exemplars, metric.data.data_points[1].sum, parent_span) + + +def assert_duration_metric_attributes(attributes: Dict[str, Any], parent_span: Span) -> None: + """Verify duration metric attributes. 
+ + Args: + attributes: Metric attributes to verify + parent_span: Parent span for context verification + """ + assert len(attributes) == 5 + # assert attributes.get(gen_ai_attributes.GEN_AI_SYSTEM) == "langchain" + assert attributes.get( + gen_ai_attributes.GEN_AI_OPERATION_NAME) == gen_ai_attributes.GenAiOperationNameValues.CHAT.value + assert attributes.get(gen_ai_attributes.GEN_AI_REQUEST_MODEL) == parent_span.attributes[ + gen_ai_attributes.GEN_AI_REQUEST_MODEL + ] + assert attributes.get(gen_ai_attributes.GEN_AI_RESPONSE_MODEL) == parent_span.attributes[ + gen_ai_attributes.GEN_AI_RESPONSE_MODEL + ] + + +def assert_token_usage_metric_attributes( + attributes: Dict[str, Any], parent_span: Span +) -> None: + """Verify token usage metric attributes. + + Args: + attributes: Metric attributes to verify + parent_span: Parent span for context verification + """ + assert len(attributes) == 6 + # assert attributes.get(gen_ai_attributes.GEN_AI_SYSTEM) == "langchain" + assert attributes.get( + gen_ai_attributes.GEN_AI_OPERATION_NAME) == gen_ai_attributes.GenAiOperationNameValues.CHAT.value + assert attributes.get(gen_ai_attributes.GEN_AI_REQUEST_MODEL) == parent_span.attributes[ + gen_ai_attributes.GEN_AI_REQUEST_MODEL + ] + assert attributes.get(gen_ai_attributes.GEN_AI_RESPONSE_MODEL) == parent_span.attributes[ + gen_ai_attributes.GEN_AI_RESPONSE_MODEL + ] + + +def assert_duration_metric_with_tool(metric: Metric, spans: List[Span]) -> None: + """Verify duration metric when tools are involved. + + Args: + metric: The metric to verify + spans: List of spans for context verification + """ + assert spans, "No LLM CHAT spans found" + llm_points = [ + dp for dp in metric.data.data_points + if dp.attributes.get(gen_ai_attributes.GEN_AI_OPERATION_NAME) == CHAT + ] + assert len(llm_points) >= 1 + for dp in llm_points: + assert dp.sum > 0 + assert_duration_metric_attributes(dp.attributes, spans[0]) + + +def assert_token_usage_metric_with_tool(metric: Metric, spans: List[Span]) -> None: + """Verify token usage metric when tools are involved. + + Args: + metric: The metric to verify + spans: List of spans for context verification + """ + assert spans, "No LLM CHAT spans found" + llm_points = [ + dp for dp in metric.data.data_points + if dp.attributes.get(gen_ai_attributes.GEN_AI_OPERATION_NAME) == CHAT + ] + assert len(llm_points) >= 2 # Should have both input and output token metrics + for dp in llm_points: + assert dp.sum > 0 + assert_token_usage_metric_attributes(dp.attributes, spans[0]) + + + +########################################### +# Test Fixtures (from conftest.py) +# - span_exporter +# - log_exporter +# - metric_reader +# - chatOpenAI_client +# - instrument_with_content +########################################### + +########################################### +# Test Functions +########################################### + +def _get_llm_spans(spans: List[Span]) -> List[Span]: + """Filter spans to get only LLM chat spans. + + Args: + spans: List of spans to filter + + Returns: + List of spans that are LLM chat operations + """ + return [ + s for s in spans + if s.attributes.get(gen_ai_attributes.GEN_AI_OPERATION_NAME) == CHAT + ] + + +########################################### +# Test Functions +########################################### + +# Note: The following test functions use VCR to record and replay HTTP interactions +# for reliable and deterministic testing. Each test verifies both the functional +# behavior of the LLM calls and the associated OpenTelemetry instrumentation. 
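+# The recorded cassettes live alongside the tests; if an interaction goes
+# stale, the corresponding cassette can be deleted and the test re-run against
+# a live endpoint with real credentials (for example via the VCR pytest
+# plugin's record-mode option, depending on which plugin the suite uses).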
+ +# Basic LLM Call Tests + +@pytest.mark.vcr() +def test_langchain_call( + span_exporter, + log_exporter, + metric_reader, + chatOpenAI_client, # noqa: N803 + instrument_with_content: None, + monkeypatch, +) -> None: + """Test basic LLM call with telemetry verification. + + This test verifies that: + 1. The LLM call completes successfully + 2. Spans are generated with correct attributes + 3. Logs contain expected messages + 4. Metrics are recorded for the operation + """ + # Setup test LLM with dummy values + monkeypatch.setenv("OPENAI_API_KEY", "test-api-key") + monkeypatch.setenv("APPKEY", "test-app-key") + llm_model_value = "gpt-4o-mini" + llm = ChatOpenAI( + temperature=0.1, + api_key=os.getenv("OPENAI_API_KEY"), + base_url="https://chat-ai.cisco.com/openai/deployments/gpt-4o-mini", + model=llm_model_value, + default_headers={"api-key": os.getenv("OPENAI_API_KEY")}, + model_kwargs={"user": json.dumps({"appkey": os.getenv("APPKEY")})}, + ) + + # Prepare test messages + system_message = SystemMessage(content="You are a helpful assistant!") + user_message = HumanMessage(content="What is the capital of France?") + messages = [system_message, user_message] + + # Execute LLM call + response = llm.invoke(messages) + assert response.content == "The capital of France is Paris." + + # --- Verify Telemetry --- + + # 1. Check spans + spans = span_exporter.get_finished_spans() + assert spans, "No spans were exported" + assert_openai_completion_attributes(spans[0], llm_model_value, response) + + # 2. Check logs + logs = log_exporter.get_finished_logs() + print(f"logs: {logs}") + for log in logs: + print(f"log: {log}") + print(f"log attributes: {log.log_record.attributes}") + print(f"log body: {log.log_record.body}") + system_message = {"content": messages[0].content} + human_message = {"content": messages[1].content} + # will add the logs back once the logs are fixed + # assert_message_in_logs( + # logs[0], "gen_ai.system.message", system_message, spans[0] + # ) + # assert_message_in_logs( + # logs[1], "gen_ai.human.message", human_message, spans[0] + # ) + + chat_generation_event = { + "index": 0, + "finish_reason": "stop", + "message": { + "content": response.content, + "type": "ChatGeneration" + } + } + # assert_message_in_logs(logs[2], "gen_ai.choice", chat_generation_event, spans[0]) + + # 3. Check metrics + metrics = metric_reader.get_metrics_data().resource_metrics + + print(f"metrics: {metrics}") + assert len(metrics) == 1 + + metric_data = metrics[0].scope_metrics[0].metrics + for m in metric_data: + if m.name == gen_ai_metrics.GEN_AI_CLIENT_OPERATION_DURATION: + assert_duration_metric(m, spans[0]) + if m.name == gen_ai_metrics.GEN_AI_CLIENT_TOKEN_USAGE: + assert_token_usage_metric(m, spans[0]) + + +@pytest.mark.vcr() +def test_langchain_call_with_tools( + span_exporter, + log_exporter, + metric_reader, + instrument_with_content: None, + monkeypatch +) -> None: + """Test LLM call with tool usage and verify telemetry. + + This test verifies: + 1. Tool definitions and bindings work correctly + 2. Tool execution and response handling + 3. 
Telemetry includes tool-related spans and metrics + """ + # Define test tools + @tool + def add(a: int, b: int) -> int: + """Add two integers together.""" + return a + b + + @tool + def multiply(a: int, b: int) -> int: + """Multiply two integers together.""" + return a * b + + monkeypatch.setenv("OPENAI_API_KEY", "test-api-key") + monkeypatch.setenv("APPKEY", "test-app-key") + # Setup LLM with tools + llm = ChatOpenAI( + temperature=0.1, + api_key=os.getenv("OPENAI_API_KEY"), + base_url='https://chat-ai.cisco.com/openai/deployments/gpt-4o-mini', + model='gpt-4o-mini', + default_headers={"api-key": os.getenv("OPENAI_API_KEY")}, + model_kwargs={"user": json.dumps({"appkey": os.getenv("APPKEY")})}, + ) + + tools = [add, multiply] + llm_with_tools = llm.bind_tools(tools) + + # Test conversation flow + messages = [HumanMessage("Please add 2 and 3, then multiply 2 and 3.")] + + # First LLM call - should return tool calls + ai_msg = llm_with_tools.invoke(messages) + messages.append(ai_msg) + + # Process tool calls + tool_calls = getattr(ai_msg, "tool_calls", None) or \ + ai_msg.additional_kwargs.get("tool_calls", []) + + # Execute tools and collect results + name_map = {"add": add, "multiply": multiply} + for tc in tool_calls: + fn = tc.get("function", {}) + tool_name = (fn.get("name") or tc.get("name") or "").lower() + arg_str = fn.get("arguments") + args = json.loads(arg_str) if isinstance(arg_str, str) else (tc.get("args") or {}) + + selected_tool = name_map[tool_name] + tool_output = selected_tool.invoke(args) + + messages.append(ToolMessage( + content=str(tool_output), + name=tool_name, + tool_call_id=tc.get("id", "") + )) + + # Final LLM call with tool results + final = llm_with_tools.invoke(messages) + assert isinstance(final.content, str) and len(final.content) > 0 + assert "5" in final.content and "6" in final.content + + # --- Verify Telemetry --- + spans = span_exporter.get_finished_spans() + assert len(spans) >= 1 + _assert_tool_request_functions_on_span(spans[0], ["add", "multiply"]) + + # Verify logs + logs = log_exporter.get_finished_logs() + assert len(logs) >= 3 # system/user + gen_ai.choice + + choice_logs = [l for l in logs if l.log_record.attributes.get("event.name") == "gen_ai.choice"] + assert len(choice_logs) >= 1 + body = dict(choice_logs[0].log_record.body or {}) + assert "message" in body and isinstance(body["message"], dict) + assert body["message"].get("type") == "ChatGeneration" + assert isinstance(body["message"].get("content"), str) + + # Verify metrics with tool usage + llm_spans = _get_llm_spans(spans) + for rm in metric_reader.get_metrics_data().resource_metrics: + for scope in rm.scope_metrics: + for metric in scope.metrics: + if metric.name == "gen_ai.client.operation.duration": + assert_duration_metric_with_tool(metric, llm_spans) + elif metric.name == "gen_ai.client.token.usage": + assert_token_usage_metric_with_tool(metric, llm_spans) + + +# Tool-related Assertion Helpers +def assert_duration_metric_with_tool(metric: Metric, spans: List[Span]) -> None: + """Verify duration metric attributes when tools are involved. 
+ + Args: + metric: The metric data points to verify + spans: List of spans for context verification + """ + llm_points = [ + dp for dp in metric.data.data_points + if dp.attributes.get(gen_ai_attributes.GEN_AI_OPERATION_NAME) == CHAT + ] + assert len(llm_points) >= 1 + for dp in llm_points: + assert_duration_metric_attributes(dp.attributes, spans[0]) + if getattr(dp, "exemplars", None): + assert_exemplar_matches_any_llm_span(dp.exemplars, spans) + + +def assert_token_usage_metric_with_tool(metric: Metric, spans: List[Span]) -> None: + """Verify token usage metric when tools are involved. + + Args: + metric: The metric to verify + spans: List of spans for context verification + """ + assert spans, "No LLM CHAT spans found" + + # Only consider CHAT datapoints (ignore tool) + llm_points = [ + dp for dp in metric.data.data_points + if dp.attributes.get(gen_ai_attributes.GEN_AI_OPERATION_NAME) == CHAT + ] + assert len(llm_points) >= 2 + + for dp in llm_points: + assert dp.sum > 0 + assert_token_usage_metric_attributes(dp.attributes, spans[0]) # use attrs from any LLM span + if getattr(dp, "exemplars", None): + assert_exemplar_matches_any_llm_span(dp.exemplars, spans) + +def assert_exemplar_matches_any_llm_span(exemplars, spans): + assert exemplars and len(exemplars) >= 1 + # Build a lookup of span_id -> (trace_id, span_obj) + by_id = {s.get_span_context().span_id: s for s in spans} + for ex in exemplars: + s = by_id.get(ex.span_id) + assert s is not None, f"exemplar.span_id not found among LLM spans: {ex.span_id}" + # Optional: also ensure consistent trace + assert ex.trace_id == s.get_span_context().trace_id \ No newline at end of file diff --git a/util/opentelemetry-util-genai/CHANGELOG.md b/util/opentelemetry-util-genai/CHANGELOG.md index 8a6b7ec6df..ff3423be08 100644 --- a/util/opentelemetry-util-genai/CHANGELOG.md +++ b/util/opentelemetry-util-genai/CHANGELOG.md @@ -5,7 +5,14 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## Unreleased +## [Unreleased] Repurpose the `OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT` environment variable when GEN AI stability mode is set to `gen_ai_latest_experimental`, -to take on an enum (`NO_CONTENT/SPAN_ONLY/EVENT_ONLY/SPAN_AND_EVENT`) instead of a boolean. Add a utility function to help parse this environment variable. \ No newline at end of file +to take on an enum (`NO_CONTENT/SPAN_ONLY/EVENT_ONLY/SPAN_AND_EVENT`) instead of a boolean. Add a utility function to help parse this environment variable. + +### Added + +- Generate Spans for LLM invocations +- Generate Metrics for LLM invocations +- Generate Logs for LLM invocations +- Helper functions for starting and finishing LLM invocations diff --git a/util/opentelemetry-util-genai/README.rst b/util/opentelemetry-util-genai/README.rst index 4c10b7d36b..6aad7d7d43 100644 --- a/util/opentelemetry-util-genai/README.rst +++ b/util/opentelemetry-util-genai/README.rst @@ -6,6 +6,39 @@ The GenAI Utils package will include boilerplate and helpers to standardize inst This package will provide APIs and decorators to minimize the work needed to instrument genai libraries, while providing standardization for generating both types of otel, "spans and metrics" and "spans, metrics and events" +This package provides these span attributes. 
+-> gen_ai.provider.name: Str(openai) +-> gen_ai.operation.name: Str(chat) +-> gen_ai.framework: Str(langchain) +-> gen_ai.system: Str(openai) # deprecated +-> gen_ai.request.model: Str(gpt-3.5-turbo) +-> gen_ai.response.finish_reasons: Slice(["stop"]) +-> gen_ai.response.model: Str(gpt-3.5-turbo-0125) +-> gen_ai.response.id: Str(chatcmpl-Bz8yrvPnydD9pObv625n2CGBPHS13) +-> gen_ai.usage.input_tokens: Int(24) +-> gen_ai.usage.output_tokens: Int(7) +-> gen_ai.input.messages: Str('[{"role": "Human", "parts": [{"content": "hello world", "type": "text"}]}]') +-> gen_ai.output.messages: Str('[{"role": "AI", "parts": [{"content": "hello back", "type": "text"}], "finish_reason": "stop"}]') + + +This package also provides these metric attributes. +Token Usage Metrics: +-> gen_ai.provider.name: Str(openai) +-> gen_ai.operation.name: Str(chat) +-> gen_ai.framework: Str(langchain) +-> gen_ai.request.model: Str(gpt-3.5-turbo) +-> gen_ai.response.model: Str(gpt-3.5-turbo-0125) +-> gen_ai.usage.input_tokens: Int(24) +-> gen_ai.usage.output_tokens: Int(7) +-> gen_ai.token.type: Str(input|output) + +Duration Metrics: +-> gen_ai.provider.name: Str(openai) +-> gen_ai.operation.name: Str(chat) +-> gen_ai.framework: Str(langchain) +-> gen_ai.request.model: Str(gpt-3.5-turbo) +-> gen_ai.response.model: Str(gpt-3.5-turbo-0125) + Installation ------------ diff --git a/util/opentelemetry-util-genai/pyproject.toml b/util/opentelemetry-util-genai/pyproject.toml index 280da37d58..e68ff37e0e 100644 --- a/util/opentelemetry-util-genai/pyproject.toml +++ b/util/opentelemetry-util-genai/pyproject.toml @@ -25,9 +25,9 @@ classifiers = [ "Programming Language :: Python :: 3.13", ] dependencies = [ - "opentelemetry-instrumentation ~= 0.51b0", - "opentelemetry-semantic-conventions ~= 0.51b0", - "opentelemetry-api>=1.31.0", + "opentelemetry-instrumentation ~= 0.57b0", + "opentelemetry-semantic-conventions ~= 0.57b0", + "opentelemetry-api>=1.36.0", ] [project.optional-dependencies] diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/__init__.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/__init__.py index e69de29bb2..b0a6f42841 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/__init__.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/__init__.py @@ -0,0 +1,13 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/data.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/data.py new file mode 100644 index 0000000000..9dc09f465c --- /dev/null +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/data.py @@ -0,0 +1,69 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass +from typing import List, Literal, Optional, Type, TypedDict + + +class TextPart(TypedDict): + type: Literal["text"] + content: str + + +# Keep room for future parts without changing the return type +# addition of tools can use Part = Union[TextPart, ToolPart] +Part = TextPart + + +class OtelMessage(TypedDict): + role: str + # role: Literal["user", "assistant", "system", "tool", "tool_message"] # TODO: check semconvs for allowed roles + parts: List[Part] + + +@dataclass +class Message: + content: str + type: str + name: str + + def _to_semconv_dict(self) -> OtelMessage: + """Convert the message to a dictionary suitable for OpenTelemetry semconvs. + + Ref: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/registry/attributes/gen-ai.md#gen-ai-input-messages + """ + + # TODO: Support tool_call and tool_call response + return { + "role": self.type, + "parts": [ + { + "content": self.content, + "type": "text", + } + ], + } + + +@dataclass +class ChatGeneration: + content: str + type: str + finish_reason: Optional[str] = None + + +@dataclass +class Error: + message: str + type: Type[BaseException] diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/generators.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/generators.py new file mode 100644 index 0000000000..90ad0e27c5 --- /dev/null +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/generators.py @@ -0,0 +1,281 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Span generation utilities for GenAI telemetry. + +This module maps GenAI (Generative AI) invocations to OpenTelemetry spans and +applies GenAI semantic convention attributes. + +Classes: + - BaseTelemetryGenerator: Abstract base for GenAI telemetry emitters. + - SpanGenerator: Concrete implementation that creates and finalizes spans + for LLM operations (e.g., chat) and records input/output messages when + experimental mode and content capture settings allow. + +Usage: + See `opentelemetry/util/genai/handler.py` for `TelemetryHandler`, which + constructs `LLMInvocation` objects and delegates to `SpanGenerator.start`, + `SpanGenerator.finish`, and `SpanGenerator.error` to produce spans that + follow the GenAI semantic conventions. 
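+
+Example (illustrative sketch; assumes a tracer and an `LLMInvocation` built as
+in `handler.py`):
+
+    generator = SpanGenerator(tracer=tracer)
+    generator.start(invocation)   # opens a client span for the chat operation
+    generator.finish(invocation)  # applies semconv attributes and ends the span
+    # on failure: generator.error(Error(message=str(exc), type=type(exc)), invocation)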
+""" + +import json +from contextlib import contextmanager +from dataclasses import asdict, dataclass, field +from typing import Any, Dict, List, Optional +from uuid import UUID + +from opentelemetry import trace +from opentelemetry.semconv._incubating.attributes import ( + gen_ai_attributes as GenAI, +) +from opentelemetry.semconv.attributes import ( + error_attributes as ErrorAttributes, +) +from opentelemetry.trace import ( + Span, + SpanKind, + Tracer, + set_span_in_context, + use_span, +) +from opentelemetry.trace.status import Status, StatusCode +from opentelemetry.util.genai.utils import ( + ContentCapturingMode, + get_content_capturing_mode, + is_experimental_mode, +) +from opentelemetry.util.types import AttributeValue + +from .types import Error, InputMessage, LLMInvocation, OutputMessage + + +@dataclass +class _SpanState: + span: Span + children: List[UUID] = field(default_factory=list) + + +def _apply_common_span_attributes( + span: Span, invocation: LLMInvocation +) -> None: + """Apply attributes shared by finish() and error() and compute metrics. + + Returns (genai_attributes) for use with metrics. + """ + request_model = invocation.attributes.get("request_model") + provider = invocation.attributes.get("provider") + + span.set_attribute( + GenAI.GEN_AI_OPERATION_NAME, GenAI.GenAiOperationNameValues.CHAT.value + ) + if request_model: + span.set_attribute(GenAI.GEN_AI_REQUEST_MODEL, request_model) + if provider is not None: + # TODO: clean provider name to match GenAiProviderNameValues? + span.set_attribute(GenAI.GEN_AI_PROVIDER_NAME, provider) + + finish_reasons: List[str] = [] + for gen in invocation.chat_generations: + finish_reasons.append(gen.finish_reason) + if finish_reasons: + span.set_attribute( + GenAI.GEN_AI_RESPONSE_FINISH_REASONS, finish_reasons + ) + + response_model = invocation.attributes.get("response_model_name") + response_id = invocation.attributes.get("response_id") + prompt_tokens = invocation.attributes.get("input_tokens") + completion_tokens = invocation.attributes.get("output_tokens") + _set_response_and_usage_attributes( + span, + response_model, + response_id, + prompt_tokens, + completion_tokens, + ) + + +def _set_response_and_usage_attributes( + span: Span, + response_model: Optional[str], + response_id: Optional[str], + prompt_tokens: Optional[AttributeValue], + completion_tokens: Optional[AttributeValue], +) -> None: + if response_model is not None: + span.set_attribute(GenAI.GEN_AI_RESPONSE_MODEL, response_model) + if response_id is not None: + span.set_attribute(GenAI.GEN_AI_RESPONSE_ID, response_id) + if isinstance(prompt_tokens, (int, float)): + span.set_attribute(GenAI.GEN_AI_USAGE_INPUT_TOKENS, prompt_tokens) + if isinstance(completion_tokens, (int, float)): + span.set_attribute(GenAI.GEN_AI_USAGE_OUTPUT_TOKENS, completion_tokens) + + +def _maybe_set_span_messages( + span: Span, + input_messages: List[InputMessage], + output_messages: List[OutputMessage], +) -> None: + if not is_experimental_mode() or get_content_capturing_mode() not in ( + ContentCapturingMode.SPAN_ONLY, + ContentCapturingMode.SPAN_AND_EVENT, + ): + return + message_parts: List[Dict[str, Any]] = [ + asdict(message) for message in input_messages + ] + if message_parts: + span.set_attribute("gen_ai.input.messages", json.dumps(message_parts)) + + generation_parts: List[Dict[str, Any]] = [ + asdict(generation) for generation in output_messages + ] + if generation_parts: + span.set_attribute( + "gen_ai.output.messages", json.dumps(generation_parts) + ) + + +def 
_apply_finish_attributes(span: Span, invocation: LLMInvocation) -> None: + """Apply attributes/messages common to finish() paths.""" + _apply_common_span_attributes(span, invocation) + _maybe_set_span_messages( + span, invocation.messages, invocation.chat_generations + ) + + +def _apply_error_attributes(span: Span, error: Error) -> None: + """Apply status and error attributes common to error() paths.""" + span.set_status(Status(StatusCode.ERROR, error.message)) + if span.is_recording(): + span.set_attribute(ErrorAttributes.ERROR_TYPE, error.type.__qualname__) + + +class BaseTelemetryGenerator: + """ + Abstract base for emitters mapping GenAI types -> OpenTelemetry. + """ + + def start(self, invocation: LLMInvocation) -> None: + raise NotImplementedError + + def finish(self, invocation: LLMInvocation) -> None: + raise NotImplementedError + + def error(self, error: Error, invocation: LLMInvocation) -> None: + raise NotImplementedError + + +class SpanGenerator(BaseTelemetryGenerator): + """ + Generates only spans. + """ + + def __init__( + self, + tracer: Optional[Tracer] = None, + ): + self._tracer: Tracer = tracer or trace.get_tracer(__name__) + + # TODO: Map from run_id -> _SpanState, to keep track of spans and parent/child relationships + self.spans: Dict[UUID, _SpanState] = {} + + def _start_span( + self, + name: str, + kind: SpanKind, + parent_run_id: Optional[UUID] = None, + ) -> Span: + parent_span = ( + self.spans.get(parent_run_id) + if parent_run_id is not None + else None + ) + if parent_span is not None: + ctx = set_span_in_context(parent_span.span) + span = self._tracer.start_span(name=name, kind=kind, context=ctx) + else: + # top-level or missing parent + span = self._tracer.start_span(name=name, kind=kind) + set_span_in_context(span) + + return span + + def _end_span(self, run_id: UUID): + state = self.spans[run_id] + for child_id in state.children: + child_state = self.spans.get(child_id) + if child_state: + child_state.span.end() + state.span.end() + del self.spans[run_id] + + def start(self, invocation: LLMInvocation): + # Create/register the span; keep it active but do not end it here. + with self._start_span_for_invocation(invocation): + pass + + @contextmanager + def _start_span_for_invocation(self, invocation: LLMInvocation): + """Create/register a span for the invocation and yield it. + + The span is not ended automatically on exiting the context; callers + must finalize via _finalize_invocation. + """ + # Establish parent/child relationship if a parent span exists. 
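+        # (The lookup below assumes the parent invocation was also started
+        # through this generator, so its run_id is registered in self.spans;
+        # otherwise the span is started without an explicit parent and falls
+        # back to the current ambient context.)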
+ parent_state = ( + self.spans.get(invocation.parent_run_id) + if invocation.parent_run_id is not None + else None + ) + if parent_state is not None: + parent_state.children.append(invocation.run_id) + span = self._start_span( + name=f"{GenAI.GenAiOperationNameValues.CHAT.value} {invocation.request_model}", + kind=SpanKind.CLIENT, + parent_run_id=invocation.parent_run_id, + ) + with use_span(span, end_on_exit=False) as span: + span_state = _SpanState( + span=span, + ) + self.spans[invocation.run_id] = span_state + yield span + + def finish(self, invocation: LLMInvocation): + state = self.spans.get(invocation.run_id) + if state is None: + with self._start_span_for_invocation(invocation) as span: + _apply_finish_attributes(span, invocation) + self._end_span(invocation.run_id) + return + + span = state.span + _apply_finish_attributes(span, invocation) + self._end_span(invocation.run_id) + + def error(self, error: Error, invocation: LLMInvocation): + state = self.spans.get(invocation.run_id) + if state is None: + with self._start_span_for_invocation(invocation) as span: + _apply_error_attributes(span, error) + self._end_span(invocation.run_id) + return + + span = state.span + _apply_error_attributes(span, error) + self._end_span(invocation.run_id) \ No newline at end of file diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py new file mode 100644 index 0000000000..1d80b8f52d --- /dev/null +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py @@ -0,0 +1,123 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Telemetry handler for GenAI invocations. + +This module exposes the `TelemetryHandler` class, which manages the lifecycle of +GenAI (Generative AI) invocations and emits telemetry data (spans and related attributes). +It supports starting, stopping, and failing LLM invocations. + +Classes: + - TelemetryHandler: Manages GenAI invocation lifecycles and emits telemetry. + +Functions: + - get_telemetry_handler: Returns a singleton `TelemetryHandler` instance. + +Usage: + handler = get_telemetry_handler() + handler.start_llm(prompts, run_id, **attrs) + handler.stop_llm(run_id, chat_generations, **attrs) + handler.fail_llm(run_id, error, **attrs) +""" + +import time +from typing import Any, List, Optional +from uuid import UUID + +from opentelemetry.semconv.schemas import Schemas +from opentelemetry.trace import get_tracer + +from .generators import SpanGenerator +from .types import Error, InputMessage, LLMInvocation, OutputMessage +from .version import __version__ + + +class TelemetryHandler: + """ + High-level handler managing GenAI invocation lifecycles and emitting + them as spans, metrics, and events. 
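+
+    Example (illustrative sketch; model name and message content are placeholders):
+
+        handler = get_telemetry_handler()
+        invocation = handler.start_llm(
+            request_model="some-model",
+            prompts=[InputMessage(role="user", parts=[Text(content="hello")])],
+            run_id=uuid4(),
+            provider="openai",
+        )
+        handler.stop_llm(invocation.run_id, chat_generations=[output_message])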
+ """ + + def __init__(self, emitter_type_full: bool = True, **kwargs: Any): + tracer_provider = kwargs.get("tracer_provider") + self._tracer = get_tracer( + __name__, + __version__, + tracer_provider, + schema_url=Schemas.V1_36_0.value, + ) + + # TODO: trigger span+metric+event generation based on the full emitter flag + self._generator = SpanGenerator(tracer=self._tracer) + + self._llm_registry: dict[UUID, LLMInvocation] = {} + + def start_llm( + self, + request_model: str, + prompts: List[InputMessage], + run_id: UUID, + parent_run_id: Optional[UUID] = None, + **attributes: Any, + ) -> LLMInvocation: + invocation = LLMInvocation( + request_model=request_model, + messages=prompts, + run_id=run_id, + parent_run_id=parent_run_id, + attributes=attributes, + ) + self._llm_registry[invocation.run_id] = invocation + self._generator.start(invocation) + return invocation + + def stop_llm( + self, + run_id: UUID, + chat_generations: List[OutputMessage], + **attributes: Any, + ) -> LLMInvocation: + invocation = self._llm_registry.pop(run_id) + invocation.end_time = time.time() + invocation.chat_generations = chat_generations + invocation.attributes.update(attributes) + self._generator.finish(invocation) + return invocation + + def fail_llm( + self, run_id: UUID, error: Error, **attributes: Any + ) -> LLMInvocation: + invocation = self._llm_registry.pop(run_id) + invocation.end_time = time.time() + invocation.attributes.update(**attributes) + self._generator.error(error, invocation) + return invocation + + +def get_telemetry_handler( + emitter_type_full: bool = True, **kwargs: Any +) -> TelemetryHandler: + """ + Returns a singleton TelemetryHandler instance. + """ + handler: Optional[TelemetryHandler] = getattr( + get_telemetry_handler, "_default_handler", None + ) + if handler is None: + handler = TelemetryHandler( + emitter_type_full=emitter_type_full, **kwargs + ) + setattr(get_telemetry_handler, "_default_handler", handler) + return handler \ No newline at end of file diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py new file mode 100644 index 0000000000..619e1cda2d --- /dev/null +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py @@ -0,0 +1,68 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from opentelemetry.metrics import Histogram, Meter +from opentelemetry.semconv._incubating.metrics import gen_ai_metrics + +# TODO: should this be in utils or passed to the telemetry client? +_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS = [ + 0.01, + 0.02, + 0.04, + 0.08, + 0.16, + 0.32, + 0.64, + 1.28, + 2.56, + 5.12, + 10.24, + 20.48, + 40.96, + 81.92, +] + +# TODO: should this be in utils or passed to the telemetry client? 
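+# The boundaries below (and the duration boundaries above) mirror the advisory
+# explicit bucket boundaries suggested for these histograms in the GenAI
+# client metrics semantic conventions.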
+_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS = [ + 1, + 4, + 16, + 64, + 256, + 1024, + 4096, + 16384, + 65536, + 262144, + 1048576, + 4194304, + 16777216, + 67108864, +] + + +class Instruments: + def __init__(self, meter: Meter): + self.operation_duration_histogram: Histogram = meter.create_histogram( + name=gen_ai_metrics.GEN_AI_CLIENT_OPERATION_DURATION, + description="GenAI operation duration", + unit="s", + explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS, + ) + self.token_usage_histogram: Histogram = meter.create_histogram( + name=gen_ai_metrics.GEN_AI_CLIENT_TOKEN_USAGE, + description="Measures number of input and output tokens used", + unit="{token}", + explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS, + ) diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py index 569e7e7e00..91e9350ae2 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py @@ -1,3 +1,4 @@ + # Copyright The OpenTelemetry Authors # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -13,9 +14,11 @@ # limitations under the License. -from dataclasses import dataclass +import time +from dataclasses import dataclass, field from enum import Enum -from typing import Any, Literal, Optional, Union +from typing import Any, Dict, List, Literal, Optional, Type, Union +from uuid import UUID class ContentCapturingMode(Enum): @@ -69,3 +72,27 @@ class OutputMessage: role: str parts: list[MessagePart] finish_reason: Union[str, FinishReason] + + +@dataclass +class LLMInvocation: + """ + Represents a single LLM call invocation. + """ + + run_id: UUID + request_model: str + parent_run_id: Optional[UUID] = None + start_time: float = field(default_factory=time.time) + end_time: Optional[float] = None + messages: List[InputMessage] = field(default_factory=list) + chat_generations: List[OutputMessage] = field(default_factory=list) + attributes: Dict[str, Any] = field(default_factory=dict) + span_id: int = 0 + trace_id: int = 0 + + +@dataclass +class Error: + message: str + type: Type[BaseException] diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/utils.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/utils.py index 91cb9221f1..6cd11efb12 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/utils.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/utils.py @@ -28,19 +28,23 @@ logger = logging.getLogger(__name__) +def is_experimental_mode() -> bool: + return ( + _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( + _OpenTelemetryStabilitySignalType.GEN_AI, + ) + is _StabilityMode.GEN_AI_LATEST_EXPERIMENTAL + ) + + def get_content_capturing_mode() -> ContentCapturingMode: """This function should not be called when GEN_AI stability mode is set to DEFAULT. When the GEN_AI stability mode is DEFAULT this function will raise a ValueError -- see the code below.""" envvar = os.environ.get(OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT) - if ( - _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( - _OpenTelemetryStabilitySignalType.GEN_AI, - ) - == _StabilityMode.DEFAULT - ): + if not is_experimental_mode(): raise ValueError( - "This function should never be called when StabilityMode is default." 
+ "This function should never be called when StabilityMode is not experimental." ) if not envvar: return ContentCapturingMode.NO_CONTENT diff --git a/util/opentelemetry-util-genai/tests/test_utils.py b/util/opentelemetry-util-genai/tests/test_utils.py index 675b6eba5f..59165be0f7 100644 --- a/util/opentelemetry-util-genai/tests/test_utils.py +++ b/util/opentelemetry-util-genai/tests/test_utils.py @@ -15,15 +15,28 @@ import os import unittest from unittest.mock import patch +from uuid import uuid4 +from opentelemetry import trace from opentelemetry.instrumentation._semconv import ( OTEL_SEMCONV_STABILITY_OPT_IN, _OpenTelemetrySemanticConventionStability, ) +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) from opentelemetry.util.genai.environment_variables import ( OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT, ) -from opentelemetry.util.genai.types import ContentCapturingMode +from opentelemetry.util.genai.handler import get_telemetry_handler +from opentelemetry.util.genai.types import ( + ContentCapturingMode, + InputMessage, + OutputMessage, + Text, +) from opentelemetry.util.genai.utils import get_content_capturing_mode @@ -81,3 +94,128 @@ def test_get_content_capturing_mode_raises_exception_on_invalid_envvar( ) self.assertEqual(len(cm.output), 1) self.assertIn("INVALID_VALUE is not a valid option for ", cm.output[0]) + + +class TestTelemetryHandler(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.span_exporter = InMemorySpanExporter() + tracer_provider = TracerProvider() + tracer_provider.add_span_processor( + SimpleSpanProcessor(cls.span_exporter) + ) + trace.set_tracer_provider(tracer_provider) + + def setUp(self): + self.span_exporter = self.__class__.span_exporter + self.span_exporter.clear() + self.telemetry_handler = get_telemetry_handler() + + def tearDown(self): + # Clear spans and reset the singleton telemetry handler so each test starts clean + self.span_exporter.clear() + if hasattr(get_telemetry_handler, "_default_handler"): + delattr(get_telemetry_handler, "_default_handler") + + @patch_env_vars( + stability_mode="gen_ai_latest_experimental", + content_capturing="SPAN_ONLY", + ) + def test_llm_start_and_stop_creates_span(self): # pylint: disable=no-self-use + run_id = uuid4() + message = InputMessage( + role="Human", parts=[Text(content="hello world")] + ) + chat_generation = OutputMessage( + role="AI", parts=[Text(content="hello back")], finish_reason="stop" + ) + + # Start and stop LLM invocation + self.telemetry_handler.start_llm( + request_model="test-model", + prompts=[message], + run_id=run_id, + custom_attr="value", + provider="test-provider", + ) + invocation = self.telemetry_handler.stop_llm( + run_id, chat_generations=[chat_generation], extra="info" + ) + + # Get the spans that were created + spans = self.span_exporter.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + assert span.name == "chat test-model" + assert span.kind == trace.SpanKind.CLIENT + + # Verify span attributes + assert span.attributes is not None + span_attrs = span.attributes + assert span_attrs.get("gen_ai.operation.name") == "chat" + assert span_attrs.get("gen_ai.provider.name") == "test-provider" + assert span.start_time is not None + assert span.end_time is not None + assert span.end_time > span.start_time + assert invocation.run_id == run_id + assert invocation.attributes.get("custom_attr") == 
"value" + assert invocation.attributes.get("extra") == "info" + + # Check messages captured on span + input_messages_json = span_attrs.get("gen_ai.input.messages") + output_messages_json = span_attrs.get("gen_ai.output.messages") + assert input_messages_json is not None + assert output_messages_json is not None + + assert isinstance(input_messages_json, str) + assert isinstance(output_messages_json, str) + + @patch_env_vars( + stability_mode="gen_ai_latest_experimental", + content_capturing="SPAN_ONLY", + ) + def test_parent_child_span_relationship(self): + parent_id = uuid4() + child_id = uuid4() + message = InputMessage(role="Human", parts=[Text(content="hi")]) + chat_generation = OutputMessage( + role="AI", parts=[Text(content="ok")], finish_reason="stop" + ) + + # Start parent and child (child references parent_run_id) + self.telemetry_handler.start_llm( + request_model="parent-model", + prompts=[message], + run_id=parent_id, + provider="test-provider", + ) + self.telemetry_handler.start_llm( + request_model="child-model", + prompts=[message], + run_id=child_id, + parent_run_id=parent_id, + provider="test-provider", + ) + + # Stop child first, then parent (order should not matter) + self.telemetry_handler.stop_llm( + child_id, chat_generations=[chat_generation] + ) + self.telemetry_handler.stop_llm( + parent_id, chat_generations=[chat_generation] + ) + + spans = self.span_exporter.get_finished_spans() + assert len(spans) == 2 + + # Identify spans irrespective of export order + child_span = next(s for s in spans if s.name == "chat child-model") + parent_span = next(s for s in spans if s.name == "chat parent-model") + + # Same trace + assert child_span.context.trace_id == parent_span.context.trace_id + # Child has parent set to parent's span id + assert child_span.parent is not None + assert child_span.parent.span_id == parent_span.context.span_id + # Parent should not have a parent (root) + assert parent_span.parent is None \ No newline at end of file