Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions sdk/agenta/sdk/decorators/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from typing import Callable, Optional, Any, Dict, List, Union

import warnings

from opentelemetry import context as otel_context
from opentelemetry.context import attach, detach

Expand Down Expand Up @@ -34,6 +36,37 @@

log = get_module_logger(__name__)

_PREINIT_INSTRUMENTATION_WARNING_EMITTED = False
Copy link

Copilot AI Dec 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The global variable _PREINIT_INSTRUMENTATION_WARNING_EMITTED is not thread-safe. In a multi-threaded environment, multiple threads could simultaneously check if the warning has been emitted (line 56), both see False, and both proceed to emit the warning. This could result in multiple warnings being emitted instead of just one. Consider using a threading lock or thread-local storage to ensure thread-safety.

Copilot uses AI. Check for mistakes.


def _is_tracing_initialized() -> bool:
singleton = getattr(ag, "DEFAULT_AGENTA_SINGLETON_INSTANCE", None)
tracing = getattr(singleton, "tracing", None) if singleton is not None else None

return bool(
tracing is not None
and getattr(tracing, "tracer_provider", None) is not None
and getattr(tracing, "tracer", None) is not None
)


def _warn_instrumentation_before_init_once(handler_name: str) -> None:
global _PREINIT_INSTRUMENTATION_WARNING_EMITTED # pylint: disable=global-statement

if _PREINIT_INSTRUMENTATION_WARNING_EMITTED:
return

_PREINIT_INSTRUMENTATION_WARNING_EMITTED = True

message = (
"Agenta SDK warning: an instrumented function was called before `ag.init()`.\n"
f"- Function: {handler_name}\n"
"- Impact: this call will run without Agenta tracing/export.\n"
"- Fix: call `ag.init()` once at startup (before invoking any `@ag.instrument()` / `@ag.workflow` code)."
)

warnings.warn(message, RuntimeWarning, stacklevel=4)


def _has_instrument(handler: Callable[..., Any]) -> bool:
return bool(getattr(handler, "__has_instrument__", False))
Expand Down Expand Up @@ -80,12 +113,18 @@ def __call__(self, handler: Callable[..., Any]):
is_sync_generator = isgeneratorfunction(handler)
is_async_generator = isasyncgenfunction(handler)

handler_name = f"{getattr(handler, '__module__', '<unknown>')}.{getattr(handler, '__qualname__', getattr(handler, '__name__', '<unknown>'))}"
Copy link

Copilot AI Dec 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line exceeds typical line length conventions (appears to be ~140+ characters). Consider breaking it into multiple lines for better readability. For example, compute qualname separately first, then construct the full handler_name.

Suggested change
handler_name = f"{getattr(handler, '__module__', '<unknown>')}.{getattr(handler, '__qualname__', getattr(handler, '__name__', '<unknown>'))}"
qualname = getattr(handler, '__qualname__', getattr(handler, '__name__', '<unknown>'))
handler_name = f"{getattr(handler, '__module__', '<unknown>')}.{qualname}"

Copilot uses AI. Check for mistakes.

# ---- ASYNC GENERATOR ----
if is_async_generator:

@wraps(handler)
def astream_wrapper(*args, **kwargs):
with tracing_context_manager(context=TracingContext.get()):
if not _is_tracing_initialized():
_warn_instrumentation_before_init_once(handler_name)
return handler(*args, **kwargs)

# debug_otel_context("[BEFORE STREAM] [BEFORE SETUP]")

captured_ctx = otel_context.get_current()
Expand Down Expand Up @@ -154,6 +193,10 @@ async def wrapped_generator():
@wraps(handler)
def stream_wrapper(*args, **kwargs):
with tracing_context_manager(context=TracingContext.get()):
if not _is_tracing_initialized():
_warn_instrumentation_before_init_once(handler_name)
return handler(*args, **kwargs)

self._parse_type_and_kind()

token = self._attach_baggage()
Expand Down Expand Up @@ -217,6 +260,10 @@ def wrapped_generator():
@wraps(handler)
async def awrapper(*args, **kwargs):
with tracing_context_manager(context=TracingContext.get()):
if not _is_tracing_initialized():
_warn_instrumentation_before_init_once(handler_name)
return await handler(*args, **kwargs)

self._parse_type_and_kind()

