Skip to content

Commit 6d703c0

Browse files
committed
wip: refactor test helpers into separate modules
1 parent 3d2596e commit 6d703c0

File tree

3 files changed

+187
-142
lines changed

3 files changed

+187
-142
lines changed

tests/contrib/opentelemetry/helpers/_init__.py

Whitespace-only changes.
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
import concurrent.futures
5+
import logging
6+
import multiprocessing
7+
import multiprocessing.managers
8+
import threading
9+
import typing
10+
import uuid
11+
from dataclasses import dataclass
12+
from datetime import timedelta
13+
from typing import Any, Dict, Iterable, List, Optional, Sequence, Union
14+
15+
import opentelemetry.trace
16+
from opentelemetry.sdk.trace import ReadableSpan, TracerProvider
17+
from opentelemetry.sdk.trace.export import (
18+
SimpleSpanProcessor,
19+
SpanExporter,
20+
SpanExportResult,
21+
)
22+
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
23+
from opentelemetry.trace import get_current_span, get_tracer
24+
25+
from temporalio import activity, workflow
26+
from temporalio.client import Client
27+
from temporalio.common import RetryPolicy
28+
from temporalio.contrib.opentelemetry import TracingInterceptor
29+
from temporalio.contrib.opentelemetry import workflow as otel_workflow
30+
from temporalio.testing import WorkflowEnvironment
31+
from temporalio.worker import SharedStateManager, UnsandboxedWorkflowRunner, Worker
32+
33+
34+
@dataclass(frozen=True)
35+
class SerialisableSpan:
36+
"""A serialisable, incomplete representation of a span for testing purposes."""
37+
38+
@dataclass(frozen=True)
39+
class SpanContext:
40+
trace_id: int
41+
span_id: int
42+
43+
@classmethod
44+
def from_span_context(
45+
cls, context: opentelemetry.trace.SpanContext
46+
) -> "SerialisableSpan.SpanContext":
47+
return cls(
48+
trace_id=context.trace_id,
49+
span_id=context.span_id,
50+
)
51+
52+
@classmethod
53+
def from_optional_span_context(
54+
cls, context: Optional[opentelemetry.trace.SpanContext]
55+
) -> Optional["SerialisableSpan.SpanContext"]:
56+
if context is None:
57+
return None
58+
return cls.from_span_context(context)
59+
60+
@dataclass(frozen=True)
61+
class Link:
62+
context: SerialisableSpan.SpanContext
63+
attributes: Dict[str, Any]
64+
65+
name: str
66+
context: Optional[SpanContext]
67+
parent: Optional[SpanContext]
68+
attributes: Dict[str, Any]
69+
links: Sequence[Link]
70+
71+
@classmethod
72+
def from_readable_span(cls, span: ReadableSpan) -> "SerialisableSpan":
73+
return cls(
74+
name=span.name,
75+
context=cls.SpanContext.from_optional_span_context(span.context),
76+
parent=cls.SpanContext.from_optional_span_context(span.parent),
77+
attributes=dict(span.attributes or {}),
78+
links=tuple(
79+
cls.Link(
80+
context=cls.SpanContext.from_span_context(link.context),
81+
attributes=dict(span.attributes or {}),
82+
)
83+
for link in span.links
84+
),
85+
)
86+
87+
88+
SerialisableSpanListProxy: typing.TypeAlias = multiprocessing.managers.ListProxy[
89+
SerialisableSpan
90+
]
91+
92+
93+
def make_span_proxy_list(
94+
manager: multiprocessing.managers.SyncManager,
95+
) -> SerialisableSpanListProxy:
96+
"""Create a list proxy to share `SerialisableSpan` across processes."""
97+
return manager.list()
98+
99+
100+
class _ListProxySpanExporter(SpanExporter):
101+
"""Implementation of :class:`SpanExporter` that exports spans to a
102+
list proxy created by a multiprocessing manager.
103+
104+
This class is used for testing multiprocessing setups, as we can get access
105+
to the finished spans from the parent process.
106+
107+
In production, you would use `OTLPSpanExporter` or similar to export spans.
108+
Tracing is designed to be distributed, the child process can push collected
109+
spans directly to a collector or backend, which can reassemble the spans
110+
into a single trace.
111+
"""
112+
113+
def __init__(self, finished_spans: SerialisableSpanListProxy) -> None:
114+
self._finished_spans = finished_spans
115+
self._stopped = False
116+
self._lock = threading.Lock()
117+
118+
def export(self, spans: typing.Sequence[ReadableSpan]) -> SpanExportResult:
119+
if self._stopped:
120+
return SpanExportResult.FAILURE
121+
with self._lock:
122+
# Note: ReadableSpan is not picklable, so convert to a DTO
123+
# Note: we could use `span.to_json()` but there isn't a `from_json`
124+
# and the serialisation isn't easily reversible, e.g. `parent` context
125+
# is lost, span/trace IDs are transformed into strings
126+
self._finished_spans.extend(
127+
[SerialisableSpan.from_readable_span(span) for span in spans]
128+
)
129+
return SpanExportResult.SUCCESS
130+
131+
def shutdown(self) -> None:
132+
self._stopped = True
133+
134+
def force_flush(self, timeout_millis: int = 30000) -> bool:
135+
return True
136+
137+
138+
def dump_spans(
139+
spans: Iterable[Union[ReadableSpan, SerialisableSpan]],
140+
*,
141+
parent_id: Optional[int] = None,
142+
with_attributes: bool = True,
143+
indent_depth: int = 0,
144+
) -> List[str]:
145+
ret: List[str] = []
146+
for span in spans:
147+
if (not span.parent and parent_id is None) or (
148+
span.parent and span.parent.span_id == parent_id
149+
):
150+
span_str = f"{' ' * indent_depth}{span.name}"
151+
if with_attributes:
152+
span_str += f" (attributes: {dict(span.attributes or {})})"
153+
# Add links
154+
if span.links:
155+
span_links: List[str] = []
156+
for link in span.links:
157+
for link_span in spans:
158+
if (
159+
link_span.context
160+
and link_span.context.span_id == link.context.span_id
161+
):
162+
span_links.append(link_span.name)
163+
span_str += f" (links: {', '.join(span_links)})"
164+
# Signals can duplicate in rare situations, so we make sure not to
165+
# re-add
166+
if "Signal" in span_str and span_str in ret:
167+
continue
168+
ret.append(span_str)
169+
ret += dump_spans(
170+
spans,
171+
parent_id=(span.context.span_id if span.context else None),
172+
with_attributes=with_attributes,
173+
indent_depth=indent_depth + 1,
174+
)
175+
return ret

