Skip to content

Commit 9a802bb

Browse files
authored
Merge branch 'main' into patch-1
2 parents 6275094 + 5994a45 commit 9a802bb

File tree

6 files changed

+208
-107
lines changed

6 files changed

+208
-107
lines changed

temporalio/contrib/opentelemetry.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@
2626
import opentelemetry.trace.propagation.tracecontext
2727
import opentelemetry.util.types
2828
from opentelemetry.context import Context
29-
from opentelemetry.trace import Span, SpanKind, Status, StatusCode, _Links
30-
from opentelemetry.util import types
29+
from opentelemetry.trace import Status, StatusCode
3130
from typing_extensions import Protocol, TypeAlias, TypedDict
3231

3332
import temporalio.activity
@@ -473,7 +472,12 @@ async def handle_query(self, input: temporalio.worker.HandleQueryInput) -> Any:
473472
)
474473
return await super().handle_query(input)
475474
finally:
476-
opentelemetry.context.detach(token)
475+
# In some exceptional cases this finally is executed with a
476+
# different contextvars.Context than the one the token was created
477+
# on. As such we do a best effort detach to avoid using a mismatched
478+
# token.
479+
if context is opentelemetry.context.get_current():
480+
opentelemetry.context.detach(token)
477481

478482
def handle_update_validator(
479483
self, input: temporalio.worker.HandleUpdateInput
@@ -545,6 +549,7 @@ def _top_level_workflow_context(
545549
exception: Optional[Exception] = None
546550
# Run under this context
547551
token = opentelemetry.context.attach(context)
552+
548553
try:
549554
yield None
550555
success = True
@@ -561,7 +566,13 @@ def _top_level_workflow_context(
561566
exception=exception,
562567
kind=opentelemetry.trace.SpanKind.INTERNAL,
563568
)
564-
opentelemetry.context.detach(token)
569+
570+
# In some exceptional cases this finally is executed with a
571+
# different contextvars.Context than the one the token was created
572+
# on. As such we do a best effort detach to avoid using a mismatched
573+
# token.
574+
if context is opentelemetry.context.get_current():
575+
opentelemetry.context.detach(token)
565576

566577
def _context_to_headers(
567578
self, headers: Mapping[str, temporalio.api.common.v1.Payload]

temporalio/worker/_worker.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -203,9 +203,11 @@ def __init__(
203203
interceptors already on the client that also implement
204204
:py:class:`Interceptor` are prepended to this list and should
205205
not be explicitly given here.
206-
build_id: Unique identifier for the current runtime. This is best
207-
set as a hash of all code and should change only when code does.
208-
If unset, a best-effort identifier is generated.
206+
build_id: A unique identifier for the current runtime, ideally provided as a
207+
representation of the complete source code. If not explicitly set, the system
208+
automatically generates a best-effort identifier by traversing and computing
209+
hashes of all modules in the codebase. In very large codebases this automatic
210+
process may significantly increase initialization time.
209211
Exclusive with `deployment_config`.
210212
WARNING: Deprecated. Use `deployment_config` instead.
211213
identity: Identity for this worker client. If unset, the client

tests/contrib/test_opentelemetry.py

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
11
from __future__ import annotations
22

33
import asyncio
4+
import gc
45
import logging
6+
import queue
7+
import sys
8+
import threading
59
import uuid
610
from concurrent.futures import ThreadPoolExecutor
711
from dataclasses import dataclass
812
from datetime import timedelta
913
from typing import Iterable, List, Optional
1014

15+
import opentelemetry.context
1116
import pytest
1217
from opentelemetry.sdk.trace import ReadableSpan, TracerProvider
1318
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
@@ -17,11 +22,20 @@
1722
from temporalio import activity, workflow
1823
from temporalio.client import Client, WithStartWorkflowOperation, WorkflowUpdateStage
1924
from temporalio.common import RetryPolicy, WorkflowIDConflictPolicy
20-
from temporalio.contrib.opentelemetry import TracingInterceptor
25+
from temporalio.contrib.opentelemetry import (
26+
TracingInterceptor,
27+
TracingWorkflowInboundInterceptor,
28+
)
2129
from temporalio.contrib.opentelemetry import workflow as otel_workflow
2230
from temporalio.exceptions import ApplicationError, ApplicationErrorCategory
2331
from temporalio.testing import WorkflowEnvironment
2432
from temporalio.worker import UnsandboxedWorkflowRunner, Worker
33+
from tests.helpers import LogCapturer
34+
from tests.helpers.cache_eviction import (
35+
CacheEvictionTearDownWorkflow,
36+
WaitForeverWorkflow,
37+
wait_forever_activity,
38+
)
2539

2640

2741
@dataclass
@@ -420,7 +434,10 @@ def dump_spans(
420434
span_links: List[str] = []
421435
for link in span.links:
422436
for link_span in spans:
423-
if link_span.context.span_id == link.context.span_id:
437+
if (
438+
link_span.context is not None
439+
and link_span.context.span_id == link.context.span_id
440+
):
424441
span_links.append(link_span.name)
425442
span_str += f" (links: {', '.join(span_links)})"
426443
# Signals can duplicate in rare situations, so we make sure not to
@@ -430,7 +447,7 @@ def dump_spans(
430447
ret.append(span_str)
431448
ret += dump_spans(
432449
spans,
433-
parent_id=span.context.span_id,
450+
parent_id=span.context.span_id if span.context else None,
434451
with_attributes=with_attributes,
435452
indent_depth=indent_depth + 1,
436453
)
@@ -547,3 +564,50 @@ async def test_opentelemetry_benign_exception(client: Client):
547564
# * workflow failure and wft failure
548565
# * signal with start
549566
# * signal failure and wft failure from signal
567+
568+
569+
def test_opentelemetry_safe_detach():
570+
class _fake_self:
571+
def _load_workflow_context_carrier(*args):
572+
return None
573+
574+
def _set_on_context(self, ctx):
575+
return opentelemetry.context.set_value("test-key", "test-value", ctx)
576+
577+
def _completed_span(*args, **kwargs):
578+
pass
579+
580+
# create a context manager and force enter to happen on this thread
581+
context_manager = TracingWorkflowInboundInterceptor._top_level_workflow_context(
582+
_fake_self(), # type: ignore
583+
success_is_complete=True,
584+
)
585+
context_manager.__enter__()
586+
587+
# move reference to context manager into queue
588+
q: queue.Queue = queue.Queue()
589+
q.put(context_manager)
590+
del context_manager
591+
592+
def worker():
593+
# pull reference from queue and delete the last reference
594+
context_manager = q.get()
595+
del context_manager
596+
# force gc
597+
gc.collect()
598+
599+
with LogCapturer().logs_captured(opentelemetry.context.logger) as capturer:
600+
# run forced gc on other thread so exit happens there
601+
t = threading.Thread(target=worker)
602+
t.start()
603+
t.join(timeout=5)
604+
605+
def otel_context_error(record: logging.LogRecord) -> bool:
606+
return (
607+
record.name == "opentelemetry.context"
608+
and "Failed to detach context" in record.message
609+
)
610+
611+
assert (
612+
capturer.find(otel_context_error) is None
613+
), "Detach from context message should not be logged"

tests/helpers/__init__.py

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,25 @@
11
import asyncio
2+
import logging
3+
import logging.handlers
4+
import queue
25
import socket
36
import time
47
import uuid
5-
from contextlib import closing
8+
from contextlib import closing, contextmanager
69
from dataclasses import dataclass
710
from datetime import datetime, timedelta, timezone
8-
from typing import Any, Awaitable, Callable, Optional, Sequence, Type, TypeVar, Union
11+
from typing import (
12+
Any,
13+
Awaitable,
14+
Callable,
15+
List,
16+
Optional,
17+
Sequence,
18+
Type,
19+
TypeVar,
20+
Union,
21+
cast,
22+
)
923

1024
from temporalio.api.common.v1 import WorkflowExecution
1125
from temporalio.api.enums.v1 import EventType as EventType
@@ -401,3 +415,34 @@ def _format_row(items: list[str], truncate: bool = False) -> str:
401415
padding = len(f" *: {elapsed_ms:>4} ")
402416
summary_row[col_idx] = f"{' ' * padding}[{summary}]"[: col_width - 3]
403417
print(_format_row(summary_row))
418+
419+
420+
class LogCapturer:
421+
def __init__(self) -> None:
422+
self.log_queue: queue.Queue[logging.LogRecord] = queue.Queue()
423+
424+
@contextmanager
425+
def logs_captured(self, *loggers: logging.Logger):
426+
handler = logging.handlers.QueueHandler(self.log_queue)
427+
428+
prev_levels = [l.level for l in loggers]
429+
for l in loggers:
430+
l.setLevel(logging.INFO)
431+
l.addHandler(handler)
432+
try:
433+
yield self
434+
finally:
435+
for i, l in enumerate(loggers):
436+
l.removeHandler(handler)
437+
l.setLevel(prev_levels[i])
438+
439+
def find_log(self, starts_with: str) -> Optional[logging.LogRecord]:
440+
return self.find(lambda l: l.message.startswith(starts_with))
441+
442+
def find(
443+
self, pred: Callable[[logging.LogRecord], bool]
444+
) -> Optional[logging.LogRecord]:
445+
for record in cast(List[logging.LogRecord], self.log_queue.queue):
446+
if pred(record):
447+
return record
448+
return None

tests/helpers/cache_eviction.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import asyncio
2+
from datetime import timedelta
3+
4+
from temporalio import activity, workflow
5+
6+
7+
@activity.defn
8+
async def wait_forever_activity() -> None:
9+
await asyncio.Future()
10+
11+
12+
@workflow.defn
13+
class WaitForeverWorkflow:
14+
@workflow.run
15+
async def run(self) -> None:
16+
await asyncio.Future()
17+
18+
19+
@workflow.defn
20+
class CacheEvictionTearDownWorkflow:
21+
def __init__(self) -> None:
22+
self._signal_count = 0
23+
24+
@workflow.run
25+
async def run(self) -> None:
26+
# Start several things in background. This is just to show that eviction
27+
# can work even with these things running.
28+
tasks = [
29+
asyncio.create_task(
30+
workflow.execute_activity(
31+
wait_forever_activity, start_to_close_timeout=timedelta(hours=1)
32+
)
33+
),
34+
asyncio.create_task(
35+
workflow.execute_child_workflow(WaitForeverWorkflow.run)
36+
),
37+
asyncio.create_task(asyncio.sleep(1000)),
38+
asyncio.shield(
39+
workflow.execute_activity(
40+
wait_forever_activity, start_to_close_timeout=timedelta(hours=1)
41+
)
42+
),
43+
asyncio.create_task(workflow.wait_condition(lambda: False)),
44+
]
45+
gather_fut = asyncio.gather(*tasks, return_exceptions=True)
46+
# Let's also start something in the background that we never wait on
47+
asyncio.create_task(asyncio.sleep(1000))
48+
try:
49+
# Wait for signal count to reach 2
50+
await asyncio.sleep(0.01)
51+
await workflow.wait_condition(lambda: self._signal_count > 1)
52+
finally:
53+
# This finally, on eviction, is actually called but the command
54+
# should be ignored
55+
await asyncio.sleep(0.01)
56+
await workflow.wait_condition(lambda: self._signal_count > 2)
57+
# Cancel gather tasks and wait on them, but ignore the errors
58+
for task in tasks:
59+
task.cancel()
60+
await gather_fut
61+
62+
@workflow.signal
63+
async def signal(self) -> None:
64+
self._signal_count += 1
65+
66+
@workflow.query
67+
def signal_count(self) -> int:
68+
return self._signal_count

0 commit comments

Comments
 (0)