From fa0c0db6fcf4e3956f19350705365169787970a1 Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Wed, 22 Oct 2025 13:15:22 +0200 Subject: [PATCH 01/11] Add tags to gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index c31f84940..c3447e5d1 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ temporalio/bridge/temporal_sdk_bridge* /sdk-python.iml /.zed *.DS_Store +tags From 97e82910a9b5b94c706641ec6ee0a1406fdaf4b9 Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Wed, 22 Oct 2025 13:15:18 +0200 Subject: [PATCH 02/11] adds failing baggage propagation test --- tests/contrib/test_opentelemetry.py | 69 +++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/tests/contrib/test_opentelemetry.py b/tests/contrib/test_opentelemetry.py index 0b797f606..c23720aa7 100644 --- a/tests/contrib/test_opentelemetry.py +++ b/tests/contrib/test_opentelemetry.py @@ -411,6 +411,29 @@ async def run(self) -> str: ) +@activity.defn +async def read_baggage_activity() -> dict: + """Activity that reads baggage from context.""" + from opentelemetry import baggage + + return { + "user_id": baggage.get_baggage("user.id"), + "tenant_id": baggage.get_baggage("tenant.id"), + } + + +@workflow.defn +class BaggageTestWorkflow: + """Workflow that executes activity and returns baggage values.""" + + @workflow.run + async def run(self) -> dict: + return await workflow.execute_activity( + read_baggage_activity, + start_to_close_timeout=timedelta(seconds=10), + ) + + async def test_opentelemetry_benign_exception(client: Client): # Create a tracer that has an in-memory exporter exporter = InMemorySpanExporter() @@ -442,6 +465,52 @@ async def test_opentelemetry_benign_exception(client: Client): assert all(span.status.status_code == StatusCode.UNSET for span in spans) +async def test_opentelemetry_baggage_propagation_basic( + client: Client, env: WorkflowEnvironment +): + """Test that baggage values propagate from workflow to activity. + + This test currently FAILS, demonstrating the bug. + """ + from opentelemetry import baggage, context + + exporter = InMemorySpanExporter() + provider = TracerProvider() + provider.add_span_processor(SimpleSpanProcessor(exporter)) + tracer = get_tracer(__name__, tracer_provider=provider) + + client_config = client.config() + client_config["interceptors"] = [TracingInterceptor(tracer)] + client = Client(**client_config) + + task_queue = f"task_queue_{uuid.uuid4()}" + async with Worker( + client, + task_queue=task_queue, + workflows=[BaggageTestWorkflow], + activities=[read_baggage_activity], + ): + ctx = baggage.set_baggage("user.id", "test-user-123") + ctx = baggage.set_baggage("tenant.id", "acme-corp", context=ctx) + + token = context.attach(ctx) + try: + result = await client.execute_workflow( + BaggageTestWorkflow.run, + id=f"workflow_{uuid.uuid4()}", + task_queue=task_queue, + ) + finally: + context.detach(token) + + assert ( + result["user_id"] == "test-user-123" + ), "user.id baggage should propagate to activity" + assert ( + result["tenant_id"] == "acme-corp" + ), "tenant.id baggage should propagate to activity" + + # TODO(cretz): Additional tests to write # * query without interceptor (no headers) # * workflow without interceptor (no headers) but query with interceptor (headers) From 8dd4cbf0e4bfae69bf58288c1c02db54112fc601 Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Wed, 22 Oct 2025 13:15:13 +0200 Subject: [PATCH 03/11] adds baggage propagation in activity interceptor Change the activity interceptor to use context.attach()/detach() pattern instead of passing context as a parameter to start_as_current_span(). The fix follows the standard OpenTelemetry pattern used by other instrumentations (django, gRPC, etc.) and ensures proper context management with try/finally for detach. --- temporalio/contrib/opentelemetry.py | 62 ++++++++++++++++++----------- tests/contrib/test_opentelemetry.py | 19 ++------- 2 files changed, 43 insertions(+), 38 deletions(-) diff --git a/temporalio/contrib/opentelemetry.py b/temporalio/contrib/opentelemetry.py index 04d40d544..2435a93cf 100644 --- a/temporalio/contrib/opentelemetry.py +++ b/temporalio/contrib/opentelemetry.py @@ -17,6 +17,16 @@ cast, ) +import temporalio.activity +import temporalio.api.common.v1 +import temporalio.client +import temporalio.converter +import temporalio.exceptions +import temporalio.worker +import temporalio.workflow +from temporalio.exceptions import ApplicationError, ApplicationErrorCategory +from typing_extensions import Protocol, TypeAlias, TypedDict + import opentelemetry.baggage.propagation import opentelemetry.context import opentelemetry.context.context @@ -26,18 +36,7 @@ import opentelemetry.trace.propagation.tracecontext import opentelemetry.util.types from opentelemetry.context import Context -from opentelemetry.trace import Span, SpanKind, Status, StatusCode, _Links -from opentelemetry.util import types -from typing_extensions import Protocol, TypeAlias, TypedDict - -import temporalio.activity -import temporalio.api.common.v1 -import temporalio.client -import temporalio.converter -import temporalio.exceptions -import temporalio.worker -import temporalio.workflow -from temporalio.exceptions import ApplicationError, ApplicationErrorCategory +from opentelemetry.trace import Status, StatusCode # OpenTelemetry dynamically, lazily chooses its context implementation at # runtime. When first accessed, they use pkg_resources.iter_entry_points + load. @@ -306,17 +305,34 @@ async def execute_activity( self, input: temporalio.worker.ExecuteActivityInput ) -> Any: info = temporalio.activity.info() - with self.root._start_as_current_span( - f"RunActivity:{info.activity_type}", - context=self.root._context_from_headers(input.headers), - attributes={ - "temporalWorkflowID": info.workflow_id, - "temporalRunID": info.workflow_run_id, - "temporalActivityID": info.activity_id, - }, - kind=opentelemetry.trace.SpanKind.SERVER, - ): - return await super().execute_activity(input) + extracted_ctx = self.root._context_from_headers(input.headers) + + if extracted_ctx: + token = opentelemetry.context.attach(extracted_ctx) + try: + with self.root._start_as_current_span( + f"RunActivity:{info.activity_type}", + attributes={ + "temporalWorkflowID": info.workflow_id, + "temporalRunID": info.workflow_run_id, + "temporalActivityID": info.activity_id, + }, + kind=opentelemetry.trace.SpanKind.SERVER, + ): + return await super().execute_activity(input) + finally: + opentelemetry.context.detach(token) + else: + with self.root._start_as_current_span( + f"RunActivity:{info.activity_type}", + attributes={ + "temporalWorkflowID": info.workflow_id, + "temporalRunID": info.workflow_run_id, + "temporalActivityID": info.activity_id, + }, + kind=opentelemetry.trace.SpanKind.SERVER, + ): + return await super().execute_activity(input) class _InputWithHeaders(Protocol): diff --git a/tests/contrib/test_opentelemetry.py b/tests/contrib/test_opentelemetry.py index c23720aa7..aeb98a98c 100644 --- a/tests/contrib/test_opentelemetry.py +++ b/tests/contrib/test_opentelemetry.py @@ -8,11 +8,11 @@ from datetime import timedelta from typing import Iterable, List, Optional +from opentelemetry import baggage, context from opentelemetry.sdk.trace import ReadableSpan, TracerProvider from opentelemetry.sdk.trace.export import SimpleSpanProcessor from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter from opentelemetry.trace import StatusCode, get_tracer - from temporalio import activity, workflow from temporalio.client import Client from temporalio.common import RetryPolicy @@ -413,9 +413,6 @@ async def run(self) -> str: @activity.defn async def read_baggage_activity() -> dict: - """Activity that reads baggage from context.""" - from opentelemetry import baggage - return { "user_id": baggage.get_baggage("user.id"), "tenant_id": baggage.get_baggage("tenant.id"), @@ -423,9 +420,7 @@ async def read_baggage_activity() -> dict: @workflow.defn -class BaggageTestWorkflow: - """Workflow that executes activity and returns baggage values.""" - +class ReadBaggageTestWorkflow: @workflow.run async def run(self) -> dict: return await workflow.execute_activity( @@ -468,12 +463,6 @@ async def test_opentelemetry_benign_exception(client: Client): async def test_opentelemetry_baggage_propagation_basic( client: Client, env: WorkflowEnvironment ): - """Test that baggage values propagate from workflow to activity. - - This test currently FAILS, demonstrating the bug. - """ - from opentelemetry import baggage, context - exporter = InMemorySpanExporter() provider = TracerProvider() provider.add_span_processor(SimpleSpanProcessor(exporter)) @@ -487,7 +476,7 @@ async def test_opentelemetry_baggage_propagation_basic( async with Worker( client, task_queue=task_queue, - workflows=[BaggageTestWorkflow], + workflows=[ReadBaggageTestWorkflow], activities=[read_baggage_activity], ): ctx = baggage.set_baggage("user.id", "test-user-123") @@ -496,7 +485,7 @@ async def test_opentelemetry_baggage_propagation_basic( token = context.attach(ctx) try: result = await client.execute_workflow( - BaggageTestWorkflow.run, + ReadBaggageTestWorkflow.run, id=f"workflow_{uuid.uuid4()}", task_queue=task_queue, ) From aa5bed78755de5143c4f00e0899cd858804f8875 Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Wed, 22 Oct 2025 13:15:03 +0200 Subject: [PATCH 04/11] adds baggage propagation tests Add additional tests to verify baggage propagation in scenarios: - multiple values - local activity - retries in activity --- tests/contrib/test_opentelemetry.py | 192 ++++++++++++++++++++++++++-- 1 file changed, 181 insertions(+), 11 deletions(-) diff --git a/tests/contrib/test_opentelemetry.py b/tests/contrib/test_opentelemetry.py index aeb98a98c..eef9b37c6 100644 --- a/tests/contrib/test_opentelemetry.py +++ b/tests/contrib/test_opentelemetry.py @@ -4,9 +4,10 @@ import logging import uuid from concurrent.futures import ThreadPoolExecutor +from contextlib import contextmanager from dataclasses import dataclass from datetime import timedelta -from typing import Iterable, List, Optional +from typing import Dict, Generator, Iterable, List, Optional from opentelemetry import baggage, context from opentelemetry.sdk.trace import ReadableSpan, TracerProvider @@ -412,7 +413,7 @@ async def run(self) -> str: @activity.defn -async def read_baggage_activity() -> dict: +async def read_baggage_activity() -> Dict[str, str | None]: return { "user_id": baggage.get_baggage("user.id"), "tenant_id": baggage.get_baggage("tenant.id"), @@ -422,13 +423,65 @@ async def read_baggage_activity() -> dict: @workflow.defn class ReadBaggageTestWorkflow: @workflow.run - async def run(self) -> dict: + async def run(self) -> Dict[str, str | None]: return await workflow.execute_activity( read_baggage_activity, start_to_close_timeout=timedelta(seconds=10), ) +@activity.defn +async def read_multiple_baggage_activity() -> Dict[str, str | None]: + return { + "user_id": baggage.get_baggage("user.id"), + "tenant_id": baggage.get_baggage("tenant.id"), + "request_id": baggage.get_baggage("request.id"), + "trace_id": baggage.get_baggage("trace.id"), + "custom_field": baggage.get_baggage("custom.field"), + } + + +@workflow.defn +class MultipleBaggageTestWorkflow: + @workflow.run + async def run(self) -> Dict[str, str | None]: + return await workflow.execute_activity( + read_multiple_baggage_activity, + start_to_close_timeout=timedelta(seconds=10), + ) + + +@workflow.defn +class LocalActivityBaggageTestWorkflow: + @workflow.run + async def run(self) -> Dict[str, str | None]: + return await workflow.execute_local_activity( + read_baggage_activity, + start_to_close_timeout=timedelta(seconds=10), + ) + + +retry_attempt_baggage_values: List[Optional[str]] = [] + + +@activity.defn +async def failing_baggage_activity() -> None: + retry_attempt_baggage_values.append(baggage.get_baggage("user.id")) + if activity.info().attempt < 2: + raise RuntimeError("Intentional failure") + + +@workflow.defn +class RetryBaggageTestWorkflow: + @workflow.run + async def run(self) -> None: + await workflow.execute_activity( + failing_baggage_activity, + start_to_close_timeout=timedelta(seconds=10), + retry_policy=RetryPolicy(initial_interval=timedelta(milliseconds=1)), + ) + + async def test_opentelemetry_benign_exception(client: Client): # Create a tracer that has an in-memory exporter exporter = InMemorySpanExporter() @@ -460,6 +513,19 @@ async def test_opentelemetry_benign_exception(client: Client): assert all(span.status.status_code == StatusCode.UNSET for span in spans) +@contextmanager +def baggage_values(values: Dict[str, str]) -> Generator[None, None, None]: + ctx = context.get_current() + for key, value in values.items(): + ctx = baggage.set_baggage(key, value, context=ctx) + + token = context.attach(ctx) + try: + yield + finally: + context.detach(token) + + async def test_opentelemetry_baggage_propagation_basic( client: Client, env: WorkflowEnvironment ): @@ -479,27 +545,131 @@ async def test_opentelemetry_baggage_propagation_basic( workflows=[ReadBaggageTestWorkflow], activities=[read_baggage_activity], ): - ctx = baggage.set_baggage("user.id", "test-user-123") - ctx = baggage.set_baggage("tenant.id", "acme-corp", context=ctx) - - token = context.attach(ctx) - try: + with baggage_values({"user.id": "test-user-123", "tenant.id": "some-corp"}): result = await client.execute_workflow( ReadBaggageTestWorkflow.run, id=f"workflow_{uuid.uuid4()}", task_queue=task_queue, ) - finally: - context.detach(token) assert ( result["user_id"] == "test-user-123" ), "user.id baggage should propagate to activity" assert ( - result["tenant_id"] == "acme-corp" + result["tenant_id"] == "some-corp" ), "tenant.id baggage should propagate to activity" +async def test_opentelemetry_baggage_propagation_multiple_values( + client: Client, env: WorkflowEnvironment +): + exporter = InMemorySpanExporter() + provider = TracerProvider() + provider.add_span_processor(SimpleSpanProcessor(exporter)) + tracer = get_tracer(__name__, tracer_provider=provider) + + client_config = client.config() + client_config["interceptors"] = [TracingInterceptor(tracer)] + client = Client(**client_config) + + task_queue = f"task_queue_{uuid.uuid4()}" + async with Worker( + client, + task_queue=task_queue, + workflows=[MultipleBaggageTestWorkflow], + activities=[read_multiple_baggage_activity], + ): + with baggage_values( + { + "user.id": "test-user-123", + "tenant.id": "some-corp", + "request.id": "req-456", + "trace.id": "trace-789", + "custom.field": "custom-value", + } + ): + result = await client.execute_workflow( + MultipleBaggageTestWorkflow.run, + id=f"workflow_{uuid.uuid4()}", + task_queue=task_queue, + ) + + assert result["user_id"] == "test-user-123" + assert result["tenant_id"] == "some-corp" + assert result["request_id"] == "req-456" + assert result["trace_id"] == "trace-789" + assert result["custom_field"] == "custom-value" + + +async def test_opentelemetry_baggage_propagation_local_activity( + client: Client, env: WorkflowEnvironment +): + exporter = InMemorySpanExporter() + provider = TracerProvider() + provider.add_span_processor(SimpleSpanProcessor(exporter)) + tracer = get_tracer(__name__, tracer_provider=provider) + + client_config = client.config() + client_config["interceptors"] = [TracingInterceptor(tracer)] + client = Client(**client_config) + + task_queue = f"task_queue_{uuid.uuid4()}" + async with Worker( + client, + task_queue=task_queue, + workflows=[LocalActivityBaggageTestWorkflow], + activities=[read_baggage_activity], + ): + with baggage_values( + { + "user.id": "test-user-456", + "tenant.id": "local-corp", + } + ): + result = await client.execute_workflow( + LocalActivityBaggageTestWorkflow.run, + id=f"workflow_{uuid.uuid4()}", + task_queue=task_queue, + ) + + assert result["user_id"] == "test-user-456" + assert result["tenant_id"] == "local-corp" + + +async def test_opentelemetry_baggage_propagation_with_retries( + client: Client, env: WorkflowEnvironment +): + global retry_attempt_baggage_values + retry_attempt_baggage_values = [] + + exporter = InMemorySpanExporter() + provider = TracerProvider() + provider.add_span_processor(SimpleSpanProcessor(exporter)) + tracer = get_tracer(__name__, tracer_provider=provider) + + client_config = client.config() + client_config["interceptors"] = [TracingInterceptor(tracer)] + client = Client(**client_config) + + task_queue = f"task_queue_{uuid.uuid4()}" + async with Worker( + client, + task_queue=task_queue, + workflows=[RetryBaggageTestWorkflow], + activities=[failing_baggage_activity], + ): + with baggage_values({"user.id": "test-user-retry"}): + await client.execute_workflow( + RetryBaggageTestWorkflow.run, + id=f"workflow_{uuid.uuid4()}", + task_queue=task_queue, + ) + + # Verify baggage was present on all attempts + assert len(retry_attempt_baggage_values) == 2 + assert all(v == "test-user-retry" for v in retry_attempt_baggage_values) + + # TODO(cretz): Additional tests to write # * query without interceptor (no headers) # * workflow without interceptor (no headers) but query with interceptor (headers) From 360ebcb0e0f15abc94ec313bd196327b541af277 Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Wed, 22 Oct 2025 13:14:57 +0200 Subject: [PATCH 05/11] adds more tests for baggage propagation Two important edge case tests: - exceptions handling - when no current context is available --- tests/contrib/test_opentelemetry.py | 95 +++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/tests/contrib/test_opentelemetry.py b/tests/contrib/test_opentelemetry.py index eef9b37c6..80af14af9 100644 --- a/tests/contrib/test_opentelemetry.py +++ b/tests/contrib/test_opentelemetry.py @@ -482,6 +482,44 @@ async def run(self) -> None: ) +@activity.defn +async def exception_baggage_activity() -> None: + user_id = baggage.get_baggage("user.id") + if user_id != "test-user-123": + raise AssertionError(f"Expected user.id='test-user-123', got '{user_id}'") + raise RuntimeError("Intentional activity failure") + + +@workflow.defn +class BaggageExceptionWorkflow: + @workflow.run + async def run(self) -> str: + try: + await workflow.execute_activity( + exception_baggage_activity, + start_to_close_timeout=timedelta(seconds=10), + retry_policy=RetryPolicy(maximum_attempts=1), + ) + except Exception as e: + return f"exception_handled: {e.failure.cause.application_failure_info.type}" + return "no_exception" + + +@activity.defn +async def simple_no_context_activity() -> str: + return "success" + + +@workflow.defn +class SimpleNoContextWorkflow: + @workflow.run + async def run(self) -> str: + return await workflow.execute_activity( + simple_no_context_activity, + start_to_close_timeout=timedelta(seconds=10), + ) + + async def test_opentelemetry_benign_exception(client: Client): # Create a tracer that has an in-memory exporter exporter = InMemorySpanExporter() @@ -670,6 +708,63 @@ async def test_opentelemetry_baggage_propagation_with_retries( assert all(v == "test-user-retry" for v in retry_attempt_baggage_values) +async def test_opentelemetry_baggage_exception_handling( + client: Client, env: WorkflowEnvironment +): + exporter = InMemorySpanExporter() + provider = TracerProvider() + provider.add_span_processor(SimpleSpanProcessor(exporter)) + tracer = get_tracer(__name__, tracer_provider=provider) + + client_config = client.config() + client_config["interceptors"] = [TracingInterceptor(tracer)] + client = Client(**client_config) + + task_queue = f"task_queue_{uuid.uuid4()}" + async with Worker( + client, + task_queue=task_queue, + workflows=[BaggageExceptionWorkflow], + activities=[exception_baggage_activity], + ): + with baggage_values({"user.id": "test-user-123"}): + result = await client.execute_workflow( + BaggageExceptionWorkflow.run, + id=f"workflow_{uuid.uuid4()}", + task_queue=task_queue, + ) + + assert result == "exception_handled: RuntimeError" + + +async def test_opentelemetry_interceptor_works_if_no_context( + client: Client, env: WorkflowEnvironment +): + exporter = InMemorySpanExporter() + provider = TracerProvider() + provider.add_span_processor(SimpleSpanProcessor(exporter)) + tracer = get_tracer(__name__, tracer_provider=provider) + + client_config = client.config() + client_config["interceptors"] = [TracingInterceptor(tracer)] + client = Client(**client_config) + + task_queue = f"task_queue_{uuid.uuid4()}" + async with Worker( + client, + task_queue=task_queue, + workflows=[SimpleNoContextWorkflow], + activities=[simple_no_context_activity], + ): + result = await client.execute_workflow( + SimpleNoContextWorkflow.run, + id=f"workflow_{uuid.uuid4()}", + task_queue=task_queue, + ) + + assert result == "success" + + # TODO(cretz): Additional tests to write # * query without interceptor (no headers) # * workflow without interceptor (no headers) but query with interceptor (headers) From 24efa024cc69c9db7a13498b0edf36061f940fb6 Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Wed, 22 Oct 2025 13:14:43 +0200 Subject: [PATCH 06/11] moves context handling to _start_as_current_span --- temporalio/contrib/opentelemetry.py | 91 +++++++++++++---------------- 1 file changed, 41 insertions(+), 50 deletions(-) diff --git a/temporalio/contrib/opentelemetry.py b/temporalio/contrib/opentelemetry.py index 2435a93cf..ba9c5f018 100644 --- a/temporalio/contrib/opentelemetry.py +++ b/temporalio/contrib/opentelemetry.py @@ -172,29 +172,37 @@ def _start_as_current_span( kind: opentelemetry.trace.SpanKind, context: Optional[Context] = None, ) -> Iterator[None]: - with self.tracer.start_as_current_span( - name, - attributes=attributes, - kind=kind, - context=context, - set_status_on_exception=False, - ) as span: - if input: - input.headers = self._context_to_headers(input.headers) - try: - yield None - except Exception as exc: - if ( - not isinstance(exc, ApplicationError) - or exc.category != ApplicationErrorCategory.BENIGN - ): - span.set_status( - Status( - status_code=StatusCode.ERROR, - description=f"{type(exc).__name__}: {exc}", + if context: + token = opentelemetry.context.attach(context) + else: + token = None + try: + with self.tracer.start_as_current_span( + name, + attributes=attributes, + kind=kind, + context=context, + set_status_on_exception=False, + ) as span: + if input: + input.headers = self._context_to_headers(input.headers) + try: + yield None + except Exception as exc: + if ( + not isinstance(exc, ApplicationError) + or exc.category != ApplicationErrorCategory.BENIGN + ): + span.set_status( + Status( + status_code=StatusCode.ERROR, + description=f"{type(exc).__name__}: {exc}", + ) ) - ) - raise + raise + finally: + if token: + opentelemetry.context.detach(token) def _completed_workflow_span( self, params: _CompletedWorkflowSpanParams @@ -305,34 +313,17 @@ async def execute_activity( self, input: temporalio.worker.ExecuteActivityInput ) -> Any: info = temporalio.activity.info() - extracted_ctx = self.root._context_from_headers(input.headers) - - if extracted_ctx: - token = opentelemetry.context.attach(extracted_ctx) - try: - with self.root._start_as_current_span( - f"RunActivity:{info.activity_type}", - attributes={ - "temporalWorkflowID": info.workflow_id, - "temporalRunID": info.workflow_run_id, - "temporalActivityID": info.activity_id, - }, - kind=opentelemetry.trace.SpanKind.SERVER, - ): - return await super().execute_activity(input) - finally: - opentelemetry.context.detach(token) - else: - with self.root._start_as_current_span( - f"RunActivity:{info.activity_type}", - attributes={ - "temporalWorkflowID": info.workflow_id, - "temporalRunID": info.workflow_run_id, - "temporalActivityID": info.activity_id, - }, - kind=opentelemetry.trace.SpanKind.SERVER, - ): - return await super().execute_activity(input) + with self.root._start_as_current_span( + f"RunActivity:{info.activity_type}", + context=self.root._context_from_headers(input.headers), + attributes={ + "temporalWorkflowID": info.workflow_id, + "temporalRunID": info.workflow_run_id, + "temporalActivityID": info.activity_id, + }, + kind=opentelemetry.trace.SpanKind.SERVER, + ): + return await super().execute_activity(input) class _InputWithHeaders(Protocol): From b2401daf2ae01062a32fe5213b35ba1e3e0e5cfb Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Wed, 22 Oct 2025 13:14:37 +0200 Subject: [PATCH 07/11] cleanup and improvements after review --- temporalio/contrib/opentelemetry.py | 5 +- tests/contrib/test_opentelemetry.py | 302 ++++++++-------------------- 2 files changed, 85 insertions(+), 222 deletions(-) diff --git a/temporalio/contrib/opentelemetry.py b/temporalio/contrib/opentelemetry.py index ba9c5f018..7bc5f4249 100644 --- a/temporalio/contrib/opentelemetry.py +++ b/temporalio/contrib/opentelemetry.py @@ -172,10 +172,7 @@ def _start_as_current_span( kind: opentelemetry.trace.SpanKind, context: Optional[Context] = None, ) -> Iterator[None]: - if context: - token = opentelemetry.context.attach(context) - else: - token = None + token = opentelemetry.context.attach(context) if context else None try: with self.tracer.start_as_current_span( name, diff --git a/tests/contrib/test_opentelemetry.py b/tests/contrib/test_opentelemetry.py index 80af14af9..3868fb7f8 100644 --- a/tests/contrib/test_opentelemetry.py +++ b/tests/contrib/test_opentelemetry.py @@ -412,114 +412,6 @@ async def run(self) -> str: ) -@activity.defn -async def read_baggage_activity() -> Dict[str, str | None]: - return { - "user_id": baggage.get_baggage("user.id"), - "tenant_id": baggage.get_baggage("tenant.id"), - } - - -@workflow.defn -class ReadBaggageTestWorkflow: - @workflow.run - async def run(self) -> Dict[str, str | None]: - return await workflow.execute_activity( - read_baggage_activity, - start_to_close_timeout=timedelta(seconds=10), - ) - - -@activity.defn -async def read_multiple_baggage_activity() -> Dict[str, str | None]: - return { - "user_id": baggage.get_baggage("user.id"), - "tenant_id": baggage.get_baggage("tenant.id"), - "request_id": baggage.get_baggage("request.id"), - "trace_id": baggage.get_baggage("trace.id"), - "custom_field": baggage.get_baggage("custom.field"), - } - - -@workflow.defn -class MultipleBaggageTestWorkflow: - @workflow.run - async def run(self) -> Dict[str, str | None]: - return await workflow.execute_activity( - read_multiple_baggage_activity, - start_to_close_timeout=timedelta(seconds=10), - ) - - -@workflow.defn -class LocalActivityBaggageTestWorkflow: - @workflow.run - async def run(self) -> Dict[str, str | None]: - return await workflow.execute_local_activity( - read_baggage_activity, - start_to_close_timeout=timedelta(seconds=10), - ) - - -retry_attempt_baggage_values: List[Optional[str]] = [] - - -@activity.defn -async def failing_baggage_activity() -> None: - retry_attempt_baggage_values.append(baggage.get_baggage("user.id")) - if activity.info().attempt < 2: - raise RuntimeError("Intentional failure") - - -@workflow.defn -class RetryBaggageTestWorkflow: - @workflow.run - async def run(self) -> None: - await workflow.execute_activity( - failing_baggage_activity, - start_to_close_timeout=timedelta(seconds=10), - retry_policy=RetryPolicy(initial_interval=timedelta(milliseconds=1)), - ) - - -@activity.defn -async def exception_baggage_activity() -> None: - user_id = baggage.get_baggage("user.id") - if user_id != "test-user-123": - raise AssertionError(f"Expected user.id='test-user-123', got '{user_id}'") - raise RuntimeError("Intentional activity failure") - - -@workflow.defn -class BaggageExceptionWorkflow: - @workflow.run - async def run(self) -> str: - try: - await workflow.execute_activity( - exception_baggage_activity, - start_to_close_timeout=timedelta(seconds=10), - retry_policy=RetryPolicy(maximum_attempts=1), - ) - except Exception as e: - return f"exception_handled: {e.failure.cause.application_failure_info.type}" - return "no_exception" - - -@activity.defn -async def simple_no_context_activity() -> str: - return "success" - - -@workflow.defn -class SimpleNoContextWorkflow: - @workflow.run - async def run(self) -> str: - return await workflow.execute_activity( - simple_no_context_activity, - start_to_close_timeout=timedelta(seconds=10), - ) - - async def test_opentelemetry_benign_exception(client: Client): # Create a tracer that has an in-memory exporter exporter = InMemorySpanExporter() @@ -564,27 +456,44 @@ def baggage_values(values: Dict[str, str]) -> Generator[None, None, None]: context.detach(token) -async def test_opentelemetry_baggage_propagation_basic( - client: Client, env: WorkflowEnvironment -): - exporter = InMemorySpanExporter() - provider = TracerProvider() - provider.add_span_processor(SimpleSpanProcessor(exporter)) - tracer = get_tracer(__name__, tracer_provider=provider) - +@pytest.fixture +def client_with_tracing(client: Client) -> Client: + tracer = get_tracer(__name__, tracer_provider=TracerProvider()) client_config = client.config() client_config["interceptors"] = [TracingInterceptor(tracer)] - client = Client(**client_config) + return Client(**client_config) + +@activity.defn +async def read_baggage_activity() -> Dict[str, str | None]: + return { + "user_id": baggage.get_baggage("user.id"), + "tenant_id": baggage.get_baggage("tenant.id"), + } + + +@workflow.defn +class ReadBaggageTestWorkflow: + @workflow.run + async def run(self) -> Dict[str, str | None]: + return await workflow.execute_activity( + read_baggage_activity, + start_to_close_timeout=timedelta(seconds=10), + ) + + +async def test_opentelemetry_baggage_propagation_basic( + client_with_tracing: Client, env: WorkflowEnvironment +): task_queue = f"task_queue_{uuid.uuid4()}" async with Worker( - client, + client_with_tracing, task_queue=task_queue, workflows=[ReadBaggageTestWorkflow], activities=[read_baggage_activity], ): with baggage_values({"user.id": "test-user-123", "tenant.id": "some-corp"}): - result = await client.execute_workflow( + result = await client_with_tracing.execute_workflow( ReadBaggageTestWorkflow.run, id=f"workflow_{uuid.uuid4()}", task_queue=task_queue, @@ -598,65 +507,33 @@ async def test_opentelemetry_baggage_propagation_basic( ), "tenant.id baggage should propagate to activity" -async def test_opentelemetry_baggage_propagation_multiple_values( - client: Client, env: WorkflowEnvironment -): - exporter = InMemorySpanExporter() - provider = TracerProvider() - provider.add_span_processor(SimpleSpanProcessor(exporter)) - tracer = get_tracer(__name__, tracer_provider=provider) - - client_config = client.config() - client_config["interceptors"] = [TracingInterceptor(tracer)] - client = Client(**client_config) +@activity.defn +async def read_baggage_local_activity() -> Dict[str, str | None]: + return { + "user_id": baggage.get_baggage("user.id"), + "tenant_id": baggage.get_baggage("tenant.id"), + } - task_queue = f"task_queue_{uuid.uuid4()}" - async with Worker( - client, - task_queue=task_queue, - workflows=[MultipleBaggageTestWorkflow], - activities=[read_multiple_baggage_activity], - ): - with baggage_values( - { - "user.id": "test-user-123", - "tenant.id": "some-corp", - "request.id": "req-456", - "trace.id": "trace-789", - "custom.field": "custom-value", - } - ): - result = await client.execute_workflow( - MultipleBaggageTestWorkflow.run, - id=f"workflow_{uuid.uuid4()}", - task_queue=task_queue, - ) - assert result["user_id"] == "test-user-123" - assert result["tenant_id"] == "some-corp" - assert result["request_id"] == "req-456" - assert result["trace_id"] == "trace-789" - assert result["custom_field"] == "custom-value" +@workflow.defn +class LocalActivityBaggageTestWorkflow: + @workflow.run + async def run(self) -> Dict[str, str | None]: + return await workflow.execute_local_activity( + read_baggage_local_activity, + start_to_close_timeout=timedelta(seconds=10), + ) async def test_opentelemetry_baggage_propagation_local_activity( - client: Client, env: WorkflowEnvironment + client_with_tracing: Client, env: WorkflowEnvironment ): - exporter = InMemorySpanExporter() - provider = TracerProvider() - provider.add_span_processor(SimpleSpanProcessor(exporter)) - tracer = get_tracer(__name__, tracer_provider=provider) - - client_config = client.config() - client_config["interceptors"] = [TracingInterceptor(tracer)] - client = Client(**client_config) - task_queue = f"task_queue_{uuid.uuid4()}" async with Worker( - client, + client_with_tracing, task_queue=task_queue, workflows=[LocalActivityBaggageTestWorkflow], - activities=[read_baggage_activity], + activities=[read_baggage_local_activity], ): with baggage_values( { @@ -664,7 +541,7 @@ async def test_opentelemetry_baggage_propagation_local_activity( "tenant.id": "local-corp", } ): - result = await client.execute_workflow( + result = await client_with_tracing.execute_workflow( LocalActivityBaggageTestWorkflow.run, id=f"workflow_{uuid.uuid4()}", task_queue=task_queue, @@ -674,30 +551,42 @@ async def test_opentelemetry_baggage_propagation_local_activity( assert result["tenant_id"] == "local-corp" +retry_attempt_baggage_values: List[Optional[str]] = [] + + +@activity.defn +async def failing_baggage_activity() -> None: + retry_attempt_baggage_values.append(baggage.get_baggage("user.id")) + if activity.info().attempt < 2: + raise RuntimeError("Intentional failure") + + +@workflow.defn +class RetryBaggageTestWorkflow: + @workflow.run + async def run(self) -> None: + await workflow.execute_activity( + failing_baggage_activity, + start_to_close_timeout=timedelta(seconds=10), + retry_policy=RetryPolicy(initial_interval=timedelta(milliseconds=1)), + ) + + async def test_opentelemetry_baggage_propagation_with_retries( - client: Client, env: WorkflowEnvironment + client_with_tracing: Client, env: WorkflowEnvironment ): global retry_attempt_baggage_values retry_attempt_baggage_values = [] - exporter = InMemorySpanExporter() - provider = TracerProvider() - provider.add_span_processor(SimpleSpanProcessor(exporter)) - tracer = get_tracer(__name__, tracer_provider=provider) - - client_config = client.config() - client_config["interceptors"] = [TracingInterceptor(tracer)] - client = Client(**client_config) - task_queue = f"task_queue_{uuid.uuid4()}" async with Worker( - client, + client_with_tracing, task_queue=task_queue, workflows=[RetryBaggageTestWorkflow], activities=[failing_baggage_activity], ): with baggage_values({"user.id": "test-user-retry"}): - await client.execute_workflow( + await client_with_tracing.execute_workflow( RetryBaggageTestWorkflow.run, id=f"workflow_{uuid.uuid4()}", task_queue=task_queue, @@ -708,55 +597,32 @@ async def test_opentelemetry_baggage_propagation_with_retries( assert all(v == "test-user-retry" for v in retry_attempt_baggage_values) -async def test_opentelemetry_baggage_exception_handling( - client: Client, env: WorkflowEnvironment -): - exporter = InMemorySpanExporter() - provider = TracerProvider() - provider.add_span_processor(SimpleSpanProcessor(exporter)) - tracer = get_tracer(__name__, tracer_provider=provider) - - client_config = client.config() - client_config["interceptors"] = [TracingInterceptor(tracer)] - client = Client(**client_config) +@activity.defn +async def simple_no_context_activity() -> str: + return "success" - task_queue = f"task_queue_{uuid.uuid4()}" - async with Worker( - client, - task_queue=task_queue, - workflows=[BaggageExceptionWorkflow], - activities=[exception_baggage_activity], - ): - with baggage_values({"user.id": "test-user-123"}): - result = await client.execute_workflow( - BaggageExceptionWorkflow.run, - id=f"workflow_{uuid.uuid4()}", - task_queue=task_queue, - ) - assert result == "exception_handled: RuntimeError" +@workflow.defn +class SimpleNoContextWorkflow: + @workflow.run + async def run(self) -> str: + return await workflow.execute_activity( + simple_no_context_activity, + start_to_close_timeout=timedelta(seconds=10), + ) async def test_opentelemetry_interceptor_works_if_no_context( - client: Client, env: WorkflowEnvironment + client_with_tracing: Client, env: WorkflowEnvironment ): - exporter = InMemorySpanExporter() - provider = TracerProvider() - provider.add_span_processor(SimpleSpanProcessor(exporter)) - tracer = get_tracer(__name__, tracer_provider=provider) - - client_config = client.config() - client_config["interceptors"] = [TracingInterceptor(tracer)] - client = Client(**client_config) - task_queue = f"task_queue_{uuid.uuid4()}" async with Worker( - client, + client_with_tracing, task_queue=task_queue, workflows=[SimpleNoContextWorkflow], activities=[simple_no_context_activity], ): - result = await client.execute_workflow( + result = await client_with_tracing.execute_workflow( SimpleNoContextWorkflow.run, id=f"workflow_{uuid.uuid4()}", task_queue=task_queue, From b76bd45df82f3185ec37f1becca7214498401c2f Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Wed, 22 Oct 2025 13:14:22 +0200 Subject: [PATCH 08/11] adds test for context cleanup in the interceptor --- tests/contrib/test_opentelemetry.py | 92 ++++++++++++++++++++++++++++- 1 file changed, 90 insertions(+), 2 deletions(-) diff --git a/tests/contrib/test_opentelemetry.py b/tests/contrib/test_opentelemetry.py index 3868fb7f8..c7e068bd9 100644 --- a/tests/contrib/test_opentelemetry.py +++ b/tests/contrib/test_opentelemetry.py @@ -7,7 +7,7 @@ from contextlib import contextmanager from dataclasses import dataclass from datetime import timedelta -from typing import Dict, Generator, Iterable, List, Optional +from typing import Callable, Dict, Generator, Iterable, List, Optional from opentelemetry import baggage, context from opentelemetry.sdk.trace import ReadableSpan, TracerProvider @@ -574,7 +574,7 @@ async def run(self) -> None: async def test_opentelemetry_baggage_propagation_with_retries( client_with_tracing: Client, env: WorkflowEnvironment -): +) -> None: global retry_attempt_baggage_values retry_attempt_baggage_values = [] @@ -597,6 +597,94 @@ async def test_opentelemetry_baggage_propagation_with_retries( assert all(v == "test-user-retry" for v in retry_attempt_baggage_values) +@activity.defn +async def context_clear_noop_activity() -> None: + pass + + +@activity.defn +async def context_clear_exception_activity() -> None: + raise Exception("Simulated exception") + + +@workflow.defn +class ContextClearWorkflow: + @workflow.run + async def run(self) -> None: + await workflow.execute_activity( + context_clear_noop_activity, + start_to_close_timeout=timedelta(seconds=10), + retry_policy=RetryPolicy( + maximum_attempts=1, initial_interval=timedelta(milliseconds=1) + ), + ) + + +EXPECT_FAILURE = True + + +@pytest.mark.parametrize( + "activity,expect_failure", + [ + (context_clear_noop_activity, not EXPECT_FAILURE), + (context_clear_exception_activity, EXPECT_FAILURE), + ], +) +async def test_opentelemetry_context_restored_after_activity( + client_with_tracing: Client, + env: WorkflowEnvironment, + activity: Callable[[], None], + expect_failure: bool, +) -> None: + attach_count = 0 + detach_count = 0 + original_attach = context.attach + original_detach = context.detach + + def tracked_attach(ctx): + nonlocal attach_count + attach_count += 1 + return original_attach(ctx) + + def tracked_detach(token): + nonlocal detach_count + detach_count += 1 + return original_detach(token) + + context.attach = tracked_attach + context.detach = tracked_detach + + try: + task_queue = f"task_queue_{uuid.uuid4()}" + async with Worker( + client_with_tracing, + task_queue=task_queue, + workflows=[ContextClearWorkflow], + activities=[activity], + ): + with baggage_values({"user.id": "test-123"}): + try: + await client_with_tracing.execute_workflow( + ContextClearWorkflow.run, + id=f"workflow_{uuid.uuid4()}", + task_queue=task_queue, + ) + assert ( + not expect_failure + ), "This test should have raised an exception" + except Exception: + assert expect_failure, "This test is not expeced to raise" + + assert ( + attach_count == detach_count + ), f"Context leak detected: {attach_count} attaches vs {detach_count} detaches. " + assert attach_count > 0, "Expected at least one context attach/detach" + + finally: + context.attach = original_attach + context.detach = original_detach + + @activity.defn async def simple_no_context_activity() -> str: return "success" From 945816431ccde058a9847050e17ceedb4a88f11c Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Wed, 22 Oct 2025 19:37:14 +0200 Subject: [PATCH 09/11] fixes static checks --- temporalio/contrib/opentelemetry.py | 20 ++++++++--------- tests/contrib/test_opentelemetry.py | 34 ++++++++++++++++++----------- 2 files changed, 31 insertions(+), 23 deletions(-) diff --git a/temporalio/contrib/opentelemetry.py b/temporalio/contrib/opentelemetry.py index 7bc5f4249..89e231fec 100644 --- a/temporalio/contrib/opentelemetry.py +++ b/temporalio/contrib/opentelemetry.py @@ -17,16 +17,6 @@ cast, ) -import temporalio.activity -import temporalio.api.common.v1 -import temporalio.client -import temporalio.converter -import temporalio.exceptions -import temporalio.worker -import temporalio.workflow -from temporalio.exceptions import ApplicationError, ApplicationErrorCategory -from typing_extensions import Protocol, TypeAlias, TypedDict - import opentelemetry.baggage.propagation import opentelemetry.context import opentelemetry.context.context @@ -37,6 +27,16 @@ import opentelemetry.util.types from opentelemetry.context import Context from opentelemetry.trace import Status, StatusCode +from typing_extensions import Protocol, TypeAlias, TypedDict + +import temporalio.activity +import temporalio.api.common.v1 +import temporalio.client +import temporalio.converter +import temporalio.exceptions +import temporalio.worker +import temporalio.workflow +from temporalio.exceptions import ApplicationError, ApplicationErrorCategory # OpenTelemetry dynamically, lazily chooses its context implementation at # runtime. When first accessed, they use pkg_resources.iter_entry_points + load. diff --git a/tests/contrib/test_opentelemetry.py b/tests/contrib/test_opentelemetry.py index c7e068bd9..f6b4827ab 100644 --- a/tests/contrib/test_opentelemetry.py +++ b/tests/contrib/test_opentelemetry.py @@ -7,13 +7,14 @@ from contextlib import contextmanager from dataclasses import dataclass from datetime import timedelta -from typing import Callable, Dict, Generator, Iterable, List, Optional +from typing import Callable, Dict, Generator, Iterable, List, Optional, cast from opentelemetry import baggage, context from opentelemetry.sdk.trace import ReadableSpan, TracerProvider from opentelemetry.sdk.trace.export import SimpleSpanProcessor from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter from opentelemetry.trace import StatusCode, get_tracer + from temporalio import activity, workflow from temporalio.client import Client from temporalio.common import RetryPolicy @@ -464,18 +465,22 @@ def client_with_tracing(client: Client) -> Client: return Client(**client_config) +def get_baggage_value(key: str) -> str: + return cast("str", baggage.get_baggage(key)) + + @activity.defn -async def read_baggage_activity() -> Dict[str, str | None]: +async def read_baggage_activity() -> Dict[str, str]: return { - "user_id": baggage.get_baggage("user.id"), - "tenant_id": baggage.get_baggage("tenant.id"), + "user_id": get_baggage_value("user.id"), + "tenant_id": get_baggage_value("tenant.id"), } @workflow.defn class ReadBaggageTestWorkflow: @workflow.run - async def run(self) -> Dict[str, str | None]: + async def run(self) -> Dict[str, str]: return await workflow.execute_activity( read_baggage_activity, start_to_close_timeout=timedelta(seconds=10), @@ -508,17 +513,20 @@ async def test_opentelemetry_baggage_propagation_basic( @activity.defn -async def read_baggage_local_activity() -> Dict[str, str | None]: - return { - "user_id": baggage.get_baggage("user.id"), - "tenant_id": baggage.get_baggage("tenant.id"), - } +async def read_baggage_local_activity() -> Dict[str, str]: + return cast( + Dict[str, str], + { + "user_id": get_baggage_value("user.id"), + "tenant_id": get_baggage_value("tenant.id"), + }, + ) @workflow.defn class LocalActivityBaggageTestWorkflow: @workflow.run - async def run(self) -> Dict[str, str | None]: + async def run(self) -> Dict[str, str]: return await workflow.execute_local_activity( read_baggage_local_activity, start_to_close_timeout=timedelta(seconds=10), @@ -551,12 +559,12 @@ async def test_opentelemetry_baggage_propagation_local_activity( assert result["tenant_id"] == "local-corp" -retry_attempt_baggage_values: List[Optional[str]] = [] +retry_attempt_baggage_values: List[str] = [] @activity.defn async def failing_baggage_activity() -> None: - retry_attempt_baggage_values.append(baggage.get_baggage("user.id")) + retry_attempt_baggage_values.append(get_baggage_value("user.id")) if activity.info().attempt < 2: raise RuntimeError("Intentional failure") From 3ca4e2ac9b188b6c95fec1e051e76d9b6503e938 Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Wed, 22 Oct 2025 20:11:56 +0200 Subject: [PATCH 10/11] only clear context if exit on the same thread as enter --- temporalio/contrib/opentelemetry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/temporalio/contrib/opentelemetry.py b/temporalio/contrib/opentelemetry.py index ff84abc79..c2a9de45b 100644 --- a/temporalio/contrib/opentelemetry.py +++ b/temporalio/contrib/opentelemetry.py @@ -198,7 +198,7 @@ def _start_as_current_span( ) raise finally: - if token: + if token and context is opentelemetry.context.get_current(): opentelemetry.context.detach(token) def _completed_workflow_span( From 41550051d6e293c0ca37c55cbe57e8f2080c9c99 Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Sat, 25 Oct 2025 12:33:49 +0200 Subject: [PATCH 11/11] removes global constant --- tests/contrib/test_opentelemetry.py | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/tests/contrib/test_opentelemetry.py b/tests/contrib/test_opentelemetry.py index 4968eb2da..fb4759be9 100644 --- a/tests/contrib/test_opentelemetry.py +++ b/tests/contrib/test_opentelemetry.py @@ -4,7 +4,6 @@ import gc import logging import queue -import sys import threading import uuid from concurrent.futures import ThreadPoolExecutor @@ -33,11 +32,6 @@ from temporalio.testing import WorkflowEnvironment from temporalio.worker import UnsandboxedWorkflowRunner, Worker from tests.helpers import LogCapturer -from tests.helpers.cache_eviction import ( - CacheEvictionTearDownWorkflow, - WaitForeverWorkflow, - wait_forever_activity, -) @dataclass @@ -744,14 +738,11 @@ async def run(self) -> None: ) -EXPECT_FAILURE = True - - @pytest.mark.parametrize( "activity,expect_failure", [ - (context_clear_noop_activity, not EXPECT_FAILURE), - (context_clear_exception_activity, EXPECT_FAILURE), + (context_clear_noop_activity, not True), + (context_clear_exception_activity, True), ], ) async def test_opentelemetry_context_restored_after_activity(