Skip to content

Commit 3247f71

Browse files
committed
Experiment with new tracer
1 parent 5e9b2ba commit 3247f71

File tree

2 files changed

+95
-2
lines changed

2 files changed

+95
-2
lines changed

temporalio/contrib/opentelemetry.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525
import opentelemetry.trace
2626
import opentelemetry.trace.propagation.tracecontext
2727
import opentelemetry.util.types
28+
from opentelemetry.context import Context
29+
from opentelemetry.trace import SpanKind, _Links, Span, StatusCode, Status
30+
from opentelemetry.util import types
2831
from typing_extensions import Protocol, TypeAlias, TypedDict
2932

3033
import temporalio.activity
@@ -34,6 +37,7 @@
3437
import temporalio.exceptions
3538
import temporalio.worker
3639
import temporalio.workflow
40+
from temporalio.exceptions import ApplicationError, ApplicationErrorCategory
3741

3842
# OpenTelemetry dynamically, lazily chooses its context implementation at
3943
# runtime. When first accessed, they use pkg_resources.iter_entry_points + load.
@@ -661,6 +665,50 @@ def start_local_activity(
661665
return super().start_local_activity(input)
662666

663667

668+
class TemporalTracer(opentelemetry.trace.Tracer):
669+
def __init__(self, tracer: opentelemetry.trace.Tracer):
670+
self._tracer = tracer
671+
super().__init__()
672+
673+
def start_span(self, name: str, context: Optional[Context] = None, kind: SpanKind = SpanKind.INTERNAL,
674+
attributes: types.Attributes = None, links: _Links = None, start_time: Optional[int] = None,
675+
record_exception: bool = True, set_status_on_exception: bool = True) -> Span:
676+
return self._tracer.start_span(name, context, kind, attributes, links, start_time, record_exception, set_status_on_exception)
677+
678+
@staticmethod
679+
def handle_exception(exc: Exception, span: Span, record_exception: bool, set_status_on_exception: bool) -> None:
680+
if record_exception:
681+
span.record_exception(exc)
682+
683+
# Set status in case exception was raised
684+
if set_status_on_exception:
685+
span.set_status(
686+
Status(
687+
status_code=StatusCode.ERROR,
688+
description=f"{type(exc).__name__}: {exc}",
689+
)
690+
)
691+
692+
@contextmanager
693+
def start_as_current_span(self, name: str, context: Optional[Context] = None, kind: SpanKind = SpanKind.INTERNAL,
694+
attributes: types.Attributes = None, links: _Links = None,
695+
start_time: Optional[int] = None, record_exception: bool = True,
696+
set_status_on_exception: bool = True, end_on_exit: bool = True) -> Iterator[Span]:
697+
698+
with self._tracer.start_as_current_span(name, context, kind, attributes, links, start_time, False, False, end_on_exit) as span:
699+
try:
700+
yield span
701+
702+
# TODO: Catch base exception and handle cancellation errors
703+
except ApplicationError as exc:
704+
if exc.category != ApplicationErrorCategory.BENIGN:
705+
self.handle_exception(exc, span, record_exception, set_status_on_exception)
706+
raise
707+
except Exception as exc:
708+
self.handle_exception(exc, span, record_exception, set_status_on_exception)
709+
raise
710+
711+
664712
class workflow:
665713
"""Contains static methods that are safe to call from within a workflow.
666714

tests/contrib/test_opentelemetry.py

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,22 @@
33
import asyncio
44
import logging
55
import uuid
6+
from concurrent.futures import ThreadPoolExecutor
67
from dataclasses import dataclass
78
from datetime import timedelta
89
from typing import Iterable, List, Optional
910

1011
from opentelemetry.sdk.trace import ReadableSpan, TracerProvider
1112
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
1213
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
13-
from opentelemetry.trace import get_tracer
14+
from opentelemetry.trace import get_tracer, StatusCode
1415

1516
from temporalio import activity, workflow
1617
from temporalio.client import Client
1718
from temporalio.common import RetryPolicy
18-
from temporalio.contrib.opentelemetry import TracingInterceptor
19+
from temporalio.contrib.opentelemetry import TracingInterceptor, TemporalTracer
1920
from temporalio.contrib.opentelemetry import workflow as otel_workflow
21+
from temporalio.exceptions import ApplicationError, ApplicationErrorCategory
2022
from temporalio.testing import WorkflowEnvironment
2123
from temporalio.worker import UnsandboxedWorkflowRunner, Worker
2224

@@ -386,6 +388,49 @@ async def test_opentelemetry_always_create_workflow_spans(client: Client):
386388
assert spans[0].name == "RunWorkflow:SimpleWorkflow"
387389

388390

391+
attempted = False
392+
@activity.defn
393+
def benign_activity() -> str:
394+
global attempted
395+
if attempted:
396+
return "done"
397+
print("Raising")
398+
attempted = True
399+
raise ApplicationError(category=ApplicationErrorCategory.BENIGN, message="Benign Error")
400+
401+
@workflow.defn
402+
class BenignWorkflow:
403+
@workflow.run
404+
async def run(self) -> str:
405+
return await workflow.execute_activity(benign_activity, schedule_to_close_timeout=timedelta(seconds=1))
406+
407+
async def test_opentelemetry_exception_tracing(client: Client):
408+
# Create a tracer that has an in-memory exporter
409+
exporter = InMemorySpanExporter()
410+
provider = TracerProvider()
411+
provider.add_span_processor(SimpleSpanProcessor(exporter))
412+
tracer = TemporalTracer(get_tracer(__name__, tracer_provider=provider))
413+
# Create new client with tracer interceptor
414+
client_config = client.config()
415+
client_config["interceptors"] = [TracingInterceptor(tracer)]
416+
client = Client(**client_config)
417+
418+
async with Worker(
419+
client,
420+
task_queue=f"task_queue_{uuid.uuid4()}",
421+
workflows=[BenignWorkflow],
422+
activities=[benign_activity],
423+
activity_executor=ThreadPoolExecutor(max_workers=1),
424+
) as worker:
425+
assert "done" == await client.execute_workflow(
426+
BenignWorkflow.run,
427+
id=f"workflow_{uuid.uuid4()}",
428+
task_queue=worker.task_queue,
429+
retry_policy=RetryPolicy(maximum_attempts=2, initial_interval=timedelta(milliseconds=10)),
430+
)
431+
spans = exporter.get_finished_spans()
432+
assert all(span.status.status_code == StatusCode.UNSET for span in spans)
433+
389434
# TODO(cretz): Additional tests to write
390435
# * query without interceptor (no headers)
391436
# * workflow without interceptor (no headers) but query with interceptor (headers)

0 commit comments

Comments
 (0)