tests/contrib/test_opentelemetry.py

Lines changed: 12 additions & 142 deletions
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,19 @@
55
import logging
66
import multiprocessing
77
import multiprocessing.managers
8-
import threading
98
import typing
109
import uuid
1110
from dataclasses import dataclass
1211
from datetime import timedelta
13-
from typing import Any, Dict, Iterable, List, Optional, Sequence, Union
12+
from typing import List, Optional
1413

1514
import opentelemetry.trace
16-
from opentelemetry.sdk.trace import ReadableSpan, TracerProvider
15+
from opentelemetry.sdk.trace import TracerProvider
1716
from opentelemetry.sdk.trace.export import (
1817
SimpleSpanProcessor,
19-
SpanExporter,
20-
SpanExportResult,
2118
)
2219
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
23-
from opentelemetry.trace import get_current_span, get_tracer
20+
from opentelemetry.trace import get_tracer
2421

2522
from temporalio import activity, workflow
2623
from temporalio.client import Client
@@ -29,6 +26,13 @@
2926
from temporalio.contrib.opentelemetry import workflow as otel_workflow
3027
from temporalio.testing import WorkflowEnvironment
3128
from temporalio.worker import SharedStateManager, UnsandboxedWorkflowRunner, Worker
29+
from tests.contrib.opentelemetry.helpers.tracing import (
30+
SerialisableSpan,
31+
_ListProxySpanExporter,
32+
dump_spans,
33+
make_span_proxy_list,
34+
SerialisableSpanListProxy,
35+
)
3236

3337
# Passing through because Python 3.9 has an import bug at
3438
# https://github.com/python/cpython/issues/91351
@@ -89,58 +93,6 @@ class TracingWorkflowActionContinueAsNew:
8993
ready_for_update: asyncio.Semaphore
9094

9195

92-
@dataclass(frozen=True)
93-
class SerialisableSpan:
94-
@dataclass(frozen=True)
95-
class SpanContext:
96-
trace_id: int
97-
span_id: int
98-
99-
@classmethod
100-
def from_span_context(
101-
cls, context: opentelemetry.trace.SpanContext
102-
) -> "SerialisableSpan.SpanContext":
103-
return cls(
104-
trace_id=context.trace_id,
105-
span_id=context.span_id,
106-
)
107-
108-
@classmethod
109-
def from_optional_span_context(
110-
cls, context: Optional[opentelemetry.trace.SpanContext]
111-
) -> Optional["SerialisableSpan.SpanContext"]:
112-
if context is None:
113-
return None
114-
return cls.from_span_context(context)
115-
116-
@dataclass(frozen=True)
117-
class Link:
118-
context: SerialisableSpan.SpanContext
119-
attributes: Dict[str, Any]
120-
121-
name: str
122-
context: Optional[SpanContext]
123-
parent: Optional[SpanContext]
124-
attributes: Dict[str, Any]
125-
links: Sequence[Link]
126-
127-
@classmethod
128-
def from_readable_span(cls, span: ReadableSpan) -> "SerialisableSpan":
129-
return cls(
130-
name=span.name,
131-
context=cls.SpanContext.from_optional_span_context(span.context),
132-
parent=cls.SpanContext.from_optional_span_context(span.parent),
133-
attributes=dict(span.attributes or {}),
134-
links=tuple(
135-
cls.Link(
136-
context=cls.SpanContext.from_span_context(link.context),
137-
attributes=dict(span.attributes or {}),
138-
)
139-
for link in span.links
140-
),
141-
)
142-
143-
14496
@workflow.defn
14597
class TracingWorkflow:
14698
def __init__(self) -> None:
@@ -361,46 +313,6 @@ async def test_opentelemetry_tracing(client: Client, env: WorkflowEnvironment):
361313
]
362314

