|
1 | 1 | from __future__ import annotations |
2 | 2 |
|
3 | 3 | import asyncio |
| 4 | +import gc |
4 | 5 | import logging |
| 6 | +import queue |
5 | 7 | import sys |
| 8 | +import threading |
6 | 9 | import uuid |
7 | 10 | from concurrent.futures import ThreadPoolExecutor |
8 | 11 | from dataclasses import dataclass |
|
19 | 22 | from temporalio import activity, workflow |
20 | 23 | from temporalio.client import Client, WithStartWorkflowOperation, WorkflowUpdateStage |
21 | 24 | from temporalio.common import RetryPolicy, WorkflowIDConflictPolicy |
22 | | -from temporalio.contrib.opentelemetry import TracingInterceptor |
| 25 | +from temporalio.contrib.opentelemetry import ( |
| 26 | + TracingInterceptor, |
| 27 | + TracingWorkflowInboundInterceptor, |
| 28 | +) |
23 | 29 | from temporalio.contrib.opentelemetry import workflow as otel_workflow |
24 | 30 | from temporalio.exceptions import ApplicationError, ApplicationErrorCategory |
25 | 31 | from temporalio.testing import WorkflowEnvironment |
@@ -560,61 +566,48 @@ async def test_opentelemetry_benign_exception(client: Client): |
560 | 566 | # * signal failure and wft failure from signal |
561 | 567 |
|
562 | 568 |
|
def test_opentelemetry_safe_detach():
    """Regression test: no "Failed to detach context" error may be logged when
    the tracing context manager is garbage-collected on a different thread.

    The context manager from ``_top_level_workflow_context`` is entered on the
    main thread, then its *last* reference is dropped on a worker thread where
    a GC pass is forced. Presumably this raises GeneratorExit inside the
    (generator-based) context manager, so its cleanup runs on the worker
    thread, where the original OpenTelemetry context is no longer current —
    the interceptor must skip detaching its token in that case rather than
    let OpenTelemetry log a detach failure (TODO confirm against
    ``TracingWorkflowInboundInterceptor._top_level_workflow_context``).

    NOTE(review): reference lifetimes are load-bearing here — every ``del``
    and the queue hand-off exist to control *which thread* finalizes the
    context manager. Do not refactor bindings casually.
    """

    # Minimal stand-in for the interceptor instance: just the attributes
    # _top_level_workflow_context touches, each a no-op or trivial value.
    class _fake_self:
        def _load_workflow_context_carrier(*args):
            # No serialized parent context to restore.
            return None

        def _set_on_context(self, ctx):
            # Return a derived context so enter has something to attach.
            return opentelemetry.context.set_value("test-key", "test-value", ctx)

        def _completed_span(*args, **kwargs):
            pass

    # create a context manager and force enter to happen on this thread
    context_manager = TracingWorkflowInboundInterceptor._top_level_workflow_context(
        _fake_self(),  # type: ignore
        success_is_complete=True,
    )
    context_manager.__enter__()

    # move reference to context manager into queue
    q: queue.Queue = queue.Queue()
    q.put(context_manager)
    # Drop this thread's reference so only the queue keeps the object alive.
    del context_manager

    def worker():
        # pull reference from queue and delete the last reference
        context_manager = q.get()
        del context_manager
        # force gc
        gc.collect()

    with LogCapturer().logs_captured(opentelemetry.context.logger) as capturer:
        # run forced gc on other thread so exit happens there
        t = threading.Thread(target=worker)
        t.start()
        t.join(timeout=5)

        # Matches the message OpenTelemetry's context module logs when
        # detach is called with a token created on another thread/context.
        def otel_context_error(record: logging.LogRecord) -> bool:
            return (
                record.name == "opentelemetry.context"
                and "Failed to detach context" in record.message
            )

        assert (
            capturer.find(otel_context_error) is None
        ), "Detach from context message should not be logged"
0 commit comments