Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from __future__ import annotations

import logging
import timeit
from functools import wraps
from typing import Any, AsyncGenerator

Expand Down Expand Up @@ -57,7 +58,13 @@ async def _wrap_streaming_response(
"""Wrap streaming response to update invocation when done."""
try:
last_chunk = None
first_token_received = False
async for chunk in generator:
# Record time when first token is received
if not first_token_received:
first_token_received = True
invocation.monotonic_first_token_s = timeit.default_timer()

last_chunk = chunk
yield chunk

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def _assert_chat_span_attributes(
temperature: float = None,
max_tokens: int = None,
top_p: float = None,
expect_time_to_first_token: bool = False,
):
"""Assert common chat model span attributes"""
# Span name format: "chat {model}"
Expand Down Expand Up @@ -146,6 +147,24 @@ def _assert_chat_span_attributes(
)
assert span.attributes["gen_ai.request.top_p"] == top_p

# Assert time to first token for streaming responses (in nanoseconds)
if expect_time_to_first_token:
assert "gen_ai.response.time_to_first_token" in span.attributes, (
"Missing gen_ai.response.time_to_first_token"
)
ttft_ns = span.attributes["gen_ai.response.time_to_first_token"]
assert isinstance(ttft_ns, int), (
f"time_to_first_token should be an integer (nanoseconds), got {type(ttft_ns)}"
)
assert ttft_ns > 0, (
f"time_to_first_token should be positive, got {ttft_ns}"
)
else:
# For non-streaming responses, TTFT should not be present
assert "gen_ai.response.time_to_first_token" not in span.attributes, (
"gen_ai.response.time_to_first_token should not be present for non-streaming"
)


@pytest.mark.vcr()
def test_model_call_basic(instrument_no_content, span_exporter, request):
Expand Down Expand Up @@ -192,6 +211,7 @@ async def call_model():
request_model="qwen-max",
expect_input_messages=False, # Do not capture content by default
expect_output_messages=False, # Do not capture content by default
expect_time_to_first_token=True,
)

print("✓ Model call (basic) completed successfully")
Expand Down Expand Up @@ -237,6 +257,7 @@ async def call_model():
request_model="qwen-max",
expect_input_messages=False,
expect_output_messages=False,
expect_time_to_first_token=True,
)

print("✓ Model call (with messages) completed successfully")
Expand Down Expand Up @@ -274,6 +295,7 @@ async def test_model_call_async(instrument_no_content, span_exporter, request):
request_model="qwen-max",
expect_input_messages=False,
expect_output_messages=False,
expect_time_to_first_token=True,
)

print("✓ Model call (async) completed successfully")
Expand Down Expand Up @@ -321,6 +343,7 @@ async def call_model():
request_model="qwen-max",
expect_input_messages=False,
expect_output_messages=False,
expect_time_to_first_token=True,
)

print("✓ Model call (streaming) completed successfully")
Expand Down Expand Up @@ -369,6 +392,7 @@ async def call_model():
request_model="qwen-max",
expect_input_messages=False,
expect_output_messages=False,
expect_time_to_first_token=True,
)

print("✓ Model call (with parameters) completed successfully")
Expand Down Expand Up @@ -417,6 +441,7 @@ async def call_model():
request_model="qwen-max",
expect_input_messages=True, # Content capture enabled
expect_output_messages=True, # Content capture enabled
expect_time_to_first_token=False,
)

print("✓ Model call (with content capture) completed successfully")
Expand Down Expand Up @@ -459,6 +484,7 @@ async def call_model():
request_model="qwen-max",
expect_input_messages=False, # Content capture disabled
expect_output_messages=False, # Content capture disabled
expect_time_to_first_token=True,
)

print("✓ Model call (no content capture) completed successfully")
Expand Down Expand Up @@ -505,6 +531,7 @@ async def call_model(content: str):
request_model="qwen-max",
expect_input_messages=False,
expect_output_messages=False,
expect_time_to_first_token=True,
)

