Skip to content

Commit 98c62a3

Browse files
clhain and lquerel authored
Perf - Add otel supporting functionality to framework context objects (open-telemetry#552)
This PR adds support for tracing, logging, and metrics in the various framework context objects. For example: automatic creation and ending of spans with a status, the ability to fetch OTel tracers, meters, and a logging handler inside plugins, decoration of spans with context metadata, and generation of span events.
---------
Co-authored-by: Laurent Quérel <l.querel@f5.com>
1 parent 2cc68c9 commit 98c62a3

File tree

4 files changed

+342
-32
lines changed

4 files changed

+342
-32
lines changed

tools/pipeline_perf_test/orchestrator/lib/core/context/base.py

Lines changed: 251 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -5,17 +5,25 @@
55
different implementations of Contexts throughout the orchestrator.
66
"""
77

8+
import logging
89
import datetime
910
import json
10-
import time
1111
import traceback
1212
from enum import Enum
1313

14+
from contextlib import AbstractContextManager
1415
from dataclasses import dataclass, field
1516
from typing import Dict, List, Optional, TYPE_CHECKING
1617

18+
from opentelemetry.trace import Span, Status, StatusCode
19+
20+
from ..telemetry.telemetry_runtime import TelemetryRuntime
21+
from ..telemetry.test_event import TestEvent
22+
from ..telemetry.telemetry_client import TelemetryClient
1723

1824
if TYPE_CHECKING:
25+
from opentelemetry.sdk.metrics import Meter
26+
from opentelemetry.sdk.trace import Tracer
1927
from ..test_framework.test_suite import TestSuite
2028
from ..component.component import Component
2129

@@ -43,18 +51,126 @@ class BaseContext:
4351
name: Optional[str] = None
4452
status: Optional[ExecutionStatus] = ExecutionStatus.PENDING
4553
error: Optional[Exception] = None
46-
start_time: Optional[float] = None
47-
end_time: Optional[float] = None
54+
start_time: Optional[datetime.datetime] = None
55+
end_time: Optional[datetime.datetime] = None
4856
metadata: dict = field(default_factory=dict)
4957

58+
start_event_type: TestEvent = field(init=False, default=TestEvent.SUITE_START)
59+
end_event_type: TestEvent = field(init=False, default=TestEvent.SUITE_END)
60+
61+
span: Optional[Span] = field(default=None, init=False)
62+
span_cm: Optional[AbstractContextManager] = field(default=None, init=False)
63+
span_name: Optional[str] = None
64+
5065
parent_ctx: Optional["BaseContext"] = None
5166
child_contexts: List["BaseContext"] = field(default_factory=list)
5267

68+
def __post_init__(self):
69+
"""
70+
Initializes default metadata after object creation.
71+
72+
Sets the 'test.ctx.type' to the class name and 'test.ctx.name' to the object's name
73+
in the metadata dictionary, if they are not already defined.
74+
"""
75+
self.metadata.setdefault("test.ctx.type", self.__class__.__name__)
76+
self.metadata.setdefault("test.ctx.name", self.name)
77+
78+
def __enter__(self):
79+
"""
80+
Enters the context for use in a 'with' statement.
81+
82+
Starts the process or operation associated with this object
83+
and returns the object itself for use within the context block.
84+
"""
85+
self.start()
86+
return self
87+
88+
def __exit__(self, exc_type, exc_value, _traceback):
89+
"""
90+
Exits the context, handling completion or exceptions.
91+
92+
Updates the context status to ERROR if an exception occurred,
93+
or SUCCESS if the status was still RUNNING. Finalizes the context
94+
by calling the end() method.
95+
96+
Args:
97+
exc_type: The exception type, if any occurred.
98+
exc_value: The exception instance.
99+
_traceback: The traceback object.
100+
"""
101+
if exc_type:
102+
self.status = ExecutionStatus.ERROR
103+
self.error = exc_value
104+
elif self.status == ExecutionStatus.RUNNING:
105+
self.status = ExecutionStatus.SUCCESS
106+
self.end()
107+
108+
def start(self):
109+
"""
110+
Marks the beginning of the context or operation.
111+
112+
- Sets the execution status to RUNNING.
113+
- Records the current UTC time as the start time.
114+
- Initializes a tracing span (if a tracer is available) using the object's class name
115+
and optional `name` attribute for the span name.
116+
- Attaches contextual attributes to the tracing span.
117+
- Records a start event with a precise timestamp in nanoseconds.
118+
119+
This method is typically called at the beginning of a timed or monitored execution block,
120+
such as within a context manager (`__enter__`).
121+
"""
122+
self.status = ExecutionStatus.RUNNING
123+
self.start_time = datetime.datetime.now(tz=datetime.timezone.utc)
124+
125+
tracer = self.get_tracer("test-framework")
126+
if tracer:
127+
span_name = getattr(
128+
self,
129+
"span_name",
130+
f"{self.__class__.__name__} - {getattr(self, 'name', 'unnamed')}",
131+
)
132+
self.span_cm = tracer.start_as_current_span(span_name)
133+
self.span = self.span_cm.__enter__()
134+
attrs = self.merge_ctx_metadata()
135+
for k, v in attrs.items():
136+
self.span.set_attribute(k, v)
137+
138+
timestamp_unix_nanos = int(self.start_time.timestamp() * 1_000_000_000)
139+
self._record_start_event(timestamp_unix_nanos)
140+
141+
def end(self):
142+
"""
143+
Marks the end of the context or operation and logs its duration.
144+
145+
- Records the current UTC time as the end time.
146+
- Logs an end event with a precise timestamp in nanoseconds.
147+
- If a tracing span is active:
148+
- Sets the span's status based on the current execution status (SUCCESS, ERROR, etc.).
149+
- Closes the span context manager to finalize the trace.
150+
151+
This method is typically called at the end of a monitored or timed execution block,
152+
such as within a context manager (`__exit__`).
153+
"""
154+
self.end_time = datetime.datetime.now(tz=datetime.timezone.utc)
155+
timestamp_unix_nanos = int(self.end_time.timestamp() * 1_000_000_000)
156+
self._record_end_event(timestamp_unix_nanos)
157+
if self.span_cm and self.span:
158+
if self.status == ExecutionStatus.SUCCESS:
159+
self.span.set_status(StatusCode.OK)
160+
elif self.status == ExecutionStatus.ERROR:
161+
self.span.set_status(
162+
Status(StatusCode.ERROR, str(self.error) or "Context failed")
163+
)
164+
else:
165+
# Optional: Handle PENDING or other states explicitly
166+
self.span.set_status(Status(StatusCode.UNSET))
167+
self.span_cm.__exit__(None, None, None)
168+
53169
@property
54170
def duration(self) -> Optional[float]:
55171
"""Duration of the context between start/stop calls or none if not run/stopped."""
56172
if self.start_time is not None and self.end_time is not None:
57-
return self.end_time - self.start_time
173+
return (self.end_time - self.start_time).total_seconds()
58174
return None
59175

60176
def add_child_ctx(self, ctx: "BaseContext"):
@@ -84,7 +200,7 @@ def get_component_by_name(self, name: str) -> Optional["Component"]:
84200
Returns: The named component or none if not found.
85201
"""
86202
if hasattr(self, "parent_ctx"):
87-
return self.parent_ctx.get_component(name)
203+
return self.parent_ctx.get_component_by_name(name)
88204
raise NotImplementedError("This context does not support get_component_by_name")
89205

90206
def get_test_suite(self) -> Optional["TestSuite"]:
@@ -93,40 +209,145 @@ def get_test_suite(self) -> Optional["TestSuite"]:
93209
return self.parent_ctx.get_test_suite()
94210
raise NotImplementedError("This context does not support get_test_suite")
95211

96-
def log(self, message: str):
212+
def record_event(
213+
self, event_name: str, timestamp_unix_nanos: Optional[int] = None, **kwargs
214+
):
97215
"""
98-
Logs a message both to the logger and stores it in the context log list.
99-
100-
Args:
101-
message: the message to log
216+
Record an event, enriching it with context-specific metadata.
102217
"""
103-
msg_dict = {"ctx_type": self.__class__.__name__, "message": message}
104-
if self.name:
105-
msg_dict["name"] = self.name
218+
if not self.span or not self.span.is_recording():
219+
return
106220
if self.status:
107-
msg_dict["status"] = self.status
221+
kwargs.setdefault("test.ctx.status", self.status.value)
222+
if self.error:
223+
kwargs.setdefault("test.ctx.error", str(self.error))
108224
if self.duration:
109-
msg_dict["duration"] = f"{self.duration:.4f}"
225+
kwargs.setdefault("test.ctx.duration", self.duration)
226+
kwargs = self.merge_ctx_metadata(**kwargs)
227+
self.span.add_event(event_name, kwargs, timestamp=timestamp_unix_nanos)
228+
229+
def merge_ctx_metadata(self, **kwargs: dict):
230+
"""Merge context metadata and status with the supplied arguments."""
231+
if self.error:
232+
kwargs.setdefault("test.ctx.error", str(self.error))
233+
for key, value in self.metadata.items():
234+
if value:
235+
kwargs.setdefault(key, value)
236+
return kwargs
237+
238+
def get_logger(self, logger_name: str = __name__) -> logging.LoggerAdapter:
239+
"""
240+
Returns a context-aware logger with enriched metadata.
110241
111-
log_entry = f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {json.dumps(msg_dict)}"
112-
print(log_entry)
242+
- Retrieves a base logger using the specified logger name.
243+
- Merges contextual metadata from the object.
244+
- Adds non-empty metadata fields as 'extra' context to the logger.
245+
- Returns a `LoggerAdapter` that injects this context into all log records.
113246
114-
def start(self):
115-
"""Mark start of context (e.g., for timing)."""
116-
self.log("Context Starting...")
117-
self.status = ExecutionStatus.RUNNING
118-
self.start_time = time.time()
247+
Args:
248+
logger_name (str): The name of the logger to retrieve. Defaults to the current module's name.
119249
120-
def end(self):
121-
"""Mark end of context and log duration."""
122-
self.end_time = time.time()
123-
self.log("Context Ended")
250+
Returns:
251+
logging.LoggerAdapter: A logger adapter that includes contextual metadata for structured logging.
252+
"""
253+
base_logger = logging.getLogger(logger_name)
254+
extra = {}
255+
attrs = self.merge_ctx_metadata()
256+
for k, v in attrs.items():
257+
if v:
258+
extra[k] = v
259+
return logging.LoggerAdapter(base_logger, extra)
260+
261+
def get_tracer(
262+
self, name="default", runtime_name: str = TelemetryRuntime.type
263+
) -> Optional["Tracer"]:
264+
"""
265+
Retrieves a tracer instance from the telemetry runtime.
266+
267+
- Accesses the current test suite and retrieves the specified telemetry runtime.
268+
- Returns a tracer identified by the given name from that runtime.
269+
- If the telemetry runtime is not found, returns None.
270+
271+
Args:
272+
name (str): The name of the tracer to retrieve. Defaults to "default".
273+
runtime_name (str): The name/type of the telemetry runtime to use. Defaults to the class-level `TelemetryRuntime.type`.
274+
275+
Returns:
276+
Optional[Tracer]: A tracer instance for telemetry, or None if unavailable.
277+
"""
278+
ts = self.get_test_suite()
279+
telemetry_runtime: TelemetryRuntime = ts.get_runtime(runtime_name)
280+
if not telemetry_runtime:
281+
return
282+
return telemetry_runtime.get_tracer(name)
283+
284+
def get_meter(
285+
self, name="default", runtime_name: str = TelemetryRuntime.type
286+
) -> Optional["Meter"]:
287+
"""
288+
Retrieves a meter instance from the telemetry runtime.
289+
290+
- Accesses the current test suite and obtains the specified telemetry runtime.
291+
- Returns a meter identified by the given name from that runtime.
292+
- If the telemetry runtime is not available, returns None.
293+
294+
Args:
295+
name (str): The name of the meter to retrieve. Defaults to "default".
296+
runtime_name (str): The name/type of the telemetry runtime to use. Defaults to the class-level `TelemetryRuntime.type`.
297+
298+
Returns:
299+
Optional[Meter]: A meter instance for recording metrics, or None if unavailable.
300+
"""
301+
ts = self.get_test_suite()
302+
telemetry_runtime: TelemetryRuntime = ts.get_runtime(runtime_name)
303+
if not telemetry_runtime:
304+
return
305+
return telemetry_runtime.get_meter(name)
306+
307+
def get_telemetry_client(
308+
self, runtime_name: str = TelemetryRuntime.type
309+
) -> Optional[TelemetryClient]:
310+
"""
311+
Retrieves the telemetry client from the specified telemetry runtime.
312+
313+
- Accesses the current test suite to retrieve the telemetry runtime by name.
314+
- Returns the telemetry client associated with that runtime.
315+
- If the runtime is not found, returns None.
316+
317+
Args:
318+
runtime_name (str): The name/type of the telemetry runtime to use.
319+
Defaults to the class-level `TelemetryRuntime.type`.
320+
321+
Returns:
322+
Optional[TelemetryClient]: The telemetry client instance, or None if unavailable.
323+
"""
324+
ts = self.get_test_suite()
325+
telemetry_runtime: TelemetryRuntime = ts.get_runtime(runtime_name)
326+
if not telemetry_runtime:
327+
return
328+
return telemetry_runtime.get_client()
329+
330+
def get_metadata(self) -> dict:
331+
"""Return metadata specific to this context level (e.g. test_name, test_step)."""
332+
return self.metadata
333+
334+
def _record_start_event(self, timestamp_unix_nanos: Optional[int]):
335+
"""Hook to record a context-specific start event."""
336+
self.record_event(
337+
self.start_event_type.namespaced(), timestamp=timestamp_unix_nanos
338+
)
339+
340+
def _record_end_event(self, timestamp_unix_nanos: Optional[int]):
341+
"""Hook to record a context-specific end event."""
342+
self.record_event(
343+
self.end_event_type.namespaced(), timestamp=timestamp_unix_nanos
344+
)
124345

125346
@staticmethod
126-
def _format_time(timestamp: Optional[float]) -> str:
347+
def _format_time(timestamp: Optional[datetime.datetime]) -> str:
127348
if timestamp is None:
128349
return "None"
129-
return datetime.datetime.fromtimestamp(timestamp).isoformat()
350+
return timestamp.isoformat()
130351

131352
def summary_string(self, indent: int = 2) -> str:
132353
"""
@@ -168,8 +389,8 @@ def to_dict(self) -> dict:
168389
"name": self.name,
169390
"status": self.status.value if self.status else None,
170391
"error": str(self.error) if self.error else None,
171-
"start_time": self.start_time,
172-
"end_time": self.end_time,
392+
"start_time": self._format_time(self.start_time),
393+
"end_time": self._format_time(self.end_time),
173394
"duration": self.duration,
174395
"metadata": self.metadata,
175396
"child_contexts": [child.to_dict() for child in self.child_contexts],

tools/pipeline_perf_test/orchestrator/lib/core/context/component_hook_context.py

Lines changed: 17 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -31,10 +31,11 @@
3131
from enum import Enum
3232
from typing import Optional, TYPE_CHECKING
3333

34+
from ..telemetry.test_event import TestEvent
3435
from .base import BaseContext
35-
from .test_contexts import TestStepContext
3636

3737
if TYPE_CHECKING:
38+
from .test_contexts import TestStepContext
3839
from ..component.component import Component
3940

4041

@@ -81,13 +82,27 @@ class ComponentHookContext(BaseContext):
8182
parent_ctx: Optional["TestStepContext"] = None
8283
phase: Optional["HookableComponentPhase"] = None
8384

85+
def __post_init__(self):
86+
"""
87+
Performs additional initialization after object creation.
88+
"""
89+
super().__post_init__()
90+
self.start_event_type = TestEvent.HOOK_START
91+
self.end_event_type = TestEvent.HOOK_END
92+
if self.parent_ctx:
93+
merged_metadata = {**self.parent_ctx.metadata, **self.metadata}
94+
merged_metadata["test.ctx.component"] = self.parent_ctx.step.component.name
95+
self.metadata = merged_metadata
96+
self.metadata["test.ctx.phase"] = self.phase.value
97+
self.span_name = f"Run Component Hook: {self.name}"
98+
8499
def get_step_component(self) -> Optional["Component"]:
85100
"""Fetches the component instance on which this hook is firing.
86101
87102
Returns: the component instance or none.
88103
"""
89104
if self.parent_ctx is None:
90105
raise RuntimeError(
91-
"LifecycleHookContext.parent_ctx must be set to access the step component."
106+
"ComponentHookContext.parent_ctx must be set to access the step component."
92107
)
93108
return self.parent_ctx.step.component

0 commit comments

Comments
 (0)