diff --git a/temporalio/contrib/opentelemetry.py b/temporalio/contrib/opentelemetry.py index 380b666dc..04d40d544 100644 --- a/temporalio/contrib/opentelemetry.py +++ b/temporalio/contrib/opentelemetry.py @@ -25,6 +25,9 @@ import opentelemetry.trace import opentelemetry.trace.propagation.tracecontext import opentelemetry.util.types +from opentelemetry.context import Context +from opentelemetry.trace import Span, SpanKind, Status, StatusCode, _Links +from opentelemetry.util import types from typing_extensions import Protocol, TypeAlias, TypedDict import temporalio.activity @@ -34,6 +37,7 @@ import temporalio.exceptions import temporalio.worker import temporalio.workflow +from temporalio.exceptions import ApplicationError, ApplicationErrorCategory # OpenTelemetry dynamically, lazily chooses its context implementation at # runtime. When first accessed, they use pkg_resources.iter_entry_points + load. @@ -167,11 +171,31 @@ def _start_as_current_span( attributes: opentelemetry.util.types.Attributes, input: Optional[_InputWithHeaders] = None, kind: opentelemetry.trace.SpanKind, + context: Optional[Context] = None, ) -> Iterator[None]: - with self.tracer.start_as_current_span(name, attributes=attributes, kind=kind): + with self.tracer.start_as_current_span( + name, + attributes=attributes, + kind=kind, + context=context, + set_status_on_exception=False, + ) as span: if input: input.headers = self._context_to_headers(input.headers) - yield None + try: + yield None + except Exception as exc: + if ( + not isinstance(exc, ApplicationError) + or exc.category != ApplicationErrorCategory.BENIGN + ): + span.set_status( + Status( + status_code=StatusCode.ERROR, + description=f"{type(exc).__name__}: {exc}", + ) + ) + raise def _completed_workflow_span( self, params: _CompletedWorkflowSpanParams @@ -282,7 +306,7 @@ async def execute_activity( self, input: temporalio.worker.ExecuteActivityInput ) -> Any: info = temporalio.activity.info() - with self.root.tracer.start_as_current_span( + with self.root._start_as_current_span( f"RunActivity:{info.activity_type}", context=self.root._context_from_headers(input.headers), attributes={ diff --git a/tests/contrib/test_opentelemetry.py b/tests/contrib/test_opentelemetry.py index e42e6b977..0b797f606 100644 --- a/tests/contrib/test_opentelemetry.py +++ b/tests/contrib/test_opentelemetry.py @@ -3,6 +3,7 @@ import asyncio import logging import uuid +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from datetime import timedelta from typing import Iterable, List, Optional @@ -10,13 +11,14 @@ from opentelemetry.sdk.trace import ReadableSpan, TracerProvider from opentelemetry.sdk.trace.export import SimpleSpanProcessor from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter -from opentelemetry.trace import get_tracer +from opentelemetry.trace import StatusCode, get_tracer from temporalio import activity, workflow from temporalio.client import Client from temporalio.common import RetryPolicy from temporalio.contrib.opentelemetry import TracingInterceptor from temporalio.contrib.opentelemetry import workflow as otel_workflow +from temporalio.exceptions import ApplicationError, ApplicationErrorCategory from temporalio.testing import WorkflowEnvironment from temporalio.worker import UnsandboxedWorkflowRunner, Worker @@ -386,6 +388,60 @@ async def test_opentelemetry_always_create_workflow_spans(client: Client): assert spans[0].name == "RunWorkflow:SimpleWorkflow" +attempted = False + + +@activity.defn +def benign_activity() -> str: + global attempted + if attempted: + return "done" + attempted = True + raise ApplicationError( + category=ApplicationErrorCategory.BENIGN, message="Benign Error" + ) + + +@workflow.defn +class BenignWorkflow: + @workflow.run + async def run(self) -> str: + return await workflow.execute_activity( + benign_activity, schedule_to_close_timeout=timedelta(seconds=1) + ) + + +async def test_opentelemetry_benign_exception(client: Client): + # Create a tracer that has an in-memory exporter + exporter = InMemorySpanExporter() + provider = TracerProvider() + provider.add_span_processor(SimpleSpanProcessor(exporter)) + tracer = get_tracer(__name__, tracer_provider=provider) + + # Create new client with tracer interceptor + client_config = client.config() + client_config["interceptors"] = [TracingInterceptor(tracer)] + client = Client(**client_config) + + async with Worker( + client, + task_queue=f"task_queue_{uuid.uuid4()}", + workflows=[BenignWorkflow], + activities=[benign_activity], + activity_executor=ThreadPoolExecutor(max_workers=1), + ) as worker: + assert "done" == await client.execute_workflow( + BenignWorkflow.run, + id=f"workflow_{uuid.uuid4()}", + task_queue=worker.task_queue, + retry_policy=RetryPolicy( + maximum_attempts=2, initial_interval=timedelta(milliseconds=10) + ), + ) + spans = exporter.get_finished_spans() + assert all(span.status.status_code == StatusCode.UNSET for span in spans) + + # TODO(cretz): Additional tests to write # * query without interceptor (no headers) # * workflow without interceptor (no headers) but query with interceptor (headers)