diff --git a/llama-index-instrumentation/pyproject.toml b/llama-index-instrumentation/pyproject.toml index ec21284208a..6e4880671e6 100644 --- a/llama-index-instrumentation/pyproject.toml +++ b/llama-index-instrumentation/pyproject.toml @@ -7,7 +7,7 @@ dev = ["pytest>=8.4.0", "pytest-asyncio>=1.0.0", "pytest-cov>=6.1.1"] [project] name = "llama-index-instrumentation" -version = "0.4.2" +version = "0.4.3" description = "Instrumentation and Observability for LlamaIndex" license = "MIT" readme = "README.md" diff --git a/llama-index-instrumentation/src/llama_index_instrumentation/dispatcher.py b/llama-index-instrumentation/src/llama_index_instrumentation/dispatcher.py index 7a6d21af71f..b5ba3b2a8ef 100644 --- a/llama-index-instrumentation/src/llama_index_instrumentation/dispatcher.py +++ b/llama-index-instrumentation/src/llama_index_instrumentation/dispatcher.py @@ -24,6 +24,8 @@ _logger = logging.getLogger(__name__) +_INSTRUMENT_TAGS_KEY = "instrument_tags" + # ContextVar for managing active instrument tags active_instrument_tags: ContextVar[Dict[str, Any]] = ContextVar( "instrument_tags", default={} @@ -115,6 +117,15 @@ def root(self) -> "Dispatcher": assert self.manager is not None return self.manager.dispatchers[self.root_name] + def _walk_span_handlers(self) -> Generator[BaseSpanHandler, None, None]: + """Yield every span handler reachable via the propagation chain.""" + c: Optional[Dispatcher] = self + while c: + yield from c.span_handlers + if not c.propagate: + break + c = c.parent + def add_event_handler(self, handler: BaseEventHandler) -> None: """Add handler to set of handlers.""" self.event_handlers += [handler] @@ -261,6 +272,73 @@ def span_exit( else: c = c.parent + def capture_propagation_context(self) -> Dict[str, Any]: + """ + Capture trace propagation context from all registered span handlers. + + Each span handler namespaces its data under its own key. The Dispatcher + also captures active instrument_tags. The returned dict can be serialized + and passed to restore_propagation_context() in another process. + """ + result: Dict[str, Any] = {} + for h in self._walk_span_handlers(): + try: + result.update(h.capture_propagation_context()) + except BaseException: + _logger.warning("Error capturing propagation context", exc_info=True) + tags = active_instrument_tags.get() + if tags: + result[_INSTRUMENT_TAGS_KEY] = dict(tags) + return result + + def restore_propagation_context(self, context: Dict[str, Any]) -> None: + """ + Restore trace propagation context on all registered span handlers. + + Also restores instrument_tags so that subsequent spans see them. + """ + for h in self._walk_span_handlers(): + try: + h.restore_propagation_context(context) + except BaseException: + _logger.warning("Error restoring propagation context", exc_info=True) + tags = context.get(_INSTRUMENT_TAGS_KEY) + if tags: + active_instrument_tags.set(dict(tags)) + + def shutdown(self) -> None: + """ + Drop all open spans and close all handlers. + + Walks the dispatcher parent chain (same as other span methods), + drops every open span on every handler, then calls close() on + each handler. Exceptions are swallowed to match existing convention. + """ + _synthetic_bound_args = inspect.signature(lambda: None).bind() + _shutdown_err = RuntimeError("dispatcher shutdown") + + for h in self._walk_span_handlers(): + # Drop all open spans — snapshot keys since span_drop mutates the dict + for span_id in list(h.open_spans.keys()): + try: + h.span_drop( + id_=span_id, + bound_args=_synthetic_bound_args, + instance=None, + err=_shutdown_err, + ) + except BaseException: + _logger.debug( + "Error dropping span %s during shutdown", + span_id, + exc_info=True, + ) + # Close the handler + try: + h.close() + except BaseException: + _logger.warning("Error closing handler %s", h, exc_info=True) + def span(self, func: Callable[..., _R]) -> Callable[..., _R]: # The `span` decorator should be idempotent. try: diff --git a/llama-index-instrumentation/src/llama_index_instrumentation/span_handlers/base.py b/llama-index-instrumentation/src/llama_index_instrumentation/span_handlers/base.py index 29cc2a63d7d..ce87254fb61 100644 --- a/llama-index-instrumentation/src/llama_index_instrumentation/span_handlers/base.py +++ b/llama-index-instrumentation/src/llama_index_instrumentation/span_handlers/base.py @@ -141,6 +141,30 @@ def span_drop( with self.lock: del self.open_spans[id_] + def capture_propagation_context(self) -> Dict[str, Any]: + """ + Capture trace propagation context for serialization across process boundaries. + + Returns a dict that can be serialized and passed to restore_propagation_context() + in another process to re-establish trace continuity. + """ + return {} + + def restore_propagation_context(self, context: Dict[str, Any]) -> None: + """ + Restore trace propagation context received from another process. + + Should be called BEFORE span_enter so that new spans parent correctly. + """ + + def close(self) -> None: + """ + Optional cleanup hook called during dispatcher shutdown. + + Subclasses can override to flush buffers, release resources, etc. + Default is a no-op. + """ + @abstractmethod def new_span( self, diff --git a/llama-index-instrumentation/tests/test_propagation.py b/llama-index-instrumentation/tests/test_propagation.py new file mode 100644 index 00000000000..aaa7bb28a10 --- /dev/null +++ b/llama-index-instrumentation/tests/test_propagation.py @@ -0,0 +1,186 @@ +from typing import Any, Dict + +from llama_index_instrumentation.dispatcher import ( + Dispatcher, + Manager, + _INSTRUMENT_TAGS_KEY, + active_instrument_tags, +) +from llama_index_instrumentation.span_handlers.simple import SimpleSpanHandler + + +class PropagatingHandler(SimpleSpanHandler): + """Handler that captures/restores a fake trace context.""" + + def capture_propagation_context(self) -> Dict[str, Any]: + return {"test_handler": {"trace_id": "abc123", "span_id": "def456"}} + + def restore_propagation_context(self, context: Dict[str, Any]) -> None: + self._restored_context = context + + +def test_capture_propagation_context_basic(): + handler = PropagatingHandler() + d = Dispatcher(span_handlers=[handler], propagate=False) + + ctx = d.capture_propagation_context() + + assert ctx["test_handler"]["trace_id"] == "abc123" + assert ctx["test_handler"]["span_id"] == "def456" + + +def test_capture_includes_instrument_tags(): + handler = SimpleSpanHandler() + d = Dispatcher(span_handlers=[handler], propagate=False) + + token = active_instrument_tags.set({"user_id": "u1", "session": "s1"}) + try: + ctx = d.capture_propagation_context() + finally: + active_instrument_tags.reset(token) + + assert ctx[_INSTRUMENT_TAGS_KEY] == {"user_id": "u1", "session": "s1"} + + +def test_capture_omits_instrument_tags_when_empty(): + handler = SimpleSpanHandler() + d = Dispatcher(span_handlers=[handler], propagate=False) + + ctx = d.capture_propagation_context() + + assert _INSTRUMENT_TAGS_KEY not in ctx + + +def test_restore_propagation_context_basic(): + handler = PropagatingHandler() + d = Dispatcher(span_handlers=[handler], propagate=False) + + context = {"test_handler": {"trace_id": "abc123"}} + d.restore_propagation_context(context) + + assert handler._restored_context == context + + +def test_restore_sets_instrument_tags(): + handler = SimpleSpanHandler() + d = Dispatcher(span_handlers=[handler], propagate=False) + + d.restore_propagation_context({_INSTRUMENT_TAGS_KEY: {"user_id": "u1"}}) + + assert active_instrument_tags.get() == {"user_id": "u1"} + # cleanup + active_instrument_tags.set({}) + + +def test_capture_walks_parent_chain(): + parent_handler = PropagatingHandler() + child_handler = SimpleSpanHandler() + + parent = Dispatcher(name="parent", span_handlers=[parent_handler], propagate=False) + child = Dispatcher( + name="child", + span_handlers=[child_handler], + propagate=True, + parent_name="parent", + ) + manager = Manager(parent) + manager.add_dispatcher(child) + child.manager = manager + parent.manager = manager + + ctx = child.capture_propagation_context() + + # Should include parent handler's context via propagation + assert "test_handler" in ctx + + +def test_restore_walks_parent_chain(): + parent_handler = PropagatingHandler() + child_handler = PropagatingHandler() + + parent = Dispatcher(name="parent", span_handlers=[parent_handler], propagate=False) + child = Dispatcher( + name="child", + span_handlers=[child_handler], + propagate=True, + parent_name="parent", + ) + manager = Manager(parent) + manager.add_dispatcher(child) + child.manager = manager + parent.manager = manager + + context = {"test_handler": {"trace_id": "xyz"}} + child.restore_propagation_context(context) + + assert child_handler._restored_context == context + assert parent_handler._restored_context == context + + +def test_capture_stops_at_propagate_false(): + parent_handler = PropagatingHandler() + child_handler = SimpleSpanHandler() + + parent = Dispatcher(name="parent", span_handlers=[parent_handler], propagate=False) + child = Dispatcher( + name="child", + span_handlers=[child_handler], + propagate=False, # does NOT propagate + parent_name="parent", + ) + manager = Manager(parent) + manager.add_dispatcher(child) + child.manager = manager + parent.manager = manager + + ctx = child.capture_propagation_context() + + # Should NOT include parent handler's context + assert "test_handler" not in ctx + + +def test_roundtrip_capture_restore(): + """Capture from one dispatcher, restore on another — simulates cross-process.""" + source_handler = PropagatingHandler() + source = Dispatcher(span_handlers=[source_handler], propagate=False) + + token = active_instrument_tags.set({"env": "prod"}) + try: + ctx = source.capture_propagation_context() + finally: + active_instrument_tags.reset(token) + + dest_handler = PropagatingHandler() + dest = Dispatcher(span_handlers=[dest_handler], propagate=False) + + dest.restore_propagation_context(ctx) + + assert dest_handler._restored_context == ctx + assert active_instrument_tags.get() == {"env": "prod"} + # cleanup + active_instrument_tags.set({}) + + +def test_capture_swallows_handler_exceptions(): + class BrokenHandler(SimpleSpanHandler): + def capture_propagation_context(self) -> Dict[str, Any]: + raise RuntimeError("boom") + + handler = BrokenHandler() + d = Dispatcher(span_handlers=[handler], propagate=False) + + # Should not raise + ctx = d.capture_propagation_context() + assert isinstance(ctx, dict) + + +def test_restore_swallows_handler_exceptions(): + class BrokenHandler(SimpleSpanHandler): + def restore_propagation_context(self, context: Dict[str, Any]) -> None: + raise RuntimeError("boom") + + handler = BrokenHandler() + d = Dispatcher(span_handlers=[handler], propagate=False) + + # Should not raise + d.restore_propagation_context({"some": "data"}) diff --git a/llama-index-instrumentation/tests/test_shutdown.py b/llama-index-instrumentation/tests/test_shutdown.py new file mode 100644 index 00000000000..b059f04dddc --- /dev/null +++ b/llama-index-instrumentation/tests/test_shutdown.py @@ -0,0 +1,89 @@ +import inspect +from unittest.mock import MagicMock + +from llama_index_instrumentation.dispatcher import Dispatcher, Manager +from llama_index_instrumentation.span_handlers.simple import SimpleSpanHandler + + +def _make_bound_args(): + return inspect.signature(lambda: None).bind() + + +def test_shutdown_drops_all_open_spans(): + handler = SimpleSpanHandler() + d = Dispatcher(span_handlers=[handler], propagate=False) + + # Open some spans + for i in range(3): + handler.span_enter( + id_=f"span-{i}", + bound_args=_make_bound_args(), + parent_id=None, + ) + + assert len(handler.open_spans) == 3 + + d.shutdown() + + assert len(handler.open_spans) == 0 + assert len(handler.dropped_spans) == 3 + + +def test_shutdown_calls_close_on_handlers(): + close_mock = MagicMock() + + class TrackingHandler(SimpleSpanHandler): + def close(self) -> None: + close_mock() + + handler = TrackingHandler() + d = Dispatcher(span_handlers=[handler], propagate=False) + + d.shutdown() + + close_mock.assert_called_once() + + +def test_shutdown_walks_parent_chain(): + parent_handler = SimpleSpanHandler() + child_handler = SimpleSpanHandler() + + parent = Dispatcher(name="parent", span_handlers=[parent_handler], propagate=False) + child = Dispatcher( + name="child", + span_handlers=[child_handler], + propagate=True, + parent_name="parent", + ) + manager = Manager(parent) + manager.add_dispatcher(child) + child.manager = manager + parent.manager = manager + + # Open spans on both handlers + parent_handler.span_enter(id_="p-span", bound_args=_make_bound_args()) + child_handler.span_enter(id_="c-span", bound_args=_make_bound_args()) + + child.shutdown() + + assert len(parent_handler.open_spans) == 0 + assert len(child_handler.open_spans) == 0 + assert len(parent_handler.dropped_spans) == 1 + assert len(child_handler.dropped_spans) == 1 + + +def test_shutdown_is_idempotent(): + handler = SimpleSpanHandler() + d = Dispatcher(span_handlers=[handler], propagate=False) + + handler.span_enter(id_="span-1", bound_args=_make_bound_args()) + d.shutdown() + d.shutdown() # should not error + + assert len(handler.open_spans) == 0 + assert len(handler.dropped_spans) == 1 + + +def test_close_default_is_noop(): + handler = SimpleSpanHandler() + handler.close() # should not raise diff --git a/llama-index-integrations/observability/llama-index-observability-otel/llama_index/observability/otel/base.py b/llama-index-integrations/observability/llama-index-observability-otel/llama_index/observability/otel/base.py index 3676ba9b333..eb1104ed9ba 100644 --- a/llama-index-integrations/observability/llama-index-observability-otel/llama_index/observability/otel/base.py +++ b/llama-index-integrations/observability/llama-index-observability-otel/llama_index/observability/otel/base.py @@ -1,16 +1,17 @@ import inspect +import logging from datetime import datetime from typing import Any, Dict, List, Literal, Mapping, Optional, Sequence, Union, cast import llama_index_instrumentation as instrument -from llama_index.observability.otel.utils import flatten_dict +from llama_index.observability.otel.utils import _is_otel_supported_type, flatten_dict from llama_index_instrumentation.base.event import BaseEvent from llama_index_instrumentation.event_handlers import BaseEventHandler from llama_index_instrumentation.span import SimpleSpan, active_span_id from llama_index_instrumentation.span_handlers.simple import SimpleSpanHandler -from opentelemetry import context, trace +from opentelemetry import context, propagate, trace from opentelemetry.sdk.resources import SERVICE_NAME, Resource -from opentelemetry.sdk.trace import SpanProcessor, TracerProvider, _Span +from opentelemetry.sdk.trace import SpanProcessor, TracerProvider from opentelemetry.sdk.trace.export import ( BatchSpanProcessor, ConsoleSpanExporter, @@ -21,6 +22,8 @@ from pydantic import BaseModel, ConfigDict, Field, PrivateAttr from termcolor.termcolor import cprint +_logger = logging.getLogger(__name__) + class OTelEventAttributes(BaseModel): name: str @@ -45,10 +48,11 @@ class OTelCompatibleSpanHandler(SimpleSpanHandler): """OpenTelemetry-compatible span handler.""" _tracer: trace.Tracer = PrivateAttr() + _tracer_provider: Optional[TracerProvider] = PrivateAttr(default=None) _events_by_span: Dict[str, List[OTelEventAttributes]] = PrivateAttr( default_factory=dict, ) - all_spans: Dict[str, Union[trace.Span, _Span]] = Field( + all_spans: Dict[str, trace.Span] = Field( default_factory=dict, description="All the registered OpenTelemetry spans." ) debug: bool = Field( @@ -64,6 +68,7 @@ def __init__( completed_spans: Optional[List[SimpleSpan]] = None, dropped_spans: Optional[List[SimpleSpan]] = None, current_span_ids: Optional[Dict[Any, str]] = None, + tracer_provider: Optional[TracerProvider] = None, ): super().__init__( open_spans=open_spans or {}, @@ -72,14 +77,46 @@ def __init__( current_span_ids=cast(Dict[str, Any], current_span_ids or {}), ) self._tracer = tracer + self._tracer_provider = tracer_provider self._events_by_span = {} self.debug = debug + def close(self) -> None: + """Flush and shut down the OTel tracer provider.""" + provider = self._tracer_provider + if provider is None: + # Fall back to the global tracer provider + global_provider = trace.get_tracer_provider() + if isinstance(global_provider, TracerProvider): + provider = global_provider + if provider is not None: + try: + provider.shutdown() + except BaseException: + _logger.warning("Error shutting down tracer provider", exc_info=True) + @classmethod def class_name(cls) -> str: # type: ignore """Class name.""" return "OTelCompatibleSpanHandler" + _PROPAGATION_KEY = "otel" + + def capture_propagation_context(self) -> Dict[str, Any]: + """Serialize the current OTel trace context for cross-process propagation.""" + carrier: Dict[str, str] = {} + propagate.inject(carrier) + if carrier: + return {self._PROPAGATION_KEY: carrier} + return {} + + def restore_propagation_context(self, ctx: Dict[str, Any]) -> None: + """Restore OTel trace context from a serialized carrier dict.""" + carrier = ctx.get(self._PROPAGATION_KEY) + if carrier: + restored = propagate.extract(carrier) + context.attach(restored) + def new_span( self, id_: str, @@ -92,16 +129,32 @@ def new_span( span = super().new_span( id_, bound_args, instance, parent_span_id, tags, **kwargs ) - if parent_span_id is not None: + + # Strip UUID suffix from span name for clean grouping + span_name = id_.partition("-")[0] + + # Resolve parent context: + # 1. Same-process parent found in all_spans → use it + # 2. Otherwise → use ambient OTel context (set by restore_propagation_context + # for cross-process, or inherited naturally for same-process roots) + if parent_span_id is not None and parent_span_id in self.all_spans: ctx = set_span_in_context(span=self.all_spans[parent_span_id]) else: ctx = context.get_current() - ctx.update(bound_args.arguments) - otel_span = self._tracer.start_span(name=id_, context=ctx) + + otel_span = self._tracer.start_span(name=span_name, context=ctx) self.all_spans.update({id_: otel_span}) + + # Record instrument_tags as span attributes + if tags is not None: + for key, value in tags.items(): + if _is_otel_supported_type(value): + attr_key = key if "." in key else f"llamaindex.{key}" + otel_span.set_attribute(attr_key, value) + if self.debug: cprint( - f"Emitting span {id_} at time: {datetime.now()}", + f"Emitting span {span_name} at time: {datetime.now()}", color="yellow", attrs=["bold"], ) @@ -122,7 +175,10 @@ def prepare_to_exit_span( attrs=["bold"], ) sp = super().prepare_to_exit_span(id_, bound_args, instance, result, **kwargs) - span = self.all_spans.pop(id_) + span = self.all_spans.pop(id_, None) + if span is None: + _logger.warning("No OTel span found for %s in prepare_to_exit_span", id_) + return sp # Get and process events specific to this span events = self._events_by_span.pop(id_, []) @@ -148,13 +204,18 @@ def prepare_to_drop_span( attrs=["bold"], ) sp = super().prepare_to_drop_span(id_, bound_args, instance, err, **kwargs) - span = self.all_spans.pop(id_) + span = self.all_spans.pop(id_, None) + if span is None: + _logger.warning("No OTel span found for %s in prepare_to_drop_span", id_) + return sp # Get and process events specific to this span events = self._events_by_span.pop(id_, []) for event in events: span.add_event(name=event.name, attributes=event.attributes) + if err is not None: + span.record_exception(err) span.set_status(status=trace.StatusCode.ERROR, description=err.__str__()) span.end() return sp @@ -247,6 +308,7 @@ class LlamaIndexOpenTelemetry(BaseModel): description="Debug the start and end of span and the recording of events", ) _tracer: Optional[trace.Tracer] = PrivateAttr(default=None) + _tracer_provider_instance: Optional[TracerProvider] = PrivateAttr(default=None) def _start_otel( self, @@ -270,6 +332,7 @@ def _start_otel( tracer_provider.add_span_processor(extra_span_processor) tracer_provider.add_span_processor(span_processor) trace.set_tracer_provider(tracer_provider) + self._tracer_provider_instance = tracer_provider self._tracer = trace.get_tracer("llamaindex.opentelemetry.tracer") def start_registering( @@ -284,6 +347,7 @@ def start_registering( span_handler = OTelCompatibleSpanHandler( tracer=self._tracer, debug=self.debug, + tracer_provider=self._tracer_provider_instance, ) dispatcher.add_span_handler(span_handler) dispatcher.add_event_handler( diff --git a/llama-index-integrations/observability/llama-index-observability-otel/pyproject.toml b/llama-index-integrations/observability/llama-index-observability-otel/pyproject.toml index 1d8b1b44dbf..8a9a62a5b26 100644 --- a/llama-index-integrations/observability/llama-index-observability-otel/pyproject.toml +++ b/llama-index-integrations/observability/llama-index-observability-otel/pyproject.toml @@ -26,7 +26,7 @@ dev = [ [project] name = "llama-index-observability-otel" -version = "0.5.0" +version = "0.5.1" description = "llama-index observability integration with OpenTelemetry" authors = [{name = "Clelia Astra Bertelli", email = "clelia@runllama.ai"}] requires-python = ">=3.9,<4.0" @@ -37,7 +37,7 @@ dependencies = [ "opentelemetry-api>=1.33.0,<2", "opentelemetry-semantic-conventions>=0.54b0,<1", "termcolor>=3.1.0,<4", - "llama-index-instrumentation>=0.4.2,<0.5", + "llama-index-instrumentation>=0.4.3,<0.5", ] [tool.codespell] diff --git a/llama-index-integrations/observability/llama-index-observability-otel/tests/test_otel.py b/llama-index-integrations/observability/llama-index-observability-otel/tests/test_otel.py index ac4d3cb69a2..67ee9c434df 100644 --- a/llama-index-integrations/observability/llama-index-observability-otel/tests/test_otel.py +++ b/llama-index-integrations/observability/llama-index-observability-otel/tests/test_otel.py @@ -188,9 +188,6 @@ def instrumented_fn(arg1: str, arg2: int) -> str: assert isinstance(span, MockSpan) assert span.dict_context is not None assert span.dict_context.get("hello") == "world" - # ensure preservation of the bound arguments - assert span.dict_context.get("arg1") == "hello" - assert span.dict_context.get("arg2") == 1 def test_context_inheritance_empty_context() -> None: @@ -219,9 +216,342 @@ def instrumented_fn(arg1: str, arg2: int) -> str: assert isinstance(span, MockSpan) assert span.dict_context is not None assert span.dict_context.get("hello") is None - # ensure preservation of the bound arguments - assert span.dict_context.get("arg1") == "hello" - assert span.dict_context.get("arg2") == 1 + + +## --------------------------------------------------------------------------- +# Integration tests using InMemorySpanExporter +# --------------------------------------------------------------------------- + +from opentelemetry.sdk.trace.export import ( + SimpleSpanProcessor as _SimpleSpanProcessor, + SpanExporter, + SpanExportResult, +) + + +class InMemorySpanExporter(SpanExporter): + """Minimal in-memory exporter for testing.""" + + def __init__(self): + self._spans = [] + + def export(self, spans): + self._spans.extend(spans) + return SpanExportResult.SUCCESS + + def get_finished_spans(self): + return list(self._spans) + + def shutdown(self): + pass + + +def _fn(arg1: str = "hello") -> str: + return arg1 + + +_bound = inspect.BoundArguments( + signature=inspect.signature(_fn), + arguments=OrderedDict({"arg1": "hello"}), +) + + +def make_handler(): + exporter = InMemorySpanExporter() + provider = TracerProvider() + provider.add_span_processor(_SimpleSpanProcessor(exporter)) + tracer = provider.get_tracer("test") + handler = OTelCompatibleSpanHandler(tracer=tracer) + return exporter, handler, provider + + +def test_span_name_strips_uuid() -> None: + exporter, handler, provider = make_handler() + handler.span_enter(id_="MyWorkflow.run-abc123-def", bound_args=_bound) + handler.span_exit(id_="MyWorkflow.run-abc123-def", bound_args=_bound) + provider.force_flush() + spans = exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].name == "MyWorkflow.run" + + +def test_missing_parent_no_crash() -> None: + exporter, handler, provider = make_handler() + handler.span_enter( + id_="span1-uuid", + bound_args=_bound, + parent_id="nonexistent-123", + ) + handler.span_exit(id_="span1-uuid", bound_args=_bound) + provider.force_flush() + spans = exporter.get_finished_spans() + assert len(spans) == 1 + + +def test_error_records_exception() -> None: + exporter, handler, provider = make_handler() + handler.span_enter(id_="err-span-uuid", bound_args=_bound) + err = ValueError("boom") + handler.span_drop(id_="err-span-uuid", bound_args=_bound, err=err) + provider.force_flush() + spans = exporter.get_finished_spans() + assert len(spans) == 1 + events = spans[0].events + exception_events = [e for e in events if e.name == "exception"] + assert len(exception_events) >= 1 + assert exception_events[0].attributes["exception.message"] == "boom" + + +def test_tags_recorded_as_attributes() -> None: + exporter, handler, provider = make_handler() + handler.span_enter( + id_="tag-span-uuid", + bound_args=_bound, + tags={ + "handler_id": "h1", + "run_id": "r1", + "myapp.custom": "user_val", + }, + ) + handler.span_exit(id_="tag-span-uuid", bound_args=_bound) + provider.force_flush() + spans = exporter.get_finished_spans() + assert len(spans) == 1 + attrs = dict(spans[0].attributes) + assert attrs["llamaindex.handler_id"] == "h1" + assert attrs["llamaindex.run_id"] == "r1" + # Dotted keys pass through without prefix + assert attrs["myapp.custom"] == "user_val" + + +def test_non_otel_types_skipped_in_tags() -> None: + exporter, handler, provider = make_handler() + handler.span_enter( + id_="type-span-uuid", + bound_args=_bound, + tags={ + "good": "yes", + "bad_dict": {"nested": True}, + "bad_none": None, + }, + ) + handler.span_exit(id_="type-span-uuid", bound_args=_bound) + provider.force_flush() + spans = exporter.get_finished_spans() + assert len(spans) == 1 + attrs = dict(spans[0].attributes) + assert attrs["llamaindex.good"] == "yes" + assert "llamaindex.bad_dict" not in attrs + assert "llamaindex.bad_none" not in attrs + + +def test_tags_not_mutated_by_new_span() -> None: + """Regression guard: new_span must not write internal keys into the tags dict.""" + exporter, handler, provider = make_handler() + tags: dict = {"user_key": "user_val"} + original_tags = dict(tags) + handler.span_enter(id_="root-span-uuid", bound_args=_bound, tags=tags) + handler.span_exit(id_="root-span-uuid", bound_args=_bound) + provider.force_flush() + # Tags should be unchanged — no _otel_traceparent or other internal keys injected + assert tags == original_tags + + +def test_capture_propagation_context() -> None: + """capture_propagation_context returns a dict with traceparent when a span is active.""" + exporter, handler, provider = make_handler() + # Create a span so there's an active trace context + handler.span_enter(id_="root-uuid", bound_args=_bound) + # Activate the OTel span in the current context so capture can see it + from opentelemetry.trace import set_span_in_context + + otel_span = handler.all_spans["root-uuid"] + ctx = set_span_in_context(otel_span) + context.attach(ctx) + + captured = handler.capture_propagation_context() + assert "otel" in captured + assert captured["otel"]["traceparent"].startswith("00-") + + handler.span_exit(id_="root-uuid", bound_args=_bound) + provider.force_flush() + + +def test_capture_restore_propagation_roundtrip() -> None: + """ + Full roundtrip: capture context in process A, restore in process B. + + Verifies: + - trace_id continuity (child belongs to same trace as parent) + - parent_span_id linkage (child's parent points to root span) + - tags become span attributes on both sides independently + - tags dict is not mutated by either side + - externally-set OTel context (e.g. baggage-like ambient values) propagates + through the traceparent mechanism + """ + from opentelemetry.trace import set_span_in_context + + # --- Process A: create root span with tags, capture context --- + exporter_a, handler_a, provider_a = make_handler() + + tags_a = {"handler_id": "h1", "run_id": "r1", "myapp.custom": "val_a"} + original_tags_a = dict(tags_a) + handler_a.span_enter(id_="root-uuid", bound_args=_bound, tags=tags_a) + + # Activate the OTel span in ambient context (simulating what the Dispatcher + # would do before a serialization boundary) + root_otel_span = handler_a.all_spans["root-uuid"] + context.attach(set_span_in_context(root_otel_span)) + + # Capture propagation context — this is what gets serialized across the boundary + captured_ctx = handler_a.capture_propagation_context() + assert "otel" in captured_ctx + assert captured_ctx["otel"]["traceparent"].startswith("00-") + + # Tags were NOT mutated by capture or span creation + assert tags_a == original_tags_a + + # Finish root span + handler_a.span_exit(id_="root-uuid", bound_args=_bound) + provider_a.force_flush() + root_spans = exporter_a.get_finished_spans() + assert len(root_spans) == 1 + root_span = root_spans[0] + root_trace_id = root_span.context.trace_id + root_span_id = root_span.context.span_id + + # Root span has tags as attributes + root_attrs = dict(root_span.attributes) + assert root_attrs["llamaindex.handler_id"] == "h1" + assert root_attrs["llamaindex.run_id"] == "r1" + assert root_attrs["myapp.custom"] == "val_a" + + # --- Process B: restore context, create child span with its own tags --- + exporter_b, handler_b, provider_b = make_handler() + + # Clean ambient context (simulating a fresh process) + context.attach(context.Context()) + + # Restore the captured propagation context + handler_b.restore_propagation_context(captured_ctx) + + # Create child span — parent_id references a span not in this handler's all_spans, + # so it falls back to ambient OTel context (which we just restored) + tags_b = {"handler_id": "h2", "run_id": "r2"} + original_tags_b = dict(tags_b) + handler_b.span_enter( + id_="child-uuid", + bound_args=_bound, + parent_id="gone-span", + tags=tags_b, + ) + handler_b.span_exit(id_="child-uuid", bound_args=_bound) + provider_b.force_flush() + + child_spans = exporter_b.get_finished_spans() + assert len(child_spans) == 1 + child_span = child_spans[0] + + # Trace continuity: same trace_id + assert child_span.context.trace_id == root_trace_id + + # Parent linkage: child's parent is the root span + assert child_span.parent is not None + assert child_span.parent.span_id == root_span_id + assert child_span.parent.trace_id == root_trace_id + + # Child has its own tags as attributes (independent of process A's tags) + child_attrs = dict(child_span.attributes) + assert child_attrs["llamaindex.handler_id"] == "h2" + assert child_attrs["llamaindex.run_id"] == "r2" + # Process A's tags are NOT on the child span + assert "myapp.custom" not in child_attrs + + # Tags were NOT mutated by span creation on either side + assert tags_b == original_tags_b + + +def test_dispatcher_propagation_roundtrip_with_tags() -> None: + """ + Full Dispatcher-level roundtrip: trace context + instrument_tags propagate together. + + Simulates: process A runs a workflow step with run_id tag, + captures context, process B restores it and creates a child span + that inherits the trace AND sees the tags. + """ + from llama_index_instrumentation.dispatcher import ( + Dispatcher, + Manager, + _INSTRUMENT_TAGS_KEY, + active_instrument_tags, + instrument_tags, + ) + from opentelemetry.trace import set_span_in_context + + exporter_a, handler_a, provider_a = make_handler() + exporter_b, handler_b, provider_b = make_handler() + + # Set up two dispatchers (simulating two processes, each with their own handler) + dispatcher_a = Dispatcher( + name="process_a", span_handlers=[handler_a], propagate=False + ) + dispatcher_a.manager = Manager(dispatcher_a) + + dispatcher_b = Dispatcher( + name="process_b", span_handlers=[handler_b], propagate=False + ) + dispatcher_b.manager = Manager(dispatcher_b) + + # --- Process A: run with tags, capture context --- + with instrument_tags({"run_id": "run-123", "handler_id": "wf-abc"}): + dispatcher_a.span_enter( + id_="root-uuid", bound_args=_bound, tags=active_instrument_tags.get() + ) + + # Activate OTel span in ambient context + root_otel_span = handler_a.all_spans["root-uuid"] + context.attach(set_span_in_context(root_otel_span)) + + captured = dispatcher_a.capture_propagation_context() + + # Verify captured structure has both namespaces + assert "otel" in captured + assert _INSTRUMENT_TAGS_KEY in captured + assert captured[_INSTRUMENT_TAGS_KEY]["run_id"] == "run-123" + assert captured[_INSTRUMENT_TAGS_KEY]["handler_id"] == "wf-abc" + + dispatcher_a.span_exit(id_="root-uuid", bound_args=_bound) + provider_a.force_flush() + root_span = exporter_a.get_finished_spans()[0] + + # --- Process B: restore context, create child --- + context.attach(context.Context()) # clean slate + dispatcher_b.restore_propagation_context(captured) + + # instrument_tags should now be active + restored_tags = active_instrument_tags.get() + assert restored_tags["run_id"] == "run-123" + assert restored_tags["handler_id"] == "wf-abc" + + dispatcher_b.span_enter( + id_="child-uuid", + bound_args=_bound, + parent_id="gone-span", + tags=restored_tags, + ) + dispatcher_b.span_exit(id_="child-uuid", bound_args=_bound) + provider_b.force_flush() + + child_span = exporter_b.get_finished_spans()[0] + + # Trace continuity + assert child_span.context.trace_id == root_span.context.trace_id + assert child_span.parent.span_id == root_span.context.span_id + + # Tags became attributes on the child span + child_attrs = dict(child_span.attributes) + assert child_attrs["llamaindex.run_id"] == "run-123" + assert child_attrs["llamaindex.handler_id"] == "wf-abc" def test_flatten_dict() -> None: diff --git a/llama-index-integrations/observability/llama-index-observability-otel/uv.lock b/llama-index-integrations/observability/llama-index-observability-otel/uv.lock index 488b91a3e92..3ab7587251b 100644 --- a/llama-index-integrations/observability/llama-index-observability-otel/uv.lock +++ b/llama-index-integrations/observability/llama-index-observability-otel/uv.lock @@ -1223,7 +1223,7 @@ wheels = [ [[package]] name = "llama-index-observability-otel" -version = "0.2.2" +version = "0.5.1" source = { editable = "." } dependencies = [ { name = "llama-index-instrumentation" }, @@ -1258,7 +1258,7 @@ dev = [ [package.metadata] requires-dist = [ - { name = "llama-index-instrumentation", specifier = ">=0.4.2" }, + { name = "llama-index-instrumentation", specifier = ">=0.4.3,<0.5" }, { name = "opentelemetry-api", specifier = ">=1.33.0,<2" }, { name = "opentelemetry-sdk", specifier = ">=1.33.0,<2" }, { name = "opentelemetry-semantic-conventions", specifier = ">=0.54b0,<1" },