diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain-dev/examples/multi_agent_travel_planner/.env b/instrumentation-genai/opentelemetry-instrumentation-langchain-dev/examples/multi_agent_travel_planner/.env new file mode 100644 index 0000000000..4a2903d080 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain-dev/examples/multi_agent_travel_planner/.env @@ -0,0 +1,8 @@ +OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 +OTEL_SEMCONV_STABILITY_OPT_IN=gen_ai_latest_experimental +OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT=true +OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT_MODE=SPAN_AND_EVENT +OTEL_INSTRUMENTATION_GENAI_EMITTERS=span_metric_event,splunk +OTEL_INSTRUMENTATION_GENAI_EMITTERS_EVALUATION=replace-category:SplunkEvaluationResults +OTEL_INSTRUMENTATION_GENAI_EVALS_RESULTS_AGGREGATION=true +OTEL_INSTRUMENTATION_GENAI_DEBUG=true diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain-dev/examples/multi_agent_travel_planner/requirements.admehra.txt b/instrumentation-genai/opentelemetry-instrumentation-langchain-dev/examples/multi_agent_travel_planner/requirements.admehra.txt new file mode 100644 index 0000000000..eaa5c2c3d4 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain-dev/examples/multi_agent_travel_planner/requirements.admehra.txt @@ -0,0 +1,155 @@ +aiohappyeyeballs==2.6.1 +aiohttp==3.13.2 +aiosignal==1.4.0 +annotated-types==0.7.0 +anthropic==0.72.0 +anyio==4.11.0 +attrs==25.4.0 +backoff==2.2.1 +cachetools==6.2.1 +certifi==2025.10.5 +charset-normalizer==3.4.4 +click==8.2.1 +colorama==0.4.6 +cuid==0.4 +deepeval==3.3.9 +Deprecated==1.3.1 +distro==1.9.0 +docstring_parser==0.17.0 +execnet==2.1.1 +filelock==3.20.0 +frozenlist==1.8.0 +fsspec==2025.10.0 +google-auth==2.42.1 +google-genai==1.47.0 +googleapis-common-protos==1.71.0 +grpcio==1.76.0 +h11==0.16.0 +hf-xet==1.2.0 +httpcore==1.0.9 +httpx==0.28.1 +huggingface-hub==1.0.1 +idna==3.11 
+importlib_metadata==8.7.0 +inflection==0.5.1 +iniconfig==2.3.0 +Jinja2==3.1.6 +jiter==0.11.1 +jsonpatch==1.33 +jsonpointer==3.0.0 +langchain==1.0.3 +langchain-core==1.0.2 +langchain-openai==1.0.1 +langgraph==1.0.2 +langgraph-checkpoint==3.0.0 +langgraph-prebuilt==1.0.2 +langgraph-sdk==0.2.9 +langsmith==0.4.39 +markdown-it-py==4.0.0 +MarkupSafe==3.0.3 +mdurl==0.1.2 +monotonic==1.6 +multidict==6.7.0 +nest-asyncio==1.6.0 +ollama==0.6.0 +openai==2.6.1 +opentelemetry-api==1.38.0 +opentelemetry-exporter-otlp==1.38.0 +opentelemetry-exporter-otlp-proto-common==1.38.0 +opentelemetry-exporter-otlp-proto-grpc==1.38.0 +opentelemetry-exporter-otlp-proto-http==1.38.0 +opentelemetry-instrumentation==0.59b0 +opentelemetry-instrumentation-alephalpha==0.47.5 +opentelemetry-instrumentation-anthropic==0.47.5 +opentelemetry-instrumentation-bedrock==0.47.5 +opentelemetry-instrumentation-chromadb==0.47.5 +opentelemetry-instrumentation-cohere==0.47.5 +opentelemetry-instrumentation-crewai==0.47.5 +opentelemetry-instrumentation-google-generativeai==0.47.5 +opentelemetry-instrumentation-groq==0.47.5 +opentelemetry-instrumentation-haystack==0.47.5 +opentelemetry-instrumentation-lancedb==0.47.5 +-e git+ssh://git@github.com/zhirafovod/opentelemetry-python-contrib.git@7b13c32a3b1c8ad66bee8c49e31120cc586fa1ad#egg=opentelemetry_instrumentation_langchain&subdirectory=instrumentation-genai/opentelemetry-instrumentation-langchain-dev +opentelemetry-instrumentation-llamaindex==0.47.5 +opentelemetry-instrumentation-logging==0.59b0 +opentelemetry-instrumentation-marqo==0.47.5 +opentelemetry-instrumentation-mcp==0.47.5 +opentelemetry-instrumentation-milvus==0.47.5 +opentelemetry-instrumentation-mistralai==0.47.5 +opentelemetry-instrumentation-ollama==0.47.5 +opentelemetry-instrumentation-openai==0.47.5 +opentelemetry-instrumentation-openai-agents==0.47.5 +opentelemetry-instrumentation-pinecone==0.47.5 +opentelemetry-instrumentation-qdrant==0.47.5 +opentelemetry-instrumentation-redis==0.59b0 
+opentelemetry-instrumentation-replicate==0.47.5 +opentelemetry-instrumentation-requests==0.59b0 +opentelemetry-instrumentation-sagemaker==0.47.5 +opentelemetry-instrumentation-sqlalchemy==0.59b0 +opentelemetry-instrumentation-threading==0.59b0 +opentelemetry-instrumentation-together==0.47.5 +opentelemetry-instrumentation-transformers==0.47.5 +opentelemetry-instrumentation-urllib3==0.59b0 +opentelemetry-instrumentation-vertexai==0.47.5 +opentelemetry-instrumentation-watsonx==0.47.5 +opentelemetry-instrumentation-weaviate==0.47.5 +opentelemetry-instrumentation-writer==0.47.5 +opentelemetry-proto==1.38.0 +opentelemetry-sdk==1.38.0 +opentelemetry-semantic-conventions==0.59b0 +opentelemetry-semantic-conventions-ai==0.4.13 +-e git+ssh://git@github.com/zhirafovod/opentelemetry-python-contrib.git@7b13c32a3b1c8ad66bee8c49e31120cc586fa1ad#egg=opentelemetry_util_genai&subdirectory=util/opentelemetry-util-genai-dev +-e git+ssh://git@github.com/zhirafovod/opentelemetry-python-contrib.git@7b13c32a3b1c8ad66bee8c49e31120cc586fa1ad#egg=opentelemetry_util_genai_evals&subdirectory=util/opentelemetry-util-genai-evals +-e git+ssh://git@github.com/zhirafovod/opentelemetry-python-contrib.git@7b13c32a3b1c8ad66bee8c49e31120cc586fa1ad#egg=opentelemetry_util_genai_evals_deepeval&subdirectory=util/opentelemetry-util-genai-evals-deepeval +-e git+ssh://git@github.com/zhirafovod/opentelemetry-python-contrib.git@7b13c32a3b1c8ad66bee8c49e31120cc586fa1ad#egg=opentelemetry_util_genai_traceloop_translator&subdirectory=util/opentelemetry-util-genai-traceloop-translator +-e git+ssh://git@github.com/zhirafovod/opentelemetry-python-contrib.git@7b13c32a3b1c8ad66bee8c49e31120cc586fa1ad#egg=opentelemetry_util_http&subdirectory=util/opentelemetry-util-http +orjson==3.11.4 +ormsgpack==1.11.0 +packaging==25.0 +pluggy==1.6.0 +portalocker==3.2.0 +posthog==3.25.0 +propcache==0.4.1 +protobuf==6.33.0 +pyasn1==0.6.1 +pyasn1_modules==0.4.2 +pydantic==2.12.3 +pydantic_core==2.41.4 +pyfiglet==1.0.4 +Pygments==2.19.2 
+pytest==8.4.2 +pytest-asyncio==1.2.0 +pytest-repeat==0.9.4 +pytest-rerunfailures==12.0 +pytest-xdist==3.8.0 +python-dateutil==2.9.0.post0 +python-dotenv==1.2.1 +PyYAML==6.0.3 +regex==2025.10.23 +requests==2.32.5 +requests-toolbelt==1.0.0 +rich==14.2.0 +rsa==4.9.1 +sentry-sdk==2.43.0 +setuptools==80.9.0 +shellingham==1.5.4 +six==1.17.0 +sniffio==1.3.1 +tabulate==0.9.0 +tenacity==9.1.2 +tiktoken==0.12.0 +tokenizers==0.22.1 +tqdm==4.67.1 +traceloop-sdk==0.47.5 +typer==0.20.0 +typer-slim==0.20.0 +typing-inspection==0.4.2 +typing_extensions==4.15.0 +urllib3==2.5.0 +websockets==15.0.1 +wheel==0.45.1 +wrapt==1.17.3 +xxhash==3.6.0 +yarl==1.22.0 +zipp==3.23.0 +zstandard==0.25.0 diff --git a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/emitters/span.py b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/emitters/span.py index e27b86b14a..5df90ae602 100644 --- a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/emitters/span.py +++ b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/emitters/span.py @@ -65,6 +65,26 @@ _SPAN_ALLOWED_SUPPLEMENTAL_KEYS: tuple[str, ...] 
= ( "gen_ai.framework", "gen_ai.request.id", + # Workflow / Agent attributes (from Traceloop translator) + "gen_ai.workflow.name", + "gen_ai.workflow.path", + "gen_ai.workflow.version", + "gen_ai.agent.name", + "gen_ai.agent.id", + "gen_ai.span.kind", + "gen_ai.association.properties", + "gen_ai.conversation.id", + # Prompt registry attributes + "gen_ai.prompt.managed", + "gen_ai.prompt.key", + "gen_ai.prompt.version", + "gen_ai.prompt.version_name", + "gen_ai.prompt.version_hash", + "gen_ai.prompt.template", + "gen_ai.prompt.template_variables", + # Operation name override (for span name transformation) + "gen_ai.override.span_name", + # Custom attributes (allow any custom.* prefix for flexibility) ) _SPAN_BLOCKED_SUPPLEMENTAL_KEYS: set[str] = {"request_top_p", "ls_temperature"} @@ -297,16 +317,25 @@ def on_start( elif isinstance(invocation, EmbeddingInvocation): self._start_embedding(invocation) else: - # Use operation field for span name (defaults to "chat") - operation = getattr(invocation, "operation", "chat") - model_name = invocation.request_model - span_name = f"{operation} {model_name}" - parent_span = getattr(invocation, "parent_span", None) - parent_ctx = ( - trace.set_span_in_context(parent_span) - if parent_span is not None - else None - ) + # Check for span name override first (for Traceloop task/workflow spans) + span_name_override = invocation.attributes.get("gen_ai.override.span_name") + if span_name_override: + span_name = str(span_name_override) + else: + # Use operation field for span name (defaults to "chat") + operation = getattr(invocation, "operation", "chat") + model_name = invocation.request_model + span_name = f"{operation} {model_name}" + + # Check for parent context (from TraceloopSpanProcessor) or parent span + parent_ctx = getattr(invocation, "parent_context", None) + if parent_ctx is None: + parent_span = getattr(invocation, "parent_span", None) + parent_ctx = ( + trace.set_span_in_context(parent_span) + if parent_span is not None + 
else None + ) cm = self._tracer.start_as_current_span( span_name, kind=SpanKind.CLIENT, diff --git a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/emitters/utils.py b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/emitters/utils.py index 279bd3ff58..97eff967d9 100644 --- a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/emitters/utils.py +++ b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/emitters/utils.py @@ -128,9 +128,18 @@ def filter_semconv_gen_ai_attributes( for key, value in attributes.items(): if not isinstance(key, str): continue - if key not in allowed: + # Allow exact matches + if key in allowed: + filtered[key] = value continue - filtered[key] = value + # Allow gen_ai.* attributes that follow semantic convention patterns + if key.startswith("gen_ai."): + # Check if it matches known patterns with array indices + # e.g., gen_ai.prompt.N.*, gen_ai.completion.N.*, gen_ai.choice.N.* + import re + if re.match(r"gen_ai\.(prompt|completion|choice)\.\d+\.", key): + filtered[key] = value + continue return filtered diff --git a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/handler.py b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/handler.py index 4d7ba031b4..ceb2ac4544 100644 --- a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/handler.py +++ b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/handler.py @@ -69,6 +69,7 @@ def genai_debug_log(*_args: Any, **_kwargs: Any) -> None: # type: ignore from opentelemetry import _logs from opentelemetry import metrics as _metrics from opentelemetry import trace as _trace_mod +from opentelemetry.context import Context from opentelemetry.semconv.schemas import Schemas from opentelemetry.trace import get_tracer from opentelemetry.util.genai.emitters.configuration import ( @@ -341,8 +342,15 @@ def _refresh_capture_content( def start_llm( self, invocation: LLMInvocation, + parent_context: Context | 
None = None, ) -> LLMInvocation: - """Start an LLM invocation and create a pending span entry.""" + """Start an LLM invocation and create a pending span entry. + + Args: + invocation: The LLM invocation to start + parent_context: Optional parent context for the span. If provided, the new span + will be a child of the span in this context. + """ # Ensure capture content settings are current self._refresh_capture_content() genai_debug_log("handler.start_llm.begin", invocation) @@ -355,6 +363,9 @@ def start_llm( invocation.agent_name = top_name if not invocation.agent_id: invocation.agent_id = top_id + # Store parent context if provided for emitter to use + if parent_context is not None: + invocation.parent_context = parent_context # type: ignore[attr-defined] # Start invocation span; tracer context propagation handles parent/child links self._emitter.on_start(invocation) # Register span if created diff --git a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/types.py b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/types.py index 86c462572a..f7e3301716 100644 --- a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/types.py +++ b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/types.py @@ -59,7 +59,42 @@ def _new_str_any_dict() -> dict[str, Any]: @dataclass(kw_only=True) class GenAI: - """Base type for all GenAI telemetry entities.""" + """Base type for all GenAI telemetry entities. + + All GenAI types (LLMInvocation, EmbeddingInvocation, AgentCreation, AgentInvocation, + Workflow, Step, ToolCall, etc.) 
inherit from this base class and have access to: + + Span and Context Management: + - span: The OpenTelemetry span associated with this entity + - span_context: The span context for distributed tracing + - context_token: Token for managing span context lifecycle + - trace_id, span_id, trace_flags: Raw trace/span identifiers + + Timing: + - start_time: When the operation started (auto-generated) + - end_time: When the operation completed + + Identification: + - run_id: Unique identifier for this execution (auto-generated UUID) + - parent_run_id: Optional parent execution identifier for hierarchies + + Provider and Framework: + - provider: AI provider (e.g., "openai", "anthropic", "cohere") + - framework: Framework used (e.g., "langchain", "llamaindex") + - system: AI system identifier (semantic convention attribute) + + Agent and Conversation Context: + - agent_name, agent_id: Agent identification + - conversation_id: Conversation/session identifier + - data_source_id: Data source identifier for RAG scenarios + + Behavior Control: + - mutate_original_span: Whether to modify the original span with transformations + (used by span processors like TraceloopSpanProcessor) + + Custom Attributes: + - attributes: Dictionary for custom/additional attributes not covered by fields + """ context_token: Optional[ContextToken] = None span: Optional[Span] = None @@ -97,6 +132,14 @@ class GenAI: default=None, metadata={"semconv": GenAIAttributes.GEN_AI_DATA_SOURCE_ID}, ) + mutate_original_span: bool = field( + default=True, + metadata={ + "description": "Whether to mutate the original span with transformed attributes. " + "When True, the original span will be modified with renamed/transformed attributes. " + "When False, only new spans will be created with the transformations." 
+ }, + ) def semantic_convention_attributes(self) -> dict[str, Any]: """Return semantic convention attributes defined on this dataclass.""" diff --git a/util/opentelemetry-util-genai-traceloop-translator/.deepeval/.deepeval_telemetry.txt b/util/opentelemetry-util-genai-traceloop-translator/.deepeval/.deepeval_telemetry.txt new file mode 100644 index 0000000000..e2d6536b74 --- /dev/null +++ b/util/opentelemetry-util-genai-traceloop-translator/.deepeval/.deepeval_telemetry.txt @@ -0,0 +1,4 @@ +DEEPEVAL_ID=9d4289a3-58c6-41fe-869d-dc6a00930bea +DEEPEVAL_STATUS=old +DEEPEVAL_LAST_FEATURE=evaluation +DEEPEVAL_EVALUATION_STATUS=old diff --git a/util/opentelemetry-util-genai-traceloop-translator/.gitignore b/util/opentelemetry-util-genai-traceloop-translator/.gitignore new file mode 100644 index 0000000000..4c37f4e6df --- /dev/null +++ b/util/opentelemetry-util-genai-traceloop-translator/.gitignore @@ -0,0 +1,8 @@ +.env + +__pycache__/ +.vscode/ +*.pyc +.DS_Store + +# Ignore example output files \ No newline at end of file diff --git a/util/opentelemetry-util-genai-traceloop-translator/README.rst b/util/opentelemetry-util-genai-traceloop-translator/README.rst index e88e8fadcb..8fce0bf450 100644 --- a/util/opentelemetry-util-genai-traceloop-translator/README.rst +++ b/util/opentelemetry-util-genai-traceloop-translator/README.rst @@ -1,22 +1,9 @@ -OpenTelemetry GenAI Traceloop Translator Emitter -=============================================== - -This optional emitter promotes legacy ``traceloop.*`` attributes attached to an ``LLMInvocation`` into -GenAI semantic convention (or forward-looking custom ``gen_ai.*``) attributes *before* the standard -semantic span emitter runs. It does **not** create or end spans; it only rewrites / adds attributes on -the invocation (and, if present, sets them on the existing span). 
- -Why ---- -If upstream code (or the Traceloop compatibility emitter) still produces ``traceloop.*`` keys but you want -downstream dashboards to rely on the GenAI semantic conventions, enabling this translator lets you migrate -incrementally without rewriting the upstream instrumentation immediately. - -Behavior --------- -The translator runs on ``on_start`` and ``on_end`` of an ``LLMInvocation``. It scans ``invocation.attributes`` -and adds mapped keys (never overwriting existing ``gen_ai.*``). Content normalization (input/output messages) -is gated by the ``OTEL_GENAI_CONTENT_CAPTURE`` env var. +OpenTelemetry GenAI Traceloop Translator +========================================= + +This package automatically translates Traceloop-instrumented spans into OpenTelemetry GenAI semantic conventions. +It intercepts spans with ``traceloop.*`` attributes and creates corresponding spans with ``gen_ai.*`` attributes, +enabling seamless integration between Traceloop instrumentation and GenAI observability tools. 
Mapping Table ------------- @@ -27,71 +14,51 @@ Traceloop Key Added Key ``traceloop.workflow.name`` ``gen_ai.workflow.name`` ``traceloop.entity.name`` ``gen_ai.agent.name`` ``traceloop.entity.path`` ``gen_ai.workflow.path`` -``traceloop.prompt.managed`` ``gen_ai.prompt.managed`` -``traceloop.prompt.key`` ``gen_ai.prompt.key`` -``traceloop.prompt.version`` ``gen_ai.prompt.version`` -``traceloop.prompt.version_name`` ``gen_ai.prompt.version_name`` -``traceloop.prompt.version_hash`` ``gen_ai.prompt.version_hash`` -``traceloop.prompt.template`` ``gen_ai.prompt.template`` -``traceloop.prompt.template_variables`` ``gen_ai.prompt.template_variables`` -``traceloop.correlation.id`` ``gen_ai.conversation.id` +``traceloop.correlation.id`` ``gen_ai.conversation.id`` ``traceloop.entity.input`` ``gen_ai.input.messages`` ``traceloop.entity.output`` ``gen_ai.output.messages`` -============================== ================================ ============================================ +============================== ================================ -Note: Legacy callback fields like ``traceloop.callback.name`` / ``traceloop.callback.id`` are **not** currently -mapped. Add them to the mapping table if/when needed. -Legacy Stripping ----------------- -By default, successfully mapped legacy keys are removed after translation to reduce attribute duplication. -Control via environment: +Installation +------------ +.. code-block:: bash -``OTEL_GENAI_TRACELOOP_TRANSLATOR_STRIP_LEGACY`` - * ``true`` / unset: strip mapped ``traceloop.*`` keys - * ``false`` / ``0``: retain both legacy and new keys + pip install opentelemetry-util-genai-traceloop-translator -Only keys that were actually mapped/normalized are stripped to avoid accidental data loss. +Quick Start (Automatic Registration) +------------------------------------- +The easiest way to use the translator is to simply import it - no manual setup required! 
-Environment Flags ------------------ -``OTEL_INSTRUMENTATION_GENAI_EMITTERS``: Include ``traceloop_translator`` alongside ``span`` (e.g. ``span,traceloop_translator``) -``OTEL_GENAI_CONTENT_CAPTURE``: Enable input/output content mapping ("1" to enable; "0" disables) -``OTEL_GENAI_MAP_CORRELATION_TO_CONVERSATION``: Toggle correlation → conversation id mapping -``OTEL_GENAI_TRACELOOP_TRANSLATOR_STRIP_LEGACY``: Toggle legacy key removal (see above) +.. code-block:: python -Example -------- -.. code-block:: bash + import os + from openai import OpenAI - export OTEL_INSTRUMENTATION_GENAI_EMITTERS="span,traceloop_translator" - export OTEL_GENAI_CONTENT_CAPTURE="1" -.. code-block:: python + from traceloop.sdk import Traceloop + from traceloop.sdk.decorators import workflow - from opentelemetry.util.genai.handler import get_telemetry_handler - from opentelemetry.util.genai.types import LLMInvocation, InputMessage, Text - - handler = get_telemetry_handler() - inv = LLMInvocation( - request_model="gpt-4", - input_messages=[InputMessage(role="user", parts=[Text("Hello")])], - attributes={ - "traceloop.workflow.name": "flowA", - "traceloop.entity.name": "AgentA", - "traceloop.entity.input": ["Hello"], # various shapes normalized - }, - ) - handler.start_llm(inv) - inv.output_messages = [] # add output content if desired - handler.stop_llm(inv) - # inv.attributes now include gen_ai.workflow.name, gen_ai.agent.name, gen_ai.input.messages (legacy removed if strip enabled) + client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) -Installation ------------- -.. 
code-block:: bash + Traceloop.init(app_name="story_service") - pip install opentelemetry-util-genai-traceloop-translator + + @workflow(name="streaming_story") + def joke_workflow(): + stream = client.chat.completions.create( + model="gpt-4o-2024-05-13", + messages=[{"role": "user", "content": "Tell me a story about opentelemetry"}], + stream=True, + ) + + for part in stream: + print(part.choices[0].delta.content or "", end="") + print() + + + joke_workflow() + # The translator automatically creates new gen_ai.* attributes based on the mapping. Tests ----- diff --git a/util/opentelemetry-util-genai-traceloop-translator/examples/traceloop_example.py b/util/opentelemetry-util-genai-traceloop-translator/examples/traceloop_example.py index c9105cef96..7185f13286 100644 --- a/util/opentelemetry-util-genai-traceloop-translator/examples/traceloop_example.py +++ b/util/opentelemetry-util-genai-traceloop-translator/examples/traceloop_example.py @@ -25,10 +25,14 @@ try: from traceloop.sdk import Traceloop from traceloop.sdk.decorators import workflow + # Initialize Traceloop (disable_batch so spans flush immediately for local demos) Traceloop.init(disable_batch=True, api_endpoint="http://localhost:4318") except ImportError: - print("[traceloop] traceloop-sdk not installed. Run 'pip install traceloop-sdk' to enable workflow tracing.") + print( + "[traceloop] traceloop-sdk not installed. Run 'pip install traceloop-sdk' to enable workflow tracing." 
+ ) + @workflow(name="llm_invocation_example") def run_example(provider=None): @@ -38,11 +42,11 @@ def run_example(provider=None): max_retries=2, ) messages = [ - ( - "system", - "You are a helpful assistant.", - ), - ("human", "what is the significance of 42?."), + ( + "system", + "You are a helpful assistant.", + ), + ("human", "what is the significance of 42?."), ] ai_msg = llm.invoke(messages) return ai_msg @@ -50,10 +54,15 @@ def run_example(provider=None): if __name__ == "__main__": # Enable translator emitter + keep legacy keys for demonstration - os.environ.setdefault("OTEL_INSTRUMENTATION_GENAI_EMITTERS", "span,traceloop_translator") - os.environ.setdefault("OTEL_GENAI_TRACELOOP_TRANSLATOR_STRIP_LEGACY", "false") # keep original traceloop.* to compare - os.environ.setdefault("OTEL_GENAI_CONTENT_CAPTURE", "1") # ensure input/output content mapping - + os.environ.setdefault( + "OTEL_INSTRUMENTATION_GENAI_EMITTERS", "span,traceloop_translator" + ) + os.environ.setdefault( + "OTEL_GENAI_TRACELOOP_TRANSLATOR_STRIP_LEGACY", "false" + ) # keep original traceloop.* to compare + os.environ.setdefault( + "OTEL_GENAI_CONTENT_CAPTURE", "1" + ) # ensure input/output content mapping # Avoid overriding an existing SDK TracerProvider. Reuse if already configured. existing = trace.get_tracer_provider() @@ -61,7 +70,9 @@ def run_example(provider=None): provider = existing # Attach a ConsoleSpanExporter for demo purposes (may duplicate if already added). try: - provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter())) + provider.add_span_processor( + SimpleSpanProcessor(ConsoleSpanExporter()) + ) except Exception: pass else: @@ -73,7 +84,7 @@ def run_example(provider=None): except Exception: # If another component set a provider concurrently, fall back to that. 
provider = trace.get_tracer_provider() - + # Build a telemetry handler (singleton) – emitters are chosen via env vars handler = get_telemetry_handler(tracer_provider=provider) @@ -84,15 +95,17 @@ def run_example(provider=None): input_messages=[InputMessage(role="user", parts=[Text("Hello")])], ) # Populate attributes after construction (avoids mismatch if constructor signature changes) - invocation.attributes.update({ - "traceloop.workflow.name": "demo_flow", # workflow identifier - "traceloop.entity.name": "ChatLLM", # agent/entity name - "traceloop.entity.path": "demo_flow/ChatLLM/step_1", # hierarchical path - "traceloop.entity.output": [ # raw input messages (will be normalized & mapped) - {"role": "user", "content": "Hello"} - ], - "traceloop.span.kind": "workflow", # helps infer gen_ai.operation.name - }) + invocation.attributes.update( + { + "traceloop.workflow.name": "demo_flow", # workflow identifier + "traceloop.entity.name": "ChatLLM", # agent/entity name + "traceloop.entity.path": "demo_flow/ChatLLM/step_1", # hierarchical path + "traceloop.entity.output": [ # raw input messages (will be normalized & mapped) + {"role": "user", "content": "Hello"} + ], + "traceloop.span.kind": "workflow", # helps infer gen_ai.operation.name + } + ) print("Before start (raw attributes):", invocation.attributes) handler.start_llm(invocation) @@ -109,4 +122,4 @@ def run_example(provider=None): handler.stop_llm(invocation) print("After stop (translated attributes):", invocation.attributes) - run_example(provider=provider) \ No newline at end of file + run_example(provider=provider) diff --git a/util/opentelemetry-util-genai-traceloop-translator/examples/traceloop_processor_example.py b/util/opentelemetry-util-genai-traceloop-translator/examples/traceloop_processor_example.py new file mode 100644 index 0000000000..a811fbb445 --- /dev/null +++ b/util/opentelemetry-util-genai-traceloop-translator/examples/traceloop_processor_example.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python3 + 
+from __future__ import annotations + +import os +from dotenv import load_dotenv + +# Load .env first +load_dotenv() + +try: + from traceloop.sdk import Traceloop + from traceloop.sdk.decorators import task, workflow, agent, tool + from openai import OpenAI + + Traceloop.init(disable_batch=True, api_endpoint="http://localhost:4318") +except ImportError: + raise RuntimeError("Install traceloop-sdk: pip install traceloop-sdk") + +client = OpenAI(api_key=os.environ["OPENAI_API_KEY"]) + + +@agent(name="joke_translation") +def translate_joke_to_pirate(joke: str): + completion = client.chat.completions.create( + model="gpt-3.5-turbo", + messages=[{"role": "user", "content": f"Translate the below joke to pirate-like english:\n\n{joke}"}], + ) + + history_jokes_tool() + + return completion.choices[0].message.content + + +@tool(name="history_jokes") +def history_jokes_tool(): + completion = client.chat.completions.create( + model="gpt-3.5-turbo", + messages=[{"role": "user", "content": f"get some history jokes"}], + ) + + return completion.choices[0].message.content + +@task(name="joke_creation") +def create_joke(): + completion = client.chat.completions.create( + model="gpt-3.5-turbo", + messages=[ + {"role": "user", "content": "Tell me a joke about opentelemetry"} + ], + ) + + return completion.choices[0].message.content + + +@task(name="signature_generation") +def generate_signature(joke: str): + completion = client.chat.completions.create( + model="gpt-3.5-turbo", + messages=[ + {"role": "user", "content": "Also tell me a joke about yourself!"} + ], + ) + + return completion.choices[0].message.content + + +@workflow(name="pirate_joke_generator") +def joke_workflow(): + eng_joke = create_joke() + # pirate_joke = translate_joke_to_pirate(eng_joke) + print(translate_joke_to_pirate(eng_joke)) + signature = generate_signature(eng_joke) + print(eng_joke + "\n\n" + signature) + + +if __name__ == "__main__": + # run_example() + joke_workflow() diff --git 
a/util/opentelemetry-util-genai-traceloop-translator/pyproject.toml b/util/opentelemetry-util-genai-traceloop-translator/pyproject.toml index f794a219fc..f46ac855a5 100644 --- a/util/opentelemetry-util-genai-traceloop-translator/pyproject.toml +++ b/util/opentelemetry-util-genai-traceloop-translator/pyproject.toml @@ -28,12 +28,16 @@ dependencies = [ "opentelemetry-instrumentation ~= 0.52b1", "opentelemetry-semantic-conventions ~= 0.52b1", "opentelemetry-api>=1.31.0", + "opentelemetry-sdk>=1.31.0", "opentelemetry-util-genai", # depends on base utilities & handler ] [project.entry-points."opentelemetry_util_genai_emitters"] traceloop_translator = "opentelemetry.util.genai.emitters.traceloop_translator:traceloop_translator_emitters" +[project.entry-points.opentelemetry_configurator] +traceloop_translator = "opentelemetry.util.genai.traceloop:_auto_enable" + [project.optional-dependencies] test = ["pytest>=7.0.0"] @@ -51,4 +55,10 @@ include = [ ] [tool.hatch.build.targets.wheel] -packages = ["src/opentelemetry"] \ No newline at end of file +packages = ["src/opentelemetry"] +include = [ + "src/opentelemetry_util_genai_traceloop_translator.pth", +] + +[tool.hatch.build.targets.wheel.force-include] +"src/opentelemetry_util_genai_traceloop_translator.pth" = "opentelemetry_util_genai_traceloop_translator.pth" \ No newline at end of file diff --git a/util/opentelemetry-util-genai-traceloop-translator/requirements-examples.txt b/util/opentelemetry-util-genai-traceloop-translator/requirements-examples.txt index 2742a6db6a..99272a3094 100644 --- a/util/opentelemetry-util-genai-traceloop-translator/requirements-examples.txt +++ b/util/opentelemetry-util-genai-traceloop-translator/requirements-examples.txt @@ -6,5 +6,7 @@ # OpenTelemetry SDK pieces used in the example opentelemetry-sdk>=1.31.1 - +traceloop-sdk +python-dotenv +openai # (ConsoleSpanExporter is in the SDK extras; no additional exporter deps needed.) 
diff --git a/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/emitters/content_normalizer.py b/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/emitters/content_normalizer.py index 4fca5b6b00..efa19221ad 100644 --- a/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/emitters/content_normalizer.py +++ b/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/emitters/content_normalizer.py @@ -27,7 +27,9 @@ def _coerce_text_part(content: Any) -> Dict[str, Any]: return {"type": "text", "content": content} -def normalize_traceloop_content(raw: Any, direction: str) -> List[Dict[str, Any]]: +def normalize_traceloop_content( + raw: Any, direction: str +) -> List[Dict[str, Any]]: """Normalize traceloop entity input/output blob into GenAI message schema. direction: 'input' | 'output' @@ -38,10 +40,16 @@ def normalize_traceloop_content(raw: Any, direction: str) -> List[Dict[str, Any] normalized: List[Dict[str, Any]] = [] limit = INPUT_MAX if direction == "input" else OUTPUT_MAX for m in raw[:limit]: - role = m.get("role", "user" if direction == "input" else "assistant") + role = m.get( + "role", "user" if direction == "input" else "assistant" + ) content_val = m.get("content") if content_val is None: - temp = {k: v for k, v in m.items() if k not in ("role", "finish_reason", "finishReason")} + temp = { + k: v + for k, v in m.items() + if k not in ("role", "finish_reason", "finishReason") + } content_val = temp or "" parts = [_coerce_text_part(content_val)] msg: Dict[str, Any] = {"role": role, "parts": parts} @@ -54,34 +62,75 @@ def normalize_traceloop_content(raw: Any, direction: str) -> List[Dict[str, Any] # Dict variants if isinstance(raw, dict): # OpenAI choices - if direction == "output" and "choices" in raw and isinstance(raw["choices"], list): + if ( + direction == "output" + and "choices" in raw + and isinstance(raw["choices"], list) + ): out_msgs: 
List[Dict[str, Any]] = [] for choice in raw["choices"][:OUTPUT_MAX]: - message = choice.get("message") if isinstance(choice, dict) else None + message = ( + choice.get("message") if isinstance(choice, dict) else None + ) if message and isinstance(message, dict): role = message.get("role", "assistant") - content_val = message.get("content") or message.get("text") or "" + content_val = ( + message.get("content") or message.get("text") or "" + ) else: role = "assistant" - content_val = choice.get("text") or choice.get("content") or json.dumps(choice) + content_val = ( + choice.get("text") + or choice.get("content") + or json.dumps(choice) + ) parts = [_coerce_text_part(content_val)] - finish_reason = choice.get("finish_reason") or choice.get("finishReason") or "stop" - out_msgs.append({"role": role, "parts": parts, "finish_reason": finish_reason}) + finish_reason = ( + choice.get("finish_reason") + or choice.get("finishReason") + or "stop" + ) + out_msgs.append( + { + "role": role, + "parts": parts, + "finish_reason": finish_reason, + } + ) return out_msgs # Gemini candidates - if direction == "output" and "candidates" in raw and isinstance(raw["candidates"], list): + if ( + direction == "output" + and "candidates" in raw + and isinstance(raw["candidates"], list) + ): out_msgs: List[Dict[str, Any]] = [] for cand in raw["candidates"][:OUTPUT_MAX]: role = cand.get("role", "assistant") cand_content = cand.get("content") if isinstance(cand_content, list): - joined = "\n".join([str(p.get("text", p.get("content", p))) for p in cand_content]) + joined = "\n".join( + [ + str(p.get("text", p.get("content", p))) + for p in cand_content + ] + ) content_val = joined else: content_val = cand_content or json.dumps(cand) parts = [_coerce_text_part(content_val)] - finish_reason = cand.get("finish_reason") or cand.get("finishReason") or "stop" - out_msgs.append({"role": role, "parts": parts, "finish_reason": finish_reason}) + finish_reason = ( + cand.get("finish_reason") + or 
cand.get("finishReason") + or "stop" + ) + out_msgs.append( + { + "role": role, + "parts": parts, + "finish_reason": finish_reason, + } + ) return out_msgs # messages array if "messages" in raw and isinstance(raw["messages"], list): @@ -92,9 +141,20 @@ def normalize_traceloop_content(raw: Any, direction: str) -> List[Dict[str, Any] if isinstance(inner, list): return normalize_traceloop_content(inner, direction) if isinstance(inner, dict): - return [{"role": "user" if direction == "input" else "assistant", "parts": [_coerce_text_part(inner)]}] + return [ + { + "role": "user" + if direction == "input" + else "assistant", + "parts": [_coerce_text_part(inner)], + } + ] # tool calls - if direction == "output" and "tool_calls" in raw and isinstance(raw["tool_calls"], list): + if ( + direction == "output" + and "tool_calls" in raw + and isinstance(raw["tool_calls"], list) + ): out_msgs: List[Dict[str, Any]] = [] for tc in raw["tool_calls"][:OUTPUT_MAX]: part = { @@ -103,12 +163,28 @@ def normalize_traceloop_content(raw: Any, direction: str) -> List[Dict[str, Any] "arguments": tc.get("arguments"), "id": tc.get("id"), } - finish_reason = tc.get("finish_reason") or tc.get("finishReason") or "tool_call" - out_msgs.append({"role": "assistant", "parts": [part], "finish_reason": finish_reason}) + finish_reason = ( + tc.get("finish_reason") + or tc.get("finishReason") + or "tool_call" + ) + out_msgs.append( + { + "role": "assistant", + "parts": [part], + "finish_reason": finish_reason, + } + ) return out_msgs body = {k: v for k, v in raw.items() if k != "role"} if direction == "output": - return [{"role": "assistant", "parts": [_coerce_text_part(body)], "finish_reason": "stop"}] + return [ + { + "role": "assistant", + "parts": [_coerce_text_part(body)], + "finish_reason": "stop", + } + ] return [{"role": "user", "parts": [_coerce_text_part(body)]}] # JSON string @@ -118,7 +194,13 @@ def normalize_traceloop_content(raw: Any, direction: str) -> List[Dict[str, Any] return 
normalize_traceloop_content(parsed, direction) except Exception: if direction == "output": - return [{"role": "assistant", "parts": [_coerce_text_part(raw)], "finish_reason": "stop"}] + return [ + { + "role": "assistant", + "parts": [_coerce_text_part(raw)], + "finish_reason": "stop", + } + ] return [{"role": "user", "parts": [_coerce_text_part(raw)]}] # List of raw strings @@ -126,12 +208,23 @@ def normalize_traceloop_content(raw: Any, direction: str) -> List[Dict[str, Any] msgs: List[Dict[str, Any]] = [] limit = INPUT_MAX if direction == "input" else OUTPUT_MAX for s in raw[:limit]: - msgs.append({"role": "user" if direction == "input" else "assistant", "parts": [_coerce_text_part(s)]}) + msgs.append( + { + "role": "user" if direction == "input" else "assistant", + "parts": [_coerce_text_part(s)], + } + ) return msgs # Generic fallback if direction == "output": - return [{"role": "assistant", "parts": [_coerce_text_part(raw)], "finish_reason": "stop"}] + return [ + { + "role": "assistant", + "parts": [_coerce_text_part(raw)], + "finish_reason": "stop", + } + ] return [{"role": "user", "parts": [_coerce_text_part(raw)]}] @@ -142,4 +235,4 @@ def normalize_traceloop_content(raw: Any, direction: str) -> List[Dict[str, Any] "OUTPUT_MAX", "MSG_CONTENT_MAX", "PROMPT_TEMPLATE_MAX", -] \ No newline at end of file +] diff --git a/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/emitters/message_reconstructor.py b/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/emitters/message_reconstructor.py new file mode 100644 index 0000000000..a7064dc508 --- /dev/null +++ b/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/emitters/message_reconstructor.py @@ -0,0 +1,217 @@ +""" +Reconstruct LangChain message objects from Traceloop serialized data. + +This module enables evaluations to work with Traceloop SDK alone, +without requiring LangChain instrumentation. 
+""" + +from __future__ import annotations + +import json +import logging +from typing import Any, Dict, List, Optional + +from opentelemetry.util.genai.emitters.content_normalizer import normalize_traceloop_content + +_logger = logging.getLogger(__name__) + + +def reconstruct_messages_from_traceloop( + input_data: Any, + output_data: Any +) -> tuple[Optional[List[Any]], Optional[List[Any]]]: + """ + Reconstruct LangChain message objects from Traceloop serialized data. + + Args: + input_data: Raw traceloop.entity.input value (string or dict) + output_data: Raw traceloop.entity.output value (string or dict) + + Returns: + Tuple of (input_messages, output_messages) as LangChain BaseMessage lists, + or (None, None) if reconstruction fails or LangChain is not available. + + This function: + 1. Parses the JSON-serialized Traceloop data + 2. Normalizes it to standard message format + 3. Reconstructs LangChain BaseMessage objects (HumanMessage, AIMessage, etc.) + 4. Returns them for use in evaluations + + If LangChain is not installed, returns (None, None) gracefully. + """ + try: + # Import LangChain message classes (optional dependency) + try: + from langchain_core.messages import ( + BaseMessage, + HumanMessage, + AIMessage, + SystemMessage, + ToolMessage, + FunctionMessage, + ) + except ImportError: + _logger.debug( + "LangChain not available; message reconstruction skipped. " + "Install langchain-core to enable evaluations with Traceloop." 
+ ) + return None, None + + input_messages = None + output_messages = None + + # Reconstruct input messages + if input_data: + try: + # Normalize the Traceloop data to standard format + normalized_input = normalize_traceloop_content(input_data, "input") + input_messages = _convert_normalized_to_langchain( + normalized_input, "input" + ) + _logger.debug( + f"Reconstructed {len(input_messages)} input messages from Traceloop data" + ) + except Exception as e: + _logger.debug(f"Failed to reconstruct input messages: {e}") + + # Reconstruct output messages + if output_data: + try: + # Normalize the Traceloop data to standard format + normalized_output = normalize_traceloop_content(output_data, "output") + output_messages = _convert_normalized_to_langchain( + normalized_output, "output" + ) + _logger.debug( + f"Reconstructed {len(output_messages)} output messages from Traceloop data" + ) + except Exception as e: + _logger.debug(f"Failed to reconstruct output messages: {e}") + + return input_messages, output_messages + + except Exception as e: + _logger.debug(f"Message reconstruction failed: {e}") + return None, None + + +def _convert_normalized_to_langchain( + normalized_messages: List[Dict[str, Any]], + direction: str +) -> List[Any]: + """ + Convert normalized message format to LangChain BaseMessage objects. 
+ + Args: + normalized_messages: List of normalized messages from normalize_traceloop_content + direction: 'input' or 'output' (for logging/debugging) + + Returns: + List of LangChain BaseMessage objects + + Normalized message format: + { + "role": "user" | "assistant" | "system" | "tool" | "function", + "parts": [{"type": "text", "content": "..."}, ...], + "finish_reason": "stop" # optional, for output messages + } + """ + from langchain_core.messages import ( + HumanMessage, + AIMessage, + SystemMessage, + ToolMessage, + FunctionMessage, + ) + + langchain_messages = [] + + for msg in normalized_messages: + role = msg.get("role", "user" if direction == "input" else "assistant") + parts = msg.get("parts", []) + + # Extract content from parts (typically just text parts) + content_parts = [] + for part in parts: + if isinstance(part, dict): + if part.get("type") == "text": + content_parts.append(part.get("content", "")) + elif part.get("type") == "tool_call": + # For tool calls, keep the structured data + content_parts.append(json.dumps(part)) + else: + # Unknown part type, serialize it + content_parts.append(json.dumps(part)) + else: + # Non-dict part, stringify it + content_parts.append(str(part)) + + # Join all content parts + content = "\n".join(content_parts) if content_parts else "" + + # Map role to LangChain message class + if role == "user": + langchain_msg = HumanMessage(content=content) + elif role == "assistant": + # Include finish_reason in additional_kwargs if present + additional_kwargs = {} + if "finish_reason" in msg: + additional_kwargs["finish_reason"] = msg["finish_reason"] + langchain_msg = AIMessage( + content=content, + additional_kwargs=additional_kwargs if additional_kwargs else {} + ) + elif role == "system": + langchain_msg = SystemMessage(content=content) + elif role == "tool": + langchain_msg = ToolMessage( + content=content, + tool_call_id=msg.get("tool_call_id", "unknown") + ) + elif role == "function": + langchain_msg = 
FunctionMessage( + content=content, + name=msg.get("name", "unknown") + ) + else: + # Unknown role, default to HumanMessage + _logger.debug(f"Unknown role '{role}', defaulting to HumanMessage") + langchain_msg = HumanMessage(content=content) + + # CRITICAL FIX: Add .parts attribute for GenAI evaluation compatibility + # GenAI evaluations expect message.parts (list of Text/ToolCall objects) + # but LangChain messages only have .content (str) + # We add .parts here to bridge the gap without requiring LangChain instrumentation + try: + # Import Text from GenAI types + from opentelemetry.util.genai.types import Text + + # Create a Text part from the content + text_part = Text(content=content, type="text") + + # Add .parts attribute (monkeypatch on the instance) + langchain_msg.parts = [text_part] # type: ignore[attr-defined] + + _logger.debug( + f"Added .parts attribute to {type(langchain_msg).__name__} " + f"for evaluation compatibility" + ) + except ImportError: + # GenAI types not available, evaluations won't work but won't crash + _logger.debug( + "GenAI types not available; .parts attribute not added. " + "Evaluations will not work." 
+ ) + except Exception as e: + # Unexpected error, log but don't crash + _logger.debug(f"Failed to add .parts attribute: {e}") + + langchain_messages.append(langchain_msg) + + return langchain_messages + + +__all__ = [ + "reconstruct_messages_from_traceloop", +] + diff --git a/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/emitters/traceloop_translator.py b/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/emitters/traceloop_translator.py index 97ec039cac..d3362786ab 100644 --- a/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/emitters/traceloop_translator.py +++ b/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/emitters/traceloop_translator.py @@ -5,12 +5,15 @@ import json from typing import Dict, Any -from opentelemetry.util.genai.emitters.spec import EmitterFactoryContext, EmitterSpec +from opentelemetry.util.genai.emitters.spec import ( + EmitterFactoryContext, + EmitterSpec, +) from opentelemetry.util.genai.interfaces import EmitterMeta from opentelemetry.util.genai.types import LLMInvocation, Error # https://github.com/traceloop/openllmetry/blob/main/packages/opentelemetry-semantic-conventions-ai/opentelemetry/semconv_ai/__init__.py -# TODO: rename appropriately +# TODO: rename appropriately _TRACELOOP_TO_SEMCONV: Dict[str, str] = { # Workflow / entity hierarchy (proposed extension namespace) "traceloop.workflow.name": "gen_ai.workflow.name", @@ -42,14 +45,18 @@ } _STRIP_FLAG = "OTEL_GENAI_TRACELOOP_TRANSLATOR_STRIP_LEGACY" + + # NOTE: Previous implementation evaluated strip flag at import time and never applied. # We now evaluate dynamically for each translation to honor late env changes and actually strip. 
def _is_strip_legacy_enabled() -> bool: return os.getenv(_STRIP_FLAG, "true") not in ("0", "false", "False") + # Content capture opt-in: default off for sensitive prompt/template + messages _CONTENT_CAPTURE_FLAG = "OTEL_GENAI_CONTENT_CAPTURE" + def _is_content_capture_enabled() -> bool: """Return True if content capture is enabled (evaluated at call time). @@ -61,14 +68,22 @@ def _is_content_capture_enabled() -> bool: val = os.getenv(_CONTENT_CAPTURE_FLAG, "1") return val not in ("0", "false", "False") + # Correlation->conversation mapping strictness toggle _MAP_CORRELATION_FLAG = "OTEL_GENAI_MAP_CORRELATION_TO_CONVERSATION" -_map_correlation = os.getenv(_MAP_CORRELATION_FLAG, "true") not in ("0", "false", "False") +_map_correlation = os.getenv(_MAP_CORRELATION_FLAG, "true") not in ( + "0", + "false", + "False", +) # Regex to validate conversation id (avoid mapping arbitrary large/high-cardinality values) _CONVERSATION_ID_PATTERN = re.compile(r"^[A-Za-z0-9._\-]{1,128}$") -from .content_normalizer import normalize_traceloop_content, maybe_truncate_template +from .content_normalizer import ( + normalize_traceloop_content, + maybe_truncate_template, +) class TraceloopTranslatorEmitter(EmitterMeta): @@ -89,7 +104,9 @@ def _translate_attributes(self, invocation: LLMInvocation) -> None: attrs = getattr(invocation, "attributes", None) if not attrs: return - legacy_keys_to_strip: list[str] = [] # track keys we successfully mapped; only strip those + legacy_keys_to_strip: list[ + str + ] = [] # track keys we successfully mapped; only strip those # Capture original presence to decide on fallback later had_traceloop_input = "traceloop.entity.input" in attrs had_traceloop_output = "traceloop.entity.output" in attrs @@ -97,7 +114,11 @@ def _translate_attributes(self, invocation: LLMInvocation) -> None: value = attrs.get(key) is_prefixed = key.startswith("traceloop.") raw_key = key - if not (is_prefixed or raw_key in _TRACELOOP_TO_SEMCONV or raw_key in _CONTENT_MAPPING): + if 
not ( + is_prefixed + or raw_key in _TRACELOOP_TO_SEMCONV + or raw_key in _CONTENT_MAPPING + ): continue # Content mapping (entity input/output) @@ -121,26 +142,34 @@ def _translate_attributes(self, invocation: LLMInvocation) -> None: if mapped == "gen_ai.conversation.id": if not _map_correlation: mapped = None - elif not isinstance(value, str) or not _CONVERSATION_ID_PATTERN.match(value): + elif not isinstance( + value, str + ) or not _CONVERSATION_ID_PATTERN.match(value): mapped = None if mapped: - if mapped == "gen_ai.prompt.template" and not _is_content_capture_enabled(): + if ( + mapped == "gen_ai.prompt.template" + and not _is_content_capture_enabled() + ): # Skip sensitive template unless opted-in mapped = None - elif mapped == "gen_ai.prompt.template" and isinstance(value, str): + elif mapped == "gen_ai.prompt.template" and isinstance( + value, str + ): value = maybe_truncate_template(value) if mapped: attrs.setdefault(mapped, value) legacy_keys_to_strip.append(raw_key) # Also push onto span immediately (order-independent visibility) span = getattr(invocation, "span", None) - if span is not None and mapped not in getattr(span, "attributes", {}): + if span is not None and mapped not in getattr( + span, "attributes", {} + ): try: span.set_attribute(mapped, value) except Exception: # pragma: no cover pass - # Heuristic: infer operation name for tool/workflow invocations if absent if attrs.get("gen_ai.operation.name") is None: span_kind = attrs.get("traceloop.span.kind") @@ -151,7 +180,11 @@ def _translate_attributes(self, invocation: LLMInvocation) -> None: # Fallback: if caller provided invocation.input_messages but no traceloop.entity.input # and translator didn't already set gen_ai.input.messages, derive it now. 
- if _is_content_capture_enabled() and not had_traceloop_input and "gen_ai.input.messages" not in attrs: + if ( + _is_content_capture_enabled() + and not had_traceloop_input + and "gen_ai.input.messages" not in attrs + ): input_messages = getattr(invocation, "input_messages", None) if input_messages: try: @@ -159,43 +192,83 @@ def _translate_attributes(self, invocation: LLMInvocation) -> None: serialized = json.dumps(norm) attrs.setdefault("gen_ai.input.messages", serialized) span = getattr(invocation, "span", None) - if span is not None and "gen_ai.input.messages" not in getattr(span, "attributes", {}): + if ( + span is not None + and "gen_ai.input.messages" + not in getattr(span, "attributes", {}) + ): try: - span.set_attribute("gen_ai.input.messages", serialized) + span.set_attribute( + "gen_ai.input.messages", serialized + ) except Exception: # pragma: no cover pass except Exception: # Best effort; store raw repr - fallback = json.dumps([getattr(m, "__dict__", str(m)) for m in input_messages]) + fallback = json.dumps( + [ + getattr(m, "__dict__", str(m)) + for m in input_messages + ] + ) attrs.setdefault("gen_ai.input.messages", fallback) span = getattr(invocation, "span", None) - if span is not None and "gen_ai.input.messages" not in getattr(span, "attributes", {}): + if ( + span is not None + and "gen_ai.input.messages" + not in getattr(span, "attributes", {}) + ): try: - span.set_attribute("gen_ai.input.messages", fallback) + span.set_attribute( + "gen_ai.input.messages", fallback + ) except Exception: # pragma: no cover pass # Fallback for output: only after model produced output_messages - if _is_content_capture_enabled() and not had_traceloop_output and "gen_ai.output.messages" not in attrs: + if ( + _is_content_capture_enabled() + and not had_traceloop_output + and "gen_ai.output.messages" not in attrs + ): output_messages = getattr(invocation, "output_messages", None) if output_messages: try: - norm = normalize_traceloop_content(output_messages, 
"output") + norm = normalize_traceloop_content( + output_messages, "output" + ) serialized = json.dumps(norm) attrs.setdefault("gen_ai.output.messages", serialized) span = getattr(invocation, "span", None) - if span is not None and "gen_ai.output.messages" not in getattr(span, "attributes", {}): + if ( + span is not None + and "gen_ai.output.messages" + not in getattr(span, "attributes", {}) + ): try: - span.set_attribute("gen_ai.output.messages", serialized) + span.set_attribute( + "gen_ai.output.messages", serialized + ) except Exception: # pragma: no cover pass except Exception: - fallback = json.dumps([getattr(m, "__dict__", str(m)) for m in output_messages]) + fallback = json.dumps( + [ + getattr(m, "__dict__", str(m)) + for m in output_messages + ] + ) attrs.setdefault("gen_ai.output.messages", fallback) span = getattr(invocation, "span", None) - if span is not None and "gen_ai.output.messages" not in getattr(span, "attributes", {}): + if ( + span is not None + and "gen_ai.output.messages" + not in getattr(span, "attributes", {}) + ): try: - span.set_attribute("gen_ai.output.messages", fallback) + span.set_attribute( + "gen_ai.output.messages", fallback + ) except Exception: # pragma: no cover pass @@ -223,9 +296,9 @@ def _factory(ctx: EmitterFactoryContext) -> TraceloopTranslatorEmitter: name="TraceloopTranslator", category="span", factory=_factory, - mode="append" + mode="append", ) ] -__all__ = ["TraceloopTranslatorEmitter", "traceloop_translator_emitters"] \ No newline at end of file +__all__ = ["TraceloopTranslatorEmitter", "traceloop_translator_emitters"] diff --git a/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/processor/__init__.py b/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/processor/__init__.py new file mode 100644 index 0000000000..8ebd5746e1 --- /dev/null +++ b/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/processor/__init__.py @@ -0,0 +1,19 
@@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Traceloop span processor and transformation utilities.""" + +from .traceloop_span_processor import TraceloopSpanProcessor + +__all__ = ["TraceloopSpanProcessor"] diff --git a/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/processor/traceloop_span_processor.py b/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/processor/traceloop_span_processor.py new file mode 100644 index 0000000000..229fd27a36 --- /dev/null +++ b/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/processor/traceloop_span_processor.py @@ -0,0 +1,868 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from __future__ import annotations + +import fnmatch +import json +import logging +import os +import re +from dataclasses import dataclass, field +from typing import Any, Callable, Dict, List, Optional + +from opentelemetry.context import Context +from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor +from opentelemetry.trace import Span + +from opentelemetry.util.genai.types import LLMInvocation +from opentelemetry.util.genai.handler import ( + get_telemetry_handler, + TelemetryHandler, +) +from opentelemetry.util.genai.emitters.content_normalizer import ( + normalize_traceloop_content, +) +from opentelemetry.util.genai.emitters.message_reconstructor import ( + reconstruct_messages_from_traceloop, +) + +_ENV_RULES = "OTEL_GENAI_SPAN_TRANSFORM_RULES" + + +@dataclass +class TransformationRule: + """Represents a single conditional transformation rule. + + Fields map closely to the JSON structure accepted via the environment + variable. All fields are optional; empty rule never matches. + """ + + match_name: Optional[str] = None # glob pattern (e.g. 
"chat *") + match_scope: Optional[str] = None # regex or substring (case-insensitive) + match_attributes: Dict[str, Optional[str]] = field(default_factory=dict) + + attribute_transformations: Dict[str, Any] = field(default_factory=dict) + name_transformations: Dict[str, str] = field(default_factory=dict) + traceloop_attributes: Dict[str, Any] = field(default_factory=dict) + + def matches( + self, span: ReadableSpan + ) -> bool: # pragma: no cover - simple logic + if self.match_name: + if not fnmatch.fnmatch(span.name, self.match_name): + return False + if self.match_scope: + scope = getattr(span, "instrumentation_scope", None) + scope_name = getattr(scope, "name", "") if scope else "" + pattern = self.match_scope + # Accept either regex (contains meta chars) or simple substring + try: + if any(ch in pattern for ch in ".^$|()[]+?\\"): + if not re.search(pattern, scope_name, re.IGNORECASE): + return False + else: + if pattern.lower() not in scope_name.lower(): + return False + except re.error: + # Bad regex – treat as non-match but log once + logging.warning("Invalid regex in match_scope: %s", pattern) + return False + if self.match_attributes: + for k, expected in self.match_attributes.items(): + if k not in span.attributes: + return False + if expected is not None and str(span.attributes.get(k)) != str( + expected + ): + return False + return True + + +def _load_rules_from_env() -> List[TransformationRule]: + raw = os.getenv(_ENV_RULES) + if not raw: + return [] + try: + data = json.loads(raw) + rules_spec = data.get("rules") if isinstance(data, dict) else None + if not isinstance(rules_spec, list): + logging.warning("%s must contain a 'rules' list", _ENV_RULES) + return [] + rules: List[TransformationRule] = [] + for r in rules_spec: + if not isinstance(r, dict): + continue + match = ( + r.get("match", {}) if isinstance(r.get("match"), dict) else {} + ) + rules.append( + TransformationRule( + match_name=match.get("name"), + match_scope=match.get("scope"), + 
match_attributes=match.get("attributes", {}) or {}, + attribute_transformations=r.get( + "attribute_transformations", {} + ) + or {}, + name_transformations=r.get("name_transformations", {}) + or {}, + traceloop_attributes=r.get("traceloop_attributes", {}) + or {}, + ) + ) + return rules + except Exception as exc: # broad: we never want to break app startup + logging.warning("Failed to parse %s: %s", _ENV_RULES, exc) + return [] + + +class TraceloopSpanProcessor(SpanProcessor): + """ + A span processor that automatically applies transformation rules to spans. + + This processor can be added to your TracerProvider to automatically transform + all spans according to your transformation rules. + """ + + def __init__( + self, + attribute_transformations: Optional[Dict[str, Any]] = None, + name_transformations: Optional[Dict[str, str]] = None, + traceloop_attributes: Optional[Dict[str, Any]] = None, + span_filter: Optional[Callable[[ReadableSpan], bool]] = None, + rules: Optional[List[TransformationRule]] = None, + load_env_rules: bool = True, + telemetry_handler: Optional[TelemetryHandler] = None, + mutate_original_span: bool = True, + ): + """ + Initialize the Traceloop span processor. + + Args: + attribute_transformations: Rules for transforming span attributes + name_transformations: Rules for transforming span names + traceloop_attributes: Additional Traceloop-specific attributes to add + span_filter: Optional filter function to determine which spans to transform + rules: Optional list of TransformationRule objects for conditional transformations + load_env_rules: Whether to load transformation rules from OTEL_GENAI_SPAN_TRANSFORM_RULES + telemetry_handler: Optional TelemetryHandler for emitting transformed spans + mutate_original_span: Whether to mutate original spans at the processor level. + This flag works in conjunction with the mutate_original_span field on + individual GenAI objects. Both must be True for mutation to occur. 
+ Default is True for backward compatibility. + """ + self.attribute_transformations = attribute_transformations or {} + self.name_transformations = name_transformations or {} + self.traceloop_attributes = traceloop_attributes or {} + self.span_filter = span_filter or self._default_span_filter + # Load rule set (env + explicit). Explicit rules first for precedence. + env_rules = _load_rules_from_env() if load_env_rules else [] + self.rules: List[TransformationRule] = list(rules or []) + env_rules + self.telemetry_handler = telemetry_handler + self.mutate_original_span = mutate_original_span + if self.rules: + logging.getLogger(__name__).debug( + "TraceloopSpanProcessor loaded %d transformation rules (explicit=%d env=%d)", + len(self.rules), + len(rules or []), + len(env_rules), + ) + self._processed_span_ids = set() + # Mapping from original span_id to translated INVOCATION (not span) for parent-child relationship preservation + self._original_to_translated_invocation: Dict[int, Any] = {} + # Buffer spans to process them in the correct order (parents before children) + self._span_buffer: List[ReadableSpan] = [] + self._processing_buffer = False + + def _default_span_filter(self, span: ReadableSpan) -> bool: + """Default filter: Transform spans that look like LLM/AI calls. + + Filters out spans that don't appear to be LLM-related while keeping + Traceloop task/workflow spans for transformation. 
+ """ + if not span.name: + return False + + # Always process Traceloop task/workflow spans (they need transformation) + if span.attributes: + span_kind = span.attributes.get("traceloop.span.kind") + if span_kind in ("task", "workflow", "tool", "agent"): + return True + + # Check for common LLM/AI span indicators + llm_indicators = [ + "chat", + "completion", + "llm", + "ai", + "gpt", + "claude", + "gemini", + "openai", + "anthropic", + "cohere", + "huggingface", + ] + + span_name_lower = span.name.lower() + for indicator in llm_indicators: + if indicator in span_name_lower: + return True + + # Check attributes for AI/LLM markers (if any attributes present) + if span.attributes: + # Check for traceloop entity attributes + if ( + "traceloop.entity.input" in span.attributes + or "traceloop.entity.output" in span.attributes + ): + # We already filtered task/workflow spans above, so if we get here + # it means it has model data + return True + # Check for other AI/LLM markers + for attr_key in span.attributes.keys(): + attr_key_lower = str(attr_key).lower() + if any( + marker in attr_key_lower + for marker in ["llm", "ai", "gen_ai", "model"] + ): + return True + return False + + def on_start( + self, span: Span, parent_context: Optional[Context] = None + ) -> None: + """Called when a span is started.""" + pass + + def _process_span_translation(self, span: ReadableSpan) -> Optional[Any]: + """Process a single span translation with proper parent mapping. + + Returns the invocation object if a translation was created, None otherwise. 
+ """ + logger = logging.getLogger(__name__) + + # Skip synthetic spans we already produced (recursion guard) - use different sentinel + # NOTE: _traceloop_processed is set by mutation, _traceloop_translated is set by translation + if span.attributes and "_traceloop_translated" in span.attributes: + return None + + # Check if this span should be transformed + if not self.span_filter(span): + logger.debug("Span %s filtered out by span_filter", span.name) + return None + + logger.debug("Processing span for transformation: %s (kind=%s)", + span.name, + span.attributes.get("traceloop.span.kind") if span.attributes else None) + + # avoid emitting multiple synthetic spans if on_end invoked repeatedly. + span_id_int = getattr(getattr(span, "context", None), "span_id", None) + if span_id_int is not None: + if span_id_int in self._processed_span_ids: + return None + self._processed_span_ids.add(span_id_int) + + # Determine which transformation set to use + applied_rule: Optional[TransformationRule] = None + for rule in self.rules: + try: + if rule.matches(span): + applied_rule = rule + break + except Exception as match_err: # pragma: no cover - defensive + logging.warning("Rule match error ignored: %s", match_err) + + sentinel = {"_traceloop_processed": True} + # Decide which transformation config to apply + if applied_rule is not None: + attr_tx = applied_rule.attribute_transformations + name_tx = applied_rule.name_transformations + extra_tl_attrs = { + **applied_rule.traceloop_attributes, + **sentinel, + } + else: + attr_tx = self.attribute_transformations + name_tx = self.name_transformations + extra_tl_attrs = {**self.traceloop_attributes, **sentinel} + + # Build invocation (mutation already happened in on_end before this method) + invocation = self._build_invocation( + span, + attribute_transformations=attr_tx, + name_transformations=name_tx, + traceloop_attributes=extra_tl_attrs, + ) + invocation.attributes.setdefault("_traceloop_processed", True) + + # Always emit 
via TelemetryHandler + handler = self.telemetry_handler or get_telemetry_handler() + try: + # Find the translated parent span if the original span has a parent + parent_context = None + if span.parent: + parent_span_id = getattr(span.parent, "span_id", None) + if ( + parent_span_id + and parent_span_id + in self._original_to_translated_invocation + ): + # We found the translated invocation of the parent - use its span + translated_parent_invocation = ( + self._original_to_translated_invocation[parent_span_id] + ) + translated_parent_span = getattr( + translated_parent_invocation, "span", None + ) + if ( + translated_parent_span + and hasattr(translated_parent_span, "is_recording") + and translated_parent_span.is_recording() + ): + from opentelemetry.trace import set_span_in_context + + parent_context = set_span_in_context( + translated_parent_span + ) + + # Store mapping BEFORE starting the span so children can find it + original_span_id = getattr( + getattr(span, "context", None), "span_id", None + ) + + # DEBUG: Log invocation details before starting + _logger = logging.getLogger(__name__) + _logger.debug( + "🔍 TRACELOOP PROCESSOR: Starting LLM invocation for span '%s' (kind=%s, model=%s)", + span.name, + span.attributes.get("traceloop.span.kind") if span.attributes else None, + invocation.request_model + ) + _logger.debug( + "🔍 TRACELOOP PROCESSOR: Invocation has %d input messages, %d output messages", + len(invocation.input_messages) if invocation.input_messages else 0, + len(invocation.output_messages) if invocation.output_messages else 0 + ) + + handler.start_llm(invocation, parent_context=parent_context) + + # DEBUG: Confirm span was created + if invocation.span: + _logger.debug( + "🔍 TRACELOOP PROCESSOR: Synthetic span created with ID %s", + invocation.span.get_span_context().span_id if hasattr(invocation.span, 'get_span_context') else 'unknown' + ) + else: + _logger.warning( + "⚠️ TRACELOOP PROCESSOR: No synthetic span created for '%s'", + span.name + ) + 
+ # Set the sentinel attribute immediately on the new span to prevent recursion + if invocation.span and invocation.span.is_recording(): + invocation.span.set_attribute("_traceloop_translated", True) + # Store the mapping from original span_id to translated INVOCATION (we'll close it later) + if original_span_id: + self._original_to_translated_invocation[ + original_span_id + ] = invocation + # DON'T call stop_llm yet - we'll do that after processing all children + return invocation + except Exception as emit_err: # pragma: no cover - defensive + logging.getLogger(__name__).warning( + "Telemetry handler emission failed: %s", emit_err + ) + return None + + def on_end(self, span: ReadableSpan) -> None: + """ + Called when a span is ended. Mutate immediately, then process based on span type. + + HYBRID APPROACH: + 1. ALL spans get attribute translation immediately (via _mutate_span_if_needed) + 2. LLM spans get processed immediately for evaluations + 3. Non-LLM spans are buffered for optional batch processing + """ + _logger = logging.getLogger(__name__) + + try: + # STEP 1: Always mutate immediately (ALL spans get attribute translation) + self._mutate_span_if_needed(span) + + # STEP 2: Check if this is an LLM span that needs evaluation + if self._is_llm_span(span): + _logger.debug( + "🔍 TRACELOOP PROCESSOR: LLM span '%s' detected! 
Processing immediately for evaluations", + span.name + ) + # Process LLM spans IMMEDIATELY - create synthetic span and trigger evaluations + invocation = self._process_span_translation(span) + if invocation: + # Close the invocation immediately to trigger evaluations + handler = self.telemetry_handler or get_telemetry_handler() + try: + handler.stop_llm(invocation) + _logger.debug( + "🔍 TRACELOOP PROCESSOR: LLM invocation completed, evaluations should trigger" + ) + except Exception as stop_err: + _logger.warning( + "Failed to stop LLM invocation: %s", stop_err + ) + else: + # Non-LLM spans (tasks, workflows, tools) - buffer for optional batch processing + _logger.debug( + "🔍 TRACELOOP PROCESSOR: Non-LLM span '%s', buffering (%d in buffer)", + span.name, + len(self._span_buffer) + 1 + ) + self._span_buffer.append(span) + + # Process buffer when root span arrives (optional, for synthetic spans of workflows) + if span.parent is None and not self._processing_buffer: + _logger.debug( + "🔍 TRACELOOP PROCESSOR: ROOT SPAN detected, processing buffered non-LLM spans" + ) + self._processing_buffer = True + try: + spans_to_process = self._sort_spans_by_hierarchy( + self._span_buffer + ) + + invocations_to_close = [] + for buffered_span in spans_to_process: + result_invocation = self._process_span_translation( + buffered_span + ) + if result_invocation: + invocations_to_close.append(result_invocation) + + handler = self.telemetry_handler or get_telemetry_handler() + for invocation in reversed(invocations_to_close): + try: + handler.stop_llm(invocation) + except Exception as stop_err: + _logger.warning( + "Failed to stop invocation: %s", stop_err + ) + + self._span_buffer.clear() + self._original_to_translated_invocation.clear() + finally: + self._processing_buffer = False + + except Exception as e: + # Don't let transformation errors break the original span processing + logging.warning( + f"TraceloopSpanProcessor failed to transform span: {e}" + ) + + def 
_sort_spans_by_hierarchy( + self, spans: List[ReadableSpan] + ) -> List[ReadableSpan]: + """Sort spans so parents come before children.""" + # Build a map of span_id to span + span_map = {} + for s in spans: + span_id = getattr(getattr(s, "context", None), "span_id", None) + if span_id: + span_map[span_id] = s + + # Build dependency graph: child -> parent + result = [] + visited = set() + + def visit(span: ReadableSpan) -> None: + span_id = getattr(getattr(span, "context", None), "span_id", None) + if not span_id or span_id in visited: + return + + # Visit parent first + if span.parent: + parent_id = getattr(span.parent, "span_id", None) + if parent_id and parent_id in span_map: + visit(span_map[parent_id]) + + # Then add this span + visited.add(span_id) + result.append(span) + + # Visit all spans + for span in spans: + visit(span) + + return result + + def shutdown(self) -> None: + """Called when the tracer provider is shutdown.""" + pass + + def force_flush(self, timeout_millis: int = 30000) -> bool: + """Force flush any buffered spans.""" + return True + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + def _is_llm_span(self, span: ReadableSpan) -> bool: + """ + Detect if this is a span that should trigger evaluations. + + Returns True for: + 1. Actual LLM API call spans (ChatOpenAI.chat, etc.) - detected by model attribute or name + 2. Task/Agent spans with message data - detected by presence of entity.input/output or gen_ai.input.messages + + Returns False for spans that don't need evaluation (utility tasks, tools without messages, etc.) + """ + _logger = logging.getLogger(__name__) + + if not span.attributes: + return False + + # PRIORITY 1: Check if this span has message data (task/agent spans with entity.input/output) + # These are the spans where message reconstruction will work! 
+ has_input_messages = ( + "traceloop.entity.input" in span.attributes or + "gen_ai.input.messages" in span.attributes + ) + has_output_messages = ( + "traceloop.entity.output" in span.attributes or + "gen_ai.output.messages" in span.attributes + ) + + if has_input_messages or has_output_messages: + # This is a task/agent span with message data - PERFECT for evaluations! + span_kind = span.attributes.get("traceloop.span.kind") or span.attributes.get("gen_ai.span.kind") + _logger.debug( + "🔍 TRACELOOP PROCESSOR: Span '%s' (kind=%s) has message data (input=%s, output=%s) - WILL EVALUATE", + span.name, span_kind, has_input_messages, has_output_messages + ) + return True + + # PRIORITY 2: Check for explicit LLM span kind (even without messages, for compatibility) + span_kind = span.attributes.get("traceloop.span.kind") or span.attributes.get("gen_ai.span.kind") + if span_kind == "llm": + _logger.debug("🔍 TRACELOOP PROCESSOR: Span '%s' has span_kind='llm'", span.name) + return True + + # PRIORITY 3: Check for model attributes (strong indicator of LLM call) + if any(key in span.attributes for key in [ + "llm.request.model", + "gen_ai.request.model", + "ai.model.name" + ]): + _logger.debug("🔍 TRACELOOP PROCESSOR: Span '%s' has model attribute", span.name) + return True + + # PRIORITY 4: Name-based detection (ChatOpenAI.chat, etc.) 
+ span_name_lower = span.name.lower() + + # Explicit excludes (utility spans that never have evaluable content) + exclude_keywords = ["should_continue", "model_to_tools", "tools_to_model"] + if any(ex in span_name_lower for ex in exclude_keywords): + _logger.debug("🔍 TRACELOOP PROCESSOR: Span '%s' excluded by keyword", span.name) + return False + + # LLM indicators in span name + llm_indicators = ["chatopenai", "chatgoogleai", "chatanthropic", "chatvertexai", "openai.chat", "completion", "gpt-", "claude-", "gemini-", "llama-"] + for indicator in llm_indicators: + if indicator in span_name_lower: + _logger.debug("🔍 TRACELOOP PROCESSOR: Span '%s' matches LLM indicator '%s'", span.name, indicator) + return True + + _logger.debug("🔍 TRACELOOP PROCESSOR: Span '%s' is NOT an evaluation span (no messages, no model)", span.name) + return False + + def _mutate_span_if_needed(self, span: ReadableSpan) -> None: + """Mutate the original span's attributes and name if configured to do so. + + This should be called early in on_end() before other processors see the span. 
+ """ + # Check if this span should be transformed + if not self.span_filter(span): + return + + # Skip if already processed + if span.attributes and "_traceloop_processed" in span.attributes: + return + + # Determine which transformation set to use + applied_rule: Optional[TransformationRule] = None + for rule in self.rules: + try: + if rule.matches(span): + applied_rule = rule + break + except Exception as match_err: # pragma: no cover - defensive + logging.warning("Rule match error ignored: %s", match_err) + + # Decide which transformation config to apply + if applied_rule is not None: + attr_tx = applied_rule.attribute_transformations + name_tx = applied_rule.name_transformations + else: + attr_tx = self.attribute_transformations + name_tx = self.name_transformations + + # Check if mutation is enabled (both processor-level and per-invocation level) + # For now, we only check processor-level since we don't have the invocation yet + should_mutate = self.mutate_original_span + + # Mutate attributes + if should_mutate and attr_tx: + try: + if hasattr(span, "_attributes"): + original = dict(span._attributes) if span._attributes else {} # type: ignore[attr-defined] + mutated = self._apply_attribute_transformations( + original.copy(), attr_tx + ) + # Mark as processed + mutated["_traceloop_processed"] = True + # Clear and update the underlying _attributes dict + span._attributes.clear() # type: ignore[attr-defined] + span._attributes.update(mutated) # type: ignore[attr-defined] + logging.getLogger(__name__).debug( + "Mutated span %s attributes: %s -> %s keys", + span.name, + len(original), + len(mutated), + ) + else: + logging.getLogger(__name__).warning( + "Span %s does not have _attributes; mutation skipped", span.name + ) + except Exception as mut_err: + logging.getLogger(__name__).debug( + "Attribute mutation skipped due to error: %s", mut_err + ) + + # Mutate name + if should_mutate and name_tx: + try: + new_name = self._derive_new_name(span.name, name_tx) + if 
new_name and hasattr(span, "_name"): + span._name = new_name # type: ignore[attr-defined] + logging.getLogger(__name__).debug( + "Mutated span name: %s -> %s", span.name, new_name + ) + elif new_name and hasattr(span, "update_name"): + try: + span.update_name(new_name) # type: ignore[attr-defined] + except Exception: + pass + except Exception as name_err: + logging.getLogger(__name__).debug( + "Span name mutation failed: %s", name_err + ) + + def _apply_attribute_transformations( + self, base: Dict[str, Any], transformations: Optional[Dict[str, Any]] + ) -> Dict[str, Any]: + if not transformations: + return base + remove_keys = transformations.get("remove") or [] + for k in remove_keys: + base.pop(k, None) + rename_map = transformations.get("rename") or {} + for old, new in rename_map.items(): + if old in base: + value = base.pop(old) + # Special handling for entity input/output - normalize and serialize + if old in ( + "traceloop.entity.input", + "traceloop.entity.output", + ): + try: + direction = "input" if "input" in old else "output" + normalized = normalize_traceloop_content( + value, direction + ) + value = json.dumps(normalized) + except Exception as e: + # If normalization fails, try to serialize as-is + logging.getLogger(__name__).warning( + f"Failed to normalize {old}: {e}, using raw value" + ) + try: + value = ( + json.dumps(value) + if not isinstance(value, str) + else value + ) + except Exception: + value = str(value) + base[new] = value + add_map = transformations.get("add") or {} + for k, v in add_map.items(): + base[k] = v + return base + + def _derive_new_name( + self, + original_name: str, + name_transformations: Optional[Dict[str, str]], + ) -> Optional[str]: + if not name_transformations: + return None + import fnmatch + + for pattern, new_name in name_transformations.items(): + try: + if fnmatch.fnmatch(original_name, pattern): + return new_name + except Exception: + continue + return None + + def _build_invocation( + self, + existing_span: 
ReadableSpan,
+        *,
+        attribute_transformations: Optional[Dict[str, Any]] = None,
+        name_transformations: Optional[Dict[str, str]] = None,
+        traceloop_attributes: Optional[Dict[str, Any]] = None,
+    ) -> LLMInvocation:
+        base_attrs: Dict[str, Any] = (
+            dict(existing_span.attributes) if existing_span.attributes else {}
+        )
+
+        # DEBUG: Log attribute keys to understand what's in the span
+        _logger = logging.getLogger(__name__)
+        _logger.debug(
+            f"🔍 SPAN ATTRIBUTES: span_name={existing_span.name}, "
+            f"keys={sorted([k for k in base_attrs.keys() if 'input' in k.lower() or 'output' in k.lower() or 'message' in k.lower() or 'entity' in k.lower()])}"
+        )
+
+        # BEFORE transforming attributes, extract original message data
+        # for message reconstruction (needed for evaluations)
+        # Try both old format (traceloop.entity.*) and new format (gen_ai.*)
+        original_input_data = base_attrs.get("gen_ai.input.messages") or base_attrs.get("traceloop.entity.input")
+        original_output_data = base_attrs.get("gen_ai.output.messages") or base_attrs.get("traceloop.entity.output")
+
+        _logger.debug(
+            f"🔍 MESSAGE DATA: input_data={'present' if original_input_data else 'absent'}, "
+            f"output_data={'present' if original_output_data else 'absent'}"
+        )
+
+        # Apply attribute transformations
+        base_attrs = self._apply_attribute_transformations(
+            base_attrs, attribute_transformations
+        )
+        if traceloop_attributes:
+            # Transform traceloop_attributes before adding them to avoid re-introducing legacy keys
+            transformed_tl_attrs = self._apply_attribute_transformations(
+                traceloop_attributes.copy(), attribute_transformations
+            )
+            base_attrs.update(transformed_tl_attrs)
+
+        # Final cleanup: remove any remaining traceloop.* keys that weren't in the rename map
+        # This catches any attributes added by the Traceloop SDK or other sources
+        keys_to_remove = [
+            k for k in base_attrs.keys() if k.startswith("traceloop.")
+        ]
+        for k in keys_to_remove:
+            base_attrs.pop(k, None)
+
+        new_name = self._derive_new_name(
existing_span.name, name_transformations + ) + + # Try to get model from various attribute sources + request_model = ( + base_attrs.get("gen_ai.request.model") + or base_attrs.get("gen_ai.response.model") + or base_attrs.get("llm.request.model") + or base_attrs.get("ai.model.name") + ) + + # Infer model from original span name pattern like "chat gpt-4" if not found + if not request_model and existing_span.name: + # Simple heuristic: take token(s) after first space + parts = existing_span.name.strip().split() + if len(parts) >= 2: + candidate = parts[-1] # Prefer last token (e.g., "gpt-4") + # Basic sanity: exclude generic words that appear in indicators list + if candidate.lower() not in { + "chat", + "completion", + "llm", + "ai", + }: + request_model = candidate + + # For Traceloop task/workflow spans without model info, preserve original span name + # instead of generating "chat unknown" or similar + span_kind = base_attrs.get("gen_ai.span.kind") or base_attrs.get( + "traceloop.span.kind" + ) + if not request_model and span_kind in ( + "task", + "workflow", + "agent", + "tool", + ): + # Use the original span name to avoid "chat unknown" + if not new_name: + new_name = existing_span.name + request_model = "unknown" # Still need a model for LLMInvocation + elif not request_model: + # Default to "unknown" only if we still don't have a model + request_model = "unknown" + + # For spans that already have gen_ai.* attributes + # preserve the original span name unless explicitly overridden + if not new_name and base_attrs.get("gen_ai.system"): + new_name = existing_span.name + + # Set the span name override if we have one + if new_name: + # Provide override for SpanEmitter (we extended it to honor this) + base_attrs.setdefault("gen_ai.override.span_name", new_name) + + # Reconstruct LangChain message objects from Traceloop serialized data + # This enables evaluations to work without requiring LangChain instrumentation + input_messages = None + output_messages = None + 
if original_input_data or original_output_data: + try: + input_messages, output_messages = reconstruct_messages_from_traceloop( + original_input_data, original_output_data + ) + if input_messages or output_messages: + logging.getLogger(__name__).debug( + "Successfully reconstructed messages from Traceloop data: " + f"input={len(input_messages or [])} output={len(output_messages or [])}" + ) + except Exception as e: + logging.getLogger(__name__).debug( + f"Message reconstruction failed: {e}" + ) + + # Create invocation with reconstructed messages + invocation = LLMInvocation( + request_model=str(request_model), + attributes=base_attrs, + input_messages=input_messages or [], + output_messages=output_messages or [], + ) + # Mark operation heuristically from original span name + lowered = existing_span.name.lower() + if lowered.startswith("embed"): + invocation.operation = "embedding" # type: ignore[attr-defined] + elif lowered.startswith("chat"): + invocation.operation = "chat" # type: ignore[attr-defined] + return invocation diff --git a/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/traceloop/__init__.py b/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/traceloop/__init__.py new file mode 100644 index 0000000000..1062aec89f --- /dev/null +++ b/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/traceloop/__init__.py @@ -0,0 +1,210 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import logging +import os +from typing import Any, Dict + +from opentelemetry import trace + +_ENV_DISABLE = "OTEL_GENAI_DISABLE_TRACELOOP_TRANSLATOR" +_LOGGER = logging.getLogger(__name__) + +# Default attribute transformation mappings (traceloop.* -> gen_ai.*) +# This is the single source of truth for all attribute mappings. +# +# Mapping Status Legend: +# ✅ OFFICIAL: Documented in OpenTelemetry GenAI semantic conventions +# ⚠️ CUSTOM: Traceloop-specific, not in official semconv (extension attributes) +_DEFAULT_ATTR_TRANSFORMATIONS = { + "rename": { + # Content mappings (special handling with normalization) + "traceloop.entity.input": "gen_ai.input.messages", # OFFICIAL + "traceloop.entity.output": "gen_ai.output.messages", # OFFICIAL + # Agent and conversation + "traceloop.entity.name": "gen_ai.agent.name", # OFFICIAL (agent spans) + "traceloop.correlation.id": "gen_ai.conversation.id", # OFFICIAL (Conditionally Required) + # ⚠️ CUSTOM: Workflow / entity hierarchy (Traceloop-specific extensions) + "traceloop.workflow.name": "gen_ai.workflow.name", + "traceloop.entity.path": "gen_ai.workflow.path", + "traceloop.association.properties": "gen_ai.association.properties", + "traceloop.entity.version": "gen_ai.workflow.version", + "traceloop.span.kind": "gen_ai.span.kind", + } +} + +# Default span name transformation mappings +_DEFAULT_NAME_TRANSFORMATIONS = {"chat *": "genai.chat"} + + +def enable_traceloop_translator( + *, + attribute_transformations: Dict[str, Any] | None = None, + name_transformations: Dict[str, str] | None = None, + mutate_original_span: bool = True, +) -> bool: + """Enable the Traceloop span translator processor. + + This function registers the TraceloopSpanProcessor with the global tracer provider. + It's safe to call multiple times (idempotent). 
+ + Args: + attribute_transformations: Custom attribute transformation rules. + If None, uses default transformations (traceloop.* -> gen_ai.*). + name_transformations: Custom span name transformation rules. + If None, uses default transformations (chat * -> genai.chat). + mutate_original_span: If True, mutate the original span's attributes. + If False, only create new synthetic spans. + + Returns: + True if the processor was registered, False if already registered or disabled. + + Example: + >>> from opentelemetry.util.genai.traceloop import enable_traceloop_translator + >>> enable_traceloop_translator() + """ + # Import here to avoid circular imports + from ..processor.traceloop_span_processor import TraceloopSpanProcessor + + provider = trace.get_tracer_provider() + + # Check if provider supports span processors + if not hasattr(provider, "add_span_processor"): + _LOGGER.warning( + "Tracer provider does not support span processors. " + "TraceloopSpanProcessor cannot be registered. " + "Make sure you're using the OpenTelemetry SDK TracerProvider." 
+ ) + return False + + # Check for existing processor to avoid duplicates + for attr_name in ("_active_span_processors", "_span_processors"): + existing = getattr(provider, attr_name, []) + if isinstance(existing, (list, tuple)): + for proc in existing: + if isinstance(proc, TraceloopSpanProcessor): + _LOGGER.debug( + "TraceloopSpanProcessor already registered; skipping duplicate" + ) + return False + + try: + processor = TraceloopSpanProcessor( + attribute_transformations=attribute_transformations + or _DEFAULT_ATTR_TRANSFORMATIONS, + name_transformations=name_transformations + or _DEFAULT_NAME_TRANSFORMATIONS, + mutate_original_span=mutate_original_span, + ) + provider.add_span_processor(processor) + _LOGGER.info( + "TraceloopSpanProcessor registered automatically " + "(disable with %s=true)", + _ENV_DISABLE, + ) + return True + except Exception as exc: + _LOGGER.warning( + "Failed to register TraceloopSpanProcessor: %s", exc, exc_info=True + ) + return False + + +def _auto_enable() -> None: + """Automatically enable the translator unless explicitly disabled. + + This uses a deferred registration approach that works even if called before + the TracerProvider is set up. It hooks into the OpenTelemetry trace module + to register the processor as soon as a real TracerProvider is available. 
+ """ + if os.getenv(_ENV_DISABLE, "").lower() in {"1", "true", "yes", "on"}: + _LOGGER.debug( + "TraceloopSpanProcessor auto-registration skipped (disabled via %s)", + _ENV_DISABLE, + ) + return + + # Try immediate registration first + provider = trace.get_tracer_provider() + if hasattr(provider, "add_span_processor"): + # Real provider exists - register immediately + enable_traceloop_translator() + else: + # ProxyTracerProvider or None - defer registration + _LOGGER.debug( + "TracerProvider not ready yet; deferring TraceloopSpanProcessor registration" + ) + _install_deferred_registration() + + +def _install_deferred_registration() -> None: + """Install a hook to register the processor when TracerProvider becomes available.""" + from ..processor.traceloop_span_processor import TraceloopSpanProcessor + + # Wrap the trace.set_tracer_provider function to intercept when it's called + original_set_tracer_provider = trace.set_tracer_provider + + def wrapped_set_tracer_provider(tracer_provider): + """Wrapped version that auto-registers our processor.""" + # Call the original first + result = original_set_tracer_provider(tracer_provider) + + # Now try to register our processor + try: + if hasattr(tracer_provider, "add_span_processor"): + # Check if already registered to avoid duplicates + already_registered = False + for attr_name in ( + "_active_span_processors", + "_span_processors", + ): + existing = getattr(tracer_provider, attr_name, []) + if isinstance(existing, (list, tuple)): + for proc in existing: + if isinstance(proc, TraceloopSpanProcessor): + already_registered = True + break + if already_registered: + break + + if not already_registered: + processor = TraceloopSpanProcessor( + attribute_transformations=_DEFAULT_ATTR_TRANSFORMATIONS, + name_transformations=_DEFAULT_NAME_TRANSFORMATIONS, + mutate_original_span=True, + ) + tracer_provider.add_span_processor(processor) + _LOGGER.info( + "TraceloopSpanProcessor registered (deferred) after TracerProvider setup" + 
) + except Exception as exc: + _LOGGER.debug( + "Failed to auto-register TraceloopSpanProcessor: %s", exc + ) + + return result + + # Install the wrapper + trace.set_tracer_provider = wrapped_set_tracer_provider + + +# Auto-enable on import (unless disabled) +_auto_enable() + + +__all__ = [ + "enable_traceloop_translator", +] diff --git a/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/version.py b/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/version.py index 345217c166..831adef5bc 100644 --- a/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/version.py +++ b/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/version.py @@ -1,2 +1,2 @@ __all__ = ["__version__"] -__version__ = "0.0.0.dev0" # dynamic version replaced by hatch \ No newline at end of file +__version__ = "0.0.0.dev0" # dynamic version replaced by hatch diff --git a/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry_util_genai_traceloop_translator.pth b/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry_util_genai_traceloop_translator.pth new file mode 100644 index 0000000000..0c8a2889a6 --- /dev/null +++ b/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry_util_genai_traceloop_translator.pth @@ -0,0 +1 @@ +import opentelemetry.util.genai.traceloop diff --git a/util/opentelemetry-util-genai-traceloop-translator/tests/test_traceloop_integration.py b/util/opentelemetry-util-genai-traceloop-translator/tests/test_traceloop_integration.py new file mode 100644 index 0000000000..b863bf91af --- /dev/null +++ b/util/opentelemetry-util-genai-traceloop-translator/tests/test_traceloop_integration.py @@ -0,0 +1,630 @@ +"""Integration tests based on real-world Traceloop SDK usage patterns. 
+ +These tests simulate the patterns shown in the traceloop_processor_example.py file, +testing nested workflows, agents, tasks, and tools with proper parent-child relationships. +""" + +import json +import os +import pytest +from unittest.mock import Mock, patch + +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider, ReadableSpan +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) +from opentelemetry.trace import SpanKind + +from opentelemetry.util.genai.processor.traceloop_span_processor import ( + TraceloopSpanProcessor, +) + + +@pytest.fixture(autouse=True) +def reset_env(): + """Reset environment before each test.""" + os.environ["OTEL_GENAI_CONTENT_CAPTURE"] = "1" + yield + if "OTEL_GENAI_CONTENT_CAPTURE" in os.environ: + del os.environ["OTEL_GENAI_CONTENT_CAPTURE"] + + +@pytest.fixture +def setup_tracer(): + """Setup tracer with processor and exporter.""" + exporter = InMemorySpanExporter() + provider = TracerProvider() + + # Add TraceloopSpanProcessor with attribute transformations + processor = TraceloopSpanProcessor( + attribute_transformations={ + "remove": [], + "rename": { + "traceloop.span.kind": "gen_ai.span.kind", + "traceloop.workflow.name": "gen_ai.workflow.name", + "traceloop.entity.name": "gen_ai.agent.name", + "traceloop.entity.path": "gen_ai.workflow.path", + "traceloop.entity.input": "gen_ai.input.messages", + "traceloop.entity.output": "gen_ai.output.messages", + "traceloop.correlation.id": "gen_ai.conversation.id", + }, + "add": {} + } + ) + provider.add_span_processor(processor) + + # Then add exporter + provider.add_span_processor(SimpleSpanProcessor(exporter)) + + tracer = provider.get_tracer(__name__) + + return tracer, exporter, provider + + +class TestWorkflowPattern: + """Test workflow pattern from the example.""" + + def test_simple_workflow_with_tasks(self, setup_tracer): + """Test @workflow 
pattern with nested @task spans.""" + tracer, exporter, _ = setup_tracer + + # Simulate: @workflow(name="pirate_joke_generator") + with tracer.start_as_current_span("pirate_joke_generator") as workflow_span: + workflow_span.set_attribute("traceloop.span.kind", "workflow") + workflow_span.set_attribute("traceloop.workflow.name", "pirate_joke_generator") + workflow_span.set_attribute("traceloop.entity.name", "pirate_joke_generator") + + # Simulate: @task(name="joke_creation") + with tracer.start_as_current_span("joke_creation") as task_span: + task_span.set_attribute("traceloop.span.kind", "task") + task_span.set_attribute("traceloop.entity.name", "joke_creation") + task_span.set_attribute("traceloop.workflow.name", "pirate_joke_generator") + + # Simulate OpenAI call within task + with tracer.start_as_current_span("chat gpt-3.5-turbo") as llm_span: + llm_span.set_attribute("gen_ai.request.model", "gpt-3.5-turbo") + llm_span.set_attribute("gen_ai.system", "openai") + llm_span.set_attribute( + "gen_ai.prompt.0.content", + "Tell me a joke about opentelemetry" + ) + llm_span.set_attribute( + "gen_ai.completion.0.content", + "Why did the trace cross the road?" 
+ ) + + spans = exporter.get_finished_spans() + + # Should have original spans + synthetic spans + assert len(spans) >= 3, f"Expected at least 3 spans, got {len(spans)}" + + # Find workflow spans (original mutated + synthetic) + workflow_spans = [ + s for s in spans + if s.attributes and s.attributes.get("gen_ai.workflow.name") == "pirate_joke_generator" + ] + assert len(workflow_spans) >= 1, "Should have at least one workflow span" + + # Find task spans + task_spans = [ + s for s in spans + if s.name == "joke_creation" or ( + s.attributes and s.attributes.get("gen_ai.agent.name") == "joke_creation" + ) + ] + assert len(task_spans) >= 1, "Should have at least one task span" + + # Verify no traceloop.* attributes remain on any span (mutation) + for span in spans: + if span.attributes: + traceloop_keys = [k for k in span.attributes.keys() if k.startswith("traceloop.")] + # Exclude the _traceloop_processed marker + traceloop_keys = [k for k in traceloop_keys if k != "_traceloop_processed"] + assert len(traceloop_keys) == 0, ( + f"Span {span.name} should not have traceloop.* attributes, found: {traceloop_keys}" + ) + + def test_nested_agent_with_tool(self, setup_tracer): + """Test @agent pattern with nested @tool calls.""" + tracer, exporter, _ = setup_tracer + + # Simulate: @agent(name="joke_translation") + with tracer.start_as_current_span("joke_translation") as agent_span: + agent_span.set_attribute("traceloop.span.kind", "agent") + agent_span.set_attribute("traceloop.entity.name", "joke_translation") + agent_span.set_attribute("traceloop.workflow.name", "pirate_joke_generator") + agent_span.set_attribute( + "traceloop.entity.input", + json.dumps({"joke": "Why did the trace cross the road?"}) + ) + + # Simulate OpenAI call within agent + with tracer.start_as_current_span("chat gpt-3.5-turbo") as llm_span: + llm_span.set_attribute("gen_ai.request.model", "gpt-3.5-turbo") + llm_span.set_attribute("gen_ai.system", "openai") + + # Simulate: @tool(name="history_jokes") + 
with tracer.start_as_current_span("history_jokes") as tool_span: + tool_span.set_attribute("traceloop.span.kind", "tool") + tool_span.set_attribute("traceloop.entity.name", "history_jokes") + tool_span.set_attribute("traceloop.workflow.name", "pirate_joke_generator") + + # Simulate OpenAI call within tool + with tracer.start_as_current_span("chat gpt-3.5-turbo") as tool_llm_span: + tool_llm_span.set_attribute("gen_ai.request.model", "gpt-3.5-turbo") + tool_llm_span.set_attribute("gen_ai.system", "openai") + tool_llm_span.set_attribute( + "gen_ai.prompt.0.content", + "get some history jokes" + ) + + agent_span.set_attribute( + "traceloop.entity.output", + json.dumps({"response": "Arr! Why did the trace walk the plank?"}) + ) + + spans = exporter.get_finished_spans() + + # Should have multiple spans + assert len(spans) >= 4, f"Expected at least 4 spans, got {len(spans)}" + + # Find agent spans + agent_spans = [ + s for s in spans + if s.attributes and ( + s.attributes.get("gen_ai.agent.name") == "joke_translation" or + s.attributes.get("gen_ai.span.kind") == "agent" + ) + ] + assert len(agent_spans) >= 1, "Should have at least one agent span" + + # Find tool spans + tool_spans = [ + s for s in spans + if s.attributes and ( + s.attributes.get("gen_ai.agent.name") == "history_jokes" or + s.attributes.get("gen_ai.span.kind") == "tool" + ) + ] + assert len(tool_spans) >= 1, "Should have at least one tool span" + + # Verify input/output were captured and normalized + agent_with_input = [ + s for s in agent_spans + if s.attributes and "gen_ai.input.messages" in s.attributes + ] + if agent_with_input: + input_data = json.loads(agent_with_input[0].attributes["gen_ai.input.messages"]) + assert isinstance(input_data, list), "Input should be normalized to message array" + + +class TestParentChildRelationships: + """Test that parent-child relationships are preserved across transformations.""" + + def test_parent_child_hierarchy_preserved(self, setup_tracer): + """Test that 
synthetic spans maintain parent-child relationships.""" + tracer, exporter, _ = setup_tracer + + with tracer.start_as_current_span("workflow") as parent: + parent.set_attribute("traceloop.span.kind", "workflow") + parent.set_attribute("traceloop.workflow.name", "test_workflow") + + with tracer.start_as_current_span("task") as child: + child.set_attribute("traceloop.span.kind", "task") + child.set_attribute("traceloop.entity.name", "test_task") + child.set_attribute("traceloop.workflow.name", "test_workflow") + + spans = exporter.get_finished_spans() + + # Build parent-child map from context + span_map = {} + for span in spans: + span_id = span.context.span_id if span.context else None + if span_id: + span_map[span_id] = span + + # Find child spans (those with parents) + child_spans = [s for s in spans if s.parent is not None] + + assert len(child_spans) >= 1, "Should have at least one child span" + + # Verify at least one child has a valid parent reference + valid_parent_refs = 0 + for child in child_spans: + if child.parent and child.parent.span_id in span_map: + valid_parent_refs += 1 + + assert valid_parent_refs >= 1, ( + "At least one child should have a valid parent reference" + ) + + +class TestContentNormalization: + """Test content normalization patterns from the example.""" + + def test_normalize_entity_input_output(self, setup_tracer): + """Test that traceloop.entity.input and output are normalized properly.""" + tracer, exporter, _ = setup_tracer + + with tracer.start_as_current_span("test_task") as span: + span.set_attribute("traceloop.span.kind", "task") + span.set_attribute("traceloop.entity.name", "test_task") + span.set_attribute("traceloop.workflow.name", "test_workflow") + # Various input formats that should be normalized + span.set_attribute( + "traceloop.entity.input", + json.dumps({ + "messages": [ + {"role": "user", "content": "Translate this joke to pirate"} + ] + }) + ) + span.set_attribute( + "traceloop.entity.output", + json.dumps({ + 
"choices": [ + { + "message": {"role": "assistant", "content": "Arr matey!"}, + "finish_reason": "stop" + } + ] + }) + ) + + spans = exporter.get_finished_spans() + + # Find spans with normalized content - check both original (mutated) and synthetic + spans_with_input = [ + s for s in spans + if s.attributes and "gen_ai.input.messages" in s.attributes + ] + + # Should have at least the mutated original span with gen_ai.input.messages + assert len(spans_with_input) >= 1, f"Should have spans with normalized input, got {len(spans)} spans total" + + # Verify normalization + for span in spans_with_input: + input_str = span.attributes.get("gen_ai.input.messages") + if input_str: + input_data = json.loads(input_str) + assert isinstance(input_data, list), "Input should be list of messages" + if input_data: + assert "role" in input_data[0], "Messages should have role field" + + # Check output normalization + spans_with_output = [ + s for s in spans + if s.attributes and "gen_ai.output.messages" in s.attributes + ] + + if spans_with_output: + output_str = spans_with_output[0].attributes.get("gen_ai.output.messages") + output_data = json.loads(output_str) + assert isinstance(output_data, list), "Output should be list of messages" + + def test_normalize_string_input(self, setup_tracer): + """Test normalization of simple string inputs.""" + tracer, exporter, _ = setup_tracer + + with tracer.start_as_current_span("test_task") as span: + span.set_attribute("traceloop.span.kind", "task") + span.set_attribute("traceloop.entity.name", "test_task") + span.set_attribute("traceloop.workflow.name", "test_workflow") + # Simple string input + span.set_attribute("traceloop.entity.input", "Hello world") + + spans = exporter.get_finished_spans() + + # Should handle string input gracefully - check that span was processed + assert len(spans) >= 1, "Should have at least one span" + + # Check if any spans have gen_ai attributes (mutation occurred) + spans_with_genai = [ + s for s in spans + if 
s.attributes and any(k.startswith("gen_ai.") for k in s.attributes.keys()) + ] + + assert len(spans_with_genai) >= 1, "Should have spans with gen_ai.* attributes after processing" + + def test_normalize_list_of_strings(self, setup_tracer): + """Test normalization of list inputs.""" + tracer, exporter, _ = setup_tracer + + with tracer.start_as_current_span("test_task") as span: + span.set_attribute("traceloop.span.kind", "task") + span.set_attribute("traceloop.entity.name", "test_task") + span.set_attribute("traceloop.workflow.name", "test_workflow") + # List input + span.set_attribute( + "traceloop.entity.input", + json.dumps(["Message 1", "Message 2", "Message 3"]) + ) + + spans = exporter.get_finished_spans() + + # Check that spans were processed + assert len(spans) >= 1, "Should have at least one span" + + # Verify that gen_ai attributes exist (processing occurred) + spans_with_genai = [ + s for s in spans + if s.attributes and "gen_ai.span.kind" in s.attributes + ] + assert len(spans_with_genai) >= 1, "Should have processed spans with gen_ai attributes" + + +class TestModelInference: + """Test model name inference from span names and attributes.""" + + def test_infer_model_from_span_name(self, setup_tracer): + """Test that model is inferred from span name like 'chat gpt-3.5-turbo'.""" + tracer, exporter, _ = setup_tracer + + # Simulate OpenAI instrumentation span naming pattern + with tracer.start_as_current_span("chat gpt-3.5-turbo") as span: + span.set_attribute("gen_ai.system", "openai") + # No explicit gen_ai.request.model attribute + + spans = exporter.get_finished_spans() + + # Should have spans with inferred model + spans_with_model = [ + s for s in spans + if s.attributes and s.attributes.get("gen_ai.request.model") + ] + + if spans_with_model: + # Model should be inferred as "gpt-3.5-turbo" + model = spans_with_model[0].attributes.get("gen_ai.request.model") + assert model is not None, "Model should be inferred" + + def 
test_preserve_explicit_model(self, setup_tracer): + """Test that explicit model attributes are preserved.""" + tracer, exporter, _ = setup_tracer + + with tracer.start_as_current_span("chat completion") as span: + span.set_attribute("gen_ai.request.model", "gpt-4") + span.set_attribute("gen_ai.system", "openai") + + spans = exporter.get_finished_spans() + + # Should preserve explicit model + spans_with_model = [ + s for s in spans + if s.attributes and s.attributes.get("gen_ai.request.model") == "gpt-4" + ] + + assert len(spans_with_model) >= 1, "Should preserve explicit model attribute" + + +class TestSpanFiltering: + """Test span filtering logic.""" + + def test_filters_non_llm_spans(self, setup_tracer): + """Test that non-LLM spans are filtered out.""" + tracer, exporter, _ = setup_tracer + + # Create a span that shouldn't be transformed + with tracer.start_as_current_span("database_query") as span: + span.set_attribute("db.system", "postgresql") + span.set_attribute("db.statement", "SELECT * FROM users") + + spans = exporter.get_finished_spans() + + # Should only have the original span, no synthetic spans + assert len(spans) == 1, f"Expected 1 span (non-LLM filtered), got {len(spans)}" + + # Original span should not have gen_ai.* attributes + span = spans[0] + gen_ai_attrs = [k for k in span.attributes.keys() if k.startswith("gen_ai.")] + assert len(gen_ai_attrs) == 0, "Non-LLM span should not have gen_ai.* attributes" + + def test_includes_traceloop_spans(self, setup_tracer): + """Test that Traceloop task/workflow spans are included.""" + tracer, exporter, _ = setup_tracer + + # Traceloop spans should always be included + with tracer.start_as_current_span("my_custom_task") as span: + span.set_attribute("traceloop.span.kind", "task") + span.set_attribute("traceloop.entity.name", "my_custom_task") + span.set_attribute("traceloop.workflow.name", "test_workflow") + + spans = exporter.get_finished_spans() + + # Should have original + synthetic span + assert 
len(spans) >= 1, "Traceloop spans should be processed" + + # At least one span should have gen_ai.span.kind (from mutation or synthetic span) + spans_with_kind = [ + s for s in spans + if s.attributes and s.attributes.get("gen_ai.span.kind") == "task" + ] + assert len(spans_with_kind) >= 1, f"Traceloop task should be transformed, got {len(spans)} spans" + + +class TestOperationInference: + """Test operation type inference.""" + + def test_infer_chat_operation(self, setup_tracer): + """Test that 'chat' operation is inferred from span name.""" + tracer, exporter, _ = setup_tracer + + with tracer.start_as_current_span("chat gpt-4") as span: + span.set_attribute("gen_ai.system", "openai") + span.set_attribute("gen_ai.request.model", "gpt-4") + + spans = exporter.get_finished_spans() + + # The processor creates synthetic spans with operation.name + # Check if we have spans with gen_ai attributes (indicates processing) + spans_with_genai = [ + s for s in spans + if s.attributes and "gen_ai.system" in s.attributes + ] + + assert len(spans_with_genai) >= 1, f"Should have processed spans with gen_ai attributes, got {len(spans)} total spans" + + def test_infer_embedding_operation(self, setup_tracer): + """Test that 'embedding' operation is inferred from span name.""" + tracer, exporter, _ = setup_tracer + + with tracer.start_as_current_span("embedding text-embedding-ada-002") as span: + span.set_attribute("gen_ai.system", "openai") + span.set_attribute("gen_ai.request.model", "text-embedding-ada-002") + + spans = exporter.get_finished_spans() + + # Check that embedding spans are processed + spans_with_embedding = [ + s for s in spans + if s.attributes and "text-embedding" in s.attributes.get("gen_ai.request.model", "") + ] + + assert len(spans_with_embedding) >= 1, f"Should process embedding spans, got {len(spans)} total spans" + + +class TestComplexWorkflow: + """Test complete workflow simulating the example end-to-end.""" + + def test_full_pirate_joke_workflow(self, 
setup_tracer): + """Test complete workflow pattern from the example.""" + tracer, exporter, _ = setup_tracer + + # Main workflow + with tracer.start_as_current_span("pirate_joke_generator") as workflow: + workflow.set_attribute("traceloop.span.kind", "workflow") + workflow.set_attribute("traceloop.workflow.name", "pirate_joke_generator") + + # Task 1: Create joke + with tracer.start_as_current_span("joke_creation") as task1: + task1.set_attribute("traceloop.span.kind", "task") + task1.set_attribute("traceloop.entity.name", "joke_creation") + task1.set_attribute("traceloop.workflow.name", "pirate_joke_generator") + + with tracer.start_as_current_span("chat gpt-3.5-turbo") as llm1: + llm1.set_attribute("gen_ai.request.model", "gpt-3.5-turbo") + llm1.set_attribute("gen_ai.system", "openai") + + # Agent: Translate joke + with tracer.start_as_current_span("joke_translation") as agent: + agent.set_attribute("traceloop.span.kind", "agent") + agent.set_attribute("traceloop.entity.name", "joke_translation") + agent.set_attribute("traceloop.workflow.name", "pirate_joke_generator") + + with tracer.start_as_current_span("chat gpt-3.5-turbo") as llm2: + llm2.set_attribute("gen_ai.request.model", "gpt-3.5-turbo") + llm2.set_attribute("gen_ai.system", "openai") + + # Tool within agent + with tracer.start_as_current_span("history_jokes") as tool: + tool.set_attribute("traceloop.span.kind", "tool") + tool.set_attribute("traceloop.entity.name", "history_jokes") + tool.set_attribute("traceloop.workflow.name", "pirate_joke_generator") + + with tracer.start_as_current_span("chat gpt-3.5-turbo") as llm3: + llm3.set_attribute("gen_ai.request.model", "gpt-3.5-turbo") + llm3.set_attribute("gen_ai.system", "openai") + + # Task 2: Generate signature + with tracer.start_as_current_span("signature_generation") as task2: + task2.set_attribute("traceloop.span.kind", "task") + task2.set_attribute("traceloop.entity.name", "signature_generation") + task2.set_attribute("traceloop.workflow.name", 
"pirate_joke_generator") + + with tracer.start_as_current_span("chat gpt-3.5-turbo") as llm4: + llm4.set_attribute("gen_ai.request.model", "gpt-3.5-turbo") + llm4.set_attribute("gen_ai.system", "openai") + + spans = exporter.get_finished_spans() + + # Should have many spans (original mutated + synthetic) + assert len(spans) >= 8, f"Expected at least 8 spans in full workflow, got {len(spans)}" + + # Verify workflow span exists - look for spans with the workflow name + workflow_spans = [ + s for s in spans + if s.attributes and s.attributes.get("gen_ai.workflow.name") == "pirate_joke_generator" + ] + assert len(workflow_spans) >= 1, f"Should have workflow span, got {len(spans)} total spans, workflow_spans={len(workflow_spans)}" + + # Verify all task names are present + task_names = {"joke_creation", "signature_generation"} + found_tasks = set() + for span in spans: + if span.attributes: + agent_name = span.attributes.get("gen_ai.agent.name") + if agent_name in task_names: + found_tasks.add(agent_name) + + assert len(found_tasks) >= 1, f"Should find task spans, found: {found_tasks}" + + # Verify no traceloop.* attributes remain (mutation) + for span in spans: + if span.attributes: + traceloop_keys = [ + k for k in span.attributes.keys() + if k.startswith("traceloop.") and k != "_traceloop_processed" + ] + assert len(traceloop_keys) == 0, ( + f"Span {span.name} should not have traceloop.* attributes" + ) + + +class TestEdgeCases: + """Test edge cases and error handling.""" + + def test_span_without_attributes(self, setup_tracer): + """Test handling of spans without attributes.""" + tracer, exporter, _ = setup_tracer + + with tracer.start_as_current_span("test_span"): + pass # No attributes + + spans = exporter.get_finished_spans() + + # Should handle gracefully without errors + assert len(spans) >= 1, "Should handle span without attributes" + + def test_malformed_input_json(self, setup_tracer): + """Test handling of malformed JSON in input.""" + tracer, exporter, _ = 
setup_tracer + + with tracer.start_as_current_span("test_task") as span: + span.set_attribute("traceloop.span.kind", "task") + span.set_attribute("traceloop.entity.name", "test_task") + # Malformed JSON + span.set_attribute("traceloop.entity.input", "{invalid json}") + + spans = exporter.get_finished_spans() + + # Should handle gracefully without crashing + assert len(spans) >= 1, "Should handle malformed JSON gracefully" + + def test_empty_workflow_name(self, setup_tracer): + """Test handling of empty workflow name.""" + tracer, exporter, _ = setup_tracer + + with tracer.start_as_current_span("test_workflow") as span: + span.set_attribute("traceloop.span.kind", "workflow") + span.set_attribute("traceloop.workflow.name", "") # Empty + + spans = exporter.get_finished_spans() + + # Should handle empty values gracefully + assert len(spans) >= 1, "Should handle empty workflow name" + + def test_recursive_processing_prevention(self, setup_tracer): + """Test that spans marked as processed are not processed again.""" + tracer, exporter, _ = setup_tracer + + with tracer.start_as_current_span("test_span") as span: + span.set_attribute("traceloop.span.kind", "task") + span.set_attribute("_traceloop_processed", True) # Already processed marker + + spans = exporter.get_finished_spans() + + # Should not create duplicate synthetic spans + # With the marker, it should be filtered out + assert len(spans) >= 1, "Should handle already-processed spans" diff --git a/util/opentelemetry-util-genai-traceloop-translator/tests/test_translator.py b/util/opentelemetry-util-genai-traceloop-translator/tests/test_translator.py deleted file mode 100644 index 8afca025c1..0000000000 --- a/util/opentelemetry-util-genai-traceloop-translator/tests/test_translator.py +++ /dev/null @@ -1,145 +0,0 @@ -import json -from opentelemetry.util.genai.handler import get_telemetry_handler -from opentelemetry.util.genai.types import LLMInvocation - - -def _handler(monkeypatch): - 
monkeypatch.setenv("OTEL_INSTRUMENTATION_GENAI_EMITTERS", "span,traceloop_translator") - monkeypatch.setenv("OTEL_GENAI_CONTENT_CAPTURE", "1") # enable content - return get_telemetry_handler() - - -def test_normalizes_string_blob(monkeypatch): - h = _handler(monkeypatch) - blob = "{\"messages\":[{\"role\":\"user\",\"parts\":[{\"type\":\"text\",\"content\":\"Hello\"}]}]}" - inv = LLMInvocation(request_model="gpt-4", input_messages=[], attributes={"traceloop.entity.input": blob}) - h.start_llm(inv) - data = json.loads(inv.attributes.get("gen_ai.input.messages")) - assert isinstance(data, list) - assert data[0]["role"] == "user" - assert data[0]["parts"][0]["type"] == "text" - - -def test_normalizes_inputs_dict(monkeypatch): - h = _handler(monkeypatch) - raw = json.dumps({"inputs": {"title": "Tragedy at sunset on the beach", "era": "Victorian England"}}) - inv = LLMInvocation(request_model="gpt-4", input_messages=[], attributes={"traceloop.entity.input": raw}) - h.start_llm(inv) - arr = json.loads(inv.attributes.get("gen_ai.input.messages")) - assert arr[0]["role"] == "user" - # content should include title key - assert "Tragedy at sunset" in arr[0]["parts"][0]["content"] - - -def test_normalizes_list_of_strings(monkeypatch): - h = _handler(monkeypatch) - inv = LLMInvocation(request_model="gpt-4", input_messages=[], attributes={"traceloop.entity.input": ["Hello", "World"]}) - h.start_llm(inv) - arr = json.loads(inv.attributes.get("gen_ai.input.messages")) - assert len(arr) == 2 - assert arr[0]["parts"][0]["content"] == "Hello" - - -def test_normalizes_dict_messages(monkeypatch): - h = _handler(monkeypatch) - raw = {"messages": [{"role": "user", "content": "Ping"}, {"role": "assistant", "content": "Pong"}]} - inv = LLMInvocation(request_model="gpt-4", input_messages=[], attributes={"traceloop.entity.input": raw}) - h.start_llm(inv) - arr = json.loads(inv.attributes.get("gen_ai.input.messages")) - assert arr[1]["role"] == "assistant" - assert 
arr[1]["parts"][0]["content"] == "Pong" - - -def test_output_normalization(monkeypatch): - h = _handler(monkeypatch) - out_raw = [{"role": "assistant", "parts": ["Answer"], "finish_reason": "stop"}] - inv = LLMInvocation(request_model="gpt-4", input_messages=[], attributes={"traceloop.entity.output": out_raw}) - h.start_llm(inv) - arr = json.loads(inv.attributes.get("gen_ai.output.messages")) - assert arr[0]["finish_reason"] == "stop" - - -def test_output_openai_choices(monkeypatch): - h = _handler(monkeypatch) - raw = {"choices": [ - {"message": {"role": "assistant", "content": "Hello there"}, "finish_reason": "stop"}, - {"message": {"role": "assistant", "content": "Hi again"}, "finish_reason": "length"}, - ]} - inv = LLMInvocation(request_model="gpt-4", input_messages=[], attributes={"traceloop.entity.output": raw}) - h.start_llm(inv) - arr = json.loads(inv.attributes.get("gen_ai.output.messages")) - assert arr[1]["parts"][0]["content"] == "Hi again" - assert arr[1]["finish_reason"] == "length" - - -def test_output_candidates(monkeypatch): - h = _handler(monkeypatch) - raw = {"candidates": [ - {"role": "assistant", "content": [{"text": "Choice A"}]}, - {"role": "assistant", "content": [{"text": "Choice B"}], "finish_reason": "stop"}, - ]} - inv = LLMInvocation(request_model="gpt-4", input_messages=[], attributes={"traceloop.entity.output": raw}) - h.start_llm(inv) - arr = json.loads(inv.attributes.get("gen_ai.output.messages")) - assert arr[0]["parts"][0]["content"].startswith("Choice A") - - -def test_output_tool_calls(monkeypatch): - h = _handler(monkeypatch) - raw = {"tool_calls": [ - {"name": "get_weather", "arguments": {"city": "Paris"}, "id": "call1"}, - {"name": "lookup_user", "arguments": {"id": 42}, "id": "call2", "finish_reason": "tool_call"}, - ]} - inv = LLMInvocation(request_model="gpt-4", input_messages=[], attributes={"traceloop.entity.output": raw}) - h.start_llm(inv) - arr = json.loads(inv.attributes.get("gen_ai.output.messages")) - assert 
arr[0]["parts"][0]["type"] == "tool_call" - assert arr[1]["finish_reason"] == "tool_call" - - -def test_no_content_capture(monkeypatch): - monkeypatch.setenv("OTEL_INSTRUMENTATION_GENAI_EMITTERS", "span,traceloop_translator") - monkeypatch.setenv("OTEL_GENAI_CONTENT_CAPTURE", "0") # disable - h = get_telemetry_handler() - inv = LLMInvocation(request_model="gpt-4", input_messages=[], attributes={"traceloop.entity.input": "Hello"}) - h.start_llm(inv) - assert "gen_ai.input.messages" not in inv.attributes - - - -def test_workflow_name_mapping(monkeypatch): - h = _handler(monkeypatch) - inv = LLMInvocation(request_model="gpt-4", input_messages=[], attributes={"traceloop.workflow.name": "FlowA"}) - h.start_llm(inv) - assert inv.attributes.get("gen_ai.workflow.name") == "FlowA" - - -def test_strip_legacy(monkeypatch): - h = _handler(monkeypatch) - monkeypatch.setenv("OTEL_GENAI_TRACELOOP_TRANSLATOR_STRIP_LEGACY", "1") - inv = LLMInvocation(request_model="gpt-4", input_messages=[], attributes={"traceloop.entity.path": "x/y/z"}) - h.start_llm(inv) - assert inv.attributes.get("gen_ai.workflow.path") == "x/y/z" - assert "traceloop.entity.path" not in inv.attributes - - -def test_conversation_id_mapping(monkeypatch): - h = _handler(monkeypatch) - inv = LLMInvocation(request_model="gpt-4", input_messages=[], attributes={"traceloop.correlation.id": "conv_123"}) - h.start_llm(inv) - assert inv.attributes.get("gen_ai.conversation.id") == "conv_123" - - -def test_conversation_id_invalid(monkeypatch): - h = _handler(monkeypatch) - bad = "this id has spaces" # fails regex - inv = LLMInvocation(request_model="gpt-4", input_messages=[], attributes={"traceloop.correlation.id": bad}) - h.start_llm(inv) - assert inv.attributes.get("gen_ai.conversation.id") is None - - -def test_operation_inference(monkeypatch): - h = _handler(monkeypatch) - inv = LLMInvocation(request_model="gpt-4", input_messages=[], attributes={"traceloop.span.kind": "tool"}) - h.start_llm(inv) - assert 
inv.attributes.get("gen_ai.operation.name") == "execute_tool" \ No newline at end of file diff --git a/util/opentelemetry-util-genai-traceloop-translator/tests/test_translator_basic.py b/util/opentelemetry-util-genai-traceloop-translator/tests/test_translator_basic.py deleted file mode 100644 index 6a931cfcb8..0000000000 --- a/util/opentelemetry-util-genai-traceloop-translator/tests/test_translator_basic.py +++ /dev/null @@ -1,20 +0,0 @@ -from opentelemetry.util.genai.handler import get_telemetry_handler -from opentelemetry.util.genai.types import LLMInvocation, InputMessage, Text - - -def test_basic_promotion(monkeypatch): - monkeypatch.setenv("OTEL_INSTRUMENTATION_GENAI_EMITTERS", "span,traceloop_translator") - handler = get_telemetry_handler() - inv = LLMInvocation( - request_model="gpt-4", - input_messages=[InputMessage(role="user", parts=[Text("Hi")])], - attributes={ - "traceloop.workflow.name": "flowX", - "traceloop.entity.name": "AgentX", - "traceloop.callback.name": "root_cb", - }, - ) - handler.start_llm(inv) - handler.stop_llm(inv) - assert inv.attributes.get("gen_ai.workflow.name") == "flowX" - assert inv.attributes.get("gen_ai.agent.name") == "AgentX"