25 commits
fc2a2e6
ensure that the context used to detach the token is the same as what …
VegetarianOrc Oct 11, 2025
7a51529
Update test to use LogCapturer helper. Move shared test workflow and …
VegetarianOrc Oct 13, 2025
1b8d9e1
run formatter
VegetarianOrc Oct 13, 2025
b656e84
Fix up test with log capturer. Only log warnings if there are no hook…
VegetarianOrc Oct 13, 2025
5abc5a4
run formatter
VegetarianOrc Oct 13, 2025
d1bf879
Move to a class based context manager to manually manage context
VegetarianOrc Oct 14, 2025
6ca781c
apply context management to openai_agents tracing
VegetarianOrc Oct 16, 2025
6060b35
use 3.11 create_task when possible to avoid an extra context copy
VegetarianOrc Oct 16, 2025
13e99a3
run formatter
VegetarianOrc Oct 16, 2025
853f603
fix typing lint errors
VegetarianOrc Oct 16, 2025
7ae97d0
Fix a few more typing errors
VegetarianOrc Oct 16, 2025
aaccb00
Use a lambda to wrap task creation to appease the type linter
VegetarianOrc Oct 16, 2025
6deb5b1
Merge branch 'main' into otel/context-detach
VegetarianOrc Oct 16, 2025
fe8b0a5
revert manual context management explorations
VegetarianOrc Oct 17, 2025
2274ba4
Add comment explaining the check. Use `is` to ensure that the context is …
VegetarianOrc Oct 17, 2025
460ed71
use original variable name
VegetarianOrc Oct 17, 2025
83ae1fb
Fix typo
VegetarianOrc Oct 17, 2025
86b6951
Merge branch 'main' into otel/context-detach
VegetarianOrc Oct 20, 2025
1146a8c
merge main
VegetarianOrc Oct 20, 2025
da45161
Merge branch 'main' into otel/context-detach
VegetarianOrc Oct 20, 2025
c053aa6
Add some logs to help debug test flaking with timeouts
VegetarianOrc Oct 21, 2025
6e23365
apply formatting
VegetarianOrc Oct 21, 2025
db3bb0e
Revert "apply formatting"
VegetarianOrc Oct 21, 2025
50cbd42
Revert "Add some logs to help debug test flaking with timeouts"
VegetarianOrc Oct 21, 2025
c806d6b
move safe detach test to a model that forces __exit__ on a different …
VegetarianOrc Oct 21, 2025
19 changes: 15 additions & 4 deletions temporalio/contrib/opentelemetry.py
@@ -26,8 +26,7 @@
import opentelemetry.trace.propagation.tracecontext
import opentelemetry.util.types
from opentelemetry.context import Context
from opentelemetry.trace import Span, SpanKind, Status, StatusCode, _Links
from opentelemetry.util import types
from opentelemetry.trace import Status, StatusCode
from typing_extensions import Protocol, TypeAlias, TypedDict

import temporalio.activity
@@ -473,7 +472,12 @@ async def handle_query(self, input: temporalio.worker.HandleQueryInput) -> Any:
)
return await super().handle_query(input)
finally:
opentelemetry.context.detach(token)
# In some exceptional cases this finally is executed with a
# different contextvars.Context than the one the token was created
# on. As such we do a best effort detach to avoid using a mismatched
# token.
if context is opentelemetry.context.get_current():
opentelemetry.context.detach(token)

def handle_update_validator(
self, input: temporalio.worker.HandleUpdateInput
@@ -545,6 +549,7 @@ def _top_level_workflow_context(
exception: Optional[Exception] = None
# Run under this context
token = opentelemetry.context.attach(context)

try:
yield None
success = True
@@ -561,7 +566,13 @@
exception=exception,
kind=opentelemetry.trace.SpanKind.INTERNAL,
)
opentelemetry.context.detach(token)

# In some exceptional cases this finally is executed with a
# different contextvars.Context than the one the token was created
# on. As such we do a best effort detach to avoid using a mismatched
# token.
if context is opentelemetry.context.get_current():
opentelemetry.context.detach(token)

def _context_to_headers(
self, headers: Mapping[str, temporalio.api.common.v1.Payload]
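
The guarded detach added above reduces to a small attach/detach pattern. Below is a minimal sketch of that pattern using only the public opentelemetry.context API; the wrapper function and its name are illustrative, not part of this PR.

import opentelemetry.context


def run_attached(context: opentelemetry.context.Context, fn):
    # Attach the given OTel context, run fn, then detach only if this frame
    # is still executing on the same contextvars.Context the token was
    # created on.
    token = opentelemetry.context.attach(context)
    try:
        return fn()
    finally:
        # Best-effort detach: if the finally runs on a different
        # contextvars.Context (e.g. a generator finalized by the GC on
        # another thread), detaching with the mismatched token would only
        # emit OpenTelemetry's "Failed to detach context" error log.
        if context is opentelemetry.context.get_current():
            opentelemetry.context.detach(token)
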
70 changes: 67 additions & 3 deletions tests/contrib/test_opentelemetry.py
@@ -1,13 +1,18 @@
from __future__ import annotations

import asyncio
import gc
import logging
import queue
import sys
import threading
import uuid
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from datetime import timedelta
from typing import Iterable, List, Optional

import opentelemetry.context
import pytest
from opentelemetry.sdk.trace import ReadableSpan, TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
@@ -17,11 +22,20 @@
from temporalio import activity, workflow
from temporalio.client import Client, WithStartWorkflowOperation, WorkflowUpdateStage
from temporalio.common import RetryPolicy, WorkflowIDConflictPolicy
from temporalio.contrib.opentelemetry import TracingInterceptor
from temporalio.contrib.opentelemetry import (
TracingInterceptor,
TracingWorkflowInboundInterceptor,
)
from temporalio.contrib.opentelemetry import workflow as otel_workflow
from temporalio.exceptions import ApplicationError, ApplicationErrorCategory
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import UnsandboxedWorkflowRunner, Worker
from tests.helpers import LogCapturer
from tests.helpers.cache_eviction import (
CacheEvictionTearDownWorkflow,
WaitForeverWorkflow,
wait_forever_activity,
)


@dataclass
@@ -420,7 +434,10 @@ def dump_spans(
span_links: List[str] = []
for link in span.links:
for link_span in spans:
if link_span.context.span_id == link.context.span_id:
if (
link_span.context is not None
and link_span.context.span_id == link.context.span_id
):
span_links.append(link_span.name)
span_str += f" (links: {', '.join(span_links)})"
# Signals can duplicate in rare situations, so we make sure not to
@@ -430,7 +447,7 @@
ret.append(span_str)
ret += dump_spans(
spans,
parent_id=span.context.span_id,
parent_id=span.context.span_id if span.context else None,
with_attributes=with_attributes,
indent_depth=indent_depth + 1,
)
@@ -547,3 +564,50 @@ async def test_opentelemetry_benign_exception(client: Client):
# * workflow failure and wft failure
# * signal with start
# * signal failure and wft failure from signal


def test_opentelemetry_safe_detach():
class _fake_self:
def _load_workflow_context_carrier(*args):
return None

def _set_on_context(self, ctx):
return opentelemetry.context.set_value("test-key", "test-value", ctx)

def _completed_span(*args, **kwargs):
pass

# create a context manager and force enter to happen on this thread
context_manager = TracingWorkflowInboundInterceptor._top_level_workflow_context(
_fake_self(), # type: ignore
success_is_complete=True,
)
context_manager.__enter__()

# move reference to context manager into queue
q: queue.Queue = queue.Queue()
q.put(context_manager)
del context_manager

def worker():
# pull reference from queue and delete the last reference
context_manager = q.get()
del context_manager
# force gc
gc.collect()

with LogCapturer().logs_captured(opentelemetry.context.logger) as capturer:
# run forced gc on other thread so exit happens there
t = threading.Thread(target=worker)
t.start()
t.join(timeout=5)

def otel_context_error(record: logging.LogRecord) -> bool:
return (
record.name == "opentelemetry.context"
and "Failed to detach context" in record.message
)

assert (
capturer.find(otel_context_error) is None
), "Detach from context message should not be logged"
49 changes: 47 additions & 2 deletions tests/helpers/__init__.py
@@ -1,11 +1,25 @@
import asyncio
import logging
import logging.handlers
import queue
import socket
import time
import uuid
from contextlib import closing
from contextlib import closing, contextmanager
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Awaitable, Callable, Optional, Sequence, Type, TypeVar, Union
from typing import (
Any,
Awaitable,
Callable,
List,
Optional,
Sequence,
Type,
TypeVar,
Union,
cast,
)

from temporalio.api.common.v1 import WorkflowExecution
from temporalio.api.enums.v1 import EventType as EventType
@@ -401,3 +415,34 @@ def _format_row(items: list[str], truncate: bool = False) -> str:
padding = len(f" *: {elapsed_ms:>4} ")
summary_row[col_idx] = f"{' ' * padding}[{summary}]"[: col_width - 3]
print(_format_row(summary_row))


class LogCapturer:
def __init__(self) -> None:
self.log_queue: queue.Queue[logging.LogRecord] = queue.Queue()

@contextmanager
def logs_captured(self, *loggers: logging.Logger):
handler = logging.handlers.QueueHandler(self.log_queue)

prev_levels = [l.level for l in loggers]
for l in loggers:
l.setLevel(logging.INFO)
l.addHandler(handler)
try:
yield self
finally:
for i, l in enumerate(loggers):
l.removeHandler(handler)
l.setLevel(prev_levels[i])

def find_log(self, starts_with: str) -> Optional[logging.LogRecord]:
return self.find(lambda l: l.message.startswith(starts_with))

def find(
self, pred: Callable[[logging.LogRecord], bool]
) -> Optional[logging.LogRecord]:
for record in cast(List[logging.LogRecord], self.log_queue.queue):
if pred(record):
return record
return None
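
A hypothetical usage of the new helper (the logger name and messages here are placeholders, not from the PR):

import logging

from tests.helpers import LogCapturer

logger = logging.getLogger("example")

with LogCapturer().logs_captured(logger) as capturer:
    logger.warning("disk almost full: 95% used")

# QueueHandler formats records as it enqueues them, so matching on
# record.message works after the block exits.
assert capturer.find_log("disk almost full") is not None
assert capturer.find(lambda r: "99%" in r.message) is None
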
68 changes: 68 additions & 0 deletions tests/helpers/cache_eviction.py
@@ -0,0 +1,68 @@
import asyncio
from datetime import timedelta

from temporalio import activity, workflow


@activity.defn
async def wait_forever_activity() -> None:
await asyncio.Future()


@workflow.defn
class WaitForeverWorkflow:
@workflow.run
async def run(self) -> None:
await asyncio.Future()


@workflow.defn
class CacheEvictionTearDownWorkflow:
def __init__(self) -> None:
self._signal_count = 0

@workflow.run
async def run(self) -> None:
# Start several things in background. This is just to show that eviction
# can work even with these things running.
tasks = [
asyncio.create_task(
workflow.execute_activity(
wait_forever_activity, start_to_close_timeout=timedelta(hours=1)
)
),
asyncio.create_task(
workflow.execute_child_workflow(WaitForeverWorkflow.run)
),
asyncio.create_task(asyncio.sleep(1000)),
asyncio.shield(
workflow.execute_activity(
wait_forever_activity, start_to_close_timeout=timedelta(hours=1)
)
),
asyncio.create_task(workflow.wait_condition(lambda: False)),
]
gather_fut = asyncio.gather(*tasks, return_exceptions=True)
# Let's also start something in the background that we never wait on
asyncio.create_task(asyncio.sleep(1000))
try:
# Wait for signal count to reach 2
await asyncio.sleep(0.01)
await workflow.wait_condition(lambda: self._signal_count > 1)
finally:
# This finally, on eviction, is actually called but the command
# should be ignored
await asyncio.sleep(0.01)
await workflow.wait_condition(lambda: self._signal_count > 2)
# Cancel gather tasks and wait on them, but ignore the errors
for task in tasks:
task.cancel()
await gather_fut

@workflow.signal
async def signal(self) -> None:
self._signal_count += 1

@workflow.query
def signal_count(self) -> int:
return self._signal_count
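
A hypothetical sketch of how these helpers might be driven (assumed, not part of the PR): disabling the workflow cache via max_cached_workflows=0 forces an eviction, and therefore the teardown path above, after each workflow task.

import uuid

from temporalio.client import Client
from temporalio.worker import Worker

from tests.helpers.cache_eviction import (
    CacheEvictionTearDownWorkflow,
    WaitForeverWorkflow,
    wait_forever_activity,
)


async def run_with_forced_eviction(client: Client) -> None:
    task_queue = f"tq-{uuid.uuid4()}"
    async with Worker(
        client,
        task_queue=task_queue,
        workflows=[CacheEvictionTearDownWorkflow, WaitForeverWorkflow],
        activities=[wait_forever_activity],
        max_cached_workflows=0,  # evict the workflow after every task
    ):
        handle = await client.start_workflow(
            CacheEvictionTearDownWorkflow.run,
            id=f"wf-{uuid.uuid4()}",
            task_queue=task_queue,
        )
        # Three signals let both wait_condition checks in the workflow pass.
        for _ in range(3):
            await handle.signal(CacheEvictionTearDownWorkflow.signal)
        await handle.result()
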