Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import asdict, dataclass
from functools import partial
from typing import Any, Callable, Literal, TextIO, cast
from typing import Any, Callable, Literal, TextIO, cast, Union
from uuid import uuid4

import fsspec
Expand Down Expand Up @@ -147,8 +147,8 @@ def upload(
inputs: list[types.InputMessage],
outputs: list[types.OutputMessage],
system_instruction: list[types.MessagePart],
span: Span | None = None,
log_record: LogRecord | None = None,
span: Union[Span, None] = None,
log_record: Union[LogRecord, None] = None,
**kwargs: Any,
) -> None:
completion = Completion(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import os
from dataclasses import dataclass
from typing import Dict
from typing import Dict, Union

from .emitters.spec import CategoryOverride
from .environment_variables import (
Expand Down Expand Up @@ -130,7 +130,7 @@ def parse_env() -> Settings:

def _parse_category_override(
category: str, raw: str
) -> CategoryOverride | None: # pragma: no cover - thin parsing
) -> Union[CategoryOverride, None]: # pragma: no cover - thin parsing
if not raw:
return None
text = raw.strip()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import logging
from typing import Any, Iterable, Iterator, Mapping, Sequence
from typing import Any, Iterable, Iterator, Mapping, Sequence, Union

from ..interfaces import EmitterMeta, EmitterProtocol
from ..types import Error, EvaluationResult
Expand Down Expand Up @@ -37,10 +37,10 @@ class CompositeEmitter(EmitterMeta):
def __init__(
self,
*,
span_emitters: Iterable[EmitterProtocol] | None = None,
metrics_emitters: Iterable[EmitterProtocol] | None = None,
content_event_emitters: Iterable[EmitterProtocol] | None = None,
evaluation_emitters: Iterable[EmitterProtocol] | None = None,
span_emitters: Union[Iterable[EmitterProtocol], None] = None,
metrics_emitters: Union[Iterable[EmitterProtocol], None] = None,
content_event_emitters: Union[Iterable[EmitterProtocol], None] = None,
evaluation_emitters: Union[Iterable[EmitterProtocol], None] = None,
) -> None:
self._categories: dict[str, list[EmitterProtocol]] = {
"span": list(span_emitters or []),
Expand All @@ -64,7 +64,7 @@ def on_error(self, error: Error, obj: Any) -> None: # type: ignore[override]
def on_evaluation_results(
self,
results: Sequence[EvaluationResult],
obj: Any | None = None,
obj: Union[Any, None] = None,
) -> None: # type: ignore[override]
if not results:
return
Expand All @@ -79,7 +79,7 @@ def on_evaluation_results(
# Introspection helpers used during configuration refresh

def iter_emitters(
self, categories: Sequence[str] | None = None
self, categories: Union[Sequence[str], None] = None
) -> Iterator[EmitterProtocol]:
names = categories or (
"span",
Expand Down Expand Up @@ -108,9 +108,9 @@ def _dispatch(
categories: Sequence[str],
method_name: str,
*,
obj: Any | None = None,
error: Error | None = None,
results: Sequence[EvaluationResult] | None = None,
obj: Union[Any, None] = None,
error: Union[Error, None] = None,
results: Union[Sequence[EvaluationResult], None] = None,
) -> None:
for category in categories:
emitters = self._categories.get(category)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import logging
import os
from typing import Any, Dict, Optional, Sequence
from typing import Any, Dict, Optional, Sequence, Union

from opentelemetry import _events as _otel_events

Expand All @@ -23,13 +23,13 @@
from ..types import EvaluationResult, GenAI


def _get_request_model(invocation: GenAI) -> str | None:
def _get_request_model(invocation: GenAI) -> Union[str, None]:
return getattr(invocation, "request_model", None) or getattr(
invocation, "model", None
)


def _get_response_id(invocation: GenAI) -> str | None: # best-effort
def _get_response_id(invocation: GenAI) -> Union[str, None]: # best-effort
return getattr(invocation, "response_id", None)


Expand Down Expand Up @@ -128,7 +128,7 @@ def _direct_factory(_name: str): # ignore metric name, single hist
def on_evaluation_results( # type: ignore[override]
self,
results: Sequence[EvaluationResult],
obj: Any | None = None,
obj: Union[Any, None] = None,
) -> None:
invocation = obj if isinstance(obj, GenAI) else None
if invocation is None:
Expand Down Expand Up @@ -314,7 +314,7 @@ def __init__(
def on_evaluation_results( # type: ignore[override]
self,
results: Sequence[EvaluationResult],
obj: Any | None = None,
obj: Union[Any, None] = None,
) -> None:
if self._event_logger is None:
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import json # noqa: F401 (kept for backward compatibility if external code relies on this module re-exporting json)
from dataclasses import asdict # noqa: F401
from typing import Any, Optional
from typing import Any, Optional, Union

from opentelemetry import trace
from opentelemetry.semconv._incubating.attributes import (
Expand Down Expand Up @@ -202,7 +202,7 @@ def _apply_start_attrs(self, invocation: GenAIType):
# Agent context (already covered by semconv metadata on base fields)

def _apply_finish_attrs(
self, invocation: LLMInvocation | EmbeddingInvocation
self, invocation: Union[LLMInvocation, EmbeddingInvocation]
):
span = getattr(invocation, "span", None)
if span is None:
Expand Down Expand Up @@ -255,7 +255,7 @@ def _apply_finish_attrs(

# ---- lifecycle -------------------------------------------------------
def on_start(
self, invocation: LLMInvocation | EmbeddingInvocation
self, invocation: Union[LLMInvocation, EmbeddingInvocation]
) -> None: # type: ignore[override]
# Handle new agentic types
if isinstance(invocation, Workflow):
Expand Down Expand Up @@ -289,7 +289,7 @@ def on_start(
invocation.context_token = cm # type: ignore[assignment]
self._apply_start_attrs(invocation)

def on_end(self, invocation: LLMInvocation | EmbeddingInvocation) -> None: # type: ignore[override]
def on_end(self, invocation: Union[LLMInvocation, EmbeddingInvocation]) -> None: # type: ignore[override]
if isinstance(invocation, Workflow):
self._finish_workflow(invocation)
elif isinstance(invocation, AgentInvocation):
Expand All @@ -312,7 +312,7 @@ def on_end(self, invocation: LLMInvocation | EmbeddingInvocation) -> None: # ty
span.end()

def on_error(
self, error: Error, invocation: LLMInvocation | EmbeddingInvocation
self, error: Error, invocation: Union[LLMInvocation, EmbeddingInvocation]
) -> None: # type: ignore[override]
if isinstance(invocation, Workflow):
self._error_workflow(error, invocation)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Callable, Mapping, Sequence
from typing import Any, Callable, Mapping, Sequence, Union

from ..interfaces import EmitterProtocol

Expand Down Expand Up @@ -30,7 +30,7 @@ class EmitterSpec:
mode: str = "append"
after: Sequence[str] = field(default_factory=tuple)
before: Sequence[str] = field(default_factory=tuple)
invocation_types: Sequence[str] | None = None
invocation_types: Union[Sequence[str], None] = None


@dataclass(frozen=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import json
from dataclasses import asdict
from typing import Any, Dict, Iterable, List, Mapping, Optional, Sequence
from typing import Any, Dict, Iterable, List, Mapping, Optional, Sequence, Union

from opentelemetry import trace
from opentelemetry._logs import (
Expand Down Expand Up @@ -56,7 +56,7 @@


def filter_semconv_gen_ai_attributes(
attributes: Mapping[str, Any] | None,
attributes: Union[Mapping[str, Any], None],
*,
extras: Iterable[str] = (),
) -> dict[str, Any]:
Expand Down Expand Up @@ -564,7 +564,7 @@ def _record_token_metrics(

def _record_duration(
duration_histogram: Histogram,
invocation: LLMInvocation | EmbeddingInvocation | ToolCall,
invocation: Union[LLMInvocation, EmbeddingInvocation, ToolCall],
metric_attributes: Dict[str, AttributeValue],
*,
span: Optional[Span] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from __future__ import annotations

from abc import ABC
from typing import Iterable, Mapping, Sequence
from typing import Iterable, Mapping, Sequence, Union

from opentelemetry.util.genai.types import (
AgentInvocation,
Expand All @@ -35,10 +35,10 @@ class Evaluator(ABC):

def __init__(
self,
metrics: Iterable[str] | None = None,
metrics: Union[Iterable[str], None] = None,
*,
invocation_type: str | None = None,
options: Mapping[str, str] | None = None,
invocation_type: Union[str, None] = None,
options: Union[Mapping[str, str], None] = None,
) -> None:
default_metrics = (
self.default_metrics_for(invocation_type)
Expand All @@ -65,7 +65,7 @@ def default_metrics(self) -> Sequence[str]: # pragma: no cover - trivial
return ()

def default_metrics_for(
self, invocation_type: str | None
self, invocation_type: Union[str, None]
) -> Sequence[str]:
mapping = self.default_metrics_by_type()
if invocation_type and invocation_type in mapping:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import threading
import time
from dataclasses import dataclass
from typing import TYPE_CHECKING, Mapping, Protocol, Sequence
from typing import TYPE_CHECKING, Mapping, Protocol, Sequence, Union

from ..callbacks import CompletionCallback
from ..environment_variables import (
Expand Down Expand Up @@ -75,8 +75,8 @@ def __init__(
self,
handler: "TelemetryHandler",
*,
interval: float | None = None,
aggregate_results: bool | None = None,
interval: Union[float, None] = None,
aggregate_results: Union[bool, None] = None,
) -> None:
self._handler = handler
evaluation_sample_rate = _read_evaluation_sample_rate()
Expand All @@ -91,7 +91,7 @@ def __init__(
self._evaluators = self._instantiate_evaluators(self._plans)
self._queue: queue.Queue[GenAI] = queue.Queue()
self._shutdown = threading.Event()
self._worker: threading.Thread | None = None
self._worker: Union[threading.Thread, None] = None
if self.has_evaluators:
self._worker = threading.Thread(
target=self._worker_loop,
Expand Down Expand Up @@ -138,7 +138,7 @@ def offer(self, invocation: GenAI) -> None:
"Failed to enqueue invocation for evaluation", exc_info=True
)

def wait_for_all(self, timeout: float | None = None) -> None:
def wait_for_all(self, timeout: Union[float, None] = None) -> None:
if not self.has_evaluators:
return
if timeout is None:
Expand Down Expand Up @@ -239,11 +239,14 @@ def _emit_results(
return flattened

def _flag_invocation(self, invocation: GenAI) -> None:
# print(f"_flag_invocation:")
if not self.has_evaluators:
return
attributes = getattr(invocation, "attributes", None)
# print(f"attributes inside _flag_invocation: {attributes}")
if isinstance(attributes, dict):
attributes.setdefault("gen_ai.evaluation.executed", True)
# print(f"attributes inside _flag_invocation: {attributes['gen_ai.evaluation.executed']}")

# Configuration ------------------------------------------------------
def _load_plans(self) -> Sequence[EvaluatorPlan]:
Expand Down Expand Up @@ -383,7 +386,7 @@ def _generate_default_plans(self) -> Sequence[EvaluatorPlan]:
# Environment parsing helpers


def _read_raw_evaluator_config() -> Union[str, None]:
    """Fetch the unparsed evaluator configuration string from the environment."""
    raw = _get_env(OTEL_INSTRUMENTATION_GENAI_EVALS_EVALUATORS)
    return raw


Expand Down Expand Up @@ -422,7 +425,7 @@ def _read_evaluation_sample_rate() -> float:
return value


def _get_env(name: str) -> str | None:
def _get_env(name: str) -> Union[str, None]:
import os

return os.environ.get(name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import inspect
import logging
from dataclasses import dataclass
from typing import Callable, Dict, Mapping, Sequence
from typing import Callable, Dict, Mapping, Sequence, Union

from opentelemetry.util._importlib_metadata import (
entry_points,
Expand Down Expand Up @@ -45,9 +45,9 @@ class EvaluatorRegistration:
def _call_with_optional_params(
target: EvaluatorFactory,
*,
metrics: Sequence[str] | None = None,
invocation_type: str | None = None,
options: Mapping[str, str] | None = None,
metrics: Union[Sequence[str], None] = None,
invocation_type: Union[str, None] = None,
options: Union[Mapping[str, str], None] = None,
) -> Evaluator:
"""Call a factory/constructor handling optional ``metrics`` gracefully."""

Expand Down Expand Up @@ -169,7 +169,7 @@ def _load_entry_points() -> None:
"Failed to load evaluator entry point '%s': %s", ep.name, exc
)
continue
registration: EvaluatorRegistration | None = None
registration: Union[EvaluatorRegistration, None] = None
if isinstance(target, EvaluatorRegistration):
registration = target
elif hasattr(target, "factory") and hasattr(target, "default_metrics"):
Expand Down Expand Up @@ -204,10 +204,10 @@ def _load_entry_points() -> None:

def get_evaluator(
name: str,
metrics: Sequence[str] | None = None,
metrics: Union[Sequence[str], None] = None,
*,
invocation_type: str | None = None,
options: Mapping[str, str] | None = None,
invocation_type: Union[str, None] = None,
options: Union[Mapping[str, str], None] = None,
) -> Evaluator:
_load_entry_points()
key = name.lower()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def genai_debug_log(*_args: Any, **_kwargs: Any) -> None: # type: ignore
_TRUTHY_VALUES = {"1", "true", "yes", "on"}


def _is_truthy_env(value: str | None) -> bool:
def _is_truthy_env(value: Optional[str]) -> bool:
if value is None:
return False
return value.strip().lower() in _TRUTHY_VALUES
Expand Down
Loading
Loading