print("✓ Model call (multiple sequential) completed successfully")
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ readme = "README.rst"
license = "Apache-2.0"
requires-python = ">=3.9"
authors = [
{ name = "LoongSuite Python Agent Authors", email = "qp467389@alibaba-inc.com" },
{ name = "Minghui Zhang", email = "zmh1625lumian@gmail.com" },
]
classifiers = [
"Development Status :: 4 - Beta",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from __future__ import annotations

import logging
import timeit

from opentelemetry.util.genai.types import Error

Expand Down Expand Up @@ -169,9 +170,15 @@ def _wrap_sync_generator(
"""
last_response = None
accumulated_text = ""
first_token_received = False

try:
for chunk in generator:
# Record time when first token is received
if not first_token_received:
first_token_received = True
invocation.monotonic_first_token_s = timeit.default_timer()

last_response = chunk

# If incremental_output is True, accumulate text from each chunk
Expand Down Expand Up @@ -224,9 +231,15 @@ async def _wrap_async_generator(
"""
last_response = None
accumulated_text = ""
first_token_received = False

try:
async for chunk in generator:
# Record time when first token is received
if not first_token_received:
first_token_received = True
invocation.monotonic_first_token_s = timeit.default_timer()

last_response = chunk

# If incremental_output is True, accumulate text from each chunk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ def wrap_image_synthesis_async_call(
try:
# Create invocation object
invocation = _create_invocation_from_image_synthesis(kwargs, model)
invocation.attributes["gen_ai.request.async"] = True

# Start LLM invocation (creates span)
handler.start_llm(invocation)
Expand Down Expand Up @@ -196,7 +195,6 @@ def wrap_image_synthesis_wait(wrapped, instance, args, kwargs, handler=None):
invocation = _create_invocation_from_image_synthesis({}, "unknown")
# TODO: Add semantic conventions for wait operations
invocation.operation_name = "wait generate_content"
invocation.attributes["gen_ai.request.async"] = True
# Note: response_id will be set from response.output.task_id in _update_invocation_from_image_synthesis_response
# We set task_id here as a fallback
invocation.response_id = task_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from __future__ import annotations

import logging
import timeit

from opentelemetry.util.genai.types import Error

Expand Down Expand Up @@ -111,9 +112,15 @@ def _wrap_multimodal_sync_generator(
"""
last_response = None
accumulated_text = ""
first_token_received = False

try:
for chunk in generator:
# Record time when first token is received
if not first_token_received:
first_token_received = True
invocation.monotonic_first_token_s = timeit.default_timer()

last_response = chunk

# If incremental_output is True, accumulate text
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ def wrap_video_synthesis_async_call(
try:
# Create invocation object
invocation = _create_invocation_from_video_synthesis(kwargs, model)
invocation.attributes["gen_ai.request.async"] = True

# Start LLM invocation (creates span)
handler.start_llm(invocation)
Expand Down Expand Up @@ -191,7 +190,6 @@ def wrap_video_synthesis_wait(wrapped, instance, args, kwargs, handler=None):
# Create invocation object
invocation = _create_invocation_from_video_synthesis({}, "unknown")
invocation.operation_name = "wait generate_content"
invocation.attributes["gen_ai.request.async"] = True
invocation.response_id = task_id

# Start LLM invocation (creates span)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def _extract_task_id(task: Any) -> Optional[str]:
if task_id:
return task_id
except (KeyError, AttributeError) as e:
logger.debug("Failed to extract task_id from task: %s", e)
logger.debug("Failed to extract task_id from task parameter: %s", e)

return None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,10 @@ def _extract_tool_definitions(kwargs: dict) -> list[ToolDefinition]:
if isinstance(plugins, list):
tools = plugins
except (json.JSONDecodeError, TypeError, AttributeError) as e:
# If parsing fails, return empty list
logger.debug(
"Failed to extract tool definitions from plugins: %s", e
"Failed to parse tool definitions from response: %s", e
)
# If parsing fails, return empty list
return tool_definitions

# Convert tool definitions to FunctionToolDefinition objects
Expand Down Expand Up @@ -421,8 +421,8 @@ def _extract_output_messages(response: Any) -> List[OutputMessage]:
)
)
except (KeyError, AttributeError) as e:
logger.debug("Failed to extract output messages from response: %s", e)
# If any attribute access fails, return empty list
logger.debug("Failed to extract output messages from response: %s", e)
return output_messages

return output_messages
Expand Down Expand Up @@ -554,7 +554,8 @@ def _update_invocation_from_response(
invocation.response_model_name = response_model
except (KeyError, AttributeError) as e:
logger.debug(
"Failed to extract response model name from response: %s", e
"Failed to extract response model from Generation response: %s",
e,
)

# Extract request ID (if available)
Expand All @@ -563,7 +564,9 @@ def _update_invocation_from_response(
if request_id:
invocation.response_id = request_id
except (KeyError, AttributeError) as e:
logger.debug("Failed to extract request id from response: %s", e)
logger.debug(
"Failed to extract request_id from Generation response: %s", e
)
except (KeyError, AttributeError) as e:
# If any attribute access fails, silently continue with available data
logger.debug(
Expand Down Expand Up @@ -591,7 +594,10 @@ def _create_accumulated_response(original_response, accumulated_text):
return original_response
except (AttributeError, TypeError) as e:
# If we can't modify, create a wrapper object
logger.debug("Failed to modify output text: %s", e)
logger.debug(
"Failed to modify output.text directly, creating wrapper: %s",
e,
)

# Create wrapper objects with accumulated text
class AccumulatedOutput:
Expand Down Expand Up @@ -622,5 +628,7 @@ def __init__(self, original_response, accumulated_output):
return AccumulatedResponse(original_response, accumulated_output)
except (KeyError, AttributeError) as e:
# If modification fails, return original response
logger.debug("Failed to create accumulated response: %s", e)
logger.debug(
"Failed to create accumulated response, returning original: %s", e
)
return original_response
Loading
Loading