From 2326ccda594694829ffeeb337a5ccbf8ff4a035d Mon Sep 17 00:00:00 2001 From: Pavan Sudheendra Date: Mon, 6 Oct 2025 12:09:40 +0100 Subject: [PATCH 1/6] feat: Add traceloop translator Signed-off-by: Pavan Sudheendra --- .../TRANSLATOR_README.md | 41 +++ .../traceloop_rules_example.py | 45 +++ .../util/genai/processors/__init__.py | 6 + .../util/genai/processors/span_transformer.py | 104 +++++++ .../processors/traceloop_span_generator.py | 62 +++++ .../processors/traceloop_span_processor.py | 259 ++++++++++++++++++ .../tests/test_traceloop_span_processor.py | 182 ++++++++++++ 7 files changed, 699 insertions(+) create mode 100644 util/opentelemetry-util-genai-dev/TRANSLATOR_README.md create mode 100644 util/opentelemetry-util-genai-dev/examples/traceloop_span_transformation/traceloop_rules_example.py create mode 100644 util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/__init__.py create mode 100644 util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/span_transformer.py create mode 100644 util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/traceloop_span_generator.py create mode 100644 util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/traceloop_span_processor.py create mode 100644 util/opentelemetry-util-genai-dev/tests/test_traceloop_span_processor.py diff --git a/util/opentelemetry-util-genai-dev/TRANSLATOR_README.md b/util/opentelemetry-util-genai-dev/TRANSLATOR_README.md new file mode 100644 index 0000000000..46b59dc6d9 --- /dev/null +++ b/util/opentelemetry-util-genai-dev/TRANSLATOR_README.md @@ -0,0 +1,41 @@ +# Translator + +## Automatic Span Processing (Recommended) + +Add `TraceloopSpanProcessor` to your TracerProvider to automatically transform all matching spans: + +```python +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.util.genai.processors import TraceloopSpanProcessor + +# Set up tracer provider +provider = TracerProvider() + +# Add processor - transforms all matching spans automatically +processor = TraceloopSpanProcessor( + attribute_transformations={ + "remove": ["debug_info"], + "rename": {"model_ver": "llm.model.version"}, + "add": {"service.name": "my-llm"} + }, + name_transformations={"chat *": "llm.openai.chat"}, + traceloop_attributes={ + "traceloop.entity.name": "MyLLMEntity" + } +) +provider.add_span_processor(processor) +trace.set_tracer_provider(provider) + +``` + +## Transformation Rules + +### Attributes +- **Remove**: `"remove": ["field1", "field2"]` +- **Rename**: `"rename": {"old_name": "new_name"}` +- **Add**: `"add": {"key": "value"}` + +### Span Names +- **Direct**: `"old name": "new name"` +- **Pattern**: `"chat *": "llm.chat"` (wildcard matching) \ No newline at end of file diff --git a/util/opentelemetry-util-genai-dev/examples/traceloop_span_transformation/traceloop_rules_example.py b/util/opentelemetry-util-genai-dev/examples/traceloop_span_transformation/traceloop_rules_example.py new file mode 100644 index 0000000000..a1670ec56b --- /dev/null +++ b/util/opentelemetry-util-genai-dev/examples/traceloop_span_transformation/traceloop_rules_example.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +import json +import os +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor, ConsoleSpanExporter + +from opentelemetry.util.genai.processors.traceloop_span_processor import 
TraceloopSpanProcessor + +RULE_SPEC = { + "rules": [ + { + "attribute_transformations": { + "rename": {"llm.provider": "ai.system.vendor"}, + "add": {"example.rule": "chat"}, + }, + "name_transformations": {"chat *": "genai.chat"}, + }, + { + "attribute_transformations": {"add": {"processed.embedding": True}}, + "name_transformations": {"*": "genai.embedding"}, + }, + ] +} +os.environ["OTEL_GENAI_SPAN_TRANSFORM_RULES"] = json.dumps(RULE_SPEC) + +# Set up tracing +provider = TracerProvider() +provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter())) +# Add the Traceloop processor +provider.add_span_processor(TraceloopSpanProcessor()) +trace.set_tracer_provider(provider) +tracer = trace.get_tracer(__name__) + +print("Creating spans ...\n") + +with tracer.start_as_current_span("chat gpt-4") as span: + span.set_attribute("llm.provider", "openai") + span.set_attribute("debug_info", "remove me if rule had remove") + +with tracer.start_as_current_span("vector encode") as span: + span.set_attribute("custom.kind", "embedding") diff --git a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/__init__.py b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/__init__.py new file mode 100644 index 0000000000..23e5d9a1c5 --- /dev/null +++ b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/__init__.py @@ -0,0 +1,6 @@ +from .traceloop_span_processor import TraceloopSpanProcessor, TransformationRule + +__all__ = [ + "TraceloopSpanProcessor", + "TransformationRule", +] \ No newline at end of file diff --git a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/span_transformer.py b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/span_transformer.py new file mode 100644 index 0000000000..96348bb0d2 --- /dev/null +++ b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/span_transformer.py @@ -0,0 +1,104 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from typing import Any, Dict, Optional + +from opentelemetry.sdk.trace import ReadableSpan + +from .traceloop_span_generator import TraceloopSpanGenerator +from ..types import LLMInvocation + + +def _apply_attribute_transformations( + base: Dict[str, Any], transformations: Optional[Dict[str, Any]] +) -> Dict[str, Any]: # pragma: no cover - trivial helpers + if not transformations: + return base + # Order: remove -> rename -> add (so add always wins) + remove_keys = transformations.get("remove") or [] + for k in remove_keys: + base.pop(k, None) + rename_map = transformations.get("rename") or {} + for old, new in rename_map.items(): + if old in base: + base[new] = base.pop(old) + add_map = transformations.get("add") or {} + for k, v in add_map.items(): + base[k] = v + return base + + +def _derive_new_name( + original_name: str, name_transformations: Optional[Dict[str, str]] +) -> Optional[str]: # pragma: no cover - simple matching + if not name_transformations: + return None + import fnmatch + + for pattern, new_name in name_transformations.items(): + try: + if fnmatch.fnmatch(original_name, pattern): + return new_name + except Exception: # defensive + continue + return None + + +def transform_existing_span_to_telemetry( + existing_span: ReadableSpan, + attribute_transformations: Optional[Dict[str, Any]] = None, + name_transformations: Optional[Dict[str, str]] = None, + traceloop_attributes: Optional[Dict[str, Any]] = None, + generator: Optional[TraceloopSpanGenerator] = None, +) -> LLMInvocation: + """Create a synthetic LLMInvocation span from an ended (or ending) span. + + Returns the synthetic ``LLMInvocation`` used purely as a carrier for the new span. + """ + base_attrs: Dict[str, Any] = ( + dict(existing_span.attributes) if existing_span.attributes else {} + ) + + # Apply transformations + base_attrs = _apply_attribute_transformations( + base_attrs, attribute_transformations + ) + if traceloop_attributes: + base_attrs.update(traceloop_attributes) + + # Span name rewrite (store so generator can use & remove later) + new_name = _derive_new_name(existing_span.name, name_transformations) + if new_name: + base_attrs["_traceloop_new_name"] = new_name + + # Determine request_model (best-effort, fallback to unknown) + request_model = ( + base_attrs.get("gen_ai.request.model") + or base_attrs.get("llm.request.model") + or base_attrs.get("ai.model.name") + or "unknown" + ) + + invocation = LLMInvocation( + request_model=str(request_model), + attributes=base_attrs, + messages=[], # empty; original content not reconstructed here + ) + + if generator is None: + generator = TraceloopSpanGenerator(capture_content=True) + generator.start(invocation) + if existing_span.end_time is not None: + generator.finish(invocation) + return invocation diff --git a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/traceloop_span_generator.py b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/traceloop_span_generator.py new file mode 100644 index 0000000000..c3e7ec00b0 --- /dev/null +++ b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/traceloop_span_generator.py @@ -0,0 +1,62 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Optional +from opentelemetry import trace +from opentelemetry.trace import Tracer +from ..types import LLMInvocation + +class TraceloopSpanGenerator: + def __init__(self, tracer: Optional[Tracer] = None, capture_content: bool = False): + self._tracer = tracer or trace.get_tracer(__name__) + self._capture_content = capture_content + + def start(self, invocation: LLMInvocation): + override = getattr(invocation, "attributes", {}).get("_traceloop_new_name") + if override: + span_name = override + else: + name = getattr(invocation, "request_model", "llm") + span_name = f"chat {name}" if not str(name).startswith("chat ") else str(name) + span = self._tracer.start_span(span_name, kind=trace.SpanKind.CLIENT) + invocation.span = span + invocation.context_token = trace.use_span(span, end_on_exit=False) + invocation.context_token.__enter__() + # apply starting attributes + for k, v in getattr(invocation, "attributes", {}).items(): + try: + span.set_attribute(k, v) + except Exception: + pass + + def finish(self, invocation: LLMInvocation): + span = getattr(invocation, "span", None) + if not span: + return + # re-apply attributes (after transformations) + for k, v in getattr(invocation, "attributes", {}).items(): + try: + span.set_attribute(k, v) + except Exception: + pass + token = getattr(invocation, "context_token", None) + if token and hasattr(token, "__exit__"): + try: + token.__exit__(None, None, None) + except Exception: + pass + span.end() + + def error(self, error, invocation: LLMInvocation): # pragma: no cover - unused in tests now + self.finish(invocation) diff --git a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/traceloop_span_processor.py b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/traceloop_span_processor.py new file mode 100644 index 0000000000..8180e2b147 --- /dev/null +++ b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/traceloop_span_processor.py @@ -0,0 +1,259 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from __future__ import annotations + +import fnmatch +import json +import logging +import os +import re +from dataclasses import dataclass, field +from typing import Any, Callable, Dict, List, Optional + +from opentelemetry.context import Context +from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor +from opentelemetry.trace import Span + +from .span_transformer import transform_existing_span_to_telemetry +from .traceloop_span_generator import TraceloopSpanGenerator + +_ENV_RULES = "OTEL_GENAI_SPAN_TRANSFORM_RULES" + + +@dataclass +class TransformationRule: + """Represents a single conditional transformation rule. + + Fields map closely to the JSON structure accepted via the environment + variable. All fields are optional; empty rule never matches. + """ + + match_name: Optional[str] = None # glob pattern (e.g. "chat *") + match_scope: Optional[str] = None # regex or substring (case-insensitive) + match_attributes: Dict[str, Optional[str]] = field(default_factory=dict) + + attribute_transformations: Dict[str, Any] = field(default_factory=dict) + name_transformations: Dict[str, str] = field(default_factory=dict) + traceloop_attributes: Dict[str, Any] = field(default_factory=dict) + + def matches(self, span: ReadableSpan) -> bool: # pragma: no cover - simple logic + if self.match_name: + if not fnmatch.fnmatch(span.name, self.match_name): + return False + if self.match_scope: + scope = getattr(span, "instrumentation_scope", None) + scope_name = getattr(scope, "name", "") if scope else "" + pattern = self.match_scope + # Accept either regex (contains meta chars) or simple substring + try: + if any(ch in pattern for ch in ".^$|()[]+?\\"): + if not re.search(pattern, scope_name, re.IGNORECASE): + return False + else: + if pattern.lower() not in scope_name.lower(): + return False + except re.error: + # Bad regex – treat as non-match but log once + logging.warning("Invalid regex in match_scope: %s", pattern) + return False + if self.match_attributes: + for k, expected in self.match_attributes.items(): + if k not in span.attributes: + return False + if expected is not None and str(span.attributes.get(k)) != str(expected): + return False + return True + + +def _load_rules_from_env() -> List[TransformationRule]: + raw = os.getenv(_ENV_RULES) + if not raw: + return [] + try: + data = json.loads(raw) + rules_spec = data.get("rules") if isinstance(data, dict) else None + if not isinstance(rules_spec, list): + logging.warning("%s must contain a 'rules' list", _ENV_RULES) + return [] + rules: List[TransformationRule] = [] + for r in rules_spec: + if not isinstance(r, dict): + continue + match = r.get("match", {}) if isinstance(r.get("match"), dict) else {} + rules.append( + TransformationRule( + match_name=match.get("name"), + match_scope=match.get("scope"), + match_attributes=match.get("attributes", {}) or {}, + attribute_transformations=r.get("attribute_transformations", {}) or {}, + name_transformations=r.get("name_transformations", {}) or {}, + traceloop_attributes=r.get("traceloop_attributes", {}) or {}, + ) + ) + return rules + except Exception as exc: # broad: we never want to break app startup + logging.warning("Failed to parse %s: %s", _ENV_RULES, exc) + return [] + + +class TraceloopSpanProcessor(SpanProcessor): + """ + A span processor that automatically applies transformation rules to spans. + + This processor can be added to your TracerProvider to automatically transform + all spans according to your transformation rules. 
+ """ + + def __init__( + self, + attribute_transformations: Optional[Dict[str, Any]] = None, + name_transformations: Optional[Dict[str, str]] = None, + traceloop_attributes: Optional[Dict[str, Any]] = None, + span_filter: Optional[Callable[[ReadableSpan], bool]] = None, + generator: Optional[TraceloopSpanGenerator] = None, + rules: Optional[List[TransformationRule]] = None, + load_env_rules: bool = True, + ): + """ + Initialize the Traceloop span processor. + + Args: + attribute_transformations: Rules for transforming span attributes + name_transformations: Rules for transforming span names + traceloop_attributes: Additional Traceloop-specific attributes to add + span_filter: Optional filter function to determine which spans to transform + generator: Optional custom TraceloopSpanGenerator + """ + self.attribute_transformations = attribute_transformations or {} + self.name_transformations = name_transformations or {} + self.traceloop_attributes = traceloop_attributes or {} + self.span_filter = span_filter or self._default_span_filter + self.generator = generator or TraceloopSpanGenerator( + capture_content=True + ) + # Load rule set (env + explicit). Explicit rules first for precedence. + env_rules = _load_rules_from_env() if load_env_rules else [] + self.rules: List[TransformationRule] = list(rules or []) + env_rules + if self.rules: + logging.getLogger(__name__).debug( + "TraceloopSpanProcessor loaded %d transformation rules (explicit=%d env=%d)", + len(self.rules), len(rules or []), len(env_rules) + ) + + def _default_span_filter(self, span: ReadableSpan) -> bool: + """Default filter: Transform spans that look like LLM/AI calls. + + Previously this required both a name and at least one attribute. Some + tests (and real-world scenarios) emit spans with meaningful names like + "chat gpt-4" before any model/provider attributes are recorded. We now + allow name-only detection; attributes merely increase confidence. + """ + if not span.name: + return False + + # Check for common LLM/AI span indicators + llm_indicators = [ + "chat", + "completion", + "llm", + "ai", + "gpt", + "claude", + "gemini", + "openai", + "anthropic", + "cohere", + "huggingface", + ] + + span_name_lower = span.name.lower() + for indicator in llm_indicators: + if indicator in span_name_lower: + return True + + # Check attributes for AI/LLM markers (if any attributes present) + if span.attributes: + for attr_key in span.attributes.keys(): + attr_key_lower = str(attr_key).lower() + if any( + marker in attr_key_lower + for marker in ["llm", "ai", "gen_ai", "model"] + ): + return True + return False + + def on_start( + self, span: Span, parent_context: Optional[Context] = None + ) -> None: + """Called when a span is started.""" + pass + + def on_end(self, span: ReadableSpan) -> None: + """ + Called when a span is ended. 
+ """ + try: + # Check if this span should be transformed (cheap heuristic first) + if not self.span_filter(span): + return + # Skip spans we already produced (recursion guard) + if span.attributes and "_traceloop_processed" in span.attributes: + return + + # Determine which transformation set to use + applied_rule: Optional[TransformationRule] = None + for rule in self.rules: + try: + if rule.matches(span): + applied_rule = rule + break + except Exception as match_err: # pragma: no cover - defensive + logging.warning("Rule match error ignored: %s", match_err) + + sentinel = {"_traceloop_processed": True} + if applied_rule is not None: + transform_existing_span_to_telemetry( + existing_span=span, + attribute_transformations=applied_rule.attribute_transformations, + name_transformations=applied_rule.name_transformations, + traceloop_attributes={**applied_rule.traceloop_attributes, **sentinel}, + generator=self.generator, + ) + else: + # Fallback to legacy single-set behavior + transform_existing_span_to_telemetry( + existing_span=span, + attribute_transformations=self.attribute_transformations, + name_transformations=self.name_transformations, + traceloop_attributes={**self.traceloop_attributes, **sentinel}, + generator=self.generator, + ) + + except Exception as e: + # Don't let transformation errors break the original span processing + import logging + + logging.warning( + f"TraceloopSpanProcessor failed to transform span: {e}" + ) + + def shutdown(self) -> None: + """Called when the tracer provider is shutdown.""" + pass + + def force_flush(self, timeout_millis: int = 30000) -> bool: + """Force flush any buffered spans.""" + return True \ No newline at end of file diff --git a/util/opentelemetry-util-genai-dev/tests/test_traceloop_span_processor.py b/util/opentelemetry-util-genai-dev/tests/test_traceloop_span_processor.py new file mode 100644 index 0000000000..7167389b51 --- /dev/null +++ b/util/opentelemetry-util-genai-dev/tests/test_traceloop_span_processor.py @@ -0,0 +1,182 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import annotations + +import json +from typing import List + +import pytest +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace import TracerProvider as SDKTracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +try: # Prefer direct export if available (older versions) + from opentelemetry.sdk.trace.export import InMemorySpanExporter # type: ignore +except ImportError: # pragma: no cover - fallback path for newer versions + from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( # type: ignore + InMemorySpanExporter, + ) +from opentelemetry import trace + +from opentelemetry.util.genai.processors.traceloop_span_processor import ( + TraceloopSpanProcessor, + TransformationRule, +) +from opentelemetry.util.genai.processors.traceloop_span_generator import ( + TraceloopSpanGenerator, +) + + +@pytest.fixture +def tracer_provider(): + # Provide a fresh provider per test to avoid cross-test processor leakage + # (we intentionally DO NOT set it globally to keep isolation). + return SDKTracerProvider() + + +@pytest.fixture +def in_memory_exporter(): + return InMemorySpanExporter() + + +@pytest.fixture +def tracer(tracer_provider, in_memory_exporter): + tracer_provider.add_span_processor(SimpleSpanProcessor(in_memory_exporter)) + return tracer_provider.get_tracer(__name__) + + +def _find_transformed_spans(spans: List[ReadableSpan]): + # Heuristic: transformed spans have the sentinel attribute + return [s for s in spans if s.attributes.get("_traceloop_processed")] + + +def test_fallback_single_rule(tracer_provider, tracer, in_memory_exporter): + # Rename an existing attribute instead of adding a new one. + processor = TraceloopSpanProcessor( + attribute_transformations={"rename": {"llm.provider": "service.name"}}, + name_transformations={"chat *": "genai.chat"}, + generator=TraceloopSpanGenerator(tracer=tracer), + ) + tracer_provider.add_span_processor(processor) + + with tracer.start_as_current_span("chat gpt-4") as span: + span.set_attribute("llm.provider", "openai") + + spans = in_memory_exporter.get_finished_spans() + transformed = _find_transformed_spans(spans) + # Original + transformed + assert len(transformed) == 1 + t = transformed[0] + assert t.name == "genai.chat" + # Value preserved from original attribute + assert t.attributes["service.name"] == "openai" + assert t.attributes["_traceloop_processed"] is True + + +def test_rule_precedence(tracer_provider, tracer, in_memory_exporter): + rules = [ + TransformationRule( + match_name="chat *", + attribute_transformations={"rename": {"marker": "first.marker"}}, + name_transformations={"chat *": "first.chat"}, + ), + TransformationRule( + match_name="chat gpt-*", + attribute_transformations={"rename": {"marker": "second.marker"}}, + name_transformations={"chat gpt-*": "second.chat"}, + ), + ] + processor = TraceloopSpanProcessor( + rules=rules, + load_env_rules=False, + generator=TraceloopSpanGenerator(tracer=tracer), + ) + tracer_provider.add_span_processor(processor) + + with tracer.start_as_current_span("chat gpt-4") as span: + span.set_attribute("marker", True) + + spans = in_memory_exporter.get_finished_spans() + transformed = _find_transformed_spans(spans) + assert transformed, "Expected transformed span" + # First rule wins + assert transformed[0].name == "first.chat" + assert transformed[0].attributes.get("first.marker") is True + assert "second.marker" not in transformed[0].attributes + + +def test_env_rule_overrides(tracer_provider, tracer, 
in_memory_exporter, monkeypatch): + env_spec = { + "rules": [ + { + "match": {"name": "chat *"}, + "attribute_transformations": {"rename": {"marker": "env.marker"}}, + "name_transformations": {"chat *": "env.chat"}, + } + ] + } + monkeypatch.setenv("OTEL_GENAI_SPAN_TRANSFORM_RULES", json.dumps(env_spec)) + + processor = TraceloopSpanProcessor( + attribute_transformations={"rename": {"marker": "fallback.marker"}}, + name_transformations={"chat *": "fallback.chat"}, + generator=TraceloopSpanGenerator(tracer=tracer), + ) + tracer_provider.add_span_processor(processor) + + with tracer.start_as_current_span("chat model") as span: + span.set_attribute("marker", 123) + + spans = in_memory_exporter.get_finished_spans() + transformed = _find_transformed_spans(spans) + assert transformed + span = transformed[0] + assert span.name == "env.chat" # env rule used + assert span.attributes.get("env.marker") == 123 + # Fallback rename should not happen because env rule applied instead + assert "fallback.marker" not in span.attributes + + +def test_recursion_guard(tracer_provider, tracer, in_memory_exporter): + # Span already marked as processed should not be processed again + processor = TraceloopSpanProcessor( + attribute_transformations={"rename": {"foo": "service.name"}}, + generator=TraceloopSpanGenerator(tracer=tracer), + ) + tracer_provider.add_span_processor(processor) + + # Manually create a span that already has sentinel + with tracer.start_as_current_span("chat something") as span: + span.set_attribute("_traceloop_processed", True) + + spans = in_memory_exporter.get_finished_spans() + transformed = _find_transformed_spans(spans) + # Only the original (pre-marked) should exist, no new duplicate + assert len(transformed) == 1 + assert transformed[0].name == "chat something" + + +def test_non_matching_span_not_transformed(tracer_provider, tracer, in_memory_exporter): + processor = TraceloopSpanProcessor( + attribute_transformations={"rename": {"some.attr": "unused"}}, + generator=TraceloopSpanGenerator(tracer=tracer), + ) + tracer_provider.add_span_processor(processor) + + with tracer.start_as_current_span("unrelated operation"): + pass + + spans = in_memory_exporter.get_finished_spans() + transformed = _find_transformed_spans(spans) + assert not transformed From 45df77aaf67bd37a93b5d862cf2e8071af08a421 Mon Sep 17 00:00:00 2001 From: Pavan Sudheendra Date: Mon, 6 Oct 2025 16:35:04 +0100 Subject: [PATCH 2/6] feat: update examples Signed-off-by: Pavan Sudheendra --- .../traceloop_rules_example.py | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/util/opentelemetry-util-genai-dev/examples/traceloop_span_transformation/traceloop_rules_example.py b/util/opentelemetry-util-genai-dev/examples/traceloop_span_transformation/traceloop_rules_example.py index a1670ec56b..53362d222b 100644 --- a/util/opentelemetry-util-genai-dev/examples/traceloop_span_transformation/traceloop_rules_example.py +++ b/util/opentelemetry-util-genai-dev/examples/traceloop_span_transformation/traceloop_rules_example.py @@ -13,16 +13,20 @@ RULE_SPEC = { "rules": [ { + # NOTE: In Python dicts, duplicate keys are overwritten. The earlier + # version used two separate "rename" entries so only the last one + # survived. Combine them into a single mapping and optionally + # remove noisy attributes. 
"attribute_transformations": { - "rename": {"llm.provider": "ai.system.vendor"}, - "add": {"example.rule": "chat"}, + "rename": { + "traceloop.entity.input": "gen_ai.input.messages", + "traceloop.entity.output": "gen_ai.output.messages", + }, + # Demonstrate removal (uncomment to test): + # "remove": ["debug_info"], }, "name_transformations": {"chat *": "genai.chat"}, - }, - { - "attribute_transformations": {"add": {"processed.embedding": True}}, - "name_transformations": {"*": "genai.embedding"}, - }, + } ] } os.environ["OTEL_GENAI_SPAN_TRANSFORM_RULES"] = json.dumps(RULE_SPEC) @@ -38,7 +42,7 @@ print("Creating spans ...\n") with tracer.start_as_current_span("chat gpt-4") as span: - span.set_attribute("llm.provider", "openai") + span.set_attribute("traceloop.entity.input", "some data") span.set_attribute("debug_info", "remove me if rule had remove") with tracer.start_as_current_span("vector encode") as span: From d44f95944124b14f202355b1c209a98a41110ca2 Mon Sep 17 00:00:00 2001 From: Pavan Sudheendra Date: Tue, 7 Oct 2025 15:27:38 +0100 Subject: [PATCH 3/6] feat: updates Signed-off-by: Pavan Sudheendra --- .../traceloop_rules_example.py | 43 ++++-- .../opentelemetry/util/genai/emitters/span.py | 14 +- .../processors/traceloop_span_processor.py | 130 +++++++++++++++--- .../tests/test_traceloop_span_processor.py | 52 ++++--- 4 files changed, 182 insertions(+), 57 deletions(-) diff --git a/util/opentelemetry-util-genai-dev/examples/traceloop_span_transformation/traceloop_rules_example.py b/util/opentelemetry-util-genai-dev/examples/traceloop_span_transformation/traceloop_rules_example.py index 53362d222b..7da331537f 100644 --- a/util/opentelemetry-util-genai-dev/examples/traceloop_span_transformation/traceloop_rules_example.py +++ b/util/opentelemetry-util-genai-dev/examples/traceloop_span_transformation/traceloop_rules_example.py @@ -6,9 +6,20 @@ import os from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import SimpleSpanProcessor, ConsoleSpanExporter +from opentelemetry.sdk.trace.export import ( + SimpleSpanProcessor, + ConsoleSpanExporter, +) -from opentelemetry.util.genai.processors.traceloop_span_processor import TraceloopSpanProcessor +from opentelemetry.util.genai.processors.traceloop_span_processor import ( + TraceloopSpanProcessor, +) +"""Example: Traceloop span transformation via handler (implicit handler). + +The TraceloopSpanProcessor now emits via TelemetryHandler by default. You do not +need to instantiate a TelemetryHandler manually unless you want custom provider +or meter wiring. This example relies on the global singleton handler. 
+""" RULE_SPEC = { "rules": [ @@ -31,19 +42,21 @@ } os.environ["OTEL_GENAI_SPAN_TRANSFORM_RULES"] = json.dumps(RULE_SPEC) -# Set up tracing -provider = TracerProvider() -provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter())) -# Add the Traceloop processor -provider.add_span_processor(TraceloopSpanProcessor()) -trace.set_tracer_provider(provider) -tracer = trace.get_tracer(__name__) +def run_example(): + # Set up tracing provider and exporter + provider = TracerProvider() + provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter())) + trace.set_tracer_provider(provider) + tracer = trace.get_tracer(__name__) + + # Add processor (handler emission is default; no explicit TelemetryHandler needed) + provider.add_span_processor(TraceloopSpanProcessor()) -print("Creating spans ...\n") + print("\n== Default handler emission mode ==\n") + with tracer.start_as_current_span("chat gpt-4") as span: + span.set_attribute("traceloop.entity.input", "some data") + span.set_attribute("debug_info", "remove me if rule had remove") -with tracer.start_as_current_span("chat gpt-4") as span: - span.set_attribute("traceloop.entity.input", "some data") - span.set_attribute("debug_info", "remove me if rule had remove") -with tracer.start_as_current_span("vector encode") as span: - span.set_attribute("custom.kind", "embedding") +if __name__ == "main__" or __name__ == "__main__": # dual support + run_example() diff --git a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/emitters/span.py b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/emitters/span.py index 6130405e8b..674bae968c 100644 --- a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/emitters/span.py +++ b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/emitters/span.py @@ -254,10 +254,16 @@ def on_start( elif isinstance(invocation, EmbeddingInvocation): self._start_embedding(invocation) else: - # Use operation field for span name (defaults to "chat") - operation = getattr(invocation, "operation", "chat") - model_name = invocation.request_model - span_name = f"{operation} {model_name}" + # Use override if processor supplied one; else operation+model + override = getattr(invocation, "attributes", {}).get( + "gen_ai.override.span_name" + ) + if override: + span_name = str(override) + else: + operation = getattr(invocation, "operation", "chat") + model_name = invocation.request_model + span_name = f"{operation} {model_name}" cm = self._tracer.start_as_current_span( span_name, kind=SpanKind.CLIENT, end_on_exit=False ) diff --git a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/traceloop_span_processor.py b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/traceloop_span_processor.py index 8180e2b147..22af673f13 100644 --- a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/traceloop_span_processor.py +++ b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/traceloop_span_processor.py @@ -27,8 +27,11 @@ from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor from opentelemetry.trace import Span -from .span_transformer import transform_existing_span_to_telemetry -from .traceloop_span_generator import TraceloopSpanGenerator +from opentelemetry.util.genai.types import LLMInvocation +from opentelemetry.util.genai.handler import ( + get_telemetry_handler, + TelemetryHandler, +) _ENV_RULES = "OTEL_GENAI_SPAN_TRANSFORM_RULES" @@ -123,9 +126,10 @@ def __init__( name_transformations: 
Optional[Dict[str, str]] = None, traceloop_attributes: Optional[Dict[str, Any]] = None, span_filter: Optional[Callable[[ReadableSpan], bool]] = None, - generator: Optional[TraceloopSpanGenerator] = None, rules: Optional[List[TransformationRule]] = None, load_env_rules: bool = True, + telemetry_handler: Optional[TelemetryHandler] = None, + # Legacy synthetic span duplication removed – always emit via handler ): """ Initialize the Traceloop span processor. @@ -135,18 +139,15 @@ def __init__( name_transformations: Rules for transforming span names traceloop_attributes: Additional Traceloop-specific attributes to add span_filter: Optional filter function to determine which spans to transform - generator: Optional custom TraceloopSpanGenerator """ self.attribute_transformations = attribute_transformations or {} self.name_transformations = name_transformations or {} self.traceloop_attributes = traceloop_attributes or {} self.span_filter = span_filter or self._default_span_filter - self.generator = generator or TraceloopSpanGenerator( - capture_content=True - ) # Load rule set (env + explicit). Explicit rules first for precedence. env_rules = _load_rules_from_env() if load_env_rules else [] self.rules: List[TransformationRule] = list(rules or []) + env_rules + self.telemetry_handler = telemetry_handler if self.rules: logging.getLogger(__name__).debug( "TraceloopSpanProcessor loaded %d transformation rules (explicit=%d env=%d)", @@ -224,22 +225,31 @@ def on_end(self, span: ReadableSpan) -> None: logging.warning("Rule match error ignored: %s", match_err) sentinel = {"_traceloop_processed": True} + # Decide which transformation config to apply if applied_rule is not None: - transform_existing_span_to_telemetry( - existing_span=span, - attribute_transformations=applied_rule.attribute_transformations, - name_transformations=applied_rule.name_transformations, - traceloop_attributes={**applied_rule.traceloop_attributes, **sentinel}, - generator=self.generator, - ) + attr_tx = applied_rule.attribute_transformations + name_tx = applied_rule.name_transformations + extra_tl_attrs = {**applied_rule.traceloop_attributes, **sentinel} else: - # Fallback to legacy single-set behavior - transform_existing_span_to_telemetry( - existing_span=span, - attribute_transformations=self.attribute_transformations, - name_transformations=self.name_transformations, - traceloop_attributes={**self.traceloop_attributes, **sentinel}, - generator=self.generator, + attr_tx = self.attribute_transformations + name_tx = self.name_transformations + extra_tl_attrs = {**self.traceloop_attributes, **sentinel} + + # Always emit via TelemetryHandler + invocation = self._build_invocation( + span, + attribute_transformations=attr_tx, + name_transformations=name_tx, + traceloop_attributes=extra_tl_attrs, + ) + invocation.attributes.setdefault("_traceloop_processed", True) + handler = self.telemetry_handler or get_telemetry_handler() + try: + handler.start_llm(invocation) + handler.stop_llm(invocation) + except Exception as emit_err: # pragma: no cover - defensive + logging.getLogger(__name__).warning( + "Telemetry handler emission failed: %s", emit_err ) except Exception as e: @@ -256,4 +266,80 @@ def shutdown(self) -> None: def force_flush(self, timeout_millis: int = 30000) -> bool: """Force flush any buffered spans.""" - return True \ No newline at end of file + return True + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + 
def _apply_attribute_transformations( + self, base: Dict[str, Any], transformations: Optional[Dict[str, Any]] + ) -> Dict[str, Any]: + if not transformations: + return base + remove_keys = transformations.get("remove") or [] + for k in remove_keys: + base.pop(k, None) + rename_map = transformations.get("rename") or {} + for old, new in rename_map.items(): + if old in base: + base[new] = base.pop(old) + add_map = transformations.get("add") or {} + for k, v in add_map.items(): + base[k] = v + return base + + def _derive_new_name( + self, original_name: str, name_transformations: Optional[Dict[str, str]] + ) -> Optional[str]: + if not name_transformations: + return None + import fnmatch + + for pattern, new_name in name_transformations.items(): + try: + if fnmatch.fnmatch(original_name, pattern): + return new_name + except Exception: + continue + return None + + def _build_invocation( + self, + existing_span: ReadableSpan, + *, + attribute_transformations: Optional[Dict[str, Any]] = None, + name_transformations: Optional[Dict[str, str]] = None, + traceloop_attributes: Optional[Dict[str, Any]] = None, + ) -> LLMInvocation: + base_attrs: Dict[str, Any] = ( + dict(existing_span.attributes) if existing_span.attributes else {} + ) + base_attrs = self._apply_attribute_transformations( + base_attrs, attribute_transformations + ) + if traceloop_attributes: + base_attrs.update(traceloop_attributes) + new_name = self._derive_new_name( + existing_span.name, name_transformations + ) + if new_name: + # Provide override for SpanEmitter (we extended it to honor this) + base_attrs.setdefault("gen_ai.override.span_name", new_name) + request_model = ( + base_attrs.get("gen_ai.request.model") + or base_attrs.get("llm.request.model") + or base_attrs.get("ai.model.name") + or "unknown" + ) + invocation = LLMInvocation( + request_model=str(request_model), + attributes=base_attrs, + messages=[], + ) + # Mark operation heuristically from original span name + lowered = existing_span.name.lower() + if lowered.startswith("embed"): + invocation.operation = "embedding" # type: ignore[attr-defined] + elif lowered.startswith("chat"): + invocation.operation = "chat" # type: ignore[attr-defined] + return invocation \ No newline at end of file diff --git a/util/opentelemetry-util-genai-dev/tests/test_traceloop_span_processor.py b/util/opentelemetry-util-genai-dev/tests/test_traceloop_span_processor.py index 7167389b51..9d1e3e3d99 100644 --- a/util/opentelemetry-util-genai-dev/tests/test_traceloop_span_processor.py +++ b/util/opentelemetry-util-genai-dev/tests/test_traceloop_span_processor.py @@ -33,9 +33,8 @@ TraceloopSpanProcessor, TransformationRule, ) -from opentelemetry.util.genai.processors.traceloop_span_generator import ( - TraceloopSpanGenerator, -) + +from opentelemetry.util.genai.handler import TelemetryHandler @pytest.fixture @@ -56,17 +55,22 @@ def tracer(tracer_provider, in_memory_exporter): return tracer_provider.get_tracer(__name__) +@pytest.fixture +def telemetry_handler(tracer_provider): + # Bind handler to the per-test tracer provider so emitted spans flow to test exporter + return TelemetryHandler(tracer_provider=tracer_provider) + + def _find_transformed_spans(spans: List[ReadableSpan]): # Heuristic: transformed spans have the sentinel attribute return [s for s in spans if s.attributes.get("_traceloop_processed")] -def test_fallback_single_rule(tracer_provider, tracer, in_memory_exporter): - # Rename an existing attribute instead of adding a new one. 
+def test_fallback_single_rule(tracer_provider, tracer, in_memory_exporter, telemetry_handler): processor = TraceloopSpanProcessor( attribute_transformations={"rename": {"llm.provider": "service.name"}}, name_transformations={"chat *": "genai.chat"}, - generator=TraceloopSpanGenerator(tracer=tracer), + telemetry_handler=telemetry_handler, ) tracer_provider.add_span_processor(processor) @@ -75,8 +79,7 @@ def test_fallback_single_rule(tracer_provider, tracer, in_memory_exporter): spans = in_memory_exporter.get_finished_spans() transformed = _find_transformed_spans(spans) - # Original + transformed - assert len(transformed) == 1 + assert len(transformed) == 1 # only handler-emitted span carries sentinel t = transformed[0] assert t.name == "genai.chat" # Value preserved from original attribute @@ -84,7 +87,7 @@ def test_fallback_single_rule(tracer_provider, tracer, in_memory_exporter): assert t.attributes["_traceloop_processed"] is True -def test_rule_precedence(tracer_provider, tracer, in_memory_exporter): +def test_rule_precedence(tracer_provider, tracer, in_memory_exporter, telemetry_handler): rules = [ TransformationRule( match_name="chat *", @@ -100,7 +103,7 @@ def test_rule_precedence(tracer_provider, tracer, in_memory_exporter): processor = TraceloopSpanProcessor( rules=rules, load_env_rules=False, - generator=TraceloopSpanGenerator(tracer=tracer), + telemetry_handler=telemetry_handler, ) tracer_provider.add_span_processor(processor) @@ -116,7 +119,7 @@ def test_rule_precedence(tracer_provider, tracer, in_memory_exporter): assert "second.marker" not in transformed[0].attributes -def test_env_rule_overrides(tracer_provider, tracer, in_memory_exporter, monkeypatch): +def test_env_rule_overrides(tracer_provider, tracer, in_memory_exporter, monkeypatch, telemetry_handler): env_spec = { "rules": [ { @@ -131,7 +134,7 @@ def test_env_rule_overrides(tracer_provider, tracer, in_memory_exporter, monkeyp processor = TraceloopSpanProcessor( attribute_transformations={"rename": {"marker": "fallback.marker"}}, name_transformations={"chat *": "fallback.chat"}, - generator=TraceloopSpanGenerator(tracer=tracer), + telemetry_handler=telemetry_handler, ) tracer_provider.add_span_processor(processor) @@ -148,11 +151,11 @@ def test_env_rule_overrides(tracer_provider, tracer, in_memory_exporter, monkeyp assert "fallback.marker" not in span.attributes -def test_recursion_guard(tracer_provider, tracer, in_memory_exporter): +def test_recursion_guard(tracer_provider, tracer, in_memory_exporter, telemetry_handler): # Span already marked as processed should not be processed again processor = TraceloopSpanProcessor( attribute_transformations={"rename": {"foo": "service.name"}}, - generator=TraceloopSpanGenerator(tracer=tracer), + telemetry_handler=telemetry_handler, ) tracer_provider.add_span_processor(processor) @@ -167,10 +170,10 @@ def test_recursion_guard(tracer_provider, tracer, in_memory_exporter): assert transformed[0].name == "chat something" -def test_non_matching_span_not_transformed(tracer_provider, tracer, in_memory_exporter): +def test_non_matching_span_not_transformed(tracer_provider, tracer, in_memory_exporter, telemetry_handler): processor = TraceloopSpanProcessor( attribute_transformations={"rename": {"some.attr": "unused"}}, - generator=TraceloopSpanGenerator(tracer=tracer), + telemetry_handler=telemetry_handler, ) tracer_provider.add_span_processor(processor) @@ -180,3 +183,20 @@ def test_non_matching_span_not_transformed(tracer_provider, tracer, in_memory_ex spans = 
in_memory_exporter.get_finished_spans() transformed = _find_transformed_spans(spans) assert not transformed + + +def test_handler_emission_default(tracer_provider, tracer, in_memory_exporter, telemetry_handler): + processor = TraceloopSpanProcessor( + attribute_transformations={"rename": {"orig.attr": "renamed.attr"}}, + name_transformations={"chat *": "genai.chat"}, + telemetry_handler=telemetry_handler, + ) + tracer_provider.add_span_processor(processor) + with tracer.start_as_current_span("chat model-x") as span: + span.set_attribute("orig.attr", 42) + spans = in_memory_exporter.get_finished_spans() + telemetry_spans = [s for s in spans if s.name == "genai.chat"] + assert telemetry_spans, "Expected telemetry span emitted via handler (default mode)" + tel = telemetry_spans[0] + assert tel.attributes.get("renamed.attr") == 42 + assert tel.attributes.get("_traceloop_processed") is True From 471ed5bdf5abfc8fa14e1101a7f7aa44f7406830 Mon Sep 17 00:00:00 2001 From: Pavan Sudheendra Date: Tue, 7 Oct 2025 18:51:29 +0100 Subject: [PATCH 4/6] feat: update README file name Signed-off-by: Pavan Sudheendra --- .../README.translator.md | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 util/opentelemetry-util-genai-dev/README.translator.md diff --git a/util/opentelemetry-util-genai-dev/README.translator.md b/util/opentelemetry-util-genai-dev/README.translator.md new file mode 100644 index 0000000000..46b59dc6d9 --- /dev/null +++ b/util/opentelemetry-util-genai-dev/README.translator.md @@ -0,0 +1,41 @@ +# Translator + +## Automatic Span Processing (Recommended) + +Add `TraceloopSpanProcessor` to your TracerProvider to automatically transform all matching spans: + +```python +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.util.genai.processors import TraceloopSpanProcessor + +# Set up tracer provider +provider = TracerProvider() + +# Add processor - transforms all matching spans automatically +processor = TraceloopSpanProcessor( + attribute_transformations={ + "remove": ["debug_info"], + "rename": {"model_ver": "llm.model.version"}, + "add": {"service.name": "my-llm"} + }, + name_transformations={"chat *": "llm.openai.chat"}, + traceloop_attributes={ + "traceloop.entity.name": "MyLLMEntity" + } +) +provider.add_span_processor(processor) +trace.set_tracer_provider(provider) + +``` + +## Transformation Rules + +### Attributes +- **Remove**: `"remove": ["field1", "field2"]` +- **Rename**: `"rename": {"old_name": "new_name"}` +- **Add**: `"add": {"key": "value"}` + +### Span Names +- **Direct**: `"old name": "new name"` +- **Pattern**: `"chat *": "llm.chat"` (wildcard matching) \ No newline at end of file From 10578ca514e22cd8817e52d04a992d0679256f0e Mon Sep 17 00:00:00 2001 From: Pavan Sudheendra Date: Wed, 8 Oct 2025 14:44:24 +0100 Subject: [PATCH 5/6] feat: rewrite, switch to emitters Signed-off-by: Pavan Sudheendra --- .../README.traceloop_translator.md | 90 +++++ .../traceloop_rules_example.py | 103 ++++-- .../util/genai/emitters/configuration.py | 20 + .../genai/emitters/traceloop_translator.py | 120 ++++++ .../util/genai/processors/__init__.py | 6 - .../util/genai/processors/span_transformer.py | 104 ------ .../processors/traceloop_span_generator.py | 62 ---- .../processors/traceloop_span_processor.py | 345 ------------------ .../tests/test_traceloop_span_processor.py | 202 ---------- .../test_traceloop_translator_emitter.py | 137 +++++++ 10 files changed, 431 insertions(+), 758 deletions(-) create mode 100644 
util/opentelemetry-util-genai-dev/README.traceloop_translator.md create mode 100644 util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/emitters/traceloop_translator.py delete mode 100644 util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/__init__.py delete mode 100644 util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/span_transformer.py delete mode 100644 util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/traceloop_span_generator.py delete mode 100644 util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/traceloop_span_processor.py delete mode 100644 util/opentelemetry-util-genai-dev/tests/test_traceloop_span_processor.py create mode 100644 util/opentelemetry-util-genai-dev/tests/test_traceloop_translator_emitter.py diff --git a/util/opentelemetry-util-genai-dev/README.traceloop_translator.md b/util/opentelemetry-util-genai-dev/README.traceloop_translator.md new file mode 100644 index 0000000000..34cb49d8ae --- /dev/null +++ b/util/opentelemetry-util-genai-dev/README.traceloop_translator.md @@ -0,0 +1,90 @@ +# Traceloop -> GenAI Semantic Convention Translator Emitter + +This optional emitter promotes legacy `traceloop.*` attributes attached to an `LLMInvocation` into +Semantic Convention (or forward-looking custom `gen_ai.*`) attributes **before** the standard +Semantic Convention span emitter runs. It does **not** create its own span. + +## Why Use It? +If you have upstream code (or the Traceloop compat emitter) producing `traceloop.*` keys but you +want downstream dashboards/tools to rely on GenAI semantic conventions, enabling this translator +lets you transition without rewriting upstream code immediately. + +## What It Does +At `on_start` of an `LLMInvocation` it scans `invocation.attributes` for keys beginning with +`traceloop.` and (non-destructively) adds corresponding keys: + +| Traceloop Key (prefixed or raw) | Added Key | Notes | +|---------------------------------|---------------------------|-------| +| `traceloop.workflow.name` / `workflow.name` | `gen_ai.workflow.name` | Custom (not yet in spec) | +| `traceloop.entity.name` / `entity.name` | `gen_ai.agent.name` | Approximates entity as agent name | +| `traceloop.entity.path` / `entity.path` | `gen_ai.workflow.path` | Custom placeholder | +| `traceloop.callback.name` / `callback.name` | `gen_ai.callback.name` | Also sets `gen_ai.operation.source` if absent | +| `traceloop.callback.id` / `callback.id` | `gen_ai.callback.id` | Custom | +| `traceloop.entity.input` / `entity.input` | `gen_ai.input.messages` | Serialized form already present | +| `traceloop.entity.output` / `entity.output` | `gen_ai.output.messages`| Serialized form already present | + +Existing `gen_ai.*` keys are never overwritten. + +## Enabling +Fast path (no entry point needed): + +```bash +export OTEL_GENAI_ENABLE_TRACELOOP_TRANSLATOR=1 +export OTEL_INSTRUMENTATION_GENAI_EMITTERS=span,traceloop_compat + +Optional (remove original traceloop.* after promotion): +export OTEL_GENAI_TRACELOOP_TRANSLATOR_STRIP_LEGACY=1 +``` + +The flag auto-prepends the translator before the semantic span emitter. You can still add +`traceloop_translator` explicitly once an entry point is created. + +You can also load this emitter the same way as other extra emitters. There are two common patterns: + +### 1. 
Via `OTEL_INSTRUMENTATION_GENAI_EMITTERS` with an extra token +If your emitter loading logic supports extra entry-point based names directly (depending on branch state), add the translator token (e.g. `traceloop_translator`). Example: + +```bash +export OTEL_INSTRUMENTATION_GENAI_EMITTERS=span,traceloop_translator,traceloop_compat +``` + +Ordering is important: we request placement `before=semconv_span` in the spec, but if your environment override reorders span emitters you can enforce explicitly (see next section). + +### 2. Using Category Override Environment Variable +If your build supports category overrides (as implemented in `configuration.py`), you can prepend: + +```bash +export OTEL_INSTRUMENTATION_GENAI_EMITTERS=span,traceloop_compat +export OTEL_INSTRUMENTATION_GENAI_EMITTERS_SPAN=prepend:TraceloopTranslator +``` + +The override ensures the translator emitter runs before the semantic span emitter regardless of default resolution order. + +## Example +Minimal Python snippet (assuming emitters are loaded via entry points and the translator is installed): + +```python +from opentelemetry.util.genai.handler import get_telemetry_handler +from opentelemetry.util.genai.types import LLMInvocation, InputMessage, OutputMessage, Text + +inv = LLMInvocation( + request_model="gpt-4", + input_messages=[InputMessage(role="user", parts=[Text("Hello")])], + attributes={ + "traceloop.entity.name": "ChatLLM", + "traceloop.workflow.name": "user_flow", + "traceloop.callback.name": "root_chain", + "traceloop.entity.input": "[{'role':'user','content':'Hello'}]", + }, +) +handler = get_telemetry_handler() +handler.start_llm(inv) +inv.output_messages = [OutputMessage(role="assistant", parts=[Text("Hi")], finish_reason="stop")] +handler.stop_llm(inv) +# Result: final semantic span contains gen_ai.agent.name, gen_ai.workflow.name, gen_ai.input.messages, etc. +``` + +## Non-Goals +- It does not remove or rename original `traceloop.*` attributes (no destructive behavior yet). +- It does not attempt deep semantic inference; mappings are intentionally conservative. +- It does not serialize messages itself—relies on upstream emitters to have placed serialized content already. diff --git a/util/opentelemetry-util-genai-dev/examples/traceloop_span_transformation/traceloop_rules_example.py b/util/opentelemetry-util-genai-dev/examples/traceloop_span_transformation/traceloop_rules_example.py index 7da331537f..479574b792 100644 --- a/util/opentelemetry-util-genai-dev/examples/traceloop_span_transformation/traceloop_rules_example.py +++ b/util/opentelemetry-util-genai-dev/examples/traceloop_span_transformation/traceloop_rules_example.py @@ -2,8 +2,34 @@ from __future__ import annotations -import json -import os +"""Example: Emitting Traceloop-compatible spans and translating legacy attributes. + +This example shows how to enable the external Traceloop compatibility emitter +(`traceloop_compat`) alongside standard semantic convention spans. The legacy +TraceloopSpanProcessor & transformation rules have been removed. 
+ +Prerequisites: + pip install opentelemetry-util-genai-emitters-traceloop + +Environment (basic – compat only): + export OTEL_INSTRUMENTATION_GENAI_EMITTERS=traceloop_compat + +Environment (semantic + compat + translator promotion – simple flag): + export OTEL_GENAI_ENABLE_TRACELOOP_TRANSLATOR=1 + export OTEL_INSTRUMENTATION_GENAI_EMITTERS=span,traceloop_compat + +Alternative (explicit token if registered via entry point): + export OTEL_INSTRUMENTATION_GENAI_EMITTERS=span,traceloop_translator,traceloop_compat + (If ordering needs enforcement you can use category override, e.g. + export OTEL_INSTRUMENTATION_GENAI_EMITTERS_SPAN=prepend:TraceloopTranslator ) + +Optional: capture message content (both span + event): + export OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGES=both + +Run this example to see two spans per invocation: the semconv span and the +Traceloop-compatible span. +""" + from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import ( @@ -11,52 +37,51 @@ ConsoleSpanExporter, ) -from opentelemetry.util.genai.processors.traceloop_span_processor import ( - TraceloopSpanProcessor, +from opentelemetry.util.genai.handler import get_telemetry_handler +from opentelemetry.util.genai.types import ( + LLMInvocation, + InputMessage, + OutputMessage, + Text, ) -"""Example: Traceloop span transformation via handler (implicit handler). - -The TraceloopSpanProcessor now emits via TelemetryHandler by default. You do not -need to instantiate a TelemetryHandler manually unless you want custom provider -or meter wiring. This example relies on the global singleton handler. -""" -RULE_SPEC = { - "rules": [ - { - # NOTE: In Python dicts, duplicate keys are overwritten. The earlier - # version used two separate "rename" entries so only the last one - # survived. Combine them into a single mapping and optionally - # remove noisy attributes. - "attribute_transformations": { - "rename": { - "traceloop.entity.input": "gen_ai.input.messages", - "traceloop.entity.output": "gen_ai.output.messages", - }, - # Demonstrate removal (uncomment to test): - # "remove": ["debug_info"], - }, - "name_transformations": {"chat *": "genai.chat"}, - } - ] -} -os.environ["OTEL_GENAI_SPAN_TRANSFORM_RULES"] = json.dumps(RULE_SPEC) def run_example(): - # Set up tracing provider and exporter provider = TracerProvider() provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter())) trace.set_tracer_provider(provider) - tracer = trace.get_tracer(__name__) - # Add processor (handler emission is default; no explicit TelemetryHandler needed) - provider.add_span_processor(TraceloopSpanProcessor()) + # Build a telemetry handler (singleton) – emitters are chosen via env vars + handler = get_telemetry_handler(tracer_provider=provider) + + # Include a few illustrative Traceloop-style attributes. + # These will be mapped/prefixed automatically by the Traceloop compat emitter. 
+ invocation = LLMInvocation( + request_model="gpt-4", + input_messages=[InputMessage(role="user", parts=[Text("Hello")])], + attributes={ + "custom.attribute": "value", # arbitrary user attribute + "traceloop.entity.name": "ChatLLM", + "traceloop.workflow.name": "main_flow", + "traceloop.entity.path": "root/branch/leaf", + "traceloop.entity.input": "Hi" + }, + ) + + handler.start_llm(invocation) + # Simulate model output + invocation.output_messages = [ + OutputMessage( + role="assistant", parts=[Text("Hi there!")], finish_reason="stop" + ) + ] + handler.stop_llm(invocation) - print("\n== Default handler emission mode ==\n") - with tracer.start_as_current_span("chat gpt-4") as span: - span.set_attribute("traceloop.entity.input", "some data") - span.set_attribute("debug_info", "remove me if rule had remove") + print("\nInvocation complete. Check exporter output above for:" + "\n * SemanticConvention span containing promoted gen_ai.* keys" + "\n * Traceloop compat span (legacy format)" + "\nIf translator emitter enabled, attributes like gen_ai.agent.name should be present.\n") -if __name__ == "main__" or __name__ == "__main__": # dual support +if __name__ == "__main__": run_example() diff --git a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/emitters/configuration.py b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/emitters/configuration.py index d66d45c00a..62ca98577a 100644 --- a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/emitters/configuration.py +++ b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/emitters/configuration.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +import os from dataclasses import dataclass from types import MethodType from typing import Any, Dict, Iterable, List, Sequence @@ -99,6 +100,25 @@ def _register(spec: EmitterSpec) -> None: target.append(spec) spec_registry[spec.name] = spec + if os.getenv("OTEL_GENAI_ENABLE_TRACELOOP_TRANSLATOR"): + try: + from .traceloop_translator import ( + TraceloopTranslatorEmitter, # type: ignore + ) + + _register( + EmitterSpec( + name="TraceloopTranslator", + category=_CATEGORY_SPAN, + factory=lambda ctx: TraceloopTranslatorEmitter(), + mode="prepend", # ensure it runs before semantic span emitter + ) + ) + except Exception: # pragma: no cover - defensive + _logger.exception( + "Failed to initialize TraceloopTranslator emitter despite flag set" + ) + if settings.enable_span and not settings.only_traceloop_compat: _register( EmitterSpec( diff --git a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/emitters/traceloop_translator.py b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/emitters/traceloop_translator.py new file mode 100644 index 0000000000..c861ecfd22 --- /dev/null +++ b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/emitters/traceloop_translator.py @@ -0,0 +1,120 @@ +"""Traceloop -> GenAI Semantic Convention translation emitter. + +This emitter runs early in the span category chain and *mutates* the invocation +attributes in-place, translating a subset of legacy ``traceloop.*`` attributes +into semantic convention (``gen_ai.*``) or structured invocation fields so that +subsequent emitters (e.g. the primary semconv span emitter) naturally record +the standardized form. + +It intentionally does NOT emit its own span. It simply rewrites data. 
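+
+Illustrative effect on an invocation's attributes (promotion only fills keys
+that are not already present):
+
+    before: {"traceloop.entity.name": "ChatLLM"}
+    after:  {"traceloop.entity.name": "ChatLLM", "gen_ai.agent.name": "ChatLLM"}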
+ +If both the original TraceloopCompatEmitter and this translator are enabled, +the pipeline order should be: translator -> semconv span -> traceloop compat span. +The translator only promotes data; it does not delete the legacy attributes by +default (configurable via env var in future if needed). +""" + +from __future__ import annotations + +import os +from typing import Any, Dict, Optional + +from .spec import EmitterFactoryContext, EmitterSpec +from ..interfaces import EmitterMeta +from ..types import LLMInvocation + +# Mapping from traceloop attribute key (without prefix) to either: +# - a gen_ai semantic convention attribute key +# - a special handler function name (prefixed with "@") for structured placement. +_TRACELOOP_TO_SEMCONV: Dict[str, str] = { + "workflow.name": "gen_ai.workflow.name", # custom (not in spec yet) + "entity.name": "gen_ai.agent.name", # approximate: treat entity as agent name + "entity.path": "gen_ai.workflow.path", # custom placeholder (maps from traceloop.entity.path or entity.path) + # callback metadata (custom placeholders until standardized) + "callback.name": "gen_ai.callback.name", + "callback.id": "gen_ai.callback.id", + # span.kind is redundant (SpanKind already encodes); omitted +} + +# Input/output content attributes – when present we map them to message serialization +# helpers by copying into invocation.attributes under semconv-like provisional keys. +_CONTENT_MAPPING = { + "entity.input": "gen_ai.input.messages", + "entity.output": "gen_ai.output.messages", +} + + +_STRIP_FLAG = "OTEL_GENAI_TRACELOOP_TRANSLATOR_STRIP_LEGACY" + + +class TraceloopTranslatorEmitter(EmitterMeta): + role = "span" + name = "traceloop_translator" + + def __init__(self) -> None: # no tracer needed – we do not create spans + pass + + def handles(self, obj: object) -> bool: # only care about LLM invocations + return isinstance(obj, LLMInvocation) + + def on_start(self, invocation: LLMInvocation) -> None: # mutate attributes + attrs = getattr(invocation, "attributes", None) + if not attrs: + return + strip_legacy = bool(os.getenv(_STRIP_FLAG)) + for key in list(attrs.keys()): + value = attrs.get(key) + is_prefixed = False + if key.startswith("traceloop."): + raw_key = key[len("traceloop.") :] + is_prefixed = True + elif key in _TRACELOOP_TO_SEMCONV or key in _CONTENT_MAPPING: + raw_key = key + else: + continue + + # Content mapping + if raw_key in _CONTENT_MAPPING: + target = _CONTENT_MAPPING[raw_key] + attrs.setdefault(target, value) + else: + mapped = _TRACELOOP_TO_SEMCONV.get(raw_key) + if mapped: + attrs.setdefault(mapped, value) + if raw_key == "callback.name" and isinstance(value, str): + attrs.setdefault("gen_ai.operation.source", value) + + # Optionally remove legacy prefixed variant after promotion + if strip_legacy and is_prefixed: + try: + attrs.pop(key, None) + except Exception: # pragma: no cover - defensive + pass + + # No-op finish & error hooks – translation is only needed once. 
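+    # Illustrative examples of the one-shot promotion performed in on_start
+    # (on_end/on_error below intentionally do nothing):
+    #   {"traceloop.entity.path": "a/b/c"} -> adds {"gen_ai.workflow.path": "a/b/c"}
+    #   {"entity.name": "AgentRaw"}        -> adds {"gen_ai.agent.name": "AgentRaw"}
+    # With OTEL_GENAI_TRACELOOP_TRANSLATOR_STRIP_LEGACY set, "traceloop."-prefixed
+    # originals are removed after promotion; un-prefixed keys are always kept.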
+ def on_end(self, invocation: LLMInvocation) -> None: # pragma: no cover - trivial + return + + def on_error(self, error, invocation: LLMInvocation) -> None: # pragma: no cover - trivial + return + + +def traceloop_translator_emitters() -> list[EmitterSpec]: + def _factory(ctx: EmitterFactoryContext) -> TraceloopTranslatorEmitter: + return TraceloopTranslatorEmitter() + + return [ + EmitterSpec( + name="TraceloopTranslator", + category="span", + factory=_factory, + mode="prepend", # ensure earliest so promotion happens before SemanticConvSpan is added + after=(), + ) + ] + + +__all__ = [ + "TraceloopTranslatorEmitter", + "traceloop_translator_emitters", +] diff --git a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/__init__.py b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/__init__.py deleted file mode 100644 index 23e5d9a1c5..0000000000 --- a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -from .traceloop_span_processor import TraceloopSpanProcessor, TransformationRule - -__all__ = [ - "TraceloopSpanProcessor", - "TransformationRule", -] \ No newline at end of file diff --git a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/span_transformer.py b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/span_transformer.py deleted file mode 100644 index 96348bb0d2..0000000000 --- a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/span_transformer.py +++ /dev/null @@ -1,104 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -from typing import Any, Dict, Optional - -from opentelemetry.sdk.trace import ReadableSpan - -from .traceloop_span_generator import TraceloopSpanGenerator -from ..types import LLMInvocation - - -def _apply_attribute_transformations( - base: Dict[str, Any], transformations: Optional[Dict[str, Any]] -) -> Dict[str, Any]: # pragma: no cover - trivial helpers - if not transformations: - return base - # Order: remove -> rename -> add (so add always wins) - remove_keys = transformations.get("remove") or [] - for k in remove_keys: - base.pop(k, None) - rename_map = transformations.get("rename") or {} - for old, new in rename_map.items(): - if old in base: - base[new] = base.pop(old) - add_map = transformations.get("add") or {} - for k, v in add_map.items(): - base[k] = v - return base - - -def _derive_new_name( - original_name: str, name_transformations: Optional[Dict[str, str]] -) -> Optional[str]: # pragma: no cover - simple matching - if not name_transformations: - return None - import fnmatch - - for pattern, new_name in name_transformations.items(): - try: - if fnmatch.fnmatch(original_name, pattern): - return new_name - except Exception: # defensive - continue - return None - - -def transform_existing_span_to_telemetry( - existing_span: ReadableSpan, - attribute_transformations: Optional[Dict[str, Any]] = None, - name_transformations: Optional[Dict[str, str]] = None, - traceloop_attributes: Optional[Dict[str, Any]] = None, - generator: Optional[TraceloopSpanGenerator] = None, -) -> LLMInvocation: - """Create a synthetic LLMInvocation span from an ended (or ending) span. - - Returns the synthetic ``LLMInvocation`` used purely as a carrier for the new span. - """ - base_attrs: Dict[str, Any] = ( - dict(existing_span.attributes) if existing_span.attributes else {} - ) - - # Apply transformations - base_attrs = _apply_attribute_transformations( - base_attrs, attribute_transformations - ) - if traceloop_attributes: - base_attrs.update(traceloop_attributes) - - # Span name rewrite (store so generator can use & remove later) - new_name = _derive_new_name(existing_span.name, name_transformations) - if new_name: - base_attrs["_traceloop_new_name"] = new_name - - # Determine request_model (best-effort, fallback to unknown) - request_model = ( - base_attrs.get("gen_ai.request.model") - or base_attrs.get("llm.request.model") - or base_attrs.get("ai.model.name") - or "unknown" - ) - - invocation = LLMInvocation( - request_model=str(request_model), - attributes=base_attrs, - messages=[], # empty; original content not reconstructed here - ) - - if generator is None: - generator = TraceloopSpanGenerator(capture_content=True) - generator.start(invocation) - if existing_span.end_time is not None: - generator.finish(invocation) - return invocation diff --git a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/traceloop_span_generator.py b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/traceloop_span_generator.py deleted file mode 100644 index c3e7ec00b0..0000000000 --- a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/traceloop_span_generator.py +++ /dev/null @@ -1,62 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from typing import Optional -from opentelemetry import trace -from opentelemetry.trace import Tracer -from ..types import LLMInvocation - -class TraceloopSpanGenerator: - def __init__(self, tracer: Optional[Tracer] = None, capture_content: bool = False): - self._tracer = tracer or trace.get_tracer(__name__) - self._capture_content = capture_content - - def start(self, invocation: LLMInvocation): - override = getattr(invocation, "attributes", {}).get("_traceloop_new_name") - if override: - span_name = override - else: - name = getattr(invocation, "request_model", "llm") - span_name = f"chat {name}" if not str(name).startswith("chat ") else str(name) - span = self._tracer.start_span(span_name, kind=trace.SpanKind.CLIENT) - invocation.span = span - invocation.context_token = trace.use_span(span, end_on_exit=False) - invocation.context_token.__enter__() - # apply starting attributes - for k, v in getattr(invocation, "attributes", {}).items(): - try: - span.set_attribute(k, v) - except Exception: - pass - - def finish(self, invocation: LLMInvocation): - span = getattr(invocation, "span", None) - if not span: - return - # re-apply attributes (after transformations) - for k, v in getattr(invocation, "attributes", {}).items(): - try: - span.set_attribute(k, v) - except Exception: - pass - token = getattr(invocation, "context_token", None) - if token and hasattr(token, "__exit__"): - try: - token.__exit__(None, None, None) - except Exception: - pass - span.end() - - def error(self, error, invocation: LLMInvocation): # pragma: no cover - unused in tests now - self.finish(invocation) diff --git a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/traceloop_span_processor.py b/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/traceloop_span_processor.py deleted file mode 100644 index 22af673f13..0000000000 --- a/util/opentelemetry-util-genai-dev/src/opentelemetry/util/genai/processors/traceloop_span_processor.py +++ /dev/null @@ -1,345 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- - -from __future__ import annotations - -import fnmatch -import json -import logging -import os -import re -from dataclasses import dataclass, field -from typing import Any, Callable, Dict, List, Optional - -from opentelemetry.context import Context -from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor -from opentelemetry.trace import Span - -from opentelemetry.util.genai.types import LLMInvocation -from opentelemetry.util.genai.handler import ( - get_telemetry_handler, - TelemetryHandler, -) - -_ENV_RULES = "OTEL_GENAI_SPAN_TRANSFORM_RULES" - - -@dataclass -class TransformationRule: - """Represents a single conditional transformation rule. - - Fields map closely to the JSON structure accepted via the environment - variable. All fields are optional; empty rule never matches. - """ - - match_name: Optional[str] = None # glob pattern (e.g. "chat *") - match_scope: Optional[str] = None # regex or substring (case-insensitive) - match_attributes: Dict[str, Optional[str]] = field(default_factory=dict) - - attribute_transformations: Dict[str, Any] = field(default_factory=dict) - name_transformations: Dict[str, str] = field(default_factory=dict) - traceloop_attributes: Dict[str, Any] = field(default_factory=dict) - - def matches(self, span: ReadableSpan) -> bool: # pragma: no cover - simple logic - if self.match_name: - if not fnmatch.fnmatch(span.name, self.match_name): - return False - if self.match_scope: - scope = getattr(span, "instrumentation_scope", None) - scope_name = getattr(scope, "name", "") if scope else "" - pattern = self.match_scope - # Accept either regex (contains meta chars) or simple substring - try: - if any(ch in pattern for ch in ".^$|()[]+?\\"): - if not re.search(pattern, scope_name, re.IGNORECASE): - return False - else: - if pattern.lower() not in scope_name.lower(): - return False - except re.error: - # Bad regex – treat as non-match but log once - logging.warning("Invalid regex in match_scope: %s", pattern) - return False - if self.match_attributes: - for k, expected in self.match_attributes.items(): - if k not in span.attributes: - return False - if expected is not None and str(span.attributes.get(k)) != str(expected): - return False - return True - - -def _load_rules_from_env() -> List[TransformationRule]: - raw = os.getenv(_ENV_RULES) - if not raw: - return [] - try: - data = json.loads(raw) - rules_spec = data.get("rules") if isinstance(data, dict) else None - if not isinstance(rules_spec, list): - logging.warning("%s must contain a 'rules' list", _ENV_RULES) - return [] - rules: List[TransformationRule] = [] - for r in rules_spec: - if not isinstance(r, dict): - continue - match = r.get("match", {}) if isinstance(r.get("match"), dict) else {} - rules.append( - TransformationRule( - match_name=match.get("name"), - match_scope=match.get("scope"), - match_attributes=match.get("attributes", {}) or {}, - attribute_transformations=r.get("attribute_transformations", {}) or {}, - name_transformations=r.get("name_transformations", {}) or {}, - traceloop_attributes=r.get("traceloop_attributes", {}) or {}, - ) - ) - return rules - except Exception as exc: # broad: we never want to break app startup - logging.warning("Failed to parse %s: %s", _ENV_RULES, exc) - return [] - - -class TraceloopSpanProcessor(SpanProcessor): - """ - A span processor that automatically applies transformation rules to spans. - - This processor can be added to your TracerProvider to automatically transform - all spans according to your transformation rules. 
- """ - - def __init__( - self, - attribute_transformations: Optional[Dict[str, Any]] = None, - name_transformations: Optional[Dict[str, str]] = None, - traceloop_attributes: Optional[Dict[str, Any]] = None, - span_filter: Optional[Callable[[ReadableSpan], bool]] = None, - rules: Optional[List[TransformationRule]] = None, - load_env_rules: bool = True, - telemetry_handler: Optional[TelemetryHandler] = None, - # Legacy synthetic span duplication removed – always emit via handler - ): - """ - Initialize the Traceloop span processor. - - Args: - attribute_transformations: Rules for transforming span attributes - name_transformations: Rules for transforming span names - traceloop_attributes: Additional Traceloop-specific attributes to add - span_filter: Optional filter function to determine which spans to transform - """ - self.attribute_transformations = attribute_transformations or {} - self.name_transformations = name_transformations or {} - self.traceloop_attributes = traceloop_attributes or {} - self.span_filter = span_filter or self._default_span_filter - # Load rule set (env + explicit). Explicit rules first for precedence. - env_rules = _load_rules_from_env() if load_env_rules else [] - self.rules: List[TransformationRule] = list(rules or []) + env_rules - self.telemetry_handler = telemetry_handler - if self.rules: - logging.getLogger(__name__).debug( - "TraceloopSpanProcessor loaded %d transformation rules (explicit=%d env=%d)", - len(self.rules), len(rules or []), len(env_rules) - ) - - def _default_span_filter(self, span: ReadableSpan) -> bool: - """Default filter: Transform spans that look like LLM/AI calls. - - Previously this required both a name and at least one attribute. Some - tests (and real-world scenarios) emit spans with meaningful names like - "chat gpt-4" before any model/provider attributes are recorded. We now - allow name-only detection; attributes merely increase confidence. - """ - if not span.name: - return False - - # Check for common LLM/AI span indicators - llm_indicators = [ - "chat", - "completion", - "llm", - "ai", - "gpt", - "claude", - "gemini", - "openai", - "anthropic", - "cohere", - "huggingface", - ] - - span_name_lower = span.name.lower() - for indicator in llm_indicators: - if indicator in span_name_lower: - return True - - # Check attributes for AI/LLM markers (if any attributes present) - if span.attributes: - for attr_key in span.attributes.keys(): - attr_key_lower = str(attr_key).lower() - if any( - marker in attr_key_lower - for marker in ["llm", "ai", "gen_ai", "model"] - ): - return True - return False - - def on_start( - self, span: Span, parent_context: Optional[Context] = None - ) -> None: - """Called when a span is started.""" - pass - - def on_end(self, span: ReadableSpan) -> None: - """ - Called when a span is ended. 
- """ - try: - # Check if this span should be transformed (cheap heuristic first) - if not self.span_filter(span): - return - # Skip spans we already produced (recursion guard) - if span.attributes and "_traceloop_processed" in span.attributes: - return - - # Determine which transformation set to use - applied_rule: Optional[TransformationRule] = None - for rule in self.rules: - try: - if rule.matches(span): - applied_rule = rule - break - except Exception as match_err: # pragma: no cover - defensive - logging.warning("Rule match error ignored: %s", match_err) - - sentinel = {"_traceloop_processed": True} - # Decide which transformation config to apply - if applied_rule is not None: - attr_tx = applied_rule.attribute_transformations - name_tx = applied_rule.name_transformations - extra_tl_attrs = {**applied_rule.traceloop_attributes, **sentinel} - else: - attr_tx = self.attribute_transformations - name_tx = self.name_transformations - extra_tl_attrs = {**self.traceloop_attributes, **sentinel} - - # Always emit via TelemetryHandler - invocation = self._build_invocation( - span, - attribute_transformations=attr_tx, - name_transformations=name_tx, - traceloop_attributes=extra_tl_attrs, - ) - invocation.attributes.setdefault("_traceloop_processed", True) - handler = self.telemetry_handler or get_telemetry_handler() - try: - handler.start_llm(invocation) - handler.stop_llm(invocation) - except Exception as emit_err: # pragma: no cover - defensive - logging.getLogger(__name__).warning( - "Telemetry handler emission failed: %s", emit_err - ) - - except Exception as e: - # Don't let transformation errors break the original span processing - import logging - - logging.warning( - f"TraceloopSpanProcessor failed to transform span: {e}" - ) - - def shutdown(self) -> None: - """Called when the tracer provider is shutdown.""" - pass - - def force_flush(self, timeout_millis: int = 30000) -> bool: - """Force flush any buffered spans.""" - return True - - # ------------------------------------------------------------------ - # Internal helpers - # ------------------------------------------------------------------ - def _apply_attribute_transformations( - self, base: Dict[str, Any], transformations: Optional[Dict[str, Any]] - ) -> Dict[str, Any]: - if not transformations: - return base - remove_keys = transformations.get("remove") or [] - for k in remove_keys: - base.pop(k, None) - rename_map = transformations.get("rename") or {} - for old, new in rename_map.items(): - if old in base: - base[new] = base.pop(old) - add_map = transformations.get("add") or {} - for k, v in add_map.items(): - base[k] = v - return base - - def _derive_new_name( - self, original_name: str, name_transformations: Optional[Dict[str, str]] - ) -> Optional[str]: - if not name_transformations: - return None - import fnmatch - - for pattern, new_name in name_transformations.items(): - try: - if fnmatch.fnmatch(original_name, pattern): - return new_name - except Exception: - continue - return None - - def _build_invocation( - self, - existing_span: ReadableSpan, - *, - attribute_transformations: Optional[Dict[str, Any]] = None, - name_transformations: Optional[Dict[str, str]] = None, - traceloop_attributes: Optional[Dict[str, Any]] = None, - ) -> LLMInvocation: - base_attrs: Dict[str, Any] = ( - dict(existing_span.attributes) if existing_span.attributes else {} - ) - base_attrs = self._apply_attribute_transformations( - base_attrs, attribute_transformations - ) - if traceloop_attributes: - base_attrs.update(traceloop_attributes) - 
new_name = self._derive_new_name( - existing_span.name, name_transformations - ) - if new_name: - # Provide override for SpanEmitter (we extended it to honor this) - base_attrs.setdefault("gen_ai.override.span_name", new_name) - request_model = ( - base_attrs.get("gen_ai.request.model") - or base_attrs.get("llm.request.model") - or base_attrs.get("ai.model.name") - or "unknown" - ) - invocation = LLMInvocation( - request_model=str(request_model), - attributes=base_attrs, - messages=[], - ) - # Mark operation heuristically from original span name - lowered = existing_span.name.lower() - if lowered.startswith("embed"): - invocation.operation = "embedding" # type: ignore[attr-defined] - elif lowered.startswith("chat"): - invocation.operation = "chat" # type: ignore[attr-defined] - return invocation \ No newline at end of file diff --git a/util/opentelemetry-util-genai-dev/tests/test_traceloop_span_processor.py b/util/opentelemetry-util-genai-dev/tests/test_traceloop_span_processor.py deleted file mode 100644 index 9d1e3e3d99..0000000000 --- a/util/opentelemetry-util-genai-dev/tests/test_traceloop_span_processor.py +++ /dev/null @@ -1,202 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import annotations - -import json -from typing import List - -import pytest -from opentelemetry.sdk.trace import ReadableSpan -from opentelemetry.sdk.trace import TracerProvider as SDKTracerProvider -from opentelemetry.sdk.trace.export import SimpleSpanProcessor -try: # Prefer direct export if available (older versions) - from opentelemetry.sdk.trace.export import InMemorySpanExporter # type: ignore -except ImportError: # pragma: no cover - fallback path for newer versions - from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( # type: ignore - InMemorySpanExporter, - ) -from opentelemetry import trace - -from opentelemetry.util.genai.processors.traceloop_span_processor import ( - TraceloopSpanProcessor, - TransformationRule, -) - -from opentelemetry.util.genai.handler import TelemetryHandler - - -@pytest.fixture -def tracer_provider(): - # Provide a fresh provider per test to avoid cross-test processor leakage - # (we intentionally DO NOT set it globally to keep isolation). 
- return SDKTracerProvider() - - -@pytest.fixture -def in_memory_exporter(): - return InMemorySpanExporter() - - -@pytest.fixture -def tracer(tracer_provider, in_memory_exporter): - tracer_provider.add_span_processor(SimpleSpanProcessor(in_memory_exporter)) - return tracer_provider.get_tracer(__name__) - - -@pytest.fixture -def telemetry_handler(tracer_provider): - # Bind handler to the per-test tracer provider so emitted spans flow to test exporter - return TelemetryHandler(tracer_provider=tracer_provider) - - -def _find_transformed_spans(spans: List[ReadableSpan]): - # Heuristic: transformed spans have the sentinel attribute - return [s for s in spans if s.attributes.get("_traceloop_processed")] - - -def test_fallback_single_rule(tracer_provider, tracer, in_memory_exporter, telemetry_handler): - processor = TraceloopSpanProcessor( - attribute_transformations={"rename": {"llm.provider": "service.name"}}, - name_transformations={"chat *": "genai.chat"}, - telemetry_handler=telemetry_handler, - ) - tracer_provider.add_span_processor(processor) - - with tracer.start_as_current_span("chat gpt-4") as span: - span.set_attribute("llm.provider", "openai") - - spans = in_memory_exporter.get_finished_spans() - transformed = _find_transformed_spans(spans) - assert len(transformed) == 1 # only handler-emitted span carries sentinel - t = transformed[0] - assert t.name == "genai.chat" - # Value preserved from original attribute - assert t.attributes["service.name"] == "openai" - assert t.attributes["_traceloop_processed"] is True - - -def test_rule_precedence(tracer_provider, tracer, in_memory_exporter, telemetry_handler): - rules = [ - TransformationRule( - match_name="chat *", - attribute_transformations={"rename": {"marker": "first.marker"}}, - name_transformations={"chat *": "first.chat"}, - ), - TransformationRule( - match_name="chat gpt-*", - attribute_transformations={"rename": {"marker": "second.marker"}}, - name_transformations={"chat gpt-*": "second.chat"}, - ), - ] - processor = TraceloopSpanProcessor( - rules=rules, - load_env_rules=False, - telemetry_handler=telemetry_handler, - ) - tracer_provider.add_span_processor(processor) - - with tracer.start_as_current_span("chat gpt-4") as span: - span.set_attribute("marker", True) - - spans = in_memory_exporter.get_finished_spans() - transformed = _find_transformed_spans(spans) - assert transformed, "Expected transformed span" - # First rule wins - assert transformed[0].name == "first.chat" - assert transformed[0].attributes.get("first.marker") is True - assert "second.marker" not in transformed[0].attributes - - -def test_env_rule_overrides(tracer_provider, tracer, in_memory_exporter, monkeypatch, telemetry_handler): - env_spec = { - "rules": [ - { - "match": {"name": "chat *"}, - "attribute_transformations": {"rename": {"marker": "env.marker"}}, - "name_transformations": {"chat *": "env.chat"}, - } - ] - } - monkeypatch.setenv("OTEL_GENAI_SPAN_TRANSFORM_RULES", json.dumps(env_spec)) - - processor = TraceloopSpanProcessor( - attribute_transformations={"rename": {"marker": "fallback.marker"}}, - name_transformations={"chat *": "fallback.chat"}, - telemetry_handler=telemetry_handler, - ) - tracer_provider.add_span_processor(processor) - - with tracer.start_as_current_span("chat model") as span: - span.set_attribute("marker", 123) - - spans = in_memory_exporter.get_finished_spans() - transformed = _find_transformed_spans(spans) - assert transformed - span = transformed[0] - assert span.name == "env.chat" # env rule used - assert 
span.attributes.get("env.marker") == 123 - # Fallback rename should not happen because env rule applied instead - assert "fallback.marker" not in span.attributes - - -def test_recursion_guard(tracer_provider, tracer, in_memory_exporter, telemetry_handler): - # Span already marked as processed should not be processed again - processor = TraceloopSpanProcessor( - attribute_transformations={"rename": {"foo": "service.name"}}, - telemetry_handler=telemetry_handler, - ) - tracer_provider.add_span_processor(processor) - - # Manually create a span that already has sentinel - with tracer.start_as_current_span("chat something") as span: - span.set_attribute("_traceloop_processed", True) - - spans = in_memory_exporter.get_finished_spans() - transformed = _find_transformed_spans(spans) - # Only the original (pre-marked) should exist, no new duplicate - assert len(transformed) == 1 - assert transformed[0].name == "chat something" - - -def test_non_matching_span_not_transformed(tracer_provider, tracer, in_memory_exporter, telemetry_handler): - processor = TraceloopSpanProcessor( - attribute_transformations={"rename": {"some.attr": "unused"}}, - telemetry_handler=telemetry_handler, - ) - tracer_provider.add_span_processor(processor) - - with tracer.start_as_current_span("unrelated operation"): - pass - - spans = in_memory_exporter.get_finished_spans() - transformed = _find_transformed_spans(spans) - assert not transformed - - -def test_handler_emission_default(tracer_provider, tracer, in_memory_exporter, telemetry_handler): - processor = TraceloopSpanProcessor( - attribute_transformations={"rename": {"orig.attr": "renamed.attr"}}, - name_transformations={"chat *": "genai.chat"}, - telemetry_handler=telemetry_handler, - ) - tracer_provider.add_span_processor(processor) - with tracer.start_as_current_span("chat model-x") as span: - span.set_attribute("orig.attr", 42) - spans = in_memory_exporter.get_finished_spans() - telemetry_spans = [s for s in spans if s.name == "genai.chat"] - assert telemetry_spans, "Expected telemetry span emitted via handler (default mode)" - tel = telemetry_spans[0] - assert tel.attributes.get("renamed.attr") == 42 - assert tel.attributes.get("_traceloop_processed") is True diff --git a/util/opentelemetry-util-genai-dev/tests/test_traceloop_translator_emitter.py b/util/opentelemetry-util-genai-dev/tests/test_traceloop_translator_emitter.py new file mode 100644 index 0000000000..2e04fbd1a5 --- /dev/null +++ b/util/opentelemetry-util-genai-dev/tests/test_traceloop_translator_emitter.py @@ -0,0 +1,137 @@ +# Copyright The OpenTelemetry Authors +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import os +import importlib + +import pytest + +from opentelemetry.util.genai.handler import TelemetryHandler +from opentelemetry.util.genai.types import LLMInvocation, InputMessage, OutputMessage, Text + + +@pytest.fixture(autouse=True) +def clear_env(monkeypatch): + # Ensure flags start unset each test + for key in [ + "OTEL_GENAI_ENABLE_TRACELOOP_TRANSLATOR", + "OTEL_GENAI_TRACELOOP_TRANSLATOR_STRIP_LEGACY", + "OTEL_INSTRUMENTATION_GENAI_EMITTERS", + ]: + monkeypatch.delenv(key, raising=False) + yield + for key in [ + "OTEL_GENAI_ENABLE_TRACELOOP_TRANSLATOR", + "OTEL_GENAI_TRACELOOP_TRANSLATOR_STRIP_LEGACY", + "OTEL_INSTRUMENTATION_GENAI_EMITTERS", + ]: + monkeypatch.delenv(key, raising=False) + + +def _fresh_handler(): + # Force re-parse of env + pipeline rebuild by reloading config-dependent modules + import opentelemetry.util.genai.emitters.configuration as 
cfg + importlib.reload(cfg) + import opentelemetry.util.genai.handler as handler_mod + importlib.reload(handler_mod) + return handler_mod.TelemetryHandler() + + +def test_translator_promotes_prefixed(monkeypatch): + monkeypatch.setenv("OTEL_GENAI_ENABLE_TRACELOOP_TRANSLATOR", "1") + # Ensure standard span + compat so we can observe merged attributes + monkeypatch.setenv("OTEL_INSTRUMENTATION_GENAI_EMITTERS", "span,traceloop_compat") + + handler = _fresh_handler() + inv = LLMInvocation( + request_model="gpt-4", + input_messages=[InputMessage(role="user", parts=[Text("hi")])], + attributes={ + "traceloop.workflow.name": "main_flow", + "traceloop.entity.name": "AgentX", + "traceloop.entity.path": "root/branch/leaf", + "traceloop.callback.name": "root_chain", + "traceloop.callback.id": "cb-123", + }, + ) + handler.start_llm(inv) + # Translator runs on start; attributes should be promoted now + assert inv.attributes.get("gen_ai.workflow.name") == "main_flow" + assert inv.attributes.get("gen_ai.agent.name") == "AgentX" + assert inv.attributes.get("gen_ai.workflow.path") == "root/branch/leaf" + assert inv.attributes.get("gen_ai.callback.name") == "root_chain" + assert inv.attributes.get("gen_ai.callback.id") == "cb-123" + # Original keys retained by default + assert "traceloop.entity.path" in inv.attributes + + +def test_translator_promotes_raw(monkeypatch): + monkeypatch.setenv("OTEL_GENAI_ENABLE_TRACELOOP_TRANSLATOR", "1") + monkeypatch.setenv("OTEL_INSTRUMENTATION_GENAI_EMITTERS", "span") + handler = _fresh_handler() + inv = LLMInvocation( + request_model="gpt-4", + input_messages=[], + attributes={ + "workflow.name": "flow_raw", + "entity.name": "AgentRaw", + "entity.path": "a/b/c", + }, + ) + handler.start_llm(inv) + assert inv.attributes.get("gen_ai.workflow.name") == "flow_raw" + assert inv.attributes.get("gen_ai.agent.name") == "AgentRaw" + assert inv.attributes.get("gen_ai.workflow.path") == "a/b/c" + + +def test_translator_does_not_overwrite(monkeypatch): + monkeypatch.setenv("OTEL_GENAI_ENABLE_TRACELOOP_TRANSLATOR", "1") + monkeypatch.setenv("OTEL_INSTRUMENTATION_GENAI_EMITTERS", "span") + handler = _fresh_handler() + inv = LLMInvocation( + request_model="gpt-4", + input_messages=[], + attributes={ + "traceloop.workflow.name": "legacy_name", + "gen_ai.workflow.name": "canonical_name", + }, + ) + handler.start_llm(inv) + # Existing canonical value preserved + assert inv.attributes.get("gen_ai.workflow.name") == "canonical_name" + + +def test_translator_strip_legacy(monkeypatch): + monkeypatch.setenv("OTEL_GENAI_ENABLE_TRACELOOP_TRANSLATOR", "1") + monkeypatch.setenv("OTEL_GENAI_TRACELOOP_TRANSLATOR_STRIP_LEGACY", "1") + monkeypatch.setenv("OTEL_INSTRUMENTATION_GENAI_EMITTERS", "span") + handler = _fresh_handler() + inv = LLMInvocation( + request_model="gpt-4", + input_messages=[], + attributes={ + "traceloop.entity.path": "strip/me", + }, + ) + handler.start_llm(inv) + assert inv.attributes.get("gen_ai.workflow.path") == "strip/me" + # Legacy removed + assert "traceloop.entity.path" not in inv.attributes + + +def test_callback_sets_operation_source(monkeypatch): + monkeypatch.setenv("OTEL_GENAI_ENABLE_TRACELOOP_TRANSLATOR", "1") + monkeypatch.setenv("OTEL_INSTRUMENTATION_GENAI_EMITTERS", "span") + handler = _fresh_handler() + inv = LLMInvocation( + request_model="gpt-4", + input_messages=[], + attributes={ + "traceloop.callback.name": "chain_node", + }, + ) + handler.start_llm(inv) + assert inv.attributes.get("gen_ai.callback.name") == "chain_node" + assert 
inv.attributes.get("gen_ai.operation.source") == "chain_node" From 256a7599e0da04ff128cceaac4a91e52fbb7065c Mon Sep 17 00:00:00 2001 From: Pavan Sudheendra Date: Wed, 8 Oct 2025 14:46:18 +0100 Subject: [PATCH 6/6] feat: update examples Signed-off-by: Pavan Sudheendra --- .../traceloop_rules_example.py | 28 ------------------- 1 file changed, 28 deletions(-) diff --git a/util/opentelemetry-util-genai-dev/examples/traceloop_span_transformation/traceloop_rules_example.py b/util/opentelemetry-util-genai-dev/examples/traceloop_span_transformation/traceloop_rules_example.py index 479574b792..5432f55a10 100644 --- a/util/opentelemetry-util-genai-dev/examples/traceloop_span_transformation/traceloop_rules_example.py +++ b/util/opentelemetry-util-genai-dev/examples/traceloop_span_transformation/traceloop_rules_example.py @@ -2,34 +2,6 @@ from __future__ import annotations -"""Example: Emitting Traceloop-compatible spans and translating legacy attributes. - -This example shows how to enable the external Traceloop compatibility emitter -(`traceloop_compat`) alongside standard semantic convention spans. The legacy -TraceloopSpanProcessor & transformation rules have been removed. - -Prerequisites: - pip install opentelemetry-util-genai-emitters-traceloop - -Environment (basic – compat only): - export OTEL_INSTRUMENTATION_GENAI_EMITTERS=traceloop_compat - -Environment (semantic + compat + translator promotion – simple flag): - export OTEL_GENAI_ENABLE_TRACELOOP_TRANSLATOR=1 - export OTEL_INSTRUMENTATION_GENAI_EMITTERS=span,traceloop_compat - -Alternative (explicit token if registered via entry point): - export OTEL_INSTRUMENTATION_GENAI_EMITTERS=span,traceloop_translator,traceloop_compat - (If ordering needs enforcement you can use category override, e.g. - export OTEL_INSTRUMENTATION_GENAI_EMITTERS_SPAN=prepend:TraceloopTranslator ) - -Optional: capture message content (both span + event): - export OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGES=both - -Run this example to see two spans per invocation: the semconv span and the -Traceloop-compatible span. -""" - from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import (