363315

364-
def dump_spans(
365-
spans: Iterable[Union[ReadableSpan, SerialisableSpan]],
366-
*,
367-
parent_id: Optional[int] = None,
368-
with_attributes: bool = True,
369-
indent_depth: int = 0,
370-
) -> List[str]:
371-
ret: List[str] = []
372-
for span in spans:
373-
if (not span.parent and parent_id is None) or (
374-
span.parent and span.parent.span_id == parent_id
375-
):
376-
span_str = f"{' ' * indent_depth}{span.name}"
377-
if with_attributes:
378-
span_str += f" (attributes: {dict(span.attributes or {})})"
379-
# Add links
380-
if span.links:
381-
span_links: List[str] = []
382-
for link in span.links:
383-
for link_span in spans:
384-
if (
385-
link_span.context
386-
and link_span.context.span_id == link.context.span_id
387-
):
388-
span_links.append(link_span.name)
389-
span_str += f" (links: {', '.join(span_links)})"
390-
# Signals can duplicate in rare situations, so we make sure not to
391-
# re-add
392-
if "Signal" in span_str and span_str in ret:
393-
continue
394-
ret.append(span_str)
395-
ret += dump_spans(
396-
spans,
397-
parent_id=(span.context.span_id if span.context else None),
398-
with_attributes=with_attributes,
399-
indent_depth=indent_depth + 1,
400-
)
401-
return ret
402-
403-
404316
@workflow.defn
405317
class SimpleWorkflow:
406318
@workflow.run
@@ -503,9 +415,7 @@ async def test_activity_trace_propagation(
503415
# Create a proxy list using the server process manager which we'll use
504416
# to access finished spans in the process pool
505417
manager = multiprocessing.Manager()
506-
finished_spans_proxy = typing.cast(
507-
multiprocessing.managers.ListProxy[SerialisableSpan], manager.list()
508-
)
418+
finished_spans_proxy = make_span_proxy_list(manager)
509419

510420
# Create a worker with a process pool activity executor
511421
async with Worker(
@@ -535,48 +445,8 @@ async def test_activity_trace_propagation(
535445
]
536446

537447

538-
class _ListProxySpanExporter(SpanExporter):
539-
"""Implementation of :class:`SpanExporter` that exports spans to a
540-
list proxy created by a multiprocessing manager.
541-
542-
This class is used for testing multiprocessing setups, as we can get access
543-
to the finished spans from the parent process.
544-
545-
In production, you would use `OTLPSpanExporter` or similar to export spans.
546-
Tracing is designed to be distributed, the child process can push collected
547-
spans directly to a collector or backend, which can reassemble the spans
548-
into a single trace.
549-
"""
550-
551-
def __init__(
552-
self, finished_spans: multiprocessing.managers.ListProxy[SerialisableSpan]
553-
) -> None:
554-
self._finished_spans = finished_spans
555-
self._stopped = False
556-
self._lock = threading.Lock()
557-
558-
def export(self, spans: typing.Sequence[ReadableSpan]) -> SpanExportResult:
559-
if self._stopped:
560-
return SpanExportResult.FAILURE
561-
with self._lock:
562-
# Note: ReadableSpan is not picklable, so convert to a DTO
563-
# Note: we could use `span.to_json()` but there isn't a `from_json`
564-
# and the serialisation isn't easily reversible, e.g. `parent` context
565-
# is lost, span/trace IDs are transformed into strings
566-
self._finished_spans.extend(
567-
[SerialisableSpan.from_readable_span(span) for span in spans]
568-
)
569-
return SpanExportResult.SUCCESS
570-
571-
def shutdown(self) -> None:
572-
self._stopped = True
573-
574-
def force_flush(self, timeout_millis: int = 30000) -> bool:
575-
return True
576-
577-
578448
def activity_trace_propagation_initializer(
579-
_finished_spans_proxy: multiprocessing.managers.ListProxy[SerialisableSpan],
449+
_finished_spans_proxy: SerialisableSpanListProxy,
580450
) -> None:
581451
"""Initializer for the process pool worker to export spans to a shared list."""
582452
_exporter = _ListProxySpanExporter(_finished_spans_proxy)

0 commit comments

Comments
 (0)