2 changes: 1 addition & 1 deletion src/strands/event_loop/event_loop.py
@@ -281,7 +281,7 @@ async def _handle_model_execution(
message = recover_message_on_max_tokens_reached(message)

if model_invoke_span:
tracer.end_model_invoke_span(model_invoke_span, message, usage, stop_reason)
tracer.end_model_invoke_span(model_invoke_span, message, usage, metrics, stop_reason)
break # Success! Break out of retry loop

except Exception as e:
26 changes: 21 additions & 5 deletions src/strands/event_loop/streaming.py
@@ -2,6 +2,7 @@

import json
import logging
import time
from typing import Any, AsyncGenerator, AsyncIterable, Optional

from ..models.model import Model
@@ -267,31 +268,38 @@ def handle_redact_content(event: RedactContentEvent, state: dict[str, Any]) -> N
state["message"]["content"] = [{"text": event["redactAssistantContentMessage"]}]


def extract_usage_metrics(event: MetadataEvent) -> tuple[Usage, Metrics]:
def extract_usage_metrics(event: MetadataEvent, time_to_first_byte_ms: int | None = None) -> tuple[Usage, Metrics]:
"""Extracts usage metrics from the metadata chunk.

Args:
event: metadata.
time_to_first_byte_ms: time to get the first byte from the model in milliseconds

Returns:
The extracted usage metrics and latency.
"""
usage = Usage(**event["usage"])
metrics = Metrics(**event["metrics"])
if time_to_first_byte_ms:
metrics["timeToFirstByteMs"] = time_to_first_byte_ms

return usage, metrics


async def process_stream(chunks: AsyncIterable[StreamEvent]) -> AsyncGenerator[TypedEvent, None]:
async def process_stream(
chunks: AsyncIterable[StreamEvent], start_time: float | None = None
) -> AsyncGenerator[TypedEvent, None]:
"""Processes the response stream from the API, constructing the final message and extracting usage metrics.

Args:
chunks: The chunks of the response stream from the model.
start_time: Time when the model request is initiated

Yields:
The reason for stopping, the constructed message, and the usage metrics.
"""
stop_reason: StopReason = "end_turn"
first_byte_time = None

state: dict[str, Any] = {
"message": {"role": "assistant", "content": []},
@@ -303,10 +311,14 @@ async def process_stream(chunks: AsyncIterable[StreamEvent]) -> AsyncGenerator[T
state["content"] = state["message"]["content"]

usage: Usage = Usage(inputTokens=0, outputTokens=0, totalTokens=0)
metrics: Metrics = Metrics(latencyMs=0)
metrics: Metrics = Metrics(latencyMs=0, timeToFirstByteMs=0)

async for chunk in chunks:
# Track first byte time when we get first content
if first_byte_time is None and ("contentBlockDelta" in chunk or "contentBlockStart" in chunk):
first_byte_time = time.time()
yield ModelStreamChunkEvent(chunk=chunk)

if "messageStart" in chunk:
state["message"] = handle_message_start(chunk["messageStart"], state["message"])
elif "contentBlockStart" in chunk:
@@ -319,7 +331,10 @@ async def process_stream(chunks: AsyncIterable[StreamEvent]) -> AsyncGenerator[T
elif "messageStop" in chunk:
stop_reason = handle_message_stop(chunk["messageStop"])
elif "metadata" in chunk:
usage, metrics = extract_usage_metrics(chunk["metadata"])
time_to_first_byte_ms = (
int(1000 * (first_byte_time - start_time)) if (start_time and first_byte_time) else None
)
usage, metrics = extract_usage_metrics(chunk["metadata"], time_to_first_byte_ms)
elif "redactContent" in chunk:
handle_redact_content(chunk["redactContent"], state)

@@ -346,7 +361,8 @@ async def stream_messages(
logger.debug("model=<%s> | streaming messages", model)

messages = remove_blank_messages_content_text(messages)
start_time = time.time()
chunks = model.stream(messages, tool_specs if tool_specs else None, system_prompt)

async for event in process_stream(chunks):
async for event in process_stream(chunks, start_time):
yield event
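
For orientation, here is a self-contained sketch of the timing logic these hunks introduce. The `measure_ttfb` helper and the fake stream are illustrative only (not part of the PR): the caller records a start time before invoking the model, the first `contentBlockStart`/`contentBlockDelta` chunk is stamped, and the difference is converted to whole milliseconds when the `metadata` chunk arrives.

```python
import asyncio
import time
from typing import Any, AsyncIterable, Optional


async def measure_ttfb(chunks: AsyncIterable[dict[str, Any]], start_time: float) -> Optional[int]:
    """Condensed illustration of the timing logic added to process_stream above."""
    first_byte_time = None
    ttfb_ms = None
    async for chunk in chunks:
        # Stamp the first content chunk, mirroring the contentBlockDelta/contentBlockStart check.
        if first_byte_time is None and ("contentBlockDelta" in chunk or "contentBlockStart" in chunk):
            first_byte_time = time.time()
        # On the metadata chunk, convert the elapsed time to milliseconds.
        if "metadata" in chunk and first_byte_time is not None:
            ttfb_ms = int(1000 * (first_byte_time - start_time))
    return ttfb_ms


async def _demo() -> None:
    async def fake_stream():
        await asyncio.sleep(0.05)  # simulated delay before the model emits content
        yield {"contentBlockDelta": {"delta": {"text": "hi"}}}
        yield {"metadata": {"usage": {}, "metrics": {}}}

    print(await measure_ttfb(fake_stream(), time.time()))  # roughly 50 (milliseconds)


asyncio.run(_demo())
```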
7 changes: 6 additions & 1 deletion src/strands/telemetry/metrics.py
@@ -286,6 +286,8 @@ def update_metrics(self, metrics: Metrics) -> None:
metrics: The metrics data to add to the accumulated totals.
"""
self._metrics_client.event_loop_latency.record(metrics["latencyMs"])
if metrics.get("timeToFirstByteMs") is not None:
self._metrics_client.model_time_to_first_token.record(metrics["timeToFirstByteMs"])
self.accumulated_metrics["latencyMs"] += metrics["latencyMs"]

def get_summary(self) -> Dict[str, Any]:
@@ -448,7 +450,7 @@ class MetricsClient:
event_loop_output_tokens: Histogram
event_loop_cache_read_input_tokens: Histogram
event_loop_cache_write_input_tokens: Histogram

model_time_to_first_token: Histogram
tool_call_count: Counter
tool_success_count: Counter
tool_error_count: Counter
@@ -507,3 +509,6 @@ def create_instruments(self) -> None:
self.event_loop_cache_write_input_tokens = self.meter.create_histogram(
name=constants.STRANDS_EVENT_LOOP_CACHE_WRITE_INPUT_TOKENS, unit="token"
)
self.model_time_to_first_token = self.meter.create_histogram(
name=constants.STRANDS_MODEL_TIME_TO_FIRST_TOKEN, unit="ms"
)
1 change: 1 addition & 0 deletions src/strands/telemetry/metrics_constants.py
@@ -15,3 +15,4 @@
STRANDS_EVENT_LOOP_OUTPUT_TOKENS = "strands.event_loop.output.tokens"
STRANDS_EVENT_LOOP_CACHE_READ_INPUT_TOKENS = "strands.event_loop.cache_read.input.tokens"
STRANDS_EVENT_LOOP_CACHE_WRITE_INPUT_TOKENS = "strands.event_loop.cache_write.input.tokens"
STRANDS_MODEL_TIME_TO_FIRST_TOKEN = "strands.model.time_to_first_token"
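
The new constant feeds the histogram created in metrics.py above. As a hedged, standalone sketch using the OpenTelemetry metrics API directly (the meter name and sample values are illustrative, not from the SDK):

```python
from opentelemetry import metrics

STRANDS_MODEL_TIME_TO_FIRST_TOKEN = "strands.model.time_to_first_token"  # mirrors metrics_constants.py

meter = metrics.get_meter("strands-example")  # illustrative meter name
model_time_to_first_token = meter.create_histogram(
    name=STRANDS_MODEL_TIME_TO_FIRST_TOKEN, unit="ms"
)

# update_metrics() records the histogram only when the value is present,
# since timeToFirstByteMs is optional in the Metrics dict.
metrics_data = {"latencyMs": 420, "timeToFirstByteMs": 180}
if metrics_data.get("timeToFirstByteMs") is not None:
    model_time_to_first_token.record(metrics_data["timeToFirstByteMs"])
```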
93 changes: 81 additions & 12 deletions src/strands/telemetry/tracer.py
@@ -16,7 +16,7 @@

from ..agent.agent_result import AgentResult
from ..types.content import ContentBlock, Message, Messages
from ..types.streaming import StopReason, Usage
from ..types.streaming import Metrics, StopReason, Usage
from ..types.tools import ToolResult, ToolUse
from ..types.traces import Attributes, AttributeValue

@@ -153,6 +153,28 @@ def _set_attributes(self, span: Span, attributes: Dict[str, AttributeValue]) ->
for key, value in attributes.items():
span.set_attribute(key, value)

def _add_optional_usage_and_metrics_attributes(
self, attributes: Dict[str, AttributeValue], usage: Usage, metrics: Metrics
) -> None:
"""Add optional usage and metrics attributes if they have values.

Args:
attributes: Dictionary to add attributes to
usage: Token usage information from the model call
metrics: Metrics from the model call
"""
if "cacheReadInputTokens" in usage:
attributes["gen_ai.usage.cache_read_input_tokens"] = usage["cacheReadInputTokens"]

if "cacheWriteInputTokens" in usage:
attributes["gen_ai.usage.cache_write_input_tokens"] = usage["cacheWriteInputTokens"]

if metrics.get("timeToFirstByteMs", 0) > 0:
attributes["gen_ai.server.time_to_first_token"] = metrics["timeToFirstByteMs"]

if metrics.get("latencyMs", 0) > 0:
attributes["gen_ai.server.request.duration"] = metrics["latencyMs"]

def _end_span(
self,
span: Span,
@@ -277,14 +299,21 @@ def start_model_invoke_span(
return span

def end_model_invoke_span(
self, span: Span, message: Message, usage: Usage, stop_reason: StopReason, error: Optional[Exception] = None
self,
span: Span,
message: Message,
usage: Usage,
metrics: Metrics,
stop_reason: StopReason,
error: Optional[Exception] = None,
) -> None:
"""End a model invocation span with results and metrics.

Args:
span: The span to end.
message: The message response from the model.
usage: Token usage information from the model call.
metrics: Metrics from the model call.
stop_reason (StopReason): The reason the model stopped generating.
error: Optional exception if the model call failed.
"""
@@ -294,10 +323,11 @@ def end_model_invoke_span(
"gen_ai.usage.completion_tokens": usage["outputTokens"],
"gen_ai.usage.output_tokens": usage["outputTokens"],
"gen_ai.usage.total_tokens": usage["totalTokens"],
"gen_ai.usage.cache_read_input_tokens": usage.get("cacheReadInputTokens", 0),
"gen_ai.usage.cache_write_input_tokens": usage.get("cacheWriteInputTokens", 0),
}

# Add optional attributes if they have values
self._add_optional_usage_and_metrics_attributes(attributes, usage, metrics)

if self.use_latest_genai_conventions:
self._add_event(
span,
@@ -307,7 +337,7 @@ def end_model_invoke_span(
[
{
"role": message["role"],
"parts": [{"type": "text", "content": message["content"]}],
"parts": self._map_content_blocks_to_otel_parts(message["content"]),
"finish_reason": str(stop_reason),
}
]
@@ -362,7 +392,7 @@ def start_tool_call_span(self, tool: ToolUse, parent_span: Optional[Span] = None
"type": "tool_call",
"name": tool["name"],
"id": tool["toolUseId"],
"arguments": [{"content": tool["input"]}],
"arguments": tool["input"],
}
],
}
@@ -417,7 +447,7 @@ def end_tool_call_span(
{
"type": "tool_call_response",
"id": tool_result.get("toolUseId", ""),
"result": tool_result.get("content"),
"response": tool_result.get("content"),
}
],
}
@@ -504,7 +534,7 @@ def end_event_loop_cycle_span(
[
{
"role": tool_result_message["role"],
"parts": [{"type": "text", "content": tool_result_message["content"]}],
"parts": self._map_content_blocks_to_otel_parts(tool_result_message["content"]),
}
]
)
@@ -634,19 +664,23 @@ def start_multiagent_span(
)

span = self._start_span(operation, attributes=attributes, span_kind=trace_api.SpanKind.CLIENT)
content = serialize(task) if isinstance(task, list) else task

if self.use_latest_genai_conventions:
parts: list[dict[str, Any]] = []
if isinstance(task, list):
parts = self._map_content_blocks_to_otel_parts(task)
else:
parts = [{"type": "text", "content": task}]
self._add_event(
span,
"gen_ai.client.inference.operation.details",
{"gen_ai.input.messages": serialize([{"role": "user", "parts": [{"type": "text", "content": task}]}])},
{"gen_ai.input.messages": serialize([{"role": "user", "parts": parts}])},
)
else:
self._add_event(
span,
"gen_ai.user.message",
event_attributes={"content": content},
event_attributes={"content": serialize(task) if isinstance(task, list) else task},
)

return span
@@ -718,7 +752,7 @@ def _add_event_messages(self, span: Span, messages: Messages) -> None:
input_messages: list = []
for message in messages:
input_messages.append(
{"role": message["role"], "parts": [{"type": "text", "content": message["content"]}]}
{"role": message["role"], "parts": self._map_content_blocks_to_otel_parts(message["content"])}
)
self._add_event(
span, "gen_ai.client.inference.operation.details", {"gen_ai.input.messages": serialize(input_messages)}
@@ -731,6 +765,41 @@ def _add_event_messages(self, span: Span, messages: Messages) -> None:
{"content": serialize(message["content"])},
)

def _map_content_blocks_to_otel_parts(self, content_blocks: list[ContentBlock]) -> list[dict[str, Any]]:
"""Map ContentBlock objects to OpenTelemetry parts format."""
parts: list[dict[str, Any]] = []

for block in content_blocks:
if "text" in block:
# Standard TextPart
parts.append({"type": "text", "content": block["text"]})
elif "toolUse" in block:
# Standard ToolCallRequestPart
tool_use = block["toolUse"]
parts.append(
{
"type": "tool_call",
"name": tool_use["name"],
"id": tool_use["toolUseId"],
"arguments": tool_use["input"],
}
)
elif "toolResult" in block:
# Standard ToolCallResponsePart
tool_result = block["toolResult"]
parts.append(
{
"type": "tool_call_response",
"id": tool_result["toolUseId"],
"response": tool_result["content"],
}
)
else:
# For all other ContentBlock types, use the key as type and value as content
for key, value in block.items():
parts.append({"type": key, "content": value})
return parts


# Singleton instance for global access
_tracer_instance = None
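
An illustrative input/output pair for the new `_map_content_blocks_to_otel_parts` helper. The sample blocks are hypothetical but follow the ContentBlock shapes handled by the three branches above (`text`, `toolUse`, `toolResult`):

```python
content_blocks = [
    {"text": "What's the weather in Berlin?"},
    {"toolUse": {"name": "get_weather", "toolUseId": "tool-1", "input": {"city": "Berlin"}}},
    {"toolResult": {"toolUseId": "tool-1", "content": [{"text": "Sunny, 21 C"}], "status": "success"}},
]

# Expected OpenTelemetry parts, following the branches above:
# [
#     {"type": "text", "content": "What's the weather in Berlin?"},
#     {"type": "tool_call", "name": "get_weather", "id": "tool-1", "arguments": {"city": "Berlin"}},
#     {"type": "tool_call_response", "id": "tool-1", "response": [{"text": "Sunny, 21 C"}]},
# ]
```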
7 changes: 5 additions & 2 deletions src/strands/types/event_loop.py
@@ -23,14 +23,17 @@ class Usage(TypedDict, total=False):
cacheWriteInputTokens: int


class Metrics(TypedDict):
class Metrics(TypedDict, total=False):
"""Performance metrics for model interactions.
Attributes:
latencyMs (int): Latency of the model request in milliseconds.
timeToFirstByteMs (int): Latency from sending model request to first
content chunk (contentBlockDelta or contentBlockStart) from the model in milliseconds.
"""

latencyMs: int
latencyMs: Required[int]
timeToFirstByteMs: int


StopReason = Literal[
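
A minimal sketch of how the revised TypedDict behaves: `total=False` makes keys optional by default, while `Required[...]` keeps `latencyMs` mandatory. (`Required` lives in `typing` on Python 3.11+ and in `typing_extensions` otherwise; which import the SDK uses is not shown in this hunk, and the sample values are illustrative.)

```python
from typing_extensions import Required, TypedDict  # typing.Required/TypedDict on Python 3.11+


class Metrics(TypedDict, total=False):
    latencyMs: Required[int]   # still mandatory despite total=False
    timeToFirstByteMs: int     # optional: only set when a start_time was supplied


minimal: Metrics = {"latencyMs": 250}                        # valid: optional key omitted
full: Metrics = {"latencyMs": 250, "timeToFirstByteMs": 90}  # valid: both keys present
```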
4 changes: 2 additions & 2 deletions tests/strands/event_loop/test_streaming.py
@@ -491,7 +491,7 @@ def test_extract_usage_metrics_with_cache_tokens():
"content": [],
},
{"inputTokens": 0, "outputTokens": 0, "totalTokens": 0},
{"latencyMs": 0},
{"latencyMs": 0, "timeToFirstByteMs": 0},
),
},
],
@@ -781,7 +781,7 @@ async def test_stream_messages(agenerator, alist):
"end_turn",
{"role": "assistant", "content": [{"text": "test"}]},
{"inputTokens": 0, "outputTokens": 0, "totalTokens": 0},
{"latencyMs": 0},
{"latencyMs": 0, "timeToFirstByteMs": 0},
)
},
]