token = self._attach_baggage()
Expand Down Expand Up @@ -250,6 +297,10 @@ async def awrapper(*args, **kwargs):
@wraps(handler)
def wrapper(*args, **kwargs):
with tracing_context_manager(context=TracingContext.get()):
if not _is_tracing_initialized():
_warn_instrumentation_before_init_once(handler_name)
return handler(*args, **kwargs)

self._parse_type_and_kind()

token = self._attach_baggage()
Expand Down
103 changes: 102 additions & 1 deletion sdk/tests/unit/test_tracing_decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@

import pytest
import asyncio
from unittest.mock import Mock, MagicMock, patch
from types import SimpleNamespace
from unittest.mock import Mock, patch

from agenta.sdk.decorators.tracing import instrument
import agenta.sdk.decorators.tracing as tracing_decorators


class TestExistingFunctionality:
Expand Down Expand Up @@ -680,3 +682,102 @@ def parameterized_generator(prompt):

# Verify span was set to OK status
self.mock_span.set_status.assert_called_with("OK")


class TestPreInitInstrumentationWarnings:
def setup_method(self):
tracing_decorators._PREINIT_INSTRUMENTATION_WARNING_EMITTED = (
False # pylint: disable=protected-access
)

@patch(
"agenta.sdk.decorators.tracing.ag",
new=SimpleNamespace(
DEFAULT_AGENTA_SINGLETON_INSTANCE=SimpleNamespace(tracing=None),
),
)
def test_sync_function_warns_once_and_still_executes(self, recwarn):
@instrument()
def add(x, y):
return x + y

assert add(1, 2) == 3
assert add(2, 3) == 5

runtime_warnings = [
w for w in recwarn if issubclass(w.category, RuntimeWarning)
]
assert len(runtime_warnings) == 1
message = str(runtime_warnings[0].message)
assert "called before `ag.init()`" in message
assert "Fix: call `ag.init()`" in message

@pytest.mark.asyncio
@patch(
"agenta.sdk.decorators.tracing.ag",
new=SimpleNamespace(
DEFAULT_AGENTA_SINGLETON_INSTANCE=SimpleNamespace(tracing=None),
),
)
async def test_async_function_warns_once_and_still_executes(self, recwarn):
@instrument()
async def mul(x, y):
await asyncio.sleep(0.001)
return x * y

assert await mul(2, 3) == 6
assert await mul(3, 4) == 12

runtime_warnings = [
w for w in recwarn if issubclass(w.category, RuntimeWarning)
]
assert len(runtime_warnings) == 1
Comment on lines +722 to +734
Copy link

Copilot AI Dec 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test validates the number of warnings but doesn't check the warning message content like the sync function test does (lines 711-713). Consider adding similar assertions to verify the warning message contains "called before ag.init()" and "Fix: call ag.init()" for consistency and completeness.

Copilot uses AI. Check for mistakes.

@patch(
"agenta.sdk.decorators.tracing.ag",
new=SimpleNamespace(
DEFAULT_AGENTA_SINGLETON_INSTANCE=SimpleNamespace(tracing=None),
),
)
def test_sync_generator_warns_once_and_still_executes(self, recwarn):
@instrument()
def gen():
yield "a"
yield "b"

assert list(gen()) == ["a", "b"]
assert list(gen()) == ["a", "b"]

runtime_warnings = [
w for w in recwarn if issubclass(w.category, RuntimeWarning)
]
assert len(runtime_warnings) == 1

@pytest.mark.asyncio
@patch(
"agenta.sdk.decorators.tracing.ag",
new=SimpleNamespace(
DEFAULT_AGENTA_SINGLETON_INSTANCE=SimpleNamespace(tracing=None),
),
)
async def test_async_generator_warns_once_and_still_executes(self, recwarn):
@instrument()
async def gen():
yield "a"
await asyncio.sleep(0.001)
yield "b"

first = []
async for item in gen():
first.append(item)
assert first == ["a", "b"]

second = []
async for item in gen():
second.append(item)
assert second == ["a", "b"]

runtime_warnings = [
w for w in recwarn if issubclass(w.category, RuntimeWarning)
]
assert len(runtime_warnings) == 1
Comment on lines +763 to +783
Copy link

Copilot AI Dec 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test validates the number of warnings but doesn't check the warning message content like the sync function test does (lines 711-713). Consider adding similar assertions to verify the warning message contains "called before ag.init()" and "Fix: call ag.init()" for consistency and completeness.

Copilot uses AI. Check for mistakes.