From 689587f3c7ea783ca121a2a67c3b511cb495a98a Mon Sep 17 00:00:00 2001 From: Adrian Lyjak Date: Fri, 27 Feb 2026 14:33:33 -0500 Subject: [PATCH 1/9] otel attributes and pass through --- .../llama_index/observability/otel/base.py | 68 ++++++- .../tests/test_otel.py | 167 ++++++++++++++++++ .../llama-index-observability-otel/uv.lock | 4 +- 3 files changed, 230 insertions(+), 9 deletions(-) diff --git a/llama-index-integrations/observability/llama-index-observability-otel/llama_index/observability/otel/base.py b/llama-index-integrations/observability/llama-index-observability-otel/llama_index/observability/otel/base.py index 3676ba9b333..92cd1074a9d 100644 --- a/llama-index-integrations/observability/llama-index-observability-otel/llama_index/observability/otel/base.py +++ b/llama-index-integrations/observability/llama-index-observability-otel/llama_index/observability/otel/base.py @@ -8,7 +8,7 @@ from llama_index_instrumentation.event_handlers import BaseEventHandler from llama_index_instrumentation.span import SimpleSpan, active_span_id from llama_index_instrumentation.span_handlers.simple import SimpleSpanHandler -from opentelemetry import context, trace +from opentelemetry import context, propagate, trace from opentelemetry.sdk.resources import SERVICE_NAME, Resource from opentelemetry.sdk.trace import SpanProcessor, TracerProvider, _Span from opentelemetry.sdk.trace.export import ( @@ -80,6 +80,9 @@ def class_name(cls) -> str: # type: ignore """Class name.""" return "OTelCompatibleSpanHandler" + # Keys in the tags dict that are internal and should not be recorded as attributes + _INTERNAL_TAG_KEYS = frozenset({"parent_span_id", "_otel_traceparent"}) + def new_span( self, id_: str, @@ -92,16 +95,53 @@ def new_span( span = super().new_span( id_, bound_args, instance, parent_span_id, tags, **kwargs ) - if parent_span_id is not None: + + # Phase 1: Strip UUID suffix from span name for clean grouping + span_name = id_.partition("-")[0] + + # Phase 1: Resolve parent context with graceful fallback + is_root_like = True + if parent_span_id is not None and parent_span_id in self.all_spans: ctx = set_span_in_context(span=self.all_spans[parent_span_id]) - else: + is_root_like = False + elif ( + parent_span_id is not None + and tags is not None + and "_otel_traceparent" in tags + ): + # Phase 3: Recovery case — restore trace context from serialized traceparent + carrier = {"traceparent": tags["_otel_traceparent"]} + ctx = propagate.extract(carrier) + elif parent_span_id is None: ctx = context.get_current() ctx.update(bound_args.arguments) - otel_span = self._tracer.start_span(name=id_, context=ctx) + else: + # Parent referenced but not found and no traceparent — use ambient context + ctx = None + + otel_span = self._tracer.start_span(name=span_name, context=ctx) self.all_spans.update({id_: otel_span}) + + # Phase 2: Record instrument_tags as span attributes + if tags is not None: + for key, value in tags.items(): + if key in self._INTERNAL_TAG_KEYS: + continue + if isinstance(value, (str, int, float, bool)): + attr_key = key if "." in key else f"llamaindex.{key}" + otel_span.set_attribute(attr_key, value) + + # Phase 3: Capture traceparent on root-like spans for recovery + if is_root_like and isinstance(otel_span, _Span): + inject_carrier: Dict[str, str] = {} + ctx_with_span = set_span_in_context(otel_span) + propagate.inject(inject_carrier, context=ctx_with_span) + if "traceparent" in inject_carrier and tags is not None: + tags["_otel_traceparent"] = inject_carrier["traceparent"] + if self.debug: cprint( - f"Emitting span {id_} at time: {datetime.now()}", + f"Emitting span {span_name} at time: {datetime.now()}", color="yellow", attrs=["bold"], ) @@ -122,7 +162,13 @@ def prepare_to_exit_span( attrs=["bold"], ) sp = super().prepare_to_exit_span(id_, bound_args, instance, result, **kwargs) - span = self.all_spans.pop(id_) + span = self.all_spans.pop(id_, None) + if span is None: + cprint( + f"WARNING: no OTel span found for {id_} in prepare_to_exit_span", + color="red", + ) + return sp # Get and process events specific to this span events = self._events_by_span.pop(id_, []) @@ -148,13 +194,21 @@ def prepare_to_drop_span( attrs=["bold"], ) sp = super().prepare_to_drop_span(id_, bound_args, instance, err, **kwargs) - span = self.all_spans.pop(id_) + span = self.all_spans.pop(id_, None) + if span is None: + cprint( + f"WARNING: no OTel span found for {id_} in prepare_to_drop_span", + color="red", + ) + return sp # Get and process events specific to this span events = self._events_by_span.pop(id_, []) for event in events: span.add_event(name=event.name, attributes=event.attributes) + if err is not None: + span.record_exception(err) span.set_status(status=trace.StatusCode.ERROR, description=err.__str__()) span.end() return sp diff --git a/llama-index-integrations/observability/llama-index-observability-otel/tests/test_otel.py b/llama-index-integrations/observability/llama-index-observability-otel/tests/test_otel.py index ac4d3cb69a2..785aa70cefd 100644 --- a/llama-index-integrations/observability/llama-index-observability-otel/tests/test_otel.py +++ b/llama-index-integrations/observability/llama-index-observability-otel/tests/test_otel.py @@ -224,6 +224,173 @@ def instrumented_fn(arg1: str, arg2: int) -> str: assert span.dict_context.get("arg2") == 1 +## --------------------------------------------------------------------------- +# Integration tests using InMemorySpanExporter +# --------------------------------------------------------------------------- + +from opentelemetry.sdk.trace.export import ( + SimpleSpanProcessor as _SimpleSpanProcessor, + SpanExporter, + SpanExportResult, +) + + +class InMemorySpanExporter(SpanExporter): + """Minimal in-memory exporter for testing.""" + + def __init__(self): + self._spans = [] + + def export(self, spans): + self._spans.extend(spans) + return SpanExportResult.SUCCESS + + def get_finished_spans(self): + return list(self._spans) + + def shutdown(self): + pass + + +def _fn(arg1: str = "hello") -> str: + return arg1 + + +_bound = inspect.BoundArguments( + signature=inspect.signature(_fn), + arguments=OrderedDict({"arg1": "hello"}), +) + + +def make_handler(): + exporter = InMemorySpanExporter() + provider = TracerProvider() + provider.add_span_processor(_SimpleSpanProcessor(exporter)) + tracer = provider.get_tracer("test") + handler = OTelCompatibleSpanHandler(tracer=tracer) + return exporter, handler, provider + + +def test_span_name_strips_uuid() -> None: + exporter, handler, provider = make_handler() + handler.span_enter(id_="MyWorkflow.run-abc123-def", bound_args=_bound) + handler.span_exit(id_="MyWorkflow.run-abc123-def", bound_args=_bound) + provider.force_flush() + spans = exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].name == "MyWorkflow.run" + + +def test_missing_parent_no_crash() -> None: + exporter, handler, provider = make_handler() + handler.span_enter( + id_="span1-uuid", + bound_args=_bound, + parent_id="nonexistent-123", + ) + handler.span_exit(id_="span1-uuid", bound_args=_bound) + provider.force_flush() + spans = exporter.get_finished_spans() + assert len(spans) == 1 + + +def test_error_records_exception() -> None: + exporter, handler, provider = make_handler() + handler.span_enter(id_="err-span-uuid", bound_args=_bound) + err = ValueError("boom") + handler.span_drop(id_="err-span-uuid", bound_args=_bound, err=err) + provider.force_flush() + spans = exporter.get_finished_spans() + assert len(spans) == 1 + events = spans[0].events + exception_events = [e for e in events if e.name == "exception"] + assert len(exception_events) >= 1 + assert exception_events[0].attributes["exception.message"] == "boom" + + +def test_tags_recorded_as_attributes() -> None: + exporter, handler, provider = make_handler() + handler.span_enter( + id_="tag-span-uuid", + bound_args=_bound, + tags={ + "handler_id": "h1", + "run_id": "r1", + "myapp.custom": "user_val", + "parent_span_id": "skip_me", + "_otel_traceparent": "skip_too", + }, + ) + handler.span_exit(id_="tag-span-uuid", bound_args=_bound) + provider.force_flush() + spans = exporter.get_finished_spans() + assert len(spans) == 1 + attrs = dict(spans[0].attributes) + assert attrs["llamaindex.handler_id"] == "h1" + assert attrs["llamaindex.run_id"] == "r1" + # Dotted keys pass through without prefix + assert attrs["myapp.custom"] == "user_val" + assert "llamaindex.parent_span_id" not in attrs + assert "llamaindex._otel_traceparent" not in attrs + + +def test_non_otel_types_skipped_in_tags() -> None: + exporter, handler, provider = make_handler() + handler.span_enter( + id_="type-span-uuid", + bound_args=_bound, + tags={ + "good": "yes", + "bad_dict": {"nested": True}, + "bad_none": None, + }, + ) + handler.span_exit(id_="type-span-uuid", bound_args=_bound) + provider.force_flush() + spans = exporter.get_finished_spans() + assert len(spans) == 1 + attrs = dict(spans[0].attributes) + assert attrs["llamaindex.good"] == "yes" + assert "llamaindex.bad_dict" not in attrs + assert "llamaindex.bad_none" not in attrs + + +def test_traceparent_injected_on_root_span() -> None: + exporter, handler, provider = make_handler() + tags: dict = {} + handler.span_enter(id_="root-span-uuid", bound_args=_bound, tags=tags) + assert "_otel_traceparent" in tags + assert tags["_otel_traceparent"].startswith("00-") + handler.span_exit(id_="root-span-uuid", bound_args=_bound) + provider.force_flush() + + +def test_traceparent_restored_on_recovery() -> None: + # Create first handler + root span to get a traceparent + exporter1, handler1, provider1 = make_handler() + tags: dict = {} + handler1.span_enter(id_="root-uuid", bound_args=_bound, tags=tags) + captured_tp = tags["_otel_traceparent"] + handler1.span_exit(id_="root-uuid", bound_args=_bound) + provider1.force_flush() + root_trace_id = exporter1.get_finished_spans()[0].context.trace_id + + # New handler — simulates recovery in a different process/context + exporter2, handler2, provider2 = make_handler() + handler2.span_enter( + id_="child-uuid", + bound_args=_bound, + parent_id="gone-span", + tags={"_otel_traceparent": captured_tp}, + ) + handler2.span_exit(id_="child-uuid", bound_args=_bound) + provider2.force_flush() + child_spans = exporter2.get_finished_spans() + assert len(child_spans) == 1 + # The child span should belong to the same trace as the root + assert child_spans[0].context.trace_id == root_trace_id + + def test_flatten_dict() -> None: nested_dict = {"a": 1, "b": {"c": 2, "d": {"e": 3}}} flattened = flatten_dict(nested_dict) diff --git a/llama-index-integrations/observability/llama-index-observability-otel/uv.lock b/llama-index-integrations/observability/llama-index-observability-otel/uv.lock index 488b91a3e92..7540c991a98 100644 --- a/llama-index-integrations/observability/llama-index-observability-otel/uv.lock +++ b/llama-index-integrations/observability/llama-index-observability-otel/uv.lock @@ -1223,7 +1223,7 @@ wheels = [ [[package]] name = "llama-index-observability-otel" -version = "0.2.2" +version = "0.5.0" source = { editable = "." } dependencies = [ { name = "llama-index-instrumentation" }, @@ -1258,7 +1258,7 @@ dev = [ [package.metadata] requires-dist = [ - { name = "llama-index-instrumentation", specifier = ">=0.4.2" }, + { name = "llama-index-instrumentation", specifier = ">=0.4.2,<0.5" }, { name = "opentelemetry-api", specifier = ">=1.33.0,<2" }, { name = "opentelemetry-sdk", specifier = ">=1.33.0,<2" }, { name = "opentelemetry-semantic-conventions", specifier = ">=0.54b0,<1" }, From e123b2cafb745f50f630901e5abe8082284a103f Mon Sep 17 00:00:00 2001 From: Adrian Lyjak Date: Fri, 27 Feb 2026 15:00:33 -0500 Subject: [PATCH 2/9] close --- .../llama_index_instrumentation/dispatcher.py | 39 ++++++++ .../span_handlers/base.py | 8 ++ .../tests/test_shutdown.py | 89 +++++++++++++++++++ .../llama_index/observability/otel/base.py | 24 +++++ 4 files changed, 160 insertions(+) create mode 100644 llama-index-instrumentation/tests/test_shutdown.py diff --git a/llama-index-instrumentation/src/llama_index_instrumentation/dispatcher.py b/llama-index-instrumentation/src/llama_index_instrumentation/dispatcher.py index 7a6d21af71f..4b47e505588 100644 --- a/llama-index-instrumentation/src/llama_index_instrumentation/dispatcher.py +++ b/llama-index-instrumentation/src/llama_index_instrumentation/dispatcher.py @@ -261,6 +261,45 @@ def span_exit( else: c = c.parent + def shutdown(self) -> None: + """ + Drop all open spans and close all handlers. + + Walks the dispatcher parent chain (same as other span methods), + drops every open span on every handler, then calls close() on + each handler. Exceptions are swallowed to match existing convention. + """ + _synthetic_bound_args = inspect.signature(lambda: None).bind() + _shutdown_err = RuntimeError("dispatcher shutdown") + + seen_handlers: set = set() + c: Optional[Dispatcher] = self + while c: + for h in c.span_handlers: + if id(h) in seen_handlers: + continue + seen_handlers.add(id(h)) + # Drop all open spans — snapshot keys since span_drop mutates the dict + for span_id in list(h.open_spans.keys()): + try: + h.span_drop( + id_=span_id, + bound_args=_synthetic_bound_args, + instance=None, + err=_shutdown_err, + ) + except BaseException: + pass + # Close the handler + try: + h.close() + except BaseException: + pass + if not c.propagate: + c = None + else: + c = c.parent + def span(self, func: Callable[..., _R]) -> Callable[..., _R]: # The `span` decorator should be idempotent. try: diff --git a/llama-index-instrumentation/src/llama_index_instrumentation/span_handlers/base.py b/llama-index-instrumentation/src/llama_index_instrumentation/span_handlers/base.py index 29cc2a63d7d..dabec66a9c2 100644 --- a/llama-index-instrumentation/src/llama_index_instrumentation/span_handlers/base.py +++ b/llama-index-instrumentation/src/llama_index_instrumentation/span_handlers/base.py @@ -141,6 +141,14 @@ def span_drop( with self.lock: del self.open_spans[id_] + def close(self) -> None: + """ + Optional cleanup hook called during dispatcher shutdown. + + Subclasses can override to flush buffers, release resources, etc. + Default is a no-op. + """ + @abstractmethod def new_span( self, diff --git a/llama-index-instrumentation/tests/test_shutdown.py b/llama-index-instrumentation/tests/test_shutdown.py new file mode 100644 index 00000000000..b059f04dddc --- /dev/null +++ b/llama-index-instrumentation/tests/test_shutdown.py @@ -0,0 +1,89 @@ +import inspect +from unittest.mock import MagicMock + +from llama_index_instrumentation.dispatcher import Dispatcher, Manager +from llama_index_instrumentation.span_handlers.simple import SimpleSpanHandler + + +def _make_bound_args(): + return inspect.signature(lambda: None).bind() + + +def test_shutdown_drops_all_open_spans(): + handler = SimpleSpanHandler() + d = Dispatcher(span_handlers=[handler], propagate=False) + + # Open some spans + for i in range(3): + handler.span_enter( + id_=f"span-{i}", + bound_args=_make_bound_args(), + parent_id=None, + ) + + assert len(handler.open_spans) == 3 + + d.shutdown() + + assert len(handler.open_spans) == 0 + assert len(handler.dropped_spans) == 3 + + +def test_shutdown_calls_close_on_handlers(): + close_mock = MagicMock() + + class TrackingHandler(SimpleSpanHandler): + def close(self) -> None: + close_mock() + + handler = TrackingHandler() + d = Dispatcher(span_handlers=[handler], propagate=False) + + d.shutdown() + + close_mock.assert_called_once() + + +def test_shutdown_walks_parent_chain(): + parent_handler = SimpleSpanHandler() + child_handler = SimpleSpanHandler() + + parent = Dispatcher(name="parent", span_handlers=[parent_handler], propagate=False) + child = Dispatcher( + name="child", + span_handlers=[child_handler], + propagate=True, + parent_name="parent", + ) + manager = Manager(parent) + manager.add_dispatcher(child) + child.manager = manager + parent.manager = manager + + # Open spans on both handlers + parent_handler.span_enter(id_="p-span", bound_args=_make_bound_args()) + child_handler.span_enter(id_="c-span", bound_args=_make_bound_args()) + + child.shutdown() + + assert len(parent_handler.open_spans) == 0 + assert len(child_handler.open_spans) == 0 + assert len(parent_handler.dropped_spans) == 1 + assert len(child_handler.dropped_spans) == 1 + + +def test_shutdown_is_idempotent(): + handler = SimpleSpanHandler() + d = Dispatcher(span_handlers=[handler], propagate=False) + + handler.span_enter(id_="span-1", bound_args=_make_bound_args()) + d.shutdown() + d.shutdown() # should not error + + assert len(handler.open_spans) == 0 + assert len(handler.dropped_spans) == 1 + + +def test_close_default_is_noop(): + handler = SimpleSpanHandler() + handler.close() # should not raise diff --git a/llama-index-integrations/observability/llama-index-observability-otel/llama_index/observability/otel/base.py b/llama-index-integrations/observability/llama-index-observability-otel/llama_index/observability/otel/base.py index 92cd1074a9d..96ee4315d16 100644 --- a/llama-index-integrations/observability/llama-index-observability-otel/llama_index/observability/otel/base.py +++ b/llama-index-integrations/observability/llama-index-observability-otel/llama_index/observability/otel/base.py @@ -45,6 +45,7 @@ class OTelCompatibleSpanHandler(SimpleSpanHandler): """OpenTelemetry-compatible span handler.""" _tracer: trace.Tracer = PrivateAttr() + _tracer_provider: Optional[TracerProvider] = PrivateAttr(default=None) _events_by_span: Dict[str, List[OTelEventAttributes]] = PrivateAttr( default_factory=dict, ) @@ -64,6 +65,7 @@ def __init__( completed_spans: Optional[List[SimpleSpan]] = None, dropped_spans: Optional[List[SimpleSpan]] = None, current_span_ids: Optional[Dict[Any, str]] = None, + tracer_provider: Optional[TracerProvider] = None, ): super().__init__( open_spans=open_spans or {}, @@ -72,9 +74,28 @@ def __init__( current_span_ids=cast(Dict[str, Any], current_span_ids or {}), ) self._tracer = tracer + self._tracer_provider = tracer_provider self._events_by_span = {} self.debug = debug + def close(self) -> None: + """Flush and shut down the OTel tracer provider.""" + provider = self._tracer_provider + if provider is None: + # Fall back to the global tracer provider + global_provider = trace.get_tracer_provider() + if isinstance(global_provider, TracerProvider): + provider = global_provider + if provider is not None: + try: + provider.force_flush() + except BaseException: + pass + try: + provider.shutdown() + except BaseException: + pass + @classmethod def class_name(cls) -> str: # type: ignore """Class name.""" @@ -301,6 +322,7 @@ class LlamaIndexOpenTelemetry(BaseModel): description="Debug the start and end of span and the recording of events", ) _tracer: Optional[trace.Tracer] = PrivateAttr(default=None) + _tracer_provider_instance: Optional[TracerProvider] = PrivateAttr(default=None) def _start_otel( self, @@ -324,6 +346,7 @@ def _start_otel( tracer_provider.add_span_processor(extra_span_processor) tracer_provider.add_span_processor(span_processor) trace.set_tracer_provider(tracer_provider) + self._tracer_provider_instance = tracer_provider self._tracer = trace.get_tracer("llamaindex.opentelemetry.tracer") def start_registering( @@ -338,6 +361,7 @@ def start_registering( span_handler = OTelCompatibleSpanHandler( tracer=self._tracer, debug=self.debug, + tracer_provider=self._tracer_provider_instance, ) dispatcher.add_span_handler(span_handler) dispatcher.add_event_handler( From bcbc19c274d48bbe5409e51bfcf6bd1c566ab28a Mon Sep 17 00:00:00 2001 From: Adrian Lyjak Date: Fri, 27 Feb 2026 16:52:54 -0500 Subject: [PATCH 3/9] serialize/deserialize context --- .../llama_index_instrumentation/dispatcher.py | 46 ++++ .../span_handlers/base.py | 16 ++ .../llama_index/observability/otel/base.py | 57 ++--- .../tests/test_otel.py | 226 +++++++++++++++--- 4 files changed, 281 insertions(+), 64 deletions(-) diff --git a/llama-index-instrumentation/src/llama_index_instrumentation/dispatcher.py b/llama-index-instrumentation/src/llama_index_instrumentation/dispatcher.py index 4b47e505588..acf88387dc0 100644 --- a/llama-index-instrumentation/src/llama_index_instrumentation/dispatcher.py +++ b/llama-index-instrumentation/src/llama_index_instrumentation/dispatcher.py @@ -261,6 +261,52 @@ def span_exit( else: c = c.parent + def capture_propagation_context(self) -> Dict[str, Any]: + """ + Capture trace propagation context from all registered span handlers. + + Each span handler namespaces its data under its own key. The Dispatcher + also captures active instrument_tags. The returned dict can be serialized + and passed to restore_propagation_context() in another process. + """ + result: Dict[str, Any] = {} + c: Optional[Dispatcher] = self + while c: + for h in c.span_handlers: + try: + result.update(h.capture_propagation_context()) + except BaseException: + pass + if not c.propagate: + c = None + else: + c = c.parent + tags = active_instrument_tags.get() + if tags: + result["instrument_tags"] = dict(tags) + return result + + def restore_propagation_context(self, context: Dict[str, Any]) -> None: + """ + Restore trace propagation context on all registered span handlers. + + Also restores instrument_tags so that subsequent spans see them. + """ + c: Optional[Dispatcher] = self + while c: + for h in c.span_handlers: + try: + h.restore_propagation_context(context) + except BaseException: + pass + if not c.propagate: + c = None + else: + c = c.parent + tags = context.get("instrument_tags") + if tags: + active_instrument_tags.set(dict(tags)) + def shutdown(self) -> None: """ Drop all open spans and close all handlers. diff --git a/llama-index-instrumentation/src/llama_index_instrumentation/span_handlers/base.py b/llama-index-instrumentation/src/llama_index_instrumentation/span_handlers/base.py index dabec66a9c2..ce87254fb61 100644 --- a/llama-index-instrumentation/src/llama_index_instrumentation/span_handlers/base.py +++ b/llama-index-instrumentation/src/llama_index_instrumentation/span_handlers/base.py @@ -141,6 +141,22 @@ def span_drop( with self.lock: del self.open_spans[id_] + def capture_propagation_context(self) -> Dict[str, Any]: + """ + Capture trace propagation context for serialization across process boundaries. + + Returns a dict that can be serialized and passed to restore_propagation_context() + in another process to re-establish trace continuity. + """ + return {} + + def restore_propagation_context(self, context: Dict[str, Any]) -> None: + """ + Restore trace propagation context received from another process. + + Should be called BEFORE span_enter so that new spans parent correctly. + """ + def close(self) -> None: """ Optional cleanup hook called during dispatcher shutdown. diff --git a/llama-index-integrations/observability/llama-index-observability-otel/llama_index/observability/otel/base.py b/llama-index-integrations/observability/llama-index-observability-otel/llama_index/observability/otel/base.py index 96ee4315d16..7c46c0679ad 100644 --- a/llama-index-integrations/observability/llama-index-observability-otel/llama_index/observability/otel/base.py +++ b/llama-index-integrations/observability/llama-index-observability-otel/llama_index/observability/otel/base.py @@ -10,7 +10,7 @@ from llama_index_instrumentation.span_handlers.simple import SimpleSpanHandler from opentelemetry import context, propagate, trace from opentelemetry.sdk.resources import SERVICE_NAME, Resource -from opentelemetry.sdk.trace import SpanProcessor, TracerProvider, _Span +from opentelemetry.sdk.trace import SpanProcessor, TracerProvider from opentelemetry.sdk.trace.export import ( BatchSpanProcessor, ConsoleSpanExporter, @@ -49,7 +49,7 @@ class OTelCompatibleSpanHandler(SimpleSpanHandler): _events_by_span: Dict[str, List[OTelEventAttributes]] = PrivateAttr( default_factory=dict, ) - all_spans: Dict[str, Union[trace.Span, _Span]] = Field( + all_spans: Dict[str, trace.Span] = Field( default_factory=dict, description="All the registered OpenTelemetry spans." ) debug: bool = Field( @@ -101,8 +101,22 @@ def class_name(cls) -> str: # type: ignore """Class name.""" return "OTelCompatibleSpanHandler" - # Keys in the tags dict that are internal and should not be recorded as attributes - _INTERNAL_TAG_KEYS = frozenset({"parent_span_id", "_otel_traceparent"}) + _PROPAGATION_KEY = "otel" + + def capture_propagation_context(self) -> Dict[str, Any]: + """Serialize the current OTel trace context for cross-process propagation.""" + carrier: Dict[str, str] = {} + propagate.inject(carrier) + if carrier: + return {self._PROPAGATION_KEY: carrier} + return {} + + def restore_propagation_context(self, ctx: Dict[str, Any]) -> None: + """Restore OTel trace context from a serialized carrier dict.""" + carrier = ctx.get(self._PROPAGATION_KEY) + if carrier: + restored = propagate.extract(carrier) + context.attach(restored) def new_span( self, @@ -117,49 +131,28 @@ def new_span( id_, bound_args, instance, parent_span_id, tags, **kwargs ) - # Phase 1: Strip UUID suffix from span name for clean grouping + # Strip UUID suffix from span name for clean grouping span_name = id_.partition("-")[0] - # Phase 1: Resolve parent context with graceful fallback - is_root_like = True + # Resolve parent context: + # 1. Same-process parent found in all_spans → use it + # 2. Otherwise → use ambient OTel context (set by restore_propagation_context + # for cross-process, or inherited naturally for same-process roots) if parent_span_id is not None and parent_span_id in self.all_spans: ctx = set_span_in_context(span=self.all_spans[parent_span_id]) - is_root_like = False - elif ( - parent_span_id is not None - and tags is not None - and "_otel_traceparent" in tags - ): - # Phase 3: Recovery case — restore trace context from serialized traceparent - carrier = {"traceparent": tags["_otel_traceparent"]} - ctx = propagate.extract(carrier) - elif parent_span_id is None: - ctx = context.get_current() - ctx.update(bound_args.arguments) else: - # Parent referenced but not found and no traceparent — use ambient context - ctx = None + ctx = context.get_current() otel_span = self._tracer.start_span(name=span_name, context=ctx) self.all_spans.update({id_: otel_span}) - # Phase 2: Record instrument_tags as span attributes + # Record instrument_tags as span attributes if tags is not None: for key, value in tags.items(): - if key in self._INTERNAL_TAG_KEYS: - continue if isinstance(value, (str, int, float, bool)): attr_key = key if "." in key else f"llamaindex.{key}" otel_span.set_attribute(attr_key, value) - # Phase 3: Capture traceparent on root-like spans for recovery - if is_root_like and isinstance(otel_span, _Span): - inject_carrier: Dict[str, str] = {} - ctx_with_span = set_span_in_context(otel_span) - propagate.inject(inject_carrier, context=ctx_with_span) - if "traceparent" in inject_carrier and tags is not None: - tags["_otel_traceparent"] = inject_carrier["traceparent"] - if self.debug: cprint( f"Emitting span {span_name} at time: {datetime.now()}", diff --git a/llama-index-integrations/observability/llama-index-observability-otel/tests/test_otel.py b/llama-index-integrations/observability/llama-index-observability-otel/tests/test_otel.py index 785aa70cefd..f6aadf8d6e9 100644 --- a/llama-index-integrations/observability/llama-index-observability-otel/tests/test_otel.py +++ b/llama-index-integrations/observability/llama-index-observability-otel/tests/test_otel.py @@ -188,9 +188,6 @@ def instrumented_fn(arg1: str, arg2: int) -> str: assert isinstance(span, MockSpan) assert span.dict_context is not None assert span.dict_context.get("hello") == "world" - # ensure preservation of the bound arguments - assert span.dict_context.get("arg1") == "hello" - assert span.dict_context.get("arg2") == 1 def test_context_inheritance_empty_context() -> None: @@ -219,9 +216,6 @@ def instrumented_fn(arg1: str, arg2: int) -> str: assert isinstance(span, MockSpan) assert span.dict_context is not None assert span.dict_context.get("hello") is None - # ensure preservation of the bound arguments - assert span.dict_context.get("arg1") == "hello" - assert span.dict_context.get("arg2") == 1 ## --------------------------------------------------------------------------- @@ -317,8 +311,6 @@ def test_tags_recorded_as_attributes() -> None: "handler_id": "h1", "run_id": "r1", "myapp.custom": "user_val", - "parent_span_id": "skip_me", - "_otel_traceparent": "skip_too", }, ) handler.span_exit(id_="tag-span-uuid", bound_args=_bound) @@ -330,8 +322,6 @@ def test_tags_recorded_as_attributes() -> None: assert attrs["llamaindex.run_id"] == "r1" # Dotted keys pass through without prefix assert attrs["myapp.custom"] == "user_val" - assert "llamaindex.parent_span_id" not in attrs - assert "llamaindex._otel_traceparent" not in attrs def test_non_otel_types_skipped_in_tags() -> None: @@ -355,40 +345,212 @@ def test_non_otel_types_skipped_in_tags() -> None: assert "llamaindex.bad_none" not in attrs -def test_traceparent_injected_on_root_span() -> None: +def test_tags_not_mutated_by_new_span() -> None: + """Regression guard: new_span must not write internal keys into the tags dict.""" exporter, handler, provider = make_handler() - tags: dict = {} + tags: dict = {"user_key": "user_val"} + original_tags = dict(tags) handler.span_enter(id_="root-span-uuid", bound_args=_bound, tags=tags) - assert "_otel_traceparent" in tags - assert tags["_otel_traceparent"].startswith("00-") handler.span_exit(id_="root-span-uuid", bound_args=_bound) provider.force_flush() + # Tags should be unchanged — no _otel_traceparent or other internal keys injected + assert tags == original_tags -def test_traceparent_restored_on_recovery() -> None: - # Create first handler + root span to get a traceparent - exporter1, handler1, provider1 = make_handler() - tags: dict = {} - handler1.span_enter(id_="root-uuid", bound_args=_bound, tags=tags) - captured_tp = tags["_otel_traceparent"] - handler1.span_exit(id_="root-uuid", bound_args=_bound) - provider1.force_flush() - root_trace_id = exporter1.get_finished_spans()[0].context.trace_id +def test_capture_propagation_context() -> None: + """capture_propagation_context returns a dict with traceparent when a span is active.""" + exporter, handler, provider = make_handler() + # Create a span so there's an active trace context + handler.span_enter(id_="root-uuid", bound_args=_bound) + # Activate the OTel span in the current context so capture can see it + from opentelemetry.trace import set_span_in_context + + otel_span = handler.all_spans["root-uuid"] + ctx = set_span_in_context(otel_span) + context.attach(ctx) + + captured = handler.capture_propagation_context() + assert "otel" in captured + assert captured["otel"]["traceparent"].startswith("00-") + + handler.span_exit(id_="root-uuid", bound_args=_bound) + provider.force_flush() + - # New handler — simulates recovery in a different process/context - exporter2, handler2, provider2 = make_handler() - handler2.span_enter( +def test_capture_restore_propagation_roundtrip() -> None: + """ + Full roundtrip: capture context in process A, restore in process B. + + Verifies: + - trace_id continuity (child belongs to same trace as parent) + - parent_span_id linkage (child's parent points to root span) + - tags become span attributes on both sides independently + - tags dict is not mutated by either side + - externally-set OTel context (e.g. baggage-like ambient values) propagates + through the traceparent mechanism + """ + from opentelemetry.trace import set_span_in_context + + # --- Process A: create root span with tags, capture context --- + exporter_a, handler_a, provider_a = make_handler() + + tags_a = {"handler_id": "h1", "run_id": "r1", "myapp.custom": "val_a"} + original_tags_a = dict(tags_a) + handler_a.span_enter(id_="root-uuid", bound_args=_bound, tags=tags_a) + + # Activate the OTel span in ambient context (simulating what the Dispatcher + # would do before a serialization boundary) + root_otel_span = handler_a.all_spans["root-uuid"] + context.attach(set_span_in_context(root_otel_span)) + + # Capture propagation context — this is what gets serialized across the boundary + captured_ctx = handler_a.capture_propagation_context() + assert "otel" in captured_ctx + assert captured_ctx["otel"]["traceparent"].startswith("00-") + + # Tags were NOT mutated by capture or span creation + assert tags_a == original_tags_a + + # Finish root span + handler_a.span_exit(id_="root-uuid", bound_args=_bound) + provider_a.force_flush() + root_spans = exporter_a.get_finished_spans() + assert len(root_spans) == 1 + root_span = root_spans[0] + root_trace_id = root_span.context.trace_id + root_span_id = root_span.context.span_id + + # Root span has tags as attributes + root_attrs = dict(root_span.attributes) + assert root_attrs["llamaindex.handler_id"] == "h1" + assert root_attrs["llamaindex.run_id"] == "r1" + assert root_attrs["myapp.custom"] == "val_a" + + # --- Process B: restore context, create child span with its own tags --- + exporter_b, handler_b, provider_b = make_handler() + + # Clean ambient context (simulating a fresh process) + context.attach(context.Context()) + + # Restore the captured propagation context + handler_b.restore_propagation_context(captured_ctx) + + # Create child span — parent_id references a span not in this handler's all_spans, + # so it falls back to ambient OTel context (which we just restored) + tags_b = {"handler_id": "h2", "run_id": "r2"} + original_tags_b = dict(tags_b) + handler_b.span_enter( id_="child-uuid", bound_args=_bound, parent_id="gone-span", - tags={"_otel_traceparent": captured_tp}, + tags=tags_b, ) - handler2.span_exit(id_="child-uuid", bound_args=_bound) - provider2.force_flush() - child_spans = exporter2.get_finished_spans() + handler_b.span_exit(id_="child-uuid", bound_args=_bound) + provider_b.force_flush() + + child_spans = exporter_b.get_finished_spans() assert len(child_spans) == 1 - # The child span should belong to the same trace as the root - assert child_spans[0].context.trace_id == root_trace_id + child_span = child_spans[0] + + # Trace continuity: same trace_id + assert child_span.context.trace_id == root_trace_id + + # Parent linkage: child's parent is the root span + assert child_span.parent is not None + assert child_span.parent.span_id == root_span_id + assert child_span.parent.trace_id == root_trace_id + + # Child has its own tags as attributes (independent of process A's tags) + child_attrs = dict(child_span.attributes) + assert child_attrs["llamaindex.handler_id"] == "h2" + assert child_attrs["llamaindex.run_id"] == "r2" + # Process A's tags are NOT on the child span + assert "myapp.custom" not in child_attrs + + # Tags were NOT mutated by span creation on either side + assert tags_b == original_tags_b + + +def test_dispatcher_propagation_roundtrip_with_tags() -> None: + """ + Full Dispatcher-level roundtrip: trace context + instrument_tags propagate together. + + Simulates: process A runs a workflow step with run_id tag, + captures context, process B restores it and creates a child span + that inherits the trace AND sees the tags. + """ + from llama_index_instrumentation.dispatcher import ( + Dispatcher, + Manager, + active_instrument_tags, + instrument_tags, + ) + from opentelemetry.trace import set_span_in_context + + exporter_a, handler_a, provider_a = make_handler() + exporter_b, handler_b, provider_b = make_handler() + + # Set up two dispatchers (simulating two processes, each with their own handler) + dispatcher_a = Dispatcher( + name="process_a", span_handlers=[handler_a], propagate=False + ) + dispatcher_a.manager = Manager(dispatcher_a) + + dispatcher_b = Dispatcher( + name="process_b", span_handlers=[handler_b], propagate=False + ) + dispatcher_b.manager = Manager(dispatcher_b) + + # --- Process A: run with tags, capture context --- + with instrument_tags({"run_id": "run-123", "handler_id": "wf-abc"}): + dispatcher_a.span_enter( + id_="root-uuid", bound_args=_bound, tags=active_instrument_tags.get() + ) + + # Activate OTel span in ambient context + root_otel_span = handler_a.all_spans["root-uuid"] + context.attach(set_span_in_context(root_otel_span)) + + captured = dispatcher_a.capture_propagation_context() + + # Verify captured structure has both namespaces + assert "otel" in captured + assert "instrument_tags" in captured + assert captured["instrument_tags"]["run_id"] == "run-123" + assert captured["instrument_tags"]["handler_id"] == "wf-abc" + + dispatcher_a.span_exit(id_="root-uuid", bound_args=_bound) + provider_a.force_flush() + root_span = exporter_a.get_finished_spans()[0] + + # --- Process B: restore context, create child --- + context.attach(context.Context()) # clean slate + dispatcher_b.restore_propagation_context(captured) + + # instrument_tags should now be active + restored_tags = active_instrument_tags.get() + assert restored_tags["run_id"] == "run-123" + assert restored_tags["handler_id"] == "wf-abc" + + dispatcher_b.span_enter( + id_="child-uuid", + bound_args=_bound, + parent_id="gone-span", + tags=restored_tags, + ) + dispatcher_b.span_exit(id_="child-uuid", bound_args=_bound) + provider_b.force_flush() + + child_span = exporter_b.get_finished_spans()[0] + + # Trace continuity + assert child_span.context.trace_id == root_span.context.trace_id + assert child_span.parent.span_id == root_span.context.span_id + + # Tags became attributes on the child span + child_attrs = dict(child_span.attributes) + assert child_attrs["llamaindex.run_id"] == "run-123" + assert child_attrs["llamaindex.handler_id"] == "wf-abc" def test_flatten_dict() -> None: From 1973991bb43adaed6d216098c2b62c0760efce72 Mon Sep 17 00:00:00 2001 From: Adrian Lyjak Date: Fri, 27 Feb 2026 19:48:47 -0500 Subject: [PATCH 4/9] tests --- .../tests/test_propagation.py | 190 ++++++++++++++++++ 1 file changed, 190 insertions(+) create mode 100644 llama-index-instrumentation/tests/test_propagation.py diff --git a/llama-index-instrumentation/tests/test_propagation.py b/llama-index-instrumentation/tests/test_propagation.py new file mode 100644 index 00000000000..3643e1a1fa4 --- /dev/null +++ b/llama-index-instrumentation/tests/test_propagation.py @@ -0,0 +1,190 @@ +import inspect +from typing import Any, Dict + +from llama_index_instrumentation.dispatcher import ( + Dispatcher, + Manager, + active_instrument_tags, +) +from llama_index_instrumentation.span_handlers.simple import SimpleSpanHandler + + +def _make_bound_args(): + return inspect.signature(lambda: None).bind() + + +class PropagatingHandler(SimpleSpanHandler): + """Handler that captures/restores a fake trace context.""" + + def capture_propagation_context(self) -> Dict[str, Any]: + return {"test_handler": {"trace_id": "abc123", "span_id": "def456"}} + + def restore_propagation_context(self, context: Dict[str, Any]) -> None: + self._restored_context = context + + +def test_capture_propagation_context_basic(): + handler = PropagatingHandler() + d = Dispatcher(span_handlers=[handler], propagate=False) + + ctx = d.capture_propagation_context() + + assert ctx["test_handler"]["trace_id"] == "abc123" + assert ctx["test_handler"]["span_id"] == "def456" + + +def test_capture_includes_instrument_tags(): + handler = SimpleSpanHandler() + d = Dispatcher(span_handlers=[handler], propagate=False) + + token = active_instrument_tags.set({"user_id": "u1", "session": "s1"}) + try: + ctx = d.capture_propagation_context() + finally: + active_instrument_tags.reset(token) + + assert ctx["instrument_tags"] == {"user_id": "u1", "session": "s1"} + + +def test_capture_omits_instrument_tags_when_empty(): + handler = SimpleSpanHandler() + d = Dispatcher(span_handlers=[handler], propagate=False) + + ctx = d.capture_propagation_context() + + assert "instrument_tags" not in ctx + + +def test_restore_propagation_context_basic(): + handler = PropagatingHandler() + d = Dispatcher(span_handlers=[handler], propagate=False) + + context = {"test_handler": {"trace_id": "abc123"}} + d.restore_propagation_context(context) + + assert handler._restored_context == context + + +def test_restore_sets_instrument_tags(): + handler = SimpleSpanHandler() + d = Dispatcher(span_handlers=[handler], propagate=False) + + d.restore_propagation_context({"instrument_tags": {"user_id": "u1"}}) + + assert active_instrument_tags.get() == {"user_id": "u1"} + # cleanup + active_instrument_tags.set({}) + + +def test_capture_walks_parent_chain(): + parent_handler = PropagatingHandler() + child_handler = SimpleSpanHandler() + + parent = Dispatcher(name="parent", span_handlers=[parent_handler], propagate=False) + child = Dispatcher( + name="child", + span_handlers=[child_handler], + propagate=True, + parent_name="parent", + ) + manager = Manager(parent) + manager.add_dispatcher(child) + child.manager = manager + parent.manager = manager + + ctx = child.capture_propagation_context() + + # Should include parent handler's context via propagation + assert "test_handler" in ctx + + +def test_restore_walks_parent_chain(): + parent_handler = PropagatingHandler() + child_handler = PropagatingHandler() + + parent = Dispatcher(name="parent", span_handlers=[parent_handler], propagate=False) + child = Dispatcher( + name="child", + span_handlers=[child_handler], + propagate=True, + parent_name="parent", + ) + manager = Manager(parent) + manager.add_dispatcher(child) + child.manager = manager + parent.manager = manager + + context = {"test_handler": {"trace_id": "xyz"}} + child.restore_propagation_context(context) + + assert child_handler._restored_context == context + assert parent_handler._restored_context == context + + +def test_capture_stops_at_propagate_false(): + parent_handler = PropagatingHandler() + child_handler = SimpleSpanHandler() + + parent = Dispatcher(name="parent", span_handlers=[parent_handler], propagate=False) + child = Dispatcher( + name="child", + span_handlers=[child_handler], + propagate=False, # does NOT propagate + parent_name="parent", + ) + manager = Manager(parent) + manager.add_dispatcher(child) + child.manager = manager + parent.manager = manager + + ctx = child.capture_propagation_context() + + # Should NOT include parent handler's context + assert "test_handler" not in ctx + + +def test_roundtrip_capture_restore(): + """Capture from one dispatcher, restore on another — simulates cross-process.""" + source_handler = PropagatingHandler() + source = Dispatcher(span_handlers=[source_handler], propagate=False) + + token = active_instrument_tags.set({"env": "prod"}) + try: + ctx = source.capture_propagation_context() + finally: + active_instrument_tags.reset(token) + + dest_handler = PropagatingHandler() + dest = Dispatcher(span_handlers=[dest_handler], propagate=False) + + dest.restore_propagation_context(ctx) + + assert dest_handler._restored_context == ctx + assert active_instrument_tags.get() == {"env": "prod"} + # cleanup + active_instrument_tags.set({}) + + +def test_capture_swallows_handler_exceptions(): + class BrokenHandler(SimpleSpanHandler): + def capture_propagation_context(self) -> Dict[str, Any]: + raise RuntimeError("boom") + + handler = BrokenHandler() + d = Dispatcher(span_handlers=[handler], propagate=False) + + # Should not raise + ctx = d.capture_propagation_context() + assert isinstance(ctx, dict) + + +def test_restore_swallows_handler_exceptions(): + class BrokenHandler(SimpleSpanHandler): + def restore_propagation_context(self, context: Dict[str, Any]) -> None: + raise RuntimeError("boom") + + handler = BrokenHandler() + d = Dispatcher(span_handlers=[handler], propagate=False) + + # Should not raise + d.restore_propagation_context({"some": "data"}) From 9f1f6351326d223eb6a092eb338a4fa46b6cdbde Mon Sep 17 00:00:00 2001 From: Adrian Lyjak Date: Mon, 2 Mar 2026 11:38:05 -0500 Subject: [PATCH 5/9] /simplify --- llama-index-instrumentation/tests/test_propagation.py | 5 ----- .../llama_index/observability/otel/base.py | 8 ++------ 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/llama-index-instrumentation/tests/test_propagation.py b/llama-index-instrumentation/tests/test_propagation.py index 3643e1a1fa4..3628d9eb7c5 100644 --- a/llama-index-instrumentation/tests/test_propagation.py +++ b/llama-index-instrumentation/tests/test_propagation.py @@ -1,4 +1,3 @@ -import inspect from typing import Any, Dict from llama_index_instrumentation.dispatcher import ( @@ -9,10 +8,6 @@ from llama_index_instrumentation.span_handlers.simple import SimpleSpanHandler -def _make_bound_args(): - return inspect.signature(lambda: None).bind() - - class PropagatingHandler(SimpleSpanHandler): """Handler that captures/restores a fake trace context.""" diff --git a/llama-index-integrations/observability/llama-index-observability-otel/llama_index/observability/otel/base.py b/llama-index-integrations/observability/llama-index-observability-otel/llama_index/observability/otel/base.py index 7c46c0679ad..a3c608f32b9 100644 --- a/llama-index-integrations/observability/llama-index-observability-otel/llama_index/observability/otel/base.py +++ b/llama-index-integrations/observability/llama-index-observability-otel/llama_index/observability/otel/base.py @@ -3,7 +3,7 @@ from typing import Any, Dict, List, Literal, Mapping, Optional, Sequence, Union, cast import llama_index_instrumentation as instrument -from llama_index.observability.otel.utils import flatten_dict +from llama_index.observability.otel.utils import _is_otel_supported_type, flatten_dict from llama_index_instrumentation.base.event import BaseEvent from llama_index_instrumentation.event_handlers import BaseEventHandler from llama_index_instrumentation.span import SimpleSpan, active_span_id @@ -87,10 +87,6 @@ def close(self) -> None: if isinstance(global_provider, TracerProvider): provider = global_provider if provider is not None: - try: - provider.force_flush() - except BaseException: - pass try: provider.shutdown() except BaseException: @@ -149,7 +145,7 @@ def new_span( # Record instrument_tags as span attributes if tags is not None: for key, value in tags.items(): - if isinstance(value, (str, int, float, bool)): + if _is_otel_supported_type(value): attr_key = key if "." in key else f"llamaindex.{key}" otel_span.set_attribute(attr_key, value) From d09ba800c537fc78ec865ecf04468553df18cbd2 Mon Sep 17 00:00:00 2001 From: Adrian Lyjak Date: Mon, 2 Mar 2026 11:40:30 -0500 Subject: [PATCH 6/9] slim back boilerplate --- .../llama_index_instrumentation/dispatcher.py | 81 +++++++++---------- 1 file changed, 36 insertions(+), 45 deletions(-) diff --git a/llama-index-instrumentation/src/llama_index_instrumentation/dispatcher.py b/llama-index-instrumentation/src/llama_index_instrumentation/dispatcher.py index acf88387dc0..c558e566eb5 100644 --- a/llama-index-instrumentation/src/llama_index_instrumentation/dispatcher.py +++ b/llama-index-instrumentation/src/llama_index_instrumentation/dispatcher.py @@ -115,6 +115,15 @@ def root(self) -> "Dispatcher": assert self.manager is not None return self.manager.dispatchers[self.root_name] + def _walk_span_handlers(self) -> Generator[BaseSpanHandler, None, None]: + """Yield every span handler reachable via the propagation chain.""" + c: Optional[Dispatcher] = self + while c: + yield from c.span_handlers + if not c.propagate: + break + c = c.parent + def add_event_handler(self, handler: BaseEventHandler) -> None: """Add handler to set of handlers.""" self.event_handlers += [handler] @@ -270,17 +279,11 @@ def capture_propagation_context(self) -> Dict[str, Any]: and passed to restore_propagation_context() in another process. """ result: Dict[str, Any] = {} - c: Optional[Dispatcher] = self - while c: - for h in c.span_handlers: - try: - result.update(h.capture_propagation_context()) - except BaseException: - pass - if not c.propagate: - c = None - else: - c = c.parent + for h in self._walk_span_handlers(): + try: + result.update(h.capture_propagation_context()) + except BaseException: + pass tags = active_instrument_tags.get() if tags: result["instrument_tags"] = dict(tags) @@ -292,17 +295,11 @@ def restore_propagation_context(self, context: Dict[str, Any]) -> None: Also restores instrument_tags so that subsequent spans see them. """ - c: Optional[Dispatcher] = self - while c: - for h in c.span_handlers: - try: - h.restore_propagation_context(context) - except BaseException: - pass - if not c.propagate: - c = None - else: - c = c.parent + for h in self._walk_span_handlers(): + try: + h.restore_propagation_context(context) + except BaseException: + pass tags = context.get("instrument_tags") if tags: active_instrument_tags.set(dict(tags)) @@ -319,32 +316,26 @@ def shutdown(self) -> None: _shutdown_err = RuntimeError("dispatcher shutdown") seen_handlers: set = set() - c: Optional[Dispatcher] = self - while c: - for h in c.span_handlers: - if id(h) in seen_handlers: - continue - seen_handlers.add(id(h)) - # Drop all open spans — snapshot keys since span_drop mutates the dict - for span_id in list(h.open_spans.keys()): - try: - h.span_drop( - id_=span_id, - bound_args=_synthetic_bound_args, - instance=None, - err=_shutdown_err, - ) - except BaseException: - pass - # Close the handler + for h in self._walk_span_handlers(): + if id(h) in seen_handlers: + continue + seen_handlers.add(id(h)) + # Drop all open spans — snapshot keys since span_drop mutates the dict + for span_id in list(h.open_spans.keys()): try: - h.close() + h.span_drop( + id_=span_id, + bound_args=_synthetic_bound_args, + instance=None, + err=_shutdown_err, + ) except BaseException: pass - if not c.propagate: - c = None - else: - c = c.parent + # Close the handler + try: + h.close() + except BaseException: + pass def span(self, func: Callable[..., _R]) -> Callable[..., _R]: # The `span` decorator should be idempotent. From eb0daec0fc8e46cfe628b0bbcb5d963316b3cfb3 Mon Sep 17 00:00:00 2001 From: Adrian Lyjak Date: Mon, 2 Mar 2026 11:42:35 -0500 Subject: [PATCH 7/9] more simple --- .../src/llama_index_instrumentation/dispatcher.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/llama-index-instrumentation/src/llama_index_instrumentation/dispatcher.py b/llama-index-instrumentation/src/llama_index_instrumentation/dispatcher.py index c558e566eb5..ff07348706e 100644 --- a/llama-index-instrumentation/src/llama_index_instrumentation/dispatcher.py +++ b/llama-index-instrumentation/src/llama_index_instrumentation/dispatcher.py @@ -315,11 +315,7 @@ def shutdown(self) -> None: _synthetic_bound_args = inspect.signature(lambda: None).bind() _shutdown_err = RuntimeError("dispatcher shutdown") - seen_handlers: set = set() for h in self._walk_span_handlers(): - if id(h) in seen_handlers: - continue - seen_handlers.add(id(h)) # Drop all open spans — snapshot keys since span_drop mutates the dict for span_id in list(h.open_spans.keys()): try: From 0dc832acee41a63f5d9e49ea4b81254d0191aec6 Mon Sep 17 00:00:00 2001 From: Adrian Lyjak Date: Mon, 2 Mar 2026 11:58:06 -0500 Subject: [PATCH 8/9] more cleanup --- .../llama_index_instrumentation/dispatcher.py | 18 ++++++++++++------ .../tests/test_propagation.py | 7 ++++--- .../llama_index/observability/otel/base.py | 5 ++++- .../tests/test_otel.py | 7 ++++--- 4 files changed, 24 insertions(+), 13 deletions(-) diff --git a/llama-index-instrumentation/src/llama_index_instrumentation/dispatcher.py b/llama-index-instrumentation/src/llama_index_instrumentation/dispatcher.py index ff07348706e..b5ba3b2a8ef 100644 --- a/llama-index-instrumentation/src/llama_index_instrumentation/dispatcher.py +++ b/llama-index-instrumentation/src/llama_index_instrumentation/dispatcher.py @@ -24,6 +24,8 @@ _logger = logging.getLogger(__name__) +_INSTRUMENT_TAGS_KEY = "instrument_tags" + # ContextVar for managing active instrument tags active_instrument_tags: ContextVar[Dict[str, Any]] = ContextVar( "instrument_tags", default={} @@ -283,10 +285,10 @@ def capture_propagation_context(self) -> Dict[str, Any]: try: result.update(h.capture_propagation_context()) except BaseException: - pass + _logger.warning("Error capturing propagation context", exc_info=True) tags = active_instrument_tags.get() if tags: - result["instrument_tags"] = dict(tags) + result[_INSTRUMENT_TAGS_KEY] = dict(tags) return result def restore_propagation_context(self, context: Dict[str, Any]) -> None: @@ -299,8 +301,8 @@ def restore_propagation_context(self, context: Dict[str, Any]) -> None: try: h.restore_propagation_context(context) except BaseException: - pass - tags = context.get("instrument_tags") + _logger.warning("Error restoring propagation context", exc_info=True) + tags = context.get(_INSTRUMENT_TAGS_KEY) if tags: active_instrument_tags.set(dict(tags)) @@ -326,12 +328,16 @@ def shutdown(self) -> None: err=_shutdown_err, ) except BaseException: - pass + _logger.debug( + "Error dropping span %s during shutdown", + span_id, + exc_info=True, + ) # Close the handler try: h.close() except BaseException: - pass + _logger.warning("Error closing handler %s", h, exc_info=True) def span(self, func: Callable[..., _R]) -> Callable[..., _R]: # The `span` decorator should be idempotent. diff --git a/llama-index-instrumentation/tests/test_propagation.py b/llama-index-instrumentation/tests/test_propagation.py index 3628d9eb7c5..aaa7bb28a10 100644 --- a/llama-index-instrumentation/tests/test_propagation.py +++ b/llama-index-instrumentation/tests/test_propagation.py @@ -3,6 +3,7 @@ from llama_index_instrumentation.dispatcher import ( Dispatcher, Manager, + _INSTRUMENT_TAGS_KEY, active_instrument_tags, ) from llama_index_instrumentation.span_handlers.simple import SimpleSpanHandler @@ -38,7 +39,7 @@ def test_capture_includes_instrument_tags(): finally: active_instrument_tags.reset(token) - assert ctx["instrument_tags"] == {"user_id": "u1", "session": "s1"} + assert ctx[_INSTRUMENT_TAGS_KEY] == {"user_id": "u1", "session": "s1"} def test_capture_omits_instrument_tags_when_empty(): @@ -47,7 +48,7 @@ def test_capture_omits_instrument_tags_when_empty(): ctx = d.capture_propagation_context() - assert "instrument_tags" not in ctx + assert _INSTRUMENT_TAGS_KEY not in ctx def test_restore_propagation_context_basic(): @@ -64,7 +65,7 @@ def test_restore_sets_instrument_tags(): handler = SimpleSpanHandler() d = Dispatcher(span_handlers=[handler], propagate=False) - d.restore_propagation_context({"instrument_tags": {"user_id": "u1"}}) + d.restore_propagation_context({_INSTRUMENT_TAGS_KEY: {"user_id": "u1"}}) assert active_instrument_tags.get() == {"user_id": "u1"} # cleanup diff --git a/llama-index-integrations/observability/llama-index-observability-otel/llama_index/observability/otel/base.py b/llama-index-integrations/observability/llama-index-observability-otel/llama_index/observability/otel/base.py index a3c608f32b9..5c72c7575ff 100644 --- a/llama-index-integrations/observability/llama-index-observability-otel/llama_index/observability/otel/base.py +++ b/llama-index-integrations/observability/llama-index-observability-otel/llama_index/observability/otel/base.py @@ -1,8 +1,11 @@ import inspect +import logging from datetime import datetime from typing import Any, Dict, List, Literal, Mapping, Optional, Sequence, Union, cast import llama_index_instrumentation as instrument + +_logger = logging.getLogger(__name__) from llama_index.observability.otel.utils import _is_otel_supported_type, flatten_dict from llama_index_instrumentation.base.event import BaseEvent from llama_index_instrumentation.event_handlers import BaseEventHandler @@ -90,7 +93,7 @@ def close(self) -> None: try: provider.shutdown() except BaseException: - pass + _logger.warning("Error shutting down tracer provider", exc_info=True) @classmethod def class_name(cls) -> str: # type: ignore diff --git a/llama-index-integrations/observability/llama-index-observability-otel/tests/test_otel.py b/llama-index-integrations/observability/llama-index-observability-otel/tests/test_otel.py index f6aadf8d6e9..67ee9c434df 100644 --- a/llama-index-integrations/observability/llama-index-observability-otel/tests/test_otel.py +++ b/llama-index-integrations/observability/llama-index-observability-otel/tests/test_otel.py @@ -482,6 +482,7 @@ def test_dispatcher_propagation_roundtrip_with_tags() -> None: from llama_index_instrumentation.dispatcher import ( Dispatcher, Manager, + _INSTRUMENT_TAGS_KEY, active_instrument_tags, instrument_tags, ) @@ -515,9 +516,9 @@ def test_dispatcher_propagation_roundtrip_with_tags() -> None: # Verify captured structure has both namespaces assert "otel" in captured - assert "instrument_tags" in captured - assert captured["instrument_tags"]["run_id"] == "run-123" - assert captured["instrument_tags"]["handler_id"] == "wf-abc" + assert _INSTRUMENT_TAGS_KEY in captured + assert captured[_INSTRUMENT_TAGS_KEY]["run_id"] == "run-123" + assert captured[_INSTRUMENT_TAGS_KEY]["handler_id"] == "wf-abc" dispatcher_a.span_exit(id_="root-uuid", bound_args=_bound) provider_a.force_flush() From 651225e6bd1e1ebce83e2908302bb55dc3e1b284 Mon Sep 17 00:00:00 2001 From: Adrian Lyjak Date: Mon, 2 Mar 2026 12:27:28 -0500 Subject: [PATCH 9/9] backout of cprint crap --- .../llama_index/observability/otel/base.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/llama-index-integrations/observability/llama-index-observability-otel/llama_index/observability/otel/base.py b/llama-index-integrations/observability/llama-index-observability-otel/llama_index/observability/otel/base.py index 5c72c7575ff..eb1104ed9ba 100644 --- a/llama-index-integrations/observability/llama-index-observability-otel/llama_index/observability/otel/base.py +++ b/llama-index-integrations/observability/llama-index-observability-otel/llama_index/observability/otel/base.py @@ -4,8 +4,6 @@ from typing import Any, Dict, List, Literal, Mapping, Optional, Sequence, Union, cast import llama_index_instrumentation as instrument - -_logger = logging.getLogger(__name__) from llama_index.observability.otel.utils import _is_otel_supported_type, flatten_dict from llama_index_instrumentation.base.event import BaseEvent from llama_index_instrumentation.event_handlers import BaseEventHandler @@ -24,6 +22,8 @@ from pydantic import BaseModel, ConfigDict, Field, PrivateAttr from termcolor.termcolor import cprint +_logger = logging.getLogger(__name__) + class OTelEventAttributes(BaseModel): name: str @@ -177,10 +177,7 @@ def prepare_to_exit_span( sp = super().prepare_to_exit_span(id_, bound_args, instance, result, **kwargs) span = self.all_spans.pop(id_, None) if span is None: - cprint( - f"WARNING: no OTel span found for {id_} in prepare_to_exit_span", - color="red", - ) + _logger.warning("No OTel span found for %s in prepare_to_exit_span", id_) return sp # Get and process events specific to this span @@ -209,10 +206,7 @@ def prepare_to_drop_span( sp = super().prepare_to_drop_span(id_, bound_args, instance, err, **kwargs) span = self.all_spans.pop(id_, None) if span is None: - cprint( - f"WARNING: no OTel span found for {id_} in prepare_to_drop_span", - color="red", - ) + _logger.warning("No OTel span found for %s in prepare_to_drop_span", id_) return sp # Get and process events specific to this span