Skip to content

Commit 7a51529

Browse files
committed
Update test to use LogCapturer helper. Move shared test workflow and helpers to test/helpers
1 parent fc2a2e6 commit 7a51529

File tree

5 files changed

+153
-138
lines changed

5 files changed

+153
-138
lines changed

temporalio/contrib/opentelemetry.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@
2626
import opentelemetry.trace.propagation.tracecontext
2727
import opentelemetry.util.types
2828
from opentelemetry.context import Context
29-
from opentelemetry.trace import Span, SpanKind, Status, StatusCode, _Links
30-
from opentelemetry.util import types
29+
from opentelemetry.trace import Status, StatusCode
3130
from typing_extensions import Protocol, TypeAlias, TypedDict
3231

3332
import temporalio.activity
@@ -449,8 +448,7 @@ async def handle_query(self, input: temporalio.worker.HandleQueryInput) -> Any:
449448
)
450449
return await super().handle_query(input)
451450
finally:
452-
detach_context = opentelemetry.context.get_current()
453-
if detach_context is attach_context:
451+
if attach_context == opentelemetry.context.get_current():
454452
opentelemetry.context.detach(token)
455453

456454
def handle_update_validator(
@@ -541,8 +539,7 @@ def _top_level_workflow_context(
541539
kind=opentelemetry.trace.SpanKind.INTERNAL,
542540
)
543541

544-
detach_context = opentelemetry.context.get_current()
545-
if detach_context is attach_context:
542+
if attach_context == opentelemetry.context.get_current():
546543
opentelemetry.context.detach(token)
547544

548545
#

tests/contrib/test_opentelemetry.py

Lines changed: 30 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from datetime import timedelta
1010
from typing import Iterable, List, Optional
1111

12+
import opentelemetry.context
1213
from opentelemetry.sdk.trace import ReadableSpan, TracerProvider
1314
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
1415
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
@@ -22,11 +23,12 @@
2223
from temporalio.exceptions import ApplicationError, ApplicationErrorCategory
2324
from temporalio.testing import WorkflowEnvironment
2425
from temporalio.worker import UnsandboxedWorkflowRunner, Worker
25-
from tests.worker.test_workflow import (
26+
from tests.helpers.cache_evitction import (
2627
CacheEvictionTearDownWorkflow,
2728
WaitForeverWorkflow,
2829
wait_forever_activity,
2930
)
31+
from tests.helpers import LogCapturer
3032

3133
# Passing through because Python 3.9 has an import bug at
3234
# https://github.com/python/cpython/issues/91351
@@ -471,19 +473,6 @@ async def test_opentelemetry_safe_detach(client: Client):
471473
provider.add_span_processor(SimpleSpanProcessor(exporter))
472474
tracer = get_tracer(__name__, tracer_provider=provider)
473475

474-
class _OtelLogSpy(logging.Handler):
475-
def __init__(self, level: int | str = 0) -> None:
476-
self.seenOtelFailedMessage = False
477-
super().__init__(level)
478-
479-
def emit(self, record: logging.LogRecord) -> None:
480-
if not self.seenOtelFailedMessage:
481-
self.seenOtelFailedMessage = (
482-
record.levelno == logging.ERROR
483-
and record.name == "opentelemetry.context"
484-
and record.message == "Failed to detach context"
485-
)
486-
487476
async with Worker(
488477
client,
489478
workflows=[CacheEvictionTearDownWorkflow, WaitForeverWorkflow],
@@ -497,28 +486,34 @@ def emit(self, record: logging.LogRecord) -> None:
497486
old_hook = sys.unraisablehook
498487
hook_calls: List[sys.UnraisableHookArgs] = []
499488
sys.unraisablehook = hook_calls.append
500-
log_spy = _OtelLogSpy()
501-
logging.getLogger().addHandler(log_spy)
502-
try:
503-
handle = await client.start_workflow(
504-
CacheEvictionTearDownWorkflow.run,
505-
id=f"wf-{uuid.uuid4()}",
506-
task_queue=worker.task_queue,
507-
)
508489

509-
# CacheEvictionTearDownWorkflow requires 3 signals to be sent
510-
await handle.signal(CacheEvictionTearDownWorkflow.signal)
511-
await handle.signal(CacheEvictionTearDownWorkflow.signal)
512-
await handle.signal(CacheEvictionTearDownWorkflow.signal)
490+
with LogCapturer().logs_captured(opentelemetry.context.logger) as capturer:
491+
try:
492+
handle = await client.start_workflow(
493+
CacheEvictionTearDownWorkflow.run,
494+
id=f"wf-{uuid.uuid4()}",
495+
task_queue=worker.task_queue,
496+
)
497+
498+
# CacheEvictionTearDownWorkflow requires 3 signals to be sent
499+
await handle.signal(CacheEvictionTearDownWorkflow.signal)
500+
await handle.signal(CacheEvictionTearDownWorkflow.signal)
501+
await handle.signal(CacheEvictionTearDownWorkflow.signal)
513502

514-
await handle.result()
515-
finally:
516-
sys.unraisablehook = old_hook
517-
logging.getLogger().removeHandler(log_spy)
503+
await handle.result()
504+
finally:
505+
sys.unraisablehook = old_hook
518506

519-
# Confirm at least 1 exception
520-
assert hook_calls
507+
# Confirm at least 1 exception
508+
assert hook_calls
509+
510+
def otel_context_error(record: logging.LogRecord) -> bool:
511+
return (
512+
record.levelno == logging.ERROR
513+
and record.name == "opentelemetry.context"
514+
and record.message == "Failed to detach context"
515+
)
521516

522-
assert (
523-
not log_spy.seenOtelFailedMessage
524-
), "Detach from context message should not be logged"
517+
assert (
518+
capturer.find(otel_context_error) is None
519+
), "Detach from context message should not be logged"

tests/helpers/__init__.py

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,25 @@
11
import asyncio
2+
import logging
3+
import logging.handlers
4+
import queue
25
import socket
36
import time
47
import uuid
5-
from contextlib import closing
8+
from contextlib import closing, contextmanager
69
from dataclasses import dataclass
710
from datetime import datetime, timedelta, timezone
8-
from typing import Any, Awaitable, Callable, Optional, Sequence, Type, TypeVar, Union
11+
from typing import (
12+
Any,
13+
Awaitable,
14+
Callable,
15+
List,
16+
Optional,
17+
Sequence,
18+
Type,
19+
TypeVar,
20+
Union,
21+
cast,
22+
)
923

1024
from temporalio.api.common.v1 import WorkflowExecution
1125
from temporalio.api.enums.v1 import EventType as EventType
@@ -401,3 +415,34 @@ def _format_row(items: list[str], truncate: bool = False) -> str:
401415
padding = len(f" *: {elapsed_ms:>4} ")
402416
summary_row[col_idx] = f"{' ' * padding}[{summary}]"[: col_width - 3]
403417
print(_format_row(summary_row))
418+
419+
420+
class LogCapturer:
421+
def __init__(self) -> None:
422+
self.log_queue: queue.Queue[logging.LogRecord] = queue.Queue()
423+
424+
@contextmanager
425+
def logs_captured(self, *loggers: logging.Logger):
426+
handler = logging.handlers.QueueHandler(self.log_queue)
427+
428+
prev_levels = [l.level for l in loggers]
429+
for l in loggers:
430+
l.setLevel(logging.INFO)
431+
l.addHandler(handler)
432+
try:
433+
yield self
434+
finally:
435+
for i, l in enumerate(loggers):
436+
l.removeHandler(handler)
437+
l.setLevel(prev_levels[i])
438+
439+
def find_log(self, starts_with: str) -> Optional[logging.LogRecord]:
440+
return self.find(lambda l: l.message.startswith(starts_with))
441+
442+
def find(
443+
self, pred: Callable[[logging.LogRecord], bool]
444+
) -> Optional[logging.LogRecord]:
445+
for record in cast(List[logging.LogRecord], self.log_queue.queue):
446+
if pred(record):
447+
return record
448+
return None

tests/helpers/cache_evitction.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import asyncio
2+
from datetime import timedelta
3+
from temporalio import activity, workflow
4+
5+
6+
@activity.defn
7+
async def wait_forever_activity() -> None:
8+
await asyncio.Future()
9+
10+
11+
@workflow.defn
12+
class WaitForeverWorkflow:
13+
@workflow.run
14+
async def run(self) -> None:
15+
await asyncio.Future()
16+
17+
18+
@workflow.defn
19+
class CacheEvictionTearDownWorkflow:
20+
def __init__(self) -> None:
21+
self._signal_count = 0
22+
23+
@workflow.run
24+
async def run(self) -> None:
25+
# Start several things in background. This is just to show that eviction
26+
# can work even with these things running.
27+
tasks = [
28+
asyncio.create_task(
29+
workflow.execute_activity(
30+
wait_forever_activity, start_to_close_timeout=timedelta(hours=1)
31+
)
32+
),
33+
asyncio.create_task(
34+
workflow.execute_child_workflow(WaitForeverWorkflow.run)
35+
),
36+
asyncio.create_task(asyncio.sleep(1000)),
37+
asyncio.shield(
38+
workflow.execute_activity(
39+
wait_forever_activity, start_to_close_timeout=timedelta(hours=1)
40+
)
41+
),
42+
asyncio.create_task(workflow.wait_condition(lambda: False)),
43+
]
44+
gather_fut = asyncio.gather(*tasks, return_exceptions=True)
45+
# Let's also start something in the background that we never wait on
46+
asyncio.create_task(asyncio.sleep(1000))
47+
try:
48+
# Wait for signal count to reach 2
49+
await asyncio.sleep(0.01)
50+
await workflow.wait_condition(lambda: self._signal_count > 1)
51+
finally:
52+
# This finally, on eviction, is actually called but the command
53+
# should be ignored
54+
await asyncio.sleep(0.01)
55+
await workflow.wait_condition(lambda: self._signal_count > 2)
56+
# Cancel gather tasks and wait on them, but ignore the errors
57+
for task in tasks:
58+
task.cancel()
59+
await gather_fut
60+
61+
@workflow.signal
62+
async def signal(self) -> None:
63+
self._signal_count += 1
64+
65+
@workflow.query
66+
def signal_count(self) -> int:
67+
return self._signal_count

tests/worker/test_workflow.py

Lines changed: 6 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -143,11 +143,17 @@
143143
pause_and_assert,
144144
unpause_and_assert,
145145
workflow_update_exists,
146+
LogCapturer,
146147
)
147148
from tests.helpers.external_stack_trace import (
148149
ExternalStackTraceWorkflow,
149150
external_wait_cancel,
150151
)
152+
from tests.helpers.cache_evitction import (
153+
CacheEvictionTearDownWorkflow,
154+
WaitForeverWorkflow,
155+
wait_forever_activity,
156+
)
151157

152158
# Passing through because Python 3.9 has an import bug at
153159
# https://github.com/python/cpython/issues/91351
@@ -1996,37 +2002,6 @@ def last_signal(self) -> str:
19962002
return self._last_signal
19972003

19982004

1999-
class LogCapturer:
2000-
def __init__(self) -> None:
2001-
self.log_queue: queue.Queue[logging.LogRecord] = queue.Queue()
2002-
2003-
@contextmanager
2004-
def logs_captured(self, *loggers: logging.Logger):
2005-
handler = logging.handlers.QueueHandler(self.log_queue)
2006-
2007-
prev_levels = [l.level for l in loggers]
2008-
for l in loggers:
2009-
l.setLevel(logging.INFO)
2010-
l.addHandler(handler)
2011-
try:
2012-
yield self
2013-
finally:
2014-
for i, l in enumerate(loggers):
2015-
l.removeHandler(handler)
2016-
l.setLevel(prev_levels[i])
2017-
2018-
def find_log(self, starts_with: str) -> Optional[logging.LogRecord]:
2019-
return self.find(lambda l: l.message.startswith(starts_with))
2020-
2021-
def find(
2022-
self, pred: Callable[[logging.LogRecord], bool]
2023-
) -> Optional[logging.LogRecord]:
2024-
for record in cast(List[logging.LogRecord], self.log_queue.queue):
2025-
if pred(record):
2026-
return record
2027-
return None
2028-
2029-
20302005
async def test_workflow_logging(client: Client, env: WorkflowEnvironment):
20312006
workflow.logger.full_workflow_info_on_extra = True
20322007
with LogCapturer().logs_captured(
@@ -3739,70 +3714,6 @@ async def test_manual_result_type(client: Client):
37393714
assert res4 == ManualResultType(some_string="from-query")
37403715

37413716

3742-
@activity.defn
3743-
async def wait_forever_activity() -> None:
3744-
await asyncio.Future()
3745-
3746-
3747-
@workflow.defn
3748-
class WaitForeverWorkflow:
3749-
@workflow.run
3750-
async def run(self) -> None:
3751-
await asyncio.Future()
3752-
3753-
3754-
@workflow.defn
3755-
class CacheEvictionTearDownWorkflow:
3756-
def __init__(self) -> None:
3757-
self._signal_count = 0
3758-
3759-
@workflow.run
3760-
async def run(self) -> None:
3761-
# Start several things in background. This is just to show that eviction
3762-
# can work even with these things running.
3763-
tasks = [
3764-
asyncio.create_task(
3765-
workflow.execute_activity(
3766-
wait_forever_activity, start_to_close_timeout=timedelta(hours=1)
3767-
)
3768-
),
3769-
asyncio.create_task(
3770-
workflow.execute_child_workflow(WaitForeverWorkflow.run)
3771-
),
3772-
asyncio.create_task(asyncio.sleep(1000)),
3773-
asyncio.shield(
3774-
workflow.execute_activity(
3775-
wait_forever_activity, start_to_close_timeout=timedelta(hours=1)
3776-
)
3777-
),
3778-
asyncio.create_task(workflow.wait_condition(lambda: False)),
3779-
]
3780-
gather_fut = asyncio.gather(*tasks, return_exceptions=True)
3781-
# Let's also start something in the background that we never wait on
3782-
asyncio.create_task(asyncio.sleep(1000))
3783-
try:
3784-
# Wait for signal count to reach 2
3785-
await asyncio.sleep(0.01)
3786-
await workflow.wait_condition(lambda: self._signal_count > 1)
3787-
finally:
3788-
# This finally, on eviction, is actually called but the command
3789-
# should be ignored
3790-
await asyncio.sleep(0.01)
3791-
await workflow.wait_condition(lambda: self._signal_count > 2)
3792-
# Cancel gather tasks and wait on them, but ignore the errors
3793-
for task in tasks:
3794-
task.cancel()
3795-
await gather_fut
3796-
3797-
@workflow.signal
3798-
async def signal(self) -> None:
3799-
self._signal_count += 1
3800-
3801-
@workflow.query
3802-
def signal_count(self) -> int:
3803-
return self._signal_count
3804-
3805-
38063717
async def test_cache_eviction_tear_down(client: Client):
38073718
# This test simulates forcing eviction. This used to raise GeneratorExit on
38083719
# GC which triggered the finally which could run on any thread Python

0 commit comments

Comments
 (0)